From d70c1ebcb28e79691f9fd7a3654cb0e1f9d20c4b Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Sat, 16 May 2026 15:24:04 +0000 Subject: [PATCH 1/3] test(moq): add unit tests for vendored OrderedProducer and CatalogConsumer Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- .../src/transport/moq/catalog_consumer.rs | 55 +++++++ .../src/transport/moq/ordered_producer.rs | 137 ++++++++++++++++++ 2 files changed, 192 insertions(+) diff --git a/crates/nodes/src/transport/moq/catalog_consumer.rs b/crates/nodes/src/transport/moq/catalog_consumer.rs index a83fdcd3..328fab42 100644 --- a/crates/nodes/src/transport/moq/catalog_consumer.rs +++ b/crates/nodes/src/transport/moq/catalog_consumer.rs @@ -42,3 +42,58 @@ impl From for CatalogConsumer { Self::new(inner) } } + +#[cfg(test)] +mod tests { + use super::*; + + fn make_track_pair() -> (moq_lite::TrackProducer, moq_lite::TrackConsumer) { + let origin = moq_lite::Origin::random().produce(); + let mut broadcast = origin.create_broadcast("test-broadcast").expect("create_broadcast"); + let track = moq_lite::Track { name: ".catalog".to_string(), priority: 0 }; + let producer = broadcast.create_track(track.clone()).expect("create_track"); + let consumer = origin.consume(); + let bc = consumer.get_broadcast("test-broadcast").expect("get_broadcast"); + let consumer_track = bc.subscribe_track(&track).expect("subscribe_track"); + (producer, consumer_track) + } + + #[tokio::test] + async fn new_initialises_empty_state() { + let (_producer, consumer) = make_track_pair(); + let cc = CatalogConsumer::new(consumer); + assert!(cc.group.is_none()); + } + + #[tokio::test] + async fn from_trait_constructs_consumer() { + let (_producer, consumer) = make_track_pair(); + let cc = CatalogConsumer::from(consumer); + assert!(cc.group.is_none()); + } + + #[tokio::test] + async fn next_returns_none_when_track_finished() { + let (mut producer, consumer) = make_track_pair(); + let mut cc = CatalogConsumer::new(consumer); + producer.finish().expect("finish"); + let result = cc.next().await.expect("next should not error"); + assert!(result.is_none()); + } + + #[tokio::test] + async fn next_returns_catalog_from_written_frame() { + let (mut producer, consumer) = make_track_pair(); + let mut cc = CatalogConsumer::new(consumer); + + let catalog = hang::catalog::Catalog::default(); + let payload = serde_json::to_vec(&catalog).expect("serialize catalog"); + + let mut group = producer.append_group().expect("append_group"); + group.write_frame(bytes::Bytes::from(payload)).expect("write_frame"); + group.finish().expect("finish group"); + + let result = cc.next().await.expect("next should not error"); + assert!(result.is_some(), "should have received a catalog"); + } +} diff --git a/crates/nodes/src/transport/moq/ordered_producer.rs b/crates/nodes/src/transport/moq/ordered_producer.rs index fc988239..34e30894 100644 --- a/crates/nodes/src/transport/moq/ordered_producer.rs +++ b/crates/nodes/src/transport/moq/ordered_producer.rs @@ -120,3 +120,140 @@ impl std::ops::Deref for OrderedProducer { &self.track } } + +#[cfg(test)] +mod tests { + use super::*; + + fn make_track_producer() -> moq_lite::TrackProducer { + let origin = moq_lite::Origin::random().produce(); + let mut broadcast = + origin.create_broadcast("test-broadcast").expect("create_broadcast should succeed"); + broadcast + .create_track(moq_lite::Track { name: "test/track".to_string(), priority: 0 }) + .expect("create_track should succeed") + } + + fn ts(micros: u64) -> Timestamp { + Timestamp::from_micros(micros).unwrap() + } + + fn make_frame(ts_micros: u64) -> Frame { + Frame { timestamp: ts(ts_micros), payload: bytes::Bytes::from_static(b"test-payload") } + } + + #[tokio::test] + async fn new_initialises_empty_state() { + let tp = make_track_producer(); + let op = OrderedProducer::new(tp); + assert!(op.group.is_none()); + assert!(op.group_start.is_none()); + assert_eq!(op.group_frames, 0); + assert!(op.max_group_duration.is_none()); + } + + #[tokio::test] + async fn with_max_group_duration_sets_limit() { + let tp = make_track_producer(); + let op = OrderedProducer::new(tp).with_max_group_duration(ts(1_000_000)); + assert_eq!(op.max_group_duration, Some(ts(1_000_000))); + } + + #[tokio::test] + async fn from_trait_constructs_producer() { + let tp = make_track_producer(); + let op = OrderedProducer::from(tp); + assert!(op.group.is_none()); + } + + #[tokio::test] + async fn deref_exposes_inner_track() { + let tp = make_track_producer(); + let op = OrderedProducer::new(tp); + let _: &moq_lite::TrackProducer = &*op; + } + + #[tokio::test] + async fn write_creates_group_on_first_frame() { + let tp = make_track_producer(); + let mut op = OrderedProducer::new(tp); + assert!(op.group.is_none()); + op.write(&make_frame(0)).expect("write should succeed"); + assert!(op.group.is_some()); + assert_eq!(op.group_frames, 1); + assert_eq!(op.group_start, Some(ts(0))); + } + + #[tokio::test] + async fn write_multiple_frames_increments_counter() { + let tp = make_track_producer(); + let mut op = OrderedProducer::new(tp); + op.write(&make_frame(0)).unwrap(); + op.write(&make_frame(1000)).unwrap(); + op.write(&make_frame(2000)).unwrap(); + assert_eq!(op.group_frames, 3); + } + + #[tokio::test] + async fn keyframe_on_empty_is_noop() { + let tp = make_track_producer(); + let mut op = OrderedProducer::new(tp); + op.keyframe().expect("keyframe on empty should succeed"); + assert!(op.group.is_none()); + } + + #[tokio::test] + async fn keyframe_closes_active_group() { + let tp = make_track_producer(); + let mut op = OrderedProducer::new(tp); + op.write(&make_frame(0)).unwrap(); + assert!(op.group.is_some()); + op.keyframe().expect("keyframe should succeed"); + assert!(op.group.is_none()); + } + + #[tokio::test] + async fn keyframe_then_write_starts_new_group() { + let tp = make_track_producer(); + let mut op = OrderedProducer::new(tp); + op.write(&make_frame(0)).unwrap(); + assert_eq!(op.group_start, Some(ts(0))); + op.keyframe().unwrap(); + op.write(&make_frame(5000)).unwrap(); + assert_eq!(op.group_start, Some(ts(5000))); + assert_eq!(op.group_frames, 1); + } + + #[tokio::test] + async fn max_duration_auto_closes_old_group_and_starts_new() { + let tp = make_track_producer(); + let mut op = OrderedProducer::new(tp).with_max_group_duration(ts(10_000)); + + op.write(&make_frame(0)).unwrap(); + assert!(op.group.is_some()); + assert_eq!(op.group_start, Some(ts(0))); + + // Frame at the max duration boundary closes the old group and + // starts a fresh one for this frame. + op.write(&make_frame(10_000)).unwrap(); + assert!(op.group.is_some()); + assert_eq!(op.group_start, Some(ts(10_000))); + assert_eq!(op.group_frames, 1); + } + + #[tokio::test] + async fn finish_closes_group_and_track() { + let tp = make_track_producer(); + let mut op = OrderedProducer::new(tp); + op.write(&make_frame(0)).unwrap(); + op.finish().expect("finish should succeed"); + assert!(op.group.is_none()); + } + + #[tokio::test] + async fn finish_on_empty_finishes_track() { + let tp = make_track_producer(); + let mut op = OrderedProducer::new(tp); + op.finish().expect("finish on empty should succeed"); + } +} From 663d8ca3d0d016babf0d0010875d5ef3ea3cae48 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Sat, 16 May 2026 15:27:54 +0000 Subject: [PATCH 2/3] style: remove redundant test comment per AGENTS.md Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- crates/nodes/src/transport/moq/ordered_producer.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/nodes/src/transport/moq/ordered_producer.rs b/crates/nodes/src/transport/moq/ordered_producer.rs index 34e30894..b8a571c5 100644 --- a/crates/nodes/src/transport/moq/ordered_producer.rs +++ b/crates/nodes/src/transport/moq/ordered_producer.rs @@ -233,8 +233,6 @@ mod tests { assert!(op.group.is_some()); assert_eq!(op.group_start, Some(ts(0))); - // Frame at the max duration boundary closes the old group and - // starts a fresh one for this frame. op.write(&make_frame(10_000)).unwrap(); assert!(op.group.is_some()); assert_eq!(op.group_start, Some(ts(10_000))); From f76d0100582a883119041e084aa494aa6b747700 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Sat, 16 May 2026 15:45:49 +0000 Subject: [PATCH 3/3] fix: add clippy test suppressions matching existing crate convention Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- crates/nodes/src/transport/moq/catalog_consumer.rs | 1 + crates/nodes/src/transport/moq/ordered_producer.rs | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/nodes/src/transport/moq/catalog_consumer.rs b/crates/nodes/src/transport/moq/catalog_consumer.rs index 328fab42..87f1c67a 100644 --- a/crates/nodes/src/transport/moq/catalog_consumer.rs +++ b/crates/nodes/src/transport/moq/catalog_consumer.rs @@ -44,6 +44,7 @@ impl From for CatalogConsumer { } #[cfg(test)] +#[allow(clippy::unwrap_used, clippy::expect_used)] mod tests { use super::*; diff --git a/crates/nodes/src/transport/moq/ordered_producer.rs b/crates/nodes/src/transport/moq/ordered_producer.rs index b8a571c5..78cfc00e 100644 --- a/crates/nodes/src/transport/moq/ordered_producer.rs +++ b/crates/nodes/src/transport/moq/ordered_producer.rs @@ -122,6 +122,7 @@ impl std::ops::Deref for OrderedProducer { } #[cfg(test)] +#[allow(clippy::unwrap_used, clippy::expect_used)] mod tests { use super::*; @@ -170,7 +171,7 @@ mod tests { async fn deref_exposes_inner_track() { let tp = make_track_producer(); let op = OrderedProducer::new(tp); - let _: &moq_lite::TrackProducer = &*op; + let _: &moq_lite::TrackProducer = &op; } #[tokio::test]