Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
716669b
fix(ui,sdk): defer addnode for nodes with unset required params
streamkit-devin Apr 26, 2026
cad7604
style(sdk): cargo fmt fixup
streamkit-devin Apr 26, 2026
ee0e801
fix(ui): include drafts in id collision check, allow editing required…
streamkit-devin Apr 26, 2026
b7dd031
fix(ui): mirror draft param edits to atom, recover from failed promotion
streamkit-devin Apr 26, 2026
25b9e40
fix(ui): guard re-promotion of draft nodes; address Devin Review find…
streamkit-devin Apr 26, 2026
918a75d
fix(ui): preserve dragged position of draft nodes across topology reb…
streamkit-devin Apr 26, 2026
d7c39d9
fix(ui): avoid stale-draft flash on session switch; trim leaked pendi…
streamkit-devin Apr 26, 2026
277643a
fix(ui): close stale-snapshot race in handleDraftParamChange
streamkit-devin Apr 26, 2026
05a1afb
fix(ui): close cross-tick promotion race; skip mount churn on session…
streamkit-devin Apr 26, 2026
787a068
fix(ui): clean up shadow refs for drafts deleted via React Flow Delet…
streamkit-devin Apr 26, 2026
095d14d
fix(ui): debounce draft promotion and preserve selection through rebu…
streamkit-devin Apr 26, 2026
74354b1
fix(ui): tighten draft node lifecycle (validation, atom cleanup, flus…
streamkit-devin Apr 26, 2026
84a2872
fix(ui): clear stale Jotai atoms on session switch for in-flight drafts
streamkit-devin Apr 26, 2026
9cbf828
fix(engine,ui): emit nodeadded only after engine confirms creation
streamer45 Apr 27, 2026
b31b1a0
fix(ui): promote auto-stub compositor layers when user edits them
streamer45 Apr 27, 2026
fbaa1e6
improvements
streamer45 Apr 28, 2026
ff69641
cleanup
streamer45 Apr 28, 2026
b224e24
Merge remote-tracking branch 'origin/main' into devin/1777193573-moni…
streamer45 Apr 28, 2026
cd81779
linting
streamer45 Apr 29, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,40 @@ the full architecture.
include a comment explaining the rationale.
- **UI tooling**: Use `bun install` / `bunx` / `bun run` — never npm or pnpm.

## Fix Root Causes, Not Symptoms

Prefer a clean change that takes longer over a brittle stack of patches that
ships sooner. If a feature requires defending against the same race in three
places, layering synchronous shadow refs over async state, or "preferring
draft over live" because two event sources disagree on timing — **stop and
reconsider the contract**, don't add a fourth patch.

Concrete signals you've crossed into workaround territory:

- You're adding a *timeout* to recover from a missing event.
- You need a synchronous shadow of state that already lives in an async store.
- You catch yourself writing "the X event doesn't actually mean X, it means
*attempted* X, so we also have to listen for Y to know if it really
happened."
- Tests are updated by adding sleeps or by relying on previously-broken
behavior (e.g. an invalid value being silently accepted).
- The root cause is in a different layer than the one you're editing, and
fixing it there would invalidate most of your patch.

When you spot this, surface the design issue to the user with a concrete
proposal — even if it's more invasive — *before* writing the patch. State
the tradeoff honestly: "this will take longer but produces something
durable; vs. this short-term fix has these specific brittleness costs."

Past incident worth remembering: the WebSocket `nodeadded` event used to fire
before plugin construction had even started. The UI accumulated 13 commits of
draft-state machinery (state-watchers, debounce timers, topology priority
hacks) trying to reconstruct "did the node actually get created?" from
out-of-band signals. The actual fix was a small server change — emit
`nodeadded` from the engine actor's success path instead of the WS handler —
which collapsed the UI back to the obvious cleanup. Code that exists to
paper over a broken contract should be deleted, not refined.

## Verification Commands

| Task | Command |
Expand Down
134 changes: 132 additions & 2 deletions apps/skit/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use crate::config::Config;
use crate::state::BroadcastEvent;
use opentelemetry::global;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use streamkit_api::{Event as ApiEvent, EventPayload, MessageType, Pipeline};
Expand Down Expand Up @@ -177,6 +177,16 @@ pub struct Session {
/// The handle to send control messages to the running DynamicEngine actor.
engine_handle: Arc<DynamicEngineHandle>,
pub pipeline: Arc<Mutex<Pipeline>>,
/// Node IDs that the WebSocket layer has accepted into the engine
/// actor but for which `NodeAdded`/`Failed` has not yet arrived.
/// `pipeline.nodes` only sees a node after the engine confirms
/// successful creation; the in-flight set covers the gap so a
/// second `addnode` for the same id (whether from the same or a
/// different client) is rejected at the handler instead of being
/// silently dropped by the actor's duplicate-id guard. Drained on
/// success (node-added forwarder), on failure (state forwarder
/// observes `Failed`), or when an in-flight node is removed.
pub creating_nodes: Arc<Mutex<HashSet<String>>>,
/// Timestamp when the session was created
pub created_at: SystemTime,
/// User/role who created this session (for permission filtering)
Expand All @@ -187,6 +197,46 @@ pub struct Session {
}

impl Session {
/// Reserves a node id for an in-flight `addnode`.
///
/// Atomically checks both the live pipeline snapshot and the
/// in-flight set, then inserts into the in-flight set. Returns
/// `Err` describing why the id is unavailable (already live, or
/// already being added). The reservation is drained by the
/// node-added forwarder on success, the state forwarder on a
/// non-`Creating` state transition, or by `release_node_id` on
/// remove/cancellation.
///
/// Lock order: `pipeline` first, then `creating_nodes`. The two
/// guards are held jointly across both reads to prevent a node
/// from slipping into `pipeline.nodes` (via the node-added
/// forwarder) between the two checks and being silently dropped
/// by the actor's duplicate-id guard later.
///
/// # Errors
///
/// Returns `Err` with a human-readable reason when the id is
/// already live or already in flight.
#[allow(clippy::significant_drop_tightening)] // joint lock is the point
pub async fn reserve_node_id(&self, node_id: &str) -> Result<(), String> {
let pipeline = self.pipeline.lock().await;
if pipeline.nodes.contains_key(node_id) {
return Err(format!("Node '{node_id}' already exists in the pipeline"));
}
let mut creating = self.creating_nodes.lock().await;
if creating.contains(node_id) {
return Err(format!("Node '{node_id}' is already being added"));
}
creating.insert(node_id.to_string());
Ok(())
}

/// Releases an in-flight reservation taken by `reserve_node_id`.
/// Used on explicit removal of a still-Creating node. Idempotent.
pub async fn release_node_id(&self, node_id: &str) {
self.creating_nodes.lock().await.remove(node_id);
}

/// Forwards a control message to this session's specific engine actor.
pub async fn send_control_message(&self, msg: EngineControlMessage) {
if let Err(e) = self.engine_handle.send_control(msg).await {
Expand Down Expand Up @@ -271,11 +321,27 @@ impl Session {
.await
.map_err(|e| format!("Failed to subscribe to stats updates: {e}"))?;

// Pre-allocate the in-flight set so the state and node-added
// forwarders can both reach it. `pipeline.nodes` only contains
// confirmed entries; this set fills the gap for accepted-but-
// not-yet-confirmed addnode requests.
let creating_nodes: Arc<Mutex<HashSet<String>>> = Arc::new(Mutex::new(HashSet::new()));

// Spawn task to forward state updates to WebSocket clients
let session_id_for_state = session_id.clone();
let event_tx_for_state = event_tx.clone();
let creating_nodes_for_state = creating_nodes.clone();
tokio::spawn(async move {
while let Some(update) = state_rx.recv().await {
// Drain the in-flight entry as soon as a non-Creating
// state arrives — a Failed transition means the engine
// gave up on this id, and a later non-Creating state
// (Ready/Running/Degraded) confirms the node is past
// creation. The node-added forwarder also drains on
// success; the second remove is a no-op.
if !matches!(update.state, NodeState::Creating) {
creating_nodes_for_state.lock().await.remove(&update.node_id);
}
let event = ApiEvent {
message_type: MessageType::Event,
correlation_id: None,
Expand Down Expand Up @@ -375,6 +441,69 @@ impl Session {
);
});

// Subscribe to node-added notifications from the engine and use
// them as the trigger for both updating `pipeline.nodes` and
// emitting the public `NodeAdded` event. Doing this here (and
// not in the WebSocket addnode handler) means clients only see
// `nodeadded` after the engine has confirmed the plugin's
// constructor and `initialize_node` returned Ok — never
// speculatively before the FFI call has even run. Failures
// surface as `NodeStateChanged { state: Failed }` via the
// existing state forwarder above.
//
// Subscribing here (vs earlier in `create`) is safe: the engine
// can't have any nodes until something sends `AddNode`, and the
// first `AddNode` can't arrive until `create` returns the
// handle to its caller.
let pipeline = Arc::new(Mutex::new(Pipeline::default()));
let mut node_added_rx = engine_handle
.subscribe_node_added()
.await
.map_err(|e| format!("Failed to subscribe to node-added updates: {e}"))?;
let session_id_for_node_added = session_id.clone();
let event_tx_for_node_added = event_tx.clone();
let pipeline_for_node_added = pipeline.clone();
let creating_nodes_for_node_added = creating_nodes.clone();
tokio::spawn(async move {
while let Some(notification) = node_added_rx.recv().await {
// Update the cached pipeline snapshot first, then
// broadcast — late subscribers (re-fetching the pipeline
// immediately after a `nodeadded` event) see a
// consistent view that already includes the new entry.
{
let mut pip = pipeline_for_node_added.lock().await;
pip.nodes.insert(
notification.node_id.clone(),
streamkit_api::Node {
kind: notification.kind.clone(),
params: notification.params.clone(),
state: None,
},
);
}
// The node is now visible in pipeline.nodes — drop the
// in-flight reservation so a future addnode for this id
// (after a removenode) is gated only by the live
// pipeline check.
creating_nodes_for_node_added.lock().await.remove(&notification.node_id);
let event = ApiEvent {
message_type: MessageType::Event,
correlation_id: None,
payload: EventPayload::NodeAdded {
session_id: session_id_for_node_added.clone(),
node_id: notification.node_id,
kind: notification.kind,
params: notification.params,
},
};
let _ = event_tx_for_node_added.send(BroadcastEvent::to_all(event));
}
tracing::debug!(
session_id = %session_id_for_node_added,
"Node-added forwarding task ended"
);
});

// Subscribe to telemetry events from the engine
let mut telemetry_rx = engine_handle
.subscribe_telemetry()
Expand Down Expand Up @@ -408,7 +537,8 @@ impl Session {
id: session_id,
name,
engine_handle: Arc::new(engine_handle),
pipeline: Arc::new(Mutex::new(Pipeline::default())),
pipeline,
creating_nodes,
created_at: SystemTime::now(),
created_by,
#[cfg(feature = "moq")]
Expand Down
49 changes: 22 additions & 27 deletions apps/skit/src/websocket_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,35 +525,26 @@ async fn handle_add_node(
});
}

{
let mut pipeline = session.pipeline.lock().await;
if pipeline.nodes.contains_key(&node_id) {
return Some(ResponsePayload::Error {
message: format!("Node '{node_id}' already exists in the pipeline"),
});
}
pipeline.nodes.insert(
node_id.clone(),
streamkit_api::Node { kind: kind.clone(), params: params.clone(), state: None },
);
} // Lock released here

// Broadcast event to all clients
let event = ApiEvent {
message_type: MessageType::Event,
correlation_id: None,
payload: EventPayload::NodeAdded {
session_id: session.id.clone(),
node_id: node_id.clone(),
kind: kind.clone(),
params: params.clone(),
},
};
if let Err(e) = app_state.event_tx.send(BroadcastEvent::to_all(event)) {
error!("Failed to broadcast NodeAdded event: {}", e);
// Reject duplicates atomically against both the live pipeline and
// the in-flight set. pipeline.nodes only contains confirmed
// entries; creating_nodes covers the gap between WS dispatch and
// the engine's `NodeAdded`/`Failed` reply. Without the in-flight
// check, two clients (or one retrying client) could both pass the
// pipeline check, both reach the actor, and the second would be
// silently dropped by the actor's duplicate-id guard with no
// observable signal to the client.
if let Err(message) = session.reserve_node_id(&node_id).await {
return Some(ResponsePayload::Error { message });
}

// Now safe to do async operations without holding session_manager lock
// Forward to the engine. We deliberately do NOT insert into
// `pipeline.nodes` or broadcast `NodeAdded` here — both are
// emitted by the session-level node-added forwarder (see
// `session.rs`) once the engine confirms the plugin's constructor
// and `initialize_node` returned Ok. This makes the public
// `nodeadded` event mean what it says: a node that exists. The
// in-flight reservation taken above is drained by either the
// node-added forwarder (success) or the state forwarder (Failed).
let control_msg = EngineControlMessage::AddNode { node_id, kind, params };
session.send_control_message(control_msg).await;
Some(ResponsePayload::Success)
Expand Down Expand Up @@ -597,6 +588,10 @@ async fn handle_remove_node(
pipeline.nodes.shift_remove(&node_id);
pipeline.connections.retain(|conn| conn.from_node != node_id && conn.to_node != node_id);
}
// Release any in-flight reservation: removing a node mid-creation
// would otherwise leave the id wedged until the state forwarder
// observed a Failed transition for the cancelled creation.
session.release_node_id(&node_id).await;

// Broadcast event to all clients
let event = ApiEvent {
Expand Down
Loading
Loading