@jcoreio/postgres-shard-coordinator

Helps processes pick a unique shard index and determine the number of shards, using Postgres to coordinate registration.

Introduction

This is designed for any situation where batch processing needs to be divided between multiple processes using hash-based sharding. For example, Clarity uses multiple processes to handle the notification queue; each process restricts itself to events where

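  knuth_hash(userId) % MAX_USER_ID >= (shard * MAX_USER_ID) / numShards &&
  knuth_hash(userId) % MAX_USER_ID < ((shard + 1) * MAX_USER_ID) / numShards

In other words, the hash range [0, MAX_USER_ID) is split into numShards contiguous slices, and each process handles the slice whose index equals its shard.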

Each of these processes can use @jcoreio/postgres-shard-coordinator to pick a unique shard index and determine the total numShards (number of processes) in a decentralized fashion that automatically adapts as processes are spawned or die.
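A minimal JavaScript sketch of the range-based ownership test described above follows. It assumes a Knuth multiplicative hash over 32-bit integer ids, so the hash range is [0, 2^32) and `HASH_RANGE` plays the role of `MAX_USER_ID`. `knuthHash`, `HASH_RANGE`, and `ownsId` are hypothetical names for illustration, not part of this package.

```javascript
// Hypothetical helpers, not exported by @jcoreio/postgres-shard-coordinator.
// Assumes ids are 32-bit integers; HASH_RANGE stands in for MAX_USER_ID.
const HASH_RANGE = 2 ** 32

// Knuth's multiplicative hash: multiply by 2654435761 and keep the low
// 32 bits as an unsigned integer in [0, HASH_RANGE).
function knuthHash(id) {
  return Math.imul(id, 2654435761) >>> 0
}

// Returns true if `shard` (0-based, out of `numShards`) owns `id`.
// Shard s owns the slice [floor(s * R / n), floor((s + 1) * R / n)),
// so the slices are contiguous and together cover [0, HASH_RANGE).
function ownsId(id, shard, numShards) {
  const h = knuthHash(id)
  const lower = Math.floor((shard * HASH_RANGE) / numShards)
  const upper = Math.floor(((shard + 1) * HASH_RANGE) / numShards)
  return h >= lower && h < upper
}

// Example: with 3 shards, each id is owned by exactly one shard.
for (const id of [1, 42, 1000]) {
  const owners = [0, 1, 2].filter((shard) => ownsId(id, shard, 3))
  console.log(id, owners)
}
```

Because the slice boundaries depend on numShards, a process has to re-evaluate which ids it owns whenever its shard index or the shard count changes.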

Usage

npm i --save @jcoreio/postgres-shard-coordinator

Database migration

You will need to run the provided migrations to create the tables and functions used for coordination. For example, with Umzug:

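import { Client } from 'pg'
import Umzug from 'umzug'
import debug from 'debug'
import { umzugMigrationOptions } from '@jcoreio/postgres-shard-coordinator'

// Logs each migration statement when run with DEBUG=migrate
const migrationDebug = debug('migrate')

export default async function migrate({ database }) {
  // Runs one SQL string on a short-lived client built from `database`;
  // defined inside migrate() so it can see the connection config
  async function query(sql) {
    const client = new Client(database)
    try {
      await client.connect()
      migrationDebug(sql)
      return await client.query(sql)
    } finally {
      await client.end()
    }
  }

  const umzug = new Umzug({
    // Records which migrations have already been applied
    storage: 'umzug-postgres-storage',
    storageOptions: {
      database,
      relation: '"SequelizeMeta"',
      column: 'name',
    },
    migrations: {
      // Points Umzug at the migrations shipped with this package
      ...umzugMigrationOptions(),
      // Each migration receives { query } as its argument
      params: [{ query }],
    },
  })

  await umzug.up()
}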
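For context on the `params: [{ query }]` option: Umzug (v2) calls each migration's `up` (or `down`) function with the entries of `migrations.params` as its arguments, so every migration receives `{ query }` and runs its SQL through it, while the storage (here `umzug-postgres-storage`) records which migration names have already been applied. The sketch below is a simplified, hypothetical model of that flow, not Umzug's implementation; `runPending`, the in-memory `executed` set, and the two example migrations are illustrative stand-ins.

```javascript
// Simplified model of an Umzug-style runner; NOT Umzug's actual code.
// `executed` stands in for the storage table that records applied migrations.
async function runPending(migrations, executed, params) {
  for (const migration of migrations) {
    if (executed.has(migration.name)) continue // already applied, skip it
    await migration.up(...params) // with params = [{ query }], this is up({ query })
    executed.add(migration.name) // record success so it never runs twice
  }
}

// Hypothetical migrations in the shape Umzug expects: `up` receives the
// `params` entries as arguments and runs SQL through the provided `query`.
const migrations = [
  { name: '001-example.js', up: async ({ query }) => query('SELECT 1') },
  { name: '002-example.js', up: async ({ query }) => query('SELECT 2') },
]

// A stand-in `query` that records SQL instead of connecting to Postgres.
const recorded = []
const recordingQuery = async (sql) => {
  recorded.push(sql)
  return { rows: [] }
}

// 001 is marked as applied, so only 002 runs.
const executed = new Set(['001-example.js'])
runPending(migrations, executed, [{ query: recordingQuery }]).then(() => {
  console.log(recorded) // logs the single statement from 002-example.js
})
```

Passing `query` in through `params`, rather than having each migration open its own connection, lets the caller decide how SQL is executed and logged.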

Shard registration

import { ShardRegistrar } from '@jcoreio/postgres-shard-coordinator'
import { Client, Pool } from 'pg'
import PgIpc from '@jcoreio/pg-ipc'
import requireEnv from '@jcoreio/require-env'
import migrate from './migrate'

const database = {
  user: requireEnv('DB_USER'),
  host: requireEnv('DB_HOST'),
  database: requireEnv('DB_NAME'),
  password: requireEnv('DB_PASSWORD'),
  port: parseInt(requireEnv('DB_PORT'), 10),
}

const ipc = new PgIpc({
  newClient: () => new Client(database),
})
ipc.on('error', (err) => console.error(err.stack))

const pool = new Pool(database)

const registrar = new ShardRegistrar({
  pool,
  ipc,
  cluster: 'clarity_notifications',
  heartbeatInterval: 60, // seconds
  gracePeriod: 30, // seconds
  reshardInterval: 60, // seconds
})

registrar.on('shardChanged', ({ shard, numShards }) => {
  // reconfigure the notification queue processor
})
registrar.on('error', (err) => console.error(err.stack))

async function go() {
  await migrate({ database })
  await registrar.start()
}

go().catch((err) => {
  console.error(err.stack)
  process.exit(1)
})
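The `shardChanged` handler in the registration example is where a process reconfigures itself. One way to structure that is a small filter object that keeps the latest `{ shard, numShards }` and refuses work until the first event arrives. This is a sketch only: `createShardFilter`, `onShardChanged`, and `shouldProcess` are hypothetical names, and the ownership rule is passed in by the caller (for example, the range check from the Introduction).

```javascript
// Hypothetical helper, not part of @jcoreio/postgres-shard-coordinator.
// `owns(key, shard, numShards)` is supplied by the caller.
function createShardFilter(owns) {
  let current = null // no assignment until the first 'shardChanged'

  return {
    // Pass this as the 'shardChanged' listener; it uses a closure rather
    // than `this`, so it works when handed to registrar.on() unbound.
    onShardChanged({ shard, numShards }) {
      current = { shard, numShards }
    },
    // Decides whether this process should handle an event for `key`.
    shouldProcess(key) {
      if (current == null) return false // not assigned a shard yet
      return owns(key, current.shard, current.numShards)
    },
  }
}

// Usage with a simple modulo ownership rule, for illustration only.
const filter = createShardFilter((key, shard, numShards) => key % numShards === shard)
// registrar.on('shardChanged', filter.onShardChanged)
filter.onShardChanged({ shard: 1, numShards: 3 })
console.log([0, 1, 2, 3, 4].filter((key) => filter.shouldProcess(key))) // [1, 4]
```

Refusing work before the first assignment helps avoid processing keys before this process knows which slice it owns.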