
@irfanh94 (Contributor) commented Nov 21, 2025

Problem

The Kafka consumer was processing messages without flow control, which could lead to:

  • OOM (Out of Memory) issues: messages could accumulate in memory whenever handlers processed them slower than they arrived, eventually crashing the application
  • Unbounded buffering: the consumer stream kept emitting messages while handlers were still processing previous ones
  • Uncontrolled concurrency: multiple messages could be processed simultaneously, risking race conditions and resource exhaustion
  • No backpressure mechanism: without pause/resume semantics, there was no way to signal the stream to slow down once processing capacity was reached

Solution

Added controlled data fetching by using for await instead of stream.on('data'), which does not await promise-returning handlers.
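
A minimal sketch of the difference (the stream and message types below are simplified stand-ins, not the package's actual types):

```typescript
import { Readable } from 'node:stream'

type DeserializedMessage = { value: unknown }
type Handler = (message: DeserializedMessage) => Promise<void>

// Old flow: 'data' fires for every message the stream emits. The stream
// does not wait for the returned promise, so slow handlers let unprocessed
// messages accumulate in memory with unbounded concurrency.
function initEventDriven(stream: Readable, consume: Handler): void {
  stream.on('data', (message: DeserializedMessage) => {
    void consume(message) // fire-and-forget: no backpressure
  })
}

// New flow: for await pulls one message, awaits consume(), and only then
// requests the next one, so reads are suspended while a handler runs.
async function handleSyncStream(stream: Readable, consume: Handler): Promise<void> {
  for await (const message of stream) {
    await consume(message as DeserializedMessage)
  }
}
```

Because a Node.js Readable is an async iterable, the for await loop drives the stream's internal pause/resume automatically; no explicit pause() or resume() calls are needed.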

Summary by CodeRabbit

  • Refactor

    • Kafka consumer now processes messages via sequential async stream iteration for predictable ordering and clearer error propagation
  • Tests

    • Added integration tests validating single-message processing, rapid-publish ordering, cross-topic handling, and strict one-message-at-a-time behavior
  • Chores

    • Package version bump; no public API or signature changes


@coderabbitai bot commented Nov 21, 2025

Walkthrough

Replaces the consumer's event-driven per-message on('data') handler with a private async iterator handleSyncStream(stream) that consumes messages sequentially via for await; init() calls this handler. Adds integration tests asserting single-threaded, ordered processing across topics and rapid publishes.

Changes

  • Core consumer refactor (packages/kafka/lib/AbstractKafkaConsumer.ts): replaces the stream on('data') handler with a private handleSyncStream(stream) that uses for await (const msg of stream) to call consume(msg) sequentially; init() invokes handleSyncStream(stream).catch(this.handlerError); consumerStream.on('error') handling is retained.
  • Integration test additions (packages/kafka/test/consumer/PermissionConsumer.spec.ts): adds a "sync message processing" test suite with lifecycle setup/teardown for PermissionPublisher and PermissionConsumer; new tests verify sequential single-message processing, ordered processing under rapid publish, enforced single concurrency, and cross-topic synchronous ordering (uses PERMISSION_REMOVED_SCHEMA). A concurrency-probe sketch follows this list.
  • Metadata (packages/kafka/package.json): bumps the package version from 0.7.6 to 0.7.7 (metadata only).
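
The "enforced single concurrency" test can be probed with an in-flight counter; a hypothetical sketch (the handler and timings are illustrative, not the actual PermissionConsumer internals):

```typescript
// Hypothetical concurrency probe: each handler invocation increments an
// in-flight counter; with strictly sequential consumption the observed
// maximum never exceeds 1.
let inFlight = 0
let maxInFlight = 0

async function probeHandler(_message: unknown): Promise<void> {
  inFlight += 1
  maxInFlight = Math.max(maxInFlight, inFlight)
  await new Promise((resolve) => setTimeout(resolve, 50)) // simulate slow work
  inFlight -= 1
}

// After publishing several messages and waiting for consumption to settle:
// expect(maxInFlight).toBe(1)
```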

Sequence Diagram(s)

sequenceDiagram
    autonumber
    participant Init as init()
    participant Consumer as AbstractKafkaConsumer
    participant Stream as ConsumerStream
    participant Processor as consume(msg)
    participant Errors as handlerError / stream error

    rect rgb(240,248,255)
      Note over Init,Stream: Old flow (event-driven)
      Init->>Stream: stream.on('data', handler)
      Stream->>Processor: emit 'data' (per message)
      Processor->>Processor: process message
    end

    rect rgb(245,255,240)
      Note over Init,Consumer: New flow (async sequential)
      Init->>Consumer: handleSyncStream(stream)
      Consumer->>Consumer: for await (msg of stream)
      Consumer->>Processor: consume(msg)
      Processor->>Processor: process message
      Consumer->>Errors: .catch(handlerError)
    end

    rect rgb(255,250,240)
      Note over Stream,Errors: Error events preserved
      Stream->>Errors: stream.on('error', ...)
    end

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

  • Inspect ordering/backpressure and ensure for await retains desired semantics in AbstractKafkaConsumer.ts.
  • Confirm handlerError handling and existing consumerStream.on('error') behavior.
  • Review new tests for timing/flakiness and correct schema usage (PERMISSION_REMOVED_SCHEMA).

Poem

🐇 I hopped from events into a steady stream,
One-by-one the messages now gleam.
For-await I chew each tasty byte,
In ordered rows from dawn to night.
A rabbit's clap — synchronized delight!

Pre-merge checks

✅ Passed checks (3 passed)
  • Description Check: ✅ Passed. Check skipped; CodeRabbit's high-level summary is enabled.
  • Title Check: ✅ Passed. The title directly and concisely describes the main technical change: replacing event-driven stream handling with async iteration using for await.
  • Docstring Coverage: ✅ Passed. No functions found in the changed files to evaluate docstring coverage; skipping the check.

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6156c5e and 207da85.

📒 Files selected for processing (1)
  • packages/kafka/package.json (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • packages/kafka/package.json


@coderabbitai bot left a comment

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3f9efc3 and 5f7f573.

📒 Files selected for processing (2)
  • packages/kafka/lib/AbstractKafkaConsumer.ts (1 hunks)
  • packages/kafka/test/consumer/PermissionConsumer.spec.ts (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
packages/kafka/test/consumer/PermissionConsumer.spec.ts (2)
packages/kafka/test/publisher/PermissionPublisher.ts (1)
  • PermissionPublisher (18-32)
packages/kafka/test/consumer/PermissionConsumer.ts (1)
  • PermissionConsumer (37-97)
packages/kafka/lib/AbstractKafkaConsumer.ts (1)
packages/kafka/lib/types.ts (2)
  • DeserializedMessage (46-51)
  • SupportedMessageValues (42-44)
🔇 Additional comments (2)
packages/kafka/lib/AbstractKafkaConsumer.ts (2)

214-218: Excellent refactoring for flow control and backpressure.

Replacing the event-driven stream.on('data') handler with the async handleSyncStream method successfully addresses the PR objectives. The .catch(this.handlerError) ensures errors in the async iteration are properly routed to the existing error handling mechanism.


223-232: LGTM! The for await loop provides natural backpressure.

This implementation correctly:

  • Waits for each message to be fully processed before fetching the next one (preventing OOM from message accumulation)
  • Provides backpressure by suspending stream reads during handler execution
  • Ensures sequential processing without race conditions
  • Handles stream termination gracefully when close() is called

The type cast on line 229 is safe and necessary to narrow from the stream's generic object type to the specific SupportedMessageValues<TopicsConfig> type.
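
For illustration, the narrowing has roughly this shape; the type names come from packages/kafka/lib/types.ts, but the surrounding function and generics here are assumptions:

```typescript
// Stand-ins approximating the package's types, for illustration only.
interface DeserializedMessage<V> {
  value: V
}
type SupportedMessageValues<TopicsConfig> = TopicsConfig[keyof TopicsConfig]

async function drain<TopicsConfig>(
  stream: AsyncIterable<object>,
  consume: (message: DeserializedMessage<SupportedMessageValues<TopicsConfig>>) => Promise<void>,
): Promise<void> {
  for await (const raw of stream) {
    // The stream is typed to yield plain objects, so each item is cast to
    // the topic-specific message type before being handed to the consumer.
    await consume(raw as DeserializedMessage<SupportedMessageValues<TopicsConfig>>)
  }
}
```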

@CarlosGamero merged commit 39234ae into kibertoad:main on Nov 21, 2025 (1 check passed).