Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions engine/packages/api-peer/src/actors/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,14 @@ pub async fn delete(ctx: ApiCtx, path: DeletePath, query: DeleteQuery) -> Result
.signal(pegboard::workflows::actor::Destroy {})
.to_workflow::<pegboard::workflows::actor::Workflow>()
.tag("actor_id", path.actor_id)
.graceful_not_found()
.send()
.await;

if let Some(WorkflowError::WorkflowNotFound) = res
.as_ref()
.err()
.and_then(|x| x.chain().find_map(|x| x.downcast_ref::<WorkflowError>()))
{
.await?;
if res.is_none() {
tracing::warn!(
actor_id=?path.actor_id,
"actor workflow not found, likely already stopped"
);
} else {
res?;
}

Ok(DeleteResponse {})
Expand Down
13 changes: 5 additions & 8 deletions engine/packages/engine/src/util/wf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,22 +468,19 @@ pub async fn print_history(
}
}
EventData::Signals(data) => {
// Indent
print!("{}{c} ", " ".repeat(indent));

for ((signal_id, name), body) in
data.signal_ids.iter().zip(&data.names).zip(&data.bodies)
{
// Indent
print!("{}{c} - ", " ".repeat(indent));
print!("{}{c} - ", " ".repeat(indent));
println!("{}", event_style.apply_to(name));

print!("{}{c} ", " ".repeat(indent));
print!("{}{c} ", " ".repeat(indent));
println!("id {}", style(signal_id).green());

if !exclude_json {
// Indent
print!("{}{c} ", " ".repeat(indent));
print!("{}{c} ", " ".repeat(indent));

println!(
"body {}",
Expand Down Expand Up @@ -590,7 +587,7 @@ pub fn print_event_name(event: &Event) {
),
EventData::Signal(signal) => print!(
"{} {}",
style.apply_to("signal receive").bold(),
style.apply_to("signal").bold(),
style.apply_to(&signal.name)
),
EventData::SignalSend(signal_send) => print!(
Expand Down Expand Up @@ -626,7 +623,7 @@ pub fn print_event_name(event: &Event) {
EventData::Branch => print!("{}", style.apply_to("branch").bold()),
EventData::Signals(signal) => print!(
"{} {}",
style.apply_to("signal receive").bold(),
style.apply_to("signal recv").bold(),
style.apply_to(&signal.names.len())
),
}
Expand Down
33 changes: 29 additions & 4 deletions engine/packages/gasoline/src/builder/common/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub struct SignalBuilder<T: Signal + Serialize> {
to_workflow_name: Option<&'static str>,
to_workflow_id: Option<Id>,
tags: serde_json::Map<String, serde_json::Value>,
graceful_not_found: bool,
error: Option<BuilderError>,
}

Expand All @@ -37,6 +38,7 @@ impl<T: Signal + Serialize> SignalBuilder<T> {
to_workflow_name: None,
to_workflow_id: None,
tags: serde_json::Map::new(),
graceful_not_found: false,
error: from_workflow.then_some(BuilderError::CannotDispatchFromOpInWorkflow),
}
}
Expand Down Expand Up @@ -102,8 +104,21 @@ impl<T: Signal + Serialize> SignalBuilder<T> {
self
}

/// Does not throw an error when the signal target is not found and instead returns `Ok(None)`.
pub fn graceful_not_found(mut self) -> Self {
if self.error.is_some() {
return self;
}

self.graceful_not_found = true;

self
}

/// Returns the signal id that was just sent. Unless `graceful_not_found` is set and the workflow does not
/// exist, will always return `Some`.
#[tracing::instrument(skip_all, fields(signal_name=T::NAME, signal_id))]
pub async fn send(self) -> Result<Id> {
pub async fn send(self) -> Result<Option<Id>> {
if let Some(err) = self.error {
return Err(err.into());
}
Expand Down Expand Up @@ -132,8 +147,18 @@ impl<T: Signal + Serialize> SignalBuilder<T> {
let workflow_id = self
.db
.find_workflow(workflow_name, &serde_json::Value::Object(self.tags))
.await?
.ok_or(WorkflowError::WorkflowNotFound)?;
.await?;

let Some(workflow_id) = workflow_id else {
// Handle signal target not found gracefully
if self.graceful_not_found {
tracing::debug!("signal target not found");

return Ok(None);
} else {
return Err(WorkflowError::WorkflowNotFound.into());
}
};

self.db
.publish_signal(self.ray_id, workflow_id, signal_id, T::NAME, &input_val)
Expand Down Expand Up @@ -188,6 +213,6 @@ impl<T: Signal + Serialize> SignalBuilder<T> {
],
);

Ok(signal_id)
Ok(Some(signal_id))
}
}
71 changes: 64 additions & 7 deletions engine/packages/gasoline/src/builder/workflow/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@ use rivet_util::Id;
use serde::Serialize;

use crate::{
builder::BuilderError, ctx::WorkflowCtx, error::WorkflowError, history::cursor::HistoryResult,
metrics, signal::Signal, workflow::Workflow,
builder::BuilderError,
ctx::WorkflowCtx,
error::WorkflowError,
history::{cursor::HistoryResult, event::EventType, removed::Signal as RemovedSignal},
metrics,
signal::Signal,
workflow::Workflow,
};

pub struct SignalBuilder<'a, T: Signal + Serialize> {
Expand All @@ -18,6 +23,7 @@ pub struct SignalBuilder<'a, T: Signal + Serialize> {
to_workflow_name: Option<&'static str>,
to_workflow_id: Option<Id>,
tags: serde_json::Map<String, serde_json::Value>,
graceful_not_found: bool,
error: Option<BuilderError>,
}

Expand All @@ -31,6 +37,7 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> {
to_workflow_name: None,
to_workflow_id: None,
tags: serde_json::Map::new(),
graceful_not_found: false,
error: None,
}
}
Expand Down Expand Up @@ -85,14 +92,39 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> {
self
}

/// Does not throw an error when the signal target is not found and instead returns `Ok(None)`.
pub fn graceful_not_found(mut self) -> Self {
if self.error.is_some() {
return self;
}

self.graceful_not_found = true;

self
}

/// Returns the signal id that was just sent. Unless `graceful_not_found` is set and the workflow does not
/// exist, will always return `Some`.
#[tracing::instrument(skip_all, fields(signal_name=T::NAME, signal_id))]
pub async fn send(self) -> Result<Id> {
pub async fn send(self) -> Result<Option<Id>> {
self.ctx.check_stop()?;

if let Some(err) = self.error {
return Err(err.into());
}

// Check if this signal is being replayed and previously had no target (will have a removed event)
if self.graceful_not_found && self.ctx.cursor().is_removed() {
self.ctx.cursor().compare_removed::<RemovedSignal<T>>()?;

tracing::debug!("replaying gracefully not found signal dispatch");

// Move to next event
self.ctx.cursor_mut().inc();

return Ok(None);
}

// Error for version mismatch. This is done in the builder instead of in `VersionedWorkflowCtx` to
// defer the error.
self.ctx.compare_version("signal", self.version)?;
Expand All @@ -105,7 +137,7 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> {

// Signal sent before
let signal_id = if let HistoryResult::Event(signal) = history_res {
tracing::debug!("replaying signal dispatch",);
tracing::debug!("replaying signal dispatch");

signal.signal_id
}
Expand Down Expand Up @@ -133,8 +165,33 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> {
.ctx
.db()
.find_workflow(workflow_name, &serde_json::Value::Object(self.tags))
.await?
.ok_or(WorkflowError::WorkflowNotFound)?;
.await?;

let Some(workflow_id) = workflow_id else {
// Handle signal target not found gracefully
if self.graceful_not_found {
tracing::debug!("signal target not found");

// Insert removed event
self.ctx
.db()
.commit_workflow_removed_event(
self.ctx.workflow_id(),
&location,
EventType::SignalSend,
Some(T::NAME),
self.ctx.loop_location(),
)
.await?;

// Move to next event
self.ctx.cursor_mut().update(&location);

return Ok(None);
} else {
return Err(WorkflowError::WorkflowNotFound.into());
}
};

self.ctx
.db()
Expand Down Expand Up @@ -222,6 +279,6 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> {
// Move to next event
self.ctx.cursor_mut().update(&location);

Ok(signal_id)
Ok(Some(signal_id))
}
}
64 changes: 44 additions & 20 deletions engine/packages/gasoline/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,7 @@ impl WorkflowCtx {
let loop_location = self.cursor.current_location_for(&history_res);

// Loop existed before
let (mut iteration, mut state, output, mut loop_event_commit_fut) =
let (mut iteration, mut state, output, mut loop_event_init_fut) =
if let HistoryResult::Event(loop_event) = history_res {
let state = loop_event.parse_state()?;
let output = loop_event.parse_output()?;
Expand All @@ -838,7 +838,7 @@ impl WorkflowCtx {
let nested_loop_location = self.loop_location().cloned();

// This future is deferred until later for parallelization
let loop_event_commit_fut = async move {
let loop_event_init_fut = async move {
db2.upsert_workflow_loop_event(
workflow_id,
&name,
Expand All @@ -852,7 +852,7 @@ impl WorkflowCtx {
.await
};

(0, state, None, Some(loop_event_commit_fut))
(0, state, None, Some(loop_event_init_fut))
};

// Create a branch for the loop event
Expand All @@ -869,6 +869,9 @@ impl WorkflowCtx {
else {
tracing::debug!("running loop");

// Used to defer loop upsertion for parallelization
let mut loop_event_upsert_fut = None;

loop {
self.check_stop()?;

Expand Down Expand Up @@ -898,14 +901,23 @@ impl WorkflowCtx {
// NOTE: Great care has been taken to optimize this function. This join allows multiple
// txns to run simultaneously instead of in series but is hard to read.
//
// 1. First, but not necessarily chronologically first because its parallelized, we
// 1. First (but not necessarily chronologically first because its parallelized), we
// commit the loop event. This only happens on the first iteration of the loop
// 2. Second, we commit the branch event for the current iteration
// 3. Last, we run the user's loop code
let (loop_event_commit_res, branch_commit_res, loop_res) = tokio::join!(
// 3. Third, we run the user's loop code
// 4. Last, if we have to upsert the loop event, we save the future and process it in the
// next iteration of the loop as part of this join
let (loop_event_commit_res, loop_event_upsert_res, branch_commit_res, loop_res) = tokio::join!(
async {
if let Some(loop_event_commit_fut) = loop_event_commit_fut.take() {
loop_event_commit_fut.await
if let Some(loop_event_init_fut) = loop_event_init_fut.take() {
loop_event_init_fut.await
} else {
Ok(())
}
},
async {
if let Some(loop_event_upsert_fut) = loop_event_upsert_fut.take() {
loop_event_upsert_fut.await
} else {
Ok(())
}
Expand All @@ -928,6 +940,7 @@ impl WorkflowCtx {
);

loop_event_commit_res?;
loop_event_upsert_res?;
branch_commit_res?;

// Run loop
Expand All @@ -936,23 +949,33 @@ impl WorkflowCtx {
let dt2 = start_instant2.elapsed().as_secs_f64();
iteration += 1;

let state_val = serde_json::value::to_raw_value(&state)
.map_err(WorkflowError::SerializeLoopOutput)?;

// Commit workflow state to db
if iteration % LOOP_ITERS_PER_COMMIT == 0 {
self.db
.upsert_workflow_loop_event(
self.workflow_id,
&self.name,
let state_val = serde_json::value::to_raw_value(&state)
.map_err(WorkflowError::SerializeLoopOutput)?;

// Clone data to move into future
let loop_location = loop_location.clone();
let db2 = self.db.clone();
let workflow_id = self.workflow_id;
let name = self.name.clone();
let version = self.version;
let nested_loop_location = self.loop_location().cloned();

// Defer upsertion to next iteration so it runs in parallel
loop_event_upsert_fut = Some(async move {
db2.upsert_workflow_loop_event(
workflow_id,
&name,
&loop_location,
self.version,
version,
iteration,
&state_val,
None,
self.loop_location(),
nested_loop_location.as_ref(),
)
.await?;
.await
});
}

anyhow::Ok((dt2, None))
Expand All @@ -966,7 +989,8 @@ impl WorkflowCtx {
let output_val = serde_json::value::to_raw_value(&res)
.map_err(WorkflowError::SerializeLoopOutput)?;

// Commit loop output and final state to db
// Commit loop output and final state to db. Note that we don't defer this because
// there will be no more loop iterations afterwards.
self.db
.upsert_workflow_loop_event(
self.workflow_id,
Expand Down Expand Up @@ -1338,7 +1362,7 @@ impl WorkflowCtx {

// Existing event
if self.cursor.compare_removed::<T>()? {
tracing::debug!("skipping removed step",);
tracing::debug!("skipping removed step");
}
// New "removed" event
else {
Expand Down
Loading
Loading