Reproduction and fix for backpressure processing stalling #254
ShogunPanda merged 14 commits into platformatic:main
Conversation
--- Signed-off-by: Igor Savin <iselwin@gmail.com>
```
// Unconditionally scheduling is safe because #fetch() checks #paused,
// #closed, and other guards before issuing a Kafka fetch request.
process.nextTick(() => {
```
@kibertoad can you clarify why it's safe and provide some references?
See:
```
#fetch () {
  /* c8 ignore next 4 - Hard to test */
  if (this.#closed || this.closed || this.destroyed) {
    this.push(null)
    return
  }
```

- If the stream was closed/destroyed during record processing, #fetch() returns immediately at lines 487-490;
- If backpressure kicked in (the pipeline called pause(), setting #paused = true), #fetch() returns at line 493 without issuing a Kafka request. The resume() override (lines 336-361) will restart the loop when backpressure clears;
- If readableFlowing === null (no consumer attached yet), it also bails out at line 493;
- If an offset refresh is happening, same early return.
Did I miss anything?
Can you add a test verifying that, under extreme load and non-consumption (stream paused), memory does not balloon? I see the tests verify non-stalling, not the absence of leaks.
```
- return super.resume()
+ const result = super.resume()
+ // Restart the fetch loop when transitioning from paused → unpaused.
```
@ShogunPanda could you please take a look?
…/backpressure-stall
# Conflicts:
#	src/clients/consumer/messages-stream.ts
@kibertoad In general it LGTM. I'm waiting for @mcollina to confirm that removing it is safe. In the meanwhile, do you think you can provide a test?
@ShogunPanda I tried to, but it doesn't reproduce well within a test, unfortunately, as it needs significant load to surface. Let me try to reproduce exactly what the load script is doing...
@ShogunPanda go figure - asking Claude to reproduce exactly the setup in the load test did the trick - added the test that fails without the fix.
@mcollina what pattern would you recommend for checking memory usage in tests?
I would do two measures:
@mcollina I've added the tests. The memory-use one is not executed in CI, as it's too beefy for it, unfortunately.
@mcollina the failing test is just flaky, I think; could you rerun?
@kibertoad Done
@kibertoad In order to have the memory one running, I had to add: Do you mind checking it locally and eventually pushing it to your branch?
@ShogunPanda applied
@mcollina @ShogunPanda thank you! Would it be possible to release a new version now?
Problem
When MessagesStream is consumed via pipeline() through a downstream Duplex that triggers backpressure (e.g. a batching stream that groups messages by topic-partition), the internal fetch loop can die permanently, leaving unconsumed messages in Kafka.
This happens in production when a consumer uses pipeline(consumerStream, batchStream) with a batch-accumulating Duplex. Under moderate load (e.g. 15 topics × 1000 messages), the consumer stalls after processing ~30% of messages.
Root cause
Two bugs in the fetch loop lifecycle:
1. After pushing fetched messages to the readable buffer, the next #fetch() was only scheduled when push() returned true (buffer below highWaterMark). When push() returned false, the loop relied on _read() to restart it. In pull mode this works, but in flowing mode with pipeline(), _read() is not reliably called again when the buffer is already empty - Node.js considers the stream "already flowing" and skips the _read() call. The fetch loop dies.
2. When the downstream Duplex's write() returns false, pipeline()'s internal pipe() calls pause() on the consumer stream, setting #paused = true. If a previously scheduled process.nextTick(#fetch) fires while #paused is still true, #fetch() returns early without scheduling another iteration — the loop is now dead. When pipe() later calls resume() (after the downstream drains), super.resume() does not reliably trigger _read() when the readable buffer is empty and the stream is in flowing mode.
Sequence leading to stall
1. The downstream batch stream's write() returns false (backpressure).
2. pipeline()'s internal pipe() calls pause() on the consumer stream, setting #paused = true.
3. A previously scheduled process.nextTick(#fetch) fires while #paused is still true; #fetch() returns early without scheduling another iteration.
4. After the downstream drains, pipe() calls resume(), but super.resume() does not reliably trigger _read() while the readable buffer is empty and the stream is in flowing mode.
5. Nothing schedules #fetch() again: the fetch loop is dead and unconsumed messages remain in Kafka.
Fix
Fix 1: Remove the canPush gate — always schedule process.nextTick(#fetch) after #pushRecords. This is safe because
#fetch() checks #paused, #closed, and other guards before issuing a Kafka fetch request.
Fix 2: In resume(), when transitioning from paused → unpaused, schedule process.nextTick(#fetch) to explicitly restart
the loop. A wasPaused guard prevents firing during initial pipeline() setup (where resume() is called before
_construct() completes).
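Taken together, the two fixes look roughly like this. This is a simplified sketch in the spirit of the description above, not the actual messages-stream.ts source: the Kafka fetch is faked with a countdown of in-memory records so the class is self-contained.

```typescript
import { Readable } from 'node:stream'

class MessagesStreamSketch extends Readable {
  #paused = false
  #started = false
  #ended = false
  #remaining = 3 // stand-in for "records still available in Kafka"

  constructor () {
    super({ objectMode: true })
  }

  #scheduleFetch (): void {
    // Fix 1: always reschedule after pushing records. Unconditional
    // scheduling is safe because #fetch() re-checks its guards.
    process.nextTick(() => this.#fetch())
  }

  #fetch (): void {
    if (this.#ended) return
    if (this.destroyed || this.#remaining === 0) {
      this.#ended = true
      this.push(null)
      return
    }
    if (this.#paused) return // backpressure: resume() restarts the loop

    // Stand-in for an async Kafka fetch: push one fake record.
    this.push(`record-${this.#remaining--}`)
    this.#scheduleFetch()
  }

  override pause (): this {
    this.#paused = true
    return super.pause()
  }

  override resume (): this {
    // Fix 2: restart the loop only on a real paused -> unpaused
    // transition. The wasPaused guard keeps the resume() issued during
    // initial pipeline()/'data' attachment from firing a premature fetch.
    const wasPaused = this.#paused
    this.#paused = false
    const result = super.resume()
    if (wasPaused) process.nextTick(() => this.#fetch())
    return result
  }

  _read (): void {
    if (!this.#started) {
      this.#started = true
      this.#scheduleFetch()
    }
  }
}
```

Consuming the sketch with a 'data' listener yields record-3, record-2, record-1 and then ends; a pause() during delivery is healed by the resume() path rather than relying on _read().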
Reproduction
The load test in playground/load-tests/ reproduces the stall deterministically:
- Start Kafka
- Without the fix: stalls at ~4500/15000 (30%)
- With the fix: 15000/15000 consumed
The test mirrors the real-world setup: consumer starts before publishing (fetches partial results as messages arrive),
messages are published interleaved across 15 topics, and a batch-accumulating Duplex downstream triggers
backpressure.
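The batch-accumulating downstream can be sketched as follows. This is an illustrative assumption about the shape of that Duplex, not code from the load test: the class name, Message shape, batch size, and highWaterMark are all hypothetical.

```typescript
import { Duplex } from 'node:stream'

interface Message { topic: string; partition: number; value: string }

class BatchStream extends Duplex {
  #batches = new Map<string, Message[]>()
  #batchSize: number

  constructor (batchSize = 100) {
    // A small highWaterMark makes write() return false quickly, which is
    // what propagates backpressure (pause()) to the consumer stream.
    super({ objectMode: true, highWaterMark: 16 })
    this.#batchSize = batchSize
  }

  _write (message: Message, _enc: string, callback: () => void): void {
    // Group incoming messages by topic-partition.
    const key = `${message.topic}:${message.partition}`
    const group = this.#batches.get(key) ?? []
    group.push(message)
    this.#batches.set(key, group)
    if (group.length >= this.#batchSize) {
      this.#batches.delete(key)
      this.push(group) // emit a full per-topic-partition batch
    }
    callback()
  }

  _final (callback: () => void): void {
    // Flush partially filled batches on end-of-stream.
    for (const group of this.#batches.values()) this.push(group)
    this.push(null)
    callback()
  }

  _read (): void {}
}
```

Used as `pipeline(consumerStream, new BatchStream(), cb)`, the accumulation delays reads enough that the writable buffer fills, write() returns false, and the upstream pause()/resume() cycle described in the root cause is exercised.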