moq-lite-05: implement FETCH for past groups via TrackConsumer::fetch#1601
Merged
Conversation
Add a one-shot `TrackConsumer::fetch(group, options) -> GroupConsumer` that retrieves a single past group without holding a subscription. A cached group is returned directly; otherwise the request bridges to a wire moq-lite FETCH, blocking on a new FETCH_OK (mirroring SUBSCRIBE/SUBSCRIBE_OK). - model: a dynamic group-request channel parallel to dynamic track requests (request_fetch / FetchPending / BroadcastDynamic::requested_group / GroupRequest). Cache hit returns the group; an aborted cached group is bypassed. - wire: new FetchOk message; Fetch.frame_start (lite-05+); the publisher honors frame_start by skipping earlier frames. - publisher: recv_fetch / run_fetch serve the group's frames on the fetch stream. - subscriber: serve group requests by issuing FETCH upstream and routing frames into the producer that resolves the fetcher. - relay web.rs: /fetch?group=N uses fetch() instead of subscribe(). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…y-colden-96e186 # Conflicts: # rs/moq-net/src/lite/fetch.rs
…y-colden-96e186 # Conflicts: # rs/moq-relay/src/web.rs
5 tasks
…y-colden-96e186 # Conflicts: # rs/moq-net/src/lite/publisher.rs
…y-colden-96e186 # Conflicts: # rs/moq-net/src/lite/subscriber.rs # rs/moq-net/src/model/broadcast.rs # rs/moq-relay/src/web.rs
Implementing std Future on a kio-backed type means stashing the strong Waiter in an Option<Waiter> field and replacing it every poll, because the channel's WaiterList only holds a Weak (a dropped waiter loses its wakeup). kio::wait() already hides this for closures, but named futures returned from sync methods had to repeat the dance by hand. Add `kio::Future` (a poll-based trait, method `poll(&mut self, waiter)`) and `kio::Pending<F>`, which carries the retained Waiter and provides the std Future impl once. Pending derefs to the inner value, so any inherent methods you put on it are reachable through the awaitable handle. Convert `TrackFetchPending` to `kio::Pending<TrackFetch>` as the first user: the hand-written waiter field and std Future impl are gone; `TrackFetch` writes only `kio::Future::poll`. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Drop the hand-written `waiter: Option<Waiter>` field and `impl std::future::Future` from `TrackSubscriberPending` and `TrackInfoPending`, mirroring `TrackFetchPending`: each is now `kio::Pending<Inner>` where `Inner` implements `kio::Future`. `kio::Future::poll` takes `&self` (kio channels poll immutably), so a pollable can be driven through a shared borrow. `Pending` derefs to the inner, so the existing `poll_ok` / `update` surface keeps working through the wrapper with no caller changes (moq-mux polls `poll_ok` through an `&self`-borrowed enum, and the broadcast test macro calls it on a shared handle). No hand-written future plumbing or retained-waiter fields remain in moq-net. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What
Adds a one-shot fetch on
TrackConsumer, so you can retrieve a single past group without holding a live subscription:TrackFetchPendingresolves to aGroupConsumer: immediately if the group is already cached, otherwise after a wire moq-liteFETCHcompletes (blocking on a newFETCH_OK, mirroring howsubscribeblocks onSUBSCRIBE_OK).FETCHwas previously a wire-only definition the publisher rejected withUnexpectedStream.Why
The only way to read a group was to
subscribeto a track and walk its cache. There was no first-class way to grab one past group.fetchlives onTrackConsumer(not a live subscription) on purpose: you no longer have to subscribe just to fetch.How it works (model)
Fetch fits the track model right where the
// TODO fetchplaceholders were:TrackConsumer::fetch(sequence, options)registers a want in the track's shared state (only if the group isn't already determined by the cache) and returns aTrackFetchPendingthat resolves viapoll_get_group.TrackRequest::requested_fetch()(pre-accept) /TrackProducer::requested_fetch()(post-accept), then makes the group available withTrackRequest::serve_fetch(sequence, timescale) -> GroupProducer(which doesn't require the track to be accepted, so a fetch-only track needs noTrackInfo).The FETCH response streams on the same bidi control stream as the request: subscriber writes
Fetch; publisher repliesFetchOk(codec + timescale) then streams the group's frames; stream FIN ends the group. Errors propagate via stream reset.Changes
model/track.rs):Fetchoptions +FetchRequest;TrackConsumer::fetch→TrackFetchPending;TrackState.fetches; producer/requestrequested_fetch()+TrackRequest::serve_fetch().GroupConsumer::timescale()accessor (model/group.rs).lite/fetch.rs): newFetchOk(group echo + compression + timescale);Fetch.frame_start(lite-05+, honored by the publisher).lite/publisher.rs):recv_fetch/run_fetchserve the group on the fetch stream (gated to lite-05+).lite/subscriber.rs):run_subscribeis now a demand loop that serves both subscriptions and fetches off oneTrackRequest; a fetch opens a wire FETCH upstream and fills the group (spawned).kio/producer.rs): exposepoll_unused(poll sibling of the existing asyncunused), used by the relay's demand loop to stop serving a track once every consumer drops.moq-relay/src/web.rs):GET /fetch/<broadcast>/<track>?group=Nusesfetch()instead ofsubscribe().?group=latestkeeps the subscribe path (fetch needs a concrete group).Out of scope (follow-ups)
frame_startresume reseed. The publisher honorsframe_starton the wire, butfetch()always requests0; an aborted/evicted group is re-fetched whole. True resume (requestframe_start = N, stitch onto cached frames) is a separate change.FetchOk.compression = None).rs/moq-netwire/API withjs/netanddoc/concept; deferred.Test plan
cargo test -p moq-net— wire roundtrip (FetchOk,frame_startgating) + model unit tests (cache hit, miss-signals-producer, past-final NotFound, abort).cargo test -p moq-native --test broadcast—broadcast_moq_lite_05_fetch_webtransport: end-to-end fetch over a real session (client fetches a past group; the relay forwards a wire FETCH upstream). WebTransport only, since Lite05Wip isn't ALPN-advertised.cargo test -p moq-relay -p kio,cargo clippy --workspace,cargo fmt --check.Targets
devper branch targeting (wire-protocol + model change underrs/moq-net). Rebased onto theTrackInfo/ async-TrackConsumerreshape (#1631) and theframe_startwire field (#1595).🤖 Generated with Claude Code
(Written by Claude)