diff --git a/crates/core/src/control.rs b/crates/core/src/control.rs index fa67d20a..652067b4 100644 --- a/crates/core/src/control.rs +++ b/crates/core/src/control.rs @@ -147,3 +147,128 @@ pub enum EngineControlMessage { }, Shutdown, } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn node_control_update_params_serialization_roundtrip() { + let msg = NodeControlMessage::UpdateParams(serde_json::json!({"gain": 0.5})); + let json = serde_json::to_string(&msg).unwrap(); + let deserialized: NodeControlMessage = serde_json::from_str(&json).unwrap(); + match deserialized { + NodeControlMessage::UpdateParams(v) => { + assert_eq!(v["gain"], 0.5); + }, + _ => panic!("expected UpdateParams"), + } + } + + #[test] + fn node_control_start_serialization_roundtrip() { + let msg = NodeControlMessage::Start; + let json = serde_json::to_string(&msg).unwrap(); + let deserialized: NodeControlMessage = serde_json::from_str(&json).unwrap(); + assert!(matches!(deserialized, NodeControlMessage::Start)); + } + + #[test] + fn node_control_shutdown_serialization_roundtrip() { + let msg = NodeControlMessage::Shutdown; + let json = serde_json::to_string(&msg).unwrap(); + let deserialized: NodeControlMessage = serde_json::from_str(&json).unwrap(); + assert!(matches!(deserialized, NodeControlMessage::Shutdown)); + } + + #[test] + fn connection_mode_default_is_reliable() { + assert_eq!(ConnectionMode::default(), ConnectionMode::Reliable); + } + + #[test] + fn connection_mode_serialization_roundtrip() { + for mode in [ConnectionMode::Reliable, ConnectionMode::BestEffort] { + let json = serde_json::to_string(&mode).unwrap(); + let deserialized: ConnectionMode = serde_json::from_str(&json).unwrap(); + assert_eq!(mode, deserialized); + } + } + + #[test] + fn connection_mode_serde_uses_snake_case() { + let json = serde_json::to_string(&ConnectionMode::BestEffort).unwrap(); + assert_eq!(json, "\"best_effort\""); + } + + #[test] + fn engine_control_add_node_debug_snapshot() { + let msg = EngineControlMessage::AddNode { + node_id: "node1".into(), + kind: "gain".into(), + params: Some(serde_json::json!({"gain": 1.0})), + }; + let dbg = format!("{msg:?}"); + assert!(dbg.contains("AddNode"), "Debug must name the variant"); + assert!(dbg.contains("node1")); + assert!(dbg.contains("gain")); + } + + #[test] + fn engine_control_remove_node_debug_snapshot() { + let msg = EngineControlMessage::RemoveNode { node_id: "node1".into() }; + let dbg = format!("{msg:?}"); + assert!(dbg.contains("RemoveNode")); + assert!(dbg.contains("node1")); + } + + #[test] + fn engine_control_connect_debug_snapshot() { + let msg = EngineControlMessage::Connect { + from_node: "src".into(), + from_pin: "audio_out".into(), + to_node: "dst".into(), + to_pin: "audio_in".into(), + mode: ConnectionMode::BestEffort, + }; + let dbg = format!("{msg:?}"); + assert!(dbg.contains("Connect")); + assert!(dbg.contains("src")); + assert!(dbg.contains("audio_out")); + assert!(dbg.contains("dst")); + assert!(dbg.contains("audio_in")); + assert!(dbg.contains("BestEffort")); + } + + #[test] + fn engine_control_disconnect_debug_snapshot() { + let msg = EngineControlMessage::Disconnect { + from_node: "src".into(), + from_pin: "out".into(), + to_node: "dst".into(), + to_pin: "in".into(), + }; + let dbg = format!("{msg:?}"); + assert!(dbg.contains("Disconnect")); + assert!(dbg.contains("src")); + assert!(dbg.contains("dst")); + } + + #[test] + fn engine_control_tune_node_debug_snapshot() { + let msg = EngineControlMessage::TuneNode { + node_id: "node1".into(), + message: NodeControlMessage::UpdateParams(serde_json::json!({"rate": 44100})), + }; + let dbg = format!("{msg:?}"); + assert!(dbg.contains("TuneNode")); + assert!(dbg.contains("node1")); + assert!(dbg.contains("UpdateParams")); + } + + #[test] + fn engine_control_shutdown_debug_snapshot() { + let msg = EngineControlMessage::Shutdown; + assert_eq!(format!("{msg:?}"), "Shutdown"); + } +} diff --git a/crates/core/src/helpers.rs b/crates/core/src/helpers.rs index 820ac6e7..35d44ac2 100644 --- a/crates/core/src/helpers.rs +++ b/crates/core/src/helpers.rs @@ -116,3 +116,130 @@ pub mod packet_helpers { batch } } + +#[cfg(test)] +mod tests { + use super::*; + use serde::Deserialize; + + #[derive(Debug, Deserialize, Default, PartialEq)] + struct TestConfig { + #[serde(default)] + gain: f32, + #[serde(default)] + channels: u16, + } + + #[test] + fn parse_config_optional_with_valid_json() { + let params = serde_json::json!({"gain": 0.5, "channels": 2}); + let cfg: TestConfig = config_helpers::parse_config_optional(Some(¶ms)).unwrap(); + assert_eq!(cfg.gain, 0.5); + assert_eq!(cfg.channels, 2); + } + + #[test] + fn parse_config_optional_with_none_returns_default() { + let cfg: TestConfig = config_helpers::parse_config_optional(None).unwrap(); + assert_eq!(cfg, TestConfig::default()); + } + + #[test] + fn parse_config_optional_with_partial_json_fills_defaults() { + let params = serde_json::json!({"gain": 1.5}); + let cfg: TestConfig = config_helpers::parse_config_optional(Some(¶ms)).unwrap(); + assert_eq!(cfg.gain, 1.5); + assert_eq!(cfg.channels, 0); + } + + #[test] + fn parse_config_required_with_valid_json() { + let params = serde_json::json!({"gain": 2.0, "channels": 1}); + let cfg: TestConfig = config_helpers::parse_config_required(Some(¶ms)).unwrap(); + assert_eq!(cfg.gain, 2.0); + assert_eq!(cfg.channels, 1); + } + + #[test] + fn parse_config_required_with_none_returns_error() { + let result = config_helpers::parse_config_required::(None); + assert!(result.is_err()); + let err_str = result.unwrap_err().to_string(); + assert!(err_str.contains("Configuration"), "expected Configuration error, got: {err_str}"); + } + + #[test] + fn parse_config_required_with_invalid_type_returns_error() { + let params = serde_json::json!({"gain": "not_a_number"}); + let result = config_helpers::parse_config_required::(Some(¶ms)); + assert!(result.is_err()); + } + + #[test] + fn parse_config_with_context_missing_params() { + let result = config_helpers::parse_config_with_context::(None, "AudioGain"); + assert!(result.is_err()); + let err_str = result.unwrap_err().to_string(); + assert!(err_str.contains("AudioGain")); + } + + #[test] + fn parse_config_with_context_invalid_json() { + let params = serde_json::json!("just a string"); + let result = + config_helpers::parse_config_with_context::(Some(¶ms), "AudioGain"); + assert!(result.is_err()); + let err_str = result.unwrap_err().to_string(); + assert!(err_str.contains("AudioGain")); + } + + #[test] + fn parse_config_optional_with_invalid_type_falls_back_to_default() { + let params = serde_json::json!({"gain": "not_a_number"}); + let cfg: TestConfig = config_helpers::parse_config_optional(Some(¶ms)).unwrap(); + assert_eq!(cfg, TestConfig::default()); + } + + #[test] + fn parse_config_with_context_valid_json() { + let params = serde_json::json!({"gain": 3.0, "channels": 4}); + let cfg: TestConfig = + config_helpers::parse_config_with_context(Some(¶ms), "AudioGain").unwrap(); + assert_eq!(cfg.gain, 3.0); + assert_eq!(cfg.channels, 4); + } + + #[test] + fn batch_packets_greedy_drains_one_extra_packet() { + let (tx, mut rx) = tokio::sync::mpsc::channel(16); + let first = Packet::Text(std::sync::Arc::from("hello")); + tx.try_send(Packet::Text(std::sync::Arc::from("world"))).unwrap(); + let batch = packet_helpers::batch_packets_greedy(first, &mut rx, 4); + assert_eq!(batch.len(), 2); + } + + #[test] + fn batch_packets_greedy_empty_channel() { + let (_tx, mut rx) = tokio::sync::mpsc::channel::(16); + let first = Packet::Text(std::sync::Arc::from("only")); + let batch = packet_helpers::batch_packets_greedy(first, &mut rx, 8); + assert_eq!(batch.len(), 1); + } + + #[test] + fn batch_packets_greedy_respects_batch_size() { + let (tx, mut rx) = tokio::sync::mpsc::channel(16); + for i in 0..10 { + tx.try_send(Packet::Text(std::sync::Arc::from(format!("{i}")))).unwrap(); + } + let first = Packet::Text(std::sync::Arc::from("first")); + let batch = packet_helpers::batch_packets_greedy(first, &mut rx, 3); + assert_eq!(batch.len(), 3); + } + + #[test] + fn default_batch_capacity_is_reasonable() { + const { assert!(packet_helpers::DEFAULT_BATCH_CAPACITY >= 8) }; + const { assert!(packet_helpers::DEFAULT_BATCH_CAPACITY <= 128) }; + } +} diff --git a/crates/core/src/metrics.rs b/crates/core/src/metrics.rs index a4a865c6..ffcc2320 100644 --- a/crates/core/src/metrics.rs +++ b/crates/core/src/metrics.rs @@ -65,3 +65,58 @@ pub const HISTOGRAM_BOUNDARIES_SESSION_DURATION: &[f64] = /// minor jitter from severe overruns that cause A/V desync. pub const HISTOGRAM_BOUNDARIES_FRAME_OVERRUN: &[f64] = &[0.001, 0.005, 0.01, 0.02, 0.033, 0.05, 0.1, 0.5, 1.0]; + +#[cfg(test)] +mod tests { + use super::*; + + fn assert_sorted_and_positive(name: &str, boundaries: &[f64]) { + assert!(!boundaries.is_empty(), "{name} must not be empty"); + assert!( + boundaries[0] > 0.0, + "{name} first boundary must be positive, got {}", + boundaries[0] + ); + for window in boundaries.windows(2) { + assert!( + window[0] < window[1], + "{name} boundaries must be strictly ascending: {} >= {}", + window[0], + window[1] + ); + } + } + + #[test] + fn all_boundary_arrays_sorted_and_positive() { + let arrays: &[(&str, &[f64])] = &[ + ("CODEC_PACKET", HISTOGRAM_BOUNDARIES_CODEC_PACKET), + ("FILE_OPERATION", HISTOGRAM_BOUNDARIES_FILE_OPERATION), + ("NODE_EXECUTION", HISTOGRAM_BOUNDARIES_NODE_EXECUTION), + ("BACKPRESSURE", HISTOGRAM_BOUNDARIES_BACKPRESSURE), + ("PACER_LATENESS", HISTOGRAM_BOUNDARIES_PACER_LATENESS), + ("CLOCK_OFFSET_MS", HISTOGRAM_BOUNDARIES_CLOCK_OFFSET_MS), + ("FRAME_GAP_MS", HISTOGRAM_BOUNDARIES_FRAME_GAP_MS), + ("PIPELINE_DURATION", HISTOGRAM_BOUNDARIES_PIPELINE_DURATION), + ("HTTP_DURATION", HISTOGRAM_BOUNDARIES_HTTP_DURATION), + ("SESSION_DURATION", HISTOGRAM_BOUNDARIES_SESSION_DURATION), + ("FRAME_OVERRUN", HISTOGRAM_BOUNDARIES_FRAME_OVERRUN), + ]; + for (name, arr) in arrays { + assert_sorted_and_positive(name, arr); + } + } + + #[test] + fn codec_packet_covers_sub_millisecond_to_second() { + let b = HISTOGRAM_BOUNDARIES_CODEC_PACKET; + assert!(b[0] <= 0.0001, "should start at sub-millisecond range"); + assert!(*b.last().unwrap() >= 1.0, "should reach at least 1 second"); + } + + #[test] + fn session_duration_covers_hours() { + let b = HISTOGRAM_BOUNDARIES_SESSION_DURATION; + assert!(*b.last().unwrap() >= 86400.0, "should cover up to 24 hours"); + } +} diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index e94849a2..1650c5b6 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -508,3 +508,203 @@ pub type NodeFactory = Arc< /// Given parameters, returns a deterministic hash string used as part of the ResourceKey. /// Plugins should hash only the parameters that affect resource initialization (e.g., model path, GPU settings). pub type ResourceKeyHasher = Arc) -> String + Send + Sync>; + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashMap; + use tokio::sync::mpsc; + + #[test] + fn pipeline_mode_default_is_dynamic() { + assert_eq!(PipelineMode::default(), PipelineMode::Dynamic); + } + + #[test] + fn pipeline_mode_variants_are_distinct() { + assert_ne!(PipelineMode::Dynamic, PipelineMode::Oneshot); + } + + #[test] + fn output_sender_node_name() { + let routing = OutputRouting::Direct(HashMap::new()); + let sender = OutputSender::new("test_node".into(), routing); + assert_eq!(sender.node_name(), "test_node"); + } + + #[test] + fn output_sender_try_send_pin_not_found() { + let routing = OutputRouting::Direct(HashMap::new()); + let mut sender = OutputSender::new("node_a".into(), routing); + let packet = Packet::Text(Arc::from("hello")); + let err = sender.try_send("missing_pin", packet).unwrap_err(); + assert!(matches!(err, OutputSendError::PinNotFound { .. })); + assert!(err.to_string().contains("missing_pin")); + assert!(err.to_string().contains("node_a")); + } + + #[test] + fn output_sender_try_send_direct_success() { + let (tx, mut rx) = mpsc::channel(4); + let mut senders = HashMap::new(); + senders.insert("out".to_string(), tx); + let routing = OutputRouting::Direct(senders); + let mut sender = OutputSender::new("node_a".into(), routing); + + let packet = Packet::Text(Arc::from("hello")); + sender.try_send("out", packet).unwrap(); + + let received = rx.try_recv().unwrap(); + assert!(matches!(received, Packet::Text(ref s) if &**s == "hello")); + } + + #[test] + fn output_sender_try_send_direct_channel_full() { + let (tx, _rx) = mpsc::channel(1); + let mut senders = HashMap::new(); + senders.insert("out".to_string(), tx); + let routing = OutputRouting::Direct(senders); + let mut sender = OutputSender::new("node_a".into(), routing); + + sender.try_send("out", Packet::Text(Arc::from("1"))).unwrap(); + let err = sender.try_send("out", Packet::Text(Arc::from("2"))).unwrap_err(); + assert!(matches!(err, OutputSendError::ChannelFull { .. })); + } + + #[test] + fn output_sender_try_send_direct_channel_closed() { + let (tx, rx) = mpsc::channel(4); + let mut senders = HashMap::new(); + senders.insert("out".to_string(), tx); + let routing = OutputRouting::Direct(senders); + let mut sender = OutputSender::new("node_a".into(), routing); + drop(rx); + + let err = sender.try_send("out", Packet::Text(Arc::from("x"))).unwrap_err(); + assert!(matches!(err, OutputSendError::ChannelClosed { .. })); + } + + #[test] + fn output_sender_try_send_routed_success() { + let (engine_tx, mut engine_rx) = mpsc::channel(4); + let routing = OutputRouting::Routed(engine_tx); + let mut sender = OutputSender::new("source".into(), routing); + + sender.try_send("video_out", Packet::Text(Arc::from("frame"))).unwrap(); + + let (node_name, pin_name, packet) = engine_rx.try_recv().unwrap(); + assert_eq!(&*node_name, "source"); + assert_eq!(&*pin_name, "video_out"); + assert!(matches!(packet, Packet::Text(ref s) if &**s == "frame")); + } + + #[test] + fn output_sender_try_send_routed_closed() { + let (engine_tx, rx) = mpsc::channel(4); + let routing = OutputRouting::Routed(engine_tx); + let mut sender = OutputSender::new("source".into(), routing); + drop(rx); + + let err = sender.try_send("out", Packet::Text(Arc::from("x"))).unwrap_err(); + assert!(matches!(err, OutputSendError::ChannelClosed { .. })); + } + + #[test] + fn output_send_error_display_messages() { + let err = + OutputSendError::PinNotFound { node_name: "mynode".into(), pin_name: "mypin".into() }; + let msg = err.to_string(); + assert!(msg.contains("'mypin'"), "should quote pin name: {msg}"); + assert!(msg.contains("'mynode'"), "should quote node name: {msg}"); + + let err = OutputSendError::ChannelClosed { node_name: "n".into(), pin_name: "p".into() }; + assert!(err.to_string().contains("closed")); + + let err = OutputSendError::ChannelFull { node_name: "n".into(), pin_name: "p".into() }; + assert!(err.to_string().contains("full")); + } + + #[test] + fn output_send_error_clone_and_eq() { + let err = OutputSendError::PinNotFound { node_name: "n".into(), pin_name: "p".into() }; + let cloned = err.clone(); + assert_eq!(err, cloned); + } + + #[test] + fn init_context_field_access() { + let (state_tx, _rx) = mpsc::channel(4); + let ctx = InitContext { node_id: "my_node".into(), state_tx }; + assert_eq!(ctx.node_id, "my_node"); + } + + #[test] + fn output_sender_pin_name_caching() { + let (engine_tx, mut engine_rx) = mpsc::channel(16); + let routing = OutputRouting::Routed(engine_tx); + let mut sender = OutputSender::new("node".into(), routing); + + sender.try_send("pin_a", Packet::Text(Arc::from("1"))).unwrap(); + sender.try_send("pin_a", Packet::Text(Arc::from("2"))).unwrap(); + + let (_, pin1, _) = engine_rx.try_recv().unwrap(); + let (_, pin2, _) = engine_rx.try_recv().unwrap(); + assert!(Arc::ptr_eq(&pin1, &pin2)); + } + + #[tokio::test] + async fn output_sender_send_direct_success() { + let (tx, mut rx) = mpsc::channel(4); + let mut senders = HashMap::new(); + senders.insert("out".to_string(), tx); + let routing = OutputRouting::Direct(senders); + let mut sender = OutputSender::new("node".into(), routing); + + sender.send("out", Packet::Text(Arc::from("async_pkt"))).await.unwrap(); + let received = rx.try_recv().unwrap(); + assert!(matches!(received, Packet::Text(ref s) if &**s == "async_pkt")); + } + + #[tokio::test] + async fn output_sender_send_direct_pin_not_found() { + let routing = OutputRouting::Direct(HashMap::new()); + let mut sender = OutputSender::new("node".into(), routing); + let err = sender.send("nope", Packet::Text(Arc::from("x"))).await.unwrap_err(); + assert!(matches!(err, OutputSendError::PinNotFound { .. })); + } + + #[tokio::test] + async fn output_sender_send_direct_channel_closed() { + let (tx, rx) = mpsc::channel(4); + let mut senders = HashMap::new(); + senders.insert("out".to_string(), tx); + let routing = OutputRouting::Direct(senders); + let mut sender = OutputSender::new("node".into(), routing); + drop(rx); + let err = sender.send("out", Packet::Text(Arc::from("x"))).await.unwrap_err(); + assert!(matches!(err, OutputSendError::ChannelClosed { .. })); + } + + #[tokio::test] + async fn output_sender_send_routed_success() { + let (engine_tx, mut engine_rx) = mpsc::channel(4); + let routing = OutputRouting::Routed(engine_tx); + let mut sender = OutputSender::new("src".into(), routing); + + sender.send("out", Packet::Text(Arc::from("routed_pkt"))).await.unwrap(); + let (node_name, pin_name, packet) = engine_rx.try_recv().unwrap(); + assert_eq!(&*node_name, "src"); + assert_eq!(&*pin_name, "out"); + assert!(matches!(packet, Packet::Text(ref s) if &**s == "routed_pkt")); + } + + #[tokio::test] + async fn output_sender_send_routed_closed() { + let (engine_tx, rx) = mpsc::channel(4); + let routing = OutputRouting::Routed(engine_tx); + let mut sender = OutputSender::new("src".into(), routing); + drop(rx); + let err = sender.send("out", Packet::Text(Arc::from("x"))).await.unwrap_err(); + assert!(matches!(err, OutputSendError::ChannelClosed { .. })); + } +} diff --git a/crates/core/src/pins.rs b/crates/core/src/pins.rs index c0742940..4a602e33 100644 --- a/crates/core/src/pins.rs +++ b/crates/core/src/pins.rs @@ -152,3 +152,115 @@ pub enum PinManagementMessage { /// hints back to the source. AttachHintSender { pin_name: String, hint_tx: tokio::sync::mpsc::Sender }, } + +#[cfg(test)] +mod tests { + use super::*; + use crate::types::{AudioFormat, PacketType, SampleFormat}; + + #[test] + fn input_pin_construction_and_clone() { + let pin = InputPin { + name: "audio_in".into(), + accepts_types: vec![PacketType::Any], + cardinality: PinCardinality::One, + }; + let cloned = pin.clone(); + assert_eq!(cloned.name, "audio_in"); + assert_eq!(cloned.cardinality, PinCardinality::One); + } + + #[test] + fn output_pin_construction_and_clone() { + let pin = OutputPin { + name: "audio_out".into(), + produces_type: PacketType::RawAudio(AudioFormat { + sample_rate: 48000, + channels: 2, + sample_format: SampleFormat::F32, + }), + cardinality: PinCardinality::Broadcast, + }; + let cloned = pin.clone(); + assert_eq!(cloned.name, "audio_out"); + assert_eq!(cloned.cardinality, PinCardinality::Broadcast); + assert_eq!(cloned.produces_type, pin.produces_type); + } + + #[test] + fn pin_cardinality_equality() { + assert_eq!(PinCardinality::One, PinCardinality::One); + assert_eq!(PinCardinality::Broadcast, PinCardinality::Broadcast); + assert_eq!( + PinCardinality::Dynamic { prefix: "in".into() }, + PinCardinality::Dynamic { prefix: "in".into() } + ); + assert_ne!(PinCardinality::One, PinCardinality::Broadcast); + assert_ne!( + PinCardinality::Dynamic { prefix: "in".into() }, + PinCardinality::Dynamic { prefix: "out".into() } + ); + } + + #[test] + fn dynamic_pin_match_exact_prefix() { + assert!(PinCardinality::is_dynamic_pin_match("in", "in")); + } + + #[test] + fn dynamic_pin_match_with_suffix() { + assert!(PinCardinality::is_dynamic_pin_match("in", "in_0")); + assert!(PinCardinality::is_dynamic_pin_match("in", "in_foo")); + } + + #[test] + fn dynamic_pin_no_match_partial_prefix() { + assert!(!PinCardinality::is_dynamic_pin_match("in", "inside")); + assert!(!PinCardinality::is_dynamic_pin_match("in", "internal")); + } + + #[test] + fn dynamic_pin_no_match_unrelated() { + assert!(!PinCardinality::is_dynamic_pin_match("in", "out")); + assert!(!PinCardinality::is_dynamic_pin_match("in", "output_0")); + } + + #[test] + fn pin_cardinality_serialization_roundtrip() { + let variants = vec![ + PinCardinality::One, + PinCardinality::Broadcast, + PinCardinality::Dynamic { prefix: "layer".into() }, + ]; + for v in variants { + let json = serde_json::to_string(&v).unwrap(); + let deserialized: PinCardinality = serde_json::from_str(&json).unwrap(); + assert_eq!(v, deserialized); + } + } + + #[test] + fn pin_update_no_change_variant() { + let update = PinUpdate::NoChange; + assert!(matches!(update, PinUpdate::NoChange)); + } + + #[test] + fn pin_update_updated_variant() { + let update = PinUpdate::Updated { + inputs: vec![InputPin { + name: "in".into(), + accepts_types: vec![PacketType::Text], + cardinality: PinCardinality::One, + }], + outputs: vec![], + }; + match update { + PinUpdate::Updated { inputs, outputs } => { + assert_eq!(inputs.len(), 1); + assert!(outputs.is_empty()); + }, + PinUpdate::NoChange => panic!("expected Updated"), + } + } +} diff --git a/crates/core/src/registry.rs b/crates/core/src/registry.rs index a42ce462..cd778af1 100644 --- a/crates/core/src/registry.rs +++ b/crates/core/src/registry.rs @@ -443,3 +443,394 @@ impl NodeRegistry { self.info.contains_key(name) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::node::NodeContext; + use crate::pins::{InputPin, OutputPin, PinCardinality}; + use crate::resource_manager::ResourcePolicy; + use crate::types::PacketType; + + struct StubNode; + + #[crate::async_trait] + impl ProcessorNode for StubNode { + fn input_pins(&self) -> Vec { + vec![InputPin { + name: "in".into(), + accepts_types: vec![PacketType::Any], + cardinality: PinCardinality::One, + }] + } + fn output_pins(&self) -> Vec { + vec![OutputPin { + name: "out".into(), + produces_type: PacketType::Text, + cardinality: PinCardinality::One, + }] + } + async fn run(self: Box, _ctx: NodeContext) -> Result<(), StreamKitError> { + Ok(()) + } + } + + fn stub_factory( + _params: Option<&serde_json::Value>, + ) -> Result, StreamKitError> { + Ok(Box::new(StubNode)) + } + + fn stub_pins() -> StaticPins { + StaticPins { + inputs: vec![InputPin { + name: "in".into(), + accepts_types: vec![PacketType::Any], + cardinality: PinCardinality::One, + }], + outputs: vec![OutputPin { + name: "out".into(), + produces_type: PacketType::Text, + cardinality: PinCardinality::One, + }], + } + } + + #[test] + fn new_registry_is_empty() { + let reg = NodeRegistry::new(); + assert!(reg.definitions().is_empty()); + } + + #[test] + fn register_static_and_list_definitions() { + let mut reg = NodeRegistry::new(); + reg.register_static( + "stub", + stub_factory, + serde_json::json!({}), + stub_pins(), + vec!["test".into()], + false, + ); + let defs = reg.definitions(); + assert_eq!(defs.len(), 1); + assert_eq!(defs[0].kind, "stub"); + assert!(!defs[0].bidirectional); + assert_eq!(defs[0].categories, vec!["test"]); + assert!(defs[0].description.is_none()); + } + + #[test] + fn register_static_with_description() { + let mut reg = NodeRegistry::new(); + reg.register_static_with_description( + "described", + stub_factory, + serde_json::json!({}), + stub_pins(), + vec![], + true, + "A test node", + ); + let def = reg.get_definition("described").unwrap(); + assert_eq!(def.description.as_deref(), Some("A test node")); + assert!(def.bidirectional); + } + + #[test] + fn register_dynamic_and_list_definitions() { + let mut reg = NodeRegistry::new(); + reg.register_dynamic( + "dyn_stub", + stub_factory, + serde_json::json!({}), + vec!["dynamic".into()], + false, + ); + let defs = reg.definitions(); + assert_eq!(defs.len(), 1); + assert_eq!(defs[0].kind, "dyn_stub"); + assert_eq!(defs[0].inputs.len(), 1); + assert_eq!(defs[0].outputs.len(), 1); + } + + #[test] + fn register_dynamic_with_description() { + let mut reg = NodeRegistry::new(); + reg.register_dynamic_with_description( + "dyn_desc", + stub_factory, + serde_json::json!({}), + vec![], + false, + "Dynamic described", + ); + let def = reg.get_definition("dyn_desc").unwrap(); + assert_eq!(def.description.as_deref(), Some("Dynamic described")); + } + + #[test] + fn create_node_success() { + let mut reg = NodeRegistry::new(); + reg.register_static( + "stub", + stub_factory, + serde_json::json!({}), + stub_pins(), + vec![], + false, + ); + let node = reg.create_node("stub", None).unwrap(); + assert_eq!(node.input_pins().len(), 1); + assert_eq!(node.output_pins().len(), 1); + } + + #[test] + fn create_node_unknown_kind() { + let reg = NodeRegistry::new(); + let result = reg.create_node("nonexistent", None); + assert!(result.is_err()); + let err = result.err().unwrap(); + assert!(err.to_string().contains("not found")); + } + + #[test] + fn create_node_factory_error() { + let mut reg = NodeRegistry::new(); + reg.register_static( + "fail", + |_| Err(StreamKitError::Configuration("bad params".into())), + serde_json::json!({}), + stub_pins(), + vec![], + false, + ); + let result = reg.create_node("fail", None); + assert!(result.is_err()); + let err = result.err().unwrap(); + assert!(err.to_string().contains("bad params")); + } + + #[test] + fn get_definition_existing() { + let mut reg = NodeRegistry::new(); + reg.register_static( + "stub", + stub_factory, + serde_json::json!({}), + stub_pins(), + vec!["cat".into()], + false, + ); + let def = reg.get_definition("stub").unwrap(); + assert_eq!(def.kind, "stub"); + assert_eq!(def.categories, vec!["cat"]); + } + + #[test] + fn get_definition_missing() { + let reg = NodeRegistry::new(); + assert!(reg.get_definition("nope").is_none()); + } + + #[test] + fn contains_and_unregister() { + let mut reg = NodeRegistry::new(); + reg.register_static( + "stub", + stub_factory, + serde_json::json!({}), + stub_pins(), + vec![], + false, + ); + assert!(reg.contains("stub")); + assert!(!reg.contains("other")); + + assert!(reg.unregister("stub")); + assert!(!reg.contains("stub")); + assert!(!reg.unregister("stub")); + } + + #[test] + fn duplicate_registration_overwrites() { + let mut reg = NodeRegistry::new(); + reg.register_static( + "stub", + stub_factory, + serde_json::json!({}), + stub_pins(), + vec!["first".into()], + false, + ); + reg.register_static( + "stub", + stub_factory, + serde_json::json!({}), + stub_pins(), + vec!["second".into()], + true, + ); + let defs = reg.definitions(); + assert_eq!(defs.len(), 1); + assert_eq!(defs[0].categories, vec!["second"]); + assert!(defs[0].bidirectional); + } + + #[test] + fn node_definition_serialization_roundtrip() { + let def = NodeDefinition { + kind: "test".into(), + description: Some("desc".into()), + param_schema: serde_json::json!({"type": "object"}), + inputs: vec![InputPin { + name: "in".into(), + accepts_types: vec![PacketType::Text], + cardinality: PinCardinality::One, + }], + outputs: vec![], + categories: vec!["audio".into(), "filters".into()], + bidirectional: false, + }; + let json = serde_json::to_string(&def).unwrap(); + let deserialized: NodeDefinition = serde_json::from_str(&json).unwrap(); + assert_eq!(deserialized.kind, "test"); + assert_eq!(deserialized.description.as_deref(), Some("desc")); + assert_eq!(deserialized.categories.len(), 2); + } + + #[test] + fn with_resource_manager() { + let rm = Arc::new(ResourceManager::new(ResourcePolicy::default())); + let reg = NodeRegistry::with_resource_manager(rm); + assert!(reg.definitions().is_empty()); + } + + #[test] + fn set_resource_manager_on_existing_registry() { + let mut reg = NodeRegistry::new(); + reg.register_static( + "plain", + stub_factory, + serde_json::json!({}), + stub_pins(), + vec![], + false, + ); + let rm = Arc::new(ResourceManager::new(ResourcePolicy::default())); + reg.set_resource_manager(rm); + reg.register_static_with_resource( + "res_node", + stub_factory, + stub_resource_factory(), + stub_key_hasher(), + serde_json::json!({}), + stub_pins(), + vec![], + false, + ); + assert!(reg.contains("plain")); + assert!(reg.contains("res_node")); + } + + struct StubResource; + impl crate::resource_manager::Resource for StubResource { + fn size_bytes(&self) -> usize { + 64 + } + fn resource_type(&self) -> &str { + "test" + } + } + + fn stub_resource_factory() -> AsyncResourceFactory { + Arc::new(|_params| { + Box::pin(async { + Ok(Arc::new(StubResource) as Arc) + }) + }) + } + + fn stub_key_hasher() -> crate::node::ResourceKeyHasher { + Arc::new(|_params| "test_hash".to_string()) + } + + #[test] + fn register_static_with_resource() { + let rm = Arc::new(ResourceManager::new(ResourcePolicy::default())); + let mut reg = NodeRegistry::with_resource_manager(rm); + reg.register_static_with_resource( + "res_node", + stub_factory, + stub_resource_factory(), + stub_key_hasher(), + serde_json::json!({}), + stub_pins(), + vec!["ml".into()], + false, + ); + assert!(reg.contains("res_node")); + let def = reg.get_definition("res_node").unwrap(); + assert_eq!(def.categories, vec!["ml"]); + } + + #[test] + fn register_dynamic_with_resource() { + let rm = Arc::new(ResourceManager::new(ResourcePolicy::default())); + let mut reg = NodeRegistry::with_resource_manager(rm); + reg.register_dynamic_with_resource( + "dyn_res", + stub_factory, + stub_resource_factory(), + stub_key_hasher(), + serde_json::json!({}), + vec!["ml".into()], + false, + ); + assert!(reg.contains("dyn_res")); + let defs = reg.definitions(); + assert_eq!(defs.len(), 1); + } + + #[tokio::test] + async fn create_node_async_success() { + let rm = Arc::new(ResourceManager::new(ResourcePolicy::default())); + let mut reg = NodeRegistry::with_resource_manager(rm); + reg.register_static_with_resource( + "res_node", + stub_factory, + stub_resource_factory(), + stub_key_hasher(), + serde_json::json!({}), + stub_pins(), + vec![], + false, + ); + let node = reg.create_node_async("res_node", None).await.unwrap(); + assert_eq!(node.input_pins().len(), 1); + } + + #[tokio::test] + async fn create_node_async_unknown_kind() { + let reg = NodeRegistry::new(); + let result = reg.create_node_async("missing", None).await; + assert!(result.is_err()); + assert!(result.err().unwrap().to_string().contains("not found")); + } + + #[tokio::test] + async fn create_node_async_without_resource_manager() { + let mut reg = NodeRegistry::new(); + reg.register_static( + "plain", + stub_factory, + serde_json::json!({}), + stub_pins(), + vec![], + false, + ); + let node = reg.create_node_async("plain", None).await.unwrap(); + assert_eq!(node.output_pins().len(), 1); + } +} diff --git a/crates/core/src/view_data.rs b/crates/core/src/view_data.rs index 389932d6..a44f8992 100644 --- a/crates/core/src/view_data.rs +++ b/crates/core/src/view_data.rs @@ -53,3 +53,64 @@ pub mod view_data_helpers { } } } + +#[cfg(test)] +mod tests { + use super::*; + use tokio::sync::mpsc; + + #[test] + fn node_view_data_update_construction() { + let ts = SystemTime::now(); + let update = NodeViewDataUpdate { + node_id: "compositor_0".into(), + data: serde_json::json!({"layers": 3}), + timestamp: ts, + }; + assert_eq!(update.node_id, "compositor_0"); + assert_eq!(update.data["layers"], 3); + assert_eq!(update.timestamp, ts); + } + + #[test] + fn node_view_data_update_clone() { + let update = NodeViewDataUpdate { + node_id: "node1".into(), + data: serde_json::json!({"key": "value"}), + timestamp: SystemTime::now(), + }; + let cloned = update.clone(); + assert_eq!(cloned.node_id, update.node_id); + assert_eq!(cloned.data, update.data); + } + + #[test] + fn emit_view_data_with_sender() { + let (tx, mut rx) = mpsc::channel(4); + let sender = Some(tx); + view_data_helpers::emit_view_data( + &sender, + "test_node", + || serde_json::json!({"width": 1920}), + ); + let update = rx.try_recv().unwrap(); + assert_eq!(update.node_id, "test_node"); + assert_eq!(update.data["width"], 1920); + } + + #[test] + fn emit_view_data_without_sender_is_noop() { + let sender: Option> = None; + view_data_helpers::emit_view_data(&sender, "test_node", || { + panic!("closure should not be called when sender is None"); + }); + } + + #[test] + fn emit_view_data_full_channel_does_not_panic() { + let (tx, _rx) = mpsc::channel(1); + let sender = Some(tx); + view_data_helpers::emit_view_data(&sender, "n", || serde_json::json!(1)); + view_data_helpers::emit_view_data(&sender, "n", || serde_json::json!(2)); + } +}