fix(deps): bump node-rdkafka to ^3.6.0 to fix cooperative-sticky rebalance bug #2728
delthas wants to merge 5 commits into development/9.4
Conversation
Hello delthas, my role is to assist you with the merge of this pull request.
Waiting for approval: the following approvals are needed before I can proceed with the merge.
Codecov Report

❌ Patch coverage is
... and 2 files with indirect coverage changes

```
@@            Coverage Diff             @@
##   development/9.4    #2728     +/-  ##
==========================================
- Coverage      74.50%   74.43%   -0.07%
==========================================
  Files            200      200
  Lines          13610    13614       +4
==========================================
- Hits           10140    10134       -6
- Misses          3460     3470      +10
  Partials          10       10
```
LGTM — the dependency bump is clean with no code changes required. The node-rdkafka 3.x release has no API breaking changes (only dropped EOL Node.js support), and the librdkafka upgrade path from 2.3.0 to 2.12.0 has no consumer-breaking changes. The root cause analysis in the PR body is thorough.
force-pushed from f528083 to 1735ada
Excellent investigation and write-up. The dependency bump itself is clean — only
LGTM
On hold because it is not known yet whether this PR actually helps with the CI. |
Jira issue not found: the Jira issue BB-760 was not found.
Upgrades node-rdkafka to ^3.6.0 (resolving to 3.6.1 / librdkafka 2.12.0) to fix a cooperative-sticky partition assignor bug that causes partitions to become orphaned during consumer group rebalances. Issue: BB-760
…scribe The bootstrap consumer uses setInterval(200ms) to call consume(1, cb), creating multiple concurrent C++ async workers (each with a 1000ms timeout). Since librdkafka 2.10.0, these workers survive an unsubscribe→subscribe transition and can dequeue messages from the next subscription, causing them to be lost to the normal consume pipeline. Track in-flight workers with a counter and defer unsubscribe() until all have completed. Issue: BB-760
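The counter-and-defer approach described in this commit message can be sketched as follows. This is a minimal illustration, not the actual Backbeat code: `consumeOnce`, `requestUnsubscribe`, and the consumer shape are invented names for the example.

```javascript
// Track in-flight consume(1) workers with a counter and defer
// unsubscribe() until all of them have completed (illustrative sketch).
let inFlight = 0;              // consume(1) async workers still running
let pendingUnsubscribe = null; // set if unsubscribe was requested mid-flight

function consumeOnce(consumer, onMessages) {
    inFlight += 1;
    consumer.consume(1, (err, messages) => {
        inFlight -= 1;
        onMessages(err, messages);
        // the last worker to return performs the deferred unsubscribe
        if (pendingUnsubscribe && inFlight === 0) {
            const unsub = pendingUnsubscribe;
            pendingUnsubscribe = null;
            unsub();
        }
    });
}

function requestUnsubscribe(consumer) {
    if (inFlight === 0) {
        consumer.unsubscribe(); // nothing in flight: safe immediately
    } else {
        pendingUnsubscribe = () => consumer.unsubscribe();
    }
}
```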
force-pushed from aa79ee9 to 11fe6ee
LGTM
Waiting for approval: the following approvals are needed before I can proceed with the merge.
Pin 'range,roundrobin' (the current librdkafka default) explicitly so that future version updates cannot silently change the rebalance strategy. Our consumers rely on eager rebalancing. Issue: BB-760
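The pin described above could look like this in the consumer configuration. A sketch with placeholder broker and group values, not the actual Backbeat config:

```javascript
// Consumer config sketch: pin the eager assignors explicitly so a future
// librdkafka upgrade cannot silently change the rebalance strategy
// (broker and group values below are placeholders).
const consumerConfig = {
    'metadata.broker.list': 'localhost:9092',
    'group.id': 'example-group',
    // current librdkafka default, pinned on purpose:
    'partition.assignment.strategy': 'range,roundrobin',
};

module.exports = consumerConfig;
```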
LGTM
…trap Address review feedback: instead of tracking in-flight consume workers and polling for drain, eliminate concurrent workers entirely by using chained setTimeout. Each consume(1) call is only scheduled after the previous one completes, guaranteeing at most one C++ async worker is in flight. This makes unsubscribe() safe to call immediately from the callback. Issue: BB-760
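The chained-setTimeout pattern can be sketched like this. A minimal illustration with an invented `bootstrapLoop` helper, not the actual Backbeat code:

```javascript
// Chained setTimeout: each consume(1) is scheduled only after the
// previous call completed, so at most one async worker is in flight,
// and unsubscribe() is safe to call directly from the callback.
function bootstrapLoop(consumer, isBootstrapMessage, onDone) {
    function consume() {
        consumer.consume(1, (err, messages) => {
            if (!err && (messages || []).some(isBootstrapMessage)) {
                consumer.unsubscribe(); // no other worker can be in flight
                return onDone();
            }
            setTimeout(consume, 200); // re-arm only after completion
        });
    }
    consume();
}
```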
LGTM
lib/BackbeatConsumer.js
Outdated
```js
if (err || bootstrapDone) {
    if (!bootstrapDone) {
        setTimeout(consumeNext, 200);
    }
    return undefined;
}
```
Suggested change:

```js
if (bootstrapDone) {
    return undefined;
}
if (err) {
    setTimeout(consumeNext, 200);
    return undefined;
}
```
lib/BackbeatConsumer.js
Outdated
```js
    return undefined;
}
let matched = false;
messages.forEach(message => {
```
Instead of forEach and a variable, best to use find():
- so we don't even try to process the message beyond the bootstrap
- makes the code more readable
Suggested change:

```js
const receivedBootstrap = messages.find(message => {
    const bootstrapId = JSON.parse(message.value).bootstrapId;
    self._log.info('bootstrapping backbeat consumer: received bootstrap message', {
        bootstrapId, topic: self._topic, groupId: self._groupId,
    });
    return bootstrapId === lastBootstrapId;
});
if (!receivedBootstrap) {
    return setTimeout(consumeNext, 200);
}
self._log.info('backbeat consumer is bootstrapped', { topic: self._topic, groupId: self._groupId });
bootstrapDone = true;
clearInterval(producerTimer);
self._consumer.offsetsStore([{
    topic: self._topic,
    // use the matched message: `message` is out of scope here
    partition: receivedBootstrap.partition,
    offset: receivedBootstrap.offset + 1,
}]);
self._consumer.commit();
self._consumer.unsubscribe();
producer.close(() => {
    self._bootstrapping = false;
    self._onReady();
});
```
lib/BackbeatConsumer.js
Outdated
```js
self._log.info('backbeat consumer is bootstrapped',
    { topic: self._topic, groupId: self._groupId });
matched = true;
bootstrapDone = true;
```
Is this variable still needed?
- we update it when processing the "result" of consume()
- since we are now guaranteed to have a single call to consume() in flight, it is the current one
- thus no guard is needed
lib/BackbeatConsumer.js
Outdated
```js
    { topic: self._topic, groupId: self._groupId });
matched = true;
bootstrapDone = true;
clearInterval(producerTimer);
```
producerTimer is a local variable, and does not seem to be initialized?
lib/BackbeatConsumer.js
Outdated
```js
// since librdkafka 2.10.0 they survive an
// unsubscribe→subscribe transition, stealing messages from
// the next subscription.
function consumeNext() {
```
Since it re-arms the timer repeatedly until the bootstrap message is received, we can't really call this function consumeNext... so maybe just consume?
- Rename consumeNext to consume - Remove bootstrapDone flag (unnecessary with chained setTimeout since only one consume worker is ever in flight) - Use find() instead of forEach + matched flag - Flatten the match handling out of the forEach callback Issue: BB-760
LGTM
Summary

Bumps `node-rdkafka` from `^2.12.0` (librdkafka 2.3.0) to `^3.6.0` (currently resolving to 3.6.1 / librdkafka 2.12.0) as a maintenance upgrade, picking up bugfixes accumulated across librdkafka 2.3.0→2.12.0. Also fixes a latent race condition in `BackbeatConsumer._bootstrapConsumer()` exposed by librdkafka 2.10.0+.

Context

This was initially motivated by investigating a flaky CI failure in the "Kafka Cleaner" test (Zenko CI run), where the `backbeat-metrics` topic had unconsumed messages after rapid pod replacement. While the original hypothesis (cooperative-sticky rebalance bug confluentinc/librdkafka#4908) turned out not to apply — BackbeatConsumer uses eager rebalancing (`range,roundrobin`), not cooperative-sticky — the upgrade is still worthwhile for the accumulated bugfixes, and it exposed a real bug in the bootstrap code that needed fixing.

Bootstrap consumer fix

The librdkafka upgrade exposed a latent race condition in `_bootstrapConsumer()`. The bootstrap previously used `setInterval(200ms)` to call `consume(1, consumeCb)`, dispatching C++ async workers to the libuv thread pool (each with a 1000ms timeout). Up to 5 workers could be in flight concurrently.

When the bootstrap match was found, the old code called `clearInterval` and then immediately `unsubscribe()`. But `clearInterval` only prevents new consume calls — workers already in the C++ thread pool continue running. With librdkafka < 2.10.0, `unsubscribe()` effectively invalidated these stale workers (they would return empty or error). With librdkafka >= 2.10.0 ("Enhanced handling for subscribe/unsubscribe edge cases"), these workers survive across the unsubscribe→subscribe transition and dequeue messages from the next subscription, delivering them to the bootstrap's `consumeCb`, which silently ignores non-bootstrap messages.

This was confirmed by local reproduction and bisection with a `_bootstrapConsumer` test.

The fix replaces `setInterval` with chained `setTimeout`: each `consume(1)` is only scheduled after the previous one completes, guaranteeing at most one C++ async worker is in flight at a time. This makes `unsubscribe()` safe to call directly from the callback, with no drain/polling needed.

Why ^3.6.0

We set the floor to `^3.6.0` (librdkafka 2.12.0). KIP-848 (the new consumer group protocol) is opt-in (`group.protocol=consumer` must be explicitly set) and does not affect the default `classic` protocol.

Upgrade safety

librdkafka 2.3.0 → 2.12.0: the librdkafka CHANGELOG shows no consumer-breaking changes in this range. The only notable breaking change is in v2.4.0, where `INVALID_RECORD` producer errors became non-retriable — this does not affect consumers. The metadata recovery behavior change in 2.10.0 (brokers not in metadata responses are removed and clients re-bootstrap) is a minor behavioral difference that should not impact normal operation.

node-rdkafka 2.x → 3.x: the 3.0.0 release only dropped support for EOL Node.js versions — no API changes. Backbeat requires Node >= 20 and runs on Node 22.14.0.
Issue: BB-760