Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 77 additions & 37 deletions engine/packages/gasoline/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ use crate::{
const DB_ACTION_RETRY: Duration = Duration::from_millis(150);
/// Most db action retries
const MAX_DB_ACTION_RETRIES: usize = 5;
/// How often to commit loop event data to db and mark previous loop history to forgotten
const LOOP_ITERS_PER_COMMIT: usize = 20;

// NOTE: Cloneable because of inner arcs
#[derive(Clone)]
Expand Down Expand Up @@ -868,34 +870,43 @@ impl WorkflowCtx {
let loop_location = self.cursor.current_location_for(&history_res);

// Loop existed before
let (mut iteration, mut state, output) =
let (mut iteration, mut state, output, mut loop_event_commit_fut) =
if let HistoryResult::Event(loop_event) = history_res {
let state = loop_event.parse_state()?;
let output = loop_event.parse_output()?;

(loop_event.iteration, state, output)
(loop_event.iteration, state, output, None)
} else {
let state_val = serde_json::value::to_raw_value(&state)
.map_err(WorkflowError::SerializeLoopOutput)?;

// Insert event before loop is run so the history is consistent
self.db
.upsert_workflow_loop_event(
self.workflow_id,
&self.name,
// Clone data to move into future
let loop_location = loop_location.clone();
let db2 = self.db.clone();
let workflow_id = self.workflow_id;
let name = self.name.clone();
let version = self.version;
let nested_loop_location = self.loop_location().cloned();

// This future is deferred until later for parallelization
let loop_event_commit_fut = async move {
db2.upsert_workflow_loop_event(
workflow_id,
&name,
&loop_location,
self.version,
version,
0,
&state_val,
None,
self.loop_location(),
nested_loop_location.as_ref(),
)
.await?;
.await
};

(0, state, None)
(0, state, None, Some(loop_event_commit_fut))
};

// Create a branch but no branch event (loop event takes its place)
// Create a branch for the loop event
let mut loop_branch =
self.branch_inner(self.input.clone(), self.version, loop_location.clone());

Expand Down Expand Up @@ -923,6 +934,7 @@ impl WorkflowCtx {
.root()
.join(Coordinate::simple(iteration + 1)),
);
let iteration_branch_root = iteration_branch.cursor.root().clone();

// Set branch loop location to the current loop
iteration_branch.loop_location = Some(loop_location.clone());
Expand All @@ -931,41 +943,68 @@ impl WorkflowCtx {

// Async block for instrumentation purposes
let (dt2, res) = async {
// Insert event if iteration is not a replay
if !loop_branch.cursor.compare_loop_branch(iteration)? {
self.db
.commit_workflow_branch_event(
self.workflow_id,
iteration_branch.cursor.root(),
self.version,
Some(&loop_location),
)
.await?;
}

let start_instant2 = Instant::now();
let db2 = self.db.clone();

// NOTE: Great care has been taken to optimize this function. This join allows multiple
// txns to run simultaneously instead of in series but is hard to read.
//
// 1. First, but not necessarily chronologically first because its parallelized, we
// commit the loop event. This only happens on the first iteration of the loop
// 2. Second, we commit the branch event for the current iteration
// 3. Last, we run the user's loop code
let (loop_event_commit_res, branch_commit_res, loop_res) = tokio::join!(
async {
if let Some(loop_event_commit_fut) = loop_event_commit_fut.take() {
loop_event_commit_fut.await
} else {
Ok(())
}
},
async {
// Insert event if iteration is not a replay
if !loop_branch.cursor.compare_loop_branch(iteration)? {
db2.commit_workflow_branch_event(
self.workflow_id,
&iteration_branch_root,
self.version,
Some(&loop_location),
)
.await
} else {
Ok(())
}
},
cb(&mut iteration_branch, &mut state),
);

loop_event_commit_res?;
branch_commit_res?;

// Run loop
match cb(&mut iteration_branch, &mut state).await? {
match loop_res? {
Loop::Continue => {
let dt2 = start_instant2.elapsed().as_secs_f64();
iteration += 1;

let state_val = serde_json::value::to_raw_value(&state)
.map_err(WorkflowError::SerializeLoopOutput)?;

self.db
.upsert_workflow_loop_event(
self.workflow_id,
&self.name,
&loop_location,
self.version,
iteration,
&state_val,
None,
self.loop_location(),
)
.await?;
// Commit workflow state to db
if iteration % LOOP_ITERS_PER_COMMIT == 0 {
self.db
.upsert_workflow_loop_event(
self.workflow_id,
&self.name,
&loop_location,
self.version,
iteration,
&state_val,
None,
self.loop_location(),
)
.await?;
}

anyhow::Ok((dt2, None))
}
Expand All @@ -978,6 +1017,7 @@ impl WorkflowCtx {
let output_val = serde_json::value::to_raw_value(&res)
.map_err(WorkflowError::SerializeLoopOutput)?;

// Commit loop output and final state to db
self.db
.upsert_workflow_loop_event(
self.workflow_id,
Expand Down
Loading