diff --git a/AGENTS.md b/AGENTS.md index 6a18d1505..44eb86e54 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -56,6 +56,40 @@ the full architecture. include a comment explaining the rationale. - **UI tooling**: Use `bun install` / `bunx` / `bun run` — never npm or pnpm. +## Fix Root Causes, Not Symptoms + +Prefer a clean change that takes longer over a brittle stack of patches that +ships sooner. If a feature requires defending against the same race in three +places, layering synchronous shadow refs over async state, or "preferring +draft over live" because two event sources disagree on timing — **stop and +reconsider the contract**, don't add a fourth patch. + +Concrete signals you've crossed into workaround territory: + +- You're adding a *timeout* to recover from a missing event. +- You need a synchronous shadow of state that already lives in an async store. +- You catch yourself writing "the X event doesn't actually mean X, it means + *attempted* X, so we also have to listen for Y to know if it really + happened." +- Tests are updated by adding sleeps or by relying on previously-broken + behavior (e.g. an invalid value being silently accepted). +- The root cause is in a different layer than the one you're editing, and + fixing it there would invalidate most of your patch. + +When you spot this, surface the design issue to the user with a concrete +proposal — even if it's more invasive — *before* writing the patch. State +the tradeoff honestly: "this will take longer but produces something +durable; vs. this short-term fix has these specific brittleness costs." + +Past incident worth remembering: the WebSocket `nodeadded` event used to fire +before plugin construction had even started. The UI accumulated 13 commits of +draft-state machinery (state-watchers, debounce timers, topology priority +hacks) trying to reconstruct "did the node actually get created?" from +out-of-band signals. The actual fix was a small server change — emit +`nodeadded` from the engine actor's success path instead of the WS handler — +which collapsed the UI back to the obvious cleanup. Code that exists to +paper over a broken contract should be deleted, not refined. + ## Verification Commands | Task | Command | diff --git a/apps/skit/src/session.rs b/apps/skit/src/session.rs index 20de224e3..b33e92dc6 100644 --- a/apps/skit/src/session.rs +++ b/apps/skit/src/session.rs @@ -7,7 +7,7 @@ use crate::config::Config; use crate::state::BroadcastEvent; use opentelemetry::global; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use streamkit_api::{Event as ApiEvent, EventPayload, MessageType, Pipeline}; @@ -177,6 +177,16 @@ pub struct Session { /// The handle to send control messages to the running DynamicEngine actor. engine_handle: Arc, pub pipeline: Arc>, + /// Node IDs that the WebSocket layer has accepted into the engine + /// actor but for which `NodeAdded`/`Failed` has not yet arrived. + /// `pipeline.nodes` only sees a node after the engine confirms + /// successful creation; the in-flight set covers the gap so a + /// second `addnode` for the same id (whether from the same or a + /// different client) is rejected at the handler instead of being + /// silently dropped by the actor's duplicate-id guard. Drained on + /// success (node-added forwarder), on failure (state forwarder + /// observes `Failed`), or when an in-flight node is removed. + pub creating_nodes: Arc>>, /// Timestamp when the session was created pub created_at: SystemTime, /// User/role who created this session (for permission filtering) @@ -187,6 +197,46 @@ pub struct Session { } impl Session { + /// Reserves a node id for an in-flight `addnode`. + /// + /// Atomically checks both the live pipeline snapshot and the + /// in-flight set, then inserts into the in-flight set. Returns + /// `Err` describing why the id is unavailable (already live, or + /// already being added). The reservation is drained by the + /// node-added forwarder on success, the state forwarder on a + /// non-`Creating` state transition, or by `release_node_id` on + /// remove/cancellation. + /// + /// Lock order: `pipeline` first, then `creating_nodes`. The two + /// guards are held jointly across both reads to prevent a node + /// from slipping into `pipeline.nodes` (via the node-added + /// forwarder) between the two checks and being silently dropped + /// by the actor's duplicate-id guard later. + /// + /// # Errors + /// + /// Returns `Err` with a human-readable reason when the id is + /// already live or already in flight. + #[allow(clippy::significant_drop_tightening)] // joint lock is the point + pub async fn reserve_node_id(&self, node_id: &str) -> Result<(), String> { + let pipeline = self.pipeline.lock().await; + if pipeline.nodes.contains_key(node_id) { + return Err(format!("Node '{node_id}' already exists in the pipeline")); + } + let mut creating = self.creating_nodes.lock().await; + if creating.contains(node_id) { + return Err(format!("Node '{node_id}' is already being added")); + } + creating.insert(node_id.to_string()); + Ok(()) + } + + /// Releases an in-flight reservation taken by `reserve_node_id`. + /// Used on explicit removal of a still-Creating node. Idempotent. + pub async fn release_node_id(&self, node_id: &str) { + self.creating_nodes.lock().await.remove(node_id); + } + /// Forwards a control message to this session's specific engine actor. pub async fn send_control_message(&self, msg: EngineControlMessage) { if let Err(e) = self.engine_handle.send_control(msg).await { @@ -271,11 +321,27 @@ impl Session { .await .map_err(|e| format!("Failed to subscribe to stats updates: {e}"))?; + // Pre-allocate the in-flight set so the state and node-added + // forwarders can both reach it. `pipeline.nodes` only contains + // confirmed entries; this set fills the gap for accepted-but- + // not-yet-confirmed addnode requests. + let creating_nodes: Arc>> = Arc::new(Mutex::new(HashSet::new())); + // Spawn task to forward state updates to WebSocket clients let session_id_for_state = session_id.clone(); let event_tx_for_state = event_tx.clone(); + let creating_nodes_for_state = creating_nodes.clone(); tokio::spawn(async move { while let Some(update) = state_rx.recv().await { + // Drain the in-flight entry as soon as a non-Creating + // state arrives — a Failed transition means the engine + // gave up on this id, and a later non-Creating state + // (Ready/Running/Degraded) confirms the node is past + // creation. The node-added forwarder also drains on + // success; the second remove is a no-op. + if !matches!(update.state, NodeState::Creating) { + creating_nodes_for_state.lock().await.remove(&update.node_id); + } let event = ApiEvent { message_type: MessageType::Event, correlation_id: None, @@ -375,6 +441,69 @@ impl Session { ); }); + // Subscribe to node-added notifications from the engine and use + // them as the trigger for both updating `pipeline.nodes` and + // emitting the public `NodeAdded` event. Doing this here (and + // not in the WebSocket addnode handler) means clients only see + // `nodeadded` after the engine has confirmed the plugin's + // constructor and `initialize_node` returned Ok — never + // speculatively before the FFI call has even run. Failures + // surface as `NodeStateChanged { state: Failed }` via the + // existing state forwarder above. + // + // Subscribing here (vs earlier in `create`) is safe: the engine + // can't have any nodes until something sends `AddNode`, and the + // first `AddNode` can't arrive until `create` returns the + // handle to its caller. + let pipeline = Arc::new(Mutex::new(Pipeline::default())); + let mut node_added_rx = engine_handle + .subscribe_node_added() + .await + .map_err(|e| format!("Failed to subscribe to node-added updates: {e}"))?; + let session_id_for_node_added = session_id.clone(); + let event_tx_for_node_added = event_tx.clone(); + let pipeline_for_node_added = pipeline.clone(); + let creating_nodes_for_node_added = creating_nodes.clone(); + tokio::spawn(async move { + while let Some(notification) = node_added_rx.recv().await { + // Update the cached pipeline snapshot first, then + // broadcast — late subscribers (re-fetching the pipeline + // immediately after a `nodeadded` event) see a + // consistent view that already includes the new entry. + { + let mut pip = pipeline_for_node_added.lock().await; + pip.nodes.insert( + notification.node_id.clone(), + streamkit_api::Node { + kind: notification.kind.clone(), + params: notification.params.clone(), + state: None, + }, + ); + } + // The node is now visible in pipeline.nodes — drop the + // in-flight reservation so a future addnode for this id + // (after a removenode) is gated only by the live + // pipeline check. + creating_nodes_for_node_added.lock().await.remove(¬ification.node_id); + let event = ApiEvent { + message_type: MessageType::Event, + correlation_id: None, + payload: EventPayload::NodeAdded { + session_id: session_id_for_node_added.clone(), + node_id: notification.node_id, + kind: notification.kind, + params: notification.params, + }, + }; + let _ = event_tx_for_node_added.send(BroadcastEvent::to_all(event)); + } + tracing::debug!( + session_id = %session_id_for_node_added, + "Node-added forwarding task ended" + ); + }); + // Subscribe to telemetry events from the engine let mut telemetry_rx = engine_handle .subscribe_telemetry() @@ -408,7 +537,8 @@ impl Session { id: session_id, name, engine_handle: Arc::new(engine_handle), - pipeline: Arc::new(Mutex::new(Pipeline::default())), + pipeline, + creating_nodes, created_at: SystemTime::now(), created_by, #[cfg(feature = "moq")] diff --git a/apps/skit/src/websocket_handlers.rs b/apps/skit/src/websocket_handlers.rs index c68a83777..e498aca6c 100644 --- a/apps/skit/src/websocket_handlers.rs +++ b/apps/skit/src/websocket_handlers.rs @@ -525,35 +525,26 @@ async fn handle_add_node( }); } - { - let mut pipeline = session.pipeline.lock().await; - if pipeline.nodes.contains_key(&node_id) { - return Some(ResponsePayload::Error { - message: format!("Node '{node_id}' already exists in the pipeline"), - }); - } - pipeline.nodes.insert( - node_id.clone(), - streamkit_api::Node { kind: kind.clone(), params: params.clone(), state: None }, - ); - } // Lock released here - - // Broadcast event to all clients - let event = ApiEvent { - message_type: MessageType::Event, - correlation_id: None, - payload: EventPayload::NodeAdded { - session_id: session.id.clone(), - node_id: node_id.clone(), - kind: kind.clone(), - params: params.clone(), - }, - }; - if let Err(e) = app_state.event_tx.send(BroadcastEvent::to_all(event)) { - error!("Failed to broadcast NodeAdded event: {}", e); + // Reject duplicates atomically against both the live pipeline and + // the in-flight set. pipeline.nodes only contains confirmed + // entries; creating_nodes covers the gap between WS dispatch and + // the engine's `NodeAdded`/`Failed` reply. Without the in-flight + // check, two clients (or one retrying client) could both pass the + // pipeline check, both reach the actor, and the second would be + // silently dropped by the actor's duplicate-id guard with no + // observable signal to the client. + if let Err(message) = session.reserve_node_id(&node_id).await { + return Some(ResponsePayload::Error { message }); } - // Now safe to do async operations without holding session_manager lock + // Forward to the engine. We deliberately do NOT insert into + // `pipeline.nodes` or broadcast `NodeAdded` here — both are + // emitted by the session-level node-added forwarder (see + // `session.rs`) once the engine confirms the plugin's constructor + // and `initialize_node` returned Ok. This makes the public + // `nodeadded` event mean what it says: a node that exists. The + // in-flight reservation taken above is drained by either the + // node-added forwarder (success) or the state forwarder (Failed). let control_msg = EngineControlMessage::AddNode { node_id, kind, params }; session.send_control_message(control_msg).await; Some(ResponsePayload::Success) @@ -597,6 +588,10 @@ async fn handle_remove_node( pipeline.nodes.shift_remove(&node_id); pipeline.connections.retain(|conn| conn.from_node != node_id && conn.to_node != node_id); } + // Release any in-flight reservation: removing a node mid-creation + // would otherwise leave the id wedged until the state forwarder + // observed a Failed transition for the cancelled creation. + session.release_node_id(&node_id).await; // Broadcast event to all clients let event = ApiEvent { diff --git a/apps/skit/tests/session_lifecycle_test.rs b/apps/skit/tests/session_lifecycle_test.rs index fbc5d9a34..54d316c4d 100644 --- a/apps/skit/tests/session_lifecycle_test.rs +++ b/apps/skit/tests/session_lifecycle_test.rs @@ -22,17 +22,14 @@ use tokio::net::TcpListener; use tokio::time::{timeout, Duration, Instant}; use tokio_tungstenite::{connect_async, tungstenite::Message as WsMessage}; +type WsRead = futures_util::stream::SplitStream< + tokio_tungstenite::WebSocketStream>, +>; + /// Helper to read messages from WebSocket, skipping events until we get a response with matching correlation_id -async fn read_response( - read: &mut futures_util::stream::SplitStream< - tokio_tungstenite::WebSocketStream< - tokio_tungstenite::MaybeTlsStream, - >, - >, - expected_correlation_id: &str, -) -> Response { +async fn read_response(read: &mut WsRead, expected_correlation_id: &str) -> Response { loop { - let message = timeout(Duration::from_secs(5), read.next()) + let message = timeout(Duration::from_secs(15), read.next()) .await .expect("Timeout waiting for response") .expect("No message received") @@ -60,6 +57,79 @@ async fn read_response( } } +/// Drain WebSocket messages until a `NodeAdded` event for `expected_node_id` +/// arrives. Required because the WS `addnode` handler now returns Success +/// as soon as the request is accepted; the public `nodeadded` event is +/// emitted later by the engine actor's success path (after the plugin's +/// constructor and `initialize_node` returned Ok). Tests that read +/// `pipeline.nodes` after `addnode` must wait for this event first, +/// otherwise they race against the engine's background creation task. +async fn wait_for_node_added(read: &mut WsRead, expected_node_id: &str) { + loop { + let message = timeout(Duration::from_secs(15), read.next()) + .await + .expect("Timeout waiting for NodeAdded event") + .expect("No message received") + .expect("Failed to read message"); + let text = message.to_text().expect("Expected text message"); + let value: serde_json::Value = serde_json::from_str(text).expect("Failed to parse message"); + let is_event = value.get("type").and_then(|v| v.as_str()) == Some("event"); + let payload_event = + value.get("payload").and_then(|p| p.get("event")).and_then(|e| e.as_str()); + let payload_node_id = + value.get("payload").and_then(|p| p.get("node_id")).and_then(|n| n.as_str()); + if is_event + && payload_event == Some("nodeadded") + && payload_node_id == Some(expected_node_id) + { + return; + } + } +} + +/// Drain WebSocket messages until a `NodeStateChanged` event with state +/// `Failed` arrives for `expected_node_id`. Asserts that no +/// `NodeAdded` for the same id slips through first — the engine's +/// failure path must not also emit a success. Returns `true` when the +/// Failed event is observed; panics on timeout. +async fn wait_for_node_state_failed(read: &mut WsRead, expected_node_id: &str) -> bool { + loop { + let message = timeout(Duration::from_secs(15), read.next()) + .await + .expect("Timeout waiting for NodeStateChanged(Failed) event") + .expect("No message received") + .expect("Failed to read message"); + let text = message.to_text().expect("Expected text message"); + let value: serde_json::Value = serde_json::from_str(text).expect("Failed to parse message"); + let is_event = value.get("type").and_then(|v| v.as_str()) == Some("event"); + if !is_event { + continue; + } + let payload = value.get("payload"); + let payload_event = payload.and_then(|p| p.get("event")).and_then(|e| e.as_str()); + let payload_node_id = payload.and_then(|p| p.get("node_id")).and_then(|n| n.as_str()); + if payload_node_id != Some(expected_node_id) { + continue; + } + assert!( + payload_event != Some("nodeadded"), + "saw nodeadded for '{}' before NodeStateChanged(Failed) — failed creation must not also emit success", + expected_node_id + ); + if payload_event == Some("nodestatechanged") { + // The state field can be either the string "Failed" or + // an object `{"Failed": {...}}` depending on whether the + // variant carries data. + let state = payload.and_then(|p| p.get("state")); + let is_failed = state.and_then(|s| s.as_str()) == Some("Failed") + || state.and_then(|s| s.as_object()).is_some_and(|m| m.contains_key("Failed")); + if is_failed { + return true; + } + } + } +} + async fn start_test_server() -> Option<(SocketAddr, tokio::task::JoinHandle<()>)> { // Find an available port by binding to port 0 let listener = match TcpListener::bind("127.0.0.1:0").await { @@ -305,8 +375,8 @@ async fn test_add_and_remove_nodes() { payload: RequestPayload::AddNode { session_id: session_id.clone(), node_id: "gain1".to_string(), - kind: "gain".to_string(), - params: Some(json!({"gain": 2.0})), + kind: "audio::gain".to_string(), + params: Some(json!({"gain_db": 2.0})), }, }; @@ -322,6 +392,12 @@ async fn test_add_and_remove_nodes() { _ => panic!("Unexpected response"), } + // Wait for the engine to confirm creation before querying the + // pipeline — `addnode` now returns Success when the request is + // *accepted*, while the cached pipeline snapshot is updated by the + // session's node-added forwarder when the engine's `Ok` arrives. + wait_for_node_added(&mut read, "gain1").await; + println!("✅ Added gain node"); // Get pipeline to verify node was added @@ -341,7 +417,7 @@ async fn test_add_and_remove_nodes() { ResponsePayload::Pipeline { pipeline } => { assert_eq!(pipeline.nodes.len(), 1); assert!(pipeline.nodes.contains_key("gain1")); - assert_eq!(pipeline.nodes.get("gain1").unwrap().kind, "gain"); + assert_eq!(pipeline.nodes.get("gain1").unwrap().kind, "audio::gain"); }, _ => panic!("Expected Pipeline response"), } @@ -394,6 +470,220 @@ async fn test_add_and_remove_nodes() { println!("✅ Pipeline is empty after node removal"); } +/// Locks in the public contract that `nodeadded` only fires on +/// successful creation: when `addnode` is sent for an unregistered +/// kind, the WebSocket request still returns Success (the *request* +/// was accepted), but no `NodeAdded` event ever lands and the +/// pipeline snapshot stays empty. Failure surfaces only via +/// `NodeStateChanged { state: Failed }`. +#[tokio::test] +async fn test_addnode_failure_leaves_pipeline_empty() { + let _ = tracing_subscriber::fmt::try_init(); + + let Some((addr, _server_handle)) = start_test_server().await else { + eprintln!("Skipping session lifecycle tests: local TCP bind not permitted"); + return; + }; + + let ws_url = format!("ws://{}/api/v1/control", addr); + let (ws_stream, _) = connect_async(&ws_url).await.unwrap(); + let (mut write, mut read) = ws_stream.split(); + + // Create session + let create_request = Request { + message_type: MessageType::Request, + correlation_id: Some("create".to_string()), + payload: RequestPayload::CreateSession { name: None }, + }; + write + .send(WsMessage::Text(serde_json::to_string(&create_request).unwrap().into())) + .await + .unwrap(); + let response = read_response(&mut read, "create").await; + let session_id = match response.payload { + ResponsePayload::SessionCreated { session_id, .. } => session_id, + _ => panic!("Expected SessionCreated"), + }; + + // Send addnode for a kind the registry has no entry for. The WS + // handler accepts the request (forwards to the engine actor); the + // actor's spawn_blocking creation task fails and the actor + // broadcasts NodeState::Failed — but never NodeAdded, and the + // session-level forwarder therefore never inserts into + // pipeline.nodes. + let add_node_request = Request { + message_type: MessageType::Request, + correlation_id: Some("add-bad".to_string()), + payload: RequestPayload::AddNode { + session_id: session_id.clone(), + node_id: "bogus".to_string(), + kind: "this::kind::does::not::exist".to_string(), + params: None, + }, + }; + write + .send(WsMessage::Text(serde_json::to_string(&add_node_request).unwrap().into())) + .await + .unwrap(); + let response = read_response(&mut read, "add-bad").await; + match response.payload { + ResponsePayload::Success => {}, + ResponsePayload::Error { message } => { + panic!("Expected Success on dispatch, got Error: {}", message) + }, + _ => panic!("Unexpected response"), + } + + // Wait for the engine to broadcast NodeStateChanged(Failed) for + // this id — that's the actor's "creation failed" signal. Asserting + // on it directly (rather than sleeping) ties the test to the + // public contract. We also assert that no NodeAdded event for + // `bogus` arrives in the meantime. + let saw_failed = wait_for_node_state_failed(&mut read, "bogus").await; + assert!( + saw_failed, + "expected NodeStateChanged(Failed) for 'bogus', no NodeAdded should have arrived first" + ); + + // Confirm pipeline.nodes is empty — the failed creation must NOT + // leave a phantom entry behind for clients to inspect. + let get_pipeline_request = Request { + message_type: MessageType::Request, + correlation_id: Some("get-pipeline".to_string()), + payload: RequestPayload::GetPipeline { session_id: session_id.clone() }, + }; + write + .send(WsMessage::Text(serde_json::to_string(&get_pipeline_request).unwrap().into())) + .await + .unwrap(); + let response = read_response(&mut read, "get-pipeline").await; + match response.payload { + ResponsePayload::Pipeline { pipeline } => { + assert_eq!( + pipeline.nodes.len(), + 0, + "pipeline.nodes must stay empty after a failed addnode; got {:?}", + pipeline.nodes.keys().collect::>() + ); + }, + _ => panic!("Expected Pipeline response"), + } + + println!("✅ Failed creation did not leak into pipeline.nodes"); +} + +/// A second `addnode` for the same id, sent while the first is still +/// in flight (no `nodeadded` yet), must be rejected at the WebSocket +/// handler with an Error response — not silently dropped by the +/// actor's duplicate-id guard. The first request must still succeed. +/// +/// Reproducing the race deterministically would require a slow plugin +/// constructor; instead we assert the structural fix: the in-flight +/// reservation set causes the second handler call to short-circuit. +/// We trigger this by sending two addnodes back-to-back without +/// waiting for `nodeadded` between them. +#[tokio::test] +async fn test_addnode_rejects_duplicate_in_flight() { + let _ = tracing_subscriber::fmt::try_init(); + + let Some((addr, _server_handle)) = start_test_server().await else { + eprintln!("Skipping session lifecycle tests: local TCP bind not permitted"); + return; + }; + + let ws_url = format!("ws://{}/api/v1/control", addr); + let (ws_stream, _) = connect_async(&ws_url).await.unwrap(); + let (mut write, mut read) = ws_stream.split(); + + let create_request = Request { + message_type: MessageType::Request, + correlation_id: Some("create".to_string()), + payload: RequestPayload::CreateSession { name: None }, + }; + write + .send(WsMessage::Text(serde_json::to_string(&create_request).unwrap().into())) + .await + .unwrap(); + let response = read_response(&mut read, "create").await; + let session_id = match response.payload { + ResponsePayload::SessionCreated { session_id, .. } => session_id, + _ => panic!("Expected SessionCreated"), + }; + + // Send two addnodes for the same id back-to-back, without reading + // any responses in between. The handler processes messages + // sequentially per connection: the first synchronously takes the + // in-flight reservation under the pipeline lock, and the second + // is rejected against that reservation deterministically — even + // though the engine's async creation hasn't started yet. + let first_add = Request { + message_type: MessageType::Request, + correlation_id: Some("add-1".to_string()), + payload: RequestPayload::AddNode { + session_id: session_id.clone(), + node_id: "gain1".to_string(), + kind: "audio::gain".to_string(), + params: Some(json!({"gain_db": 1.0})), + }, + }; + let second_add = Request { + message_type: MessageType::Request, + correlation_id: Some("add-2".to_string()), + payload: RequestPayload::AddNode { + session_id: session_id.clone(), + node_id: "gain1".to_string(), + kind: "audio::gain".to_string(), + params: Some(json!({"gain_db": 2.0})), + }, + }; + write.send(WsMessage::Text(serde_json::to_string(&first_add).unwrap().into())).await.unwrap(); + write.send(WsMessage::Text(serde_json::to_string(&second_add).unwrap().into())).await.unwrap(); + + let resp1 = read_response(&mut read, "add-1").await; + assert!(matches!(resp1.payload, ResponsePayload::Success), "first addnode must succeed"); + + let resp2 = read_response(&mut read, "add-2").await; + match resp2.payload { + ResponsePayload::Error { message } => { + assert!( + message.contains("already") || message.contains("being added"), + "expected duplicate-id error for second addnode, got: {}", + message + ); + }, + other => { + panic!("expected Error for duplicate addnode while first in flight, got: {:?}", other) + }, + } + + // After the in-flight reservation has been drained by the + // node-added forwarder, a third addnode for the same id is + // rejected via the live-pipeline check instead — still observably + // rejected, never silently dropped. + let third_add = Request { + message_type: MessageType::Request, + correlation_id: Some("add-3".to_string()), + payload: RequestPayload::AddNode { + session_id: session_id.clone(), + node_id: "gain1".to_string(), + kind: "audio::gain".to_string(), + params: Some(json!({"gain_db": 3.0})), + }, + }; + write.send(WsMessage::Text(serde_json::to_string(&third_add).unwrap().into())).await.unwrap(); + let resp3 = read_response(&mut read, "add-3").await; + match resp3.payload { + ResponsePayload::Error { message } => { + assert!( + message.contains("already"), + "expected duplicate-id error for already-live id, got: {}", + message + ); + }, + other => panic!("expected Error for already-live duplicate, got: {:?}", other), + } +} + #[tokio::test] async fn test_session_not_found() { let Some((addr, _server_handle)) = start_test_server().await else { @@ -695,8 +985,8 @@ async fn test_concurrent_operations_no_lock_contention() { payload: RequestPayload::AddNode { session_id: session_id.clone(), node_id: "gain".to_string(), - kind: "gain".to_string(), - params: Some(json!({"gain": 1.0})), + kind: "audio::gain".to_string(), + params: Some(json!({"gain_db": 1.0})), }, }; @@ -706,6 +996,7 @@ async fn test_concurrent_operations_no_lock_contention() { .unwrap(); let _ = read_response(&mut read, "setup-add-node").await; + wait_for_node_added(&mut read, "gain").await; println!("✅ Setup complete: session {} with gain node", session_id); diff --git a/crates/engine/src/dynamic_actor.rs b/crates/engine/src/dynamic_actor.rs index 94d6f88a5..cdf142928 100644 --- a/crates/engine/src/dynamic_actor.rs +++ b/crates/engine/src/dynamic_actor.rs @@ -11,7 +11,7 @@ use crate::{ constants::DEFAULT_SUBSCRIBER_CHANNEL_CAPACITY, dynamic_config::CONTROL_CAPACITY, - dynamic_messages::{PinConfigMsg, QueryMessage, RuntimeSchemaUpdate}, + dynamic_messages::{NodeAddedNotification, PinConfigMsg, QueryMessage, RuntimeSchemaUpdate}, dynamic_pin_distributor::PinDistributorActor, graph_builder, }; @@ -65,6 +65,11 @@ pub struct NodeCreatedEvent { node_id: String, kind: String, creation_id: u64, + /// Original params from the AddNode request, retained so the success + /// path can include them in the `NodeAddedNotification` it emits to + /// session-level forwarders. `create_node` only borrows them, so we + /// keep the owned value alongside the result. + params: Option, result: Result, StreamKitError>, } @@ -157,6 +162,19 @@ pub struct DynamicEngine { /// a bounded channel risks silently dropping a notification that leaves /// the UI permanently stale. pub(super) runtime_schema_subscribers: Vec>, + /// Subscribers that want to receive a notification when a node is + /// fully created and initialized (i.e. transitioned from `Creating` + /// to `Initializing`). This is what session-level forwarders turn + /// into the public `NodeAdded` event. Failures are visible via the + /// existing state subscribers (`NodeState::Failed`) and never appear + /// here, so a `NodeAddedNotification` always means success. + /// + /// Unbounded because node creations are one-per-node and very + /// low-frequency; a bounded channel risks silently dropping a + /// notification that leaves the UI permanently without a + /// `nodeadded` event for that node. Same model as + /// `runtime_schema_subscribers` above. + pub(super) node_added_subscribers: Vec>, // Metrics pub(super) nodes_active_gauge: opentelemetry::metrics::Gauge, pub(super) node_state_transitions_counter: opentelemetry::metrics::Counter, @@ -291,6 +309,11 @@ impl DynamicEngine { self.runtime_schema_subscribers.push(tx); let _ = response_tx.send(rx).await; }, + QueryMessage::SubscribeNodeAdded { response_tx } => { + let (tx, rx) = mpsc::unbounded_channel(); + self.node_added_subscribers.push(tx); + let _ = response_tx.send(rx).await; + }, } } @@ -1540,7 +1563,7 @@ impl DynamicEngine { /// On failure: transitions the node to `Failed`, drains pending connections /// referencing the failed node. async fn handle_node_created(&mut self, event: NodeCreatedEvent, channels: &NodeChannels) { - let NodeCreatedEvent { node_id, kind, creation_id, result } = event; + let NodeCreatedEvent { node_id, kind, creation_id, params, result } = event; // Check whether this creation result is still the active one. // A mismatch means either: @@ -1603,6 +1626,22 @@ impl DynamicEngine { // Replay any TuneNode messages that arrived while Creating. self.flush_pending_tunes(&node_id).await; + + // Notify subscribers that the node has been fully created. + // The session-level forwarder turns this into the public + // `NodeAdded` event — clients see `nodeadded` only after + // the plugin's constructor *and* `initialize_node` returned + // Ok, never speculatively while the FFI call is still in + // flight. Failures don't reach here; they're observable + // via NodeStateUpdate { state: Failed }. + let notification = NodeAddedNotification { node_id: node_id.clone(), kind, params }; + // Unbounded send; dropping a NodeAdded notification + // would leave the client without a `nodeadded` event + // for this node, which is worse than memory pressure + // for a dead receiver. Closed receivers prune via + // `is_ok()` returning false. + self.node_added_subscribers + .retain(|subscriber| subscriber.send(notification.clone()).is_ok()); }, Err(e) => { tracing::error!( @@ -1752,8 +1791,16 @@ impl DynamicEngine { self.engine_operations_counter.add(1, &[KeyValue::new("operation", "add_node")]); tracing::info!(name = %node_id, kind = %kind, "Adding node to graph (async)"); - // Reject duplicate node IDs — the node already exists in - // node_states (either Creating or fully initialized). + // Defence-in-depth duplicate guard. The WebSocket + // handler holds the session's `pipeline` + `creating_nodes` + // locks atomically and rejects duplicate ids before + // they ever reach this actor (see `handle_add_node` in + // `apps/skit/src/websocket_handlers.rs`). This branch + // therefore only fires under abnormal control-plane + // paths (e.g. internal callers that bypass the WS + // layer). In that case we drop the message and log; + // emitting a synthetic Failed here would clobber the + // legitimate node's state. if self.node_states.contains_key(&node_id) { tracing::error!( node_id = %node_id, @@ -1794,6 +1841,12 @@ impl DynamicEngine { let tx = self.node_created_tx.clone(); let spawn_node_id = node_id; let spawn_kind = kind.clone(); + // Clone params so the spawned closure can pass them by + // reference into create_node while the outer `params` + // remains owned and travels (via NodeCreatedEvent) to + // handle_node_created — which needs them to populate + // the NodeAddedNotification on success. + let spawn_params = params.clone(); tokio::spawn(async move { let result = tokio::task::spawn_blocking(move || { let guard = match registry.read() { @@ -1804,7 +1857,7 @@ impl DynamicEngine { ))); }, }; - guard.create_node(&spawn_kind, params.as_ref()) + guard.create_node(&spawn_kind, spawn_params.as_ref()) }) .await; @@ -1820,6 +1873,7 @@ impl DynamicEngine { node_id: spawn_node_id, kind, creation_id, + params, result, }) .await; diff --git a/crates/engine/src/dynamic_handle.rs b/crates/engine/src/dynamic_handle.rs index 1944bd7fc..9031690c1 100644 --- a/crates/engine/src/dynamic_handle.rs +++ b/crates/engine/src/dynamic_handle.rs @@ -4,7 +4,7 @@ //! Public client handle for controlling a running dynamic engine. -use crate::dynamic_messages::{QueryMessage, RuntimeSchemaUpdate}; +use crate::dynamic_messages::{NodeAddedNotification, QueryMessage, RuntimeSchemaUpdate}; use std::collections::HashMap; use std::sync::Arc; use streamkit_core::control::EngineControlMessage; @@ -192,6 +192,28 @@ impl DynamicEngineHandle { response_rx.recv().await.ok_or_else(|| "Failed to receive response from engine".to_string()) } + /// Subscribes to node-creation success notifications. + /// + /// Yields exactly one `NodeAddedNotification` per node whose + /// constructor *and* initialization succeeded. Failures do not + /// appear on this channel — observe them via `subscribe_state` and + /// match `NodeState::Failed`. + /// + /// # Errors + /// + /// Returns an error if the engine actor has shut down or fails to respond. + pub async fn subscribe_node_added( + &self, + ) -> Result, String> { + let (response_tx, mut response_rx) = mpsc::channel(1); + self.query_tx + .send(QueryMessage::SubscribeNodeAdded { response_tx }) + .await + .map_err(|_| "Engine actor has shut down".to_string())?; + + response_rx.recv().await.ok_or_else(|| "Failed to receive response from engine".to_string()) + } + /// Sends a shutdown signal to the engine and waits for it to complete. /// This ensures all nodes are properly stopped before returning. /// Can only be called once - subsequent calls will return an error. diff --git a/crates/engine/src/dynamic_messages.rs b/crates/engine/src/dynamic_messages.rs index 5e92f8895..28c0f73dc 100644 --- a/crates/engine/src/dynamic_messages.rs +++ b/crates/engine/src/dynamic_messages.rs @@ -53,6 +53,20 @@ pub struct RuntimeSchemaUpdate { pub schema: serde_json::Value, } +/// Notification emitted when a node has been *successfully* created and +/// initialized — i.e. the plugin's constructor returned `Ok` and +/// `initialize_node` completed without error. This is what the WebSocket +/// layer turns into the public `NodeAdded` event, so clients see +/// `nodeadded` only after the engine has confirmed the node is real. +/// +/// Failures are reported separately via `NodeStateUpdate { state: Failed }`. +#[derive(Clone, Debug)] +pub struct NodeAddedNotification { + pub node_id: String, + pub kind: String, + pub params: Option, +} + /// Query messages for retrieving information from the engine without modifying state. pub enum QueryMessage { GetNodeStates { @@ -82,6 +96,9 @@ pub enum QueryMessage { SubscribeRuntimeSchemas { response_tx: mpsc::Sender>, }, + SubscribeNodeAdded { + response_tx: mpsc::Sender>, + }, } // Re-export ConnectionMode from core for use by pin distributor diff --git a/crates/engine/src/lib.rs b/crates/engine/src/lib.rs index 4204cc787..41142908f 100644 --- a/crates/engine/src/lib.rs +++ b/crates/engine/src/lib.rs @@ -209,6 +209,7 @@ impl Engine { view_data_subscribers: Vec::new(), runtime_schemas: HashMap::new(), runtime_schema_subscribers: Vec::new(), + node_added_subscribers: Vec::new(), nodes_active_gauge: meter .u64_gauge("engine.nodes.active") .with_description("Number of active nodes in the pipeline") diff --git a/crates/engine/src/tests/connection_types.rs b/crates/engine/src/tests/connection_types.rs index 23a9ec024..cd64a85a8 100644 --- a/crates/engine/src/tests/connection_types.rs +++ b/crates/engine/src/tests/connection_types.rs @@ -61,6 +61,7 @@ fn create_test_engine() -> DynamicEngine { node_state_gauge: meter.u64_gauge("test.state").build(), runtime_schemas: HashMap::new(), runtime_schema_subscribers: Vec::new(), + node_added_subscribers: Vec::new(), engine_control_tx, node_created_tx, node_created_rx, diff --git a/crates/engine/src/tests/pipeline_activation.rs b/crates/engine/src/tests/pipeline_activation.rs index aef8665fe..e7fb9314d 100644 --- a/crates/engine/src/tests/pipeline_activation.rs +++ b/crates/engine/src/tests/pipeline_activation.rs @@ -62,6 +62,7 @@ fn create_test_engine() -> DynamicEngine { node_state_gauge: meter.u64_gauge("test.state").build(), runtime_schemas: HashMap::new(), runtime_schema_subscribers: Vec::new(), + node_added_subscribers: Vec::new(), engine_control_tx, node_created_tx: nc_tx, node_created_rx: nc_rx, diff --git a/crates/nodes/src/video/compositor/mod.rs b/crates/nodes/src/video/compositor/mod.rs index 984f7ca34..9be7dd6d9 100644 --- a/crates/nodes/src/video/compositor/mod.rs +++ b/crates/nodes/src/video/compositor/mod.rs @@ -867,15 +867,8 @@ impl ProcessorNode for CompositorNode { msg, &mut slots, &mut clear_conversion_cache, + &mut layer_configs_dirty, ); - // Clear causal-consistency metadata so the resulting - // view data is not stamped with a stale sender/rev - // from a previous UpdateParams. Without this, the - // client that last edited config would suppress the - // pin-triggered layout update via its echo gate. - self.config_sender.clear(); - self.config_rev = 0; - layer_configs_dirty = true; continue; } @@ -988,15 +981,13 @@ impl ProcessorNode for CompositorNode { scene = resolve_scene(&slots, &self.config, &image_overlays, &text_overlays); layer_configs_dirty = false; - // Emit layout via view data if it changed. + // Emit layout via view data if it changed. Stamp every + // tick (defaults to "" / 0) so clients can gate stale + // pre-commit echoes via `rev < localRev`. if last_layout.as_ref() != Some(&scene.layout) { if let Ok(mut json) = serde_json::to_value(&scene.layout) { - // Stamp view data with the sender/rev from the last - // UpdateParams so clients can detect stale self-echoes. - if !self.config_sender.is_empty() { - json["_sender"] = serde_json::Value::from(self.config_sender.as_str()); - json["_rev"] = serde_json::Value::from(self.config_rev); - } + json["_sender"] = serde_json::Value::from(self.config_sender.as_str()); + json["_rev"] = serde_json::Value::from(self.config_rev); view_data_helpers::emit_view_data(&view_data_tx, &node_name, || json); } last_layout = Some(scene.layout.clone()); @@ -1446,6 +1437,7 @@ impl CompositorNode { msg: PinManagementMessage, slots: &mut Vec, clear_conversion_cache: &mut bool, + layer_configs_dirty: &mut bool, ) { match msg { PinManagementMessage::RequestAddInputPin { suggested_name, response_tx } => { @@ -1468,6 +1460,9 @@ impl CompositorNode { last_source_dims: None, hint_tx, }); + // Surface the new layer in view-data without waiting + // for the source's first frame. + *layer_configs_dirty = true; Self::send_initial_hint_for_slot( &pin_name, @@ -1483,6 +1478,7 @@ impl CompositorNode { if let Some(idx) = slots.iter().position(|s| s.name == pin_name) { *clear_conversion_cache = true; slots.remove(idx); + *layer_configs_dirty = true; } node.input_pins.retain(|p| p.name != pin_name); }, diff --git a/plugins/native/aac-encoder/Cargo.lock b/plugins/native/aac-encoder/Cargo.lock index 183253c10..04636b9a2 100644 --- a/plugins/native/aac-encoder/Cargo.lock +++ b/plugins/native/aac-encoder/Cargo.lock @@ -4,7 +4,7 @@ version = 4 [[package]] name = "aac-encoder-plugin-native" -version = "0.1.0" +version = "0.2.0" dependencies = [ "bytes", "serde", diff --git a/plugins/native/kokoro/Cargo.lock b/plugins/native/kokoro/Cargo.lock index 5970c74bc..7c18ef3bd 100644 --- a/plugins/native/kokoro/Cargo.lock +++ b/plugins/native/kokoro/Cargo.lock @@ -67,7 +67,7 @@ checksum = "7ee5b5339afb4c41626dde77b7a611bd4f2c202b897852b4bcf5d03eddc61010" [[package]] name = "kokoro-plugin-native" -version = "0.2.0" +version = "0.3.0" dependencies = [ "cc", "once_cell", diff --git a/plugins/native/matcha/Cargo.lock b/plugins/native/matcha/Cargo.lock index 7136ccc1e..69af15681 100644 --- a/plugins/native/matcha/Cargo.lock +++ b/plugins/native/matcha/Cargo.lock @@ -67,7 +67,7 @@ checksum = "7ee5b5339afb4c41626dde77b7a611bd4f2c202b897852b4bcf5d03eddc61010" [[package]] name = "matcha-plugin-native" -version = "0.2.0" +version = "0.3.0" dependencies = [ "cc", "once_cell", diff --git a/plugins/native/nllb/Cargo.lock b/plugins/native/nllb/Cargo.lock index 476b7c834..652f77e83 100644 --- a/plugins/native/nllb/Cargo.lock +++ b/plugins/native/nllb/Cargo.lock @@ -578,7 +578,7 @@ dependencies = [ [[package]] name = "nllb-plugin-native" -version = "0.2.0" +version = "0.3.0" dependencies = [ "ct2rs", "ctor", diff --git a/plugins/native/piper/Cargo.lock b/plugins/native/piper/Cargo.lock index 9967d8ff0..6c7b8cae4 100644 --- a/plugins/native/piper/Cargo.lock +++ b/plugins/native/piper/Cargo.lock @@ -85,7 +85,7 @@ checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" [[package]] name = "piper-plugin-native" -version = "0.2.0" +version = "0.3.0" dependencies = [ "cc", "once_cell", diff --git a/plugins/native/pocket-tts/Cargo.lock b/plugins/native/pocket-tts/Cargo.lock index df8d566ba..c8e672cbe 100644 --- a/plugins/native/pocket-tts/Cargo.lock +++ b/plugins/native/pocket-tts/Cargo.lock @@ -1812,7 +1812,7 @@ dependencies = [ [[package]] name = "pocket-tts-plugin-native" -version = "0.2.0" +version = "0.3.0" dependencies = [ "base64 0.22.1", "candle-core", diff --git a/plugins/native/sensevoice/Cargo.lock b/plugins/native/sensevoice/Cargo.lock index bef8e356d..afb1dfbe4 100644 --- a/plugins/native/sensevoice/Cargo.lock +++ b/plugins/native/sensevoice/Cargo.lock @@ -642,7 +642,7 @@ dependencies = [ [[package]] name = "sensevoice-plugin-native" -version = "0.2.0" +version = "0.3.0" dependencies = [ "cc", "ndarray", diff --git a/plugins/native/servo/src/servo_node.rs b/plugins/native/servo/src/servo_node.rs index dff7168bd..0c488f362 100644 --- a/plugins/native/servo/src/servo_node.rs +++ b/plugins/native/servo/src/servo_node.rs @@ -99,7 +99,7 @@ impl NativeSourceNode for ServoSourcePlugin { "load_timeout_secs": { "type": "integer", "default": 30, - "description": "Maximum seconds to wait for page load", + "description": "(Currently unused: page load is non-blocking; tick() returns transparent frames until the first paint.) Reserved for a future Degraded-state timeout signal.", "minimum": 1 } }, diff --git a/plugins/native/servo/src/servo_thread.rs b/plugins/native/servo/src/servo_thread.rs index 32867dc98..f893d7a22 100644 --- a/plugins/native/servo/src/servo_thread.rs +++ b/plugins/native/servo/src/servo_thread.rs @@ -128,26 +128,29 @@ pub fn send_work(item: ServoWorkItem) -> Result<(), String> { /// The critical contract is calling `webview.paint()` inside /// `notify_new_frame_ready` -- without it the software rendering context's /// framebuffer never receives pixels. +/// +/// `loaded` is read by `handle_render`'s post-load gate to fire one-shot +/// post-load actions (custom CSS injection) on the first tick after the +/// page reaches `LoadStatus::Complete`. #[derive(Default)] struct FrameDelegate { loaded: Cell, - load_failed: Cell, - frames: Cell, } impl WebViewDelegate for FrameDelegate { fn notify_load_status_changed(&self, _webview: WebView, status: LoadStatus) { if status == LoadStatus::Complete { self.loaded.set(true); - self.load_failed.set(false); } // Servo 0.1.0 does not expose a Failed variant. Load failures - // are detected via timeout (page never reaches Complete). + // would manifest as the page never reaching Complete; there is + // no synchronous wait at register time, so failures simply + // leave the page in its loading state and `handle_render` + // returns whatever has been painted. } fn notify_new_frame_ready(&self, webview: WebView) { webview.paint(); - self.frames.set(self.frames.get() + 1); } } @@ -181,6 +184,12 @@ struct InstanceState { /// returned directly. Set after [`POISON_THRESHOLD`] consecutive /// render panics; cleared on URL change (`UpdateConfig`). poisoned: bool, + /// Tracks whether one-shot post-load work (custom CSS injection) has + /// been performed for the current URL. Set to `true` after we've + /// observed `LoadStatus::Complete` and run the post-load actions. + /// Reset on `UpdateConfig` when the URL changes so the new page + /// gets the same treatment. + post_load_done: bool, } /// Entry point for the shared Servo thread. @@ -372,9 +381,23 @@ fn send_fallback_frame(instances: &HashMap, node_id: &Nod let _ = state.result_tx.send(ServoThreadResult::Frame { rgba_data: fallback }); } -/// Handle a `Register` work item: create a WebView on the shared Servo -/// instance (creating the Servo if this is the first registration), -/// navigate to URL, and wait for the initial load. +/// Handle a `Register` work item: create the WebView and the per-instance +/// rendering context, then return `InitOk` *immediately*. +/// +/// Page loading is deferred — the first few `handle_render` ticks will +/// return transparent / partially-painted frames while Servo's event +/// loop progresses the load asynchronously. This keeps node-init +/// latency bounded by GPU surface allocation (sub-second) instead of +/// blocking on the page's full first paint (5+ seconds for typical +/// websites). The `load_timeout_secs` config field is currently a +/// no-op; it remains in the schema for forward compatibility (a future +/// change may use it to cap wait-for-load progression for diagnostics +/// or to move the node into Degraded if the page never loads). +/// +/// One-shot post-load work (custom CSS injection) is gated on +/// `post_load_done` and runs in `handle_render` once +/// `delegate.loaded` flips, so the contract that "custom_css is +/// applied after load" is preserved. fn handle_register( instances: &mut HashMap, servo: &mut Option, @@ -394,53 +417,15 @@ fn handle_register( s }); - let load_timeout = Duration::from_secs(u64::from(config.load_timeout_secs)); - match create_webview(servo_ref, &config) { Ok((webview, rendering_context, delegate)) => { - // Wait for the initial page load so the first Render has content. - let load_start = Instant::now(); - wait_for_load(servo_ref, &delegate, &config.url, &node_id, load_timeout); - let load_duration = load_start.elapsed(); - - if delegate.load_failed.get() { - tracing::warn!( - node_id = %node_id, - url = %config.url, - load_ms = load_duration.as_millis(), - "Page load reported failure — proceeding with partial content", - ); - } else if !delegate.loaded.get() { - tracing::warn!( - node_id = %node_id, - url = %config.url, - load_ms = load_duration.as_millis(), - "Page load timed out — proceeding with partial content", - ); - } else { - tracing::info!( - node_id = %node_id, - url = %config.url, - load_ms = load_duration.as_millis(), - "Page loaded successfully", - ); - } - - // Force at least one post-load frame via a rAF nudge. - nudge_frame(servo_ref, &webview, &delegate); - - // Inject custom CSS if provided. - if let Some(ref css) = config.custom_css { - inject_custom_css(&webview, servo_ref, css); - } - tracing::info!( node_id = %node_id, url = %config.url, output = %format_args!("{}x{}", config.width, config.height), viewport = %format_args!("{}x{}", config.effective_viewport_width(), config.effective_viewport_height()), scaling = config.needs_scaling(), - "Created Servo WebView on shared instance", + "Created Servo WebView (page load deferred)", ); let rc_width = config.effective_viewport_width(); @@ -462,6 +447,7 @@ fn handle_register( render_duration_sum: Duration::ZERO, consecutive_panic_count: 0, poisoned: false, + post_load_done: false, }, ); }, @@ -487,9 +473,26 @@ fn handle_render( let render_start = Instant::now(); - // Pump the event loop to let Servo process pending work. + // Pump the event loop to let Servo process pending work — this is + // also what advances the deferred page load registered in + // `handle_register`. servo.spin_event_loop(); + // Run one-shot post-load actions (custom CSS) the first tick that + // observes a successful load. Gated to fire exactly once per URL + // (`UpdateConfig` resets `post_load_done` on URL change). + if !state.post_load_done && state.delegate.loaded.get() { + if let Some(ref css) = state.config.custom_css { + inject_custom_css(&state.webview, servo, css); + } + state.post_load_done = true; + tracing::info!( + node_id = %node_id, + url = %state.config.url, + "Page reached LoadStatus::Complete — post-load actions applied", + ); + } + // Always read the full rendering context (rc_width × rc_height) — // this is the native size Servo is currently rendering at. These // stay constant under `Resize` hints (output-only) but are updated @@ -575,37 +578,20 @@ fn handle_update_config( if let Ok(parsed) = url::Url::parse(&new_config.url) { state.webview.load(parsed); state.delegate.loaded.set(false); - state.delegate.load_failed.set(false); - - let load_timeout = Duration::from_secs(u64::from(new_config.load_timeout_secs)); - let load_start = Instant::now(); - wait_for_load(servo, &state.delegate, &new_config.url, node_id, load_timeout); - - if state.delegate.load_failed.get() { - tracing::warn!( - node_id = %node_id, - url = %new_config.url, - load_ms = load_start.elapsed().as_millis(), - "URL navigation load failed", - ); - } else if !state.delegate.loaded.get() { - tracing::warn!( - node_id = %node_id, - url = %new_config.url, - load_ms = load_start.elapsed().as_millis(), - "URL navigation load timed out", - ); - } + // Reset the one-shot post-load gate so the new page gets + // its custom-CSS injection (if any) once it finishes + // loading. Render ticks will run `handle_render`'s + // post-load block when `delegate.loaded` flips. + state.post_load_done = false; } } - if css_changed || url_changed { - let css = if css_changed { - new_config.custom_css.as_deref() - } else { - state.config.custom_css.as_deref() - }; - if let Some(css) = css { + // CSS-only changes (no URL change) apply immediately if the current + // page is already loaded. Otherwise they fold into the + // render-driven post-load gate above, which will pick up the new + // value because we're about to merge `new_config` into `state.config`. + if css_changed && !url_changed && state.delegate.loaded.get() { + if let Some(ref css) = new_config.custom_css { inject_custom_css(&state.webview, servo, css); } } @@ -697,65 +683,6 @@ fn create_webview( Ok((webview, rendering_context, delegate)) } -/// Wait for the page to reach `LoadStatus::Complete`, with a configurable -/// timeout. Returns when the delegate's `loaded` flag is set, or on timeout. -fn wait_for_load( - servo: &Servo, - delegate: &FrameDelegate, - url: &str, - node_id: &NodeId, - timeout: Duration, -) { - let deadline = Instant::now() + timeout; - while !delegate.loaded.get() { - if Instant::now() > deadline { - tracing::warn!( - node_id = %node_id, - url = %url, - timeout_secs = timeout.as_secs(), - "Timed out waiting for page load, proceeding anyway", - ); - break; - } - servo.spin_event_loop(); - std::thread::sleep(Duration::from_millis(1)); - } -} - -/// Force a post-load frame by triggering a `requestAnimationFrame` nudge -/// and waiting for a new frame to be painted. -fn nudge_frame(servo: &Servo, webview: &WebView, delegate: &FrameDelegate) { - let js_done = Rc::new(Cell::new(false)); - { - let js_done_inner = js_done.clone(); - webview.evaluate_javascript( - "new Promise(r => requestAnimationFrame(() => { \ - document.documentElement.getBoundingClientRect(); r(); \ - }))", - move |_r| js_done_inner.set(true), - ); - } - let js_deadline = Instant::now() + Duration::from_secs(5); - while !js_done.get() { - if Instant::now() > js_deadline { - break; - } - servo.spin_event_loop(); - std::thread::sleep(Duration::from_millis(1)); - } - - // Wait for at least one post-load frame_ready. - let frames_at_load = delegate.frames.get(); - let frame_deadline = Instant::now() + Duration::from_secs(5); - while delegate.frames.get() <= frames_at_load { - if Instant::now() > frame_deadline { - break; - } - servo.spin_event_loop(); - std::thread::sleep(Duration::from_millis(1)); - } -} - /// Inject custom CSS into a loaded page via JavaScript. fn inject_custom_css(webview: &WebView, servo: &Servo, css: &str) { let escaped = diff --git a/plugins/native/slint/Cargo.lock b/plugins/native/slint/Cargo.lock index 112b52e7a..2dda4d3c7 100644 --- a/plugins/native/slint/Cargo.lock +++ b/plugins/native/slint/Cargo.lock @@ -3830,7 +3830,7 @@ dependencies = [ [[package]] name = "slint-plugin-native" -version = "0.3.0" +version = "0.4.0" dependencies = [ "pollster", "serde", diff --git a/plugins/native/supertonic/Cargo.lock b/plugins/native/supertonic/Cargo.lock index b5124849a..f9846e309 100644 --- a/plugins/native/supertonic/Cargo.lock +++ b/plugins/native/supertonic/Cargo.lock @@ -963,7 +963,7 @@ checksum = "fe895eb47f22e2ddd4dabc02bce419d2e643c8e3b585c78158b349195bc24d82" [[package]] name = "supertonic-plugin-native" -version = "0.2.0" +version = "0.3.0" dependencies = [ "serde", "serde_json", diff --git a/plugins/native/vad/Cargo.lock b/plugins/native/vad/Cargo.lock index a401efdb9..2690f2364 100644 --- a/plugins/native/vad/Cargo.lock +++ b/plugins/native/vad/Cargo.lock @@ -362,7 +362,7 @@ checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5" [[package]] name = "vad-plugin-native" -version = "0.3.0" +version = "0.4.0" dependencies = [ "serde", "serde_json", diff --git a/plugins/native/whisper/Cargo.lock b/plugins/native/whisper/Cargo.lock index 78f62eab6..37173db92 100644 --- a/plugins/native/whisper/Cargo.lock +++ b/plugins/native/whisper/Cargo.lock @@ -1158,7 +1158,7 @@ dependencies = [ [[package]] name = "whisper-plugin-native" -version = "0.2.0" +version = "0.3.0" dependencies = [ "ndarray", "once_cell", diff --git a/sdks/plugin-sdk/native/src/lib.rs b/sdks/plugin-sdk/native/src/lib.rs index 2c48d1341..b0caac491 100644 --- a/sdks/plugin-sdk/native/src/lib.rs +++ b/sdks/plugin-sdk/native/src/lib.rs @@ -930,9 +930,18 @@ macro_rules! native_plugin_entry { let kind = <$plugin_type as $crate::NativeProcessorNode>::metadata().kind; let logger = $crate::logger::Logger::new(log_callback, log_user_data, &kind); + // Clone the logger so we can still report on `Err` after + // ownership has been moved into `new()`. Without this, + // plugin-side validation errors (e.g. "url must not be + // empty") were silently swallowed and the host only saw + // a generic "Plugin failed to create instance" message. + let err_logger = logger.clone(); match <$plugin_type as $crate::NativeProcessorNode>::new(params_json, logger) { Ok(instance) => Box::into_raw(Box::new(instance)) as $crate::types::CPluginHandle, - Err(_) => std::ptr::null_mut(), + Err(e) => { + err_logger.error(&format!("Plugin instance creation failed: {e}")); + std::ptr::null_mut() + } } }) } @@ -1148,11 +1157,18 @@ macro_rules! native_source_plugin_entry { let kind = <$plugin_type as $crate::NativeSourceNode>::metadata().kind; let logger = $crate::logger::Logger::new(log_callback, log_user_data, &kind); + // Clone the logger so we can still report on `Err` after + // ownership has been moved into `new()`. See the + // processor variant above for context. + let err_logger = logger.clone(); match <$plugin_type as $crate::NativeSourceNode>::new(params_json, logger) { Ok(instance) => { Box::into_raw(Box::new(instance)) as $crate::types::CPluginHandle }, - Err(_) => std::ptr::null_mut(), + Err(e) => { + err_logger.error(&format!("Plugin instance creation failed: {e}")); + std::ptr::null_mut() + }, } }) } diff --git a/ui/src/components/node/NodeFrame.tsx b/ui/src/components/node/NodeFrame.tsx index d2833534a..2072b276f 100644 --- a/ui/src/components/node/NodeFrame.tsx +++ b/ui/src/components/node/NodeFrame.tsx @@ -18,9 +18,15 @@ import type { import { PinRow } from './PinRow'; import { PlaceholderPinRow } from './PlaceholderPinRow'; -const NodeWrapper = styled.div<{ selected?: boolean; minWidth: number }>` +const NodeWrapper = styled.div<{ selected?: boolean; minWidth: number; isDraft?: boolean }>` background: var(--sk-panel-bg); - border: 2px solid ${(props) => (props.selected ? 'var(--sk-primary)' : 'var(--sk-border-strong)')}; + border: 2px ${(props) => (props.isDraft ? 'dashed' : 'solid')} + ${(props) => + props.selected + ? 'var(--sk-primary)' + : props.isDraft + ? 'var(--sk-warning, var(--sk-text-muted))' + : 'var(--sk-border-strong)'}; border-radius: 8px; padding: 8px; min-width: ${(props) => props.minWidth}px; @@ -32,22 +38,107 @@ const NodeWrapper = styled.div<{ selected?: boolean; minWidth: number }>` outline: ${(props) => (props.selected ? '2px solid var(--sk-primary)' : 'none')}; outline-offset: 2px; color: var(--sk-text); + opacity: ${(props) => (props.isDraft ? 0.85 : 1)}; `; -const BidirectionalWrapper = styled.div<{ selected?: boolean; minWidth: number }>` +const BidirectionalWrapper = styled.div<{ + selected?: boolean; + minWidth: number; + isDraft?: boolean; +}>` display: flex; flex-direction: column; gap: 4px; padding: 8px; min-width: ${(props) => props.minWidth}px; background: var(--sk-panel-bg); - border: 2px solid ${(props) => (props.selected ? 'var(--sk-primary)' : 'var(--sk-border-strong)')}; + border: 2px ${(props) => (props.isDraft ? 'dashed' : 'solid')} + ${(props) => + props.selected + ? 'var(--sk-primary)' + : props.isDraft + ? 'var(--sk-warning, var(--sk-text-muted))' + : 'var(--sk-border-strong)'}; border-radius: 8px; box-shadow: ${(props) => props.selected ? 'var(--sk-focus-ring)' : `0 2px 8px var(--sk-shadow)`}; outline: ${(props) => (props.selected ? '2px solid var(--sk-primary)' : 'none')}; outline-offset: 2px; color: var(--sk-text); + opacity: ${(props) => (props.isDraft ? 0.85 : 1)}; +`; + +const DraftBanner = styled.div` + display: flex; + align-items: center; + gap: 6px; + padding: 4px 6px; + background: var(--sk-warning-bg, rgba(255, 170, 0, 0.12)); + color: var(--sk-warning, var(--sk-text)); + border: 1px dashed var(--sk-warning, var(--sk-text-muted)); + border-radius: 4px; + font-size: 11px; + line-height: 1.3; +`; + +// Primary call-to-action that promotes a draft into a real node. The +// only way to commit a draft — typing never does this implicitly. +const DraftPromoteButton = styled.button` + margin-left: auto; + flex-shrink: 0; + padding: 2px 8px; + font-size: 11px; + font-weight: 600; + border-radius: 4px; + border: 1px solid var(--sk-primary, var(--sk-text)); + background: var(--sk-primary, var(--sk-text)); + color: var(--sk-panel-bg); + cursor: pointer; + transition: filter 0.1s ease; + &:hover:not(:disabled) { + filter: brightness(1.1); + } + &:disabled { + cursor: not-allowed; + opacity: 0.5; + } +`; + +const DraftBadge = styled.span` + font-weight: 700; + text-transform: uppercase; + letter-spacing: 0.5px; + font-size: 10px; + padding: 1px 5px; + border-radius: 3px; + background: var(--sk-warning, var(--sk-text-muted)); + color: var(--sk-panel-bg); + flex-shrink: 0; +`; + +const DraftMessage = styled.span` + font-family: var(--sk-font-mono, ui-monospace, monospace); + word-break: break-word; +`; + +// Small CSS-only spinner shown while a fully-configured draft is +// promoted (`addnode` dispatched) and we're waiting for the engine's +// `nodeadded` echo or a `NodeStateChanged(Failed)` reply. Replaces +// the previous silent "configuring…" text — the user now has a clear +// "something is happening on the server" signal. +const DraftSpinner = styled.span` + width: 10px; + height: 10px; + flex-shrink: 0; + border-radius: 50%; + border: 1.5px solid var(--sk-warning, var(--sk-text-muted)); + border-top-color: transparent; + animation: sk-draft-spin 0.8s linear infinite; + @keyframes sk-draft-spin { + to { + transform: rotate(360deg); + } + } `; const BidirectionalNodesRow = styled.div` @@ -118,6 +209,23 @@ const Kind = styled.div` margin-top: 0; `; +/** Draft state for a node that has been dropped on the canvas but not + * yet committed via `addnode`. Rendered with a dashed border, a + * banner listing any outstanding required fields, and an explicit + * "Add to pipeline" button — the *only* way to promote a draft to a + * real node. Typing into fields never auto-promotes. */ +export type DraftNodeState = { + missingRequired: string[]; + /** True after the user has clicked "Add to pipeline" and we are + * waiting for the engine's `nodeadded` (success) or + * `NodeStateChanged(Failed)` (failure) reply. Banner shows a + * spinner; promote button is disabled. */ + isCreating: boolean; + /** Click handler for the "Add to pipeline" button. Disabled by the + * component when missingRequired is non-empty or isCreating is true. */ + onPromote: () => void; +}; + type NodeFrameProps = { id: string; label: string; @@ -132,6 +240,9 @@ type NodeFrameProps = { children?: React.ReactNode; isBidirectional?: boolean; sessionId?: string; // For fetching live stats + /** When set, render the node as an unsubmitted draft (dashed border + + * "Draft" banner listing the missing required params). */ + draft?: DraftNodeState; }; // Helper: Check if node definition has dynamic pins @@ -185,6 +296,7 @@ const BidirectionalNodeLayout: React.FC<{ state?: NodeState; stats?: NodeStats; sessionId?: string; + draft?: DraftNodeState; children?: React.ReactNode; }> = ({ id, @@ -197,12 +309,18 @@ const BidirectionalNodeLayout: React.FC<{ state, stats, sessionId, + draft, children, }) => ( - + {/* Centered header with node name and type */}
- {state && ( + {state && !draft && ( @@ -210,6 +328,7 @@ const BidirectionalNodeLayout: React.FC<{ ({kind})
+ {draft && } {/* Two halves side by side */} @@ -239,9 +358,10 @@ const NodeHeader: React.FC<{ state?: NodeState; stats?: NodeStats; sessionId?: string; -}> = ({ id, label, kind, state, stats, sessionId }) => ( + draft?: DraftNodeState; +}> = ({ id, label, kind, state, stats, sessionId, draft }) => (
- {state && ( + {state && !draft && ( @@ -251,6 +371,53 @@ const NodeHeader: React.FC<{
); +// Sub-component: Draft banner shown above node controls. Three visual +// states, all driven by the explicit-promotion model: +// - missingRequired > 0 -> 'needs ', promote button disabled. +// - missingRequired = 0 && !isCreating -> 'ready', promote enabled. +// - isCreating -> spinner + 'creating on server', button disabled. +// The button is the *only* way to promote a draft to a real node. +const DraftBannerSection: React.FC<{ draft: DraftNodeState }> = ({ draft }) => { + const { missingRequired, isCreating, onPromote } = draft; + const message = isCreating + ? 'creating on server\u2026' + : missingRequired.length > 0 + ? `needs ${missingRequired.join(', ')}` + : 'ready'; + const promoteDisabled = isCreating || missingRequired.length > 0; + // Stop the click from reaching React Flow's drag handler — the + // banner sits inside `.drag-handle` so without this the click + // would be interpreted as the start of a drag and the button + // never fires. + const handleClick = (event: React.MouseEvent) => { + event.stopPropagation(); + if (promoteDisabled) return; + onPromote(); + }; + return ( + + Draft + {isCreating && + ); +}; + // Sub-component: Normal node layout with dynamic pin support const NormalNodeLayout: React.FC<{ id: string; @@ -264,6 +431,7 @@ const NormalNodeLayout: React.FC<{ state?: NodeState; stats?: NodeStats; sessionId?: string; + draft?: DraftNodeState; children?: React.ReactNode; }> = ({ id, @@ -277,6 +445,7 @@ const NormalNodeLayout: React.FC<{ state, stats, sessionId, + draft, children, }) => { // Show ghost pins for nodes that have any dynamic cardinality pins in their definition @@ -300,7 +469,12 @@ const NormalNodeLayout: React.FC<{ const totalOutputPins = runtimeOutputs.length + (showOutputGhost ? 1 : 0); return ( - + {/* Show real pins AND ghost pin for inputs */} {runtimeInputs.length > 0 && ( @@ -322,8 +496,11 @@ const NormalNodeLayout: React.FC<{ state={state} stats={stats} sessionId={sessionId} + draft={draft} /> + {draft && } + {children} {/* Show real pins AND ghost pin for outputs */} @@ -363,6 +540,7 @@ export const NodeFrame: React.FC = ({ children, isBidirectional = false, sessionId, + draft, }) => { if (isBidirectional) { return ( @@ -377,6 +555,7 @@ export const NodeFrame: React.FC = ({ state={state} stats={stats} sessionId={sessionId} + draft={draft} > {children} @@ -396,6 +575,7 @@ export const NodeFrame: React.FC = ({ state={state} stats={stats} sessionId={sessionId} + draft={draft} > {children} diff --git a/ui/src/hooks/compositorDragResize.test.ts b/ui/src/hooks/compositorDragResize.test.ts index 4a0954c3d..a5132f905 100644 --- a/ui/src/hooks/compositorDragResize.test.ts +++ b/ui/src/hooks/compositorDragResize.test.ts @@ -18,6 +18,7 @@ import { describe, it, expect, vi, beforeEach } from 'vitest'; import type { DragResizeDeps } from './compositorDragResize'; import { useCompositorDragResize } from './compositorDragResize'; import type { LayerState } from './compositorLayerParsers'; +import { promoteEditedServerOnly } from './useCompositorLayers'; /** Build a minimal layer for testing. */ function makeLayer(id: string): LayerState { @@ -247,3 +248,71 @@ describe('useCompositorDragResize zero-delta guard', () => { expect(deps.throttledConfigChange).toHaveBeenCalled(); }); }); + +// ── First-drag-of-server-stub regression ──────────────────────────────────── +// +// Auto-PiP layers materialised by `mapServerLayers` carry `serverOnly: +// true`. `serializeLayers` skips serverOnly layers so the server can keep +// aspect-fitting them. When the user drags such a layer, the dragged +// entry must be promoted (serverOnly cleared) BEFORE serialization so +// the user's edit reaches the server. Pre-fix, `pointerup` reconstructed +// the array from a closure-captured `updated` that still had +// `serverOnly: true`, the server never received the edit, and the next +// view-data tick snapped the layer back to its auto-fitted position. +describe('useCompositorDragResize first-drag of serverOnly layer', () => { + it('fires throttledConfigChange with serverOnly cleared on the dragged layer', () => { + const stubLayer: LayerState = { ...makeLayer('stub-1'), serverOnly: true }; + const otherStub: LayerState = { ...makeLayer('stub-2'), x: 500, serverOnly: true }; + + // Simulate the production setLayers + store.sub + ref-update chain: + // setLayers commits via promoteEditedServerOnly, then layersRef.current + // catches up synchronously via the store subscription. + const layersRef: { current: LayerState[] } = { current: [stubLayer, otherStub] }; + const setLayers = vi.fn((action: React.SetStateAction) => { + const next = + typeof action === 'function' + ? (action as (prev: LayerState[]) => LayerState[])(layersRef.current) + : action; + layersRef.current = promoteEditedServerOnly(layersRef.current, next); + }); + + const deps = makeDeps({ + layersRef, + setLayers, + findAnyLayer: (id: string) => { + const found = layersRef.current.find((l) => l.id === id); + return found ? { state: found, kind: 'video' as const } : null; + }, + }); + const { result } = renderHook(() => useCompositorDragResize(deps)); + + act(() => { + result.current.handleLayerPointerDown('stub-1', { + button: 0, + clientX: 200, + clientY: 150, + stopPropagation: vi.fn(), + preventDefault: vi.fn(), + } as unknown as React.PointerEvent); + }); + + act(() => { + document.dispatchEvent(new PointerEvent('pointerup', { clientX: 260, clientY: 200 })); + }); + + expect(deps.throttledConfigChange).toHaveBeenCalledTimes(1); + const sentLayers = (deps.throttledConfigChange as ReturnType).mock + .calls[0][0] as LayerState[]; + + // The dragged stub must be promoted to explicit config — otherwise + // serializeLayers would skip it and the server would never receive + // the user's edit. + const draggedSent = sentLayers.find((l) => l.id === 'stub-1'); + expect(draggedSent).toBeDefined(); + expect(draggedSent?.serverOnly).toBeUndefined(); + // Untouched stubs in the same commit retain serverOnly so the + // server keeps aspect-fitting sources the user didn't drag. + const untouchedSent = sentLayers.find((l) => l.id === 'stub-2'); + expect(untouchedSent?.serverOnly).toBe(true); + }); +}); diff --git a/ui/src/hooks/compositorDragResize.ts b/ui/src/hooks/compositorDragResize.ts index d488992aa..de6caf35f 100644 --- a/ui/src/hooks/compositorDragResize.ts +++ b/ui/src/hooks/compositorDragResize.ts @@ -249,8 +249,10 @@ export function useCompositorDragResize(deps: DragResizeDeps) { if (state.layerKind === 'video') { if (!isZeroDelta) { setLayers((prev) => prev.map((l) => (l.id === updated.id ? updated : l))); - const newLayers = layersRef.current.map((l) => (l.id === updated.id ? updated : l)); - throttledConfigChange?.(newLayers); + // Read the post-commit store, not the closure-captured + // `updated`: setLayers strips `serverOnly` from the dragged + // layer, and serializeLayers skips entries that still carry it. + throttledConfigChange?.(layersRef.current); } } else if (state.layerKind === 'text') { if (!isZeroDelta) { diff --git a/ui/src/hooks/compositorServerSync.test.ts b/ui/src/hooks/compositorServerSync.test.ts index 2af58fbcc..b27270aaa 100644 --- a/ui/src/hooks/compositorServerSync.test.ts +++ b/ui/src/hooks/compositorServerSync.test.ts @@ -12,12 +12,22 @@ * - The activeInteractionRef guard suppresses view data during interactions */ -import { describe, it, expect } from 'vitest'; +import { describe, it, expect, vi } from 'vitest'; import type { ResolvedLayer } from '@/types/generated/compositor-types'; import type { LayerState } from './compositorLayerParsers'; -import { mapServerLayers } from './compositorServerSync'; +import { isStaleViewData, mapServerLayers } from './compositorServerSync'; +import { bumpConfigRev, resetAllConfigRevs } from './useConfigRev'; + +// Stub the WS service so `getClientNonce()` returns a deterministic +// value without requiring a live WS connection. Hoisted by Vitest so +// it applies before module-level imports resolve in the SUT. +vi.mock('@/services/websocket', () => ({ + getWebSocketService: () => ({ + getClientNonce: () => 'client-A-nonce', + }), +})); function makeLayer(id: string, x: number, width: number): LayerState { return { @@ -83,6 +93,78 @@ describe('mapServerLayers — pure geometry merge', () => { expect(result).toBe(prev); // referential equality }); + it('first-drag gate: empty-sender (uninitialised server stamp) is treated as "ours" for rev gating', () => { + // Server stamps view-data unconditionally with `_sender`/`_rev` — + // before any client has committed, the stamp defaults to `""`/`0`. + // After our first stamped commit (localRev = 1) the server is + // briefly still rendering with the pre-commit config and emits view + // data with rev 0. Without gating it, `mapServerLayers` would + // overwrite the user's just-committed geometry — visible as a + // first-drag snap-back on auto-stub layers. + resetAllConfigRevs(); + bumpConfigRev('compositor-1'); // localRev becomes 1 + + const preCommit: Record = { + _sender: '', + _rev: 0, + layers: [], + }; + expect(isStaleViewData(preCommit, 'compositor-1')).toBe(true); + + // Once the server applies our commit it stamps with our nonce. + // rev === local: not stale. + const fresh: Record = { + _sender: 'client-A-nonce', + _rev: 1, + layers: [], + }; + expect(isStaleViewData(fresh, 'compositor-1')).toBe(false); + + // Echo of an older commit from us: stale. + const echo: Record = { + _sender: 'client-A-nonce', + _rev: 0, + layers: [], + }; + expect(isStaleViewData(echo, 'compositor-1')).toBe(true); + + // Another client's stamp passes through (we accept their edits). + const otherClient: Record = { + _sender: 'client-B-nonce', + _rev: 5, + layers: [], + }; + expect(isStaleViewData(otherClient, 'compositor-1')).toBe(false); + + resetAllConfigRevs(); + }); + + it('first-drag gate: empty-sender at rev 0 passes through when localRev is 0', () => { + // Before any local commit, the server's default `""`/`0` stamp is + // authoritative — the user is just observing the server-resolved + // layout. The gate must not fire here. + resetAllConfigRevs(); + + const preCommit: Record = { + _sender: '', + _rev: 0, + layers: [], + }; + expect(isStaleViewData(preCommit, 'compositor-1')).toBe(false); + }); + + it('view-data without `_rev` (unrelated emitters) is not gated', () => { + // `isStaleViewData` only gates emitters that participate in the + // rev contract — anything missing `_rev` falls through unchanged. + resetAllConfigRevs(); + bumpConfigRev('compositor-1'); + + const noRev: Record = { layers: [] }; + expect(isStaleViewData(noRev, 'compositor-1')).toBe(false); + + resetAllConfigRevs(); + }); + it('materializes server-only layers with default config', () => { const prev = [makeLayer('in_0', 0, 1280)]; const serverLayers: ResolvedLayer[] = [ diff --git a/ui/src/hooks/compositorServerSync.ts b/ui/src/hooks/compositorServerSync.ts index 0e14ee7d6..7d8ec2d27 100644 --- a/ui/src/hooks/compositorServerSync.ts +++ b/ui/src/hooks/compositorServerSync.ts @@ -159,12 +159,17 @@ export function mergeTextMeasurements( // ── Server layout helpers ──────────────────────────────────────────────────── -/** Check whether incoming view data is a stale echo of our own config change. - * Returns true when the data should be skipped (rev < local counter). */ -function isStaleViewData(vd: Record, nodeId: string): boolean { +/** True when the view-data tick was rendered from pre-commit config + * (rev older than our latest stamped commit). Empty sender is the + * server's pre-stamp default (see compositor `mod.rs` view-data emit: + * any node that participates in the rev contract emits `_sender: ""`, + * `_rev: 0` until the first stamped UpdateParams lands) and is + * treated like "ours" for gating. */ +export function isStaleViewData(vd: Record, nodeId: string): boolean { const sender = typeof vd._sender === 'string' ? vd._sender : undefined; const rev = typeof vd._rev === 'number' ? vd._rev : undefined; - if (sender && sender === getClientNonce() && rev !== undefined) { + if (rev === undefined) return false; + if (sender === '' || sender === getClientNonce()) { return rev < getLocalConfigRev(nodeId); } return false; diff --git a/ui/src/hooks/useCompositorLayers.monitor-flow.test.ts b/ui/src/hooks/useCompositorLayers.monitor-flow.test.ts index 39beb6163..13fa59cf4 100644 --- a/ui/src/hooks/useCompositorLayers.monitor-flow.test.ts +++ b/ui/src/hooks/useCompositorLayers.monitor-flow.test.ts @@ -784,7 +784,7 @@ describe('Stale view-data gating (causal consistency)', () => { expect(layers1[0].width).toBe(960); }); - it('view data without _sender/_rev metadata is always applied', () => { + it('view data with empty-default stamp is applied before any local commit', () => { seedStore(); const opts = monitorOptions(); @@ -793,18 +793,54 @@ describe('Stale view-data gating (causal consistency)', () => { { initialProps: opts } ); - // Bump local rev - bumpConfigRev(NODE_ID); - - // Server sends view data without any causal metadata - act(() => pushServerViewData(makeServerLayout())); + // localRev is 0 — the user hasn't committed anything yet. The + // server stamps view-data with the empty default (`_sender: ""`, + // `_rev: 0`); pre-commit it's the authoritative source of geometry + // for auto-PiP layouts and must be accepted. + const preCommitLayout = { + ...makeServerLayout(), + _sender: '', + _rev: 0, + }; + act(() => pushServerViewData(preCommitLayout)); - // Should be applied — no metadata means no gating const layers1 = getLayersFromStore(result.current.store); expect(layers1[0].x).toBe(160); expect(layers1[0].width).toBe(960); }); + it('view data with empty-default stamp is gated AFTER a local commit (first-drag race)', () => { + seedStore(); + + const opts = monitorOptions(); + const { result } = renderHook( + (props: UseCompositorLayersOptions) => useCompositorLayers(props), + { initialProps: opts } + ); + + // Simulate the user's first drag: bump local rev to 1. The server + // briefly continues rendering with pre-commit config and stamps + // view-data with the empty default (`_sender: ""`, `_rev: 0`). + // Without the gate this would snap the just-dragged layer back to + // its aspect-fitted server position. + bumpConfigRev(NODE_ID); // localRev = 1 + + // The store currently holds the full-canvas fallback (x=0, w=1280) + // from `seedStore`. A pre-commit-stamped view-data tick arrives. + const preCommitLayout = { + ...makeServerLayout(), + _sender: '', + _rev: 0, + }; + act(() => pushServerViewData(preCommitLayout)); + + // Geometry must NOT have been overwritten — the rev is below our + // local rev, so it's a stale pre-commit echo. + const layers1 = getLayersFromStore(result.current.store); + expect(layers1[0].x).toBe(0); + expect(layers1[0].width).toBe(1280); + }); + it('activeInteractionRef suppresses view data during interactions', () => { seedStore(); diff --git a/ui/src/hooks/useCompositorLayers.test.ts b/ui/src/hooks/useCompositorLayers.test.ts new file mode 100644 index 000000000..ff4eea3a3 --- /dev/null +++ b/ui/src/hooks/useCompositorLayers.test.ts @@ -0,0 +1,117 @@ +// SPDX-FileCopyrightText: © 2025 StreamKit Contributors +// +// SPDX-License-Identifier: MPL-2.0 + +/** + * Unit tests for `promoteEditedServerOnly` — the per-layer scoping + * that decides which `serverOnly` auto-stubs flip into "explicit + * config" mode when the user mutates layers. + * + * The contract being protected: + * 1. A layer the user actually edits gets promoted (serverOnly cleared). + * 2. Other auto-stubs in the same commit stay serverOnly so the + * server keeps aspect-fitting sources the user never touched. + * 3. Already-explicit layers (no serverOnly flag) pass through + * unchanged regardless of identity churn. + */ + +import { describe, it, expect } from 'vitest'; + +import type { LayerState } from './compositorLayerParsers'; +import { promoteEditedServerOnly } from './useCompositorLayers'; + +function makeLayer(id: string, x: number, opts: Partial = {}): LayerState { + return { + id, + x, + y: 0, + width: 320, + height: 240, + opacity: 1.0, + zIndex: 0, + rotationDegrees: 0, + mirrorHorizontal: false, + mirrorVertical: false, + visible: true, + cropX: 0.5, + cropY: 0.5, + cropZoom: 1.0, + cropShape: 'rect', + aspectFit: true, + ...opts, + }; +} + +describe('promoteEditedServerOnly', () => { + it('promotes only the layer whose identity changed', () => { + const a = makeLayer('in_0', 0, { serverOnly: true }); + const b = makeLayer('in_1', 320, { serverOnly: true }); + const c = makeLayer('in_2', 640, { serverOnly: true }); + const current = [a, b, c]; + + // Caller mutates only `b` via map identity preservation. + const next = current.map((l) => (l.id === 'in_1' ? { ...l, x: 400 } : l)); + + const result = promoteEditedServerOnly(current, next); + + // Edited layer: serverOnly cleared, geometry applied. + expect(result[1].id).toBe('in_1'); + expect(result[1].x).toBe(400); + expect(result[1].serverOnly).toBeUndefined(); + + // Untouched layers retain serverOnly so the server keeps + // resolving their geometry. + expect(result[0].serverOnly).toBe(true); + expect(result[2].serverOnly).toBe(true); + + // Untouched layer references are preserved. + expect(result[0]).toBe(a); + expect(result[2]).toBe(c); + }); + + it('passes through already-explicit layers untouched', () => { + const a = makeLayer('in_0', 0); // no serverOnly + const b = makeLayer('in_1', 320, { serverOnly: true }); + const current = [a, b]; + + const next = current.map((l) => (l.id === 'in_0' ? { ...l, x: 100 } : l)); + const result = promoteEditedServerOnly(current, next); + + // Layer that was never serverOnly stays as-is, with edits applied. + expect(result[0].x).toBe(100); + expect(result[0].serverOnly).toBeUndefined(); + // Server-stub passes through untouched (its identity didn't change). + expect(result[1]).toBe(b); + expect(result[1].serverOnly).toBe(true); + }); + + it('promotes a freshly-added serverOnly layer (identity not in current)', () => { + const a = makeLayer('in_0', 0); + const current = [a]; + + const fresh = makeLayer('in_1', 320, { serverOnly: true }); + const next = [a, fresh]; + + const result = promoteEditedServerOnly(current, next); + + // The fresh entry is treated as "user-introduced": serverOnly cleared. + // (In practice this path isn't hit because materialization writes + // directly via setLayersInStore, but the rule is consistent: any + // serverOnly entry whose identity isn't in `current` is promoted.) + expect(result[1].serverOnly).toBeUndefined(); + }); + + it('returns the same number of entries as next', () => { + const a = makeLayer('in_0', 0, { serverOnly: true }); + const b = makeLayer('in_1', 320, { serverOnly: true }); + const current = [a, b]; + // Caller removes one and edits the other. + const next = [{ ...a, x: 50 }]; + + const result = promoteEditedServerOnly(current, next); + + expect(result).toHaveLength(1); + expect(result[0].x).toBe(50); + expect(result[0].serverOnly).toBeUndefined(); + }); +}); diff --git a/ui/src/hooks/useCompositorLayers.ts b/ui/src/hooks/useCompositorLayers.ts index 1728b2cbd..c12130bc6 100644 --- a/ui/src/hooks/useCompositorLayers.ts +++ b/ui/src/hooks/useCompositorLayers.ts @@ -143,6 +143,25 @@ export interface UseCompositorLayersResult { keyboardDeps: CompositorKeyboardDeps; } +/** + * Clear `serverOnly` on layers the caller actually mutated (object + * identity differs from current). Untouched stubs keep the flag so + * the server keeps aspect-fitting sources the user never edited. + */ +export const promoteEditedServerOnly = ( + current: LayerState[], + next: LayerState[] +): LayerState[] => { + const currentById = new Map(current.map((l) => [l.id, l])); + return next.map((l) => { + if (!l.serverOnly) return l; + if (currentById.get(l.id) === l) return l; + const cleared = { ...l }; + delete cleared.serverOnly; + return cleared; + }); +}; + export const useCompositorLayers = ( options: UseCompositorLayersOptions ): UseCompositorLayersResult => { @@ -180,7 +199,7 @@ export const useCompositorLayers = ( (action: React.SetStateAction) => { const current = getLayersFromStore(store); const next = typeof action === 'function' ? action(current) : action; - setLayersInStore(store, next); + setLayersInStore(store, promoteEditedServerOnly(current, next)); }, [store] ); diff --git a/ui/src/nodes/AudioGainNode.tsx b/ui/src/nodes/AudioGainNode.tsx index fdc4cd849..a90000033 100644 --- a/ui/src/nodes/AudioGainNode.tsx +++ b/ui/src/nodes/AudioGainNode.tsx @@ -83,6 +83,7 @@ interface AudioGainNodeData { stats?: NodeStats; onParamChange?: (nodeId: string, paramName: string, value: unknown) => void; sessionId?: string; + draft?: { missingRequired: string[]; isCreating: boolean; onPromote: () => void }; } interface AudioGainNodeProps { @@ -157,6 +158,7 @@ const AudioGainNode: React.FC = React.memo(function AudioGai outputs={data.outputs} state={data.state} sessionId={data.sessionId} + draft={data.draft} > diff --git a/ui/src/nodes/CompositorNode.tsx b/ui/src/nodes/CompositorNode.tsx index 1cb301e62..70a41dcfe 100644 --- a/ui/src/nodes/CompositorNode.tsx +++ b/ui/src/nodes/CompositorNode.tsx @@ -48,6 +48,7 @@ interface CompositorNodeData { onParamChange?: (nodeId: string, paramName: string, value: unknown) => void; onConfigChange?: (nodeId: string, config: Record) => void; sessionId?: string; + draft?: { missingRequired: string[]; isCreating: boolean; onPromote: () => void }; } interface CompositorNodeProps { @@ -358,6 +359,7 @@ const CompositorNode: React.FC = React.memo(function Compos nodeDefinition={data.nodeDefinition} state={data.state} sessionId={data.sessionId} + draft={data.draft} > void; sessionId?: string; + /** Set when the node has been dropped on the canvas but not yet + * committed via `addnode`. See `MonitorView`/`pipelineGraph.buildNodeObject`. */ + draft?: { missingRequired: string[]; isCreating: boolean; onPromote: () => void }; } interface ConfigurableNodeProps { @@ -287,7 +290,14 @@ const ConfigurableNode: React.FC = React.memo(function Co const toggleConfigs = useMemo(() => extractToggleConfigs(schema), [schema]); const textConfigs = useMemo(() => extractTextConfigs(schema), [schema]); const controlCount = toggleConfigs.length + sliderConfigs.length + textConfigs.length; - const hasControls = controlCount > 0; + // Drafts hide the canvas-side tune controls entirely. Those controls + // dispatch `tunenode` directly via `useTuneNode` (see SchemaControls) + // and so cannot route through the draft path — for a draft node the + // engine has no entry yet and would warn "Could not tune non-existent + // node". Drafts are configured exclusively from the right-pane + // Inspector, whose `onParamChange` is wired to draft-aware routing. + const isDraft = !!data.draft; + const hasControls = controlCount > 0 && !isDraft; // Detect bidirectional nodes using the bidirectional property from node definition const isBidirectional = data.definition?.bidirectional ?? false; @@ -318,6 +328,7 @@ const ConfigurableNode: React.FC = React.memo(function Co state={data.state} sessionId={data.sessionId} isBidirectional={isBidirectional} + draft={data.draft} > {hasControls && ( <> diff --git a/ui/src/panes/InspectorPane.tsx b/ui/src/panes/InspectorPane.tsx index 0bef54113..5445a7020 100644 --- a/ui/src/panes/InspectorPane.tsx +++ b/ui/src/panes/InspectorPane.tsx @@ -39,6 +39,26 @@ const PaneTitle = styled.h3` font-size: 14px; font-weight: 600; color: var(--sk-text); + display: flex; + align-items: center; + gap: 6px; +`; + +const DraftPill = styled.span` + font-size: 10px; + font-weight: 700; + text-transform: uppercase; + letter-spacing: 0.5px; + padding: 1px 6px; + border-radius: 999px; + background: var(--sk-warning, var(--sk-text-muted)); + color: var(--sk-panel-bg); +`; + +const DraftHint = styled.p` + margin: 4px 0 0 0; + font-size: 12px; + color: var(--sk-warning, var(--sk-text-muted)); `; const PaneSubtitle = styled.p` @@ -149,7 +169,15 @@ const ColorDot = styled.span<{ color: string }>` `; interface InspectorPaneProps { - node: Node<{ label: string; kind: string; params: Record; sessionId?: string }>; + node: Node<{ + label: string; + kind: string; + params: Record; + sessionId?: string; + /** Inspector reads only `missingRequired` for the hint text; + * promotion happens via the canvas-side button (see NodeFrame). */ + draft?: { missingRequired: string[]; isCreating: boolean; onPromote: () => void }; + }>; nodeDefinition: NodeDefinition; onParamChange: (nodeId: string, paramName: string, value: unknown) => void; onLabelChange: (nodeId: string, newLabel: string) => void; @@ -332,8 +360,25 @@ const InspectorPane: React.FC = ({ schema.default ?? ''; const inputId = `param-${node.id}-${key}`; - // In monitor view, disable non-tunable params (they can't be changed at runtime) - const isDisabled = readOnly || (isMonitorView && !schema.tunable); + // In monitor view, non-tunable params are normally disabled (they + // can't be changed at runtime). Drafts are the exception: the + // node does not exist in the engine yet, so the user must be able + // to fill required-but-not-tunable fields (e.g. slint's + // `slint_file`) to promote the draft. All fields stay editable + // until the draft is committed; tunable gating resumes once the + // engine echoes back a real node. + // + // While the draft is *in flight* (Add to pipeline clicked, waiting + // for `nodeadded` or `Failed`), disable everything: any keystroke + // would be racing the engine's response and either overwritten on + // success (the addnode payload is already on the wire) or wasted + // on failure (the entry is being torn down). Disabling matches + // the spinner + disabled button on the canvas banner — the user + // sees a coherent "wait" state and the UI doesn't have to + // reconcile post-promote keystrokes. + const isDraft = !!node.data.draft; + const isInflight = node.data.draft?.isCreating ?? false; + const isDisabled = readOnly || isInflight || (isMonitorView && !isDraft && !schema.tunable); switch (schema.type) { case 'string': @@ -398,11 +443,19 @@ const InspectorPane: React.FC = ({ return ( - Inspector + + Inspector + {node.data.draft && Draft} + {node.data.label} {nodeDefinition.description && ( {nodeDefinition.description} )} + {node.data.draft && node.data.draft.missingRequired.length > 0 && ( + + Fill {node.data.draft.missingRequired.join(', ')} to add this node to the pipeline. + + )} diff --git a/ui/src/services/websocket.ts b/ui/src/services/websocket.ts index fd2c81a02..6fee5e2ed 100644 --- a/ui/src/services/websocket.ts +++ b/ui/src/services/websocket.ts @@ -4,7 +4,7 @@ import { v4 as uuidv4 } from 'uuid'; -import { getLocalConfigRev } from '@/hooks/useConfigRev'; +import { getLocalConfigRev, resetAllConfigRevs } from '@/hooks/useConfigRev'; import { batchWriteNodeStates, batchWriteNodeStats, @@ -95,9 +95,11 @@ export class WebSocketService { this.ws.onopen = () => { logger.info('Connected (onopen fired)'); this.reconnectAttempts = 0; - // Reset sender nonce on each new connection so stale echoes from - // previous sessions are never mistaken for the current session's. + // Server resets its config_rev on restart, so our per-node + // counters must reset alongside the nonce — otherwise a stale + // localRev would gate fresh server view-data as a self-echo. this.clientNonce = uuidv4(); + resetAllConfigRevs(); this.notifyConnectionStatus(true); this.flushMessageQueue(); this.resubscribeToSessions(); diff --git a/ui/src/utils/draftNodes.test.ts b/ui/src/utils/draftNodes.test.ts new file mode 100644 index 000000000..c6f11af67 --- /dev/null +++ b/ui/src/utils/draftNodes.test.ts @@ -0,0 +1,152 @@ +// SPDX-FileCopyrightText: © 2025 StreamKit Contributors +// +// SPDX-License-Identifier: MPL-2.0 + +import { describe, expect, it } from 'vitest'; + +import type { NodeDefinition } from '@/types/types'; + +import { computeMissingRequired, defaultParamsForKind, mergeDraftParam } from './draftNodes'; + +const def = (kind: string, schema: Record): NodeDefinition => + ({ + kind, + description: '', + inputs: [], + outputs: [], + param_schema: schema, + }) as unknown as NodeDefinition; + +describe('computeMissingRequired', () => { + const defs: NodeDefinition[] = [ + def('plugin::native::servo', { + properties: { + url: { type: 'string', description: 'Stream URL' }, + width: { type: 'integer', default: 1280 }, + height: { type: 'integer', default: 720 }, + }, + required: ['url'], + }), + def('audio::gain', { + properties: { gain: { type: 'number', default: 1.0 } }, + }), + def('plugin::native::piper', { + properties: { model_dir: { type: 'string' }, voice: { type: 'string' } }, + required: ['model_dir'], + }), + ]; + + it('returns the unset required keys', () => { + expect(computeMissingRequired('plugin::native::servo', {}, defs)).toEqual(['url']); + }); + + it('treats undefined, null, and whitespace strings as missing', () => { + expect(computeMissingRequired('plugin::native::servo', { url: undefined }, defs)).toEqual([ + 'url', + ]); + expect(computeMissingRequired('plugin::native::servo', { url: null }, defs)).toEqual(['url']); + expect(computeMissingRequired('plugin::native::servo', { url: ' ' }, defs)).toEqual(['url']); + }); + + it('returns an empty list once all required keys have a value', () => { + expect( + computeMissingRequired( + 'plugin::native::servo', + { url: 'https://example.com/stream.m3u8' }, + defs + ) + ).toEqual([]); + }); + + it('returns an empty list when there are no required keys', () => { + expect(computeMissingRequired('audio::gain', {}, defs)).toEqual([]); + }); + + it('returns an empty list for unknown kinds', () => { + expect(computeMissingRequired('does::not::exist', {}, defs)).toEqual([]); + }); + + it('treats numbers (including 0) and booleans (including false) as set', () => { + const numberDef: NodeDefinition[] = [ + def('test::node', { + properties: { count: { type: 'integer' }, enabled: { type: 'boolean' } }, + required: ['count', 'enabled'], + }), + ]; + expect(computeMissingRequired('test::node', { count: 0, enabled: false }, numberDef)).toEqual( + [] + ); + }); +}); + +describe('defaultParamsForKind', () => { + const defs: NodeDefinition[] = [ + def('plugin::native::servo', { + properties: { + url: { type: 'string' }, + width: { type: 'integer', default: 1280 }, + height: { type: 'integer', default: 720 }, + timeout: { type: 'integer' }, + }, + required: ['url'], + }), + ]; + + it('only fills properties with explicit defaults', () => { + expect(defaultParamsForKind('plugin::native::servo', defs)).toEqual({ + width: 1280, + height: 720, + }); + }); + + it('returns an empty object for kinds with no schema', () => { + expect(defaultParamsForKind('unknown::kind', defs)).toEqual({}); + }); + + it('round-trips with computeMissingRequired (defaults satisfy non-required keys only)', () => { + const params = defaultParamsForKind('plugin::native::servo', defs); + expect(computeMissingRequired('plugin::native::servo', params, defs)).toEqual(['url']); + }); +}); + +describe('mergeDraftParam', () => { + it('replaces a flat top-level key', () => { + expect(mergeDraftParam({ width: 1280, height: 720 }, 'url', 'https://x')).toEqual({ + width: 1280, + height: 720, + url: 'https://x', + }); + }); + + it('overwrites an existing flat key with the new value (regression: stale-value bug)', () => { + // Simulates the second keystroke into the same field. The + // returned object must reflect the new value so the inspector + // (driven by nodeParamsAtom mirroring this object) does not + // freeze on the previous character. + const after1 = mergeDraftParam({}, 'url', 'h'); + const after2 = mergeDraftParam(after1, 'url', 'ht'); + expect(after2['url']).toBe('ht'); + expect(after1).not.toBe(after2); // new identity each call + }); + + it('writes a dotted path as a nested object instead of a flat key', () => { + // Regression for finding #2: previously the draft branch stored + // dot-paths verbatim ({ "properties.show": ... }) which would have + // been sent to the engine as-is. + const out = mergeDraftParam({}, 'properties.show', true); + expect(out).toEqual({ properties: { show: true } }); + expect(out['properties.show']).toBeUndefined(); + }); + + it('deep-merges sibling nested keys instead of clobbering them', () => { + const start = { properties: { show: true, color: 'red' } }; + const out = mergeDraftParam(start, 'properties.color', 'blue'); + expect(out).toEqual({ properties: { show: true, color: 'blue' } }); + }); + + it('preserves unrelated top-level keys when editing a nested path', () => { + const start = { width: 1280 }; + const out = mergeDraftParam(start, 'properties.show', true); + expect(out).toEqual({ width: 1280, properties: { show: true } }); + }); +}); diff --git a/ui/src/utils/draftNodes.ts b/ui/src/utils/draftNodes.ts new file mode 100644 index 000000000..6a106be80 --- /dev/null +++ b/ui/src/utils/draftNodes.ts @@ -0,0 +1,85 @@ +// SPDX-FileCopyrightText: © 2025 StreamKit Contributors +// +// SPDX-License-Identifier: MPL-2.0 + +/** + * Helpers for "draft" nodes in the Monitor view. + * + * A draft is a node the user has dragged onto the canvas but cannot yet + * be sent to the engine because one or more required schema params (in + * `param_schema.required`) have no value (no default + nothing entered + * by the user yet). Drafts live entirely in the UI; they are promoted + * to a real `addnode` WebSocket call once all required params are + * filled. See `MonitorView.tsx` for the full lifecycle. + */ + +import type { NodeDefinition } from '@/types/types'; +import { buildParamUpdate, deepMerge } from '@/utils/controlProps'; + +/** A param value is considered "missing" if it is undefined, null, or + * an empty / whitespace-only string. Numbers and booleans always count + * as set (including 0 / false), matching how plugin validators read + * required params. */ +const isMissingValue = (v: unknown): boolean => { + if (v === undefined || v === null) return true; + if (typeof v === 'string' && v.trim() === '') return true; + return false; +}; + +/** Return the subset of `param_schema.required` keys whose value in + * `params` is missing. Returns `[]` for kinds with no required params, + * no schema, or no node definition. */ +export const computeMissingRequired = ( + kind: string, + params: Record, + nodeDefinitions: NodeDefinition[] +): string[] => { + const def = nodeDefinitions.find((d) => d.kind === kind); + const schema = def?.param_schema as Record | undefined; + const required = schema?.['required']; + if (!Array.isArray(required)) return []; + return required.filter((k): k is string => typeof k === 'string' && isMissingValue(params[k])); +}; + +/** Build the default param object the UI sends on drop: every property + * that has an explicit `default` in the schema is filled with that + * default; required-but-defaultless properties are left absent so the + * caller can detect them via `computeMissingRequired`. */ +export const defaultParamsForKind = ( + kind: string, + nodeDefinitions: NodeDefinition[] +): Record => { + const def = nodeDefinitions.find((d) => d.kind === kind); + const params: Record = {}; + const schema = def?.param_schema as Record | undefined; + const props = schema?.['properties'] as Record> | undefined; + if (!props) return params; + for (const [key, propSchema] of Object.entries(props)) { + if (propSchema && typeof propSchema === 'object' && 'default' in propSchema) { + const defVal = propSchema['default']; + if (defVal !== undefined) { + params[key] = defVal; + } + } + } + return params; +}; + +/** Apply a single inspector edit to a draft's `params`. + * + * Flat keys (no dot) replace the corresponding top-level entry. Dotted + * paths (e.g. `"properties.show"`) are converted into a nested partial + * via `buildParamUpdate` and deep-merged into the existing params so + * sibling fields are preserved — this matches the live-node code path + * in `controlProps.dispatchParamUpdate` and ensures drafts produce the + * same shape on promotion that a normal `tunenode` would have. */ +export const mergeDraftParam = ( + params: Record, + key: string, + value: unknown +): Record => { + if (key.includes('.')) { + return deepMerge(params, buildParamUpdate(key, value)); + } + return { ...params, [key]: value }; +}; diff --git a/ui/src/utils/pipelineGraph.ts b/ui/src/utils/pipelineGraph.ts index c1b556901..6cf3fa209 100644 --- a/ui/src/utils/pipelineGraph.ts +++ b/ui/src/utils/pipelineGraph.ts @@ -175,6 +175,22 @@ export interface BuildNodeParams { stableOnParamChange: (nodeId: string, paramName: string, value: unknown) => void; stableOnConfigChange?: (nodeId: string, config: Record) => void; selectedSessionId: string | null; + /** When set, the React Flow node is rendered as an unsubmitted draft + * (dashed border + "needs " banner + "Add to pipeline" + * button). Used by Monitor view for nodes the user has dropped + * but not yet committed via the explicit promotion button. + * + * - `missingRequired`: param keys still empty. When non-empty, the + * promote button is disabled and the banner shows the list. + * - `isCreating`: true once the user has clicked Add and we are + * waiting for the engine's `nodeadded`/`Failed` reply. Banner + * shows a spinner; button is disabled. + * - `onPromote`: invoked when the user clicks the promote button. */ + draft?: { + missingRequired: string[]; + isCreating: boolean; + onPromote: () => void; + }; } /** Determine the ReactFlow node type from the pipeline node kind */ @@ -206,6 +222,10 @@ export const buildNodeObject = (params: BuildNodeParams): RFNode => { // Full-config change callback for compositor nodes onConfigChange: params.stableOnConfigChange, sessionId: params.selectedSessionId || undefined, + // When the node is an unsubmitted draft, downstream renderers + // (NodeFrame) render the dashed-border draft banner with the + // promote button. Live nodes leave this undefined. + draft: params.draft, }, }; }; diff --git a/ui/src/views/MonitorView.tsx b/ui/src/views/MonitorView.tsx index ce59967a4..5ec0122da 100644 --- a/ui/src/views/MonitorView.tsx +++ b/ui/src/views/MonitorView.tsx @@ -60,19 +60,29 @@ import { nodeStateAtom, nodeParamsAtom, nodeKey, + clearNodeParams, + writeNodeParam, + writeNodeParams, } from '@/stores/sessionAtoms'; import { useSessionStore } from '@/stores/sessionStore'; import type { NodeDefinition, Connection, + JsonValue, + NodeState, Pipeline, MessageType, InputPin, OutputPin, } from '@/types/types'; -import { dispatchParamUpdate } from '@/utils/controlProps'; +import { buildParamUpdate, dispatchParamUpdate } from '@/utils/controlProps'; import { topoLevelsFromPipeline, orderedNamesFromLevels } from '@/utils/dag'; import { deepEqual } from '@/utils/deepEqual'; +import { + computeMissingRequired, + defaultParamsForKind as draftDefaultParamsForKind, + mergeDraftParam, +} from '@/utils/draftNodes'; import { deepMergeSchemas, validateValue } from '@/utils/jsonSchema'; import type { JsonSchema, JsonSchemaProperty } from '@/utils/jsonSchema'; import { viewsLogger } from '@/utils/logger'; @@ -86,6 +96,28 @@ import { nodeTypes, defaultEdgeOptions } from '@/utils/reactFlowDefaults'; // Memoized view title to prevent re-renders during drag const MonitorViewTitle = React.memo(() => Monitor); +// UI-only node dropped on the canvas, promoted to a real node on +// explicit "Add to pipeline" click — never auto-promoted on edit. +export type DraftNode = { + kind: string; + params: Record; + position: { x: number; y: number }; + missingRequired: string[]; + /** True between "Add to pipeline" click and the engine's + * NodeAdded/Failed reply. Cleared on Failed so the user can fix + * the input and click again. */ + inFlight?: boolean; +}; + +// Returns the failure reason for `NodeState::Failed`, else undefined. +const nodeStateFailedReason = (s: NodeState | null | undefined): string | undefined => { + if (s && typeof s === 'object' && 'Failed' in s) { + const f = (s as { Failed: { reason?: string } }).Failed; + return f?.reason; + } + return undefined; +}; + /** * Main content component for the Monitor view. * This component has 114 statements which exceeds the max-statements limit. @@ -102,13 +134,8 @@ const MonitorViewContent: React.FC = () => { const [nodes, setNodes, onNodesChangeInternal] = useNodesState([]); const [edges, setEdges, onEdgesChange] = useEdgesState([]); - // ── Low-priority dimension changes ──────────────────────────────────── - // ReactFlow fires onNodesChange with 'dimensions' type for each node - // after mount measurement. These are internal bookkeeping (the nodes - // are already visible) so we wrap them in startTransition to let React - // schedule them at lower priority rather than blocking the main thread. - // Interactive changes (select, drag, remove) bypass this and apply - // immediately. + // Defer 'dimensions' changes (post-mount measurement) via startTransition + // so they don't block interactive changes. const onNodesChangeBatched = useCallback( (changes: NodeChange[]) => { const immediate: NodeChange[] = []; @@ -158,6 +185,9 @@ const MonitorViewContent: React.FC = () => { [selectedSessionId, updateNodePosition] ); const [selectedNodes, setSelectedNodes] = useState([]); + // Drafts to mark selected on their first topology rebuild; cleared + // once applied so React Flow owns selection thereafter. + const newDraftSelectionRef = useRef>(new Set()); const [rightPaneView, setRightPaneView] = useState<'yaml' | 'inspector' | 'telemetry'>('yaml'); const [showDeleteModal, setShowDeleteModal] = useState(false); const [sessionToDelete, setSessionToDelete] = useState(null); @@ -171,9 +201,16 @@ const MonitorViewContent: React.FC = () => { ); const [type, setType] = useDnD(); const toast = useToast(); - // Cache for positions of nodes that are being added (to preserve drop location) - const pendingNodePositions = React.useRef>(new Map()); + // ── Draft nodes ─────────────────────────────────────────────────────── + // See module-level DraftNode type for the full lifecycle. + const [draftNodes, setDraftNodes] = useState>(new Map()); + // Read-only snapshot for callbacks that mustn't depend on `draftNodes`. + // Modifying paths use functional `setDraftNodes((prev) => ...)`. + const draftNodesRef = useRef(draftNodes); + useEffect(() => { + draftNodesRef.current = draftNodes; + }, [draftNodes]); // Auto-select session from navigation state (e.g., from Stream view) useEffect(() => { const state = location.state as { sessionId?: string } | null; @@ -232,19 +269,14 @@ const MonitorViewContent: React.FC = () => { }, }); - // Keep YAML view as default when nodes are selected - // Inspector only opens on double-click + // YAML by default; inspector opens on double-click — except drafts, + // which need the inspector to fill required fields. useEffect(() => { - if (selectedNodes.length === 0) { - // No selection - keep YAML view - setRightPaneView('yaml'); - } else if (selectedNodes.length > 1) { - // Multiple selection - show YAML view - setRightPaneView('yaml'); - } else if (selectedNodes.length === 1) { - // Single selection - switch to YAML view (with highlighting) - setRightPaneView('yaml'); + if (selectedNodes.length === 1 && draftNodesRef.current.has(selectedNodes[0])) { + setRightPaneView('inspector'); + return; } + setRightPaneView('yaml'); }, [selectedNodes]); // Double-click handler to open inspector @@ -275,7 +307,7 @@ const MonitorViewContent: React.FC = () => { const prev = selectedNodeRef.current; const prevData = (prev?.data as Record | undefined) ?? undefined; const nextData = selectedNode.data as Record; - // Check if meaningful properties have changed (not just position) + // Recompute only when meaningful data changes — not on every position update. if ( !prev || prev.id !== selectedNode.id || @@ -284,7 +316,8 @@ const MonitorViewContent: React.FC = () => { prevData?.['label'] !== nextData['label'] || prevData?.['sessionId'] !== nextData['sessionId'] || !deepEqual(prevData?.['state'], nextData['state']) || - !deepEqual(prevData?.['params'], nextData['params']) + !deepEqual(prevData?.['params'], nextData['params']) || + !deepEqual(prevData?.['draft'], nextData['draft']) ) { selectedNodeRef.current = selectedNode; } @@ -364,18 +397,11 @@ const MonitorViewContent: React.FC = () => { // eslint-disable-next-line react-hooks/exhaustive-deps -- setNeedsAutoLayout/setNeedsFit are stable useState setters declared later }, [selectedSessionId, isLoadingSessions, sessions, getNodePositions]); - // Pipeline data for the selected session is fetched once by useSession - // (staleTime: Infinity) and kept current by live WebSocket events - // (nodeadded, noderemoved, connectionadded, connectionremoved). - // No periodic polling is needed — it would only introduce stale REST - // data that overwrites live state and reverts local edits. - - // Subscribe to selected session. - // nodeStates is intentionally NOT consumed from this hook — see the - // useNodeStatesSubscription block below for the reasoning. + // Subscribe to selected session. Pipeline is fetched once and kept + // current by live WS events — no polling. nodeStates/nodeStats are + // consumed from session store directly via NodeStateIndicator. const { pipeline, - // nodeStats not used here - NodeStateIndicator fetches directly from session store isConnected: sessionIsConnected, tuneNode, tuneNodeConfig, @@ -385,9 +411,7 @@ const MonitorViewContent: React.FC = () => { disconnectPins, } = useSession(selectedSessionId); - // Lightweight hook for dot-notation path updates: deep-merges locally - // into the atom and sends only the partial to the server (unlike - // useSession.tuneNodeConfig which shallow-merges and sends as-is). + // Dot-path-aware deep merge for nested params (vs useSession's shallow merge). const { tuneNodeConfig: tuneNodeConfigDeep } = useTuneNode(selectedSessionId); // Use session-specific connection status if a session is selected, otherwise use global @@ -406,25 +430,96 @@ const MonitorViewContent: React.FC = () => { const pipelineRef = useRef(pipeline); pipelineRef.current = pipeline; + // Drop drafts whose ids now appear in pipeline.nodes — the engine's + // node-added forwarder only inserts after successful creation. + // Depend on pipeline.nodes only and read the current draft snapshot + // via the ref, so this doesn't re-run on every keystroke into a draft. + useEffect(() => { + const drafts = draftNodesRef.current; + if (drafts.size === 0) return; + let changed = false; + const next = new Map(drafts); + for (const id of drafts.keys()) { + if (!pipeline?.nodes[id]) continue; + next.delete(id); + changed = true; + } + if (changed) setDraftNodes(next); + }, [pipeline?.nodes]); + + // Drop one or more drafts: clear their per-node atom (so a re-dropped + // draft with the same generated id doesn't inherit stale typed values) + // and remove them from the drafts map in a single setDraftNodes commit. + const deleteDrafts = useCallback( + (ids: string[]) => { + if (ids.length === 0) return; + for (const id of ids) clearNodeParams(id, selectedSessionId ?? undefined); + setDraftNodes((prev) => { + const next = new Map(prev); + for (const id of ids) next.delete(id); + return next; + }); + }, + [selectedSessionId] + ); + + // Per-promotion failure subscriptions, set up by promoteDraft and + // reaped here when the draft is dropped or its in-flight flag clears. + const failureUnsubsRef = useRef void>>(new Map()); + useEffect(() => { + for (const [id, unsub] of failureUnsubsRef.current) { + const d = draftNodes.get(id); + if (!d || !d.inFlight) { + unsub(); + failureUnsubsRef.current.delete(id); + } + } + }, [draftNodes]); + useEffect( + () => () => { + for (const unsub of failureUnsubsRef.current.values()) unsub(); + failureUnsubsRef.current.clear(); + }, + [] + ); + + // Discard drafts on session switch — they're tied to the previous + // canvas's coords and namespace. useLayoutEffect to clear before + // paint so the topology effect doesn't briefly render them on the + // new session's canvas. + const prevDraftSessionIdRef = useRef(null); + React.useLayoutEffect(() => { + const prevSession = prevDraftSessionIdRef.current; + prevDraftSessionIdRef.current = selectedSessionId; + if (prevSession === null || prevSession === selectedSessionId) return; + for (const id of draftNodesRef.current.keys()) { + clearNodeParams(id, prevSession); + } + setDraftNodes((prev) => (prev.size === 0 ? prev : new Map())); + }, [selectedSessionId]); + // Topology signature: only changes when nodes/kinds or connections change const topoKey = React.useMemo(() => { - if (!pipeline) return ''; - const names = Object.keys(pipeline.nodes).sort(); - const kinds = names.map((n) => `${n}:${pipeline.nodes[n].kind}`); - const conns = pipeline.connections - .map((c: Connection) => `${c.from_node}:${c.from_pin}>${c.to_node}:${c.to_pin}`) + if (!pipeline && draftNodes.size === 0) return ''; + const names = pipeline ? Object.keys(pipeline.nodes).sort() : []; + const kinds = names.map((n) => `${n}:${pipeline!.nodes[n].kind}`); + const conns = pipeline + ? pipeline.connections + .map((c: Connection) => `${c.from_node}:${c.from_pin}>${c.to_node}:${c.to_pin}`) + .sort() + : []; + // Track only schema KEYS (not content): runtime_param_schema is + // documented as immutable for the node's lifetime. + const runtimeKeys = Object.keys(pipeline?.runtime_schemas ?? {}).sort(); + // Draft fingerprint excludes param values — drafts render from + // their Jotai atom on each keystroke, no rebuild needed. + const draftFingerprint = Array.from(draftNodes.entries()) + .map(([id, d]) => `${id}:${d.kind}:${d.missingRequired.join(',')}:${d.inFlight ? '1' : '0'}`) .sort(); - // Include runtime schema keys so topology rebuilds when schemas arrive - // after the initial build (e.g. Slint property discovery). - // NOTE: Only keys are tracked, not content. If a schema's content changed - // for an existing key (hot-reload), the effect would NOT re-run. This is - // intentional — runtime_param_schema() is documented as immutable for the - // node's lifetime (see crates/core ProcessorNode trait docs). - const runtimeKeys = Object.keys(pipeline.runtime_schemas ?? {}).sort(); - const key = JSON.stringify([kinds, conns, runtimeKeys]); + const key = JSON.stringify([kinds, conns, runtimeKeys, draftFingerprint]); viewsLogger.debug('topoKey recalculated:', key.substring(0, 100)); return key; - }, [pipeline]); + }, [pipeline, draftNodes]); // Auto-layout + fit-view hook const { setNeedsAutoLayout, setNeedsFit, handleAutoLayout } = useAutoLayout({ @@ -445,15 +540,9 @@ const MonitorViewContent: React.FC = () => { topoKey, }); - // When a session is destroyed, the optimistic removal empties the list - // before React processes the batched setSelectedSessionId(null) from - // handleConfirmQuickDelete. Eagerly clear the selection here so the - // badge and "Delete" control disappear immediately. - // - // The ref prevents this from fighting with the nav-state auto-select: - // we only clear selection for sessions that were *previously seen* in - // the list and then vanished (i.e., destroyed), not for a session ID - // that was just set via navigation state and hasn't appeared yet. + // Eagerly clear selection when a previously-seen session vanishes + // (destroyed). The ref guards against clearing a freshly-set + // session id (from nav state) that hasn't appeared in the list yet. const sessionSeenInListRef = useRef(false); if (selectedSession) { sessionSeenInListRef.current = true; @@ -470,36 +559,26 @@ const MonitorViewContent: React.FC = () => { } }, [selectedSessionId, selectedSession, isLoadingSessions]); - // Helper to validate parameter value against schema. - // Uses the runtime-merged schema when available so that dynamically - // discovered parameters are validated correctly. - // - // Runtime-discovered properties (e.g. from Slint) are stored as flat - // keys in the merged schema (e.g. "show") with a `path` field containing - // the dot-notation wire path (e.g. "properties.show"). When paramKey is - // a dot-path, we search for a property whose `path` matches before - // falling back to a flat key lookup. + // Validate against the runtime-merged schema if available. Drafts + // fall back to the static schema since the engine hasn't run them yet. const validateParamValue = useCallback( (nodeId: string, paramKey: string, value: unknown): string | null => { const node = pipeline?.nodes[nodeId]; - if (!node) return null; + const draft = draftNodesRef.current.get(nodeId); + const kind = node?.kind ?? draft?.kind; + if (!kind) return null; - const nodeDef = nodeDefinitions.find((d) => d.kind === node.kind); + const nodeDef = nodeDefinitions.find((d) => d.kind === kind); if (!nodeDef) return null; - // Merge runtime schema (if any) so dynamically discovered properties - // are included in validation. const runtimeSchema = pipeline?.runtime_schemas?.[nodeId] as JsonSchema | undefined; const baseSchema = nodeDef.param_schema as JsonSchema | undefined; const merged = runtimeSchema ? deepMergeSchemas(baseSchema, runtimeSchema) : baseSchema; if (!merged?.properties) return null; - // 1. Direct flat-key lookup (works for simple keys like "gain_db"). + // Direct lookup, then dot-path lookup via each property's `path` + // field (used by runtime-discovered Slint properties). let propSchema = merged.properties[paramKey] as JsonSchemaProperty | undefined; - - // 2. If paramKey is a dot-path (e.g. "properties.show"), search for a - // schema property whose `path` field matches. Runtime-discovered - // properties use this pattern. if (!propSchema && paramKey.includes('.')) { for (const entry of Object.values(merged.properties)) { if (entry && (entry as JsonSchemaProperty).path === paramKey) { @@ -516,17 +595,117 @@ const MonitorViewContent: React.FC = () => { [pipeline, nodeDefinitions] ); - // Ref indirection: keeps stableOnParamChange identity stable across - // pipeline reference changes. validateParamValue changes whenever the - // pipeline object changes (e.g. param echo-back from server), but we - // don't want that to cascade into new node data objects and break - // React.memo on every node component. + // Ref keeps stableOnParamChange identity stable across pipeline + // changes — preserves React.memo on each node component. const validateParamValueRef = useRef(validateParamValue); validateParamValueRef.current = validateParamValue; + // Update local draft state only. Promotion is exclusively via the + // "Add to pipeline" button, never as a side-effect of typing. + const handleDraftParamChange = useCallback( + (nodeId: string, key: string, value: unknown) => { + // Required-but-empty is handled by computeMissingRequired (the + // "needs ..." banner), not surfaced as a validation error here. + const validationError = validateParamValueRef.current(nodeId, key, value); + if (validationError) { + toast.error(`Invalid value for ${key}: ${validationError}`); + return; + } + // Mirror the edit into the per-node atom so InspectorPane sees + // every keystroke without waiting on a render. + if (key.includes('.')) { + writeNodeParams(nodeId, buildParamUpdate(key, value), selectedSessionId ?? undefined); + } else { + writeNodeParam(nodeId, key, value, selectedSessionId ?? undefined); + } + + // Functional updater: rapid edits to different keys each see the + // previous one's commit. + setDraftNodes((prev) => { + const c = prev.get(nodeId); + if (!c || c.inFlight) return prev; + const newParams = mergeDraftParam(c.params, key, value); + const missing = computeMissingRequired(c.kind, newParams, nodeDefinitions); + const next = new Map(prev); + next.set(nodeId, { ...c, params: newParams, missingRequired: missing }); + return next; + }); + }, + [nodeDefinitions, selectedSessionId, toast] + ); + + // The only place `addNode` fires for drafts (wired to the "Add to + // pipeline" button on the draft banner). + const promoteDraft = useCallback( + (nodeId: string) => { + const draft = draftNodesRef.current.get(nodeId); + if (!draft) return; + if (draft.inFlight) return; + const missing = computeMissingRequired(draft.kind, draft.params, nodeDefinitions); + if (missing.length > 0) return; + + // Subscribe before addNode so a synchronously-emitted Failed + // (e.g. duplicate-id rejection) isn't missed. + if (selectedSessionId) { + const stateAtom = nodeStateAtom(nodeKey(selectedSessionId, nodeId)); + const handle = () => { + const reason = nodeStateFailedReason(defaultSessionStore.get(stateAtom)); + if (reason === undefined) return; + const unsubExisting = failureUnsubsRef.current.get(nodeId); + if (unsubExisting) { + unsubExisting(); + failureUnsubsRef.current.delete(nodeId); + } + removeNode(nodeId); + setDraftNodes((prev) => { + const c = prev.get(nodeId); + if (!c || !c.inFlight) return prev; + const next = new Map(prev); + next.set(nodeId, { + ...c, + missingRequired: computeMissingRequired(c.kind, c.params, nodeDefinitions), + inFlight: false, + }); + return next; + }); + toast.error(`${nodeId} failed: ${reason}`); + }; + const unsub = defaultSessionStore.sub(stateAtom, handle); + const prior = failureUnsubsRef.current.get(nodeId); + if (prior) prior(); + failureUnsubsRef.current.set(nodeId, unsub); + // sub() doesn't fire for the current value — check once in + // case the atom is already populated (retry of a failed id). + handle(); + } + + addNode(nodeId, draft.kind, draft.params); + setDraftNodes((prev) => { + const c = prev.get(nodeId); + if (!c) return prev; + const next = new Map(prev); + next.set(nodeId, { + ...c, + missingRequired: [], + inFlight: true, + }); + return next; + }); + }, + [addNode, nodeDefinitions, selectedSessionId, removeNode, toast] + ); + + // Stable ref so onPromote arrows don't churn when promoteDraft re-creates. + const stablePromoteDraftRef = useRef(promoteDraft); + stablePromoteDraftRef.current = promoteDraft; + // Memoized param change handler for right pane const handleRightPaneParamChange = useCallback( (nodeId: string, key: string, value: unknown) => { + if (draftNodesRef.current.has(nodeId)) { + handleDraftParamChange(nodeId, key, value); + return; + } // Validate before sending to server const error = validateParamValueRef.current(nodeId, key, value); if (error) { @@ -538,7 +717,7 @@ const MonitorViewContent: React.FC = () => { // stableOnParamChange — see comment there for details). dispatchParamUpdate(nodeId, key, value, tuneNode, tuneNodeConfigDeep); }, - [toast, tuneNode, tuneNodeConfigDeep] + [toast, tuneNode, tuneNodeConfigDeep, handleDraftParamChange] ); // Memoized label change handler (currently no-op) @@ -551,6 +730,24 @@ const MonitorViewContent: React.FC = () => { const onConnect = React.useCallback( (connection: RFConnection) => { + // Block connections that touch a draft — the node does not exist + // in the engine yet, so a `connect` would fail with "Source/Target + // node not found". Steer the user to the inspector instead. + const drafts = draftNodesRef.current; + const sourceDraft = connection.source ? drafts.get(connection.source) : undefined; + const targetDraft = connection.target ? drafts.get(connection.target) : undefined; + if (sourceDraft || targetDraft) { + const draft = sourceDraft ?? targetDraft!; + const draftId = sourceDraft ? connection.source : connection.target; + // missingRequired empties before nodeadded echoes; surface the + // transitional in-flight state instead of an empty list. + const message = + draft.missingRequired.length > 0 + ? `Configure ${draft.missingRequired.join(', ')} on ${draftId} before connecting` + : `${draftId} is being added to the pipeline — try again in a moment`; + toast.error(message); + return; + } return createOnConnect( nodesRefForCallbacks.current, setEdges, @@ -563,7 +760,7 @@ const MonitorViewContent: React.FC = () => { setNodes )(connection); }, - [createOnConnect, setEdges, connectPins, setNodes] + [createOnConnect, setEdges, connectPins, setNodes, toast] ); const onEdgesDelete = (deleted: Edge[]) => { @@ -575,43 +772,32 @@ const MonitorViewContent: React.FC = () => { }; const onNodesDelete = (deleted: RFNode[]) => { - deleted.forEach((n) => { - removeNode(n.id); - }); + const draftIds: string[] = []; + for (const n of deleted) { + if (draftNodesRef.current.has(n.id)) { + draftIds.push(n.id); + } else { + removeNode(n.id); + } + } + deleteDrafts(draftIds); }; // Deletion is handled by React Flow's built-in delete key via onNodesDelete/onEdgesDelete. - // Helpers to add nodes with sensible defaults + // Considers both live pipeline and in-flight drafts to avoid collisions. const generateName = (kind: string) => { - const existing = pipeline ? Object.keys(pipeline.nodes) : []; + const existing = new Set(pipeline ? Object.keys(pipeline.nodes) : []); + for (const id of draftNodesRef.current.keys()) existing.add(id); let i = 1; let candidate = `${kind}_${i}`; - while (existing.includes(candidate)) { + while (existing.has(candidate)) { i += 1; candidate = `${kind}_${i}`; } return candidate; }; - const defaultParamsForKind = (kind: string): Record => { - const def = nodeDefinitions.find((d) => d.kind === kind); - const params: Record = {}; - const schema = def?.param_schema as Record | undefined; - const props = schema?.properties as Record> | undefined; - if (props) { - Object.entries(props).forEach(([key, propSchema]) => { - if (propSchema && typeof propSchema === 'object' && 'default' in propSchema) { - const defVal = propSchema.default; - if (defVal !== undefined) { - params[key] = defVal; - } - } - }); - } - return params; - }; - const onDragStart = useCallback( (event: React.DragEvent, nodeType: string) => { setType(nodeType); @@ -624,32 +810,16 @@ const MonitorViewContent: React.FC = () => { // Track previous topoKey to avoid unnecessary rebuilds const prevTopoKeyForTopologyRef = useRef(''); - // Helper: Resolve node position from various sources (previous, pending, saved, or default) + // Position lookup: prev render → persistent store → origin. Drop + // coordinates for new nodes are written to the store in `onDrop`. const resolveNodePosition = useCallback( ( nodeName: string, prevPositions: Map, savedPositions: Record - ): { position: { x: number; y: number }; fromPending: boolean } => { - let pos = prevPositions.get(nodeName); - let fromPending = false; - - // Check pending positions from node drops - if (!pos && pendingNodePositions.current.has(nodeName)) { - pos = pendingNodePositions.current.get(nodeName)!; - pendingNodePositions.current.delete(nodeName); - fromPending = true; - } - - // Check saved positions from position store - if (!pos && savedPositions[nodeName]) { - pos = savedPositions[nodeName]; - } - - return { - position: pos ?? { x: 0, y: 0 }, - fromPending, - }; + ): { position: { x: number; y: number } } => { + const pos = prevPositions.get(nodeName) ?? savedPositions[nodeName]; + return { position: pos ?? { x: 0, y: 0 } }; }, [] ); @@ -835,7 +1005,7 @@ const MonitorViewContent: React.FC = () => { } prevTopoKeyForTopologyRef.current = topoKey; - if (!pipeline) { + if (!pipeline && draftNodes.size === 0) { viewsLogger.debug('Topology effect: No pipeline, clearing nodes'); setNodes([]); setEdges([]); @@ -846,30 +1016,27 @@ const MonitorViewContent: React.FC = () => { viewsLogger.debug('Topology effect triggered, topoKey:', topoKey.substring(0, 50) + '...'); // Preserve existing node positions; do not auto-layout during edits. - const { levels, sortedLevels } = topoLevelsFromPipeline(pipeline); - const orderedNames = orderedNamesFromLevels(levels, sortedLevels); + const orderedNames: string[] = pipeline + ? (() => { + const { levels, sortedLevels } = topoLevelsFromPipeline(pipeline); + return orderedNamesFromLevels(levels, sortedLevels); + })() + : []; const prevPositions = new Map(nodes.map((n) => [n.id, n.position])); + // setNodes(newNodes) replaces the array, so `selected` is lost + // unless we re-apply it. Fresh drafts use newDraftSelectionRef. + const prevSelected = new Set(nodes.filter((n) => n.selected).map((n) => n.id)); // Get saved positions from position store const savedPositions = selectedSessionId ? getNodePositions(selectedSessionId) : {}; const newNodes: RFNode[] = []; for (const nodeName of orderedNames) { - const apiNode = pipeline.nodes[nodeName]; + const apiNode = pipeline!.nodes[nodeName]; if (!apiNode) continue; - // Resolve node position from various sources - const { position: pos, fromPending: positionFromPending } = resolveNodePosition( - nodeName, - prevPositions, - savedPositions - ); - - // Save position to position store if it came from pending (newly dropped) - if (positionFromPending && selectedSessionId) { - updateNodePosition(selectedSessionId, nodeName, pos); - } + const { position: pos } = resolveNodePosition(nodeName, prevPositions, savedPositions); // Use real-time state from Jotai atom if available, otherwise use pipeline state. // Read directly from the default store (non-reactive) since the topology effect @@ -887,7 +1054,7 @@ const MonitorViewContent: React.FC = () => { const { finalInputs, finalOutputs } = resolveDynamicPins( nodeDef, nodeName, - pipeline, + pipeline!, baseInputs, baseOutputs ); @@ -895,7 +1062,7 @@ const MonitorViewContent: React.FC = () => { // Merge runtime param schema (if any) with the static per-kind schema. // Runtime schemas are per-instance overrides discovered after node init // (e.g. Slint component properties enumerated from the compiled .slint). - const runtimeSchema = pipeline.runtime_schemas?.[nodeName] as JsonSchema | undefined; + const runtimeSchema = pipeline!.runtime_schemas?.[nodeName] as JsonSchema | undefined; const effectiveNodeDef = runtimeSchema && nodeDef ? { @@ -924,8 +1091,55 @@ const MonitorViewContent: React.FC = () => { newNodes.push(node); } - // Build edges using helper function - const newEdges = buildEdgesFromConnections(pipeline.connections, newNodes); + // Append draft nodes (UI-only, not yet sent to engine). + for (const [draftId, draft] of draftNodes) { + const draftDef = defByKind.get(draft.kind); + const draftBaseInputs = draftDef?.inputs ?? []; + const draftBaseOutputs = draftDef?.outputs ?? []; + // No engine state yet → use template pins, skip dynamic-pin reconstruction. + const draftFinalInputs = draftBaseInputs; + const draftFinalOutputs = draftBaseOutputs; + // Prefer prev/saved position over draft.position so topology + // rebuilds don't snap back to the original drop point after a drag. + const draftPos = prevPositions.get(draftId) ?? savedPositions[draftId] ?? draft.position; + const node = buildNodeObject({ + nodeName: draftId, + apiNode: { + kind: draft.kind, + params: draft.params as JsonValue, + state: null, + }, + position: draftPos, + nodeState: undefined, + finalInputs: draftFinalInputs, + finalOutputs: draftFinalOutputs, + nodeDef: draftDef, + stableOnParamChange, + stableOnConfigChange, + selectedSessionId, + draft: { + missingRequired: draft.missingRequired, + isCreating: !!draft.inFlight, + onPromote: () => stablePromoteDraftRef.current(draftId), + }, + }); + // Just-dropped draft: mark selected so the inspector opens. + if (newDraftSelectionRef.current.has(draftId)) { + node.selected = true; + newDraftSelectionRef.current.delete(draftId); + } + newNodes.push(node); + } + + // Build edges using helper function (only from real pipeline + // connections — drafts cannot be connected). + const newEdges = buildEdgesFromConnections(pipeline?.connections ?? [], newNodes); + + // Re-apply the previous selected flag so React Flow's selection + // state survives the topology rebuild (see prevSelected comment). + for (const n of newNodes) { + if (prevSelected.has(n.id)) n.selected = true; + } viewsLogger.debug('Setting', newNodes.length, 'nodes and', newEdges.length, 'edges'); // Batch node and edge updates to prevent double render @@ -935,17 +1149,21 @@ const MonitorViewContent: React.FC = () => { topoEffectRanRef.current = true; }); - // Generate YAML using helper function - const yamlString = generatePipelineYaml(pipeline, orderedNames); + // Generate YAML using helper function (drafts are excluded from YAML + // until they're committed — a draft has no real existence in the + // engine yet). + const yamlString = pipeline ? generatePipelineYaml(pipeline, orderedNames) : ''; setYamlString(yamlString); // eslint-disable-next-line react-hooks/exhaustive-deps }, [topoKey, defByKind, selectedSessionId, tuneNode]); - // Stable callback for param changes - always sends directly to server - // Uses ref indirection for validateParamValue to keep identity stable - // across pipeline reference changes (see validateParamValueRef above). + // Stable param-change callback: routes to draft state or to server. const stableOnParamChange = useCallback( (nodeId: string, paramName: string, value: unknown) => { + if (draftNodesRef.current.has(nodeId)) { + handleDraftParamChange(nodeId, paramName, value); + return; + } // Validate before sending to server const error = validateParamValueRef.current(nodeId, paramName, value); if (error) { @@ -953,13 +1171,10 @@ const MonitorViewContent: React.FC = () => { return; } - // Dot-notation paths (e.g. "properties.show") need buildParamUpdate to - // produce the correct nested UpdateParams payload. tuneNodeConfigDeep - // deep-merges locally into the atom (preserving sibling nested - // properties) and sends only the partial to the server. + // dispatchParamUpdate handles nested dot-paths via tuneNodeConfigDeep. dispatchParamUpdate(nodeId, paramName, value, tuneNode, tuneNodeConfigDeep); }, - [toast, tuneNode, tuneNodeConfigDeep] + [toast, tuneNode, tuneNodeConfigDeep, handleDraftParamChange] ); // Stable callback for full-config updates (compositor nodes). @@ -970,13 +1185,7 @@ const MonitorViewContent: React.FC = () => { [tuneNodeConfig] ); - // NOTE: fitView is triggered only by: - // 1. Auto-layout effect (when needsAutoLayout is true) - // 2. needsFit effect (when needsFit is true) - // Avoid auto-fitting on every node change to prevent disruption during editing. - - // Keep YAML up to date with live (Zustand) param overrides - // Only runs when params change, not when nodes move + // Keep YAML in sync with live param overrides; runs only on param changes. useEffect(() => { if (!pipeline) { setYamlString(''); @@ -1041,6 +1250,10 @@ const MonitorViewContent: React.FC = () => { }; const handleDeleteNode = (nodeId: string) => { + if (draftNodesRef.current.has(nodeId)) { + deleteDrafts([nodeId]); + return; + } removeNode(nodeId); }; @@ -1063,13 +1276,31 @@ const MonitorViewContent: React.FC = () => { const kind = type; const nodeId = generateName(kind); - const params = defaultParamsForKind(kind); - - // Cache the position for when the node appears in the pipeline - pendingNodePositions.current.set(nodeId, position); - - // Send to server immediately - addNode(nodeId, kind, params); + const params = draftDefaultParamsForKind(kind, nodeDefinitions); + + // Hold as a local draft if any required param has no default — + // avoids round-tripping a guaranteed-to-fail `addnode`. + const missing = computeMissingRequired(kind, params, nodeDefinitions); + if (missing.length > 0) { + setDraftNodes((prev) => { + const next = new Map(prev); + next.set(nodeId, { kind, params, position, missingRequired: missing }); + return next; + }); + newDraftSelectionRef.current.add(nodeId); + setRightPaneView('inspector'); + if (rightCollapsed) { + setRightCollapsed(false); + } + toast.info(`Configure ${missing.join(', ')} before this node is added to the pipeline`); + } else { + // Persist the drop coordinate so the post-`nodeadded` topology + // rebuild renders the new node where the user dropped it. + if (selectedSessionId) { + updateNodePosition(selectedSessionId, nodeId, position); + } + addNode(nodeId, kind, params); + } setType(null); }; @@ -1095,7 +1326,6 @@ const MonitorViewContent: React.FC = () => { ); const handleQuickDeleteSession = useCallback((sessionId: string) => { - // Store which session to delete and show confirmation modal setSessionToDelete(sessionId); }, []); @@ -1105,10 +1335,8 @@ const MonitorViewContent: React.FC = () => { setIsDeletingSession(true); try { - // Only tear down the preview when deleting the session that is - // actually being previewed. sessionToDelete can be any session - // from the sidebar; stopping the preview unconditionally would - // kill an unrelated active stream. + // Only tear down preview if it's for the session being deleted — + // sessionToDelete may be any session from the sidebar. if (sessionToDelete === selectedSessionId) { await handleStopPreview(); }