Skip to content

fix(streams): ephemeral broadcast mode prevents orphan queue accumulation#151

Merged
mhenrixon merged 5 commits into
mainfrom
fix/149-ephemeral-stream-queues
May 8, 2026
Merged

fix(streams): ephemeral broadcast mode prevents orphan queue accumulation#151
mhenrixon merged 5 commits into
mainfrom
fix/149-ephemeral-stream-queues

Conversation

@mhenrixon
Copy link
Copy Markdown
Owner

@mhenrixon mhenrixon commented May 8, 2026

Summary

Fixes the orphan queue accumulation problem where Turbo broadcasts silently materialized permanent PGMQ queues per distinct stream name, causing long-tail apps to accumulate tens of thousands of orphan queues with millions of unread messages.

Approach: Ephemeral by Default + Automatic Cleanup

  • Ephemeral broadcast mode (durable: false): Turbo broadcasts now use raw PG NOTIFY instead of creating PGMQ queues. No queue, no storage, no orphan tables. When a subscriber is connected, the StreamEventDispatcher receives the NOTIFY payload and delivers it inline — same latency, zero storage overhead.

  • Durable broadcast mode (durable: true): Preserved for apps that need message persistence and replay (chat history, activity feeds). Opt-in via Pgbus.stream(name, durable: true).broadcast(html) or by setting config.streams_default_broadcast_mode = :durable.

  • Orphan stream queue sweeper: The dispatcher's maintenance loop now periodically drops stream queues with zero messages or only messages older than streams_orphan_threshold (default: 24 hours). Cleans up pre-existing damage.

  • Dashboard N+1 fix: queues_with_metrics now uses a single batched SQL query (UNION ALL) instead of N separate queries per queue, preventing the dashboard crash with large queue counts.

Configuration

Pgbus.configure do |c|
  # :ephemeral (default) — NOTIFY-only, no queue creation
  # :durable — queue-backed, message persistence + replay
  c.streams_default_broadcast_mode = :ephemeral

  # Orphan sweeper: drop stream queues with no recent activity
  c.streams_orphan_threshold = 24.hours    # age threshold
  c.streams_orphan_sweep_interval = 1.hour # sweep frequency
end

Migration Path

  • Existing apps upgrading to this version: the orphan sweeper will automatically clean up stale stream queues over time.
  • Apps using Pgbus.stream(name).broadcast(html) directly (not through Turbo) still default to durable: true — no behavior change.
  • Only the Turbo monkey-patch path defaults to ephemeral. Apps that need durable Turbo broadcasts can set streams_default_broadcast_mode = :durable.

Closes #149

Files Changed

File Change
lib/pgbus/streams.rb durable: param on Stream; ephemeral broadcast path
lib/pgbus/client/notify_stream.rb New: fire-and-forget PG NOTIFY for ephemeral broadcasts
lib/pgbus/streams/turbo_broadcastable.rb Pass broadcast mode from config
lib/pgbus/web/streamer/listener.rb Forward NOTIFY payload through WakeMessage
lib/pgbus/web/streamer/stream_event_dispatcher.rb Handle ephemeral WakeMessages inline
lib/pgbus/process/dispatcher.rb Orphan stream queue sweeper
lib/pgbus/web/data_source.rb Batched queue metrics query
lib/pgbus/configuration.rb New config options
lib/pgbus.rb Cache key includes durability mode

Test plan

  • bundle exec rspec — 1913 examples, 0 failures
  • bundle exec rubocop — 352 files, 0 offenses
  • Ephemeral broadcast does NOT create PGMQ queue (unit test)
  • Ephemeral broadcast sends PG NOTIFY with payload (unit test)
  • Durable broadcast preserves current behavior (unit test)
  • Orphan sweeper drops empty/stale stream queues (unit test)
  • Dashboard batched query replaces N+1 pattern (unit test)
  • TurboBroadcastable uses ephemeral mode by default (unit test)
  • Configuration validation for new settings (unit test)
  • Backwards compatibility: existing stream specs unchanged

Summary by CodeRabbit

  • New Features

    • Streams support durable or ephemeral modes; default broadcast mode is configurable (default: ephemeral).
    • Ephemeral broadcasts use a lightweight notify path and include payloads delivered to listeners.
    • Listener wake events now carry optional payloads.
    • Automatic orphan stream queue cleanup (configurable interval; default 3600s).
  • Performance

    • Batched metrics queries for dashboard/queue monitoring.
  • Tests

    • Added specs for broadcast modes, notify path, orphan sweeper, and batched metrics.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 8, 2026

Review Change Stack

Caution

Review failed

Pull request was closed or merged during review

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

This PR adds an ephemeral (NOTIFY) broadcast path and per-stream durable: flag, passes durability through Pgbus.stream caching, provides a client notify_stream API, updates Turbo integration and the web streamer to carry ephemeral payloads, introduces an hourly orphan-stream sweeper, batches queue-metrics queries, and adds specs for the new behaviors.

