moq-net: serve one subscription and N fetches concurrently per track#1634
Conversation
The relay's upstream track-serving loop (`run_subscribe`) served a single subscription or fetch at a time: once a subscription started it blocked the loop for the subscription's whole lifetime, so no fetches could run meanwhile, and the task exited when the subscription ended. Rework it around a `TrackServe` struct driven by one kio-poll `select!` loop: - Fetches run as independent futures in a `FuturesUnordered`, concurrent with each other and with the live subscription. - The single upstream subscription opens lazily on the first subscriber, forwards downstream preference changes as SUBSCRIBE_UPDATE, pauses (cap + priority 0, stream kept open) when the last subscriber leaves, and resumes on return. Lite01/02 (no SUBSCRIBE_UPDATE) skip the pause and tear down on idle. - A task-level 5s linger keeps an idle track alive (no subscriber, no fetch, no consumers) so a returning consumer reuses the cache. Model layer: replace the loose `FetchRequest` + `serve_fetch`/`insert_fetch_group` pair with a `GroupRequest` handle (symmetric with `TrackRequest`). Its `accept(timescale) -> GroupProducer` inserts the fetched group and carries its own producer handle, so it works the same before or after the track is accepted. Add `TrackProducer::poll_unused`. Add a regression test that fetches a past group while a subscription is live. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
| /// The producer serves it via [`GroupRequest::accept`] (a relay issues a wire | ||
| /// FETCH first; an origin already has the group cached, so the fetch resolves | ||
| /// without ever reaching here). Errors once the track is aborted. | ||
| pub async fn requested_fetch(&mut self) -> Result<GroupRequest> { |
There was a problem hiding this comment.
Naming: maybe group_request or group_requested instead of requested_fetch.
Same for track in BroadcastDynamic, track_request or track_requested?
There was a problem hiding this comment.
Done. Renamed to group_request/poll_group_request (now on TrackDynamic), and BroadcastDynamic::requested_track → track_request/poll_track_request. (Written by Claude)
| /// The producer serves it via [`GroupRequest::accept`] (a relay issues a wire | ||
| /// FETCH first; an origin already has the group cached, so the fetch resolves | ||
| /// without ever reaching here). Errors once the track is aborted. | ||
| pub async fn requested_fetch(&mut self) -> Result<GroupRequest> { |
There was a problem hiding this comment.
Also we can't put this on TrackProducer. There needs to be some way of knowing that a TrackProducer supports fetches for old content, because most of our TrackProducers today don't and will never call this method.
We need a separate TrackDynamic or something similar to BroadcastDynamic.
There was a problem hiding this comment.
Done. Added a TrackDynamic handle (the group-level analogue of BroadcastDynamic), created via TrackProducer::dynamic() or TrackRequest::dynamic(). group_request lives only there, so a plain TrackProducer no longer exposes fetch-serving. It bumps a dynamic counter on the track; with no handler, a cache-miss fetch on an accepted track fails fast with NotFound instead of blocking forever (pre-accept it still waits, since the relay is racing the producer that creates the handler — same pattern as BroadcastState::dynamic). The relay holds one TrackDynamic per track for its serving lifetime. (Written by Claude)
| } | ||
| }); | ||
|
|
||
| match popped { |
There was a problem hiding this comment.
ready! might help, as might ?
There was a problem hiding this comment.
Done — poll_group_request now uses ready! + ? over the state.poll(...) result (kept the read-only is_empty guard so an idle poll doesn't flag the state modified and wake unrelated waiters). (Written by Claude)
| if !state.fetches.is_empty() { | ||
| return Poll::Ready(Ok(state.fetches.pop_front().unwrap())); | ||
| } | ||
| match &state.abort { |
There was a problem hiding this comment.
Done — folded into ok_or-style: match state.fetches.pop_front() → the empty arm returns abort.clone().map_or(Pending, ...). (Written by Claude)
| /// A specific group requested via [`TrackConsumer::fetch`], queued for the producer. | ||
| #[derive(Clone, Copy, Debug, PartialEq, Eq)] | ||
| pub struct FetchRequest { | ||
| struct FetchRequest { |
There was a problem hiding this comment.
Do we still need this? If so, new name that starts with Group.
And move the Group stuff into group.rs.
There was a problem hiding this comment.
Still needed as the queued element (sequence + priority) the consumer pushes and the handler pops. Renamed FetchRequest → GroupRequested and moved it, Fetch, and GroupRequest into group.rs. The insertion that touches private TrackState internals stays in track.rs as a pub(crate) serve_group_request helper that GroupRequest::accept calls. (Written by Claude)
| /// `timescale` comes from the wire FETCH_OK (or `None` for an untimed track). | ||
| /// Returns [`Error::Duplicate`] if the group is already present, or the track's | ||
| /// abort error if it closed while pending. | ||
| pub fn accept(self, timescale: impl Into<Option<Timescale>>) -> Result<GroupProducer> { |
There was a problem hiding this comment.
Why do we need this? Timescale should come from the TrackInfo.
And if we don't have TrackInfo yet, that's reason to take a impl Into<Option<TrackInfo>> instead and set it.
There was a problem hiding this comment.
Done — accept now takes impl Into<Option<TrackInfo>> and sets the track info if it isn't accepted yet; the group's timescale comes from that TrackInfo (info.timescale). When already accepted the arg is ignored and the group inherits the accepted timescale. (Written by Claude)
- Move fetch-serving off TrackProducer onto a dedicated TrackDynamic handle (group-level analogue of BroadcastDynamic), created via TrackProducer::dynamic or TrackRequest::dynamic. A `dynamic` counter on the track means a cache-miss fetch on an accepted track with no handler fails fast with NotFound instead of blocking forever, while pre-accept fetches still wait (mirrors BroadcastState). - GroupRequest::accept now takes `impl Into<Option<TrackInfo>>` and sets the track info if unaccepted; the group's timescale comes from TrackInfo rather than a bare timescale arg. - Rename requested_fetch/poll_requested_fetch -> group_request/poll_group_request (on TrackDynamic) and BroadcastDynamic::requested_track -> track_request. - Move Fetch, GroupRequest, and the renamed GroupRequested queue struct into group.rs; the insertion that touches private TrackState stays in track.rs as a pub(crate) serve_group_request helper. - Tidy poll_group_request with ready!/ok_or-style flow. - The relay holds one TrackDynamic per track for the serving lifetime. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ynamic from birth Address review on the fetch API shape: - `poll_fetch` no longer returns `Ok(None)` (which read as EOF) — it yields `Result<GroupConsumer>`, with `Error::NotFound` when a miss can never be served. - `TrackConsumer::fetch` → `fetch_group`, which decides synchronously: cached → a handle that resolves immediately; a miss with no handler (or past the final sequence) → `Err(NotFound)` up front; otherwise a handle that waits to be served. It no longer hands back a pending that could sit forever. - Add a synchronous cache-only `TrackConsumer::get_group`. - To make "no dynamic support" a sound synchronous check without racing the relay, a `TrackRequest` now counts as a dynamic handle from creation: on-demand tracks are fetch-capable before they're accepted, so a relay's fetch waits to be served instead of failing fast. Drops the earlier `info.is_some()` race guard. Updates the publisher/web/relay call sites and the model fetch tests. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
cargo doc -D warnings flagged intra-doc links still pointing at the old TrackConsumer::fetch name. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
| // on the client, so this issues a wire FETCH upstream. | ||
| let mut group_sub = tokio::time::timeout(TIMEOUT, async { | ||
| bc.track("video").unwrap().fetch(0, None).unwrap().await | ||
| bc.track("video").unwrap().fetch_group(0, None).unwrap().await |
There was a problem hiding this comment.
We should also rename track so it's similar. get_track or consume_track or fetch_track or IDK.
There was a problem hiding this comment.
Agree on the inconsistency, but BroadcastConsumer::track has ~78 call sites across ~30 crates (hang, moq-mux, moq-boy, gst, ffi, libmoq, audio, examples, tests) and per the Cross-Package Sync table it ripples to js/net, swift, kt, go, py, and docs. That's a large mechanical rename well outside this PR's scope (concurrent serving). Proposing a dedicated follow-up PR for it — I lean get_track (parallels the sync get_group), but happy to take whichever name you prefer. Want me to spin that up separately? (Written by Claude)
| loop { | ||
| let request = tokio::select! { | ||
| request = broadcast.requested_track() => match request { | ||
| request = broadcast.track_request() => match request { |
There was a problem hiding this comment.
actually revert, we should be consistent and have the verb then noun
There was a problem hiding this comment.
Done — reverted to verb-then-noun: BroadcastDynamic::requested_track (and poll_requested_track). For consistency I also took TrackDynamic::group_request → requested_group / poll_requested_group. (Written by Claude)
|
|
||
| // (1) Track demand: a fetch, a subscription change, or full idle. One | ||
| // poll_fn so the borrows of `dynamic` and `track` are held together. | ||
| event = std::future::poll_fn(|cx| { |
There was a problem hiding this comment.
Don't we have a kio::wait method or something?
There was a problem hiding this comment.
Good call — switched to kio::wait(|waiter| ...), which drops the std::future::poll_fn + manual Waiter stashing. (Written by Claude)
|
|
||
| #[derive(Default)] | ||
| struct TrackState { | ||
| pub(crate) struct TrackState { |
There was a problem hiding this comment.
Why pub(crate)? Whatever needs access should be in this file or use helpers; I really don't like leaking internal state.
There was a problem hiding this comment.
Agreed. Reverted TrackState to private and moved GroupRequest / GroupRequested back into track.rs next to the internals they touch (dropped the pub(crate) serve_group_request helper). Only the Fetch options struct (no TrackState reference) stays in group.rs. (Written by Claude)
| // Adopt the supplied info only if the track hasn't been accepted yet. | ||
| let info = self.info.get_or_insert_with(|| info.unwrap_or_default()); | ||
|
|
||
| let group = GroupProducer::new_with_timescale(Group { sequence }, info.timescale); |
There was a problem hiding this comment.
Please get rid of new_with_timescale. I didn't realize it made it into the public API; it's not worth it. Have publisher.rs warn if frames have the wrong timescale or something, but silently convert them.
There was a problem hiding this comment.
new_with_timescale predates this PR (added in #1439 with the per-track timescale model), and removing it means reworking how groups carry/validate timescale: today GroupProducer binds it at construction and append_frame rejects mismatches with TimestampMismatch; the wire encoder reads group.timescale(). Moving that to "publisher.rs warns + silently rescales against the track timescale" is a real change to the timestamp contract that's orthogonal to the fetch/concurrency work here. I'd rather not bundle it — can do it as a focused follow-up. Sound ok, or do you want it in this PR? (Written by Claude)
… private - Revert to verb-then-noun for consistency: BroadcastDynamic::track_request -> requested_track, TrackDynamic::group_request -> requested_group (+ poll_ forms). - Use kio::wait instead of std::future::poll_fn + manual Waiter stashing in the relay serve loop. - Stop leaking TrackState: revert it to private and move GroupRequest / GroupRequested back into track.rs (next to the TrackState internals they touch), dropping the pub(crate) serve_group_request helper. Fetch options stay in group.rs. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Summary
Follow-up to #1601. The relay's upstream track-serving loop (
run_subscribeinrs/moq-net/src/lite/subscriber.rs) served a single subscription or fetch at a time: once a subscription started,run_subscribe_track().awaitblocked the loop for the subscription's entire lifetime (no fetches could run meanwhile), and the task exited when the subscription ended.This reworks it so one track serves its lone subscription and any number of fetches concurrently, with a linger before tearing down an idle track.
Serve loop
New
TrackServestruct, one task per track, driven by a single kio-polltokio::select!:poll_fnmultiplexes track demand (fetch / subscription-change / idle).FuturesUnordered— concurrent with each other and with the live subscription.SUBSCRIBE_UPDATE, pauses (cap + priority 0, stream kept open) when the last subscriber leaves, and resumes (uncap) on return.LINGER_TIMEOUTarms; if nobody returns, the task tears down so a returning consumer reuses the cache. Lite01/02 (noSUBSCRIBE_UPDATE) skip the pause and tear down immediately.Model layer (
track.rs/group.rs)TrackDynamic— a dedicated handle for serving on-demand fetches of uncached (old) groups, the group-level analogue ofBroadcastDynamic. Created viaTrackProducer::dynamic()/TrackRequest::dynamic(). PlainTrackProducers (the common case) no longer expose fetch-serving. Adynamiccounter on the track makes a cache-missfetchon an accepted track fail fast withNotFoundwhen no handler exists, while pre-accept fetches still wait.GroupRequest(handed out byTrackDynamic::group_request) —accept(impl Into<Option<TrackInfo>>)inserts the fetched group and, if the track isn't accepted yet, sets itsTrackInfo; the group's timescale comes from that info.requested_fetch→group_request,BroadcastDynamic::requested_track→track_request.Fetch,GroupRequest, and the queue struct (FetchRequest→GroupRequested) moved intogroup.rs; theTrackStateinsertion stays intrack.rsbehind apub(crate) serve_group_requesthelper.Test plan
broadcast_moq_lite_05_fetch_during_subscribe_webtransport: fetches a past group while a subscription is live — the case the old code hung on.TrackDynamic(fetch_miss_signals_dynamic,fetch_miss_no_dynamic_not_found, …).cargo test -p moq-net— 366 lib tests pass.cargo test -p moq-native --test broadcast— 58 integration tests pass (existing fetch round-trip + linger tests included).cargo clippy -p moq-net -p moq-native --all-targetsclean (nix toolchain).Cross-package sync
No wire-format change (FETCH/FETCH_OK/SUBSCRIBE/SUBSCRIBE_UPDATE framing unchanged) — internal relay behavior plus a Rust-only producer-side model API, so
js/net/doc/conceptneed no updates.🤖 Generated with Claude Code
(Written by Claude)