diff --git a/crates/nodes/src/core/bytes_input.rs b/crates/nodes/src/core/bytes_input.rs index 36057c97..a95b36d4 100644 --- a/crates/nodes/src/core/bytes_input.rs +++ b/crates/nodes/src/core/bytes_input.rs @@ -121,3 +121,102 @@ impl ProcessorNode for BytesInputNode { Ok(()) } } + +#[cfg(test)] +#[allow(clippy::unwrap_used, clippy::expect_used)] // Test assertions use unwrap/expect to fail loudly. +mod tests { + use super::*; + use crate::test_utils::{ + assert_state_initializing, assert_state_running, assert_state_stopped, create_test_context, + }; + use std::collections::HashMap; + use streamkit_core::ProcessorNode; + use tokio::sync::mpsc; + + #[test] + fn new_single_stream_output_pins() { + let (_tx, rx) = mpsc::channel(10); + let node = BytesInputNode::new("body", rx, Some("audio/wav".to_string())); + let pins = node.output_pins(); + assert_eq!(pins.len(), 1); + assert_eq!(pins[0].name, "body"); + assert_eq!(pins[0].produces_type, PacketType::Any); + assert_eq!(pins[0].cardinality, PinCardinality::Broadcast); + } + + #[test] + fn new_has_no_input_pins() { + let (_tx, rx) = mpsc::channel(10); + let node = BytesInputNode::new("out", rx, None); + assert!(node.input_pins().is_empty()); + } + + #[test] + fn with_streams_multi_output_pins() { + let (_tx1, rx1) = mpsc::channel(10); + let (_tx2, rx2) = mpsc::channel(10); + let node = BytesInputNode::with_streams(vec![ + ("audio".to_string(), rx1, Some("audio/wav".to_string())), + ("video".to_string(), rx2, None), + ]); + let pins = node.output_pins(); + assert_eq!(pins.len(), 2); + assert_eq!(pins[0].name, "audio"); + assert_eq!(pins[1].name, "video"); + } + + #[tokio::test] + async fn run_sends_binary_packets_with_content_type() { + let (stream_tx, stream_rx) = mpsc::channel(10); + let node = BytesInputNode::new("out", stream_rx, Some("audio/wav".to_string())); + + let (mut context, mock_sender, mut state_rx) = create_test_context(HashMap::new(), 10); + context.cancellation_token = Some(tokio_util::sync::CancellationToken::new()); + let handle = tokio::spawn(async move { Box::new(node).run(context).await }); + + assert_state_initializing(&mut state_rx).await; + assert_state_running(&mut state_rx).await; + + stream_tx.send(Bytes::from_static(b"chunk1")).await.unwrap(); + stream_tx.send(Bytes::from_static(b"chunk2")).await.unwrap(); + drop(stream_tx); + + assert_state_stopped(&mut state_rx).await; + handle.await.unwrap().unwrap(); + + let packets = mock_sender.get_packets_for_pin("out").await; + assert_eq!(packets.len(), 2); + + for (i, expected) in [b"chunk1".as_slice(), b"chunk2"].iter().enumerate() { + match &packets[i] { + Packet::Binary { data, content_type, .. } => { + assert_eq!(data.as_ref(), *expected); + assert_eq!(content_type.as_deref(), Some("audio/wav")); + }, + other => panic!("Expected Binary, got {other:?}"), + } + } + } + + #[tokio::test] + async fn run_cancellation_stops_node() { + let (stream_tx, stream_rx) = mpsc::channel(10); + let node = BytesInputNode::new("out", stream_rx, None); + + let (mut context, _mock_sender, mut state_rx) = create_test_context(HashMap::new(), 10); + let token = tokio_util::sync::CancellationToken::new(); + context.cancellation_token = Some(token.clone()); + + let handle = tokio::spawn(async move { Box::new(node).run(context).await }); + + assert_state_initializing(&mut state_rx).await; + assert_state_running(&mut state_rx).await; + + token.cancel(); + + assert_state_stopped(&mut state_rx).await; + handle.await.unwrap().unwrap(); + + drop(stream_tx); + } +} diff --git a/crates/nodes/src/core/bytes_output.rs b/crates/nodes/src/core/bytes_output.rs index e5f3ce69..28ee0c48 100644 --- a/crates/nodes/src/core/bytes_output.rs +++ b/crates/nodes/src/core/bytes_output.rs @@ -103,3 +103,137 @@ impl ProcessorNode for BytesOutputNode { Ok(()) } } + +#[cfg(test)] +#[allow(clippy::unwrap_used, clippy::expect_used)] // Test assertions use unwrap/expect to fail loudly. +mod tests { + use super::*; + use crate::test_utils::{ + assert_state_initializing, assert_state_running, assert_state_stopped, create_test_context, + }; + use std::borrow::Cow; + use std::collections::HashMap; + use tokio::sync::mpsc; + + #[test] + fn new_returns_no_configured_content_type() { + let (tx, _rx) = mpsc::channel(10); + let node = BytesOutputNode::new(tx); + assert!(node.configured_content_type().is_none()); + } + + #[test] + fn new_with_config_parses_content_type() { + let (tx, _rx) = mpsc::channel(10); + let params = serde_json::json!({"content_type": "audio/wav"}); + let node = BytesOutputNode::new_with_config(tx, Some(¶ms)).unwrap(); + assert_eq!(node.configured_content_type().as_deref(), Some("audio/wav")); + } + + #[test] + fn new_with_config_default_has_no_content_type() { + let (tx, _rx) = mpsc::channel(10); + let node = BytesOutputNode::new_with_config(tx, None).unwrap(); + assert!(node.configured_content_type().is_none()); + } + + #[test] + fn new_with_config_ignores_unknown_fields() { + let (tx, _rx) = mpsc::channel(10); + let params = serde_json::json!({"unknown": 42}); + let node = BytesOutputNode::new_with_config(tx, Some(¶ms)).unwrap(); + assert!(node.configured_content_type().is_none()); + } + + #[test] + fn input_pins_shape() { + let (tx, _rx) = mpsc::channel(10); + let node = BytesOutputNode::new(tx); + let pins = node.input_pins(); + assert_eq!(pins.len(), 1); + assert_eq!(pins[0].name, "in"); + assert_eq!(pins[0].accepts_types, vec![PacketType::Binary]); + assert_eq!(pins[0].cardinality, PinCardinality::One); + } + + #[test] + fn output_pins_empty() { + let (tx, _rx) = mpsc::channel(10); + let node = BytesOutputNode::new(tx); + assert!(node.output_pins().is_empty()); + } + + #[tokio::test] + async fn run_forwards_binary_data_to_result_tx() { + let (result_tx, mut result_rx) = mpsc::channel(10); + let node = BytesOutputNode::new(result_tx); + + let (input_tx, input_rx) = mpsc::channel(10); + let mut inputs = HashMap::new(); + inputs.insert("in".to_string(), input_rx); + let (context, _mock_sender, mut state_rx) = create_test_context(inputs, 10); + + let handle = tokio::spawn(async move { Box::new(node).run(context).await }); + + assert_state_initializing(&mut state_rx).await; + assert_state_running(&mut state_rx).await; + + let data1 = Bytes::from_static(b"hello"); + let data2 = Bytes::from_static(b"world"); + input_tx + .send(Packet::Binary { + data: data1.clone(), + content_type: Some(Cow::Borrowed("text/plain")), + metadata: None, + }) + .await + .unwrap(); + input_tx + .send(Packet::Binary { data: data2.clone(), content_type: None, metadata: None }) + .await + .unwrap(); + + drop(input_tx); + assert_state_stopped(&mut state_rx).await; + handle.await.unwrap().unwrap(); + + let r1 = result_rx.recv().await.unwrap(); + let r2 = result_rx.recv().await.unwrap(); + assert_eq!(r1, data1); + assert_eq!(r2, data2); + } + + #[tokio::test] + async fn run_receiver_closed_triggers_cancellation() { + let (result_tx, result_rx) = mpsc::channel(1); + let node = BytesOutputNode::new(result_tx); + + let (input_tx, input_rx) = mpsc::channel(10); + let mut inputs = HashMap::new(); + inputs.insert("in".to_string(), input_rx); + let (mut context, _mock_sender, mut state_rx) = create_test_context(inputs, 10); + let token = tokio_util::sync::CancellationToken::new(); + context.cancellation_token = Some(token.clone()); + + let handle = tokio::spawn(async move { Box::new(node).run(context).await }); + + assert_state_initializing(&mut state_rx).await; + assert_state_running(&mut state_rx).await; + + drop(result_rx); + + input_tx + .send(Packet::Binary { + data: Bytes::from_static(b"data"), + content_type: None, + metadata: None, + }) + .await + .unwrap(); + + assert_state_stopped(&mut state_rx).await; + handle.await.unwrap().unwrap(); + + assert!(token.is_cancelled()); + } +} diff --git a/crates/nodes/src/core/json_serialize.rs b/crates/nodes/src/core/json_serialize.rs index b7df11e5..cd037ab6 100644 --- a/crates/nodes/src/core/json_serialize.rs +++ b/crates/nodes/src/core/json_serialize.rs @@ -106,3 +106,175 @@ impl ProcessorNode for JsonSerialize { Ok(()) } } + +#[cfg(test)] +#[allow(clippy::unwrap_used, clippy::expect_used)] // Test assertions use unwrap/expect to fail loudly. +mod tests { + use super::*; + use crate::test_utils::{assert_state_running, assert_state_stopped, create_test_context}; + use std::collections::HashMap; + use streamkit_core::types::PacketType; + use tokio::sync::mpsc; + + #[test] + fn new_default_config() { + let node = JsonSerialize::new(None).unwrap(); + assert!(!node.pretty); + assert!(!node.newline_delimited); + } + + #[test] + fn new_pretty_enabled() { + let params = serde_json::json!({"pretty": true}); + let node = JsonSerialize::new(Some(¶ms)).unwrap(); + assert!(node.pretty); + assert!(!node.newline_delimited); + } + + #[test] + fn new_newline_delimited_enabled() { + let params = serde_json::json!({"newline_delimited": true}); + let node = JsonSerialize::new(Some(¶ms)).unwrap(); + assert!(!node.pretty); + assert!(node.newline_delimited); + } + + #[test] + fn new_ignores_unknown_fields_and_uses_defaults() { + let params = serde_json::json!({"unknown_field": 42}); + let node = JsonSerialize::new(Some(¶ms)).unwrap(); + assert!(!node.pretty); + assert!(!node.newline_delimited); + } + + #[test] + fn input_pins_shape() { + let pins = JsonSerialize::input_pins(); + assert_eq!(pins.len(), 1); + assert_eq!(pins[0].name, "in"); + assert_eq!(pins[0].accepts_types, vec![PacketType::Any]); + assert_eq!(pins[0].cardinality, PinCardinality::One); + } + + #[test] + fn output_pins_shape() { + let pins = JsonSerialize::output_pins(); + assert_eq!(pins.len(), 1); + assert_eq!(pins[0].name, "out"); + assert_eq!(pins[0].produces_type, PacketType::Binary); + assert_eq!(pins[0].cardinality, PinCardinality::Broadcast); + } + + #[tokio::test] + async fn run_serializes_text_to_json_binary() { + let (input_tx, input_rx) = mpsc::channel(10); + let mut inputs = HashMap::new(); + inputs.insert("in".to_string(), input_rx); + let (context, mock_sender, mut state_rx) = create_test_context(inputs, 10); + + let node = JsonSerialize::new(None).unwrap(); + let handle = tokio::spawn(async move { Box::new(node).run(context).await }); + + assert_state_running(&mut state_rx).await; + + input_tx.send(Packet::Text("hello".into())).await.unwrap(); + drop(input_tx); + + assert_state_stopped(&mut state_rx).await; + handle.await.unwrap().unwrap(); + + let packets = mock_sender.get_packets_for_pin("out").await; + assert_eq!(packets.len(), 1); + + match &packets[0] { + Packet::Binary { data, content_type, .. } => { + assert_eq!(content_type.as_deref(), Some("application/json")); + let parsed: serde_json::Value = serde_json::from_slice(data).unwrap(); + assert!(parsed.get("Text").is_some()); + }, + other => panic!("Expected Binary packet, got {other:?}"), + } + } + + #[tokio::test] + async fn run_pretty_output_contains_indentation() { + let (input_tx, input_rx) = mpsc::channel(10); + let mut inputs = HashMap::new(); + inputs.insert("in".to_string(), input_rx); + let (context, mock_sender, mut state_rx) = create_test_context(inputs, 10); + + let params = serde_json::json!({"pretty": true}); + let node = JsonSerialize::new(Some(¶ms)).unwrap(); + let handle = tokio::spawn(async move { Box::new(node).run(context).await }); + + assert_state_running(&mut state_rx).await; + + input_tx.send(Packet::Text("hi".into())).await.unwrap(); + drop(input_tx); + + assert_state_stopped(&mut state_rx).await; + handle.await.unwrap().unwrap(); + + let packets = mock_sender.get_packets_for_pin("out").await; + let data = match &packets[0] { + Packet::Binary { data, .. } => data, + other => panic!("Expected Binary, got {other:?}"), + }; + let text = std::str::from_utf8(data).unwrap(); + assert!(text.contains('\n'), "pretty output should contain newlines"); + assert!(text.contains(" "), "pretty output should contain indentation"); + } + + #[tokio::test] + async fn run_newline_delimited_appends_newline() { + let (input_tx, input_rx) = mpsc::channel(10); + let mut inputs = HashMap::new(); + inputs.insert("in".to_string(), input_rx); + let (context, mock_sender, mut state_rx) = create_test_context(inputs, 10); + + let params = serde_json::json!({"newline_delimited": true}); + let node = JsonSerialize::new(Some(¶ms)).unwrap(); + let handle = tokio::spawn(async move { Box::new(node).run(context).await }); + + assert_state_running(&mut state_rx).await; + + input_tx.send(Packet::Text("x".into())).await.unwrap(); + drop(input_tx); + + assert_state_stopped(&mut state_rx).await; + handle.await.unwrap().unwrap(); + + let packets = mock_sender.get_packets_for_pin("out").await; + let data = match &packets[0] { + Packet::Binary { data, .. } => data, + other => panic!("Expected Binary, got {other:?}"), + }; + assert_eq!(*data.last().unwrap(), b'\n'); + } + + #[tokio::test] + async fn run_compact_output_has_no_trailing_newline() { + let (input_tx, input_rx) = mpsc::channel(10); + let mut inputs = HashMap::new(); + inputs.insert("in".to_string(), input_rx); + let (context, mock_sender, mut state_rx) = create_test_context(inputs, 10); + + let node = JsonSerialize::new(None).unwrap(); + let handle = tokio::spawn(async move { Box::new(node).run(context).await }); + + assert_state_running(&mut state_rx).await; + + input_tx.send(Packet::Text("x".into())).await.unwrap(); + drop(input_tx); + + assert_state_stopped(&mut state_rx).await; + handle.await.unwrap().unwrap(); + + let packets = mock_sender.get_packets_for_pin("out").await; + let data = match &packets[0] { + Packet::Binary { data, .. } => data, + other => panic!("Expected Binary, got {other:?}"), + }; + assert_ne!(*data.last().unwrap(), b'\n'); + } +} diff --git a/crates/nodes/src/core/sink.rs b/crates/nodes/src/core/sink.rs index d91c634e..bf501be5 100644 --- a/crates/nodes/src/core/sink.rs +++ b/crates/nodes/src/core/sink.rs @@ -70,3 +70,79 @@ pub fn register(registry: &mut streamkit_core::NodeRegistry) { (e.g., telemetry taps) without affecting the main pipeline.", ); } + +#[cfg(test)] +#[allow(clippy::unwrap_used, clippy::expect_used)] // Test assertions use unwrap/expect to fail loudly. +mod tests { + use super::*; + use crate::test_utils::{ + assert_state_running, assert_state_stopped, create_test_binary_packet, create_test_context, + }; + use std::collections::HashMap; + use streamkit_core::types::Packet; + use tokio::sync::mpsc; + + #[test] + fn new_default_config() { + let node = SinkNode::new(None).unwrap(); + assert_eq!(format!("{node:?}"), "SinkNode"); + } + + #[test] + fn new_ignores_unknown_fields() { + let node = SinkNode::new(Some(&serde_json::json!({"unknown": true}))).unwrap(); + assert_eq!(format!("{node:?}"), "SinkNode"); + } + + #[test] + fn input_pins_shape() { + let pins = SinkNode::input_pins(); + assert_eq!(pins.len(), 1); + assert_eq!(pins[0].name, "in"); + assert_eq!(pins[0].accepts_types, vec![PacketType::Any]); + assert_eq!(pins[0].cardinality, PinCardinality::One); + } + + #[test] + fn output_pins_empty() { + let node = SinkNode::new(None).unwrap(); + assert!(node.output_pins().is_empty()); + } + + #[tokio::test] + async fn run_consumes_packets_and_stops() { + let (input_tx, input_rx) = mpsc::channel(10); + let mut inputs = HashMap::new(); + inputs.insert("in".to_string(), input_rx); + let (context, _mock_sender, mut state_rx) = create_test_context(inputs, 10); + + let node = SinkNode::new(None).unwrap(); + let handle = tokio::spawn(async move { Box::new(node).run(context).await }); + + assert_state_running(&mut state_rx).await; + + input_tx.send(Packet::Text("a".into())).await.unwrap(); + input_tx.send(create_test_binary_packet(vec![1, 2])).await.unwrap(); + + drop(input_tx); + assert_state_stopped(&mut state_rx).await; + handle.await.unwrap().unwrap(); + } + + #[tokio::test] + async fn run_stops_immediately_on_closed_input() { + let (input_tx, input_rx) = mpsc::channel::(10); + let mut inputs = HashMap::new(); + inputs.insert("in".to_string(), input_rx); + let (context, _mock_sender, mut state_rx) = create_test_context(inputs, 10); + + drop(input_tx); + + let node = SinkNode::new(None).unwrap(); + let handle = tokio::spawn(async move { Box::new(node).run(context).await }); + + assert_state_running(&mut state_rx).await; + assert_state_stopped(&mut state_rx).await; + handle.await.unwrap().unwrap(); + } +} diff --git a/crates/nodes/src/streaming_utils.rs b/crates/nodes/src/streaming_utils.rs index b6014cc7..9ee6583c 100644 --- a/crates/nodes/src/streaming_utils.rs +++ b/crates/nodes/src/streaming_utils.rs @@ -63,3 +63,146 @@ impl Read for StreamingReader { } } } + +#[cfg(test)] +#[allow(clippy::unwrap_used, clippy::expect_used)] // Test assertions use unwrap/expect to fail loudly. +mod tests { + use super::*; + use std::io::Read; + use tokio::sync::mpsc; + + #[tokio::test] + async fn read_single_chunk() { + let (tx, rx) = mpsc::channel(10); + tx.send(Bytes::from_static(b"hello")).await.unwrap(); + drop(tx); + + let result = tokio::task::spawn_blocking(move || { + let mut reader = StreamingReader::new(rx); + let mut buf = vec![0u8; 1024]; + let n = reader.read(&mut buf).unwrap(); + buf.truncate(n); + buf + }) + .await + .unwrap(); + + assert_eq!(result, b"hello"); + } + + #[tokio::test] + async fn read_multiple_chunks() { + let (tx, rx) = mpsc::channel(10); + tx.send(Bytes::from_static(b"hello")).await.unwrap(); + tx.send(Bytes::from_static(b" world")).await.unwrap(); + drop(tx); + + let result = tokio::task::spawn_blocking(move || { + let mut reader = StreamingReader::new(rx); + let mut all = Vec::new(); + let mut buf = [0u8; 1024]; + loop { + let n = reader.read(&mut buf).unwrap(); + if n == 0 { + break; + } + all.extend_from_slice(&buf[..n]); + } + all + }) + .await + .unwrap(); + + assert_eq!(result, b"hello world"); + } + + #[tokio::test] + async fn read_partial_small_buffer() { + let (tx, rx) = mpsc::channel(10); + tx.send(Bytes::from_static(b"abcdefgh")).await.unwrap(); + drop(tx); + + let result = tokio::task::spawn_blocking(move || { + let mut reader = StreamingReader::new(rx); + let mut buf = [0u8; 3]; + + let n1 = reader.read(&mut buf).unwrap(); + let part1 = buf[..n1].to_vec(); + + let n2 = reader.read(&mut buf).unwrap(); + let part2 = buf[..n2].to_vec(); + + let n3 = reader.read(&mut buf).unwrap(); + let part3 = buf[..n3].to_vec(); + + (part1, part2, part3) + }) + .await + .unwrap(); + + assert_eq!(result.0, b"abc"); + assert_eq!(result.1, b"def"); + assert_eq!(result.2, b"gh"); + } + + #[tokio::test] + async fn read_skips_empty_chunks() { + let (tx, rx) = mpsc::channel(10); + tx.send(Bytes::new()).await.unwrap(); + tx.send(Bytes::from_static(b"data")).await.unwrap(); + tx.send(Bytes::new()).await.unwrap(); + drop(tx); + + let result = tokio::task::spawn_blocking(move || { + let mut reader = StreamingReader::new(rx); + let mut buf = vec![0u8; 1024]; + let n = reader.read(&mut buf).unwrap(); + buf.truncate(n); + buf + }) + .await + .unwrap(); + + assert_eq!(result, b"data"); + } + + #[tokio::test] + async fn read_channel_close_returns_eof() { + let (tx, rx) = mpsc::channel::(10); + drop(tx); + + let result = tokio::task::spawn_blocking(move || { + let mut reader = StreamingReader::new(rx); + let mut buf = [0u8; 64]; + reader.read(&mut buf).unwrap() + }) + .await + .unwrap(); + + assert_eq!(result, 0); + } + + #[tokio::test] + async fn read_after_eof_returns_zero() { + let (tx, rx) = mpsc::channel(10); + tx.send(Bytes::from_static(b"x")).await.unwrap(); + drop(tx); + + let result = tokio::task::spawn_blocking(move || { + let mut reader = StreamingReader::new(rx); + let mut buf = [0u8; 64]; + + let n1 = reader.read(&mut buf).unwrap(); + assert_eq!(n1, 1); + + let n2 = reader.read(&mut buf).unwrap(); + assert_eq!(n2, 0); + + reader.read(&mut buf).unwrap() + }) + .await + .unwrap(); + + assert_eq!(result, 0); + } +}