
feat(ups): implement queue subs#4486

Draft
MasterPtato wants to merge 1 commit into 03-19-feat_cache_add_in_flight_deduping from 03-23-fix_ups_implement_queue_subs

Conversation

@MasterPtato
Contributor

Description

Please include a summary of the changes and the related issue. Please also include relevant motivation and context.

Type of change

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

How Has This Been Tested?

Please describe the tests that you ran to verify your changes.

Checklist:

  • My code follows the style guidelines of this project
  • I have performed a self-review of my code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • My changes generate no new warnings
  • I have added tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass locally with my changes

@railway-app

railway-app bot commented Mar 24, 2026

🚅 Deployed to the rivet-pr-4486 environment in rivet-frontend

Service              Status           Updated (UTC)
frontend-inspector   😴 Sleeping      Mar 24, 2026 at 12:44 am
frontend-cloud       😴 Sleeping      Mar 24, 2026 at 12:40 am
website              😴 Sleeping      Mar 24, 2026 at 12:40 am
kitchen-sink         ❌ Build Failed  Mar 24, 2026 at 12:36 am
mcp-hub              ✅ Success       Mar 24, 2026 at 12:31 am
ladle                ❌ Build Failed  Mar 24, 2026 at 12:31 am

Contributor Author

MasterPtato commented Mar 24, 2026

@MasterPtato MasterPtato changed the title fix(ups): implement queue subs feat(ups): add queue subscription support Mar 24, 2026
@MasterPtato MasterPtato changed the title feat(ups): add queue subscription support feat(ups): add Mar 24, 2026
@MasterPtato MasterPtato changed the title feat(ups): add feat(ups): implement queue subs Mar 24, 2026
@pkg-pr-new

pkg-pr-new bot commented Mar 24, 2026

More templates

@rivetkit/cloudflare-workers

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/cloudflare-workers@4486

@rivetkit/framework-base

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/framework-base@4486

@rivetkit/next-js

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/next-js@4486

@rivetkit/react

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/react@4486

rivetkit

pnpm add https://pkg.pr.new/rivet-dev/rivet/rivetkit@4486

@rivetkit/sql-loader

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/sql-loader@4486

@rivetkit/sqlite-vfs

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/sqlite-vfs@4486

@rivetkit/traces

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/traces@4486

@rivetkit/workflow-engine

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/workflow-engine@4486

@rivetkit/virtual-websocket

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/virtual-websocket@4486

@rivetkit/engine-runner

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner@4486

@rivetkit/engine-runner-protocol

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner-protocol@4486

commit: ec32bdf

@claude

claude bot commented Mar 24, 2026

PR Review: feat(ups): implement queue subs

Overall this is a solid implementation of queue subscriptions across all three drivers. The design (heartbeats + FOR UPDATE SKIP LOCKED + NOTIFY wakeup) is a good fit for Postgres. A few issues to address:


Bug: Queue channel cleanup doesn't fire for second+ drops (postgres/mod.rs)

When multiple subscribers share the same (subject, queue) channel, all of them hold a drop_guard cloned from the same CancellationToken. The cleanup task is spawned once and awaits token_clone.cancelled(). When Sub1 drops, it cancels the shared token and the cleanup task wakes up — but tx.receiver_count() > 0 (Sub2 is still alive), so it exits without doing anything. When Sub2 later drops, the token is already cancelled; the cleanup task has already completed; no second cleanup runs. The queue_subscriptions entry is never removed and UNLISTEN is never called.

This causes an unbounded memory leak in queue_subscriptions whenever a (subject, queue) pair gets more than one concurrent subscriber and then all subscribers drop.

A straightforward fix is to use a tokio::sync::watch counter (increment on subscribe, decrement on drop) and use the watch receiver in the cleanup task to detect when the count reaches zero, rather than relying on a one-shot cancellation.
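The counter-based idea can be illustrated with a minimal std-only sketch (using an `Arc<Mutex<_>>` counter in place of `tokio::sync::watch` to keep the example self-contained; `ChannelState`, `SubGuard`, and `subscribe` are hypothetical names, not the driver's actual types):

```rust
use std::sync::{Arc, Mutex};

// Hypothetical per-(subject, queue) channel state; the real driver keys
// this off its queue_subscriptions map.
struct ChannelState {
    subscriber_count: usize,
    cleaned_up: bool,
}

// Guard handed to every subscriber. Unlike a shared one-shot
// CancellationToken, cleanup fires on whichever drop brings the count
// to zero, however many subscribe/drop cycles occur.
struct SubGuard {
    state: Arc<Mutex<ChannelState>>,
}

impl Drop for SubGuard {
    fn drop(&mut self) {
        let mut s = self.state.lock().unwrap();
        s.subscriber_count -= 1;
        if s.subscriber_count == 0 {
            // Real driver: remove the queue_subscriptions entry and UNLISTEN.
            s.cleaned_up = true;
        }
    }
}

fn subscribe(state: &Arc<Mutex<ChannelState>>) -> SubGuard {
    state.lock().unwrap().subscriber_count += 1;
    SubGuard { state: Arc::clone(state) }
}

// Returns true iff cleanup ran only after the second (last) drop.
fn simulate_two_subscribers() -> bool {
    let state = Arc::new(Mutex::new(ChannelState {
        subscriber_count: 0,
        cleaned_up: false,
    }));
    let sub1 = subscribe(&state);
    let sub2 = subscribe(&state);
    drop(sub1);
    let after_first = state.lock().unwrap().cleaned_up;
    drop(sub2);
    let after_second = state.lock().unwrap().cleaned_up;
    !after_first && after_second
}

fn main() {
    assert!(simulate_two_subscribers());
    println!("cleanup ran exactly when the count hit zero");
}
```

In the async driver the same shape works with `tokio::sync::watch`: the cleanup task awaits count changes instead of a one-shot cancellation, so it observes every transition to zero.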


Concern: DefaultHasher used for database keys (postgres/mod.rs)

hash_subject (pre-existing) and the new hash_queue both use std::collections::hash_map::DefaultHasher. The Rust stdlib explicitly says its algorithm is subject to change across Rust versions. Upgrading the Rust compiler could silently produce different hash values, making existing rows in ups_queue_subs and ups_queue_messages unreachable by new processes. Consider a stable hash like FxHasher (already in the workspace?) or truncated SHA-256.
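For reference, a minimal FNV-1a 64-bit implementation shows what a stable hash looks like; the constants are fixed by the FNV specification, so values persisted in the database remain valid across compiler upgrades. This is illustrative only; the PR could equally use FxHasher or truncated SHA-256:

```rust
// FNV-1a 64-bit: constants fixed by the FNV spec, so the output never
// changes across Rust versions (DefaultHasher makes no such guarantee).
fn fnv1a_64(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;
    let mut hash = OFFSET_BASIS;
    for &b in bytes {
        hash ^= b as u64;
        hash = hash.wrapping_mul(PRIME);
    }
    hash
}

fn main() {
    // Deterministic: safe to persist in ups_queue_subs / ups_queue_messages.
    assert_eq!(fnv1a_64(b"my.subject"), fnv1a_64(b"my.subject"));
    assert_ne!(fnv1a_64(b"my.subject"), fnv1a_64(b"other.subject"));
    // Empty input hashes to the offset basis, per the FNV definition.
    assert_eq!(fnv1a_64(b""), 0xcbf2_9ce4_8422_2325);
}
```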


Performance: Two pool connections per publish call (postgres/mod.rs)

publish_to_queues acquires a second connection from the pool unconditionally for every publish, even when ups_queue_subs is empty. Under high-throughput publishing this doubles pool pressure. A cheap optimization: skip acquiring the connection if the queue_subscriptions map is empty (i.e. this process has no queue subscribers for this subject) — though this doesn't help in a multi-process deploy where other processes have subscribers.


Non-atomic INSERT + NOTIFY in publish_to_queues (postgres/mod.rs)

For each active queue group, the INSERT into ups_queue_messages and the subsequent NOTIFY are issued as separate statements without a wrapping transaction. A crash or connection failure between the two leaves an undelivered message in the table that will only be claimed when the next NOTIFY arrives (e.g. from the next publish). Wrapping each pair in a transaction would provide stronger delivery guarantees.


Non-idiomatic return at end of function (pubsub.rs)

return Ok(Subscriber::new(
    driver,
    ...
));

The explicit return on the last expression is non-idiomatic Rust. Drop the return keyword.


Minor: anyhow! imported but new guideline says to prefer .context() (postgres/mod.rs)

use anyhow::{Context, Result, anyhow};

The anyhow! import is kept (it may be used in pre-existing code below the diff), but worth double-checking that no new uses of anyhow! were added that could be .context() instead.


Minor: Cargo.lock version downgrade looks like a rebase artifact

All workspace packages are downgraded from 2.1.7 to 2.1.6-rc.1. This appears to be a branch-base mismatch rather than an intentional change — should sort itself out on merge but worth confirming.


Test: 500ms collection timeout may be flaky (tests/integration.rs)

test_queue_subscribe_load_balance uses Duration::from_millis(500) for collecting messages from each subscriber. In CI under load this could be tight. The single-message tests use 5s — consider bumping the collection window to match.


The core algorithm (claim-then-wait loop, FOR UPDATE SKIP LOCKED, heartbeat TTL, GC background task) is well-designed. The memory/NATS driver implementations look correct. The cleanup bug in the Postgres driver is the main item that needs fixing before merge.

@claude

claude bot commented Mar 24, 2026

Code Review summary for feat(ups): implement queue subs

The implementation is well-structured and the test coverage is solid. Two medium issues to address before merge:

  • retain_sync on scc::HashMap inside the memory driver GC task runs synchronously inside an async closure and can block the executor.
  • The SELECT and INSERT+NOTIFY in publish_to_queues are not wrapped in a transaction, creating a race where a message can be orphaned for up to 1 hour if the subscriber dies between the two operations.

Minor items:

  • One remaining anyhow! macro call in publish() should use .context() instead.
  • The queue parameter is missing from the tracing span in queue_subscribe (fields(%subject) should also include queue).
  • The Cargo.lock version downgrade from 2.1.7 to 2.1.6-rc.1 appears to be a stacked-PR artifact.

On test coverage: test_queue_subscribe_load_balance only checks total message count, not per-subscriber distribution; this is the right choice to avoid flakiness but worth noting in a comment. A test for Postgres reconnection behavior is also missing (pending ups_queue_messages are not drained on reconnect).

@claude

claude bot commented Mar 24, 2026

PR Review: fix(ups): implement queue subs

Good implementation of queue subscription semantics across all three drivers (memory, NATS, Postgres). The core design is sound, particularly the use of FOR UPDATE SKIP LOCKED in the Postgres driver and the heartbeat/TTL liveness mechanism.

Issues

Memory driver: silent message loss on dead channels (medium)

In the publish method, when a randomly chosen subscriber's channel is closed (receiver dropped between GC runs), tx.send() fails silently with let _ = .... The message is permanently lost. Consider retrying with another available subscriber in the same queue group if the send fails:

// Instead of silently dropping on error, try each subscriber until one succeeds:
use rand::seq::SliceRandom; // provides Vec::shuffle

let mut rng = rand::thread_rng();
let mut indices: Vec<usize> = (0..subs.len()).collect();
indices.shuffle(&mut rng);
for i in indices {
    if subs[i].send(payload.to_vec()).is_ok() {
        break;
    }
}

The GC eventually prunes dead channels, but in the window between GC runs, messages to dead subscribers are silently dropped.

Subscriber count metric undercounts queue subscribers (minor)

queue_subscribers.len() returns the count of unique subjects (top-level map keys), not the total number of subscriber channels. If 50 workers all subscribe to the same subject, the metric reports 1 instead of 50. The regular subscribers.len() has the same limitation, but the metric should still be accurate. Consider summing the actual sender count across all nested maps.
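The suggested fix can be sketched against a hypothetical map shape (subject → queue group → senders; the real driver's types differ):

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

// Hypothetical map shape: subject -> queue group -> subscriber channels.
type QueueSubs = HashMap<String, HashMap<String, Vec<Sender<Vec<u8>>>>>;

// Sums actual subscriber channels instead of counting top-level subjects.
fn total_subscribers(subs: &QueueSubs) -> usize {
    subs.values()
        .flat_map(|queues| queues.values())
        .map(|senders| senders.len())
        .sum()
}

// Builds a map with 50 workers on one subject and returns
// (top-level key count, summed channel count).
fn demo_counts() -> (usize, usize) {
    let mut subs: QueueSubs = HashMap::new();
    let mut receivers: Vec<Receiver<Vec<u8>>> = Vec::new();
    let senders = subs
        .entry("jobs".to_string())
        .or_default()
        .entry("workers".to_string())
        .or_default();
    for _ in 0..50 {
        let (tx, rx) = channel();
        senders.push(tx);
        receivers.push(rx);
    }
    (subs.len(), total_subscribers(&subs))
}

fn main() {
    let (keys, total) = demo_counts();
    assert_eq!(keys, 1);   // subjects-only count undercounts...
    assert_eq!(total, 50); // ...while 50 workers are actually subscribed
    println!("keys={keys} total={total}");
}
```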

Unclaimed messages not redelivered (design note)

After claim_message() atomically deletes a message from ups_queue_messages, if the subscriber crashes before processing it, the message is permanently lost. This is at-most-once delivery. This is fine for many use cases but should be documented in a module-level doc comment so callers have the right expectations.

GC task errors (minor)

Verify that errors in the background GC task (deleting messages older than QUEUE_MESSAGE_MAX_AGE_SECS) are logged with tracing::warn! or tracing::error! rather than silently swallowed. Silent GC failures would cause ups_queue_messages to grow unboundedly.

Positive observations

  • The FOR UPDATE SKIP LOCKED pattern in claim_message() is exactly right for concurrent queue consumers. Multiple subscribers in the same queue group can race to claim a message without deadlocking.
  • Heartbeat (10s interval, 30s TTL) is a solid liveness pattern. If a subscriber crashes without calling Drop, a new subscriber can still claim its pending messages after 30s, since messages are per queue-group, not per subscriber.
  • The Drop impl for cleanup of the ups_queue_subs row is important for correctness.
  • Integration tests cover the key behavioral properties: single delivery, load balancing across subscribers in a group, and fanout across groups.
  • Fixing use anyhow::* glob imports is correct per CLAUDE.md conventions.
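The heartbeat/TTL liveness check noted above reduces to a timestamp comparison; a minimal sketch, assuming a hypothetical `is_live` helper and the 30s TTL from the review:

```rust
use std::time::{Duration, Instant};

// TTL from the review: heartbeats every 10s, subscriber considered dead
// once its last heartbeat is older than 30s.
const HEARTBEAT_TTL: Duration = Duration::from_secs(30);

// Hypothetical liveness check: a queue-group row is live while its most
// recent heartbeat is within the TTL; after that, its pending messages
// become claimable by other subscribers in the group.
fn is_live(last_heartbeat: Instant, now: Instant) -> bool {
    now.duration_since(last_heartbeat) <= HEARTBEAT_TTL
}

fn main() {
    let t0 = Instant::now();
    assert!(is_live(t0, t0 + Duration::from_secs(10)));
    assert!(!is_live(t0, t0 + Duration::from_secs(31)));
    println!("liveness check ok");
}
```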