Changes

Ephemeral Broadcast Implementation

Layer / File(s) Summary
Configuration & Broadcast Modes
lib/pgbus/configuration.rb
Adds streams_default_broadcast_mode (default :ephemeral), streams_orphan_sweep_interval, streams_orphan_threshold, VALID_BROADCAST_MODES, setter validation, and validate_streams! checks.
Client NOTIFY API
lib/pgbus/client.rb, lib/pgbus/client/notify_stream.rb
New NotifyStream module and client inclusion; notify_stream(stream_name, payload) computes sanitized channel, JSON-serializes payload when needed, instruments size, and executes SELECT pg_notify via a synchronized raw connection.
Stream Durability & Broadcast Logic
lib/pgbus/streams.rb
Streams::Stream#initialize accepts durable: (default true); #durable? added; #broadcast wraps payload JSON and branches: ephemeral uses notify_stream and returns nil, durable ensures queue and uses existing send semantics.
Stream Caching by Durability
lib/pgbus.rb
Pgbus.stream(name, durable:) appends durability suffix (:d/:e) to cache key so durable and ephemeral streams are cached separately.
TurboBroadcastable Integration
lib/pgbus/streams/turbo_broadcastable.rb
Reads Pgbus.configuration.streams_default_broadcast_mode and calls Pgbus.stream(name, durable: mode == :durable) before broadcasting.
Web Streamer Listener & Dispatcher
lib/pgbus/web/streamer/listener.rb, lib/pgbus/web/streamer/stream_event_dispatcher.rb
Listener WakeMessage carries optional payload; listener forwards payload to handler; dispatcher adds ephemeral sequencing and splits wake handling into ephemeral (parse payload → negative msg_id envelope → enqueue) and durable (existing flow + record_stat) paths.

Orphan Queue Maintenance & Performance

Layer / File(s) Summary
Orphan Stream Sweeper
lib/pgbus/process/dispatcher.rb
Adds ORPHAN_STREAM_SWEEP_INTERVAL, tracks @last_orphan_stream_sweep_at, invokes sweep_orphan_streams from maintenance loop, scans pgmq.meta for stream queues, inspects counts, and drops empty queues via Pgbus.client.drop_queue while logging per-queue errors.
Batched Queue Metrics
lib/pgbus/web/data_source.rb
queues_with_metrics returns [] for empty names, uses batched_queue_metrics(queue_names) to run a single UNION ALL across pgmq.q_* tables, skips failing fragments, and merges metrics with paused flags; method-level rescues log and return [].

Test Coverage

Layer / File(s) Summary
NotifyStream Tests
spec/pgbus/client/notify_stream_spec.rb
New tests verify notify_stream issues pg_notify on sanitized channel, includes JSON payload parameter, and does not create a PGMQ queue.
Orphan Sweeper Tests
spec/pgbus/process/orphan_stream_sweeper_spec.rb
New tests exercise sweep_orphan_streams: drop empty/stale queues, retain recent queues, ignore non-stream queues, handle missing stats gracefully, and assert method presence.
Configuration Tests
spec/pgbus/streams/broadcast_mode_config_spec.rb
Tests for default broadcast mode, sweep interval/threshold defaults, setter coercion/validation, and validate_streams! behavior.
Ephemeral vs Durable Broadcast Tests
spec/pgbus/streams/ephemeral_broadcast_spec.rb
Ephemeral: no queue/PGMQ send, uses notify_stream and returns nil; Durable: ensures queue and sends PGMQ message returning msg_id; default durability tested.
Turbo Integration & DataSource Tests
spec/pgbus/streams/turbo_broadcastable_spec.rb, spec/pgbus/web/data_source_spec.rb, spec/pgbus/web/data_source_batched_metrics_spec.rb
Turbo specs updated to expect durable: argument; DataSource specs updated to stub and assert batched Pgbus Batched Queue Metrics select_all usage and N+1 prevention.

Sequence Diagram(s)

sequenceDiagram
  participant App
  participant PgbusStream
  participant Client
  participant Postgres
  participant Dispatcher
  App->>PgbusStream: Pgbus.stream(name, durable:false).broadcast(payload)
  alt durable=false
    PgbusStream->>Client: notify_stream(name, json_payload)
    Client->>Postgres: SELECT pg_notify(channel, payload)
    Postgres-->>Client: OK
    Client-->>PgbusStream: nil
    Postgres->>Dispatcher: LISTEN/NOTIFY triggers (payload forwarded)
    Dispatcher->>Dispatcher: handle_ephemeral_wake(parse payload -> enqueue)
  else durable=true
    PgbusStream->>Client: ensure_queue! + send_message (PGMQ)
    Client->>Postgres: INSERT into pgmq.q_*
    Postgres-->>Client: OK
    Postgres->>Dispatcher: queue drain wake
    Dispatcher->>Dispatcher: handle_durable_wake(read queue -> unwrap -> enqueue)
  end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • mhenrixon/pgbus#81: Changes to Streams::Stream broadcast/delivery logic related to transactional vs synchronous delivery.
  • mhenrixon/pgbus#88: Related edits to Pgbus.stream memoization and cache behavior.
  • mhenrixon/pgbus#8: Touches dispatcher maintenance logic similar to this PR's orphan-sweeper additions.

