Skip to content

Commit

Permalink
Ensure only one worker can operate on a topic-partition at any given …
Browse files Browse the repository at this point in the history
…time
  • Loading branch information
Nevon committed Mar 7, 2022
1 parent 65a82c8 commit 1c4b02e
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 15 deletions.
11 changes: 7 additions & 4 deletions src/consumer/fetchManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ const { KafkaJSFetcherRebalanceError } = require('../errors')

/**
* @param {object} options
* @param {import('types').Logger} options.logger
* @param {import('../../types').Logger} options.logger
* @param {() => number[]} options.getNodeIds
* @param {(nodeId: number) => Promise<T[]>} options.fetch
* @param {(nodeId: number) => Promise<import('../../types').Batch[]>} options.fetch
* @param {import('./worker').Handler<T>} options.handler
* @param {number} [options.concurrency]
* @template T
Expand All @@ -23,7 +23,10 @@ const createFetchManager = ({
concurrency = 1,
}) => {
const logger = rootLogger.namespace('FetchManager')
const workers = seq(concurrency, workerId => createWorker({ handler, workerId }))
const partitionAssignments = new Map()
const workers = seq(concurrency, workerId =>
createWorker({ handler, workerId, partitionAssignments, logger })
)
const workerQueue = createWorkerQueue({ workers })

let fetchers = []
Expand All @@ -48,7 +51,7 @@ const createFetchManager = ({
workerQueue,
fetch: async nodeId => {
validateShouldRebalance()
return fetch(nodeId)
return await fetch(nodeId)
},
})
)
Expand Down
13 changes: 12 additions & 1 deletion src/consumer/fetchManager.spec.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const sleep = require('../utils/sleep')
const seq = require('../utils/seq')
const createFetchManager = require('./fetchManager')
const Batch = require('./batch')
const { newLogger } = require('testHelpers')
const waitFor = require('../utils/waitFor')

Expand All @@ -12,7 +13,17 @@ describe('FetchManager', () => {

beforeEach(() => {
batchSize = 10
fetch = jest.fn(async nodeId => seq(batchSize, id => `message ${id} from node ${nodeId}`))
fetch = jest.fn(async nodeId =>
seq(
batchSize,
id =>
new Batch('test-topic', 0, {
partition: id.toString(),
highWatermark: '100',
messages: [],
})
)
)
handler = jest.fn(async () => {
await sleep(20)
})
Expand Down
18 changes: 16 additions & 2 deletions src/consumer/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
const sharedPromiseTo = require('../utils/sharedPromiseTo')

/**
* @param {{ handler: Handler<T>, workerId: number }} options
* @param {{ handler: Handler<T>, workerId: number, partitionAssignments: Map, logger: import('../../types').Logger }} options
* @template T
*/
const createWorker = ({ handler, workerId }) => {
const createWorker = ({ handler, workerId, partitionAssignments, logger: rootLogger }) => {
const logger = rootLogger.namespace(`Worker ${workerId}`)
/**
* Takes batches from next() until it returns undefined.
*
Expand All @@ -24,10 +25,23 @@ const createWorker = ({ handler, workerId }) => {
if (!item) break

const { batch, resolve, reject } = item
const { topic, partition } = batch
const key = `${topic}|${partition}`
if (partitionAssignments.has(key)) {
logger.info('Skipping batch due to partition already being assigned to another worker', {
assignedWorker: partitionAssignments[partition],
topic,
partition,
})
continue
}
partitionAssignments.set(key, workerId)
try {
await handler(batch, { workerId })
partitionAssignments.delete(key)
resolve()
} catch (error) {
partitionAssignments.delete(key)
reject(error)
}
}
Expand Down
58 changes: 52 additions & 6 deletions src/consumer/worker.spec.js
Original file line number Diff line number Diff line change
@@ -1,28 +1,47 @@
const createWorker = require('./worker')
const Batch = require('./batch')
const seq = require('../utils/seq')
const { newLogger } = require('../../testHelpers')

const createBatch = partition =>
new Batch('test-topic', 0, {
partition: partition.toString(),
highWatermark: '100',
messages: [
{ offset: '0' },
{ offset: '1' },
{ offset: '2' },
{ offset: '3' },
{ offset: '4' },
{ offset: '5' },
],
})

describe('Worker', () => {
let worker, handler, next, workerId, resolve, reject
let worker, handler, next, workerId, partitionAssignments, resolve, reject

beforeEach(() => {
resolve = jest.fn(() => {})
reject = jest.fn(() => {})
handler = jest.fn(async () => {})
next = jest.fn(() => undefined)
workerId = 0
worker = createWorker({ handler, workerId })
partitionAssignments = new Map()
worker = createWorker({ handler, workerId, partitionAssignments, logger: newLogger() })
})

it('should loop until next() returns undefined', async () => {
const [first, second] = seq(2).map(createBatch)
next
.mockImplementationOnce(() => ({ batch: 'first', resolve, reject }))
.mockImplementationOnce(() => ({ batch: 'second', resolve, reject }))
.mockImplementationOnce(() => ({ batch: first, resolve, reject }))
.mockImplementationOnce(() => ({ batch: second, resolve, reject }))

await worker.run({ next })

expect(next).toHaveBeenCalledTimes(3)
expect(handler).toHaveBeenCalledTimes(2)
expect(handler).toHaveBeenCalledWith('first', { workerId })
expect(handler).toHaveBeenCalledWith('second', { workerId })
expect(handler).toHaveBeenCalledWith(first, { workerId })
expect(handler).toHaveBeenCalledWith(second, { workerId })
expect(resolve).toHaveBeenCalledTimes(2)
})

Expand All @@ -37,4 +56,31 @@ describe('Worker', () => {
await worker.run({ next })
expect(reject).toHaveBeenCalledWith(error)
})

it('should skip batches that are already assigned to another worker', async () => {
const secondWorker = createWorker({
handler,
workerId: 1,
partitionAssignments,
logger: newLogger(),
})

const secondNext = jest.fn(() => undefined)
const batches = seq(2).map(createBatch)

;[next, secondNext].forEach(queue => {
batches.forEach(batch => {
queue.mockImplementationOnce(() => ({ batch, resolve, reject }))
})
})

await Promise.all([
worker.run({ next, resolve, reject }),
secondWorker.run({ next: secondNext, resolve, reject }),
])

expect(next).toHaveBeenCalledTimes(3)
expect(secondNext).toHaveBeenCalledTimes(3)
expect(handler).toHaveBeenCalledTimes(2)
})
})
17 changes: 15 additions & 2 deletions src/consumer/workerQueue.spec.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
const createWorkerQueue = require('./workerQueue')
const createWorker = require('./worker')
const Batch = require('./batch')
const seq = require('../utils/seq')
const { newLogger } = require('../../testHelpers')

describe('WorkerQueue', () => {
const batches = seq(100, index => `message ${index}`)
const batches = seq(
100,
index =>
new Batch('test-topic', 0, {
partition: index.toString(),
highWatermark: '100',
messages: [],
})
)
let workerQueue, workers, handler

beforeEach(() => {
handler = jest.fn(async () => {})

workers = seq(3, workerId => createWorker({ handler, workerId }))
const partitionAssignments = new Map()
workers = seq(3, workerId =>
createWorker({ handler, workerId, logger: newLogger(), partitionAssignments })
)
workerQueue = createWorkerQueue({ workers })
})

Expand Down

0 comments on commit 1c4b02e

Please sign in to comment.