Skip to content

adapters: Fix StreamFromLatest race condition#190

Merged
echarrod merged 4 commits intomainfrom
fix-stream-from-latest-race
Apr 1, 2026
Merged

adapters: Fix StreamFromLatest race condition#190
echarrod merged 4 commits intomainfrom
fix-stream-from-latest-race

Conversation

@echarrod
Copy link
Copy Markdown
Contributor

@echarrod echarrod commented Apr 1, 2026

Summary

StreamFromLatest was resolving the "latest" cursor position on the first Recv() call instead of at NewReceiver() time. This created a race where events sent between NewReceiver and the first Recv could be skipped — or cause Recv to hang indefinitely waiting for events that were already in the log/table.

This was causing flaky test failures in downstream consumers (e.g. timing out after 5 minutes in CI).

Changes

Adapter Fix
memstreamer Set cursor to current log length in NewReceiver() instead of first Recv()
reflexstreamer New WithEventsTableName option resolves HEAD via SELECT MAX(id) at NewReceiver() time. Without the option, falls back to existing WithStreamFromHead() behaviour
adaptertest Simplified StreamFromLatest test — send synchronously before Recv (now safe with cursor fix). Removed goroutines + sync.WaitGroup

Test plan

  • go test ./adapters/memstreamer/... -count=10 passes
  • go test ./adapters/reflexstreamer/... -count=10 passes (including gap filler test)
  • Previously-hanging memstreamer test now completes immediately

🤖 Generated with Claude Code

Summary by CodeRabbit

  • New Features

    • Configurable events-table name for the event streamer, with validation and explicit error feedback.
  • Bug Fixes

    • Receivers now initialise their cursor on creation to avoid races when starting from latest events.
    • Messaging consumer groups are created eagerly with the appropriate start position, reducing first-read races and tolerant handling of existing groups.
  • Tests

    • Simplified event-streaming test to use a sequential send/receive flow (removed concurrent synchronisation).

StreamFromLatest was resolving the "latest" cursor position on the first
Recv() call instead of at NewReceiver() time. This created a race where
events sent between NewReceiver and the first Recv could be skipped or
cause Recv to hang indefinitely.

Fix by capturing the cursor position at NewReceiver time:
- memstreamer: set cursor to current log length in NewReceiver
- reflexstreamer: add WithEventsTableName option to resolve HEAD via
  SELECT MAX(id) at NewReceiver time
- adaptertest: simplify StreamFromLatest test to send synchronously
  before Recv, which is now safe with the cursor fix

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Apr 1, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: ec28da31-6a20-40e0-b65d-67f21878b220

📥 Commits

Reviewing files that changed from the base of the PR and between c95ab3c and 2c4c136.

📒 Files selected for processing (1)
  • adapters/wredis/streamer.go

📝 Walkthrough

Walkthrough

The diff moves cursor/consumer-group initialisation from per-receive loops into receiver construction across multiple streamers. memstreamer seeds its cursor from the in-memory log length in NewReceiver when StreamFromLatest is set. reflexstreamer gains a variadic Option mechanism and WithEventsTableName; when provided and StreamFromLatest is requested it validates the table name and queries max(id) to seed the initial cursor (returning errors on invalid names or query failures). wredis now eagerly creates the Redis consumer group in NewReceiver (using "$" for StreamFromLatest) and removes group-creation logic from Recv. A test was simplified to remove goroutine/WaitGroup use and perform send/recv sequentially.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~40 minutes

Possibly related PRs

Poem

🐰 I hopped through lines with nimble feet,

