Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions crates/nodes/src/transport/moq/catalog_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,59 @@ impl From<moq_lite::TrackConsumer> for CatalogConsumer {
Self::new(inner)
}
}

#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;

fn make_track_pair() -> (moq_lite::TrackProducer, moq_lite::TrackConsumer) {
let origin = moq_lite::Origin::random().produce();
let mut broadcast = origin.create_broadcast("test-broadcast").expect("create_broadcast");
let track = moq_lite::Track { name: ".catalog".to_string(), priority: 0 };
let producer = broadcast.create_track(track.clone()).expect("create_track");
let consumer = origin.consume();
let bc = consumer.get_broadcast("test-broadcast").expect("get_broadcast");
let consumer_track = bc.subscribe_track(&track).expect("subscribe_track");
(producer, consumer_track)
}

#[tokio::test]
async fn new_initialises_empty_state() {
let (_producer, consumer) = make_track_pair();
let cc = CatalogConsumer::new(consumer);
assert!(cc.group.is_none());
}

#[tokio::test]
async fn from_trait_constructs_consumer() {
let (_producer, consumer) = make_track_pair();
let cc = CatalogConsumer::from(consumer);
assert!(cc.group.is_none());
}

#[tokio::test]
async fn next_returns_none_when_track_finished() {
let (mut producer, consumer) = make_track_pair();
let mut cc = CatalogConsumer::new(consumer);
producer.finish().expect("finish");
let result = cc.next().await.expect("next should not error");
assert!(result.is_none());
}

#[tokio::test]
async fn next_returns_catalog_from_written_frame() {
let (mut producer, consumer) = make_track_pair();
let mut cc = CatalogConsumer::new(consumer);

let catalog = hang::catalog::Catalog::default();
let payload = serde_json::to_vec(&catalog).expect("serialize catalog");

let mut group = producer.append_group().expect("append_group");
group.write_frame(bytes::Bytes::from(payload)).expect("write_frame");
group.finish().expect("finish group");

let result = cc.next().await.expect("next should not error");
assert!(result.is_some(), "should have received a catalog");
}
}
136 changes: 136 additions & 0 deletions crates/nodes/src/transport/moq/ordered_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,139 @@ impl std::ops::Deref for OrderedProducer {
&self.track
}
}

#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;

fn make_track_producer() -> moq_lite::TrackProducer {
let origin = moq_lite::Origin::random().produce();
let mut broadcast =
origin.create_broadcast("test-broadcast").expect("create_broadcast should succeed");
broadcast
.create_track(moq_lite::Track { name: "test/track".to_string(), priority: 0 })
.expect("create_track should succeed")
}

fn ts(micros: u64) -> Timestamp {
Timestamp::from_micros(micros).unwrap()
}

fn make_frame(ts_micros: u64) -> Frame {
Frame { timestamp: ts(ts_micros), payload: bytes::Bytes::from_static(b"test-payload") }
}

#[tokio::test]
async fn new_initialises_empty_state() {
let tp = make_track_producer();
let op = OrderedProducer::new(tp);
assert!(op.group.is_none());
assert!(op.group_start.is_none());
assert_eq!(op.group_frames, 0);
assert!(op.max_group_duration.is_none());
}

#[tokio::test]
async fn with_max_group_duration_sets_limit() {
let tp = make_track_producer();
let op = OrderedProducer::new(tp).with_max_group_duration(ts(1_000_000));
assert_eq!(op.max_group_duration, Some(ts(1_000_000)));
}

#[tokio::test]
async fn from_trait_constructs_producer() {
let tp = make_track_producer();
let op = OrderedProducer::from(tp);
assert!(op.group.is_none());
}

#[tokio::test]
async fn deref_exposes_inner_track() {
let tp = make_track_producer();
let op = OrderedProducer::new(tp);
let _: &moq_lite::TrackProducer = &op;
}

#[tokio::test]
async fn write_creates_group_on_first_frame() {
let tp = make_track_producer();
let mut op = OrderedProducer::new(tp);
assert!(op.group.is_none());
op.write(&make_frame(0)).expect("write should succeed");
assert!(op.group.is_some());
assert_eq!(op.group_frames, 1);
assert_eq!(op.group_start, Some(ts(0)));
}

#[tokio::test]
async fn write_multiple_frames_increments_counter() {
let tp = make_track_producer();
let mut op = OrderedProducer::new(tp);
op.write(&make_frame(0)).unwrap();
op.write(&make_frame(1000)).unwrap();
op.write(&make_frame(2000)).unwrap();
assert_eq!(op.group_frames, 3);
}

#[tokio::test]
async fn keyframe_on_empty_is_noop() {
let tp = make_track_producer();
let mut op = OrderedProducer::new(tp);
op.keyframe().expect("keyframe on empty should succeed");
assert!(op.group.is_none());
}

#[tokio::test]
async fn keyframe_closes_active_group() {
let tp = make_track_producer();
let mut op = OrderedProducer::new(tp);
op.write(&make_frame(0)).unwrap();
assert!(op.group.is_some());
op.keyframe().expect("keyframe should succeed");
assert!(op.group.is_none());
}

#[tokio::test]
async fn keyframe_then_write_starts_new_group() {
let tp = make_track_producer();
let mut op = OrderedProducer::new(tp);
op.write(&make_frame(0)).unwrap();
assert_eq!(op.group_start, Some(ts(0)));
op.keyframe().unwrap();
op.write(&make_frame(5000)).unwrap();
assert_eq!(op.group_start, Some(ts(5000)));
assert_eq!(op.group_frames, 1);
}

#[tokio::test]
async fn max_duration_auto_closes_old_group_and_starts_new() {
let tp = make_track_producer();
let mut op = OrderedProducer::new(tp).with_max_group_duration(ts(10_000));

op.write(&make_frame(0)).unwrap();
assert!(op.group.is_some());
assert_eq!(op.group_start, Some(ts(0)));

op.write(&make_frame(10_000)).unwrap();
assert!(op.group.is_some());
assert_eq!(op.group_start, Some(ts(10_000)));
assert_eq!(op.group_frames, 1);
}

#[tokio::test]
async fn finish_closes_group_and_track() {
let tp = make_track_producer();
let mut op = OrderedProducer::new(tp);
op.write(&make_frame(0)).unwrap();
op.finish().expect("finish should succeed");
assert!(op.group.is_none());
}

#[tokio::test]
async fn finish_on_empty_finishes_track() {
let tp = make_track_producer();
let mut op = OrderedProducer::new(tp);
op.finish().expect("finish on empty should succeed");
}
}
Loading