fix(streams): deliver ephemeral broadcasts, idempotent disconnect counter#157

Merged
mhenrixon merged 1 commit into main from fix/ephemeral-delivery-and-counter-idempotency
May 14, 2026
@mhenrixon mhenrixon commented May 14, 2026

Summary

Two related streamer bugs surfaced after 0.8.1. Both ship together because they affect the same delivery path and share a fix branch.

1. Ephemeral broadcasts never reach browsers (the load-bearing fix)

Ephemeral broadcasts arrive at the streamer as PG NOTIFY payloads but never reach the browser.

The StreamEventDispatcher tags ephemeral envelopes with negative msg_ids (-@ephemeral_seq) so they don't pollute the PGMQ cursor space:

envelope = StreamEnvelope.new(msg_id: -@ephemeral_seq, ..., source: "ephemeral")

But Connection#enqueue applied the same cursor check used for durable replay:

next if envelope.msg_id <= @last_msg_id_sent

A fresh connection starts at since_id = 0, so last_msg_id_sent = 0. Ephemeral envelopes carry msg_id = -1, -2, ..., and -1 <= 0 is true, so every ephemeral broadcast was dropped before being written to the SSE socket.
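The arithmetic above can be reproduced in isolation. This is a minimal sketch, not the actual Connection#enqueue: DemoEnvelope and pre_fix_filter are hypothetical names, and the filter models only the `msg_id <= last_msg_id_sent` comparison quoted above.

```ruby
# Hypothetical stand-in for the streamer's envelope; only msg_id matters here.
DemoEnvelope = Struct.new(:msg_id, :source)

# Models the pre-fix comparison: anything at or below the cursor is dropped.
def pre_fix_filter(envelopes, last_msg_id_sent)
  envelopes.reject { |envelope| envelope.msg_id <= last_msg_id_sent }
end

batch = [
  DemoEnvelope.new(-1, "ephemeral"),
  DemoEnvelope.new(-2, "ephemeral"),
  DemoEnvelope.new(1, "durable")
]

# Fresh connection: since_id = 0, so the cursor starts at 0.
survivors = pre_fix_filter(batch, 0)
puts survivors.map(&:msg_id).inspect # prints [1]
```

Only the durable envelope survives; both ephemeral ones are rejected because any negative id compares as less than or equal to a cursor of 0.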

Failure scenario (Juraj's reproduction):

  1. Browser connects to SSE stream.
  2. Another tab updates a payment link, calling Pgbus.stream(...).broadcast(html).
  3. pgbus sends ephemeral PG NOTIFY → streamer receives it → creates envelope with msg_id = -1.
  4. Connection rejects it because -1 <= 0.
  5. Browser never sees the Turbo stream.

Fix: ephemeral envelopes (negative msg_id) bypass the cursor check and do not advance last_msg_id_sent. They have no replay semantics, so the cursor must continue tracking only durable PGMQ ids — otherwise a single ephemeral broadcast would set the cursor to a negative value and break subsequent durable replay.
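A minimal sketch of the fixed behavior, assuming a simplified connection that appends to an array instead of writing to an SSE socket. SketchEnvelope, SketchConnection, and the written accessor are hypothetical names for illustration; the real change lives in lib/pgbus/web/streamer/connection.rb and may be structured differently.

```ruby
# Hypothetical stand-ins for illustration only.
SketchEnvelope = Struct.new(:msg_id, :payload)

class SketchConnection
  attr_reader :last_msg_id_sent, :written

  def initialize(since_id: 0)
    @last_msg_id_sent = since_id
    @written = [] # stands in for the SSE socket
  end

  def enqueue(envelopes)
    envelopes.each do |envelope|
      ephemeral = envelope.msg_id.negative?

      # Durable envelopes still honor the replay cursor; ephemeral ones skip it.
      next if !ephemeral && envelope.msg_id <= @last_msg_id_sent

      @written << envelope

      # Only durable PGMQ ids advance the cursor.
      @last_msg_id_sent = envelope.msg_id unless ephemeral
    end
    self
  end
end

conn = SketchConnection.new(since_id: 0)
conn.enqueue([
  SketchEnvelope.new(-1, "ephemeral a"),
  SketchEnvelope.new(5, "durable b"),
  SketchEnvelope.new(-2, "ephemeral c"),
  SketchEnvelope.new(3, "stale durable d")
])
puts conn.written.map(&:msg_id).inspect # prints [-1, 5, -2]
puts conn.last_msg_id_sent              # prints 5
```

In this mixed batch both ephemeral envelopes are delivered, the stale durable envelope (3) is still filtered once the cursor reaches 5, and the cursor never takes a negative value.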

2. Duplicate DisconnectMessages over-decremented active_connections (CodeRabbit comment)

prune_dead enqueues a DisconnectMessage on every wake when a connection is dead. Under burst traffic, two wakes can fire before the first DisconnectMessage is drained from the queue, leaving two DisconnectMessages for the same connection. The second decrement_connections call then drove the active-connection counter negative, so subsequent legitimate connections were undercounted.

Fix: Registry#unregister now returns true when it actually removed a connection, false when the connection was already gone. handle_disconnect only decrements when unregister returns true.
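The contract can be sketched as follows. This is an illustration under simplifying assumptions, not the code in registry.rb or stream_event_dispatcher.rb: SketchRegistry, SketchDispatcher, handle_connect, and the single global counter are hypothetical, and DisconnectMessage handling is reduced to a direct method call.

```ruby
# Hypothetical registry keyed by connection id.
class SketchRegistry
  def initialize
    @connections = {}
  end

  def register(id, connection)
    @connections[id] = connection
  end

  # Returns true only when a connection was actually removed.
  def unregister(id)
    return false unless @connections.key?(id)

    @connections.delete(id)
    true
  end
end

# Hypothetical dispatcher with one counter (an assumption made for brevity).
class SketchDispatcher
  attr_reader :active_connections

  def initialize(registry)
    @registry = registry
    @active_connections = 0
  end

  def handle_connect(id, connection)
    @registry.register(id, connection)
    @active_connections += 1
  end

  # Decrement only when the registry confirms a removal, so a duplicate
  # disconnect for the same connection is a no-op.
  def handle_disconnect(id)
    return unless @registry.unregister(id)

    @active_connections -= 1
  end
end

dispatcher = SketchDispatcher.new(SketchRegistry.new)
dispatcher.handle_connect(:conn_a, Object.new)
dispatcher.handle_disconnect(:conn_a)
dispatcher.handle_disconnect(:conn_a) # duplicate, as when prune_dead races
puts dispatcher.active_connections    # prints 0
```

Making the registry the single source of truth for "was this connection still present" keeps the counter correct without having to deduplicate the disconnect queue itself.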

Test plan

  • bundle exec rspec spec/pgbus/web/streamer/ — 158 examples, 0 failures
  • New specs prove the cursor bug exists and is fixed:
    • Connection#enqueue ephemeral envelopes (negative msg_id) — 4 examples covering since_id=0, post-replay cursor positions, mixed durable+ephemeral, and cursor non-advancement
  • New spec proves duplicate-disconnect idempotency:
    • is idempotent on duplicate disconnect (does not over-decrement when prune_dead races)
  • bundle exec rubocop — clean

Files changed

| File | Why |
| --- | --- |
| lib/pgbus/web/streamer/connection.rb | Cursor bypass for negative msg_id; do not advance cursor for ephemeral |
| lib/pgbus/web/streamer/registry.rb | unregister returns true/false |
| lib/pgbus/web/streamer/stream_event_dispatcher.rb | Decrement only when registry actually removed |
| spec/pgbus/web/streamer/connection_spec.rb | 4 new ephemeral envelope specs |
| spec/pgbus/web/streamer/registry_spec.rb | 3 new return-value specs for unregister |
| spec/pgbus/web/streamer/stream_event_dispatcher_spec.rb | Idempotent disconnect spec |

Summary by CodeRabbit

  • New Features

    • Added support for ephemeral messages that bypass normal message filtering and don't advance the cursor.
  • Bug Fixes

    • Improved disconnect handling to be idempotent, preventing duplicate disconnects from incorrectly decrementing connection counters.
  • Tests

    • Added comprehensive test coverage for ephemeral message delivery and idempotent connection cleanup.

…nter

Two related streamer bugs surfaced after 0.8.1.

1. Ephemeral broadcasts never reached browsers
   The Dispatcher tags ephemeral envelopes with negative msg_ids
   (-@ephemeral_seq) so they don't pollute the PGMQ cursor space, but
   Connection#enqueue applied the same `msg_id <= last_msg_id_sent`
   filter used for durable replay. Every fresh connection starts at
   since_id=0, so every ephemeral envelope (-1, -2, ...) was rejected
   before being written to the SSE socket. Symptom: a second tab
   broadcasts via Pgbus.stream(...).broadcast(html), the streamer logs
   the wake, but the browser tab listening on the same stream sees
   nothing.

   Fix: ephemeral envelopes (negative msg_id) bypass the cursor check
   and do not advance last_msg_id_sent (they have no replay semantics,
   so the cursor must continue tracking only durable PGMQ ids).

2. Duplicate DisconnectMessages over-decremented active_connections
   prune_dead enqueues a DisconnectMessage on every wake when a
   connection is dead. If two wakes fire before the first
   DisconnectMessage drains, two DisconnectMessages exist for the same
   connection. The second decrement_connections call drove the counter
   negative, undercounting subsequent legitimate connections.

   Fix: Registry#unregister now returns true/false depending on whether
   it actually removed a connection. handle_disconnect only decrements
   the counter when unregister returns true.

coderabbitai Bot commented May 14, 2026

📝 Walkthrough

This PR adds support for ephemeral envelopes (negative msg_id) that skip cursor-based filtering and do not advance the replay cursor, updates Registry#unregister to return a boolean indicating whether removal occurred, and uses that contract to make disconnect event handling idempotent against duplicate disconnects from race conditions.

Changes

Ephemeral Message Handling and Idempotent Disconnect

| Layer / File(s) | Summary |
| --- | --- |
| Registry unregister boolean return contract (lib/pgbus/web/streamer/registry.rb, spec/pgbus/web/streamer/registry_spec.rb) | Registry#unregister now explicitly returns true when a connection is removed and false when the connection is not found or already unregistered. Tests verify all three cases: successful removal, unknown connection, and duplicate unregister attempts. |
| Ephemeral envelope handling in Connection (lib/pgbus/web/streamer/connection.rb, spec/pgbus/web/streamer/connection_spec.rb) | Connection#enqueue treats negative msg_id envelopes as ephemeral: they bypass cursor filtering so they are written regardless of last_msg_id_sent position, and they do not advance the cursor after write. Tests cover cursor-filter acceptance, cursor non-advancement, delivery before and after durable replay, and mixed durable+ephemeral batches. |
| Idempotent disconnect handling in StreamEventDispatcher (lib/pgbus/web/streamer/stream_event_dispatcher.rb, spec/pgbus/web/streamer/stream_event_dispatcher_spec.rb) | StreamEventDispatcher#handle_disconnect now captures the boolean result from Registry#unregister and only decrements the per-stream connection counter when unregister actually removed a connection. Test verifies duplicate DisconnectMessages do not over-decrement the counter during prune/dequeue races. |

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

  • mhenrixon/pgbus#151: The main PR's Connection#enqueue now preserves negative msg_id (ephemeral) envelopes against the last_msg_id_sent cursor filtering, which directly matches and complements the retrieved PR's streamer work where ephemeral wakes create StreamEnvelopes with negative msg_id.

Suggested labels

bug, streaming

Poem

🐰 A message with negative id
Slips past the cursor grid—
Ephemeral and free,
While duplicates can't be,
The disconnect race is forbid! 📨

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 25.00% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title clearly and concisely summarizes the two main fixes: delivering ephemeral broadcasts and making disconnect counter idempotent, directly matching the PR's primary objectives. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |

@mhenrixon mhenrixon self-assigned this May 14, 2026
@mhenrixon mhenrixon added the bug Something isn't working label May 14, 2026
@mhenrixon mhenrixon merged commit ca2e351 into main May 14, 2026
9 checks passed
@mhenrixon mhenrixon deleted the fix/ephemeral-delivery-and-counter-idempotency branch May 14, 2026 16:25