Suggested labels

streaming, bug

🐰 I hop through queues with a twitch and a cheer,
Notify whispers scatter, no durable hoard near.
I sweep the stale at the rise of the sun,
Batch the counts, keep the dashboards cleanly run.
Durable or fleeting — broadcast, then I’m done.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 14.29% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The PR title clearly and concisely describes the main change: introducing ephemeral broadcast mode to prevent orphan queue accumulation.
Linked Issues check ✅ Passed All primary objectives from #149 are addressed: ephemeral mode via durable: parameter [149], default Turbo to ephemeral [149], orphan sweeper implementation [149], batched metrics [149], configuration knobs [149].
Out of Scope Changes check ✅ Passed All changes are within scope: stream durability control, orphan sweeper, notify_stream implementation, dashboard batched queries, configuration additions, and listener payload handling are all required by #149.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix/149-ephemeral-stream-queues

Warning

Review ran into problems

🔥 Problems

Timed out fetching pipeline failures after 30000ms


Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
lib/pgbus/web/streamer/stream_event_dispatcher.rb (1)

140-143: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Don't coalesce inline-payload wakes for the same stream.

This dedupes wakes by queue_name only. That is safe for durable streams because one read_after can fetch all queued rows, but ephemeral wakes have no backing queue; dropping a second WakeMessage here drops the second broadcast permanently. Two NOTIFYs on the same stream in one dispatcher drain will currently deliver only the first payload.

💡 Possible fix
         def run_loop
           while `@running`
             msg = `@queue.pop`
             break if msg == :__stop__
@@
-            if msg.is_a?(WakeMessage)
+            if msg.is_a?(WakeMessage) && msg.payload.nil?
               wakes, trailing = drain_wakes_for(msg)
               wakes.each { |w| handle(w) }
               handle(trailing) if trailing
             else
               handle(msg)
@@
         def drain_wakes_for(first)
           seen = Set.new([first.queue_name])
           coalesced = [first]
           loop do
@@
-            return [coalesced, peek] unless peek.is_a?(WakeMessage)
+            return [coalesced, peek] unless peek.is_a?(WakeMessage)
+            return [coalesced, peek] if peek.payload
 
             next if seen.include?(peek.queue_name)

