Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions crates/nodes/src/core/bytes_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,102 @@ impl ProcessorNode for BytesInputNode {
Ok(())
}
}

#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)] // Test assertions use unwrap/expect to fail loudly.
mod tests {
use super::*;
use crate::test_utils::{
assert_state_initializing, assert_state_running, assert_state_stopped, create_test_context,
};
use std::collections::HashMap;
use streamkit_core::ProcessorNode;
use tokio::sync::mpsc;

#[test]
fn new_single_stream_output_pins() {
let (_tx, rx) = mpsc::channel(10);
let node = BytesInputNode::new("body", rx, Some("audio/wav".to_string()));
let pins = node.output_pins();
assert_eq!(pins.len(), 1);
assert_eq!(pins[0].name, "body");
assert_eq!(pins[0].produces_type, PacketType::Any);
assert_eq!(pins[0].cardinality, PinCardinality::Broadcast);
}

#[test]
fn new_has_no_input_pins() {
let (_tx, rx) = mpsc::channel(10);
let node = BytesInputNode::new("out", rx, None);
assert!(node.input_pins().is_empty());
}

#[test]
fn with_streams_multi_output_pins() {
let (_tx1, rx1) = mpsc::channel(10);
let (_tx2, rx2) = mpsc::channel(10);
let node = BytesInputNode::with_streams(vec![
("audio".to_string(), rx1, Some("audio/wav".to_string())),
("video".to_string(), rx2, None),
]);
let pins = node.output_pins();
assert_eq!(pins.len(), 2);
assert_eq!(pins[0].name, "audio");
assert_eq!(pins[1].name, "video");
}

#[tokio::test]
async fn run_sends_binary_packets_with_content_type() {
let (stream_tx, stream_rx) = mpsc::channel(10);
let node = BytesInputNode::new("out", stream_rx, Some("audio/wav".to_string()));

let (mut context, mock_sender, mut state_rx) = create_test_context(HashMap::new(), 10);
context.cancellation_token = Some(tokio_util::sync::CancellationToken::new());
let handle = tokio::spawn(async move { Box::new(node).run(context).await });

assert_state_initializing(&mut state_rx).await;
assert_state_running(&mut state_rx).await;

stream_tx.send(Bytes::from_static(b"chunk1")).await.unwrap();
stream_tx.send(Bytes::from_static(b"chunk2")).await.unwrap();
drop(stream_tx);

assert_state_stopped(&mut state_rx).await;
handle.await.unwrap().unwrap();

let packets = mock_sender.get_packets_for_pin("out").await;
assert_eq!(packets.len(), 2);

for (i, expected) in [b"chunk1".as_slice(), b"chunk2"].iter().enumerate() {
match &packets[i] {
Packet::Binary { data, content_type, .. } => {
assert_eq!(data.as_ref(), *expected);
assert_eq!(content_type.as_deref(), Some("audio/wav"));
},
other => panic!("Expected Binary, got {other:?}"),
}
}
}

#[tokio::test]
async fn run_cancellation_stops_node() {
let (stream_tx, stream_rx) = mpsc::channel(10);
let node = BytesInputNode::new("out", stream_rx, None);

let (mut context, _mock_sender, mut state_rx) = create_test_context(HashMap::new(), 10);
let token = tokio_util::sync::CancellationToken::new();
context.cancellation_token = Some(token.clone());

let handle = tokio::spawn(async move { Box::new(node).run(context).await });

assert_state_initializing(&mut state_rx).await;
assert_state_running(&mut state_rx).await;

token.cancel();

assert_state_stopped(&mut state_rx).await;
handle.await.unwrap().unwrap();

drop(stream_tx);
}
}
134 changes: 134 additions & 0 deletions crates/nodes/src/core/bytes_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,137 @@ impl ProcessorNode for BytesOutputNode {
Ok(())
}
}

