feat: Reconnect feature on stream error #419
📝 Walkthrough

Adds lazy consumer initialization and a reconnect mechanism (up to 5 attempts with exponential backoff) to AbstractKafkaConsumer, consolidates stream consumption into a single async handler, and renames `handlerError` to `handleError`.
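The reconnect behavior summarized in the walkthrough (up to 5 attempts with exponential backoff, recreating the consumer each time) can be sketched in isolation. This is an illustrative sketch, not the actual AbstractKafkaConsumer code: the names `reconnectWithBackoff` and `backoffDelayMs` are invented for the example, and the delay schedule assumes one common reading of "2^attempt × 1000 ms, starting at 1 s".

```typescript
// Illustrative sketch of the reconnect-with-backoff loop; names and the
// exact delay schedule are assumptions, not the library's actual API.
const MAX_RECONNECT_ATTEMPTS = 5;

function backoffDelayMs(attempt: number): number {
  // Exponential backoff: 1s, 2s, 4s, 8s, 16s for attempts 0..4
  return Math.pow(2, attempt) * 1000;
}

async function reconnectWithBackoff(
  close: () => Promise<void>,
  init: () => Promise<void>,
  sleep: (ms: number) => Promise<void>,
): Promise<boolean> {
  for (let attempt = 0; attempt < MAX_RECONNECT_ATTEMPTS; attempt++) {
    await close(); // the consumer is in a final state after close(), so it must be recreated
    await sleep(backoffDelayMs(attempt));
    try {
      await init(); // success: stream handling can resume
      return true;
    } catch {
      // init failed: fall through and retry until attempts are exhausted
    }
  }
  return false; // caller reports "failed to reconnect after max attempts"
}
```

On exhaustion the real implementation hands the failure to the error reporter rather than returning a boolean; the boolean here just keeps the sketch self-contained.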
Sequence Diagram

```mermaid
sequenceDiagram
    actor Client
    participant Consumer as AbstractKafkaConsumer
    participant Stream as ConsumerStream
    participant Handler as MessageHandler
    participant ErrorReporter as ErrorReporter
    Client->>Consumer: init()
    activate Consumer
    Consumer->>Consumer: create consumer instance
    Consumer->>Stream: start async iteration (handleStream)
    deactivate Consumer
    loop stream processing
        Stream->>Consumer: yield message(s)
        alt single or batch
            Consumer->>Handler: process(message(s), topic)
            Handler-->>Consumer: result
            Consumer->>Stream: commit(offset of last message)
        else stream error
            Stream->>Consumer: throw error
            Consumer->>Consumer: reconnect(error)
            activate Consumer
            loop attempt in 1..5
                Consumer->>Consumer: close()
                Consumer->>Consumer: wait backoff (2^attempt * 1000ms)
                Consumer->>Consumer: init()
                alt init succeeds
                    Consumer->>Stream: resume handleStream
                    Note over Consumer: break out of retry loop
                end
            end
            alt max attempts exhausted
                Consumer->>ErrorReporter: handleError("Consumer failed to reconnect after max attempts", maxAttempts: 5)
                Consumer->>Consumer: isReconnecting = false
            end
            deactivate Consumer
        end
    end
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 3 passed
```ts
const topics = Object.keys(this.options.handlers)
if (topics.length === 0) throw new Error('At least one topic must be defined')

this.consumer = new Consumer({
```
Consumer needs to be recreated: once you call `close()` it ends in a final state, so we need to start from scratch.

Should we add this as a comment?

Good point! Added :D
```ts
  messageOrBatch: MessageOrBatch<SupportedMessageValues<TopicsConfig>>,
): Promise<void> {
  const messageProcessingStartTimestamp = Date.now()
  this.logger.debug({ origin: this.constructor.name, topic }, 'Consuming message(s)')
```
`origin` is now added with `logger.child` to avoid repetition.
```diff
 consumer = new PermissionBatchConsumer(testContext.cradle, { handlers: {} })
 // When - Then
-await expect(consumer.close()).resolves.not.toThrowError()
+await expect(consumer.close()).resolves.not.toThrow()
```
`toThrowError` is deprecated.
```ts
  expect(consumer.isConnected).toBe(true)
})

it('should handle errors on reconnection', { timeout: 40_000 }, async () => {
```
Not super happy with this; I tried using vitest fake timers, but they do not work with the stream for some reason 😢
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/kafka/lib/AbstractKafkaService.ts (1)
128-136: ⚠️ Potential issue | 🟠 Major

Keep `handlerError()` as a compatibility hook or treat this as a breaking change. Renaming a protected method on an exported base class is semver-breaking. Downstream subclasses can still compile with `handlerError(...)`, but after this change their override is silently bypassed because internal callers now dispatch to `handleError(...)` instead.

Suggested compatibility shim

```diff
+  protected handleError(error: unknown, context: Record<string, unknown> = {}): void {
+    this.handlerError(error, context)
+  }
+
+  /** @deprecated Use handleError() */
-  protected handleError(error: unknown, context: Record<string, unknown> = {}): void {
+  protected handlerError(error: unknown, context: Record<string, unknown> = {}): void {
     const resolvedErrorLog = resolveGlobalErrorLogObject(error)
     this.logger.error({ ...resolvedErrorLog, ...context })

     if (isError(error))
       this.errorReporter.report({
         error,
         context: { ...context, error: resolvedErrorLog.error },
       })
   }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/kafka/lib/AbstractKafkaService.ts` around lines 128 - 136, The protected method was renamed from handlerError to handleError which is a breaking change for downstream subclasses overriding handlerError; restore compatibility by adding a protected handlerError(...) shim that delegates to the new handleError(...) (or alternatively update internal callers to invoke handlerError instead). Specifically, in AbstractKafkaService add a protected handlerError(error: unknown, context: Record<string, unknown> = {}) method that calls this.handleError(error, context) so existing subclasses overriding handlerError continue to be invoked, and ensure handleError remains the canonical implementation used by internal callers like any existing error dispatch sites.
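The override-bypass hazard this comment describes can be shown with a minimal pair of classes. `Base` and `LegacySubclass` are hypothetical stand-ins, not the real AbstractKafkaService: after the base class renames its hook, internal callers dispatch to the new name and the subclass override under the old name is never invoked.

```typescript
// Hypothetical illustration of the rename hazard: Base now dispatches to
// handleError(), so a subclass still overriding handlerError() is bypassed.
class Base {
  public lastHandled = "";

  protected handleError(msg: string): void {
    this.lastHandled = `base:${msg}`;
  }

  report(msg: string): void {
    this.handleError(msg); // internal callers use the new name
  }
}

class LegacySubclass extends Base {
  // Still compiles, but report() never calls it after the rename.
  protected handlerError(msg: string): void {
    this.lastHandled = `legacy:${msg}`;
  }
}
```

With the shim in place (new name delegating to the old one), `LegacySubclass`'s override would run again; without it, `report()` silently uses the base implementation.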
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/kafka/lib/AbstractKafkaConsumer.ts`:
- Around line 228-245: Set and check a shutdown flag to cancel in-flight
reconnects: in close() set a durable boolean (e.g. this._closed = true or
this._closing = true), clear any scheduled reconnect timer (e.g.
this._reconnectTimer/this._reconnectTimeout) and then proceed to close
consumer/streams; in reconnect() check that flag before awaiting backoff and
again immediately before calling init() and bail out (throw or return) if the
flag is set. Update the code paths that schedule reconnection (the reconnect()
loop and any backoff scheduling) to store the timer id so close() can clear it,
and ensure both reconnect() and init() honor the same this._closed/this._closing
flag to avoid reopening the consumer after an explicit close.
- Around line 155-170: The init() guard currently sets this.consumer before
startup completes, which makes subsequent init() calls no-ops if joinGroup() or
consume() throws; change the flow so you only assign this.consumer after the
consumer has fully started (i.e., after joinGroup() and consume() complete
successfully). Concretely, create the Consumer instance in a local variable, run
await localConsumer.joinGroup() and await localConsumer.consume(), then set
this.consumer = localConsumer; apply the same pattern for the other consumer
assignment site referenced around the second block (the code near the 202-207
region) so no partial initialization can be left on error.
In `@packages/kafka/test/consumer/PermissionConsumer.reconnect.spec.ts`:
- Around line 43-51: The test currently only waits for initSpy to be called
which can pass before the reconnect completes; update the test to wait for
init() to resolve and then assert a post-reconnect behavior (e.g., successfully
consuming a new message) to prove reconnect finished. Specifically, use
waitAndRetry or a Promise that waits until initSpy.mock.calls.length > 0 AND the
initSpy call has resolved (inspect the Promise returned by the spied init or
replace the spy with one that returns a controllable Promise), then publish a
fresh message and assert the consumer consumed it (reference consumer, initSpy,
closeSpy, waitAndRetry, and init()). Ensure the assertion verifies message
receipt after init() resolves rather than only that initSpy was invoked.
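The waitAndRetry-style polling the prompt relies on can be approximated by a small helper. `waitUntil` here is a hypothetical stand-in for the actual test utility: it re-evaluates a predicate with a delay between attempts until it passes or the attempt budget runs out.

```typescript
// Hypothetical polling helper in the spirit of waitAndRetry; the real
// utility's name and options may differ.
async function waitUntil(
  predicate: () => boolean,
  { attempts = 20, delayMs = 50 } = {},
): Promise<boolean> {
  for (let i = 0; i < attempts; i++) {
    if (predicate()) return true;
    await new Promise((resolve) => setTimeout(resolve, delayMs));
  }
  return predicate(); // final check after the last delay
}
```

In the test this would wrap the post-reconnect assertion (e.g. "a freshly published message was consumed") rather than merely checking that a spy was called.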
---
Outside diff comments:
In `@packages/kafka/lib/AbstractKafkaService.ts`:
- Around line 128-136: The protected method was renamed from handlerError to
handleError which is a breaking change for downstream subclasses overriding
handlerError; restore compatibility by adding a protected handlerError(...) shim
that delegates to the new handleError(...) (or alternatively update internal
callers to invoke handlerError instead). Specifically, in AbstractKafkaService
add a protected handlerError(error: unknown, context: Record<string, unknown> =
{}) method that calls this.handleError(error, context) so existing subclasses
overriding handlerError continue to be invoked, and ensure handleError remains
the canonical implementation used by internal callers like any existing error
dispatch sites.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c91381fa-2d77-47c3-a5f4-ad985b737cc5
📒 Files selected for processing (6)
- packages/kafka/lib/AbstractKafkaConsumer.ts
- packages/kafka/lib/AbstractKafkaPublisher.ts
- packages/kafka/lib/AbstractKafkaService.ts
- packages/kafka/test/consumer/PermissionBatchConsumer.spec.ts
- packages/kafka/test/consumer/PermissionConsumer.reconnect.spec.ts
- packages/kafka/test/consumer/PermissionConsumer.spec.ts
🧹 Nitpick comments (1)
packages/kafka/lib/AbstractKafkaConsumer.ts (1)
216-227: Consider guarding against empty batches.

If `messageOrBatch` is an empty array, accessing `messageOrBatch[0].topic` will throw. While `KafkaMessageBatchStream` shouldn't yield empty batches, defensive coding would guard this edge case.

🛡️ Optional defensive check

```diff
 for await (const messageOrBatch of stream) {
+  if (Array.isArray(messageOrBatch) && messageOrBatch.length === 0) continue
   await this.consume(
     Array.isArray(messageOrBatch) ? messageOrBatch[0].topic : messageOrBatch.topic,
     messageOrBatch,
   )
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/kafka/lib/AbstractKafkaConsumer.ts` around lines 216 - 227, The handleStream method can throw if a yielded KafkaMessageBatchStream batch is an empty array; add a defensive guard in handleStream before accessing messageOrBatch[0].topic to skip or error on empty arrays (check Array.isArray(messageOrBatch) && messageOrBatch.length > 0), and only call this.consume with the topic when the batch has at least one message; update logic around the Array.isArray(messageOrBatch) conditional so consume is invoked with the correct topic for both single messages and non-empty batches and empty batches are safely ignored or logged.
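The guard the nitpick suggests boils down to resolving a topic from either a single message or a (possibly empty) batch. The sketch below isolates that dispatch with a simplified `Msg` type, which is an assumption rather than the real message shape:

```typescript
// Simplified message shape for illustration only.
type Msg = { topic: string; value: string };

// Resolve the topic for a single message or a batch; empty batches yield
// undefined so the caller can skip them instead of throwing.
function resolveTopic(messageOrBatch: Msg | Msg[]): string | undefined {
  if (Array.isArray(messageOrBatch)) {
    if (messageOrBatch.length === 0) return undefined; // defensive guard
    return messageOrBatch[0].topic;
  }
  return messageOrBatch.topic;
}
```

Without the length check, `resolveTopic([])` would throw a TypeError on `messageOrBatch[0].topic`, which is exactly the edge case the comment flags.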
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@packages/kafka/lib/AbstractKafkaConsumer.ts`:
- Around line 216-227: The handleStream method can throw if a yielded
KafkaMessageBatchStream batch is an empty array; add a defensive guard in
handleStream before accessing messageOrBatch[0].topic to skip or error on empty
arrays (check Array.isArray(messageOrBatch) && messageOrBatch.length > 0), and
only call this.consume with the topic when the batch has at least one message;
update logic around the Array.isArray(messageOrBatch) conditional so consume is
invoked with the correct topic for both single messages and non-empty batches
and empty batches are safely ignored or logged.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: f2bd4cb4-3780-45a8-badf-31901926a6da
📒 Files selected for processing (1)
packages/kafka/lib/AbstractKafkaConsumer.ts
♻️ Duplicate comments (2)
packages/kafka/lib/AbstractKafkaConsumer.ts (2)
155-183: ⚠️ Potential issue | 🟠 Major

Delay assigning `this.consumer` until startup is fully successful. Line 155 uses `this.consumer` as the init guard, but Line 160 sets it before `joinGroup()`/`consume()`. If startup fails, later `init()` calls become no-ops on a half-initialized instance.

Suggested fix

```diff
-    this.consumer = new Consumer({
+    const consumer = new Consumer({
       ...this.options.kafka,
       ...this.options,
       autocommit: false, // Handling commits manually
       deserializers: {
         key: stringDeserializer,
         value: safeJsonDeserializer,
         headerKey: stringDeserializer,
         headerValue: stringDeserializer,
       },
     })

     try {
       const { handlers: _, ...consumeOptions } = this.options // Handlers cannot be passed to consume method
-      await this.consumer.joinGroup({
+      await consumer.joinGroup({
         sessionTimeout: consumeOptions.sessionTimeout,
         rebalanceTimeout: consumeOptions.rebalanceTimeout,
         heartbeatInterval: consumeOptions.heartbeatInterval,
       })
-      this.consumerStream = await this.consumer.consume({ ...consumeOptions, topics })
+      const consumerStream = await consumer.consume({ ...consumeOptions, topics })
+      this.consumer = consumer
+      this.consumerStream = consumerStream
@@
     } catch (error) {
+      await consumer.close().catch(() => undefined)
       throw new InternalError({
         message: 'Consumer init failed',
         errorCode: 'KAFKA_CONSUMER_INIT_ERROR',
         cause: error,
       })
     }
```

Also applies to: 202-208
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/kafka/lib/AbstractKafkaConsumer.ts` around lines 155 - 183, The init guard currently sets this.consumer early which makes subsequent failed starts permanent; instead instantiate a local Consumer (e.g., const consumer = new Consumer(...)) and use that local for joinGroup() and consume(); only after joinGroup() and consume() succeed assign this.consumer = consumer and this.consumerStream = consumerStream (or assign consumerStream to this.consumerStream after successful consume). Also apply the same pattern for the second block referenced (where this.consumer and this.consumerStream are currently assigned around lines 202-208) so you never set instance fields until startup completes successfully.
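The assign-after-success pattern this comment prescribes can be reduced to a generic sketch. `LazyResource` is hypothetical; it only mirrors the shape of an `init()` guarded by an instance field, where the field is assigned from a local variable only once startup has completed:

```typescript
// Hedged sketch: assign the instance field only after startup succeeds,
// so a failed init() leaves the object cleanly uninitialized and retryable.
class LazyResource {
  private handle?: { id: number };
  private nextId = 1;

  async init(shouldFail = false): Promise<void> {
    if (this.handle) return; // init guard: already started

    const candidate = { id: this.nextId++ }; // local until startup succeeds
    await this.start(candidate, shouldFail);
    this.handle = candidate; // assigned only on success
  }

  private async start(handle: { id: number }, fail: boolean): Promise<void> {
    if (fail) throw new Error(`startup failed for handle ${handle.id}`);
  }

  get initialized(): boolean {
    return this.handle !== undefined;
  }
}
```

Had `this.handle` been set before `start()`, the first failed `init(true)` would make every later `init()` a no-op on a broken instance, which is the bug the review describes.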
228-245: ⚠️ Potential issue | 🟠 Major

Make reconnect single-flight and cancelable by explicit `close()`. Right now, reconnect attempts can overlap, and an explicit `close()` can still be followed by a later `init()` from an in-flight reconnect loop.

Suggested direction

```diff
+  private isClosed = false
+  private reconnectPromise?: Promise<void>

   async init(): Promise<void> {
+    if (this.isClosed) return Promise.resolve()
     if (this.consumer) return Promise.resolve()
@@
   async close(): Promise<void> {
+    this.isClosed = true
     if (!this.consumer) return Promise.resolve()
@@
   private async reconnect(error: unknown): Promise<void> {
+    if (this.reconnectPromise) return this.reconnectPromise
+    this.reconnectPromise = (async () => {
+      if (this.isClosed) return
       this.isReconnecting = true
@@
       await setTimeout(Math.pow(2, attempt) * 1000) // Backoff delay starting with 1s
+      if (this.isClosed) return
       await this.init()
@@
       this.handleError(new Error('Consumer failed to reconnect after max attempts'), {
         maxAttempts: MAX_RECONNECT_ATTEMPTS,
       })
+    })().finally(() => {
+      this.isReconnecting = false
+      this.reconnectPromise = undefined
+    })
+    return this.reconnectPromise
   }
```

Also applies to: 247-261
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/kafka/lib/AbstractKafkaConsumer.ts` around lines 228 - 245, The reconnect logic allows overlapping reconnect attempts and does not cancel in-flight reconnect loops when close() is called; make reconnect single-flight and cancelable by introducing a cancelation and single-flight guard: add an AbortController or boolean flag (e.g., this._closing / this._closed and this._reconnectAbort) and a single-flight promise/lock (e.g., this._reconnectPromise) used by init() and the reconnect loop to ensure only one reconnect runs at a time; update init() and any reconnect loop function (referenced as the reconnect loop around init/reconnect logic, and the init() method) to check the abort flag/AbortSignal before performing work and to await or reuse this._reconnectPromise to prevent concurrent runs; modify close() to set the abort flag/signal, call this._reconnectAbort.abort() (if using AbortController), await the in-flight this._reconnectPromise to finish or cancel, and ensure further init() calls return early when this._closed is true so no new reconnect starts after close() completes.
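The single-flight half of the suggestion — concurrent reconnect calls sharing one in-flight promise instead of each starting their own loop — can be isolated into a small sketch. `SingleFlight` is an illustrative name, not part of the package:

```typescript
// Hedged sketch of a single-flight guard: while one run is in flight,
// further calls reuse its promise instead of starting a second run.
class SingleFlight {
  private inFlight?: Promise<number>;
  public runs = 0; // counts how many times work actually started

  run(work: () => Promise<number>): Promise<number> {
    if (this.inFlight) return this.inFlight; // reuse the in-flight attempt

    this.runs++;
    this.inFlight = work().finally(() => {
      this.inFlight = undefined; // allow a fresh run afterwards
    });
    return this.inFlight;
  }
}
```

Applied to the consumer, `run` would wrap the reconnect loop, so a stream error arriving while a reconnect is already underway cannot spawn a second, overlapping loop.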
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@packages/kafka/lib/AbstractKafkaConsumer.ts`:
- Around line 155-183: The init guard currently sets this.consumer early which
makes subsequent failed starts permanent; instead instantiate a local Consumer
(e.g., const consumer = new Consumer(...)) and use that local for joinGroup()
and consume(); only after joinGroup() and consume() succeed assign this.consumer
= consumer and this.consumerStream = consumerStream (or assign consumerStream to
this.consumerStream after successful consume). Also apply the same pattern for
the second block referenced (where this.consumer and this.consumerStream are
currently assigned around lines 202-208) so you never set instance fields until
startup completes successfully.
- Around line 228-245: The reconnect logic allows overlapping reconnect attempts
and does not cancel in-flight reconnect loops when close() is called; make
reconnect single-flight and cancelable by introducing a cancelation and
single-flight guard: add an AbortController or boolean flag (e.g., this._closing
/ this._closed and this._reconnectAbort) and a single-flight promise/lock (e.g.,
this._reconnectPromise) used by init() and the reconnect loop to ensure only one
reconnect runs at a time; update init() and any reconnect loop function
(referenced as the reconnect loop around init/reconnect logic, and the init()
method) to check the abort flag/AbortSignal before performing work and to await
or reuse this._reconnectPromise to prevent concurrent runs; modify close() to
set the abort flag/signal, call this._reconnectAbort.abort() (if using
AbortController), await the in-flight this._reconnectPromise to finish or
cancel, and ensure further init() calls return early when this._closed is true
so no new reconnect starts after close() completes.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: d166066f-c010-4527-9f29-d4413dc63f21
📒 Files selected for processing (1)
packages/kafka/lib/AbstractKafkaConsumer.ts