Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit GROUP_JOIN event on stale partition assignments #937

Merged
merged 4 commits into from
Nov 2, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 6 additions & 5 deletions src/consumer/__tests__/runner.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ describe('Consumer > Runner', () => {
connect: jest.fn(),
join: jest.fn(),
sync: jest.fn(),
joinAndSync: jest.fn(),
fetch: jest.fn(() => BufferedAsyncIterator([Promise.resolve([emptyBatch])])),
resolveOffset: jest.fn(),
commitOffsets: jest.fn(),
Expand Down Expand Up @@ -209,7 +210,7 @@ describe('Consumer > Runner', () => {

it('calls onCrash for any other errors', async () => {
const unknownError = new KafkaJSProtocolError(createErrorFromCode(UNKNOWN))
consumerGroup.join
consumerGroup.joinAndSync
.mockImplementationOnce(() => {
throw unknownError
})
Expand Down Expand Up @@ -248,7 +249,7 @@ describe('Consumer > Runner', () => {
offsets = { topics: [{ topic: topicName, partitions: [{ offset: '1', partition }] }] }
await runner.start()

consumerGroup.join.mockClear()
consumerGroup.joinAndSync.mockClear()
consumerGroup.commitOffsetsIfNecessary.mockClear()
consumerGroup.commitOffsets.mockClear()
})
Expand All @@ -267,11 +268,11 @@ describe('Consumer > Runner', () => {
})

expect(runner.commitOffsets(offsets)).rejects.toThrow('The group is rebalancing')
expect(consumerGroup.join).toHaveBeenCalledTimes(0)
expect(consumerGroup.joinAndSync).toHaveBeenCalledTimes(0)

await sleep(100)

expect(consumerGroup.join).toHaveBeenCalledTimes(1)
expect(consumerGroup.joinAndSync).toHaveBeenCalledTimes(1)
})

it('correctly catch exceptions in parallel "eachBatch" processing', async () => {
Expand Down Expand Up @@ -334,7 +335,7 @@ describe('Consumer > Runner', () => {

it('a triggered rejoin failing should cause a crash', async () => {
const unknownError = new KafkaJSProtocolError(createErrorFromCode(UNKNOWN))
consumerGroup.join.mockImplementationOnce(() => {
consumerGroup.joinAndSync.mockImplementationOnce(() => {
throw unknownError
})
consumerGroup.commitOffsets.mockImplementationOnce(() => {
Expand Down
61 changes: 54 additions & 7 deletions src/consumer/consumerGroup.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ const sleep = require('../utils/sleep')
const BufferedAsyncIterator = require('../utils/bufferedAsyncIterator')
const websiteUrl = require('../utils/websiteUrl')
const arrayDiff = require('../utils/arrayDiff')
const createRetry = require('../retry')

const OffsetManager = require('./offsetManager')
const Batch = require('./batch')
const SeekOffsets = require('./seekOffsets')
const SubscriptionState = require('./subscriptionState')
const {
events: { HEARTBEAT, CONNECT, RECEIVED_UNSUBSCRIBED_TOPICS },
events: { GROUP_JOIN, HEARTBEAT, CONNECT, RECEIVED_UNSUBSCRIBED_TOPICS },
} = require('./instrumentationEvents')
const { MemberAssignment } = require('./assignerProtocol')
const {
Expand All @@ -30,8 +31,17 @@ const STALE_METADATA_ERRORS = [
'UNKNOWN_TOPIC_OR_PARTITION',
]

const isRebalancing = e =>
e.type === 'REBALANCE_IN_PROGRESS' || e.type === 'NOT_COORDINATOR_FOR_GROUP'

const PRIVATE = {
JOIN: Symbol('private:ConsumerGroup:join'),
SYNC: Symbol('private:ConsumerGroup:sync'),
}

module.exports = class ConsumerGroup {
constructor({
retry,
cluster,
groupId,
topics,
Expand Down Expand Up @@ -59,6 +69,7 @@ module.exports = class ConsumerGroup {
this.topicConfigurations = topicConfigurations
this.logger = logger.namespace('ConsumerGroup')
this.instrumentationEmitter = instrumentationEmitter
this.retrier = createRetry(Object.assign({}, retry))
this.assigners = assigners
this.sessionTimeout = sessionTimeout
this.rebalanceTimeout = rebalanceTimeout
Expand Down Expand Up @@ -106,7 +117,7 @@ module.exports = class ConsumerGroup {
await this.cluster.refreshMetadataIfNecessary()
}

async join() {
async [PRIVATE.JOIN]() {
const { groupId, sessionTimeout, rebalanceTimeout } = this

this.coordinator = await this.cluster.findGroupCoordinator({ groupId })
Expand Down Expand Up @@ -138,7 +149,7 @@ module.exports = class ConsumerGroup {
}
}

async sync() {
async [PRIVATE.SYNC]() {
let assignment = []
const {
groupId,
Expand Down Expand Up @@ -272,6 +283,44 @@ module.exports = class ConsumerGroup {
})
}

joinAndSync() {
const startJoin = Date.now()
return this.retrier(async bail => {
try {
await this[PRIVATE.JOIN]()
await this[PRIVATE.SYNC]()

const memberAssignment = this.assigned().reduce(
(result, { topic, partitions }) => ({ ...result, [topic]: partitions }),
{}
)

const payload = {
groupId: this.groupId,
memberId: this.memberId,
leaderId: this.leaderId,
isLeader: this.isLeader(),
memberAssignment,
groupProtocol: this.groupProtocol,
duration: Date.now() - startJoin,
}

this.instrumentationEmitter.emit(GROUP_JOIN, payload)
this.logger.info('Consumer has joined the group', payload)
} catch (e) {
if (isRebalancing(e)) {
// Rebalance in progress isn't a retriable error since the consumer
// has to go through find coordinator and join again before it can
// actually retry. Throwing a retriable error to allow the retrier
// to keep going
throw new KafkaJSError('The group is rebalancing')
Nevon marked this conversation as resolved.
Show resolved Hide resolved
}

bail(e)
}
})
}

resetOffset({ topic, partition }) {
this.offsetManager.resetOffset({ topic, partition })
}
Expand Down Expand Up @@ -528,8 +577,7 @@ module.exports = class ConsumerGroup {
})

await this.cluster.refreshMetadata()
await this.join()
await this.sync()
await this.joinAndSync()
throw new KafkaJSError(e.message)
}

Expand All @@ -541,8 +589,7 @@ module.exports = class ConsumerGroup {
unknownPartitions: e.unknownPartitions,
})

await this.join()
await this.sync()
await this.joinAndSync()
}

if (e.name === 'KafkaJSOffsetOutOfRange') {
Expand Down
1 change: 1 addition & 0 deletions src/consumer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ module.exports = ({
logger: rootLogger,
topics: keys(topics),
topicConfigurations: topics,
retry,
cluster,
groupId,
assigners,
Expand Down
40 changes: 3 additions & 37 deletions src/consumer/runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const { KafkaJSError } = require('../errors')
const barrier = require('./barrier')

const {
events: { GROUP_JOIN, FETCH, FETCH_START, START_BATCH_PROCESS, END_BATCH_PROCESS },
events: { FETCH, FETCH_START, START_BATCH_PROCESS, END_BATCH_PROCESS },
} = require('./instrumentationEvents')

const isRebalancing = e =>
Expand Down Expand Up @@ -74,42 +74,8 @@ module.exports = class Runner extends EventEmitter {
}

async join() {
const startJoin = Date.now()
return this.retrier(async (bail, retryCount, retryTime) => {
try {
await this.consumerGroup.join()
await this.consumerGroup.sync()

this.running = true

const memberAssignment = this.consumerGroup
.assigned()
.reduce((result, { topic, partitions }) => ({ ...result, [topic]: partitions }), {})

const payload = {
groupId: this.consumerGroup.groupId,
memberId: this.consumerGroup.memberId,
leaderId: this.consumerGroup.leaderId,
isLeader: this.consumerGroup.isLeader(),
memberAssignment,
groupProtocol: this.consumerGroup.groupProtocol,
duration: Date.now() - startJoin,
}

this.instrumentationEmitter.emit(GROUP_JOIN, payload)
this.logger.info('Consumer has joined the group', payload)
} catch (e) {
if (isRebalancing(e)) {
// Rebalance in progress isn't a retriable error since the consumer
// has to go through find coordinator and join again before it can
// actually retry. Throwing a retriable error to allow the retrier
// to keep going
throw new KafkaJSError('The group is rebalancing')
}

bail(e)
}
})
await this.consumerGroup.joinAndSync()
this.running = true
}

async scheduleJoin() {
Expand Down