Replace mpsc with conducer for coalesced origin consumer updates#1433
Conversation
Replace the per-consumer unbounded mpsc with a conducer-backed HashMap so duplicate or invalidated announcement updates cancel against each other. A dead or slow consumer's pending set is now bounded by the number of distinct paths instead of the number of updates. Each path tracks one of Announce, Unannounce, or UnannounceAnnounce. An announce queued ahead of an unannounce collapses to nothing (the consumer never observed the announce), while an unannounce followed by an announce preserves both updates so the consumer sees the broadcast change.
The VecDeque-backed queue allowed tombstones from collapsed Announce+Unannounce pairs to accumulate without bound, undermining the memory cap. Switch to a BTreeMap keyed by path so the pending set is exactly the number of distinct paths with outstanding work, and delivery order is deterministic lexicographic. Derive Ord/PartialOrd on Path so it can key the BTreeMap. Zero-pad the test_many_announces names so their lexicographic order matches the loop index.
Split the polling logic out of `announced` into a public `poll_announced` that takes a `conducer::Waiter`, matching the pattern used by GroupConsumer / TrackConsumer. Lets callers integrate with custom poll loops without going through the async wrapper.
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
WalkthroughThis PR refactors the update-delivery mechanism for OriginConsumer by replacing a Tokio mpsc::UnboundedReceiver with a conducer::Producer that coalesces per-path announce/unannounce events. It adds a PendingUpdate state machine and OriginConsumerState keyed by PathOwned to collapse stale sequences and provide deterministic lexicographic delivery ordering. OriginConsumerNotify now writes transitions into the shared producer. OriginConsumer::announced(), try_announced(), and a new poll_announced() are reimplemented to use conducer waiting/polling. Tests were updated and new regression tests verify coalescing behavior and bounded pending updates. Path<'a> now derives PartialOrd and Ord. 🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
✨ Simplify code
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rs/moq-net/src/model/origin.rs`:
- Around line 2094-2101: Replace the brittle check that expects exactly one
immediate try_announced() with logic that drains all responses from
consumer.try_announced() into a collection and then assert that the collection's
length is <= 1 and that every announced path (if any) equals Path::new("test");
this accepts zero entries (fully coalesced) and one entry (including the
PendingUpdate::UnannounceAnnounce case which can produce two successive
announcements) and prevents flapping due to cleanup progress — use
consumer.try_announced() in a loop to collect results, then assert
collected.len() <= 1 and collected.iter().all(|p| p.0 == Path::new("test")).
- Around line 2001-2013: These tests (test_coalesce_announce_then_unannounce,
test_coalesce_announce_unannounce_announce,
test_coalesce_unannounce_announce_preserved,
test_coalesce_unannounce_announce_unannounce, test_coalesce_churn_bounded) use
tokio::time::sleep(...) but never call tokio::time::pause(), making them depend
on real-world timing; fix each test by calling tokio::time::pause() near the
start of the async test (before any sleeps) and then replace or supplement the
sleep usage with tokio::time::advance(Duration::from_millis(...)) so the test
scheduler advances virtual time deterministically while keeping the existing
assertions in functions like Origin::random().produce(), origin.consume(), and
origin.publish_broadcast(...) unchanged.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 8ac6637c-847b-42e9-aa3a-efb23eba1b8a
📒 Files selected for processing (2)
rs/moq-net/src/model/origin.rsrs/moq-net/src/path.rs
Add tokio::time::pause() at the start of each coalesce test so the 1ms sleeps complete instantly via auto-advance instead of waiting on the real clock. Relax test_coalesce_churn_bounded to drain into a Vec and assert len() <= 1 with every path equal to "test". Backup-promotion order during cleanup is non-deterministic, so insisting on exactly one delivery was brittle.
Replace the unbounded mpsc channel in
OriginConsumerwith aconducer-backed state machine that coalesces redundant announce/unannounce pairs. This bounds the pending update queue for slow consumers and simplifies the delivery logic.Summary
The previous implementation used
tokio::sync::mpsc::UnboundedReceiverto queue all announce/unannounce events for consumers. A slow consumer that doesn't drain between updates could accumulate many redundant messages (e.g., announce, unannounce, announce of the same path). The new implementation usesconducer::Producerto manage a coalesced state where at most one pending update exists per path.Key Changes
Replaced mpsc with conducer:
OriginConsumerNotifynow holds aconducer::Producer<OriginConsumerState>instead of anmpsc::UnboundedSender. TheOriginConsumerholds the producer directly for polling.Added
PendingUpdateenum: Represents the four possible states for a queued update:Announce(BroadcastConsumer): a new broadcast at a pathUnannounce: removal of a broadcastUnannounceAnnounce(BroadcastConsumer): a removal followed by a new broadcast (preserves the signal that the origin changed)Added
OriginConsumerState: Manages coalescing logic with:pending: HashMap<PathOwned, PendingUpdate>: at most one update per pathqueue: VecDeque<PathOwned>: FIFO delivery order across pathsapply_announce()andapply_unannounce()methods that collapse redundant updatestake()method that yields the next update to deliver, skipping stale tombstonesUpdated
announced()andtry_announced(): Now poll the conducer state directly instead of receiving from a channel.Fixed test expectations: Updated
test_128(nowtest_many_announces) to reflect that the conducer-backed implementation can deliver all updates synchronously without the previous mpsc limitation. Removed the#[should_panic]attribute and added proper assertions.Added coalescing tests: New test suite validates the coalescing behavior:
test_coalesce_announce_then_unannounce: unobserved announce+unannounce collapses to nothingtest_coalesce_announce_unannounce_announce: multiple updates to the same path coalesce to the final statetest_coalesce_unannounce_announce_preserved: unannounce followed by announce of a different broadcast is preserved as two deliveriestest_coalesce_churn_bounded: rapid churn on a single path keeps the pending set bounded to one entryImplementation Details
The coalescing logic preserves correctness by distinguishing between updates the consumer has already observed and those still pending. When an announce is replaced before delivery, it's simply overwritten. When an unannounce arrives after an unobserved announce, the pair collapses. However, when an announce follows an unannounce that the consumer must see, both are preserved as
UnannounceAnnounceto signal the origin changed.The queue uses a FIFO deque to maintain cross-path delivery order matching publish order, with stale entries (from collapsed pairs) skipped during
take().https://claude.ai/code/session_01KZDLekCBPsYpQzjYCdbfnj