Also applies to: 159-175

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/pgbus/web/streamer/stream_event_dispatcher.rb` around lines 140 - 143,
The dispatcher is currently deduping WakeMessage instances by queue_name in
drain_wakes_for, which drops subsequent inline-payload wakes (ephemeral
NOTIFYs); update drain_wakes_for (and any similar logic around lines handling
WakeMessage) to preserve multiple WakeMessage entries for the same queue when
they contain an inline payload (i.e., only dedupe when payload is nil or empty),
so wakes.each { |w| handle(w) } still receives each payload-bearing WakeMessage;
ensure the same change is applied to the other wake-deduping block referenced
around lines 159-175.
lib/pgbus/streams.rb (1)

80-97: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Keep ephemeral broadcasts behind after_commit too.

The new early return skips current_open_transaction, so durable: false broadcasts are emitted immediately even inside an open ActiveRecord transaction. If that transaction rolls back, clients can still observe a change that never committed. Ephemeral delivery and transactional deferral are orthogonal here; notify_stream should go through the same after_commit path and still return nil.

💡 Possible fix
-        return broadcast_ephemeral(wrapped) unless `@durable`
-
-        ensure_queue!
+        ensure_queue! if `@durable`
         transaction = current_open_transaction
         instrument_payload = {
           stream: `@name`,
           visible_to: visible_to,
           deferred: !transaction.nil?,
@@
         Instrumentation.instrument("pgbus.stream.broadcast", instrument_payload) do
           if transaction
-            transaction.after_commit { `@client.send_message`(`@name`, wrapped) }
+            transaction.after_commit do
+              `@durable` ? `@client.send_message`(`@name`, wrapped) : `@client.notify_stream`(`@name`, wrapped)
+            end
             nil
           else
-            `@client.send_message`(`@name`, wrapped)
+            `@durable` ? `@client.send_message`(`@name`, wrapped) : broadcast_ephemeral(wrapped)
           end
         end
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/pgbus/streams.rb` around lines 80 - 97, The early return for ephemeral
broadcasts bypasses transaction detection so ephemeral messages (when `@durable`
is false) are sent immediately even inside an open transaction; remove or change
the early return in the method that currently calls broadcast_ephemeral(wrapped)
so that you always call current_open_transaction and run the same
Instrumentation.instrument block, and if a transaction exists register
transaction.after_commit { `@client.send_message`(`@name`, wrapped) } and return
nil, otherwise call `@client.send_message`(`@name`, wrapped); keep the existing
instrument_payload (stream, visible_to, deferred, bytes) and behavior so
ephemeral delivery follows the same after_commit deferral logic as durable
messages while still returning nil when deferred.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@lib/pgbus/client/notify_stream.rb`:
- Around line 28-31: notify_stream currently calls with_raw_connection which may
PG.connect/close per call causing expensive handshakes; modify notify_stream to
send pg_notify using the client's pooled/owned connection instead of opening a
new raw connection (e.g., use the existing connection object or pool used
elsewhere in the client), or introduce and reuse a dedicated long-lived NOTIFY
connection created once and stored on the client; locate notify_stream and
replace the with_raw_connection { |conn| conn.exec_params("SELECT pg_notify($1,
$2)", [channel, json]) } pattern to obtain a persistent/pooled conn (instead of
PG.connect) and call exec_params on that connection, ensuring proper
synchronization and lifecycle management for the reused connection.

In `@lib/pgbus/web/data_source.rb`:
- Around line 1013-1028: The rescue in the queue_names.filter_map block
currently swallows exceptions; change the rescue to capture the exception (e.g.,
rescue StandardError => e) and log it via Pgbus.logger (include context: the
failing queue name, method/variables like sanitize_name, qtable/seq_name, and
e.message/backtrace) before returning nil so failures are visible; update the
block around unions/queue_names.filter_map that builds qtable/seq_name to use
Pgbus.logger.error (or debug) with a concise message and the exception details.

---

Outside diff comments:
In `@lib/pgbus/streams.rb`:
- Around line 80-97: The early return for ephemeral broadcasts bypasses
transaction detection so ephemeral messages (when `@durable` is false) are sent
immediately even inside an open transaction; remove or change the early return
in the method that currently calls broadcast_ephemeral(wrapped) so that you
always call current_open_transaction and run the same Instrumentation.instrument
block, and if a transaction exists register transaction.after_commit {
`@client.send_message`(`@name`, wrapped) } and return nil, otherwise call
`@client.send_message`(`@name`, wrapped); keep the existing instrument_payload
(stream, visible_to, deferred, bytes) and behavior so ephemeral delivery follows
the same after_commit deferral logic as durable messages while still returning
nil when deferred.

In `@lib/pgbus/web/streamer/stream_event_dispatcher.rb`:
- Around line 140-143: The dispatcher is currently deduping WakeMessage
instances by queue_name in drain_wakes_for, which drops subsequent
inline-payload wakes (ephemeral NOTIFYs); update drain_wakes_for (and any
similar logic around lines handling WakeMessage) to preserve multiple
WakeMessage entries for the same queue when they contain an inline payload
(i.e., only dedupe when payload is nil or empty), so wakes.each { |w| handle(w)
} still receives each payload-bearing WakeMessage; ensure the same change is
applied to the other wake-deduping block referenced around lines 159-175.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI (base), Organization UI (inherited)

Review profile: ASSERTIVE

Plan: Pro

Run ID: 28513a37-ea45-4487-8e91-28da0680618c

📥 Commits

Reviewing files that changed from the base of the PR and between 463fb2a and 048a421.

📒 Files selected for processing (17)
  • lib/pgbus.rb
  • lib/pgbus/client.rb
  • lib/pgbus/client/notify_stream.rb
  • lib/pgbus/configuration.rb
  • lib/pgbus/process/dispatcher.rb
  • lib/pgbus/streams.rb
  • lib/pgbus/streams/turbo_broadcastable.rb
  • lib/pgbus/web/data_source.rb
  • lib/pgbus/web/streamer/listener.rb
  • lib/pgbus/web/streamer/stream_event_dispatcher.rb
  • spec/pgbus/client/notify_stream_spec.rb
  • spec/pgbus/process/orphan_stream_sweeper_spec.rb
  • spec/pgbus/streams/broadcast_mode_config_spec.rb
  • spec/pgbus/streams/ephemeral_broadcast_spec.rb
  • spec/pgbus/streams/turbo_broadcastable_spec.rb
  • spec/pgbus/web/data_source_batched_metrics_spec.rb
  • spec/pgbus/web/data_source_spec.rb

Comment thread lib/pgbus/client/notify_stream.rb
Comment thread lib/pgbus/web/data_source.rb
@mhenrixon mhenrixon self-assigned this May 8, 2026
mhenrixon added 2 commits May 8, 2026 16:03
…tion (#149)

Turbo broadcasts now default to ephemeral mode (PG NOTIFY only, no PGMQ
queue creation), preventing the long-tail orphan queue problem where
per-record broadcasts silently materialized permanent PGMQ queues that
accumulated millions of unread messages.

Key changes:
- Stream#broadcast supports durable: true/false; ephemeral skips queue
  creation and uses raw PG NOTIFY instead
- Client#notify_stream sends fire-and-forget PG NOTIFY on the PGMQ
  channel convention so existing subscribers receive ephemeral broadcasts
- StreamEventDispatcher handles NOTIFY payloads inline for ephemeral
  broadcasts (no read_after roundtrip needed)
- Listener passes NOTIFY payload through WakeMessage for inline delivery
- Dispatcher sweeps orphan stream queues (empty or all-stale) hourly
- Dashboard queues_with_metrics uses a single batched SQL query instead
  of N+1 per-queue queries
- Configuration: streams_default_broadcast_mode, streams_orphan_threshold,
  streams_orphan_sweep_interval

Closes #149
- Use @pgmq.with_connection instead of with_raw_connection for NOTIFY
  to avoid opening a new PG connection per ephemeral broadcast
- Add debug logging to silent rescue in batched_queue_metrics
@mhenrixon mhenrixon force-pushed the fix/149-ephemeral-stream-queues branch from 9059803 to af6cdd7 Compare May 8, 2026 14:03
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
lib/pgbus/streams.rb (1)

100-110: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Ephemeral streams still expose queue-backed operations.

current_msg_id, read_after, and ensure! all assume a backing PGMQ queue, but durable: false intentionally skips creating one. On an ephemeral stream these methods can either recreate the queue (ensure!) or hit a missing queue at runtime. Please guard them behind durable? and fail fast with a clear error or no-op.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/pgbus/streams.rb` around lines 100 - 110, current_msg_id, read_after and
ensure! assume a PGMQ queue exists even when the stream is created with durable:
false; update each method to first check durable? and fail fast or become a
no-op: have current_msg_id and read_after raise a clear error (e.g. "ephemeral
stream has no queue") when durable? is false, and make ensure! either no-op or
raise consistently instead of calling ensure_queue! for ephemeral streams.
Modify current_msg_id, read_after, and ensure! to guard with durable? before
calling `@client.stream_current_msg_id`, `@client.read_after` or ensure_queue! and
include the clear error message referenced above.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@lib/pgbus/configuration.rb`:
- Around line 364-370: The current validation forces
streams_orphan_sweep_interval and streams_orphan_threshold to be positive
numbers, making the orphan sweeper mandatory; change the checks in the
configuration so nil is treated as “disabled” (i.e., accept nil or a positive
Numeric) instead of raising. Specifically, update the validation around
streams_orphan_sweep_interval and streams_orphan_threshold so they only raise
ArgumentError when the value is non-nil and not a positive Numeric, and ensure
the logic that returns early (the guard that currently returns if
streams_orphan_threshold.is_a?(Numeric) && streams_orphan_threshold.positive?)
also allows nil to skip configuring the sweep so the dispatcher can skip
sweeping when threshold or interval is unset.

In `@lib/pgbus/process/dispatcher.rb`:
- Line 18: The config value streams_orphan_sweep_interval is ignored because
run_maintenance uses the hardcoded ORPHAN_STREAM_SWEEP_INTERVAL; update
run_maintenance to read and use the configured value (fallback to
ORPHAN_STREAM_SWEEP_INTERVAL if nil) so the schedule respects the new knob.
Locate run_maintenance in dispatcher.rb and replace the hardcoded constant with
something like config.streams_orphan_sweep_interval ||
ORPHAN_STREAM_SWEEP_INTERVAL (or the equivalent method call to get the
configured value), ensuring any places that schedule the orphan sweep (including
the second occurrence noted) use the same config-derived interval.
- Around line 345-347: The current logic calls
Pgbus.client.drop_queue(full_name, prefixed: false) when newest_age exceeds
threshold even if queue_length > 0, which can delete unread durable streams;
modify the condition in the dispatcher (the code that uses queue_length,
newest_age and threshold before calling drop_queue) to only drop queues that are
provably orphaned: either require queue_length == 0 before calling drop_queue,
or first read the queue metadata (durable flag) and only drop if durable? is
false OR queue_length == 0; update the branch that references
queue_length/newest_age to check durability via the queue metadata accessor used
elsewhere in this class and only invoke drop_queue when that check passes.
- Around line 327-338: The dispatcher is directly querying pgmq.meta and
pgmq.q_* (see use of conn.select_values and conn.select_one with
QueueNameValidator.sanitize!), which violates the rule to always go through
Pgbus::Client and config.queue_name; refactor by adding client methods (e.g.
Pgbus::Client.list_stream_queues(prefix) and
Pgbus::Client.queue_stats(queue_name) or similar) that encapsulate the SELECT
from pgmq.meta and the per-queue stats query, then replace the direct calls in
the dispatcher (remove conn.select_values and conn.select_one usage and the
QueueNameValidator.sanitize! interpolation) with calls to those client methods
and use config.queue_name() to build prefixes rather than hardcoding them.

In `@lib/pgbus/web/streamer/stream_event_dispatcher.rb`:
- Around line 202-206: The current branch in stream_event_dispatcher routes
payload-bearing NOTIFYs to handle_ephemeral_wake but drain_wakes_for still
coalesces same-stream wakes unconditionally, which collapses multiple ephemeral
payloads; update the coalescing logic in drain_wakes_for (and the similar block
covering the other branch around the second region) so it only
deduplicates/merges wakes that have no payload (i.e., payload-less
notifications), leaving payload-bearing wakes and any durable-after-ephemeral
ordering intact; ensure WakeMessage creation/path preserves every ephemeral
payload and add a regression test that emits two consecutive ephemeral
broadcasts on the same stream and asserts both payloads are delivered (and that
a durable wake queued after an ephemeral is not hidden).

In `@spec/pgbus/process/orphan_stream_sweeper_spec.rb`:
- Around line 101-103: The test currently only asserts the existence of
sweep_orphan_streams rather than proving run_maintenance wires it in; update the
spec to either stub the scheduler by stubbing DispatcherMaintenance.run_if_due
(or the instance method run_if_due) to yield/trigger and then expect an instance
to receive :sweep_orphan_streams, or advance the test clock to cause
run_maintenance to execute and assert that sweep_orphan_streams was invoked;
target the run_maintenance invocation and set the expectation on described_class
(or a DispatcherMaintenance instance) to receive :sweep_orphan_streams when
run_if_due fires.

---

Outside diff comments:
In `@lib/pgbus/streams.rb`:
- Around line 100-110: current_msg_id, read_after and ensure! assume a PGMQ
queue exists even when the stream is created with durable: false; update each
method to first check durable? and fail fast or become a no-op: have
current_msg_id and read_after raise a clear error (e.g. "ephemeral stream has no
queue") when durable? is false, and make ensure! either no-op or raise
consistently instead of calling ensure_queue! for ephemeral streams. Modify
current_msg_id, read_after, and ensure! to guard with durable? before calling
`@client.stream_current_msg_id`, `@client.read_after` or ensure_queue! and include
the clear error message referenced above.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI (base), Organization UI (inherited)

Review profile: ASSERTIVE

Plan: Pro

Run ID: 106420b0-87b6-40ec-a87e-aaf05f2b3f86

📥 Commits

Reviewing files that changed from the base of the PR and between 048a421 and af6cdd7.

📒 Files selected for processing (17)
  • lib/pgbus.rb
  • lib/pgbus/client.rb
  • lib/pgbus/client/notify_stream.rb
  • lib/pgbus/configuration.rb
  • lib/pgbus/process/dispatcher.rb
  • lib/pgbus/streams.rb
  • lib/pgbus/streams/turbo_broadcastable.rb
  • lib/pgbus/web/data_source.rb
  • lib/pgbus/web/streamer/listener.rb
  • lib/pgbus/web/streamer/stream_event_dispatcher.rb
  • spec/pgbus/client/notify_stream_spec.rb
  • spec/pgbus/process/orphan_stream_sweeper_spec.rb
  • spec/pgbus/streams/broadcast_mode_config_spec.rb
  • spec/pgbus/streams/ephemeral_broadcast_spec.rb
  • spec/pgbus/streams/turbo_broadcastable_spec.rb
  • spec/pgbus/web/data_source_batched_metrics_spec.rb
  • spec/pgbus/web/data_source_spec.rb

Comment thread lib/pgbus/configuration.rb Outdated
Comment thread lib/pgbus/process/dispatcher.rb
Comment thread lib/pgbus/process/dispatcher.rb
Comment thread lib/pgbus/process/dispatcher.rb Outdated
Comment thread lib/pgbus/web/streamer/stream_event_dispatcher.rb
Comment thread spec/pgbus/process/orphan_stream_sweeper_spec.rb
- Allow orphan sweeper to be disabled via nil threshold/interval
- Use config.streams_orphan_sweep_interval instead of hardcoded constant
- Only drop empty stream queues (preserves durable replay contract)
- Skip wake coalescing for payload-bearing ephemeral wakes (prevents data loss)
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
lib/pgbus/web/streamer/stream_event_dispatcher.rb (1)

236-268: 🧹 Nitpick | 🔵 Trivial | 💤 Low value

Consider adding explicit JSON error handling for consistency.

The outer handle() rescue will catch parse errors, but adding an explicit rescue here would align with unwrap_stream_envelope's approach (line 454) and provide a clearer early-return path for malformed payloads.

♻️ Optional defensive rescue
 def handle_ephemeral_wake(msg, stream, registered, in_flight_pairs, started_at)
-  parsed = JSON.parse(msg.payload)
+  begin
+    parsed = JSON.parse(msg.payload)
+  rescue JSON::ParserError
+    `@logger.warn` { "[Pgbus::Streamer::StreamEventDispatcher] malformed ephemeral payload for #{stream}" }
+    return
+  end
   html = parsed.is_a?(Hash) ? parsed["html"] : nil
   return unless html.is_a?(String)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/pgbus/web/streamer/stream_event_dispatcher.rb` around lines 236 - 268, In
handle_ephemeral_wake, add an explicit rescue around JSON.parse (mirroring
unwrap_stream_envelope) so malformed JSON is caught locally: wrap the parse in
begin/rescue JSON::ParserError => e, optionally log the error with contextual
info (stream and msg) and then return early instead of letting the outer handler
handle it; ensure you still only proceed when parsed is a Hash and html is a
String, and keep the rest of the logic that builds StreamEnvelope and
enqueues/prunes unchanged.
♻️ Duplicate comments (2)
lib/pgbus/process/dispatcher.rb (2)

329-339: 🛠️ Refactor suggestion | 🟠 Major | 🏗️ Heavy lift

Direct PGMQ queries violate the coding guideline.

Lines 330 and 336-339 query pgmq.meta and pgmq.q_* tables directly via conn.select_values and conn.select_one, bypassing Pgbus::Client. This duplicates queue/schema knowledge outside the client layer.

As per coding guidelines **/*.{rb,erb}: "Never make direct PGMQ calls — always go through Pgbus::Client".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/pgbus/process/dispatcher.rb` around lines 329 - 339, The code is querying
pgmq.meta and pgmq.q_* directly (using conn.select_values/select_one) which
violates the guideline to always go through Pgbus::Client; change the
orphan-check loop to call the appropriate Pgbus::Client APIs (use the client
method that lists queues instead of querying pgmq.meta, and the client method
that returns queue length or metadata instead of selecting from pgmq.q_*) and
retain QueueNameValidator.sanitize! where needed; replace uses of
Pgbus::BusRecord.connection / ActiveRecord::Base.connection with calls into
Pgbus::Client within the same method so all PGMQ access goes through
Pgbus::Client.

18-18: 🧹 Nitpick | 🔵 Trivial | 💤 Low value

Remove unused constant.

ORPHAN_STREAM_SWEEP_INTERVAL is now dead code since line 91-92 reads from config.streams_orphan_sweep_interval. This constant is never referenced.

🧹 Proposed cleanup
-      ORPHAN_STREAM_SWEEP_INTERVAL = 3600 # Run orphan stream sweep every hour
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/pgbus/process/dispatcher.rb` at line 18, Remove the dead constant
ORPHAN_STREAM_SWEEP_INTERVAL from dispatcher.rb: delete its declaration so the
code uses the configured value (config.streams_orphan_sweep_interval)
exclusively and ensure no other code references ORPHAN_STREAM_SWEEP_INTERVAL
(search for that symbol and remove or update any usages if found).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@spec/pgbus/process/orphan_stream_sweeper_spec.rb`:
- Around line 109-114: The custom matcher have_instance_method is defined inside
the spec file and should be extracted for reuse: move the RSpec::Matchers.define
:have_instance_method block into a new file under spec/support/matchers (e.g.
create a matcher file named have_instance_method.rb), remove the definition from
spec/pgbus/process/orphan_stream_sweeper_spec.rb, and ensure your test suite
loads support matchers (add or verify a Dir[“spec/support/**/*.rb”].each { |f|
require f } entry in your spec helper or confirm support files are auto-required
in RSpec configuration).

