test(engine): add unit tests for graph_builder, oneshot, and pin_distributor#427
Conversation
…ributor Add comprehensive unit tests for three engine modules: - graph_builder.rs: 7 tests covering linear pipeline wiring, type-compatible/incompatible connections, passthrough type resolution, missing pin name errors, and standalone node execution. - oneshot.rs: 4 tests covering config defaults, linear pipeline end-to-end execution with data verification, cancellation token support, and failing node error propagation. - pin_distributor.rs: 5 new tests covering broadcast distribution to multiple outputs, dynamic pin add/remove lifecycle, best-effort backpressure behavior, and shutdown message handling. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
| for rx in &mut receivers { | ||
| let Some(pkt) = rx.recv().await else { | ||
| panic!("receiver channel closed unexpectedly"); |
There was a problem hiding this comment.
📝 Info: Several async tests can hang indefinitely if the behavior regresses before their assertions
The new tests mostly use timeouts only when awaiting task shutdown, but key receive points such as rx.recv().await remain unbounded. For example, if the distributor processes the data message before the queued AddConnection config messages in a future implementation change, this test would wait forever at the first receiver instead of failing with a useful timeout. This is not a bug in the current code because PinDistributorActor::run uses a biased select that prioritizes config messages (crates/engine/src/dynamic_pin_distributor.rs:165-181), but wrapping receives in timeouts would make these regression tests more diagnosable.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
| let mut received = Vec::new(); | ||
| while let Ok(pkt) = tokio::time::timeout(std::time::Duration::from_millis(200), rx.recv()).await | ||
| { | ||
| if let Some(pkt) = pkt { | ||
| if let Packet::Text(s) = pkt { | ||
| received.push(s.to_string()); | ||
| } | ||
| } else { | ||
| break; | ||
| } | ||
| } | ||
|
|
||
| assert!(!received.is_empty(), "should have received at least one packet"); |
There was a problem hiding this comment.
🚩 Best-effort test only proves delivery, not drop-on-full behavior
The test name says best-effort drops when full, but the assertion only requires that at least one packet was received. Given PinDistributorActor::distribute_packet stores a packet in pending_best_effort when try_send hits Full and does not currently flush that pending slot later (crates/engine/src/dynamic_pin_distributor.rs:285-296 and crates/engine/src/dynamic_pin_distributor.rs:370-382), this test is not checking the intended drop/overwrite semantics. I did not flag this as a bug because the PR only adds tests and the exact intended “newest packet” behavior is broader pre-existing implementation context, but the test could be strengthened by asserting bounded received count or exact dropped/retained packets.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
| async fn wire( | ||
| nodes: HashMap<String, Box<dyn ProcessorNode>>, | ||
| connections: &[Connection], | ||
| node_kinds: &HashMap<String, String>, | ||
| ) -> Result<HashMap<String, graph_builder::LiveNode>, StreamKitError> { | ||
| graph_builder::wire_and_spawn_graph( | ||
| nodes, | ||
| connections, | ||
| node_kinds, | ||
| 1, | ||
| DEFAULT_ONESHOT_MEDIA_CAPACITY, | ||
| None, | ||
| None, | ||
| None, | ||
| None, | ||
| None, | ||
| ) | ||
| .await |
There was a problem hiding this comment.
📝 Info: New graph-builder tests exercise oneshot-only fan-out assumptions indirectly
The added graph_builder tests call wire_and_spawn_graph directly and validate linear, passthrough, unknown-pin, and standalone cases. This is consistent with the implementation’s early oneshot fan-out rejection in crates/engine/src/graph_builder.rs:77-88, and the existing oneshot_linear test already covers that explicit constraint. I considered whether adding these un-gated tests would affect dynamic-only builds, but graph_builder, oneshot, and the needed default constants are unconditional modules in crates/engine/src/lib.rs:16-18, while pin_distributor remains behind the existing dynamic feature gate in crates/engine/src/tests/mod.rs:15-18.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
- Remove redundant type_compatible_connection_succeeds test (finding 1) - Tighten best_effort_drops_when_full assertion to verify drops (finding 3) - Rename passthrough test to clarify wiring-only scope (finding 4) - Switch drain() and task awaits to join_all (finding 6) Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Summary
Adds unit tests to
crates/engine/modules that previously had no test coverage:graph_builder.rs(6 tests): linear pipeline spawn/connect, type-incompatible rejection, passthrough wiring type resolution, missing input/output pin errors, standalone node executiononeshot.rs(4 tests): linear pipeline end-to-end, cancellation token, failing node error propagation, config defaultspin_distributor.rs(5 new tests): broadcast fanout, dynamic add/remove, shutdown message, closed-output handling, best-effort drop semanticsAll 51 engine tests pass. No production code changes.
Review & Testing Checklist for Human
cargo test -p streamkit-engine --features dynamicpasses (51 tests)best_effort_drops_when_fullassertion (received.len() < 5) correctly exercises drop semantics with a 1-slot channelpassthrough_wiring_resolves_type_from_upstreamname clearly conveys it tests connection-time type resolution, not data flowNotes
join_allfor parallel task drainingpin_distributor_removes_closed_outputsassertionLink to Devin session: https://staging.itsdev.in/sessions/693b00178cda43e3b4de00f895d76f59
Requested by: @streamer45
Devin Review
d2e7077(HEAD is537fad2)