Conversation
Replace the internal model/state.rs and model/waiter.rs modules with the standalone conducer crate. Each model state gets an abort: Option<Error> field to preserve error propagation across the Option-based conducer API. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
WalkthroughThis pull request removes the local state and waiter modules and replaces the internal state-management implementation with conducer-based types (conducer::Producer, conducer::Consumer, conducer::Weak). Modules affected include broadcast, frame, group, and track; their state fields and access patterns were migrated to conducer guards and wait APIs. An abort field was added to several State structs and error paths were refactored to propagate abort or Dropped via the new modify/write/wait helpers. The crate manifest was updated to add the conductor dependency. 🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 inconclusive)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Tip Try Coding Plans. Let us write the prompt for your AI agent so you can ship faster (with fewer bugs). Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
🧹 Nitpick comments (3)
rs/moq-lite/src/model/broadcast.rs (1)
251-306:subscribe_track()temp-producer flow is fine; cleanup task logic is correct but hard to read.The “remove then reinsert if it wasn’t the same weak” trick is valid, but it’s pretty subtle. Consider rewriting it as an explicit
match/ifthat checksis_clonebefore removing.Possible readability refactor (no behavior change)
web_async::spawn(async move { let _ = weak.unused().await; - if let Some(producer) = consumer_state.produce() - && let Ok(mut state) = producer.write() - && let Some(current) = state.tracks.remove(&weak.info.name) - && !current.is_clone(&weak) - { - state.tracks.insert(current.info.name.clone(), current); - } + let Some(producer) = consumer_state.produce() else { return; }; + let Ok(mut state) = producer.write() else { return; }; + + // Only remove if the map still points at *this* weak (avoid clobbering a newer entry). + let should_remove = state + .tracks + .get(&weak.info.name) + .is_some_and(|current| current.is_clone(&weak)); + + if should_remove { + state.tracks.remove(&weak.info.name); + } });
is_some_andrequires a sufficiently recent MSRV; if MSRV is older, swap to amatchinstead. Please confirm against your workspace MSRV before adopting.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-lite/src/model/broadcast.rs` around lines 251 - 306, The cleanup async task inside subscribe_track uses a subtle "remove then reinsert if not same" sequence that hurts readability; refactor the closure (the web_async::spawn block) to first acquire producer.write(), then explicitly check state.tracks.get(&weak.info.name) (via match or if let) and call is_clone on the found entry before removing it—only remove if it is_clone==false, otherwise leave it; avoid using is_some_and (replace with match) to preserve older MSRV compatibility; keep the remaining behavior unchanged (reinsert the removed entry if it was not the same weak).rs/moq-lite/src/model/track.rs (1)
371-413: TrackConsumer wait + closed handling looks reasonable; consider clarifying theError::Closedspecial-case.If
Error::Closedis expected to be the “normal close” marker coming from conducer, a short comment inclosed()would make the intent clearer for future readers.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-lite/src/model/track.rs` around lines 371 - 413, The closed() method special-cases Error::Closed as a normal/expected close marker; add a brief comment inside TrackConsumer::closed (near the match on self.state.read().abort.clone()) clarifying that Error::Closed represents a normal conduit-initiated shutdown (not an error) and therefore should return Ok(()), while other Some(err) cases are propagated as Err(err); reference the match on self.state.read().abort.clone() in closed() and the Error::Closed variant to locate where to add the comment.rs/moq-lite/src/model/group.rs (1)
63-90: Extract error mapping into a module-level helper to eliminate repetition across the codebase.The pattern
r.abort.clone().unwrap_or(Error::Dropped)appears 6 times in this file alone (lines 89, 182, 190, 235, 258, 269), and recurs 14+ additional times acrosstrack.rs,frame.rs, andbroadcast.rs. Whilemodify()handles thewrite()case, the same logic is duplicated in closures forunused()andwait()calls. Consider extracting a generic helper function (or module-level helper macro) to consolidate this mapping—it will reduce cognitive overhead, prevent future drift, and align with the single-source-of-truth pattern already established by theabortfield.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-lite/src/model/group.rs` around lines 63 - 90, Extract the repeated mapping r.abort.clone().unwrap_or(Error::Dropped) into a single module-level helper (e.g. fn map_abort_to_error<T>(poison: &PoisonType) -> Error) and use it everywhere the conducer error is converted (replace the call site inside modify() and the closures passed to unused(), wait(), and any write()/read() error handling). The helper should accept the error wrapper returned by conducer (the value bound to r in current code) and return the appropriate Error by returning r.abort.clone().unwrap_or(Error::Dropped); update all occurrences in this file (and optionally across track.rs, frame.rs, broadcast.rs) to call that helper so modify(), unused()/wait() closures, and conducer::Producer/Mut error mappings all use the single helper.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@rs/moq-lite/src/model/broadcast.rs`:
- Around line 251-306: The cleanup async task inside subscribe_track uses a
subtle "remove then reinsert if not same" sequence that hurts readability;
refactor the closure (the web_async::spawn block) to first acquire
producer.write(), then explicitly check state.tracks.get(&weak.info.name) (via
match or if let) and call is_clone on the found entry before removing it—only
remove if it is_clone==false, otherwise leave it; avoid using is_some_and
(replace with match) to preserve older MSRV compatibility; keep the remaining
behavior unchanged (reinsert the removed entry if it was not the same weak).
In `@rs/moq-lite/src/model/group.rs`:
- Around line 63-90: Extract the repeated mapping
r.abort.clone().unwrap_or(Error::Dropped) into a single module-level helper
(e.g. fn map_abort_to_error<T>(poison: &PoisonType) -> Error) and use it
everywhere the conducer error is converted (replace the call site inside
modify() and the closures passed to unused(), wait(), and any write()/read()
error handling). The helper should accept the error wrapper returned by conducer
(the value bound to r in current code) and return the appropriate Error by
returning r.abort.clone().unwrap_or(Error::Dropped); update all occurrences in
this file (and optionally across track.rs, frame.rs, broadcast.rs) to call that
helper so modify(), unused()/wait() closures, and conducer::Producer/Mut error
mappings all use the single helper.
In `@rs/moq-lite/src/model/track.rs`:
- Around line 371-413: The closed() method special-cases Error::Closed as a
normal/expected close marker; add a brief comment inside TrackConsumer::closed
(near the match on self.state.read().abort.clone()) clarifying that
Error::Closed represents a normal conduit-initiated shutdown (not an error) and
therefore should return Ok(()), while other Some(err) cases are propagated as
Err(err); reference the match on self.state.read().abort.clone() in closed() and
the Error::Closed variant to locate where to add the comment.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: b25b3177-d4c0-4529-90d2-fa2f8bc5192b
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (8)
rs/moq-lite/Cargo.tomlrs/moq-lite/src/model/broadcast.rsrs/moq-lite/src/model/frame.rsrs/moq-lite/src/model/group.rsrs/moq-lite/src/model/mod.rsrs/moq-lite/src/model/state.rsrs/moq-lite/src/model/track.rsrs/moq-lite/src/model/waiter.rs
💤 Files with no reviewable changes (3)
- rs/moq-lite/src/model/waiter.rs
- rs/moq-lite/src/model/mod.rs
- rs/moq-lite/src/model/state.rs
Refactor the cleanup task in BroadcastConsumer::subscribe_track to use explicit let/else guards for readability. Add a clarifying comment in TrackConsumer::closed() explaining Error::Closed as normal shutdown. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
🧹 Nitpick comments (2)
rs/moq-lite/src/model/track.rs (1)
737-739: Preferexpectover.ok().unwrap()in tests.This preserves clearer failure context if the write lock cannot be acquired.
♻️ Small readability/debuggability improvement
- let mut state = producer.state.write().ok().unwrap(); + let mut state = producer + .state + .write() + .expect("track state should be writable in this test");🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-lite/src/model/track.rs` around lines 737 - 739, The test is using producer.state.write().ok().unwrap() which loses context on failure; replace that chain with producer.state.write().expect("failed to acquire producer.state write lock in test") so the panic message explains why the lock acquisition failed (keep the access to state.max_sequence = Some(u64::MAX) unchanged).rs/moq-lite/src/model/broadcast.rs (1)
110-125: Extract shared abort logic to avoid divergence.
BroadcastProducer::abortandBroadcastDynamic::abortare effectively identical; centralizing them will reduce maintenance risk.♻️ Suggested DRY refactor
+fn abort_state(state: &conducer::Producer<State>, err: Error) -> Result<(), Error> { + let mut guard = modify(state)?; + + for weak in guard.tracks.values() { + weak.abort(err.clone()); + } + + for mut request in guard.requests.drain(..) { + request.abort(err.clone()).ok(); + } + + guard.abort = Some(err); + guard.close(); + Ok(()) +} + impl BroadcastProducer { /// Abort the broadcast and all child tracks with the given error. pub fn abort(&mut self, err: Error) -> Result<(), Error> { - let mut guard = modify(&self.state)?; - // Cascade abort to all child tracks. - for weak in guard.tracks.values() { - weak.abort(err.clone()); - } - // Abort any pending dynamic track requests. - for mut request in guard.requests.drain(..) { - request.abort(err.clone()).ok(); - } - guard.abort = Some(err); - guard.close(); - Ok(()) + abort_state(&self.state, err) } } impl BroadcastDynamic { /// Abort the broadcast with the given error. pub fn abort(&mut self, err: Error) -> Result<(), Error> { - let mut guard = modify(&self.state)?; - // Cascade abort to all child tracks. - for weak in guard.tracks.values() { - weak.abort(err.clone()); - } - // Abort any pending dynamic track requests. - for mut request in guard.requests.drain(..) { - request.abort(err.clone()).ok(); - } - guard.abort = Some(err); - guard.close(); - Ok(()) + abort_state(&self.state, err) } }Also applies to: 186-201
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-lite/src/model/broadcast.rs` around lines 110 - 125, Both BroadcastProducer::abort and BroadcastDynamic::abort contain identical logic; extract that shared behavior into a single helper (e.g., BroadcastState::abort or a private fn abort_inner) and call it from both impls to avoid divergence. The helper should take &self (or &mut self) and the Error, perform modify(&self.state) to get the guard, iterate guard.tracks calling weak.abort(err.clone()), drain guard.requests and call request.abort(err.clone()).ok(), then set guard.abort = Some(err) and call guard.close(); finally have the original BroadcastProducer::abort and BroadcastDynamic::abort delegate to this new helper. Ensure cloning/ownership matches the existing signatures and preserve Result<(), Error> return semantics.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@rs/moq-lite/src/model/broadcast.rs`:
- Around line 110-125: Both BroadcastProducer::abort and BroadcastDynamic::abort
contain identical logic; extract that shared behavior into a single helper
(e.g., BroadcastState::abort or a private fn abort_inner) and call it from both
impls to avoid divergence. The helper should take &self (or &mut self) and the
Error, perform modify(&self.state) to get the guard, iterate guard.tracks
calling weak.abort(err.clone()), drain guard.requests and call
request.abort(err.clone()).ok(), then set guard.abort = Some(err) and call
guard.close(); finally have the original BroadcastProducer::abort and
BroadcastDynamic::abort delegate to this new helper. Ensure cloning/ownership
matches the existing signatures and preserve Result<(), Error> return semantics.
In `@rs/moq-lite/src/model/track.rs`:
- Around line 737-739: The test is using producer.state.write().ok().unwrap()
which loses context on failure; replace that chain with
producer.state.write().expect("failed to acquire producer.state write lock in
test") so the panic message explains why the lock acquisition failed (keep the
access to state.max_sequence = Some(u64::MAX) unchanged).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 6805314b-a93b-4fd0-80f9-3262151515e2
📒 Files selected for processing (2)
rs/moq-lite/src/model/broadcast.rsrs/moq-lite/src/model/track.rs
The API is pretty nuts, even if the name could be better.