---

Outside diff comments:
In `@lib/pgbus/web/streamer/stream_event_dispatcher.rb`:
- Around line 236-268: In handle_ephemeral_wake, add an explicit rescue around
JSON.parse (mirroring unwrap_stream_envelope) so malformed JSON is caught
locally: wrap the parse in begin/rescue JSON::ParserError => e, optionally log
the error with contextual info (stream and msg) and then return early instead of
letting the outer handler handle it; ensure you still only proceed when parsed
is a Hash and html is a String, and keep the rest of the logic that builds
StreamEnvelope and enqueues/prunes unchanged.

---

Duplicate comments:
In `@lib/pgbus/process/dispatcher.rb`:
- Around line 329-339: The code is querying pgmq.meta and pgmq.q_* directly
(using conn.select_values/select_one) which violates the guideline to always go
through Pgbus::Client; change the orphan-check loop to call the appropriate
Pgbus::Client APIs (use the client method that lists queues instead of querying
pgmq.meta, and the client method that returns queue length or metadata instead
of selecting from pgmq.q_*) and retain QueueNameValidator.sanitize! where
needed; replace uses of Pgbus::BusRecord.connection /
ActiveRecord::Base.connection with calls into Pgbus::Client within the same
method so all PGMQ access goes through Pgbus::Client.
- Line 18: Remove the dead constant ORPHAN_STREAM_SWEEP_INTERVAL from
dispatcher.rb: delete its declaration so the code uses the configured value
(config.streams_orphan_sweep_interval) exclusively and ensure no other code
references ORPHAN_STREAM_SWEEP_INTERVAL (search for that symbol and remove or
update any usages if found).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI (base), Organization UI (inherited)

