From eda671f040ee8648620e7da435f51f6531a78da7 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Mon, 17 Nov 2025 13:07:20 -0800 Subject: [PATCH] fix(gas): optimize loops --- engine/packages/gasoline/src/ctx/workflow.rs | 114 +++++++++++++------ 1 file changed, 77 insertions(+), 37 deletions(-) diff --git a/engine/packages/gasoline/src/ctx/workflow.rs b/engine/packages/gasoline/src/ctx/workflow.rs index 236ee2e7d9..5c2c6be6c8 100644 --- a/engine/packages/gasoline/src/ctx/workflow.rs +++ b/engine/packages/gasoline/src/ctx/workflow.rs @@ -42,6 +42,8 @@ use crate::{ const DB_ACTION_RETRY: Duration = Duration::from_millis(150); /// Most db action retries const MAX_DB_ACTION_RETRIES: usize = 5; +/// How often to commit loop event data to db and mark previous loop history as forgotten +const LOOP_ITERS_PER_COMMIT: usize = 20; // NOTE: Cloneable because of inner arcs #[derive(Clone)] @@ -868,34 +870,43 @@ impl WorkflowCtx { let loop_location = self.cursor.current_location_for(&history_res); // Loop existed before - let (mut iteration, mut state, output) = + let (mut iteration, mut state, output, mut loop_event_commit_fut) = if let HistoryResult::Event(loop_event) = history_res { let state = loop_event.parse_state()?; let output = loop_event.parse_output()?; - (loop_event.iteration, state, output) + (loop_event.iteration, state, output, None) } else { let state_val = serde_json::value::to_raw_value(&state) .map_err(WorkflowError::SerializeLoopOutput)?; - // Insert event before loop is run so the history is consistent - self.db - .upsert_workflow_loop_event( - self.workflow_id, - &self.name, + // Clone data to move into future + let loop_location = loop_location.clone(); + let db2 = self.db.clone(); + let workflow_id = self.workflow_id; + let name = self.name.clone(); + let version = self.version; + let nested_loop_location = self.loop_location().cloned(); + + // This future is deferred until later for parallelization + let loop_event_commit_fut = async move { 
db2.upsert_workflow_loop_event( + workflow_id, + &name, &loop_location, - self.version, + version, 0, &state_val, None, - self.loop_location(), + nested_loop_location.as_ref(), ) - .await?; + .await + }; - (0, state, None) + (0, state, None, Some(loop_event_commit_fut)) }; - // Create a branch but no branch event (loop event takes its place) + // Create a branch for the loop event let mut loop_branch = self.branch_inner(self.input.clone(), self.version, loop_location.clone()); @@ -923,6 +934,7 @@ impl WorkflowCtx { .root() .join(Coordinate::simple(iteration + 1)), ); + let iteration_branch_root = iteration_branch.cursor.root().clone(); // Set branch loop location to the current loop iteration_branch.loop_location = Some(loop_location.clone()); @@ -931,22 +943,46 @@ impl WorkflowCtx { // Async block for instrumentation purposes let (dt2, res) = async { - // Insert event if iteration is not a replay - if !loop_branch.cursor.compare_loop_branch(iteration)? { - self.db - .commit_workflow_branch_event( - self.workflow_id, - iteration_branch.cursor.root(), - self.version, - Some(&loop_location), - ) - .await?; - } - let start_instant2 = Instant::now(); + let db2 = self.db.clone(); + + // NOTE: Great care has been taken to optimize this function. This join allows multiple + // txns to run simultaneously instead of in series but is hard to read. + // + // 1. First, but not necessarily chronologically first because it's parallelized, we + // commit the loop event. This only happens on the first iteration of the loop + // 2. Second, we commit the branch event for the current iteration + // 3. Last, we run the user's loop code + let (loop_event_commit_res, branch_commit_res, loop_res) = tokio::join!( + async { + if let Some(loop_event_commit_fut) = loop_event_commit_fut.take() { + loop_event_commit_fut.await + } else { + Ok(()) + } + }, + async { + // Insert event if iteration is not a replay + if !loop_branch.cursor.compare_loop_branch(iteration)? 
{ + db2.commit_workflow_branch_event( + self.workflow_id, + &iteration_branch_root, + self.version, + Some(&loop_location), + ) + .await + } else { + Ok(()) + } + }, + cb(&mut iteration_branch, &mut state), + ); + + loop_event_commit_res?; + branch_commit_res?; // Run loop - match cb(&mut iteration_branch, &mut state).await? { + match loop_res? { Loop::Continue => { let dt2 = start_instant2.elapsed().as_secs_f64(); iteration += 1; @@ -954,18 +990,21 @@ impl WorkflowCtx { let state_val = serde_json::value::to_raw_value(&state) .map_err(WorkflowError::SerializeLoopOutput)?; - self.db - .upsert_workflow_loop_event( - self.workflow_id, - &self.name, - &loop_location, - self.version, - iteration, - &state_val, - None, - self.loop_location(), - ) - .await?; + // Commit workflow state to db + if iteration % LOOP_ITERS_PER_COMMIT == 0 { + self.db + .upsert_workflow_loop_event( + self.workflow_id, + &self.name, + &loop_location, + self.version, + iteration, + &state_val, + None, + self.loop_location(), + ) + .await?; + } anyhow::Ok((dt2, None)) } @@ -978,6 +1017,7 @@ impl WorkflowCtx { let output_val = serde_json::value::to_raw_value(&res) .map_err(WorkflowError::SerializeLoopOutput)?; + // Commit loop output and final state to db self.db .upsert_workflow_loop_event( self.workflow_id,