feat(engine): make AddNode non-blocking with async node creation#286
feat(engine): make AddNode non-blocking with async node creation#286streamer45 merged 16 commits intomainfrom
Conversation
Node creation (registry.create_node()) now runs inside tokio::task::spawn_blocking so that slow native plugin constructors (e.g. ONNX model loading via FFI) no longer block the engine actor loop. Key changes: - Add NodeState::Creating variant to streamkit-core. The actor inserts this state immediately when AddNode arrives, closing the observability gap between 'message received' and 'node exists'. - AddNode handler spawns a background task instead of calling create_node() synchronously. Results are sent back via an internal mpsc channel (NodeCreatedEvent) polled in the actor select! loop. - Deferred connection queue: Connect requests where one or both endpoints are still Creating are stored in a Vec<PendingConnection> and replayed when NodeCreated completes successfully. - RemoveNode while Creating: cancelled node IDs are tracked in a HashSet. When NodeCreated arrives for a cancelled node, the result is discarded (no zombie nodes). - Pipeline activation naturally gates on Creating state since it doesn't match Ready|Running|Degraded|Recovering. - UI: NodeStateIndicator.tsx and sessionStatus.ts handle the new Creating state (uses initializing color, treated like Initializing for session status). - 10 comprehensive test cases covering: basic async creation, deferred connections, concurrent creation, creation failure, RemoveNode while Creating, pipeline activation timing, duplicate AddNode, remove then re-add, shutdown while Creating, and mixed realized/creating connections. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
- Fix remove-then-readd race: clear cancelled_creations entry when a new AddNode arrives with the same ID, preventing the new creation result from being mistakenly discarded. - Fix Disconnect not draining pending_connections: remove matching deferred connections so they aren't replayed after the user explicitly disconnected them. - Fix broadcast_state_update skipping previous-state gauge zeroing: mirror the one-hot gauge pattern from handle_state_update so engine-synthesized transitions (Creating → Failed) don't leave stale gauge series at 1. - Strengthen test_remove_then_readd_same_id to verify the node is fully initialized (not Creating/Failed) after the re-add. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
| /// Execute any pending connections whose both endpoints are now realized | ||
| /// (i.e., present in `live_nodes`). | ||
| async fn flush_pending_connections(&mut self) { | ||
| // Drain the vec, keeping connections that still have unrealized endpoints. | ||
| let pending = std::mem::take(&mut self.pending_connections); | ||
| let mut still_pending = Vec::new(); | ||
|
|
||
| for pc in pending { | ||
| let from_realized = self.live_nodes.contains_key(&pc.from_node); | ||
| let to_realized = self.live_nodes.contains_key(&pc.to_node); | ||
|
|
||
| if from_realized && to_realized { | ||
| tracing::info!( | ||
| "Replaying deferred connection {}.{} -> {}.{}", | ||
| pc.from_node, | ||
| pc.from_pin, | ||
| pc.to_node, | ||
| pc.to_pin | ||
| ); | ||
| self.connect_nodes(pc.from_node, pc.from_pin, pc.to_node, pc.to_pin, pc.mode).await; | ||
| self.check_and_activate_pipeline(); | ||
| } else { | ||
| still_pending.push(pc); | ||
| } | ||
| } | ||
|
|
||
| self.pending_connections = still_pending; | ||
| } |
There was a problem hiding this comment.
🚩 Pending connections left orphaned if both endpoints fail creation
If both endpoints of a pending connection are in Creating state and one fails, the code correctly drains pending connections referencing the failed node (crates/engine/src/dynamic_actor.rs:1549-1550). However, if neither endpoint fails but the second endpoint never transitions out of Creating (e.g., its background task panics silently and the NodeCreatedEvent send fails because the actor loop has moved on), the pending connection would remain in self.pending_connections indefinitely. The let _ = tx.send(...) at line 1720-1721 silently drops send failures. This is an unlikely edge case since spawn_blocking panics are caught at line 1715, but a monitoring mechanism or periodic cleanup of stale pending connections could provide defense in depth.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
There was a problem hiding this comment.
Acknowledged — this is a valid defense-in-depth observation. The let _ = tx.send(...) at the sender side could silently drop if the actor's receiver is closed (e.g., during shutdown). In practice this is safe because:
spawn_blockingpanics are caught by theJoinErrorhandling at line 1743-1746- If
tx.send()fails, it means the actor loop has exited (receiver dropped), so the pending connection would never be replayed anyway
A periodic stale-connection cleanup or TTL could be added as a follow-up if monitoring reveals orphaned entries in practice. For now, the risk is minimal given the actor's single-threaded model and the existing shutdown cleanup (self.pending_connections.clear()).
…counter The previous cancelled_creations HashSet approach had a race condition in Remove → re-Add sequences: clearing the cancellation on re-Add allowed the old background task's result to also be processed, causing double initialization and resource leaks. Replace with a monotonic creation_id counter (next_creation_id) and an active_creations map (node_id → creation_id). Each spawned creation task carries its creation_id; handle_node_created only accepts results whose creation_id matches the current active entry. Stale results from cancelled or superseded creations are silently discarded. Also fix broadcast_state_update to read previous state from node_states BEFORE inserting the new one (it now owns the insertion), so the one-hot gauge zeroing works correctly for engine-synthesized transitions like Creating → Failed. Add a zero_state_gauge helper for the Creating → Initializing transition in handle_node_created. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
| self.active_creations.remove(&node_id); | ||
| self.node_states.remove(&node_id); | ||
| self.node_kinds.remove(&node_id); | ||
| // Drain pending connections referencing this node. | ||
| self.pending_connections | ||
| .retain(|pc| pc.from_node != node_id && pc.to_node != node_id); |
There was a problem hiding this comment.
🚩 Stale subscriber state when RemoveNode cancels a Creating node
When RemoveNode is called for a node in Creating state (lines 1770-1775), the node is removed from node_states but no state update is broadcast to subscribers. Subscribers that received the initial Creating notification via broadcast_state_update at line 1718 will retain a stale entry until their next snapshot query via get_node_states(). This is consistent with shutdown_node which also doesn't broadcast an explicit removal event — it relies on the node transitioning through Failed/Stopped before being cleaned up. For the Creating→removed path there is no intermediate state. This is a design choice rather than a bug, but worth noting for real-time subscription consumers.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
There was a problem hiding this comment.
Acknowledged — this is consistent with shutdown_node's behavior which also doesn't broadcast an explicit removal event. The get_node_states() query path returns the current node_states map directly, so subscribers that poll will see the node disappear on the next query. Real-time subscription consumers would need to infer removal from the absence of the node in a subsequent snapshot. Adding an explicit "Removed" state or removal notification could be a follow-up if real-time consumers need it, but that would be a broader change affecting all node removal paths, not just Creating→removed.
There was a problem hiding this comment.
Acknowledged — this is consistent with shutdown_node's existing pattern (no explicit removal broadcast). The WebSocket layer handles the NodeRemoved event separately via handle_remove_node which broadcasts it to clients before sending the engine control message. So UI consumers do get notified of the removal through the API layer, even though the engine's state subscribers don't get an explicit removal event. This is a pre-existing design choice.
When RemoveNode cancels a Creating node, the node_state_gauge for (node_id, 'creating') was never zeroed, causing a permanent metrics leak. Add zero_state_gauge call before removing state, mirroring shutdown_node's pattern. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
| let result = tokio::task::spawn_blocking(move || { | ||
| let guard = match registry.read() { | ||
| Ok(g) => g, | ||
| Err(err) => { | ||
| return Err(StreamKitError::Runtime(format!( | ||
| "Registry lock poisoned: {err}" | ||
| ))); | ||
| }, | ||
| }; | ||
| guard.create_node(&spawn_kind, params.as_ref()) | ||
| }) |
There was a problem hiding this comment.
🚩 create_node (sync) is used instead of create_node_async — resource management not supported
The background creation task at crates/engine/src/dynamic_actor.rs:1736 calls guard.create_node(&spawn_kind, params.as_ref()) (the synchronous version). The registry also has create_node_async (crates/core/src/registry.rs:332) which supports the ResourceManager for shared resource caching (ML models). However, the old code also used create_node (sync), not create_node_async, so this is not a regression. If resource-managed node creation is desired in the dynamic engine, this would need to be changed to call create_node_async inside a tokio::spawn (not spawn_blocking) since create_node_async is async. This is a pre-existing limitation, not introduced by this PR.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
There was a problem hiding this comment.
Correct — this is a pre-existing limitation, not introduced by this PR. The old code also used create_node (sync). Using create_node_async here would require a different approach since it's async and expects a ResourceManager, but spawn_blocking is specifically needed because the native plugin FFI constructors are synchronous and blocking. Layering create_node_async support on top could be a follow-up if resource-managed node creation is needed in the dynamic engine.
There was a problem hiding this comment.
Acknowledged — both observations are pre-existing patterns, not introduced by this PR:
-
create_nodevscreate_node_async: Correct, the old code also usedcreate_node(sync). Thespawn_blockingwrapper addresses the blocking concern for FFI. Switching tocreate_node_asyncforResourceManagersupport would be a separate enhancement. -
Orphaned pending connections: The
spawn_blockingpanic is caught at theJoinErrorhandler and converted to aNodeCreatedEventwith an error result, so the node transitions toFailedand pending connections are drained. Thelet _ = tx.send(...)silently dropping would only happen if the actor loop has already shut down (receiver dropped), in which case pending connections are moot. Agree that periodic cleanup could be added as defense-in-depth in a future PR.
With async node creation, nodes transition through Creating before reaching Initializing/Ready/Running. In the test environment, nodes may also reach Failed if the node kind isn't available in the test registry. The test's purpose is verifying clean session destruction, not specific node states, so accept all valid lifecycle states. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
…rove TuneNode log - Route Creating → Initializing through broadcast_state_update so the gauge transition zeroes Creating and sets Initializing atomically (no window where no gauge reads 1 for the node). - Add upfront check in Connect handler: both endpoints must exist in node_states before deferring. Connecting to a non-existent node while the other is Creating would otherwise leak a pending connection that is never flushed. - Distinguish 'still Creating' from 'non-existent' in TuneNode warning. - Remove unused emit_creating helper (dead code). Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
| Err(e) => { | ||
| tracing::error!( | ||
| node_id = %node_id, | ||
| kind = %kind, | ||
| error = %e, | ||
| "Background node creation failed" | ||
| ); | ||
|
|
||
| // Broadcast Failed (reads prev state before inserting). | ||
| self.broadcast_state_update(&node_id, NodeState::Failed { reason: e.to_string() }); | ||
|
|
||
| // Drain pending connections referencing this node. | ||
| self.pending_connections | ||
| .retain(|pc| pc.from_node != node_id && pc.to_node != node_id); | ||
| }, | ||
| } |
There was a problem hiding this comment.
🚩 Connecting to a Failed node is handled gracefully but silently
If a node transitions to Failed after creation failure, it remains in node_states but is absent from live_nodes and node_pin_metadata. A subsequent Connect request targeting this node would pass the existence check at line 1833 (it's in node_states) and the Creating check at line 1850 (it's Failed, not Creating), then call connect_nodes. Inside connect_nodes, validate_connection_types at crates/engine/src/dynamic_actor.rs:877 would fail with "Source/Destination node not found" because node_pin_metadata has no entry for the failed node. This is handled gracefully with an error log, but the caller (WebSocket handler) has already optimistically inserted the connection into the pipeline model. This is a pre-existing pattern where the API-layer pipeline model can diverge from the engine's actual state on errors.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
There was a problem hiding this comment.
This is correct and by design — a Failed node in node_states is semantically equivalent to the pre-existing behavior where create_node errors were logged but the node wasn't cleaned up from state tracking. The Failed state is observable via state subscriptions (broadcast via broadcast_state_update), so callers do get an explicit signal. The stale entries serve as a record that creation was attempted and failed, which is useful for diagnostics. Cleanup happens via RemoveNode or Shutdown, consistent with how other failure modes work in the engine.
There was a problem hiding this comment.
Both observations are accurate and pre-existing by design:
-
Stale entries after creation failure: Correct —
Failednodes remain innode_states/node_kindsuntil explicitly removed viaRemoveNode. This is intentional so clients can see the failure reason via state subscriptions and the pipeline view.check_and_activate_pipelinecorrectly blocks on Failed nodes (the pipeline can't activate with a broken node). -
Connect to Failed node: Also a pre-existing pattern — the API layer optimistically inserts connections before the engine validates pin compatibility.
validate_connection_typescatches the missing metadata and logs an error. This API/engine state divergence exists for all error cases (not just Failed nodes) and is outside the scope of this PR.
Built-in nodes (silence, gain) should always succeed creation. Only accept Creating/Initializing/Ready/Running — if these nodes fail, that's a real regression that should surface as a test failure. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
…ode at API layer P1: TuneNode messages arriving while a node is still in Creating state are now queued in pending_tunes and replayed once the node finishes initialization. This prevents a regression where UpdateParams was persisted and broadcast to clients but the eventual node instance never received the config change. P2: handle_add_node now checks for duplicate node_id in the pipeline model before inserting. Previously, the API layer would silently overwrite the pipeline entry and broadcast NodeAdded while the engine actor rejected the duplicate — leaving clients showing stale kind/params while the old live node continued running. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
…ode at API layer P1: TuneNode messages arriving while a node is still in Creating state are now queued in pending_tunes and replayed once the node finishes initialization. This prevents a regression where UpdateParams was persisted and broadcast to clients but the eventual node instance never received the config change. P2: handle_add_node now checks for duplicate node_id in the pipeline model before inserting. Previously, the API layer would silently overwrite the pipeline entry and broadcast NodeAdded while the engine actor rejected the duplicate — leaving clients showing stale kind/params while the old live node continued running. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
The test server uses Config::default() which only registers core nodes. 'silence' and 'gain' are not core nodes, so async creation correctly transitions them Creating → Failed. This test validates clean session destruction regardless of individual node outcomes, so Failed is a valid observed state. Added comment explaining why. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
The test was using unregistered node types ('silence', 'gain') which
only passed before because synchronous creation failure was silent
(no state broadcast). With async creation these correctly transition
to Failed. Fix by using 'audio::gain' which is a real registered
built-in node type with in/out pins.
Signed-off-by: StreamKit Devin <devin@streamkit.dev>
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Verifies that UpdateParams messages sent while a node is still being constructed (Creating state) are queued and replayed after the node finishes initialization. Uses a TuneTrackingSlowNode that counts received UpdateParams messages. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
- Add pre-validation pass in handle_apply_batch to reject duplicate AddNode IDs before mutating the pipeline model. Simulates the batch's Add/Remove sequence so Remove→Add within a batch is still allowed. - Clean up node_kinds in both handle_node_created failure paths (creation failure and initialization failure), matching the cleanup done in RemoveNode-while-Creating. - Remove unused _delay field from SlowTestNode (zero-sized struct). Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
| // Validate permissions for all operations | ||
| for op in &operations { | ||
| if let streamkit_api::BatchOperation::AddNode { kind, params, .. } = op { |
There was a problem hiding this comment.
🚩 Missing oneshot and plugin allowlist checks in batch operations
The handle_add_node function (line 432-453) rejects oneshot-only nodes (streamkit::http_input, streamkit::http_output) and enforces the plugin allowlist (perms.is_plugin_allowed). Neither handle_apply_batch (lines 1259-1320) nor handle_validate_batch (lines 1135-1191) perform these checks. This means a user could add oneshot-only marker nodes or bypass the plugin allowlist via the batch API. This is a pre-existing issue (the validation loops are unchanged in this PR), but it's worth noting since the developer was actively working on batch validation (adding the duplicate node_id pre-check).
(Refers to lines 1259-1320)
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
There was a problem hiding this comment.
Acknowledged — the missing oneshot and plugin allowlist checks in the batch path are a pre-existing issue, not introduced by this PR. The batch validation loop at lines 1259-1320 was unchanged; I only added the duplicate node_id pre-check. Fixing the missing oneshot/allowlist checks in the batch path would be a good follow-up but is out of scope for this PR.
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
| Err(e) => { | ||
| tracing::error!( | ||
| node_id = %node_id, | ||
| kind = %kind, | ||
| error = %e, | ||
| "Background node creation failed" | ||
| ); | ||
|
|
||
| // Broadcast Failed (reads prev state before inserting). | ||
| self.broadcast_state_update(&node_id, NodeState::Failed { reason: e.to_string() }); | ||
|
|
||
| // Clean up node_kinds (mirrors RemoveNode-while-Creating). | ||
| self.node_kinds.remove(&node_id); | ||
|
|
||
| // Drain pending connections and tunes referencing this node. | ||
| self.pending_connections | ||
| .retain(|pc| pc.from_node != node_id && pc.to_node != node_id); | ||
| self.pending_tunes.retain(|pt| pt.node_id != node_id); | ||
| }, | ||
| } |
There was a problem hiding this comment.
🚩 Failed node creation leaves persistent state that blocks pipeline activation
When handle_node_created processes a creation failure (either constructor error or initialization error), it broadcasts NodeState::Failed via broadcast_state_update and removes node_kinds, but the Failed entry remains in node_states indefinitely. Since check_and_activate_pipeline at dynamic_actor.rs:297-305 excludes Failed from the activatable set, a failed creation will block pipeline activation for ALL other nodes until someone explicitly sends RemoveNode for the failed node. This is likely intentional (the user should see the failure and decide what to do), but it's worth confirming this UX is desired — auto-cleanup on creation failure would be an alternative.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
There was a problem hiding this comment.
This is intentional — a Failed node should remain visible so the user can see the failure reason and decide whether to remove it or retry. Auto-cleanup would silently hide creation failures. The existing NodeState::Failed behavior for other failure modes (e.g., node runtime errors) works the same way — the failed node stays in node_states until explicitly removed.
The pipeline activation blocking is also correct behavior: if a user added a node and it failed to create, they should know the pipeline isn't fully operational rather than having it silently activate without the intended node.
Summary
Make
AddNodenon-blocking in the dynamic engine actor so that when native ML plugins load large ONNX models via FFI in their constructors (blocking 10-20+ seconds), the engine can continue processing other control messages.Key changes:
AddNodeinsertsNodeState::Creatingimmediately, spawnsregistry.create_node()insidespawn_blocking, and processes the next messageConnectdefers if endpoints aren't realized yet, replays when readyTuneNodemessages forCreatingnodes are queued and replayed after init (prevents regression whereUpdateParamsbroadcast to clients but node never receives config)RemoveNodewhileCreating: usescreation_idgeneration counter to discard stale background resultsAddNoderejected at both engine level (node_states) and API level — bothhandle_add_nodeandhandle_apply_batch(pre-validation pass simulates Add/Remove sequence so Remove→Add within a batch is still allowed)node_kindscleaned up on all failure paths (creation failure, initialization failure), matching RemoveNode-while-Creating cleanupCreatingstate inNodeStateIndicatorandsessionStatustest_session_destroy_shuts_down_pipelineto use registeredaudio::gainnodesReview & Testing Checklist for Human
creation_idcounter correctly prevents staleNodeCreatedresults from being applied after Remove→re-Add cycles. Checkhandle_node_createddiscards mismatched IDs.UpdateParamsmessages are replayed in order after initialization. Test with a real slow plugin (e.g., Parakeet/Whisper) — send param updates while model is loading, verify node starts with latest config.handle_apply_batchcorrectly rejects duplicate AddNode IDs while still allowing Remove→Add sequences within the same batch.Creating → Initializingtransition.broadcast_state_updatereads prev state before inserting new state.Startsignal) until all nodes leaveCreatingstate.Recommended test plan
TuneNodewith updated params, verify the UI showsCreatingstateNotes
cargo auditadvisory (RUSTSEC-2026-0097 forrand 0.10.0) — unrelated to this PRLink to Devin session: https://staging.itsdev.in/sessions/8ce6e91d5f81470fbb4abb131c046e7b
Requested by: @streamer45