Review profile: ASSERTIVE

Plan: Pro

Run ID: a508361b-6a56-4849-ac66-4c319a3ad961

📥 Commits

Reviewing files that changed from the base of the PR and between af6cdd7 and ae70931.

📒 Files selected for processing (5)
  • lib/pgbus/configuration.rb
  • lib/pgbus/process/dispatcher.rb
  • lib/pgbus/web/streamer/stream_event_dispatcher.rb
  • spec/pgbus/process/orphan_stream_sweeper_spec.rb
  • spec/pgbus/streams/broadcast_mode_config_spec.rb

Comment thread spec/pgbus/process/orphan_stream_sweeper_spec.rb Outdated
Use Ruby's private_method_defined? directly instead of the custom matcher.
The matcher was only used in one place and added more complexity than it
saved.
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@spec/pgbus/process/orphan_stream_sweeper_spec.rb`:
- Around line 77-89: The test "preserves non-empty queues even when stale
(durable replay contract)" is only asserting non-empty behavior by stubbing
queue_length = 100; split it into two specs: keep the existing case asserting a
recent non-empty queue is preserved (mock conn.select_values to return the
stream name and conn.select_one for q_pgbus_test_stream_stale to return
"queue_length" => "100" and a recent timestamp field such as "last_message_at"
within the staleness threshold), and add a new spec that models a truly stale
queue (mock conn.select_one to return an old "last_message_at" or per-message
timestamps older than the sweep threshold and a queue_length that should be
considered stale) and assert mock_client.drop_queue is called; update mocks of
select_values/select_one used by dispatcher.send(:sweep_orphan_streams) and
reference the sweep_orphan_streams method, the q_pgbus_test_stream_stale query,
and mock_client.drop_queue when implementing these two focused tests.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI (base), Organization UI (inherited)

Review profile: ASSERTIVE

Plan: Pro

Run ID: fc8b3cd4-a8bc-4eae-b1bb-7e715600538f

📥 Commits

Reviewing files that changed from the base of the PR and between ae70931 and 3316caf.

📒 Files selected for processing (1)
  • spec/pgbus/process/orphan_stream_sweeper_spec.rb

Comment thread spec/pgbus/process/orphan_stream_sweeper_spec.rb Outdated
The test verifies non-empty queues are preserved — remove the misleading
"even when stale" qualifier since the sweep only checks queue_length.
@mhenrixon mhenrixon merged commit 7abd1d6 into main May 8, 2026
8 of 9 checks passed
@mhenrixon mhenrixon deleted the fix/149-ephemeral-stream-queues branch May 8, 2026 15:14
mhenrixon added a commit that referenced this pull request May 8, 2026
* feat(streams): per-broadcast and per-stream durable opt-in

Adds two complementary opt-in mechanisms on top of the global
streams_default_broadcast_mode setting introduced in #151:

1. Per-broadcast override: stream.broadcast(html, durable: true/false)
   flips the mode for a single call. nil (the default) defers to the
   stream's own setting.

2. Per-stream pattern config: streams_durable_patterns accepts an Array
   of strings (exact match) or Regexps. Pgbus.stream(name) without an
   explicit durable: kwarg now resolves the mode through the patterns
   first, then falls back to streams_default_broadcast_mode.

Pgbus.stream's durable: parameter changed default from true to nil so
the resolver can run. Explicit durable: true/false still bypasses the
resolver.

Example:
  Pgbus.configure do |c|
    c.streams_durable_patterns = ["chat", /^orders:/]
  end

  Pgbus.stream("chat").broadcast(html)         # durable
  Pgbus.stream("notifications").broadcast(html) # ephemeral
  Pgbus.stream("notifications").broadcast(html, durable: true) # one-off durable

* fix(ci): fix integration test failures for ephemeral broadcast mode

- Use __send__(:with_connection) since PGMQ::Client#with_connection is
  private — unit tests passed because mocks respond to private methods,
  but integration tests hit the real PGMQ client
- Set streams_default_broadcast_mode = :durable in integration_helper so
  integration specs using Pgbus.stream(name).broadcast() get durable
  queues for read_after verification
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

1 participant