feat(mcp): add update_pipeline tool and extend batch mutation tests#395
Conversation
Add a new `update_pipeline` MCP tool that takes desired pipeline YAML and a session ID, diffs against the current pipeline state, and applies the minimal set of batch operations (addnode, removenode, connect, disconnect) to reconcile the running session. This provides a declarative "desired state" interface for agents that reason about pipeline YAML rather than individual graph mutations. Also add integration tests for previously uncovered batch mutation operations: connect, disconnect, removenode, and multi-operation batches. Add tests for the new update_pipeline tool covering add-node, remove-node, no-op, invalid YAML, and permission denied cases. Update MCP documentation and skill files to reflect the new tool (16 tools total). Closes #360 Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
| let current = { session.pipeline.lock().await.clone() }; | ||
|
|
||
| // Compute diff → batch operations. | ||
| let operations = diff_pipeline(¤t, &desired); | ||
|
|
||
| if operations.is_empty() { | ||
| info!(session_id = %args.session_id, "MCP update_pipeline (no changes)"); | ||
| let result = serde_json::json!({ | ||
| "operations_applied": 0, | ||
| "operations": [], | ||
| }); | ||
| return json_tool_result(&result); | ||
| } | ||
|
|
||
| // Apply via the shared batch path (validates + applies). | ||
| crate::server::apply_batch_operations( | ||
| &session, | ||
| operations.clone(), | ||
| &perms, | ||
| &self.app_state.config.security, | ||
| ) | ||
| .await | ||
| .map_err(|e| McpError::invalid_params(e, None))?; |
There was a problem hiding this comment.
📝 Info: TOCTOU gap between pipeline snapshot and batch apply is documented and acceptable
The pipeline is snapshotted at line 978 (session.pipeline.lock().await.clone()), then the lock is released, diff is computed, and batch operations are applied. Concurrent mutations between snapshot and apply could cause errors like 'node already exists'. This is explicitly documented in the tool description and in agent_docs/mcp.md:180-182 with the recommendation to retry. The alternative (holding the lock during diff computation) would block the entire session for the duration, which is worse for a multi-tenant server. The current design trades strict atomicity for better concurrency, which is appropriate for this use case.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
| // 1. Disconnect removed connections (that aren't on nodes being removed). | ||
| for conn in ¤t.connections { | ||
| let key = conn_key(conn); | ||
| if !desired_conns.contains(&key) | ||
| && desired_node_ids.contains(conn.from_node.as_str()) | ||
| && desired_node_ids.contains(conn.to_node.as_str()) | ||
| { | ||
| ops.push(BatchOperation::Disconnect { | ||
| from_node: conn.from_node.clone(), | ||
| from_pin: conn.from_pin.clone(), | ||
| to_node: conn.to_node.clone(), | ||
| to_pin: conn.to_pin.clone(), | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| // 2. Remove nodes that no longer exist. | ||
| for node_id in ¤t_node_ids { | ||
| if !desired_node_ids.contains(node_id) { | ||
| ops.push(BatchOperation::RemoveNode { node_id: (*node_id).to_string() }); | ||
| } | ||
| } | ||
|
|
||
| // 3. Add new nodes. | ||
| for (node_id, node) in &desired.nodes { | ||
| if !current_node_ids.contains(node_id.as_str()) { | ||
| ops.push(BatchOperation::AddNode { | ||
| node_id: node_id.clone(), | ||
| kind: node.kind.clone(), | ||
| params: node.params.clone(), | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| // 4. Connect new connections. | ||
| for conn in &desired.connections { | ||
| if !current_conns.contains(&conn_key(conn)) { | ||
| ops.push(BatchOperation::Connect { | ||
| from_node: conn.from_node.clone(), | ||
| from_pin: conn.from_pin.clone(), | ||
| to_node: conn.to_node.clone(), | ||
| to_pin: conn.to_pin.clone(), | ||
| mode: conn.mode, | ||
| }); | ||
| } | ||
| } |
There was a problem hiding this comment.
📝 Info: Operation ordering (disconnects → removes → adds → connects) is correct for apply_batch_operations
The diff produces operations in the order: disconnects, removes, adds, connects. This is safe because apply_batch_operations at apps/skit/src/server/mod.rs:4115 processes them sequentially. Disconnects first remove stale connections between surviving nodes; removes clean up departing nodes (and their remaining connections via line 4136-4138); adds create new nodes; connects wire up new/surviving nodes. A new connection referencing a newly-added node works because adds precede connects. The optimization of skipping explicit disconnects for connections on removed nodes (line 1043-1045 condition) is also correct since RemoveNode handler auto-cleans connections.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
diff_pipeline now detects nodes whose kind changed between current and desired state and treats them as replacements (remove + re-add). This prevents a silent no-op when an agent changes a node's kind while keeping the same node ID. Connections touching replaced nodes are explicitly disconnected before removal and reconnected after re-addition per the desired state. Add mcp_update_pipeline_kind_change integration test to verify the behavior. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
| // 1. Disconnect removed connections (that aren't on nodes being fully removed). | ||
| // Also disconnect connections touching replaced nodes (they will be re-added). | ||
| for conn in ¤t.connections { | ||
| let key = conn_key(conn); | ||
| let from_replaced = replaced_node_ids.contains(conn.from_node.as_str()); | ||
| let to_replaced = replaced_node_ids.contains(conn.to_node.as_str()); | ||
| let from_survives = desired_node_ids.contains(conn.from_node.as_str()); | ||
| let to_survives = desired_node_ids.contains(conn.to_node.as_str()); | ||
|
|
||
| if (!desired_conns.contains(&key) || from_replaced || to_replaced) | ||
| && from_survives | ||
| && to_survives | ||
| { | ||
| ops.push(BatchOperation::Disconnect { | ||
| from_node: conn.from_node.clone(), | ||
| from_pin: conn.from_pin.clone(), | ||
| to_node: conn.to_node.clone(), | ||
| to_pin: conn.to_pin.clone(), | ||
| }); | ||
| } | ||
| } |
There was a problem hiding this comment.
📝 Info: Disconnect operations for replaced nodes are redundant with RemoveNode cleanup
The diff logic at apps/skit/src/mcp/mod.rs:1109-1127 explicitly emits Disconnect operations for connections touching replaced nodes, even though the subsequent RemoveNode operation (line 1131-1136) would tear down those connections anyway. The comment acknowledges this design: "Connections on replaced nodes are explicitly disconnected because the node is re-added as a new instance." This is technically safe since disconnects are ordered before removes, but produces slightly larger batch payloads than necessary. In the test mcp_update_pipeline_kind_change_preserves_connections, this manifests as 4 operations (disconnect + removenode + addnode + connect) where 3 might suffice (removenode + addnode + connect). Not a bug since the engine handles it correctly, but worth noting for future optimization.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
Step 4 of diff_pipeline now also emits Connect operations for desired connections that touch replaced nodes, even if those connections existed in the current pipeline. Previously, connections on nodes whose kind changed were disconnected (step 1) but never reconnected because step 4 only emitted Connect for connections absent from current_conns. Add mcp_update_pipeline_kind_change_preserves_connections test to verify connections survive node kind changes. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
| // 1. Disconnect removed connections (that aren't on nodes being fully removed). | ||
| // Also disconnect connections touching replaced nodes (they will be re-added). | ||
| for conn in ¤t.connections { | ||
| let key = conn_key(conn); | ||
| let from_replaced = replaced_node_ids.contains(conn.from_node.as_str()); | ||
| let to_replaced = replaced_node_ids.contains(conn.to_node.as_str()); | ||
| let from_survives = desired_node_ids.contains(conn.from_node.as_str()); | ||
| let to_survives = desired_node_ids.contains(conn.to_node.as_str()); | ||
|
|
||
| if (!desired_conns.contains(&key) || from_replaced || to_replaced) | ||
| && from_survives | ||
| && to_survives | ||
| { | ||
| ops.push(BatchOperation::Disconnect { | ||
| from_node: conn.from_node.clone(), | ||
| from_pin: conn.from_pin.clone(), | ||
| to_node: conn.to_node.clone(), | ||
| to_pin: conn.to_pin.clone(), | ||
| }); | ||
| } | ||
| } |
There was a problem hiding this comment.
📝 Info: Replaced-node disconnect logic correctly handles the from_survives/to_survives check
The disconnect phase (lines 1109-1127) checks from_survives && to_survives using desired_node_ids, which includes replaced nodes (since they exist in both current and desired, just with different kind). This ensures connections on replaced nodes are explicitly disconnected before the node is removed and re-added. Connections involving fully-removed nodes are correctly skipped because RemoveNode in apply_batch_operations (apps/skit/src/server/mod.rs:4134-4138) tears down their connections. I verified this for several scenarios: both-endpoints-replaced, one-removed-one-replaced, and one-removed-one-surviving — all produce correct operation sequences.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
- Detect param-only changes and auto-apply via tune_node when caller has tune_nodes permission; return params_changed in response - Include ConnectionMode in ConnKey so Reliable↔BestEffort flips are detected as disconnect+reconnect (derive Hash on ConnectionMode) - Tighten kind-change-preserves-connections assertion to == 4 - Document TOCTOU retry expectation and partial-failure semantics in tool description - Add clarifying comments for multi-edge/dangling-edge reasoning - Use IndexMap iteration for deterministic operation ordering - Assert error message contains 'YAML' in invalid test - Add tests: mode-only change, params-only change, both-endpoints-replaced - Update docs with params-drift and partial-failure caveats Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
| Copy, | ||
| PartialEq, | ||
| Eq, | ||
| Hash, |
There was a problem hiding this comment.
📝 Info: ConnKey includes mode to detect Reliable↔BestEffort flips as disconnect+reconnect
The ConnKey type at apps/skit/src/mcp/mod.rs:1098 includes ConnectionMode as the 5th tuple element, which is what triggers the Hash derive addition on ConnectionMode in crates/core/src/control.rs. This design ensures mode changes are detected as connection removals + additions (disconnect old mode, connect new mode). The BatchOperation::Disconnect variant doesn't carry a mode field (apps/skit/src/server/mod.rs:4175-4194), so the disconnect removes the connection regardless of mode, and the subsequent Connect re-establishes it with the desired mode. This works correctly because the YAML compiler never emits parallel edges between the same pin pair (as noted in the inline comment at line 1128-1129).
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
There was a problem hiding this comment.
Confirmed — Hash is required for the ConnKey tuple used in HashSet. Minimal and non-breaking addition to a simple 2-variant enum.
| let current_conns: HashSet<ConnKey> = current.connections.iter().map(conn_key).collect(); | ||
| let desired_conns: HashSet<ConnKey> = desired.connections.iter().map(conn_key).collect(); | ||
|
|
||
| // 1. Disconnect removed/changed connections. | ||
| // We skip connections where either endpoint is being fully removed | ||
| // (the engine tears those down with RemoveNode). Connections on | ||
| // replaced nodes are explicitly disconnected because the node is | ||
| // re-added as a new instance. HashSet<ConnKey> collapses true | ||
| // parallel edges (same endpoints + same mode) into one entry; | ||
| // this is acceptable because the engine also deduplicates them. |
There was a problem hiding this comment.
🚩 Duplicate connection handling relies on engine-level deduplication
The diff algorithm collects connections into HashSet<ConnKey> which collapses true parallel edges (same endpoints + same mode) into one entry. However, the iteration at line 1109 (for conn in ¤t.connections) still visits each connection in the Vec individually, potentially emitting duplicate Disconnect operations for the same pin pair. Since BatchOperation::Disconnect has no mode field, these duplicates target the same logical connection. The comment at lines 1106-1108 acknowledges this: "this is acceptable because the engine also deduplicates them." If the engine does NOT handle duplicate disconnects gracefully, this could be an issue, but based on the test suite passing this appears to be safe.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
There was a problem hiding this comment.
Correct — the HashSet<ConnKey> deduplication and the Vec iteration are consistent here because Disconnect has no mode field, so duplicate disconnects for the same pin pair are idempotent at the engine level. The engine's pipeline.connections.retain(...) handles this gracefully.
When desired YAML omits params but the running node has params set,
treat it as resetting to empty object ({}) instead of silently
dropping the change.
Signed-off-by: StreamKit Devin <devin@streamkit.dev>
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
| type ConnKey = (String, String, String, String, ConnectionMode); | ||
| fn conn_key(c: &streamkit_api::Connection) -> ConnKey { | ||
| (c.from_node.clone(), c.from_pin.clone(), c.to_node.clone(), c.to_pin.clone(), c.mode) | ||
| } |
There was a problem hiding this comment.
📝 Info: Disconnect is mode-agnostic but diff uses mode-aware ConnKey — potential issue with parallel edges
The diff_pipeline function uses ConnKey = (String, String, String, String, ConnectionMode) to identify connections (line 1078), treating connections with different modes as distinct. However, BatchOperation::Disconnect at apps/skit/src/server/mod.rs:4181-4186 removes ALL connections matching the four endpoint fields regardless of mode. If two connections existed between the same pin pair with different modes and only one should be removed, the Disconnect would remove both, and the surviving one would not be reconnected (since the diff considers it unchanged). In practice, the engine and YAML format likely don't support parallel edges between the same pins, making this theoretical — but worth noting if the model ever allows it.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
There was a problem hiding this comment.
Correct — the engine/YAML format doesn't support parallel edges between the same pin pair, so this is theoretical. The mode-aware ConnKey is specifically for detecting Reliable↔BestEffort flips on the same logical connection, which works correctly since Disconnect removes the one matching connection and the new Connect re-establishes it with the desired mode.
…pipeline tune_session_node uses deep_merge_json which preserves keys not present in the new params. For update_pipeline's full-replacement semantics, pre-clear node.params to None before calling tune_session_node so its take() branch skips the merge. Also fix clippy: unwrap_or with function call → unwrap_or_else. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
| // 2. Remove nodes that no longer exist or whose kind changed. | ||
| // Iterating current.nodes (IndexMap) gives deterministic ordering. | ||
| for node_id in current.nodes.keys() { | ||
| if !desired_node_ids.contains(node_id.as_str()) | ||
| || replaced_node_ids.contains(node_id.as_str()) | ||
| { | ||
| ops.push(BatchOperation::RemoveNode { node_id: node_id.clone() }); | ||
| } | ||
| } | ||
|
|
||
| // 3. Add new nodes and re-add replaced nodes with new kind. | ||
| for (node_id, node) in &desired.nodes { | ||
| if !current_node_ids.contains(node_id.as_str()) | ||
| || replaced_node_ids.contains(node_id.as_str()) | ||
| { | ||
| ops.push(BatchOperation::AddNode { | ||
| node_id: node_id.clone(), | ||
| kind: node.kind.clone(), | ||
| params: node.params.clone(), | ||
| }); | ||
| } | ||
| } |
There was a problem hiding this comment.
📝 Info: diff_pipeline correctly handles replaced-node ordering through check_batch_node_id_uniqueness
At first glance, generating both RemoveNode and AddNode for the same node ID (when its kind changes) looks like it should fail the check_batch_node_id_uniqueness validation in apps/skit/src/server/mod.rs:4013-4034. However, that function simulates operations in order — it removes the ID from live_ids on RemoveNode and re-inserts on AddNode. Since diff_pipeline emits removes before adds (lines 1139-1160), the simulation succeeds. This was verified by tracing through the test case mcp_update_pipeline_kind_change which expects exactly 2 operations (remove + add) and passes.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
| // Parse and compile the desired YAML. | ||
| let user_pipeline = streamkit_api::yaml::parse_yaml(&args.yaml) | ||
| .map_err(|e| McpError::invalid_params(format!("Invalid pipeline YAML: {e}"), None))?; | ||
|
|
||
| let desired = streamkit_api::yaml::compile(user_pipeline).map_err(|e| { | ||
| McpError::invalid_params(format!("Pipeline compilation error: {e}"), None) | ||
| })?; |
There was a problem hiding this comment.
🚩 update_pipeline skips create_session-level validation checks on desired YAML
The create_dynamic_session function (apps/skit/src/server/mod.rs:3874) validates that the pipeline is non-empty, checks for synthetic/oneshot-only node kinds, and performs per-node permission checks. The update_pipeline tool only calls parse_yaml and compile (lines 970-975), then delegates structural validation to apply_batch_operations (which validates AddNode permissions/security). This means:
- An
update_pipelinethat removes ALL nodes produces an empty pipeline — no guard against this. - Synthetic node kinds would fail at the
apply_batch_operationslevel (viavalidate_add_node_op), so they're still caught.
The empty-pipeline case may be intentional (callers might want to clear a session), but it's a semantic difference from create_session's behavior.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
There was a problem hiding this comment.
This is intentional — update_pipeline mutates an existing session, and apply_batch already allows removing all nodes. Preventing empty pipelines here would be inconsistent with what apply_batch permits directly. An agent clearing all nodes from a session is a valid (if unusual) use case.
…ssions Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
| // 5. Detect param-only changes on surviving, non-replaced nodes. | ||
| let mut params_changed = Vec::new(); | ||
| for (node_id, desired_node) in &desired.nodes { | ||
| if replaced_node_ids.contains(node_id.as_str()) | ||
| || !current_node_ids.contains(node_id.as_str()) | ||
| { | ||
| continue; | ||
| } | ||
| let current_node = ¤t.nodes[node_id.as_str()]; | ||
| if desired_node.params != current_node.params { | ||
| let new_params = desired_node | ||
| .params | ||
| .clone() | ||
| .unwrap_or_else(|| serde_json::Value::Object(serde_json::Map::default())); | ||
| params_changed.push((node_id.clone(), new_params)); | ||
| } | ||
| } |
There was a problem hiding this comment.
📝 Info: Param diff detects changes when YAML omits params but runtime has set them
If a node in the running pipeline has had its params set via tune_node at runtime (e.g., params: Some({"gain": 0.5})), but the desired YAML doesn't specify params for that node (so compiled params: None), diff_pipeline will detect a params change and attempt to reset the params to {} (line 1190's unwrap_or_else). This is correct for 'declarative desired state' semantics but could be surprising to users who expect update_pipeline to only change structural aspects. The documentation should perhaps clarify that omitting params in the YAML means 'reset to empty', not 'leave unchanged'.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
…comments
- Add tune_session_node_replace() with full replacement semantics,
eliminating the TOCTOU-prone pre-clear hack in update_pipeline
- Accumulate partial param failures with warn! log + params_errors
in response instead of early-returning on first error
- Use McpError::internal_error for apply_batch/tune failures
(not invalid_params — those aren't parameter validation errors)
- Fix incorrect comment claiming engine deduplicates parallel edges
(YAML compiler doesn't emit them; engine doesn't deduplicate)
- Add test verifying param replacement removes old keys
({a:1,b:2} -> {a:9} must drop b)
Signed-off-by: StreamKit Devin <devin@streamkit.dev>
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
| for conn in ¤t.connections { | ||
| let key = conn_key(conn); | ||
| let from_replaced = replaced_node_ids.contains(conn.from_node.as_str()); | ||
| let to_replaced = replaced_node_ids.contains(conn.to_node.as_str()); | ||
| let from_survives = desired_node_ids.contains(conn.from_node.as_str()); | ||
| let to_survives = desired_node_ids.contains(conn.to_node.as_str()); | ||
|
|
||
| if (!desired_conns.contains(&key) || from_replaced || to_replaced) | ||
| && from_survives | ||
| && to_survives | ||
| { | ||
| ops.push(BatchOperation::Disconnect { | ||
| from_node: conn.from_node.clone(), | ||
| from_pin: conn.from_pin.clone(), | ||
| to_node: conn.to_node.clone(), | ||
| to_pin: conn.to_pin.clone(), | ||
| }); | ||
| } | ||
| } |
There was a problem hiding this comment.
📝 Info: Disconnect logic correctly skips connections on fully-removed nodes
The from_survives && to_survives guard at line 1138-1139 skips emitting explicit Disconnect operations when either endpoint node is being fully removed (not in desired_node_ids). This relies on the engine's RemoveNode handler tearing down connections involving the removed node. Verified this is correct by checking apply_batch_operations at apps/skit/src/server/mod.rs:4134-4138 which does pipeline.connections.retain(|conn| conn.from_node != node_id && conn.to_node != node_id) during RemoveNode processing. The comment in the diff function accurately describes this contract.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
| /// | ||
| /// Also detects param-only changes on surviving nodes and returns them | ||
| /// separately so the caller can apply them via `tune_node`. | ||
| fn diff_pipeline(current: &Pipeline, desired: &Pipeline) -> DiffResult { |
There was a problem hiding this comment.
🚩 Pipeline metadata (name, description, mode) silently ignored by diff
diff_pipeline only compares nodes and connections, ignoring top-level fields like name, description, and mode. This means if the desired YAML changes the pipeline mode (e.g. from dynamic to oneshot) or metadata, those changes are silently dropped. This is consistent with the available BatchOperation variants (which only support structural graph mutations), but could surprise callers who expect full YAML reconciliation. The tool description focuses on 'addnode, removenode, connect, disconnect' which sets the right expectation, but this limitation isn't explicitly called out as a caveat.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
| node.params = Some(if replace { | ||
| durable_params | ||
| } else { | ||
| match node.params.take() { | ||
| Some(existing) => { | ||
| crate::websocket_handlers::deep_merge_json(existing, durable_params) | ||
| }, | ||
| None => durable_params, | ||
| } | ||
| }); |
There was a problem hiding this comment.
📝 Info: Replace semantics correctly avoid deep-merge for declarative updates
The tune_session_node_inner function's replace branch at apps/skit/src/server/mod.rs:4279-4280 directly assigns durable_params instead of deep-merging. This is critical for update_pipeline's declarative semantics: if the desired YAML removes a param key, the old key must actually disappear from the durable model rather than persist via merge. The test mcp_update_pipeline_params_replace_removes_old_keys at line 2137 explicitly verifies this by checking that key b is removed when updating from {a:1, b:2} to {a:9}.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
- Normalize None and Some({}) as equivalent in param diff so
repeated update_pipeline calls with the same YAML are true no-ops
- Gate tune_session_node_replace behind cfg(feature = "mcp") to
avoid dead_code warning in non-MCP builds
Signed-off-by: StreamKit Devin <devin@streamkit.dev>
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
| node.params = Some(if replace { | ||
| durable_params | ||
| } else { | ||
| match node.params.take() { | ||
| Some(existing) => { | ||
| crate::websocket_handlers::deep_merge_json(existing, durable_params) | ||
| }, | ||
| None => durable_params, | ||
| } | ||
| }); | ||
| } | ||
| } |
There was a problem hiding this comment.
🚩 NodeParamsChanged event broadcasts full params — UI merge semantics may not reflect key removal
When tune_session_node_replace is called (replace=true), the durable pipeline model gets a full replacement of params. However, the NodeParamsChanged event at apps/skit/src/server/mod.rs:4293-4304 broadcasts the incoming params value. If the UI applies deep-merge semantics on receiving this event (as it does for regular tune_node), keys removed from the YAML won't be removed from the UI state until the next full state fetch. This is a pre-existing limitation of the event system (not introduced by this PR), and is mitigated by the fact that the broadcast contains the full desired params. Still, for UIs that track param state incrementally, this could cause stale keys to linger.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
There was a problem hiding this comment.
Acknowledged — this is a pre-existing limitation of the NodeParamsChanged event system. The event broadcasts the full desired params so UIs that do a full state refresh will be correct. UIs that apply incremental deep-merge on this event may see stale keys until a full fetch. Not something to address in this PR as it applies to all tune_node usage, not just update_pipeline.
| let normalise = |p: &Option<serde_json::Value>| match p { | ||
| Some(serde_json::Value::Object(m)) if m.is_empty() => None, | ||
| other => other.clone(), | ||
| }; | ||
| if normalise(&desired_node.params) != normalise(¤t_node.params) { |
There was a problem hiding this comment.
📝 Info: Param normalisation treats None and Some({}) as equivalent for idempotency
The normalise closure at apps/skit/src/mcp/mod.rs:1199-1202 converts Some(Value::Object(empty_map)) to None before comparison. This is important because after a replace operation sets params to Some({}), a subsequent update_pipeline call with the same YAML (where params is None) would otherwise detect a spurious change. This normalisation ensures true idempotency for repeated calls. However, it also means that Some({}) in YAML and omitting params entirely are semantically identical — which is the correct behavior for this declarative tool.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
apply_batch_operations returns validation/permission errors (e.g. "node already exists", "permission denied"), not engine-level failures. Keep invalid_params consistent with the apply_batch tool. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Summary
Closes #360 — all six proposed mutation tools are now covered:
add_nodeapply_batch(AddNode op)remove_nodeapply_batch(RemoveNode op)connect_nodesapply_batch(Connect op)disconnect_nodesapply_batch(Disconnect op)update_node_paramstune_node(UpdateParams message)update_pipelineupdate_pipelinetoolTakes
(session_id, yaml), diffs desired YAML against the running pipeline, and applies the minimal set of batch operations:tune_session_node_replace(full replacement, not deep merge) when caller hastune_nodespermissionparams_deferredfor manual follow-upwarn!with context and returned inparams_errorsKey changes
apps/skit/src/mcp/mod.rs:update_pipelinetool +diff_pipelinehelper withDiffResultNone/Some({})for idempotent repeated callsapply_batch_operationserrors useinvalid_params;tune_session_nodeerrors useinternal_errorapps/skit/src/server/mod.rs:tune_session_node_replace()— full replacement semantics (no deep merge), gated behindcfg(feature = "mcp")crates/core/src/control.rs: DeriveHashonConnectionMode(forConnKey)apps/skit/tests/mcp_integration_test.rs: 14 new integration tests (53 total MCP tests)agent_docs/mcp.md&.agents/skills/mcp-usage/SKILL.md: Updated tool docs + caveatsReview & Testing Checklist for Human
apps/skit/src/mcp/mod.rs:1076-1213replaceflag intune_session_node_innercorrectly skips deep merge. Key area:apps/skit/src/server/mod.rs:4279-4289update_pipelinerequiresmodify_sessionsfor structural ops and checkstune_nodesbefore auto-applying paramsNoneandSome({})are normalized as equivalent to prevent spurious UpdateParams on repeated callsSuggested test plan:
just skitupdate_pipelineto:{a:1, b:2}to{a:9}— verifybis removedupdate_pipelineagain with same YAML — verify no-opmodify_sessionsbut NOTtune_nodes— verifyparams_deferredappearsNotes
ConnectionModepreviously lackedHash— minor public API addition, no breaking change.NodeParamsChangedevent broadcasts full params; UIs using incremental deep-merge may see stale keys until full refresh (pre-existing limitation, not introduced by this PR).Link to Devin session: https://staging.itsdev.in/sessions/16776ed914eb42d28dbd471912ccc998
Requested by: @streamer45
Devin Review
582566d