From dfd167c55bb36d53776c9c52a95946c213268fc1 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Mon, 17 Nov 2025 18:29:06 -0800 Subject: [PATCH] fix(gas): fix batch listen, fix history for graceful signal send in workflows --- engine/packages/api-peer/src/actors/delete.rs | 12 +-- engine/packages/engine/src/util/wf/mod.rs | 13 ++- .../gasoline/src/builder/common/signal.rs | 33 +++++++- .../gasoline/src/builder/workflow/signal.rs | 71 +++++++++++++++-- engine/packages/gasoline/src/ctx/workflow.rs | 64 ++++++++++----- engine/packages/gasoline/src/db/debug.rs | 2 +- engine/packages/gasoline/src/db/kv/debug.rs | 4 + .../gasoline/src/db/kv/keys/history.rs | 2 +- engine/packages/gasoline/src/db/kv/mod.rs | 24 +++--- .../packages/gasoline/src/history/cursor.rs | 9 +++ .../guard/src/routing/pegboard_gateway.rs | 11 +-- .../packages/pegboard-runner/src/ping_task.rs | 2 +- .../packages/pegboard-serverless/src/lib.rs | 38 +++++---- engine/packages/pegboard/src/lib.rs | 1 + .../pegboard/src/workflows/actor/mod.rs | 2 +- .../packages/pegboard/src/workflows/runner.rs | 35 +++----- .../rocksdb/transaction_conflict_tracker.rs | 2 +- pnpm-lock.yaml | 79 ++++++------------- 18 files changed, 236 insertions(+), 168 deletions(-) diff --git a/engine/packages/api-peer/src/actors/delete.rs b/engine/packages/api-peer/src/actors/delete.rs index afb5c3486c..865e87ea14 100644 --- a/engine/packages/api-peer/src/actors/delete.rs +++ b/engine/packages/api-peer/src/actors/delete.rs @@ -67,20 +67,14 @@ pub async fn delete(ctx: ApiCtx, path: DeletePath, query: DeleteQuery) -> Result .signal(pegboard::workflows::actor::Destroy {}) .to_workflow::() .tag("actor_id", path.actor_id) + .graceful_not_found() .send() - .await; - - if let Some(WorkflowError::WorkflowNotFound) = res - .as_ref() - .err() - .and_then(|x| x.chain().find_map(|x| x.downcast_ref::())) - { + .await?; + if res.is_none() { tracing::warn!( actor_id=?path.actor_id, "actor workflow not found, likely already stopped" ); - } else { - res?; } Ok(DeleteResponse {}) diff --git a/engine/packages/engine/src/util/wf/mod.rs b/engine/packages/engine/src/util/wf/mod.rs index 2c9e04c4c4..306d9e6961 100644 --- a/engine/packages/engine/src/util/wf/mod.rs +++ b/engine/packages/engine/src/util/wf/mod.rs @@ -468,22 +468,19 @@ pub async fn print_history( } } EventData::Signals(data) => { - // Indent - print!("{}{c} ", " ".repeat(indent)); - for ((signal_id, name), body) in data.signal_ids.iter().zip(&data.names).zip(&data.bodies) { // Indent - print!("{}{c} - ", " ".repeat(indent)); + print!("{}{c} - ", " ".repeat(indent)); println!("{}", event_style.apply_to(name)); - print!("{}{c} ", " ".repeat(indent)); + print!("{}{c} ", " ".repeat(indent)); println!("id {}", style(signal_id).green()); if !exclude_json { // Indent - print!("{}{c} ", " ".repeat(indent)); + print!("{}{c} ", " ".repeat(indent)); println!( "body {}", @@ -590,7 +587,7 @@ pub fn print_event_name(event: &Event) { ), EventData::Signal(signal) => print!( "{} {}", - style.apply_to("signal receive").bold(), + style.apply_to("signal").bold(), style.apply_to(&signal.name) ), EventData::SignalSend(signal_send) => print!( @@ -626,7 +623,7 @@ pub fn print_event_name(event: &Event) { EventData::Branch => print!("{}", style.apply_to("branch").bold()), EventData::Signals(signal) => print!( "{} {}", - style.apply_to("signal receive").bold(), + style.apply_to("signal recv").bold(), style.apply_to(&signal.names.len()) ), } diff --git a/engine/packages/gasoline/src/builder/common/signal.rs b/engine/packages/gasoline/src/builder/common/signal.rs index 86f03fa878..7f1b4d2692 100644 --- a/engine/packages/gasoline/src/builder/common/signal.rs +++ b/engine/packages/gasoline/src/builder/common/signal.rs @@ -18,6 +18,7 @@ pub struct SignalBuilder { to_workflow_name: Option<&'static str>, to_workflow_id: Option, tags: serde_json::Map, + graceful_not_found: bool, error: Option, } @@ -37,6 +38,7 @@ impl SignalBuilder { to_workflow_name: None, to_workflow_id: None, tags: serde_json::Map::new(), + graceful_not_found: false, error: from_workflow.then_some(BuilderError::CannotDispatchFromOpInWorkflow), } } @@ -102,8 +104,21 @@ impl SignalBuilder { self } + /// Does not throw an error when the signal target is not found and instead returns `Ok(None)`. + pub fn graceful_not_found(mut self) -> Self { + if self.error.is_some() { + return self; + } + + self.graceful_not_found = true; + + self + } + + /// Returns the signal id that was just sent. Unless `graceful_not_found` is set and the workflow does not + /// exist, will always return `Some`. #[tracing::instrument(skip_all, fields(signal_name=T::NAME, signal_id))] - pub async fn send(self) -> Result { + pub async fn send(self) -> Result> { if let Some(err) = self.error { return Err(err.into()); } @@ -132,8 +147,18 @@ impl SignalBuilder { let workflow_id = self .db .find_workflow(workflow_name, &serde_json::Value::Object(self.tags)) - .await? - .ok_or(WorkflowError::WorkflowNotFound)?; + .await?; + + let Some(workflow_id) = workflow_id else { + // Handle signal target not found gracefully + if self.graceful_not_found { + tracing::debug!("signal target not found"); + + return Ok(None); + } else { + return Err(WorkflowError::WorkflowNotFound.into()); + } + }; self.db .publish_signal(self.ray_id, workflow_id, signal_id, T::NAME, &input_val) @@ -188,6 +213,6 @@ impl SignalBuilder { ], ); - Ok(signal_id) + Ok(Some(signal_id)) } } diff --git a/engine/packages/gasoline/src/builder/workflow/signal.rs b/engine/packages/gasoline/src/builder/workflow/signal.rs index 6c304ad841..bd476a4f4b 100644 --- a/engine/packages/gasoline/src/builder/workflow/signal.rs +++ b/engine/packages/gasoline/src/builder/workflow/signal.rs @@ -6,8 +6,13 @@ use rivet_util::Id; use serde::Serialize; use crate::{ - builder::BuilderError, ctx::WorkflowCtx, error::WorkflowError, history::cursor::HistoryResult, - metrics, signal::Signal, workflow::Workflow, + builder::BuilderError, + ctx::WorkflowCtx, + error::WorkflowError, + history::{cursor::HistoryResult, event::EventType, removed::Signal as RemovedSignal}, + metrics, + signal::Signal, + workflow::Workflow, }; pub struct SignalBuilder<'a, T: Signal + Serialize> { @@ -18,6 +23,7 @@ pub struct SignalBuilder<'a, T: Signal + Serialize> { to_workflow_name: Option<&'static str>, to_workflow_id: Option, tags: serde_json::Map, + graceful_not_found: bool, error: Option, } @@ -31,6 +37,7 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> { to_workflow_name: None, to_workflow_id: None, tags: serde_json::Map::new(), + graceful_not_found: false, error: None, } } @@ -85,14 +92,39 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> { self } + /// Does not throw an error when the signal target is not found and instead returns `Ok(None)`. + pub fn graceful_not_found(mut self) -> Self { + if self.error.is_some() { + return self; + } + + self.graceful_not_found = true; + + self + } + + /// Returns the signal id that was just sent. Unless `graceful_not_found` is set and the workflow does not + /// exist, will always return `Some`. #[tracing::instrument(skip_all, fields(signal_name=T::NAME, signal_id))] - pub async fn send(self) -> Result { + pub async fn send(self) -> Result> { self.ctx.check_stop()?; if let Some(err) = self.error { return Err(err.into()); } + // Check if this signal is being replayed and previously had no target (will have a removed event) + if self.graceful_not_found && self.ctx.cursor().is_removed() { + self.ctx.cursor().compare_removed::>()?; + + tracing::debug!("replaying gracefully not found signal dispatch"); + + // Move to next event + self.ctx.cursor_mut().inc(); + + return Ok(None); + } + // Error for version mismatch. This is done in the builder instead of in `VersionedWorkflowCtx` to // defer the error. self.ctx.compare_version("signal", self.version)?; @@ -105,7 +137,7 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> { // Signal sent before let signal_id = if let HistoryResult::Event(signal) = history_res { - tracing::debug!("replaying signal dispatch",); + tracing::debug!("replaying signal dispatch"); signal.signal_id } @@ -133,8 +165,33 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> { .ctx .db() .find_workflow(workflow_name, &serde_json::Value::Object(self.tags)) - .await? - .ok_or(WorkflowError::WorkflowNotFound)?; + .await?; + + let Some(workflow_id) = workflow_id else { + // Handle signal target not found gracefully + if self.graceful_not_found { + tracing::debug!("signal target not found"); + + // Insert removed event + self.ctx + .db() + .commit_workflow_removed_event( + self.ctx.workflow_id(), + &location, + EventType::SignalSend, + Some(T::NAME), + self.ctx.loop_location(), + ) + .await?; + + // Move to next event + self.ctx.cursor_mut().update(&location); + + return Ok(None); + } else { + return Err(WorkflowError::WorkflowNotFound.into()); + } + }; self.ctx .db() @@ -222,6 +279,6 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> { // Move to next event self.ctx.cursor_mut().update(&location); - Ok(signal_id) + Ok(Some(signal_id)) } } diff --git a/engine/packages/gasoline/src/ctx/workflow.rs b/engine/packages/gasoline/src/ctx/workflow.rs index d2c567659e..93470ebf8b 100644 --- a/engine/packages/gasoline/src/ctx/workflow.rs +++ b/engine/packages/gasoline/src/ctx/workflow.rs @@ -819,7 +819,7 @@ impl WorkflowCtx { let loop_location = self.cursor.current_location_for(&history_res); // Loop existed before - let (mut iteration, mut state, output, mut loop_event_commit_fut) = + let (mut iteration, mut state, output, mut loop_event_init_fut) = if let HistoryResult::Event(loop_event) = history_res { let state = loop_event.parse_state()?; let output = loop_event.parse_output()?; @@ -838,7 +838,7 @@ impl WorkflowCtx { let nested_loop_location = self.loop_location().cloned(); // This future is deferred until later for parallelization - let loop_event_commit_fut = async move { + let loop_event_init_fut = async move { db2.upsert_workflow_loop_event( workflow_id, &name, @@ -852,7 +852,7 @@ impl WorkflowCtx { .await }; - (0, state, None, Some(loop_event_commit_fut)) + (0, state, None, Some(loop_event_init_fut)) }; // Create a branch for the loop event @@ -869,6 +869,9 @@ impl WorkflowCtx { else { tracing::debug!("running loop"); + // Used to defer loop upsertion for parallelization + let mut loop_event_upsert_fut = None; + loop { self.check_stop()?; @@ -898,14 +901,23 @@ impl WorkflowCtx { // NOTE: Great care has been taken to optimize this function. This join allows multiple // txns to run simultaneously instead of in series but is hard to read. // - // 1. First, but not necessarily chronologically first because its parallelized, we + // 1. First (but not necessarily chronologically first because its parallelized), we // commit the loop event. This only happens on the first iteration of the loop // 2. Second, we commit the branch event for the current iteration - // 3. Last, we run the user's loop code - let (loop_event_commit_res, branch_commit_res, loop_res) = tokio::join!( + // 3. Third, we run the user's loop code + // 4. Last, if we have to upsert the loop event, we save the future and process it in the + // next iteration of the loop as part of this join + let (loop_event_commit_res, loop_event_upsert_res, branch_commit_res, loop_res) = tokio::join!( async { - if let Some(loop_event_commit_fut) = loop_event_commit_fut.take() { - loop_event_commit_fut.await + if let Some(loop_event_init_fut) = loop_event_init_fut.take() { + loop_event_init_fut.await + } else { + Ok(()) + } + }, + async { + if let Some(loop_event_upsert_fut) = loop_event_upsert_fut.take() { + loop_event_upsert_fut.await } else { Ok(()) } @@ -928,6 +940,7 @@ impl WorkflowCtx { ); loop_event_commit_res?; + loop_event_upsert_res?; branch_commit_res?; // Run loop @@ -936,23 +949,33 @@ impl WorkflowCtx { let dt2 = start_instant2.elapsed().as_secs_f64(); iteration += 1; - let state_val = serde_json::value::to_raw_value(&state) - .map_err(WorkflowError::SerializeLoopOutput)?; - // Commit workflow state to db if iteration % LOOP_ITERS_PER_COMMIT == 0 { - self.db - .upsert_workflow_loop_event( - self.workflow_id, - &self.name, + let state_val = serde_json::value::to_raw_value(&state) + .map_err(WorkflowError::SerializeLoopOutput)?; + + // Clone data to move into future + let loop_location = loop_location.clone(); + let db2 = self.db.clone(); + let workflow_id = self.workflow_id; + let name = self.name.clone(); + let version = self.version; + let nested_loop_location = self.loop_location().cloned(); + + // Defer upsertion to next iteration so it runs in parallel + loop_event_upsert_fut = Some(async move { + db2.upsert_workflow_loop_event( + workflow_id, + &name, &loop_location, - self.version, + version, iteration, &state_val, None, - self.loop_location(), + nested_loop_location.as_ref(), ) - .await?; + .await + }); } anyhow::Ok((dt2, None)) @@ -966,7 +989,8 @@ impl WorkflowCtx { let output_val = serde_json::value::to_raw_value(&res) .map_err(WorkflowError::SerializeLoopOutput)?; - // Commit loop output and final state to db + // Commit loop output and final state to db. Note that we don't defer this because + // there will be no more loop iterations afterwards. self.db .upsert_workflow_loop_event( self.workflow_id, @@ -1338,7 +1362,7 @@ impl WorkflowCtx { // Existing event if self.cursor.compare_removed::()? { - tracing::debug!("skipping removed step",); + tracing::debug!("skipping removed step"); } // New "removed" event else { diff --git a/engine/packages/gasoline/src/db/debug.rs b/engine/packages/gasoline/src/db/debug.rs index 0dba696a81..5b37dabe46 100644 --- a/engine/packages/gasoline/src/db/debug.rs +++ b/engine/packages/gasoline/src/db/debug.rs @@ -122,7 +122,7 @@ impl std::fmt::Display for EventData { unique_names.sort(); unique_names.dedup(); - write!(f, "signals {:?}", unique_names.join(", ")) + write!(f, "signal receive {:?}", unique_names.join(", ")) } } } diff --git a/engine/packages/gasoline/src/db/kv/debug.rs b/engine/packages/gasoline/src/db/kv/debug.rs index 21c6fbe7d3..28ae6793f5 100644 --- a/engine/packages/gasoline/src/db/kv/debug.rs +++ b/engine/packages/gasoline/src/db/kv/debug.rs @@ -932,6 +932,10 @@ impl DatabaseDebug for DatabaseKv { current_event.indexed_input_chunks.get_mut(key.index) { input_chunks.push(entry); + } else { + current_event + .indexed_input_chunks + .insert(key.index, vec![entry]); } } diff --git a/engine/packages/gasoline/src/db/kv/keys/history.rs b/engine/packages/gasoline/src/db/kv/keys/history.rs index 7650ad4d63..784797383e 100644 --- a/engine/packages/gasoline/src/db/kv/keys/history.rs +++ b/engine/packages/gasoline/src/db/kv/keys/history.rs @@ -1642,7 +1642,7 @@ pub mod insert { tx, workflow_id, location, - EventType::Signal, + EventType::Signals, version, create_ts, ) diff --git a/engine/packages/gasoline/src/db/kv/mod.rs b/engine/packages/gasoline/src/db/kv/mod.rs index 6d3777c3be..27dad968a9 100644 --- a/engine/packages/gasoline/src/db/kv/mod.rs +++ b/engine/packages/gasoline/src/db/kv/mod.rs @@ -1505,6 +1505,10 @@ impl Database for DatabaseKv { .get_mut(key.index) { input_chunks.push(entry); + } else { + current_event + .indexed_input_chunks + .insert(key.index, vec![entry]); } } @@ -1938,7 +1942,7 @@ impl Database for DatabaseKv { async move { // Fetch signals from all streams at the same time - let signals = futures_util::stream::iter(owned_filter.clone()) + let mut signals = futures_util::stream::iter(owned_filter.clone()) .map(|signal_name| { let pending_signal_subspace = self.subspace.subspace( &keys::workflow::PendingSignalKey::subspace( @@ -1972,6 +1976,9 @@ impl Database for DatabaseKv { .await?; if !signals.is_empty() { + // Sort by ts + signals.sort_by_key(|key| key.ts); + let now = rivet_util::timestamp::now(); // Insert history event @@ -1984,13 +1991,14 @@ impl Database for DatabaseKv { now, )?; - let mut signals = - futures_util::stream::iter(signals.into_iter().enumerate()) + let signals = + futures_util::stream::iter(signals.into_iter().take(limit).enumerate()) .map(|(index, key)| { let tx = tx.clone(); async move { let ack_ts_key = keys::signal::AckTsKey::new(key.signal_id); - let packed_key = tx.pack(&key); + + let packed_key = self.subspace.pack(&key); // Ack signal tx.add_conflict_range( @@ -2000,7 +2008,7 @@ impl Database for DatabaseKv { )?; tx.set( &self.subspace.pack(&ack_ts_key), - &ack_ts_key.serialize(rivet_util::timestamp::now())?, + &ack_ts_key.serialize(now)?, ); update_metric( @@ -2056,11 +2064,7 @@ impl Database for DatabaseKv { .try_collect::>() .await?; - // Sort by ts - signals.sort_by_key(|key| key.create_ts); - - // Apply limit - Ok(signals.into_iter().take(limit).collect()) + Ok(signals) } // No signals found else { diff --git a/engine/packages/gasoline/src/history/cursor.rs b/engine/packages/gasoline/src/history/cursor.rs index a098c351be..79fc6579eb 100644 --- a/engine/packages/gasoline/src/history/cursor.rs +++ b/engine/packages/gasoline/src/history/cursor.rs @@ -475,6 +475,15 @@ impl Cursor { } } + // Helper function for signal functionality + pub fn is_removed(&self) -> bool { + let Some(event) = self.current_event() else { + return false; + }; + + matches!(&event.data, EventData::Removed(_)) + } + /// Returns `true` if the current event is being replayed. pub fn compare_removed(&self) -> WorkflowResult { let Some(event) = self.current_event() else { diff --git a/engine/packages/guard/src/routing/pegboard_gateway.rs b/engine/packages/guard/src/routing/pegboard_gateway.rs index 22054a9ac2..ee542d59fd 100644 --- a/engine/packages/guard/src/routing/pegboard_gateway.rs +++ b/engine/packages/guard/src/routing/pegboard_gateway.rs @@ -195,20 +195,15 @@ async fn route_request_inner( let res = ctx.signal(pegboard::workflows::actor::Wake {}) .to_workflow_id(actor.workflow_id) + .graceful_not_found() .send() - .await; + .await?; - if let Some(WorkflowError::WorkflowNotFound) = res - .as_ref() - .err() - .and_then(|x| x.chain().find_map(|x| x.downcast_ref::())) - { + if res.is_none() { tracing::warn!( ?actor_id, "actor workflow not found for rewake" ); - } else { - res?; } } else { tracing::warn!("actor retried waking 16 times, has not yet started"); diff --git a/engine/packages/pegboard-runner/src/ping_task.rs b/engine/packages/pegboard-runner/src/ping_task.rs index 68d48d07aa..fa23376656 100644 --- a/engine/packages/pegboard-runner/src/ping_task.rs +++ b/engine/packages/pegboard-runner/src/ping_task.rs @@ -26,7 +26,7 @@ pub async fn task( async fn update_runner_ping(ctx: &StandaloneCtx, conn: &Conn) -> Result<()> { let Some(wf) = ctx - .workflow::(conn.workflow_id) + .workflow::(conn.workflow_id) .get() .await? else { diff --git a/engine/packages/pegboard-serverless/src/lib.rs b/engine/packages/pegboard-serverless/src/lib.rs index 19d9c9386c..336b9586c2 100644 --- a/engine/packages/pegboard-serverless/src/lib.rs +++ b/engine/packages/pegboard-serverless/src/lib.rs @@ -480,25 +480,33 @@ async fn outbound_handler( async fn drain_runner(ctx: &StandaloneCtx, runner_id: Id) -> Result<()> { let res = ctx - .signal(pegboard::workflows::runner::Stop { + .signal(pegboard::workflows::runner2::Stop { reset_actor_rescheduling: true, }) - .to_workflow::() + .to_workflow::() .tag("runner_id", runner_id) + .graceful_not_found() .send() - .await; - - if let Some(WorkflowError::WorkflowNotFound) = res - .as_ref() - .err() - .and_then(|x| x.chain().find_map(|x| x.downcast_ref::())) - { - tracing::warn!( - ?runner_id, - "runner workflow not found, likely already stopped" - ); - } else { - res?; + .await?; + + if res.is_none() { + // Retry with old runner wf + let res = ctx + .signal(pegboard::workflows::runner::Stop { + reset_actor_rescheduling: true, + }) + .to_workflow::() + .tag("runner_id", runner_id) + .graceful_not_found() + .send() + .await?; + + if res.is_none() { + tracing::warn!( + ?runner_id, + "runner workflow not found, likely already stopped" + ); + } } Ok(()) diff --git a/engine/packages/pegboard/src/lib.rs b/engine/packages/pegboard/src/lib.rs index 1574ff4503..d6f02fdbc9 100644 --- a/engine/packages/pegboard/src/lib.rs +++ b/engine/packages/pegboard/src/lib.rs @@ -15,6 +15,7 @@ pub fn registry() -> WorkflowResult { let mut registry = Registry::new(); registry.register_workflow::()?; registry.register_workflow::()?; + registry.register_workflow::()?; Ok(registry) } diff --git a/engine/packages/pegboard/src/workflows/actor/mod.rs b/engine/packages/pegboard/src/workflows/actor/mod.rs index 4f555e3a5d..c90f614051 100644 --- a/engine/packages/pegboard/src/workflows/actor/mod.rs +++ b/engine/packages/pegboard/src/workflows/actor/mod.rs @@ -3,7 +3,7 @@ use gas::prelude::*; use rivet_runner_protocol as protocol; use rivet_types::actors::CrashPolicy; -use crate::{errors, workflows::runner::AllocatePendingActorsInput}; +use crate::{errors, workflows::runner2::AllocatePendingActorsInput}; mod destroy; mod keys; diff --git a/engine/packages/pegboard/src/workflows/runner.rs b/engine/packages/pegboard/src/workflows/runner.rs index a0a4eb0276..41739b4b26 100644 --- a/engine/packages/pegboard/src/workflows/runner.rs +++ b/engine/packages/pegboard/src/workflows/runner.rs @@ -164,19 +164,14 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> }) .to_workflow::() .tag("actor_id", &actor_id) + .graceful_not_found() .send() - .await; - - if let Some(WorkflowError::WorkflowNotFound) = - res.as_ref().err().and_then(|x| { - x.chain().find_map(|x| x.downcast_ref::()) - }) { + .await?; + if res.is_none() { tracing::warn!( ?actor_id, "actor workflow not found, likely already stopped" ); - } else { - res?; } } @@ -257,20 +252,14 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> }) .to_workflow::() .tag("actor_id", actor_id) + .graceful_not_found() .send() - .await; - - if let Some(WorkflowError::WorkflowNotFound) = res - .as_ref() - .err() - .and_then(|x| x.chain().find_map(|x| x.downcast_ref::())) - { + .await?; + if res.is_none() { tracing::warn!( ?actor_id, "actor workflow not found, likely already stopped" ); - } else { - res?; } } else { let index = ctx @@ -356,20 +345,14 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> }) .to_workflow::() .tag("actor_id", actor_id) + .graceful_not_found() .send() - .await; - - if let Some(WorkflowError::WorkflowNotFound) = res - .as_ref() - .err() - .and_then(|x| x.chain().find_map(|x| x.downcast_ref::())) - { + .await?; + if res.is_none() { tracing::warn!( ?actor_id, "actor workflow not found, likely already stopped" ); - } else { - res?; } } diff --git a/engine/packages/universaldb/src/driver/rocksdb/transaction_conflict_tracker.rs b/engine/packages/universaldb/src/driver/rocksdb/transaction_conflict_tracker.rs index 988c166a39..8ef61500d5 100644 --- a/engine/packages/universaldb/src/driver/rocksdb/transaction_conflict_tracker.rs +++ b/engine/packages/universaldb/src/driver/rocksdb/transaction_conflict_tracker.rs @@ -64,7 +64,7 @@ impl TransactionConflictTracker { for (cr2_start, cr2_end, cr2_type) in &txn2.conflict_ranges { // Check conflict ranges overlap if cr1_start < cr2_end && cr2_start < cr1_end && cr1_type != cr2_type { - tracing::info!( + tracing::debug!( ?cr1_start, ?cr1_end, ?cr1_type, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index b4115199e7..d8a3d9beb1 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1995,7 +1995,7 @@ importers: version: 3.13.12(react-dom@19.1.1(react@19.1.1))(react@19.1.1) '@uiw/codemirror-extensions-basic-setup': specifier: ^4.25.1 - version: 4.25.1(@codemirror/autocomplete@6.19.0)(@codemirror/commands@6.8.1)(@codemirror/language@6.11.3)(@codemirror/lint@6.9.0)(@codemirror/search@6.5.11)(@codemirror/state@6.5.2)(@codemirror/view@6.38.2) + version: 4.25.1(@codemirror/autocomplete@6.19.0)(@codemirror/commands@6.9.0)(@codemirror/language@6.11.3)(@codemirror/lint@6.9.0)(@codemirror/search@6.5.11)(@codemirror/state@6.5.2)(@codemirror/view@6.38.2) '@uiw/codemirror-theme-github': specifier: ^4.25.1 version: 4.25.1(@codemirror/language@6.11.3)(@codemirror/state@6.5.2)(@codemirror/view@6.38.2) @@ -2451,13 +2451,13 @@ importers: version: 6.0.1 '@mdx-js/loader': specifier: ^3.1.1 - version: 3.1.1(webpack@5.101.3) + version: 3.1.1(webpack@5.101.3(esbuild@0.25.9)) '@mdx-js/react': specifier: ^3.1.1 version: 3.1.1(@types/react@19.2.2)(react@19.1.1) '@next/mdx': specifier: ^15.5.2 - version: 15.5.2(@mdx-js/loader@3.1.1(webpack@5.101.3))(@mdx-js/react@3.1.1(@types/react@19.2.2)(react@19.1.1)) + version: 15.5.2(@mdx-js/loader@3.1.1(webpack@5.101.3(esbuild@0.25.9)))(@mdx-js/react@3.1.1(@types/react@19.2.2)(react@19.1.1)) '@next/third-parties': specifier: latest version: 16.0.3(next@15.5.2(@opentelemetry/api@1.9.0)(babel-plugin-macros@3.1.0)(babel-plugin-react-compiler@1.0.0)(react-dom@19.1.1(react@19.1.1))(react@19.1.1)(sass@1.93.2))(react@19.1.1) @@ -2647,7 +2647,7 @@ importers: version: 13.0.2(eslint@8.26.0)(typescript@5.9.2) file-loader: specifier: ^6.2.0 - version: 6.2.0(webpack@5.101.3) + version: 6.2.0(webpack@5.101.3(esbuild@0.25.9)) prettier: specifier: ^2.8.8 version: 2.8.8 @@ -16226,12 +16226,12 @@ snapshots: '@marijn/find-cluster-break@1.0.2': {} - '@mdx-js/loader@3.1.1(webpack@5.101.3)': + '@mdx-js/loader@3.1.1(webpack@5.101.3(esbuild@0.25.9))': dependencies: '@mdx-js/mdx': 3.1.1 source-map: 0.7.6 optionalDependencies: - webpack: 5.101.3 + webpack: 5.101.3(esbuild@0.25.9) transitivePeerDependencies: - supports-color @@ -16409,11 +16409,11 @@ snapshots: dependencies: glob: 7.1.7 - '@next/mdx@15.5.2(@mdx-js/loader@3.1.1(webpack@5.101.3))(@mdx-js/react@3.1.1(@types/react@19.2.2)(react@19.1.1))': + '@next/mdx@15.5.2(@mdx-js/loader@3.1.1(webpack@5.101.3(esbuild@0.25.9)))(@mdx-js/react@3.1.1(@types/react@19.2.2)(react@19.1.1))': dependencies: source-map: 0.7.6 optionalDependencies: - '@mdx-js/loader': 3.1.1(webpack@5.101.3) + '@mdx-js/loader': 3.1.1(webpack@5.101.3(esbuild@0.25.9)) '@mdx-js/react': 3.1.1(@types/react@19.2.2)(react@19.1.1) '@next/swc-darwin-arm64@15.4.5': @@ -18640,6 +18640,16 @@ snapshots: '@codemirror/state': 6.5.2 '@codemirror/view': 6.38.2 + '@uiw/codemirror-extensions-basic-setup@4.25.1(@codemirror/autocomplete@6.19.0)(@codemirror/commands@6.9.0)(@codemirror/language@6.11.3)(@codemirror/lint@6.9.0)(@codemirror/search@6.5.11)(@codemirror/state@6.5.2)(@codemirror/view@6.38.2)': + dependencies: + '@codemirror/autocomplete': 6.19.0 + '@codemirror/commands': 6.9.0 + '@codemirror/language': 6.11.3 + '@codemirror/lint': 6.9.0 + '@codemirror/search': 6.5.11 + '@codemirror/state': 6.5.2 + '@codemirror/view': 6.38.2 + '@uiw/codemirror-theme-github@4.25.1(@codemirror/language@6.11.3)(@codemirror/state@6.5.2)(@codemirror/view@6.38.2)': dependencies: '@uiw/codemirror-themes': 4.25.1(@codemirror/language@6.11.3)(@codemirror/state@6.5.2)(@codemirror/view@6.38.2) @@ -20702,7 +20712,7 @@ snapshots: eslint: 8.26.0 eslint-import-resolver-node: 0.3.9 eslint-import-resolver-typescript: 2.7.1(eslint-plugin-import@2.32.0(eslint@8.26.0))(eslint@8.26.0) - eslint-plugin-import: 2.32.0(@typescript-eslint/parser@5.62.0(eslint@8.26.0)(typescript@5.9.2))(eslint-import-resolver-typescript@2.7.1)(eslint@8.26.0) + eslint-plugin-import: 2.32.0(@typescript-eslint/parser@5.62.0(eslint@8.26.0)(typescript@5.9.2))(eslint-import-resolver-typescript@2.7.1(eslint-plugin-import@2.32.0(eslint@8.26.0))(eslint@8.26.0))(eslint@8.26.0) eslint-plugin-jsx-a11y: 6.10.2(eslint@8.26.0) eslint-plugin-react: 7.37.5(eslint@8.26.0) eslint-plugin-react-hooks: 4.6.2(eslint@8.26.0) @@ -20724,7 +20734,7 @@ snapshots: dependencies: debug: 4.4.1 eslint: 8.26.0 - eslint-plugin-import: 2.32.0(@typescript-eslint/parser@5.62.0(eslint@8.26.0)(typescript@5.9.2))(eslint-import-resolver-typescript@2.7.1)(eslint@8.26.0) + eslint-plugin-import: 2.32.0(@typescript-eslint/parser@5.62.0(eslint@8.26.0)(typescript@5.9.2))(eslint-import-resolver-typescript@2.7.1(eslint-plugin-import@2.32.0(eslint@8.26.0))(eslint@8.26.0))(eslint@8.26.0) glob: 7.2.3 is-glob: 4.0.3 resolve: 1.22.10 @@ -20743,7 +20753,7 @@ snapshots: transitivePeerDependencies: - supports-color - eslint-plugin-import@2.32.0(@typescript-eslint/parser@5.62.0(eslint@8.26.0)(typescript@5.9.2))(eslint-import-resolver-typescript@2.7.1)(eslint@8.26.0): + eslint-plugin-import@2.32.0(@typescript-eslint/parser@5.62.0(eslint@8.26.0)(typescript@5.9.2))(eslint-import-resolver-typescript@2.7.1(eslint-plugin-import@2.32.0(eslint@8.26.0))(eslint@8.26.0))(eslint@8.26.0): dependencies: '@rtsao/scc': 1.1.0 array-includes: 3.1.9 @@ -21242,11 +21252,11 @@ snapshots: dependencies: flat-cache: 3.2.0 - file-loader@6.2.0(webpack@5.101.3): + file-loader@6.2.0(webpack@5.101.3(esbuild@0.25.9)): dependencies: loader-utils: 2.0.4 schema-utils: 3.3.0 - webpack: 5.101.3 + webpack: 5.101.3(esbuild@0.25.9) file-saver@2.0.5: {} @@ -25777,16 +25787,6 @@ snapshots: webpack: 5.101.3(esbuild@0.25.9) optionalDependencies: esbuild: 0.25.9 - optional: true - - terser-webpack-plugin@5.3.14(webpack@5.101.3): - dependencies: - '@jridgewell/trace-mapping': 0.3.31 - jest-worker: 27.5.1 - schema-utils: 4.3.3 - serialize-javascript: 6.0.2 - terser: 5.44.0 - webpack: 5.101.3 terser@5.44.0: dependencies: @@ -26921,38 +26921,6 @@ snapshots: webpack-virtual-modules@0.6.2: {} - webpack@5.101.3: - dependencies: - '@types/eslint-scope': 3.7.7 - '@types/estree': 1.0.8 - '@types/json-schema': 7.0.15 - '@webassemblyjs/ast': 1.14.1 - '@webassemblyjs/wasm-edit': 1.14.1 - '@webassemblyjs/wasm-parser': 1.14.1 - acorn: 8.15.0 - acorn-import-phases: 1.0.4(acorn@8.15.0) - browserslist: 4.26.3 - chrome-trace-event: 1.0.4 - enhanced-resolve: 5.18.3 - es-module-lexer: 1.7.0 - eslint-scope: 5.1.1 - events: 3.3.0 - glob-to-regexp: 0.4.1 - graceful-fs: 4.2.11 - json-parse-even-better-errors: 2.3.1 - loader-runner: 4.3.1 - mime-types: 2.1.35 - neo-async: 2.6.2 - schema-utils: 4.3.3 - tapable: 2.3.0 - terser-webpack-plugin: 5.3.14(webpack@5.101.3) - watchpack: 2.4.4 - webpack-sources: 3.3.3 - transitivePeerDependencies: - - '@swc/core' - - esbuild - - uglify-js - webpack@5.101.3(esbuild@0.25.9): dependencies: '@types/eslint-scope': 3.7.7 @@ -26984,7 +26952,6 @@ snapshots: - '@swc/core' - esbuild - uglify-js - optional: true whatwg-fetch@3.6.20: {}