diff --git a/crates/nodes/src/transport/moq/catalog_consumer.rs b/crates/nodes/src/transport/moq/catalog_consumer.rs index a83fdcd3..87f1c67a 100644 --- a/crates/nodes/src/transport/moq/catalog_consumer.rs +++ b/crates/nodes/src/transport/moq/catalog_consumer.rs @@ -42,3 +42,59 @@ impl From for CatalogConsumer { Self::new(inner) } } + +#[cfg(test)] +#[allow(clippy::unwrap_used, clippy::expect_used)] +mod tests { + use super::*; + + fn make_track_pair() -> (moq_lite::TrackProducer, moq_lite::TrackConsumer) { + let origin = moq_lite::Origin::random().produce(); + let mut broadcast = origin.create_broadcast("test-broadcast").expect("create_broadcast"); + let track = moq_lite::Track { name: ".catalog".to_string(), priority: 0 }; + let producer = broadcast.create_track(track.clone()).expect("create_track"); + let consumer = origin.consume(); + let bc = consumer.get_broadcast("test-broadcast").expect("get_broadcast"); + let consumer_track = bc.subscribe_track(&track).expect("subscribe_track"); + (producer, consumer_track) + } + + #[tokio::test] + async fn new_initialises_empty_state() { + let (_producer, consumer) = make_track_pair(); + let cc = CatalogConsumer::new(consumer); + assert!(cc.group.is_none()); + } + + #[tokio::test] + async fn from_trait_constructs_consumer() { + let (_producer, consumer) = make_track_pair(); + let cc = CatalogConsumer::from(consumer); + assert!(cc.group.is_none()); + } + + #[tokio::test] + async fn next_returns_none_when_track_finished() { + let (mut producer, consumer) = make_track_pair(); + let mut cc = CatalogConsumer::new(consumer); + producer.finish().expect("finish"); + let result = cc.next().await.expect("next should not error"); + assert!(result.is_none()); + } + + #[tokio::test] + async fn next_returns_catalog_from_written_frame() { + let (mut producer, consumer) = make_track_pair(); + let mut cc = CatalogConsumer::new(consumer); + + let catalog = hang::catalog::Catalog::default(); + let payload = serde_json::to_vec(&catalog).expect("serialize catalog"); + + let mut group = producer.append_group().expect("append_group"); + group.write_frame(bytes::Bytes::from(payload)).expect("write_frame"); + group.finish().expect("finish group"); + + let result = cc.next().await.expect("next should not error"); + assert!(result.is_some(), "should have received a catalog"); + } +} diff --git a/crates/nodes/src/transport/moq/ordered_producer.rs b/crates/nodes/src/transport/moq/ordered_producer.rs index fc988239..78cfc00e 100644 --- a/crates/nodes/src/transport/moq/ordered_producer.rs +++ b/crates/nodes/src/transport/moq/ordered_producer.rs @@ -120,3 +120,139 @@ impl std::ops::Deref for OrderedProducer { &self.track } } + +#[cfg(test)] +#[allow(clippy::unwrap_used, clippy::expect_used)] +mod tests { + use super::*; + + fn make_track_producer() -> moq_lite::TrackProducer { + let origin = moq_lite::Origin::random().produce(); + let mut broadcast = + origin.create_broadcast("test-broadcast").expect("create_broadcast should succeed"); + broadcast + .create_track(moq_lite::Track { name: "test/track".to_string(), priority: 0 }) + .expect("create_track should succeed") + } + + fn ts(micros: u64) -> Timestamp { + Timestamp::from_micros(micros).unwrap() + } + + fn make_frame(ts_micros: u64) -> Frame { + Frame { timestamp: ts(ts_micros), payload: bytes::Bytes::from_static(b"test-payload") } + } + + #[tokio::test] + async fn new_initialises_empty_state() { + let tp = make_track_producer(); + let op = OrderedProducer::new(tp); + assert!(op.group.is_none()); + assert!(op.group_start.is_none()); + assert_eq!(op.group_frames, 0); + assert!(op.max_group_duration.is_none()); + } + + #[tokio::test] + async fn with_max_group_duration_sets_limit() { + let tp = make_track_producer(); + let op = OrderedProducer::new(tp).with_max_group_duration(ts(1_000_000)); + assert_eq!(op.max_group_duration, Some(ts(1_000_000))); + } + + #[tokio::test] + async fn from_trait_constructs_producer() { + let tp = make_track_producer(); + let op = OrderedProducer::from(tp); + assert!(op.group.is_none()); + } + + #[tokio::test] + async fn deref_exposes_inner_track() { + let tp = make_track_producer(); + let op = OrderedProducer::new(tp); + let _: &moq_lite::TrackProducer = &op; + } + + #[tokio::test] + async fn write_creates_group_on_first_frame() { + let tp = make_track_producer(); + let mut op = OrderedProducer::new(tp); + assert!(op.group.is_none()); + op.write(&make_frame(0)).expect("write should succeed"); + assert!(op.group.is_some()); + assert_eq!(op.group_frames, 1); + assert_eq!(op.group_start, Some(ts(0))); + } + + #[tokio::test] + async fn write_multiple_frames_increments_counter() { + let tp = make_track_producer(); + let mut op = OrderedProducer::new(tp); + op.write(&make_frame(0)).unwrap(); + op.write(&make_frame(1000)).unwrap(); + op.write(&make_frame(2000)).unwrap(); + assert_eq!(op.group_frames, 3); + } + + #[tokio::test] + async fn keyframe_on_empty_is_noop() { + let tp = make_track_producer(); + let mut op = OrderedProducer::new(tp); + op.keyframe().expect("keyframe on empty should succeed"); + assert!(op.group.is_none()); + } + + #[tokio::test] + async fn keyframe_closes_active_group() { + let tp = make_track_producer(); + let mut op = OrderedProducer::new(tp); + op.write(&make_frame(0)).unwrap(); + assert!(op.group.is_some()); + op.keyframe().expect("keyframe should succeed"); + assert!(op.group.is_none()); + } + + #[tokio::test] + async fn keyframe_then_write_starts_new_group() { + let tp = make_track_producer(); + let mut op = OrderedProducer::new(tp); + op.write(&make_frame(0)).unwrap(); + assert_eq!(op.group_start, Some(ts(0))); + op.keyframe().unwrap(); + op.write(&make_frame(5000)).unwrap(); + assert_eq!(op.group_start, Some(ts(5000))); + assert_eq!(op.group_frames, 1); + } + + #[tokio::test] + async fn max_duration_auto_closes_old_group_and_starts_new() { + let tp = make_track_producer(); + let mut op = OrderedProducer::new(tp).with_max_group_duration(ts(10_000)); + + op.write(&make_frame(0)).unwrap(); + assert!(op.group.is_some()); + assert_eq!(op.group_start, Some(ts(0))); + + op.write(&make_frame(10_000)).unwrap(); + assert!(op.group.is_some()); + assert_eq!(op.group_start, Some(ts(10_000))); + assert_eq!(op.group_frames, 1); + } + + #[tokio::test] + async fn finish_closes_group_and_track() { + let tp = make_track_producer(); + let mut op = OrderedProducer::new(tp); + op.write(&make_frame(0)).unwrap(); + op.finish().expect("finish should succeed"); + assert!(op.group.is_none()); + } + + #[tokio::test] + async fn finish_on_empty_finishes_track() { + let tp = make_track_producer(); + let mut op = OrderedProducer::new(tp); + op.finish().expect("finish on empty should succeed"); + } +}