Tweak the API to revert some breaking changes.#1036
Conversation
WalkthroughThis pull request refactors resource lifecycle management across multiple modules. The primary changes include: renaming 🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 inconclusive)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Tip Try Coding Plans. Let us write the prompt for your AI agent so you can ship faster (with fewer bugs). Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
🧹 Nitpick comments (1)
rs/moq-lite/src/model/broadcast.rs (1)
109-125: Deduplicate abort cascade logic to avoid behavior drift.
BroadcastProducer::abortandBroadcastDynamic::abortcurrently duplicate the same state-drain/cascade code. A shared helper onStatewould reduce maintenance risk.♻️ Suggested refactor
+impl State { + fn abort_all(&mut self, err: Error) { + for weak in self.tracks.values() { + weak.abort(err.clone()); + } + for mut request in self.requests.drain(..) { + request.abort(err.clone()).ok(); + } + self.abort(err); + } +} + impl BroadcastProducer { pub fn abort(&mut self, err: Error) -> Result<(), Error> { let mut state = self.state.modify()?; - // Cascade abort to all child tracks. - for weak in state.tracks.values() { - weak.abort(err.clone()); - } - // Abort any pending dynamic track requests. - for mut request in state.requests.drain(..) { - request.abort(err.clone()).ok(); - } - state.abort(err); + state.abort_all(err); Ok(()) } } impl BroadcastDynamic { pub fn abort(&mut self, err: Error) -> Result<(), Error> { let mut state = self.state.modify()?; - // Cascade abort to all child tracks. - for weak in state.tracks.values() { - weak.abort(err.clone()); - } - // Abort any pending dynamic track requests. - for mut request in state.requests.drain(..) { - request.abort(err.clone()).ok(); - } - state.abort(err); + state.abort_all(err); Ok(()) } }Also applies to: 185-201
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-lite/src/model/broadcast.rs` around lines 109 - 125, The abort cascade logic is duplicated in BroadcastProducer::abort and BroadcastDynamic::abort; add a helper method on State (e.g., State::cascade_abort or State::abort_with_cascade) that takes the Error, drains requests and calls abort on each child track and request (cloning the error as needed), then performs the existing state.abort(err) call; update BroadcastProducer::abort and BroadcastDynamic::abort to call this new State helper after acquiring state via self.state.modify() so behavior is identical but centralized.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@CLAUDE.md`:
- Line 91: The doc change in CLAUDE.md introducing tokio::time::pause()
conflicts with the repo coding guideline preferring #[tokio::test(start_paused =
true)]; either update the coding guidelines to adopt the new pattern or revert
the doc to match the existing rule. Locate references to the async-test pattern
in the coding guidelines/config and change them to document using explicit
tokio::time::pause() in tests (or change CLAUDE.md back to
#[tokio::test(start_paused = true)]) and ensure examples and any CI/test-lint
checks mention the chosen approach (tokio::time::pause() vs
#[tokio::test(start_paused = true)]) so both docs and configs remain consistent.
In `@rs/moq-lite/src/model/track.rs`:
- Around line 74-78: The current code returns termination (None) as soon as
final_sequence is Some, which allows consumers to stop early and miss later
groups; instead keep reads pending/retriable for sequences below the final
marker and only terminate when no more groups can ever arrive. Concretely: in
the Stream impl method poll_next (and any equivalent reader function that
returns Poll::Ready(None) when self.final_sequence.is_some()), change the
early-return to Poll::Pending unless the requested/next sequence is >=
final_sequence and there is proof no group can still be created; keep
create_group() semantics to still accept groups with sequence < final_sequence
(so late lower-sequence groups are allowed); update any
finish/set_final_sequence handling so it only prevents creation of sequences >=
final_sequence (or otherwise enforces your chosen final bound), and remove the
unconditional None returns in methods referencing final_sequence (e.g., the
blocks around final_sequence checks at the sites you noted) so consumers remain
pending until true closure.
- Around line 671-672: The two async test functions (including
next_group_finishes_without_waiting_for_gaps) are annotated with plain
#[tokio::test]; change their attributes to #[tokio::test(start_paused = true)]
so the tokio runtime starts with time paused for deterministic time
simulation—locate the test function declarations (e.g., fn
next_group_finishes_without_waiting_for_gaps and the other test around lines
~688) and replace the attribute accordingly.
---
Nitpick comments:
In `@rs/moq-lite/src/model/broadcast.rs`:
- Around line 109-125: The abort cascade logic is duplicated in
BroadcastProducer::abort and BroadcastDynamic::abort; add a helper method on
State (e.g., State::cascade_abort or State::abort_with_cascade) that takes the
Error, drains requests and calls abort on each child track and request (cloning
the error as needed), then performs the existing state.abort(err) call; update
BroadcastProducer::abort and BroadcastDynamic::abort to call this new State
helper after acquiring state via self.state.modify() so behavior is identical
but centralized.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (12)
CLAUDE.mdrs/hang/src/container/consumer.rsrs/hang/src/container/frame.rsrs/moq-lite/src/ietf/subscriber.rsrs/moq-lite/src/lite/subscriber.rsrs/moq-lite/src/model/broadcast.rsrs/moq-lite/src/model/frame.rsrs/moq-lite/src/model/group.rsrs/moq-lite/src/model/state.rsrs/moq-lite/src/model/track.rsrs/moq-mux/src/import/fmp4.rsrs/moq-relay/src/cluster.rs
| - Run `just fix` to automatically fix formating and easy things. | ||
| - Rust tests are integrated within source files | ||
| - Async tests should use `#[tokio::test(start_paused = true)]` to simulate time | ||
| - Async tests that sleep should call `tokio::time::pause()` at the start to simulate time instantly |
There was a problem hiding this comment.
Documentation update conflicts with existing coding guidelines.
This update changes the recommended async test pattern from #[tokio::test(start_paused = true)] to explicit tokio::time::pause() calls. However, the repository's coding guidelines still specify the old pattern.
If this is an intentional change in testing strategy, ensure the coding guidelines configuration is also updated to maintain consistency.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@CLAUDE.md` at line 91, The doc change in CLAUDE.md introducing
tokio::time::pause() conflicts with the repo coding guideline preferring
#[tokio::test(start_paused = true)]; either update the coding guidelines to
adopt the new pattern or revert the doc to match the existing rule. Locate
references to the async-test pattern in the coding guidelines/config and change
them to document using explicit tokio::time::pause() in tests (or change
CLAUDE.md back to #[tokio::test(start_paused = true)]) and ensure examples and
any CI/test-lint checks mention the chosen approach (tokio::time::pause() vs
#[tokio::test(start_paused = true)]) so both docs and configs remain consistent.
| if self.final_sequence.is_some() { | ||
| Poll::Ready(None) | ||
| } else { | ||
| Poll::Pending | ||
| } |
There was a problem hiding this comment.
finish* semantics are internally inconsistent with consumer termination behavior.
Line 152 allows create_group() after finish for sequence < final_sequence, but Line 74 and Line 96 can return None as soon as finalization is set and no match exists right now. That lets consumers terminate early and miss late lower-sequence groups.
Please align the contract: either reject all post-finish group creation, or keep consumer reads pending/retriable for below-final gaps until true closure.
Also applies to: 96-98, 152-155, 202-207
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rs/moq-lite/src/model/track.rs` around lines 74 - 78, The current code
returns termination (None) as soon as final_sequence is Some, which allows
consumers to stop early and miss later groups; instead keep reads
pending/retriable for sequences below the final marker and only terminate when
no more groups can ever arrive. Concretely: in the Stream impl method poll_next
(and any equivalent reader function that returns Poll::Ready(None) when
self.final_sequence.is_some()), change the early-return to Poll::Pending unless
the requested/next sequence is >= final_sequence and there is proof no group can
still be created; keep create_group() semantics to still accept groups with
sequence < final_sequence (so late lower-sequence groups are allowed); update
any finish/set_final_sequence handling so it only prevents creation of sequences
>= final_sequence (or otherwise enforces your chosen final bound), and remove
the unconditional None returns in methods referencing final_sequence (e.g., the
blocks around final_sequence checks at the sites you noted) so consumers remain
pending until true closure.
| #[tokio::test] | ||
| async fn next_group_finishes_without_waiting_for_gaps() { |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# First, let's look at the specific lines mentioned in the review
cat -n rs/moq-lite/src/model/track.rs | sed -n '670,675p'Repository: moq-dev/moq
Length of output: 314
🏁 Script executed:
# Also check the second location mentioned
cat -n rs/moq-lite/src/model/track.rs | sed -n '686,692p'Repository: moq-dev/moq
Length of output: 323
🏁 Script executed:
# Search for all #[tokio::test] usages in this file to understand the pattern
rg -n '#\[tokio::test' rs/moq-lite/src/model/track.rs -A2Repository: moq-dev/moq
Length of output: 914
Use #[tokio::test(start_paused = true)] for new async tests instead of plain #[tokio::test].
The async tests at lines 671–672 and 688–689 use plain #[tokio::test], which should be #[tokio::test(start_paused = true)] to align with the Rust testing guidelines for time simulation.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rs/moq-lite/src/model/track.rs` around lines 671 - 672, The two async test
functions (including next_group_finishes_without_waiting_for_gaps) are annotated
with plain #[tokio::test]; change their attributes to #[tokio::test(start_paused
= true)] so the tokio runtime starts with time paused for deterministic time
simulation—locate the test function declarations (e.g., fn
next_group_finishes_without_waiting_for_gaps and the other test around lines
~688) and replace the attribute accordingly.
No description provided.