moq-lite: add fetch_group API + TrackDynamic#1357
Conversation
Adds a first-class FETCH path at the track level so callers can request
a specific group by sequence without spinning up a full subscribe.
API surface:
- TrackConsumer::fetch_group(seq) -> Result<GroupConsumer>
- Cache hit: returns the cached consumer regardless of dynamic state.
- Cache miss + no TrackDynamic: returns Error::NotFound.
- Cache miss + TrackDynamic registered: queues a request, returns a
consumer that fills as the publisher writes frames.
- TrackConsumer::latest_group() -> Option<GroupConsumer> (replaces
latest() returning Option<u64>; consistent suffix with get_group/
recv_group/next_group).
- TrackSubscriber::latest_group() (same rename).
- TrackProducer::dynamic() -> TrackDynamic, mirroring
BroadcastProducer::dynamic() / BroadcastDynamic.
- TrackDynamic::poll_requested_group / requested_group yields
GroupProducer for the requested sequence.
Concurrent fetch_group(seq) calls share the in-flight group via the
existing groups cache. Dropping the last TrackDynamic aborts pending
requests with Error::Cancel.
Caller migrations:
- moq-relay/src/web.rs fetch handler: drop the upfront subscribe_track
round-trip; use consume_track + fetch_group / latest_group directly.
- lite/ietf publishers: latest() -> latest_group().map(|g| g.sequence)
for the LargestObject / fallback start_group case.
Wire-side hookup (lite ControlType::Fetch and ietf::run_fetch_stream)
is intentionally out of scope; the breaking API surface is captured
here so the wire work can be a clean follow-up.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
Caution Review failedThe pull request is closed. ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
WalkthroughAdds on-demand group fetching via a new TrackDynamic produced by TrackProducer::dynamic(), which queues and fulfills group requests and aborts pending requests on drop. Renames subscription bounds from start/end to start_group/end_group and replaces latest() with latest_group(). TrackConsumer.get_group now returns cached groups or enqueues fetches under a dynamic and returns NotFound when uncached and no dynamic exists. Publisher and lite paths use start_group, subscriber messages use start_group/end_group, relay serve_fetch uses consume_track and direct group resolution via get_group or latest_group, and one crate doc and a catalog constant were updated. 🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
✨ Simplify code
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Review rate limit: 6/8 reviews remaining, refill in 9 minutes and 32 seconds.Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@rs/moq-lite/src/model/track.rs`:
- Around line 573-599: The derived Clone for TrackDynamic bypasses
TrackDynamic::new and therefore does not increment state.dynamic_groups;
implement Clone manually for TrackDynamic so cloning performs the same increment
logic as new(): clone the Track (info.clone()), reuse the
conducer::Producer<State> (state.clone()), then call state.write() and increment
dynamic_groups before returning the new TrackDynamic; also ensure there is a
Drop impl that decrements dynamic_groups (or update the existing Drop) so
increments and decrements stay balanced when handles are cloned/dropped.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 9e19dd82-7457-44de-a757-2b5a21f1bce4
📒 Files selected for processing (4)
rs/moq-lite/src/ietf/publisher.rsrs/moq-lite/src/lite/publisher.rsrs/moq-lite/src/model/track.rsrs/moq-relay/src/web.rs
The derived Clone copied fields directly without going through `new()`, so cloning didn't increment `dynamic_groups`. Dropping one of two clones would then incorrectly decrement to zero and abort pending fetches even though a live handler remained. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
rs/moq-lite/src/model/track.rs (1)
814-823:latest_groupis duplicated in consumer and subscriberBoth methods use the same state scan logic. Consider extracting a small shared helper on
Stateto keep behavior locked together and reduce drift risk.Also applies to: 1068-1077
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-lite/src/model/track.rs` around lines 814 - 823, The latest_group logic is duplicated in the consumer and subscriber implementations; add a small helper method on State (e.g., State::latest_group_consumer or State::latest_group) that encapsulates the scan using state.max_sequence, state.groups.iter().flatten(), comparison to max and returning Option<GroupConsumer> via group.consume(), then update the existing Track::latest_group (and the duplicate at lines ~1068-1077) to call that State helper instead of repeating the iteration to keep behavior centralized and avoid drift.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@rs/moq-lite/src/model/track.rs`:
- Around line 832-865: In fetch_group, before creating and enqueuing a dynamic
request (i.e., before calling Group::produce and pushing into
state.fetch_requests/state.groups), check state.final_sequence and return
Err(Error::NotFound) when final_sequence.is_some() and sequence >
final_sequence.unwrap() (because that sequence can never exist); add this guard
after the dynamic_groups == 0 check and before constructing group/consumer, and
keep using the same state and error types (state.final_sequence,
state.fetch_requests, state.groups, state.duplicates, Group::produce/consume) so
concurrent fetches still share frames when allowed.
---
Nitpick comments:
In `@rs/moq-lite/src/model/track.rs`:
- Around line 814-823: The latest_group logic is duplicated in the consumer and
subscriber implementations; add a small helper method on State (e.g.,
State::latest_group_consumer or State::latest_group) that encapsulates the scan
using state.max_sequence, state.groups.iter().flatten(), comparison to max and
returning Option<GroupConsumer> via group.consume(), then update the existing
Track::latest_group (and the duplicate at lines ~1068-1077) to call that State
helper instead of repeating the iteration to keep behavior centralized and avoid
drift.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: ea9a1152-b664-4566-9c3b-1bbb6d48f548
📒 Files selected for processing (1)
rs/moq-lite/src/model/track.rs
Two fixes: - rustfmt: split a long assert_eq! that CI rejected. - fetch_group: when the track is finalized, fail fast with NotFound for sequences at/after final_sequence instead of queueing a request the publisher can never fulfill. Mirrors the existing guards in TrackProducer::create_group / get_group / recv_group. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
🧹 Nitpick comments (1)
rs/moq-lite/src/model/track.rs (1)
862-870: Consider:max_sequenceupdate may affectlatest_group()behavior.When queuing a fetch request,
max_sequenceis updated to include the requested sequence (line 868). This means a fetch for a high sequence number (e.g., 1000) whenmax_sequenceis low (e.g., 5) will causelatest_group()to return the not-yet-filled group.This appears intentional since the group is added to the cache and concurrent consumers share frames as they arrive. If this behavior is unexpected, consider either:
- Documenting that
latest_group()may return in-flight groups from pending fetches- Tracking "published max" separately from "cache max"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-lite/src/model/track.rs` around lines 862 - 870, The current update to state.max_sequence inside the fetch-queueing snippet causes latest_group() to consider in-flight (not-yet-filled) groups as "latest"; to preserve previous semantics, stop mutating state.max_sequence here and instead add a separate cache/pending max (e.g., state.cache_max_sequence or state.pending_max_sequence) that you set in this block, leaving state.max_sequence (or introduce state.published_max_sequence) to be updated only when a group is actually filled/published; update latest_group() to consult the published_max_sequence for determining the latest completed group, and update the publishing/completion codepath to set published_max_sequence when frames arrive so consumers still share in-flight groups via state.fetch_requests and state.groups.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@rs/moq-lite/src/model/track.rs`:
- Around line 862-870: The current update to state.max_sequence inside the
fetch-queueing snippet causes latest_group() to consider in-flight
(not-yet-filled) groups as "latest"; to preserve previous semantics, stop
mutating state.max_sequence here and instead add a separate cache/pending max
(e.g., state.cache_max_sequence or state.pending_max_sequence) that you set in
this block, leaving state.max_sequence (or introduce
state.published_max_sequence) to be updated only when a group is actually
filled/published; update latest_group() to consult the published_max_sequence
for determining the latest completed group, and update the publishing/completion
codepath to set published_max_sequence when frames arrive so consumers still
share in-flight groups via state.fetch_requests and state.groups.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 9761c391-0226-4e3c-a498-d3fd6a708903
📒 Files selected for processing (1)
rs/moq-lite/src/model/track.rs
…n start/end
- TrackConsumer::fetch_group(seq) and the passive async get_group(seq) collapse
into a single sync get_group(Group) with fetch_group's behavior: cache hit
returns immediately, cache miss + dynamic handler queues the request and
returns a filling consumer, otherwise NotFound. Taking a Group keeps the
signature future-proof and mirrors create_group(Group).
- Drop the now-unused TrackSubscriber::{poll_get_group,get_group} and the
internal State::poll_get_group.
- Rename Subscription::{start,end} to {start_group,end_group} for consistency
with the wire types in lite/subscribe.rs and ietf/subscribe.rs.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@rs/moq-lite/src/model/track.rs`:
- Around line 830-839: get_group is enqueueing uncached fetch requests into
state.groups and state.fetch_requests without invoking evict_expired, allowing
stale entries to accumulate; modify get_group to call the same eviction logic
used by create_group/append_group (invoke evict_expired(state) before inserting
and/or immediately after pushing the new group) so expired entries are removed
and duplicates/max_sequence are updated consistently; reference the get_group
function and the state.groups, state.fetch_requests, state.duplicates,
create_group, append_group, and evict_expired symbols when applying the change.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 8c7621ad-84aa-4cd3-b59f-1e2ac1cfc040
📒 Files selected for processing (7)
rs/hang/src/catalog/root.rsrs/moq-lite/src/ietf/publisher.rsrs/moq-lite/src/lib.rsrs/moq-lite/src/lite/publisher.rsrs/moq-lite/src/lite/subscriber.rsrs/moq-lite/src/model/track.rsrs/moq-relay/src/web.rs
✅ Files skipped from review due to trivial changes (1)
- rs/moq-lite/src/lib.rs
🚧 Files skipped from review as they are similar to previous changes (2)
- rs/moq-lite/src/ietf/publisher.rs
- rs/moq-lite/src/lite/publisher.rs
Mirror the create_group / append_group pattern so accumulated fetch requests don't linger past MAX_GROUP_AGE when the publisher isn't producing concurrently. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
A first-class FETCH path at the track level. The breaking API change is captured here so the wire-side hookup (lite
ControlType::Fetch,ietf::run_fetch_stream) can land as a clean follow-up.New API
TrackConsumer::fetch_group(seq) -> Result<GroupConsumer>TrackDynamicneeded).Err(NotFound).TrackConsumer::latest_group() -> Option<GroupConsumer>(replaceslatest()returningOption<u64>; consistent withget_group/recv_group/next_group).TrackSubscriber::latest_group()— same rename.TrackProducer::dynamic() -> TrackDynamic— mirrorsBroadcastProducer::dynamic(). Drop the last dynamic and pending requests are aborted withError::Cancel.TrackDynamic::poll_requested_group/requested_group— yieldsGroupProducerfor the publisher to fill.Caller migrations
moq-relay/src/web.rsfetch handler: drops the upfrontsubscribe_trackround-trip; usesconsume_track+fetch_group/latest_groupdirectly.lite/publisher.rsandietf/publisher.rs:consumer.latest()→consumer.latest_group().map(|g| g.sequence)for the LargestObject / fallbackstart_groupcase.What's not here
Wire-side hookup. Specifically:
lite::ControlType::Fetchstill returnsError::UnexpectedStream(the breaking change captures the in-process API; the wire format choice for response framing — inline-on-bidi vs. uni stream with fetch IDs — is its own conversation).ietf::run_fetch_streamStandalone variant still returns "not supported".When those land they slot into the existing
fetch_groupAPI without further breaks.Test plan
rs/moq-lite/src/model/track.rs:fetch_group_cache_hit(no dynamic needed)fetch_group_no_handler_returns_not_foundfetch_group_via_dynamic_handler(round-trip with frame data)fetch_group_shares_in_flight(single queued request, both consumers see frames)fetch_group_aborted_by_publisher(publisher abort surfaces to consumer)fetch_pending_aborted_when_dynamic_droppedlatest_group_returns_max_sequence_consumerlatest_group_none_on_empty_trackcargo test --workspace— 290 moq-lite tests pass (up from 282), all other crates green.cargo build --workspace.🤖 Generated with Claude Code