moved starts upstream so streams greet neat.
No races now, just orderly queues,
a tidy wake, no threaded blues.
🍃 I nibble a carrot and hummy a tune.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 25.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately summarises the primary change: fixing a StreamFromLatest race condition across multiple adapters.
Description check ✅ Passed The description is comprehensive and directly related to the changeset, explaining the bug, the fixes across multiple adapters, and the test plan.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix-stream-from-latest-race

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@adapters/reflexstreamer/reflex.go`:
- Around line 120-126: The code concatenates c.eventsTableName into the SQL in
the QueryRowContext call (in the receiver init path around the QueryRowContext
usage) which risks SQL injection or malformed identifiers; validate
c.eventsTableName against a strict identifier regex (e.g. start with
letter/underscore, only alphanumerics/underscores) and then safely quote/escape
it using a proper identifier-quoting helper for your driver (or use the driver's
QuoteIdentifier like pq.QuoteIdentifier) before building the query string passed
to c.reader.QueryRowContext; update the code that computes the query (the
QueryRowContext call and its surrounding logic that reads max(id)) to use the
validated-and-quoted table name.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 70514e39-829d-4dbf-b523-65838f2a19e9

📥 Commits

Reviewing files that changed from the base of the PR and between 49d5074 and 44b73e3.

📒 Files selected for processing (4)
  • adapters/adaptertest/eventstreaming.go
  • adapters/memstreamer/memstreamer.go
  • adapters/reflexstreamer/reflex.go
  • adapters/reflexstreamer/reflex_test.go

Comment thread adapters/reflexstreamer/reflex.go
Comment thread adapters/adaptertest/eventstreaming.go
…ection

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Comment thread adapters/reflexstreamer/reflex.go
Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
adapters/reflexstreamer/reflex.go (1)

124-143: SQL injection mitigated, but consider database-agnostic identifier quoting.

The regex validation (line 26) prevents SQL injection through strict allowlisting of identifier characters (^[a-zA-Z_][a-zA-Z0-9_]*$). Backtick quoting adds defence-in-depth, though the validation alone is sufficient.

However, the library's documentation states "Infrastructure Agnostic" and "Your choice of database", yet uses MySQL-specific backtick quoting. Whilst only MySQL is currently implemented, backticks are MySQL-specific; PostgreSQL uses double quotes ("table") and SQL Server uses square brackets ([table]). To align with the stated database-agnostic design goals, consider using ANSI SQL double quotes or a database driver-aware identifier quoting mechanism for future multi-database support.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapters/reflexstreamer/reflex.go` around lines 124 - 143, The code currently
builds a query using MySQL-specific backticks around c.eventsTableName after
validating it with validIdentifier; to make this database-agnostic, add a small
identifier-quoting helper (e.g. quoteIdentifier(name string) or method on the
connector) that chooses the proper quoting characters (ANSI double quotes by
default or driver-configured quotes) and use that helper when constructing the
query passed to c.reader.QueryRowContext; keep the existing validIdentifier
check, and update the call site where the query string is built (referencing
c.eventsTableName and c.reader.QueryRowContext) to use the helper so the adapter
can support multiple SQL dialects.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@adapters/reflexstreamer/reflex.go`:
- Around line 124-143: The code currently builds a query using MySQL-specific
backticks around c.eventsTableName after validating it with validIdentifier; to
make this database-agnostic, add a small identifier-quoting helper (e.g.
quoteIdentifier(name string) or method on the connector) that chooses the proper
quoting characters (ANSI double quotes by default or driver-configured quotes)
and use that helper when constructing the query passed to
c.reader.QueryRowContext; keep the existing validIdentifier check, and update
the call site where the query string is built (referencing c.eventsTableName and
c.reader.QueryRowContext) to use the helper so the adapter can support multiple
SQL dialects.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 2bfa4806-7cd5-4364-9642-35162a30319e

📥 Commits

Reviewing files that changed from the base of the PR and between 44b73e3 and fc922bb.

📒 Files selected for processing (1)
  • adapters/reflexstreamer/reflex.go

…ceiver

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@adapters/wredis/streamer.go`:
- Around line 59-60: The error check that rejects Redis BUSYGROUP errors is
brittle because it compares the full literal message; change the condition
around the err check to detect BUSYGROUP by prefix (use
strings.HasPrefix(err.Error(), "BUSYGROUP")) so variations won't cause false
failures, and add the strings import if missing; update the conditional that
currently compares to "BUSYGROUP Consumer Group name already exists" to use
HasPrefix on err.Error() and only return the error when it is not a BUSYGROUP
prefix match.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 4fc4f981-cb92-4474-8c8e-67505ec3672d

📥 Commits

Reviewing files that changed from the base of the PR and between fc922bb and c95ab3c.

📒 Files selected for processing (1)
  • adapters/wredis/streamer.go

Comment thread adapters/wredis/streamer.go Outdated
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@sonarqubecloud
Copy link
Copy Markdown

sonarqubecloud bot commented Apr 1, 2026

Copy link
Copy Markdown
Collaborator

@andrewwormald andrewwormald left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

@echarrod echarrod merged commit c4a417e into main Apr 1, 2026
9 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants