Add more compatibility for draft 14 #645

Conversation
Walkthrough

Adds comprehensive fetch protocol types (Fetch, FetchType, FetchOk, FetchError, FetchCancel, FetchHeader, FetchObject) with encode/decode and message IDs; introduces a Location type with serialization and re-exports it. Refactors the group protocol into GroupOrder, GroupFlags, and GroupHeader and removes the prior Object/ObjectExtensions. Extends the subscribe/publish surface with FilterType, updated Subscribe/SubscribeOk/SubscribeUpdate, and expanded Publish/PublishOk/PublishError. Renames TrackStatusRequest to TrackStatus and adds TrackStatusCode. Refactors subscriber state handling into a single SubscriberState and updates Publisher/Subscriber session routing for the new messages. Adds Reader::decode_peek for non-destructive decoding.

Pre-merge checks and finishing touches

❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
rs/moq/src/ietf/group.rs (1)
80-84: Fix group type ID upper bound (accept up to 0x1F).

The current check rejects valid flag combinations (0x1E, 0x1F), so decoding will spuriously fail when multiple flags are set.
Apply this diff:
- if !(0x10..=0x1D).contains(&id) {
+ if !(0x10..=0x1F).contains(&id) {
      return Err(DecodeError::InvalidValue);
  }

rs/moq/src/ietf/subscribe.rs (3)
92-107: Subscribe::encode ignores self.group_order

Hard-coding GroupOrder::Descending can surprise callers and break round-trips. Encode the provided field:
- GroupOrder::Descending.encode(w);
+ self.group_order.encode(w);
352-369: Invalid filter_type test crafts a non-conformant byte sequence

The raw bytes don't follow the Subscribe wire format (namespace, strings, varints). Build the vector using the Encode helpers to ensure proper varints and field order:
- let invalid_bytes = vec![ /* hand-built bytes ... */ ];
+ let invalid_bytes = {
+     let mut b = BytesMut::new();
+     1u64.encode(&mut b); // request_id
+     encode_namespace(&mut b, &Path::new("test")); // namespace
+     Cow::<str>::from("video").encode(&mut b); // track_name
+     0x80u8.encode(&mut b); // subscriber_priority
+     GroupOrder::Descending.encode(&mut b); // group_order
+     true.encode(&mut b); // forward
+     (0x99u64).encode(&mut b); // INVALID filter_type
+     0u8.encode(&mut b); // num params
+     b.to_vec()
+ };
371-385: SubscribeOk invalid expires test is malformed

The sequence is missing track_alias and mislabels fields. Construct it with the Encode helpers:
- let invalid_bytes = vec![ /* hand-built bytes ... */ ];
+ let invalid_bytes = {
+     let mut b = BytesMut::new();
+     1u64.encode(&mut b); // request_id
+     1u64.encode(&mut b); // track_alias
+     5u64.encode(&mut b); // expires != 0 (invalid)
+     GroupOrder::Descending.encode(&mut b); // group_order
+     false.encode(&mut b); // content_exists
+     0u8.encode(&mut b); // num params
+     b.to_vec()
+ };

As per coding guidelines.
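Several of these fixes hinge on fields being QUIC variable-length integers rather than fixed-width bytes. For reference, a standalone sketch of the RFC 9000 varint encoding that MoQ control messages use on the wire (helper names are ours, not from this crate):

```rust
// QUIC variable-length integers (RFC 9000, Section 16): the top two bits of the
// first byte encode the total length (1, 2, 4, or 8 bytes), big-endian payload.
fn encode_varint(v: u64, out: &mut Vec<u8>) {
    if v < 1 << 6 {
        out.push(v as u8); // 1 byte, prefix 00
    } else if v < 1 << 14 {
        out.extend_from_slice(&((v as u16 | 0x4000).to_be_bytes())); // 2 bytes, prefix 01
    } else if v < 1 << 30 {
        out.extend_from_slice(&((v as u32 | 0x8000_0000).to_be_bytes())); // 4 bytes, prefix 10
    } else {
        assert!(v < 1 << 62, "value too large for a varint");
        out.extend_from_slice(&((v | 0xC000_0000_0000_0000).to_be_bytes())); // 8 bytes, prefix 11
    }
}

fn decode_varint(buf: &[u8]) -> Option<(u64, usize)> {
    let first = *buf.first()?;
    let len = 1usize << (first >> 6); // length lives in the top two bits
    if buf.len() < len {
        return None; // "Short": the caller needs to read more data
    }
    let mut v = (first & 0x3f) as u64;
    for b in &buf[1..len] {
        v = (v << 8) | *b as u64;
    }
    Some((v, len))
}
```

This is why hand-built byte vectors break once a value crosses 63: the length prefix changes, which is exactly what the Encode helpers handle for you.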
🧹 Nitpick comments (16)
rs/moq/src/ietf/location.rs (1)
9-22: Codec looks correct; add round-trip tests inline.

Encode/decode order is consistent. Please add unit tests in this file to lock the wire format. As per coding guidelines.
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::{BufMut, BytesMut};

    #[test]
    fn location_roundtrip() {
        let loc = Location { group: 42, object: 7 };
        let mut buf = BytesMut::new();
        loc.encode(&mut buf);
        let mut rd = &buf[..];
        let got = Location::decode(&mut rd).unwrap();
        assert_eq!(got, loc);
        assert!(rd.is_empty());
    }
}

rs/moq/src/ietf/publish.rs (3)
164-181: Publish encode/decode reads well; add unit tests.

Fields match the spec excerpt, the optional Largest Location is gated by a boolean, and parameters are consumed. Please add a small round-trip test to prevent regressions. As per coding guidelines.
Also applies to: 182-205
208-216: PublishOk is a placeholder; clarify or implement.

The struct exists with an ID constant but no Message impl and no End Group field. Either:
- add a TODO explaining it's intentionally unsupported, or
- implement Message encode/decode (consider adding end_group: Option<u64>), mirroring the Subscribe/Fetch handling.
222-246: PublishError implemented but currently not routed.

The message is encodable/decodable here, but session handling returns Unsupported for PublishError. If intentional, add a brief comment; otherwise, wire it to the appropriate handler for symmetry with the other control messages.
rs/moq/src/ietf/group.rs (1)
44-75: Add codec tests for Group and flag combinations.

Recommend tests covering all flag masks, subgroup presence validation, and publisher priority consumption:
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::{BufMut, BytesMut};

    fn rt(g: Group) {
        let mut buf = BytesMut::new();
        g.encode(&mut buf);
        let mut rd = &buf[..];
        let got = Group::decode(&mut rd).unwrap();
        // Compare key fields; flags round-trip
        assert_eq!(got.track_alias, g.track_alias);
        assert_eq!(got.group_id, g.group_id);
        assert_eq!(got.has_extensions, g.has_extensions);
        assert_eq!(got.has_subgroup, g.has_subgroup);
        assert_eq!(got.has_subgroup_object, g.has_subgroup_object);
        assert_eq!(got.has_end, g.has_end);
        assert!(rd.is_empty());
    }

    #[test]
    fn flags_all_permitted() {
        rt(Group { track_alias: 1, group_id: 2, has_extensions: true, has_subgroup: false, has_subgroup_object: true, has_end: true });
        rt(Group { track_alias: 1, group_id: 2, has_extensions: true, has_subgroup: true, has_subgroup_object: false, has_end: true });
    }

    #[test]
    fn subgroup_conflict_rejected() {
        // encode path prevents this; check decode rejection
        let mut buf = BytesMut::new();
        // Manually craft ID 0x10 | 0x02 | 0x04
        0x16u8.encode(&mut buf);
        1u64.encode(&mut buf); // track_alias
        2u64.encode(&mut buf); // group_id
        // subgroup id expected but will be read; any value is fine for this test
        0u8.encode(&mut buf);
        0u8.encode(&mut buf); // publisher priority
        let mut rd = &buf[..];
        assert!(matches!(Group::decode(&mut rd), Err(DecodeError::InvalidValue)));
    }
}

Also applies to: 94-114
rs/moq/src/ietf/publisher.rs (1)
137-143: Return a more precise error for unsupported subscribe updates

500 is generic. Prefer 501 (Not Implemented) or a dedicated MOQT error if you have one, and log the unsupported fields for telemetry.
rs/moq/src/ietf/track.rs (2)
42-49: Decode filter_type using the typed enum for validation

You decode filter_type as u64 and ignore invalid values. Use FilterType::decode to enforce validity, mirroring Subscribe:
- let _filter_type = u64::decode(r)?;
+ let _filter_type = FilterType::decode(r)?;
15-35: Add unit tests for TrackStatus and TrackStatusCode round-trips

Per repo guidelines, add in-file tests for TrackStatus encode/decode and TrackStatusCode enum serialization, including negative cases (invalid code -> DecodeError::InvalidValue).
As per coding guidelines.
Also applies to: 58-77
rs/moq/src/ietf/subscribe.rs (3)
129-141: SubscribeOk::decode should parse group_order via GroupOrder

Use the enum's Decode impl for consistent validation:
- // Ignore group order, who cares.
- let _group_order = u8::decode(r)?;
+ // Ignore group order, but still validate it.
+ let _group_order = GroupOrder::decode(r)?;
260-295: Strengthen Subscribe tests: assert new fields

The round-trip tests don't assert group_order and filter_type. Add checks to ensure the new fields survive decode:
  assert_eq!(decoded.subscriber_priority, 128);
+ assert!(matches!(decoded.group_order, GroupOrder::Descending));
+ assert!(matches!(decoded.filter_type, FilterType::LargestObject));
205-258: SubscribeUpdate: add unit tests

Add encode/decode round-trip tests (including forward=false -> Unsupported if that rule should carry over) to lock the format.
As per coding guidelines.
rs/moq/src/ietf/fetch.rs (1)
31-64: Great coverage of Fetch message types; add tests and confirm TYPE mapping
- Please add unit tests for:
  - FetchType::{Standalone, RelativeJoining, AbsoluteJoining} encode/decode.
  - Fetch encode/decode with each variant.
  - FetchOk/FetchError/FetchCancel round-trips.
  - FetchHeader encode/decode.
- Confirm that FetchHeader::TYPE = 0x5 is the agreed uni-stream discriminator with the subscriber, or gate its emission until the subscriber consumes it.
Example test skeleton to include under cfg(test):
#[test]
fn fetch_relative_joining_roundtrip() {
    use bytes::BytesMut;
    let msg = Fetch {
        request_id: 7,
        subscriber_priority: 42,
        group_order: GroupOrder::Descending,
        fetch_type: FetchType::RelativeJoining { subscriber_request_id: 1, group_offset: 0 },
    };
    let mut b = BytesMut::new();
    msg.encode(&mut b);
    let mut r = bytes::Bytes::from(b.to_vec());
    let dec = Fetch::decode(&mut r).unwrap();
    assert_eq!(dec, msg);
}

Also applies to: 66-101, 113-139, 149-175, 184-203, 205-220, 222-241
rs/moq/src/ietf/subscriber.rs (4)
169-201: Don't hold the state lock while calling create_group

Minimize the critical section and avoid unexpected blocking within the lock. Extract the needed values, drop the lock, then call create_group:
- let producer = {
-     let mut state = self.state.lock();
-     let request_id = *state.aliases.get(&group.track_alias).unwrap_or(&group.track_alias);
-     let track = state.subscribes.get_mut(&request_id).ok_or(Error::NotFound)?;
-     let group = Group { sequence: group.group_id };
-     track.producer.create_group(group).ok_or(Error::Old)?
- };
+ let (track_producer, group_id) = {
+     let mut state = self.state.lock();
+     let request_id = *state.aliases.get(&group.track_alias).unwrap_or(&group.track_alias);
+     let track = state.subscribes.get(&request_id).ok_or(Error::NotFound)?;
+     (track.producer.clone(), group.group_id)
+ };
+ let producer = track_producer.create_group(Group { sequence: group_id }).ok_or(Error::Old)?;
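The general pattern is the same regardless of the crate: copy cheap handles out while holding the lock, let the guard drop, then do the slow work. A std-only sketch (the Producer type and names below are hypothetical, standing in for the track producer):

```rust
use std::sync::{Arc, Mutex};

// Hypothetical producer handle; cloning is cheap, like the track producer above.
#[derive(Clone)]
struct Producer {
    name: String,
}

fn create_group_outside_lock(state: &Arc<Mutex<Vec<Producer>>>, idx: usize) -> Option<String> {
    // Critical section: only look up and clone the handle we need.
    let producer = {
        let guard = state.lock().unwrap();
        guard.get(idx)?.clone()
    }; // the guard is dropped here, before any slow work

    // Potentially slow work now runs without the lock held,
    // so other threads are free to take it in the meantime.
    Some(format!("group created by {}", producer.name))
}
```

The block expression scopes the guard tightly; Rust drops it at the closing brace, which is what makes this refactor mechanical and safe.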
344-350: Treat FetchOk/FetchError as no-ops for now

Returning Unsupported escalates a benign control message to an error. Until fetch is implemented on the subscriber, consume and log:
- pub fn recv_fetch_ok(&mut self, _msg: ietf::FetchOk) -> Result<(), Error> {
-     Err(Error::Unsupported)
- }
+ pub fn recv_fetch_ok(&mut self, _msg: ietf::FetchOk) -> Result<(), Error> {
+     tracing::debug!("FETCH_OK ignored (not implemented)");
+     Ok(())
+ }
@@
- pub fn recv_fetch_error(&mut self, _msg: ietf::FetchError<'_>) -> Result<(), Error> {
-     Err(Error::Unsupported)
- }
+ pub fn recv_fetch_error(&mut self, _msg: ietf::FetchError<'_>) -> Result<(), Error> {
+     tracing::debug!("FETCH_ERROR ignored (not implemented)");
+     Ok(())
+ }
352-358: Use a neutral error code and phrase for PublishError

"publish not supported bro" can leak into logs/UX. Use a neutral phrase and a more conventional code:
- error_code: 300,
- reason_phrase: "publish not supported bro".into(),
+ error_code: 501,
+ reason_phrase: "PUBLISH not supported".into(),
142-168: Subscriber only parses Group uni-streams

Given the publisher now emits a fetch uni-stream, consider extending run_uni_stream to discriminate stream types (Group vs FetchHeader) and route accordingly, or coordinate to disable fetch uni-stream emission until it is implemented here.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)
rs/moq/src/ietf/fetch.rs (1 hunks)
rs/moq/src/ietf/group.rs (4 hunks)
rs/moq/src/ietf/location.rs (1 hunks)
rs/moq/src/ietf/mod.rs (2 hunks)
rs/moq/src/ietf/publish.rs (2 hunks)
rs/moq/src/ietf/publisher.rs (7 hunks)
rs/moq/src/ietf/session.rs (3 hunks)
rs/moq/src/ietf/subscribe.rs (11 hunks)
rs/moq/src/ietf/subscriber.rs (6 hunks)
rs/moq/src/ietf/track.rs (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
rs/**/src/**/*.rs
📄 CodeRabbit inference engine (CLAUDE.md)
Write Rust tests integrated within source files (unit tests alongside code).
Files:
rs/moq/src/ietf/location.rs
rs/moq/src/ietf/mod.rs
rs/moq/src/ietf/subscribe.rs
rs/moq/src/ietf/session.rs
rs/moq/src/ietf/publish.rs
rs/moq/src/ietf/track.rs
rs/moq/src/ietf/group.rs
rs/moq/src/ietf/publisher.rs
rs/moq/src/ietf/fetch.rs
rs/moq/src/ietf/subscriber.rs
🧬 Code graph analysis (9)

rs/moq/src/ietf/location.rs (3): rs/moq/src/ietf/subscribe.rs, rs/moq/src/ietf/fetch.rs, rs/moq/src/ietf/publish.rs
rs/moq/src/ietf/subscribe.rs (6): rs/moq/src/ietf/namespace.rs, rs/moq/src/ietf/group.rs, rs/moq/src/ietf/fetch.rs, rs/moq/src/ietf/location.rs, rs/moq/src/ietf/track.rs, js/moq/src/ietf/subscribe.ts
rs/moq/src/ietf/session.rs (5): js/moq/src/lite/subscribe.ts, js/moq/src/ietf/connection.ts, rs/moq/src/ietf/fetch.rs, js/moq/src/ietf/fetch.ts, js/moq/src/ietf/publish.ts
rs/moq/src/ietf/publish.rs (3): rs/moq/src/ietf/namespace.rs, rs/moq/src/ietf/subscribe.rs, rs/moq/src/ietf/track.rs
rs/moq/src/ietf/track.rs (3): rs/moq/src/ietf/namespace.rs, rs/moq/src/ietf/subscribe.rs, rs/moq/src/ietf/control.rs
rs/moq/src/ietf/group.rs (4): rs/moq/src/ietf/subscribe.rs, rs/moq/src/ietf/fetch.rs, rs/moq/src/ietf/publish.rs, js/moq/src/ietf/object.ts
rs/moq/src/ietf/publisher.rs (4): js/moq/src/ietf/connection.ts, js/moq/src/ietf/subscribe.ts, rs/moq/src/ietf/control.rs, js/moq/src/ietf/fetch.ts
rs/moq/src/ietf/fetch.rs (6): rs/moq/src/path.rs, rs/moq/src/ietf/namespace.rs, rs/moq/src/ietf/group.rs, rs/moq/src/ietf/subscribe.rs, rs/moq/src/ietf/location.rs, rs/moq/src/ietf/track.rs
rs/moq/src/ietf/subscriber.rs (5): js/moq/src/ietf/connection.ts, js/moq/src/ietf/subscribe.ts, rs/moq/src/ietf/control.rs, js/moq/src/ietf/fetch.ts, js/moq/src/ietf/publish.ts
🔇 Additional comments (9)
rs/moq/src/ietf/mod.rs (1)
5-5: LGTM: module added and re-exported.

Public surfacing of Location is consistent with new call sites.
Also applies to: 23-23
rs/moq/src/ietf/session.rs (3)
78-81: SubscribeUpdate routing added — OK.

Decodes and forwards to Publisher; matches the new control flow.

118-121: TrackStatus now routed to Publisher.

Ensure Publisher exposes recv_track_status and Subscriber no longer relies on it.

147-166: FETCH/PUBLISH routing added — OK; ensure handlers are non-blocking.

Forwarding looks correct; confirm recv_fetch* and recv_publish do not block the control loop.

rs/moq/src/ietf/publisher.rs (3)
59-70: FilterType validation on subscribe is correct

Rejecting AbsoluteStart/AbsoluteRange early is fine; allowing NextGroup/LargestObject aligns with current support.

102-105: SubscribeOk includes track_alias (LGTM)

Returning track_alias=request_id is consistent with the current aliasing model and JS decoder expectations.

196-206: Group uses track_alias=request_id (LGTM)

This keeps alias resolution simple and matches SubscribeOk.
rs/moq/src/ietf/subscribe.rs (1)
15-34: FilterType enum and (de)serialization look good

The enum values and Encode/Decode implementation are consistent with the other message types.
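The typed-enum validation pattern praised here can be sketched standalone. Variant names mirror the review; the discriminant values below are illustrative placeholders, not taken from the draft:

```rust
// A wire enum with fallible decoding: unknown discriminants are rejected
// instead of being silently accepted (what num_enum's TryFromPrimitive derives).
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FilterType {
    NextGroup = 0x1,
    LargestObject = 0x2,
    AbsoluteStart = 0x3,
    AbsoluteRange = 0x4,
}

impl TryFrom<u8> for FilterType {
    type Error = ();
    fn try_from(v: u8) -> Result<Self, ()> {
        match v {
            0x1 => Ok(Self::NextGroup),
            0x2 => Ok(Self::LargestObject),
            0x3 => Ok(Self::AbsoluteStart),
            0x4 => Ok(Self::AbsoluteRange),
            // maps to DecodeError::InvalidValue in the real codec
            _ => Err(()),
        }
    }
}
```

Decoding through TryFrom is what makes the "invalid filter_type" negative tests above meaningful: a raw u64 read would accept any value.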
rs/moq/src/ietf/subscriber.rs (1)
104-113: Alias bookkeeping looks correct

Storing the alias mapping on SubscribeOk and cleaning it up on error/done is sound.
Also applies to: 117-137
pub fn recv_fetch(&mut self, msg: ietf::Fetch<'_>) -> Result<(), Error> {
    let subscribe_id = match msg.fetch_type {
        FetchType::Standalone { .. } => {
            return self.control.send(ietf::FetchError {
                request_id: msg.request_id,
                error_code: 500,
                reason_phrase: "not supported".into(),
            });
        }
        FetchType::RelativeJoining {
            subscriber_request_id,
            group_offset,
        } => {
            if group_offset != 0 {
                return self.control.send(ietf::FetchError {
                    request_id: msg.request_id,
                    error_code: 500,
                    reason_phrase: "not supported".into(),
                });
            }

            subscriber_request_id
        }
        FetchType::AbsoluteJoining { .. } => {
            return self.control.send(ietf::FetchError {
                request_id: msg.request_id,
                error_code: 500,
                reason_phrase: "not supported".into(),
            });
        }
    };

    let subscribes = self.subscribes.lock();
    if !subscribes.contains_key(&subscribe_id) {
        return self.control.send(ietf::FetchError {
            request_id: msg.request_id,
            error_code: 404,
            reason_phrase: "Subscribe not found".into(),
        });
    }

    self.control.send(ietf::FetchOk {
        request_id: msg.request_id,
        group_order: GroupOrder::Descending,
        end_of_track: false,
        // TODO get the proper group_id
        end_location: Location { group: 0, object: 0 },
    })?;

    let session = self.session.clone();
    let request_id = msg.request_id;

    web_async::spawn(async move {
        if let Err(err) = Self::run_fetch(session, request_id).await {
            tracing::warn!(?err, "error running fetch");
        }
    });

    Ok(())
}
🧩 Analysis chain
Fetch uni-stream is sent but subscriber never recognizes it (protocol mismatch)
You send a uni stream with FetchHeader::TYPE (0x5) and header, but rs/moq/src/ietf/subscriber.rs only decodes Group frames in run_uni_stream and aborts on unknown types. This will cause the fetch stream to be aborted and noisy logs without functional value.
Options:
- Implement fetch uni-stream handling on the subscriber side (detect TYPE==FetchHeader::TYPE and consume the header), or
- Defer opening the uni stream for now: reply with FetchOk and return without spawning run_fetch.
Also:
- Track active fetches so recv_fetch_cancel can actually cancel in-flight work.
- Consider echoing the requested group_order in FetchOk (if supported), and compute a real end_location when available.
Proposed minimal safe patch (defer uni stream until subscriber support lands):
- let session = self.session.clone();
- let request_id = msg.request_id;
-
- web_async::spawn(async move {
- if let Err(err) = Self::run_fetch(session, request_id).await {
- tracing::warn!(?err, "error running fetch");
- }
- });
+ // TODO: open a fetch uni stream once the subscriber can parse FetchHeader.

And add basic tracking for cancelability:
// at struct level
- subscribes: Lock<HashMap<u64, oneshot::Sender<()>>>,
+ subscribes: Lock<HashMap<u64, oneshot::Sender<()>>>,
+ fetches: Lock<HashMap<u64, oneshot::Sender<()>>>,
@@
pub fn new(...) -> Self {
Self {
...
- subscribes: Default::default(),
+ subscribes: Default::default(),
+ fetches: Default::default(),
}
}
@@
pub fn recv_fetch_cancel(&mut self, msg: ietf::FetchCancel) -> Result<(), Error> {
- tracing::warn!(?msg, "fetch cancel");
+ if let Some(tx) = self.fetches.lock().remove(&msg.request_id) {
+ let _ = tx.send(());
+ } else {
+ tracing::debug!(id = %msg.request_id, "fetch cancel for unknown request");
+ }
Ok(())
}Also applies to: 396-410, 411-414
🏁 Script executed:
#!/bin/bash
# 1. Get full recv_fetch function and surrounding context
echo "=== Publisher recv_fetch function ==="
sed -n '330,420p' rs/moq/src/ietf/publisher.rs
echo -e "\n=== Checking for run_fetch implementation ==="
rg -n 'fn run_fetch' rs/moq/src/ietf/publisher.rs -A 20
echo -e "\n=== Checking recv_fetch_cancel implementation ==="
rg -n 'fn recv_fetch_cancel' rs/moq/src/ietf/publisher.rs -A 10
echo -e "\n=== Checking Publisher struct fields ==="
rg -n 'struct Publisher' rs/moq/src/ietf/publisher.rs -A 15Length of output: 4417
🏁 Script executed:
#!/bin/bash
# 2. Check subscriber.rs run_uni_stream function
echo "=== Subscriber run_uni_stream function ==="
rg -n 'fn run_uni_stream' rs/moq/src/ietf/subscriber.rs -A 40
echo -e "\n=== Check for FetchHeader in codebase ==="
rg -n 'FetchHeader' rs/moq/src/ietf/ -C 2
echo -e "\n=== Check for TYPE constant related to Fetch ==="
rg -n 'TYPE.*0x5' rs/moq/src/ietf/ -C 1Length of output: 3255
🏁 Script executed:
#!/bin/bash
# 3. Search for any fetch-related async spawn or task tracking
echo "=== Search for web_async::spawn usage in publisher.rs ==="
rg -n 'web_async::spawn' rs/moq/src/ietf/publisher.rs -B 5 -A 5
echo -e "\n=== Search for fetches HashMap usage ==="
rg -n 'fetches' rs/moq/src/ietf/publisher.rs -C 3Length of output: 783
🏁 Script executed:
#!/bin/bash
# 4. Verify subscriber does NOT handle FetchHeader anywhere
echo "=== Search subscriber.rs for any FetchHeader handling ==="
rg -n 'FetchHeader' rs/moq/src/ietf/subscriber.rs
echo -e "\n=== Check recv_group function to confirm it doesn't handle TYPE 0x5 ==="
rg -n 'fn recv_group' rs/moq/src/ietf/subscriber.rs -A 30
echo -e "\n=== Search for any match on TYPE 0x5 in subscriber context ==="
rg -n '0x5|FetchHeader::TYPE' rs/moq/src/ietf/subscriber.rsLength of output: 1368
Fix protocol mismatch and add fetch cancellation tracking
The current implementation sends a FetchHeader uni-stream (TYPE 0x5) that the subscriber cannot decode. The subscriber's run_uni_stream immediately calls recv_group() which expects a Group message and will abort on the unknown type. Additionally, recv_fetch_cancel cannot cancel in-flight fetches because there is no tracking mechanism—the fetches HashMap does not exist and recv_fetch_cancel only logs a warning.
Defer uni-stream opening until subscriber support for FetchHeader is implemented:
- let session = self.session.clone();
- let request_id = msg.request_id;
-
- web_async::spawn(async move {
- if let Err(err) = Self::run_fetch(session, request_id).await {
- tracing::warn!(?err, "error running fetch");
- }
- });
+ // TODO: open a fetch uni stream once the subscriber can parse FetchHeader.Add fetch tracking for cancellation (lines 15-30):
pub(super) struct Publisher<S: web_transport_trait::Session> {
session: S,
origin: OriginConsumer,
control: Control,
subscribes: Lock<HashMap<u64, oneshot::Sender<()>>>,
+ fetches: Lock<HashMap<u64, oneshot::Sender<()>>>,
}
impl<S: web_transport_trait::Session> Publisher<S> {
pub fn new(session: S, origin: Option<OriginConsumer>, control: Control) -> Self {
let origin = origin.unwrap_or_else(|| Origin::produce().consumer);
Self {
session,
origin,
control,
subscribes: Default::default(),
+ fetches: Default::default(),
}
}Update recv_fetch_cancel (line 411-414) to actually cancel:
pub fn recv_fetch_cancel(&mut self, msg: ietf::FetchCancel) -> Result<(), Error> {
- tracing::warn!(?msg, "fetch cancel");
+ if let Some(tx) = self.fetches.lock().remove(&msg.request_id) {
+ let _ = tx.send(());
+ } else {
+ tracing::debug!(id = %msg.request_id, "fetch cancel for unknown request");
+ }
Ok(())
}Also address the hardcoded end_location with a real group ID once available.
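The cancellation bookkeeping suggested above boils down to a remove-and-signal pattern over a map of request IDs. A std-only sketch, with std::sync::mpsc standing in for the oneshot channel purely for illustration (field and method names follow the review's proposal, not the crate):

```rust
use std::collections::HashMap;
use std::sync::mpsc;

// Tracks in-flight fetches so FETCH_CANCEL can actually stop them.
struct FetchTracker {
    fetches: HashMap<u64, mpsc::Sender<()>>,
}

impl FetchTracker {
    fn new() -> Self {
        Self { fetches: HashMap::new() }
    }

    // Called when a fetch task is spawned: remember a cancel handle.
    fn start(&mut self, request_id: u64) -> mpsc::Receiver<()> {
        let (tx, rx) = mpsc::channel();
        self.fetches.insert(request_id, tx);
        rx // the fetch task selects on this to notice cancellation
    }

    // Called from recv_fetch_cancel: remove-and-signal, or report unknown id.
    fn cancel(&mut self, request_id: u64) -> bool {
        match self.fetches.remove(&request_id) {
            Some(tx) => tx.send(()).is_ok(),
            // unknown request; the real code would log at debug level
            None => false,
        }
    }
}
```

Removing the entry before signaling makes cancel idempotent: a duplicate FETCH_CANCEL for the same request_id simply finds nothing to cancel.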
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In rs/moq/src/ietf/publisher.rs around lines 335 to 394, the Fetch handling
opens a FetchHeader uni-stream that subscribers can't decode and there is no
tracking to allow cancellation; defer opening any FetchHeader uni-stream (do not
send TYPE 0x5) until subscriber support exists, and implement a fetches HashMap
(or similar) keyed by request_id to track in-flight fetch tasks/handles; when
starting the async run_fetch spawn, insert a cancellable handle (JoinHandle or
cancellation token) into fetches and remove it when finished; update
recv_fetch_cancel (around lines ~411-414) to look up the request_id in fetches,
cancel or abort the stored handle and remove the entry, and respond with
appropriate FetchCancelOk/FetchError; also replace the hardcoded end_location
placeholder with the real group id when available.
We don't actually use this code path though.
Actionable comments posted: 2
♻️ Duplicate comments (1)
rs/moq/src/ietf/publisher.rs (1)
335-414: Protocol mismatch and missing cancellation tracking persist.

The issues identified in the previous review remain unresolved:

- Protocol mismatch: Lines 387-409 spawn run_fetch, which opens a uni-stream with FetchHeader::TYPE (0x5), but subscriber.rs line 165 explicitly returns Error::Unsupported for this type, causing the stream to be aborted.
- No cancellation tracking: Lines 411-414 show recv_fetch_cancel only logs a warning without any mechanism to cancel in-flight fetch tasks. There is no fetches: HashMap<u64, oneshot::Sender<()>> field to track active fetches.
- Hardcoded location: Line 381 uses a placeholder Location { group: 0, object: 0 } with a TODO comment.

The previous review comment provides detailed remediation steps that should be followed.
🧹 Nitpick comments (2)
rs/moq/src/coding/reader.rs (1)
55-76: Consider extracting common decode logic.

The decode_peek and decode methods share nearly identical logic (~20 lines) with only one difference: decode advances the buffer while decode_peek does not. Consider extracting the common decoding and buffer-filling logic into a private helper method.

Example refactor:
async fn decode_internal<T: Decode>(&mut self, advance: bool) -> Result<T, Error> {
    loop {
        let mut cursor = io::Cursor::new(&self.buffer);
        match T::decode(&mut cursor) {
            Ok(msg) => {
                if advance {
                    self.buffer.advance(cursor.position() as usize);
                }
                return Ok(msg);
            }
            Err(DecodeError::Short) => {
                // Try to read more data
                if self
                    .stream
                    .read_buf(&mut self.buffer)
                    .await
                    .map_err(|e| Error::Transport(Arc::new(e)))?
                    .is_none()
                {
                    // Stream closed while we still need more data
                    return Err(Error::Decode(DecodeError::Short));
                }
            }
            Err(e) => return Err(Error::Decode(e)),
        }
    }
}

pub async fn decode<T: Decode>(&mut self) -> Result<T, Error> {
    self.decode_internal(true).await
}

pub async fn decode_peek<T: Decode>(&mut self) -> Result<T, Error> {
    self.decode_internal(false).await
}

rs/moq/src/ietf/subscriber.rs (1)
213-235: Subscribe implementation looks good; fetch joining noted as future work.

The Subscribe message is correctly constructed with group_order and filter_type. The TODO comment at line 224 about joining fetches is noted as future work; this is acceptable for the current draft compatibility scope.

If you'd like to implement the joining fetch functionality mentioned in the TODO, I can help design and generate the required code changes.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
rs/moq/src/coding/reader.rs (1 hunks)
rs/moq/src/ietf/group.rs (4 hunks)
rs/moq/src/ietf/publisher.rs (9 hunks)
rs/moq/src/ietf/subscriber.rs (10 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
rs/**/src/**/*.rs
📄 CodeRabbit inference engine (CLAUDE.md)
Write Rust tests integrated within source files (unit tests alongside code).
Files:
rs/moq/src/coding/reader.rs
rs/moq/src/ietf/group.rs
rs/moq/src/ietf/subscriber.rs
rs/moq/src/ietf/publisher.rs
🧬 Code graph analysis (4)
rs/moq/src/coding/reader.rs (2)
rs/moq/src/coding/writer.rs (1)
new(12-17)rs/moq/src/ietf/message.rs (1)
decode(24-24)
rs/moq/src/ietf/group.rs (4)
rs/moq/src/ietf/publish.rs (6)
encode(129-134)encode(164-180)encode(230-234)decode(136-147)decode(182-205)decode(236-245)rs/moq/src/ietf/track.rs (4)
encode(26-35)encode(68-70)decode(37-55)decode(74-76)rs/moq/src/ietf/fetch.rs (9)
encode(32-63)encode(116-123)encode(152-159)encode(187-191)encode(212-214)decode(67-100)decode(125-138)decode(161-174)decode(193-202)rs/moq/src/ietf/subscribe.rs (12)
encode(25-27)encode(92-107)encode(120-127)encode(168-172)encode(195-197)encode(230-238)decode(31-33)decode(51-90)decode(129-154)decode(173-183)decode(199-202)decode(240-257)
rs/moq/src/ietf/subscriber.rs (6)
js/moq/src/ietf/object.ts (3)
GroupFlags(7-12)Frame(87-151)Group(18-85)js/moq/src/ietf/subscriber.ts (2)
Subscriber(25-268)broadcast(106-134)js/moq/src/ietf/subscribe.ts (4)
SubscribeOk(74-126)SubscribeError(128-162)PublishDone(192-228)Subscribe(13-72)rs/moq/src/ietf/control.rs (1)
request_id(31-33)js/moq/src/ietf/fetch.ts (2)
FetchOk(57-81)FetchError(83-111)js/moq/src/ietf/publish.ts (2)
Publish(7-25)PublishError(47-65)
rs/moq/src/ietf/publisher.rs (5)
js/moq/src/ietf/connection.ts (3)
msg(132-205)msg(211-216)stream(261-269)js/moq/src/ietf/subscribe.ts (1)
SubscribeOk(74-126)rs/moq/src/ietf/group.rs (1)
default(92-99)rs/moq/src/ietf/subscriber.rs (3)
run_group(274-322)stream(280-280)new(37-44)js/moq/src/ietf/fetch.ts (4)
Fetch(5-55)FetchError(83-111)FetchOk(57-81)FetchCancel(113-137)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Check
🔇 Additional comments (11)
rs/moq/src/ietf/group.rs (3)
6-23: LGTM!

The GroupOrder enum with repr(u8) and its Encode/Decode implementations are straightforward and correct. The use of num_enum derives ensures type-safe conversion between the enum and its wire representation.
25-100: LGTM!The
GroupFlagsencoding and decoding logic is well-designed:
- Bitmask encoding is space-efficient and correctly validates the mutual exclusion constraint
- Range check ensures only valid flag combinations are accepted
- Default implementation provides sensible defaults (`has_end: true`)
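The three bullets above can be condensed into a small sketch. The flag names, bit positions, and the exact mutually exclusive pair are assumptions for illustration, not the actual `GroupFlags` layout:

```rust
// Illustrative flag bits; the real GroupFlags wire layout may differ.
const HAS_SUBGROUP: u64 = 0b001;
const SUBGROUP_IS_FIRST_OBJECT: u64 = 0b010;
const HAS_END: u64 = 0b100;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct GroupFlags {
    has_subgroup: bool,
    subgroup_is_first_object: bool,
    has_end: bool,
}

impl GroupFlags {
    fn encode(self) -> Result<u64, &'static str> {
        // Mutual exclusion: an explicit subgroup ID and "derive the subgroup
        // from the first object" cannot both be set.
        if self.has_subgroup && self.subgroup_is_first_object {
            return Err("conflicting subgroup flags");
        }
        let mut bits = 0;
        if self.has_subgroup {
            bits |= HAS_SUBGROUP;
        }
        if self.subgroup_is_first_object {
            bits |= SUBGROUP_IS_FIRST_OBJECT;
        }
        if self.has_end {
            bits |= HAS_END;
        }
        Ok(bits)
    }

    fn decode(bits: u64) -> Result<Self, &'static str> {
        // Range check: reject any bit outside the known flag set.
        if bits > (HAS_SUBGROUP | SUBGROUP_IS_FIRST_OBJECT | HAS_END) {
            return Err("unknown flag bits");
        }
        let flags = Self {
            has_subgroup: bits & HAS_SUBGROUP != 0,
            subgroup_is_first_object: bits & SUBGROUP_IS_FIRST_OBJECT != 0,
            has_end: bits & HAS_END != 0,
        };
        if flags.has_subgroup && flags.subgroup_is_first_object {
            return Err("conflicting subgroup flags");
        }
        Ok(flags)
    }
}
```

Checking the conflict on both encode and decode means a malformed peer cannot smuggle an invalid combination through either direction.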
102-145: LGTM! The `GroupHeader` encoding and decoding correctly implements the wire format:
- Flags are encoded as a u64 via `GroupFlags::encode()`
- Subgroup ID is conditionally written/read based on flags
- Validation ensures unsupported subgroup IDs are rejected
The implementation is consistent with usage in `publisher.rs` and `subscriber.rs`.

rs/moq/src/ietf/publisher.rs (4)
59-135: LGTM! The filter type validation correctly rejects unsupported absolute subscribe modes and sets `track_alias` equal to `request_id`, which aligns with the implementation expectations in `js/moq/src/ietf/subscribe.ts`.
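The shape of that validation can be sketched as follows. The filter-type variants are assumed from the draft, and `validate` is a hypothetical helper rather than the crate's actual API:

```rust
// Hypothetical filter types; values follow the draft's numbering convention.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FilterType {
    LatestGroup,   // live filter
    LargestObject, // live filter
    AbsoluteStart, // absolute mode
    AbsoluteRange, // absolute mode
}

// Accept only live filters; alias the track by its request ID.
fn validate(filter: FilterType, request_id: u64) -> Result<u64, &'static str> {
    match filter {
        FilterType::LatestGroup | FilterType::LargestObject => Ok(request_id),
        FilterType::AbsoluteStart | FilterType::AbsoluteRange => {
            Err("absolute subscribe modes are not supported")
        }
    }
}
```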
137-143: Stub implementation for subscribe updates. This method correctly rejects subscribe update requests with an error, indicating the feature is not yet implemented. This is appropriate for draft compatibility.
196-201: LGTM! The refactor to use `GroupHeader` with default flags is clean and consistent with the new header-centric encoding. The flag checks in `run_group` correctly mirror the encoding logic.

Also applies to: 228-296
331-333: LGTM! The method rename and signature update align with the new `TrackStatus` type. Returning `Unsupported` is appropriate for unimplemented functionality.

rs/moq/src/ietf/subscriber.rs (4)
15-33: LGTM! The refactor to centralized state management using `SubscriberState` and `SubscriberTrack` is a clean design that simplifies lifecycle tracking and alias resolution across the subscriber implementation.
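The alias bookkeeping these comments describe can be pictured as a small lookup table with an identity fallback. This is a simplified sketch; the real `SubscriberState` carries additional per-track state:

```rust
use std::collections::HashMap;

// Hypothetical, simplified state: only the alias table is modeled here.
#[derive(Default)]
struct SubscriberState {
    // Maps a wire-level track alias back to the request ID that created it.
    aliases: HashMap<u64, u64>,
}

impl SubscriberState {
    fn insert_alias(&mut self, track_alias: u64, request_id: u64) {
        // Only record a mapping when the alias differs from the request ID;
        // otherwise the identity fallback in resolve() suffices.
        if track_alias != request_id {
            self.aliases.insert(track_alias, request_id);
        }
    }

    fn resolve(&self, track_alias: u64) -> u64 {
        // Fall back to treating the alias as the request ID itself.
        *self.aliases.get(&track_alias).unwrap_or(&track_alias)
    }

    fn remove(&mut self, track_alias: u64) {
        // Called from error/completion cleanup paths.
        self.aliases.remove(&track_alias);
    }
}
```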
104-140: LGTM! The state-based lifecycle management is implemented correctly:
- Alias mappings are properly maintained when `track_alias` differs from `request_id`
- Error and completion handlers perform appropriate cleanup (abort vs. close) and remove alias mappings
161-175: LGTM! The use of `decode_peek()` for non-destructive stream type checking is appropriate. Returning `Unsupported` for `FetchHeader::TYPE` is correct for the current implementation capabilities, though it creates the protocol mismatch already flagged in `publisher.rs`.
237-322: LGTM! The alias resolution logic in `recv_group` correctly maps `track_alias` to `request_id` using the state's alias table, with appropriate fallback. The `run_group` method properly uses `GroupHeader` and checks flags for conditional encoding.
```rust
pub async fn decode_peek<T: Decode>(&mut self) -> Result<T, Error> {
    loop {
        let mut cursor = io::Cursor::new(&self.buffer);
        match T::decode(&mut cursor) {
            Ok(msg) => return Ok(msg),
            Err(DecodeError::Short) => {
                // Try to read more data
                if self
                    .stream
                    .read_buf(&mut self.buffer)
                    .await
                    .map_err(|e| Error::Transport(Arc::new(e)))?
                    .is_none()
                {
                    // Stream closed while we still need more data
                    return Err(Error::Decode(DecodeError::Short));
                }
            }
            Err(e) => return Err(Error::Decode(e)),
        }
    }
}
```
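As a standalone illustration of why the cursor-based peek works: decoding from an `io::Cursor` over the buffer advances the cursor position, not the buffer, so the caller can classify a stream and then hand the untouched bytes to the real handler. The type tags below are invented for the sketch; the real protocol uses varint message IDs:

```rust
use std::io::{Cursor, Read};

#[derive(Debug, PartialEq)]
enum StreamKind {
    Group,
    Fetch,
    Unsupported(u8),
}

// Hypothetical one-byte type tags, purely for illustration.
const GROUP_TYPE: u8 = 0x10;
const FETCH_TYPE: u8 = 0x20;

fn classify(buffer: &[u8]) -> Option<StreamKind> {
    // Decode through a cursor so the buffer itself is untouched,
    // mirroring the peek-then-dispatch pattern of decode_peek.
    let mut cursor = Cursor::new(buffer);
    let mut ty = [0u8; 1];
    cursor.read_exact(&mut ty).ok()?;
    Some(match ty[0] {
        GROUP_TYPE => StreamKind::Group,
        FETCH_TYPE => StreamKind::Fetch,
        other => StreamKind::Unsupported(other),
    })
}
```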
🛠️ Refactor suggestion | 🟠 Major
Add documentation for the peek semantics.
The implementation is correct and provides the expected non-destructive lookahead behavior. However, please add a doc comment explaining:
- That this method decodes without consuming the buffer
- That multiple calls will return the same decoded message
- How it differs from `decode`
Apply this diff to add documentation:
```diff
+    /// Decode a message without consuming it from the buffer.
+    ///
+    /// This method behaves like `decode`, but does not advance the buffer on success.
+    /// Multiple calls to `decode_peek` will return the same decoded message until
+    /// the buffer is consumed by calling `decode` or other buffer-consuming methods.
     pub async fn decode_peek<T: Decode>(&mut self) -> Result<T, Error> {
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
/// Decode a message without consuming it from the buffer.
///
/// This method behaves like `decode`, but does not advance the buffer on success.
/// Multiple calls to `decode_peek` will return the same decoded message until
/// the buffer is consumed by calling `decode` or other buffer-consuming methods.
pub async fn decode_peek<T: Decode>(&mut self) -> Result<T, Error> {
    loop {
        let mut cursor = io::Cursor::new(&self.buffer);
        match T::decode(&mut cursor) {
            Ok(msg) => return Ok(msg),
            Err(DecodeError::Short) => {
                // Try to read more data
                if self
                    .stream
                    .read_buf(&mut self.buffer)
                    .await
                    .map_err(|e| Error::Transport(Arc::new(e)))?
                    .is_none()
                {
                    // Stream closed while we still need more data
                    return Err(Error::Decode(DecodeError::Short));
                }
            }
            Err(e) => return Err(Error::Decode(e)),
        }
    }
}
```
🤖 Prompt for AI Agents
In rs/moq/src/coding/reader.rs around lines 55 to 76, add a doc comment for
decode_peek describing its peek semantics: state that it decodes a message from
the internal buffer without consuming or advancing the buffer, that repeated
calls will return the same decoded message until a consuming decode method is
called, and clarify how it differs from decode (which consumes/advances the
buffer or removes bytes). Keep the doc concise, mention it will attempt to read
more data from the stream when the buffer is incomplete and that stream closure
with insufficient data returns a DecodeError::Short.
```rust
pub fn recv_publish(&mut self, msg: ietf::Publish<'_>) -> Result<(), Error> {
    self.control.send(ietf::PublishError {
        request_id: msg.request_id,
        error_code: 300,
        reason_phrase: "publish not supported bro".into(),
    })
}
```
Use professional error message.
The error message "publish not supported bro" at line 364 is unprofessional and should be replaced with a standard error phrase for production code.
Apply this diff to use a professional error message:
```diff
 pub fn recv_publish(&mut self, msg: ietf::Publish<'_>) -> Result<(), Error> {
     self.control.send(ietf::PublishError {
         request_id: msg.request_id,
         error_code: 300,
-        reason_phrase: "publish not supported bro".into(),
+        reason_phrase: "Publish not supported".into(),
     })
 }
```

📝 Committable suggestion
```rust
pub fn recv_publish(&mut self, msg: ietf::Publish<'_>) -> Result<(), Error> {
    self.control.send(ietf::PublishError {
        request_id: msg.request_id,
        error_code: 300,
        reason_phrase: "Publish not supported".into(),
    })
}
```
🤖 Prompt for AI Agents
In rs/moq/src/ietf/subscriber.rs around lines 360 to 366, replace the
unprofessional reason_phrase "publish not supported bro" with a standard,
professional message such as "Publish operation not supported" (or similar),
updating the ietf::PublishError construction so reason_phrase reads "Publish
operation not supported". Keep the rest of the fields and call to
self.control.send unchanged.
Actionable comments posted: 0
🧹 Nitpick comments (2)
js/moq/src/ietf/subscribe.ts (2)
32-32: Consider using a named constant or FilterType enum. The hardcoded `0x2` magic number reduces code clarity. If a FilterType enum exists elsewhere in the codebase (as suggested by the PR summary), consider importing and using it here. Example refactor:
```diff
-		await w.u8(0x2); // LargestObject
+		await w.u8(FilterType.LargestObject);
```

If FilterType doesn't exist yet, define local constants:
```diff
+const FILTER_TYPE_LATEST_GROUP = 0x1;
+const FILTER_TYPE_LARGEST_OBJECT = 0x2;
+
 export class Subscribe {
```

Then use:
```diff
-		await w.u8(0x2); // LargestObject
+		await w.u8(FILTER_TYPE_LARGEST_OBJECT);
```
60-63: Use named constants for filter type values, following the GROUP_ORDER pattern. The asymmetry between encode (sending `0x2`) and decode (accepting `0x1` and `0x2`) is intentional: it mirrors the groupOrder handling and allows backward compatibility with different protocol versions. However, the magic numbers should use named constants following the established pattern at line 7:

```diff
+const FILTER_TYPE_LATEST_GROUP = 0x1;
+const FILTER_TYPE_LARGEST_OBJECT = 0x2;
+
 async #encode(w: Writer): Promise<void> {
   // ...
-		await w.u8(0x2); // LargestObject
+		await w.u8(FILTER_TYPE_LARGEST_OBJECT);

 static async #decode(r: Reader): Promise<Subscribe> {
   // ...
-		if (filterType !== 0x1 && filterType !== 0x2) {
+		if (filterType !== FILTER_TYPE_LATEST_GROUP && filterType !== FILTER_TYPE_LARGEST_OBJECT) {
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
js/moq/src/ietf/subscribe.ts (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Check