#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)] // Test assertions use unwrap/expect to fail loudly.
mod tests {
use super::*;
use crate::test_utils::{
assert_state_initializing, assert_state_running, assert_state_stopped, create_test_context,
};
use std::borrow::Cow;
use std::collections::HashMap;
use tokio::sync::mpsc;

#[test]
fn new_returns_no_configured_content_type() {
let (tx, _rx) = mpsc::channel(10);
let node = BytesOutputNode::new(tx);
assert!(node.configured_content_type().is_none());
}

#[test]
fn new_with_config_parses_content_type() {
let (tx, _rx) = mpsc::channel(10);
let params = serde_json::json!({"content_type": "audio/wav"});
let node = BytesOutputNode::new_with_config(tx, Some(&params)).unwrap();
assert_eq!(node.configured_content_type().as_deref(), Some("audio/wav"));
}

#[test]
fn new_with_config_default_has_no_content_type() {
let (tx, _rx) = mpsc::channel(10);
let node = BytesOutputNode::new_with_config(tx, None).unwrap();
assert!(node.configured_content_type().is_none());
}

#[test]
fn new_with_config_ignores_unknown_fields() {
let (tx, _rx) = mpsc::channel(10);
let params = serde_json::json!({"unknown": 42});
let node = BytesOutputNode::new_with_config(tx, Some(&params)).unwrap();
assert!(node.configured_content_type().is_none());
}
Comment on lines +140 to +146
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 Info: Unknown-field config tests document fallback behavior, not strict validation

These newly added tests may look surprising because the config structs use #[serde(deny_unknown_fields)], but BytesOutputNode::new_with_config calls config_helpers::parse_config_optional, whose implementation returns unwrap_or_default() on any deserialization failure (crates/core/src/helpers.rs:25-30). That means an unknown field causes the entire optional config to fall back to defaults instead of returning an error, so the test expectation matches existing shared helper semantics rather than being a new functional bug. If strict unknown-field rejection is desired for these nodes, the root behavior is the shared helper/constructor contract rather than these tests alone.

Open in Devin Review (Staging)

Was this helpful? React with 👍 or 👎 to provide feedback.

Debug

Playground


#[test]
fn input_pins_shape() {
let (tx, _rx) = mpsc::channel(10);
let node = BytesOutputNode::new(tx);
let pins = node.input_pins();
assert_eq!(pins.len(), 1);
assert_eq!(pins[0].name, "in");
assert_eq!(pins[0].accepts_types, vec![PacketType::Binary]);
assert_eq!(pins[0].cardinality, PinCardinality::One);
}

#[test]
fn output_pins_empty() {
let (tx, _rx) = mpsc::channel(10);
let node = BytesOutputNode::new(tx);
assert!(node.output_pins().is_empty());
}

#[tokio::test]
async fn run_forwards_binary_data_to_result_tx() {
let (result_tx, mut result_rx) = mpsc::channel(10);
let node = BytesOutputNode::new(result_tx);

let (input_tx, input_rx) = mpsc::channel(10);
let mut inputs = HashMap::new();
inputs.insert("in".to_string(), input_rx);
let (context, _mock_sender, mut state_rx) = create_test_context(inputs, 10);

let handle = tokio::spawn(async move { Box::new(node).run(context).await });

assert_state_initializing(&mut state_rx).await;
assert_state_running(&mut state_rx).await;

let data1 = Bytes::from_static(b"hello");
let data2 = Bytes::from_static(b"world");
input_tx
.send(Packet::Binary {
data: data1.clone(),
content_type: Some(Cow::Borrowed("text/plain")),
metadata: None,
})
.await
.unwrap();
input_tx
.send(Packet::Binary { data: data2.clone(), content_type: None, metadata: None })
.await
.unwrap();

drop(input_tx);
assert_state_stopped(&mut state_rx).await;
handle.await.unwrap().unwrap();

let r1 = result_rx.recv().await.unwrap();
let r2 = result_rx.recv().await.unwrap();
assert_eq!(r1, data1);
assert_eq!(r2, data2);
}

#[tokio::test]
async fn run_receiver_closed_triggers_cancellation() {
let (result_tx, result_rx) = mpsc::channel(1);
let node = BytesOutputNode::new(result_tx);

let (input_tx, input_rx) = mpsc::channel(10);
let mut inputs = HashMap::new();
inputs.insert("in".to_string(), input_rx);
let (mut context, _mock_sender, mut state_rx) = create_test_context(inputs, 10);
let token = tokio_util::sync::CancellationToken::new();
context.cancellation_token = Some(token.clone());

let handle = tokio::spawn(async move { Box::new(node).run(context).await });

assert_state_initializing(&mut state_rx).await;
assert_state_running(&mut state_rx).await;

drop(result_rx);

input_tx
.send(Packet::Binary {
data: Bytes::from_static(b"data"),
content_type: None,
metadata: None,
})
.await
.unwrap();

assert_state_stopped(&mut state_rx).await;
handle.await.unwrap().unwrap();

assert!(token.is_cancelled());
}
}
Loading
Loading