From 272a09d627614e86a71483183dd82c64e180b277 Mon Sep 17 00:00:00 2001 From: MasterPtato <23087326+MasterPtato@users.noreply.github.com> Date: Fri, 2 Aug 2024 03:29:59 +0000 Subject: [PATCH] feat(workflows): add loops (#1001) ## Changes --- docs/libraries/workflow/GOTCHAS.md | 4 + docs/libraries/workflow/LOOPS.md | 9 + lib/chirp-workflow/core/src/activity.rs | 2 +- lib/chirp-workflow/core/src/ctx/listen.rs | 1 + lib/chirp-workflow/core/src/ctx/workflow.rs | 116 ++++++++- lib/chirp-workflow/core/src/db/mod.rs | 23 ++ lib/chirp-workflow/core/src/db/postgres.rs | 226 +++++++++++++++--- lib/chirp-workflow/core/src/error.rs | 28 ++- lib/chirp-workflow/core/src/event.rs | 210 +++++++++++++++- lib/chirp-workflow/core/src/executable.rs | 2 +- lib/chirp-workflow/core/src/prelude.rs | 4 +- lib/chirp-workflow/core/src/util.rs | 187 +-------------- lib/chirp-workflow/macros/src/lib.rs | 4 +- svc/pkg/cluster/src/workflows/cluster.rs | 90 +++---- .../cluster/src/workflows/datacenter/mod.rs | 81 ++++--- svc/pkg/cluster/src/workflows/server/mod.rs | 1 + .../standalone/gc/tests/integration.rs | 1 - svc/pkg/linode/{tests => tests_old}/image.rs | 0 .../{tests => tests_old}/instance_type_get.rs | 0 .../{tests => tests_old}/server_destroy.rs | 0 .../{tests => tests_old}/server_provision.rs | 0 .../20240712221043_fix_signals_idx.up.sql | 2 +- .../migrations/20240722174355_loops.down.sql | 0 .../migrations/20240722174355_loops.up.sql | 58 +++++ 24 files changed, 725 insertions(+), 324 deletions(-) create mode 100644 docs/libraries/workflow/LOOPS.md rename svc/pkg/linode/{tests => tests_old}/image.rs (100%) rename svc/pkg/linode/{tests => tests_old}/instance_type_get.rs (100%) rename svc/pkg/linode/{tests => tests_old}/server_destroy.rs (100%) rename svc/pkg/linode/{tests => tests_old}/server_provision.rs (100%) create mode 100644 svc/pkg/workflow/db/workflow/migrations/20240722174355_loops.down.sql create mode 100644 svc/pkg/workflow/db/workflow/migrations/20240722174355_loops.up.sql diff --git a/docs/libraries/workflow/GOTCHAS.md b/docs/libraries/workflow/GOTCHAS.md index 1d81bca877..e9101185e0 100644 --- a/docs/libraries/workflow/GOTCHAS.md +++ b/docs/libraries/workflow/GOTCHAS.md @@ -108,3 +108,7 @@ the internal location. > **\*** Even if they did know about each other via atomics, there is no guarantee of consistency from > `buffer_unordered`. Preemptively incrementing the location ensures consistency regardless of the order or > completion time of the futures. 
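To make the preemptive-increment rule concrete, here is a small self-contained sketch (editor's illustration, not part of the patch; assumes `tokio` with the `macros` and `time` features plus `futures-util`): each branch is assigned its location index before any future runs, so completion order cannot reorder the recorded history.

```rust
use std::time::Duration;

use futures_util::{stream, StreamExt};

#[tokio::main]
async fn main() {
    // Reserve a location index for every branch *before* any future is
    // polled, mirroring the preemptive increment described above.
    let mut location_idx = 0usize;
    let futs = (0..4u64)
        .map(|i| {
            location_idx += 1;
            let location = location_idx;
            async move {
                // Finish in reverse order to simulate `buffer_unordered`
                // completing futures unpredictably.
                tokio::time::sleep(Duration::from_millis(40 - i * 10)).await;
                (location, i)
            }
        })
        .collect::<Vec<_>>();

    // Completion order varies, but each branch keeps the location it was
    // assigned up front, so the recorded history stays consistent.
    let results: Vec<_> = stream::iter(futs).buffer_unordered(4).collect().await;
    println!("{results:?}");
}
```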
+ +## Loops + +TODO diff --git a/docs/libraries/workflow/LOOPS.md b/docs/libraries/workflow/LOOPS.md new file mode 100644 index 0000000000..a84c282b52 --- /dev/null +++ b/docs/libraries/workflow/LOOPS.md @@ -0,0 +1,9 @@ +# Loops + +TODO + +## Differences from "Continue As New" + +https://docs.temporal.io/develop/go/continue-as-new + +TODO diff --git a/lib/chirp-workflow/core/src/activity.rs b/lib/chirp-workflow/core/src/activity.rs index 10c769b47f..0a047c68ef 100644 --- a/lib/chirp-workflow/core/src/activity.rs +++ b/lib/chirp-workflow/core/src/activity.rs @@ -19,7 +19,7 @@ pub trait Activity { type Output: Serialize + DeserializeOwned + Debug + Send; const NAME: &'static str; - const MAX_RETRIES: u32; + const MAX_RETRIES: usize; const TIMEOUT: std::time::Duration; async fn run(ctx: &ActivityCtx, input: &Self::Input) -> GlobalResult<Self::Output>; diff --git a/lib/chirp-workflow/core/src/ctx/listen.rs b/lib/chirp-workflow/core/src/ctx/listen.rs index 92232160cd..d8a2b0fbea 100644 --- a/lib/chirp-workflow/core/src/ctx/listen.rs +++ b/lib/chirp-workflow/core/src/ctx/listen.rs @@ -24,6 +24,7 @@ impl<'a> ListenCtx<'a> { self.ctx.workflow_id(), signal_names, self.ctx.full_location().as_ref(), + self.ctx.loop_location(), ) .await?; diff --git a/lib/chirp-workflow/core/src/ctx/workflow.rs b/lib/chirp-workflow/core/src/ctx/workflow.rs index 6ed2d98b2d..35fedca730 100644 --- a/lib/chirp-workflow/core/src/ctx/workflow.rs +++ b/lib/chirp-workflow/core/src/ctx/workflow.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, sync::Arc}; use global_error::{GlobalError, GlobalResult}; -use serde::Serialize; +use serde::{de::DeserializeOwned, Serialize}; use tokio::time::Duration; use uuid::Uuid; @@ -65,6 +65,9 @@ pub struct WorkflowCtx { root_location: Location, location_idx: usize, + /// If this context is currently in a loop, this is the location where the loop started. + loop_location: Option<Box<[usize]>>, + msg_ctx: MessageCtx, } @@ -95,6 +98,7 @@ impl WorkflowCtx { root_location: Box::new([]), location_idx: 0, + loop_location: None, msg_ctx, }) @@ -125,6 +129,7 @@ impl WorkflowCtx { .chain(std::iter::once(self.location_idx)) .collect(), location_idx: 0, + loop_location: self.loop_location.clone(), msg_ctx: self.msg_ctx.clone(), }; @@ -161,6 +166,10 @@ impl WorkflowCtx { .collect() } + pub(crate) fn loop_location(&self) -> Option<&[usize]> { + self.loop_location.as_deref() + } + // Purposefully infallible pub(crate) async fn run(mut self) { if let Err(err) = Self::run_inner(&mut self).await { @@ -216,7 +225,8 @@ impl WorkflowCtx { // finish.
This workflow will be retried when the sub workflow completes let wake_sub_workflow = err.sub_workflow(); - if deadline_ts.is_some() || !wake_signals.is_empty() || wake_sub_workflow.is_some() { + if deadline_ts.is_some() || !wake_signals.is_empty() || wake_sub_workflow.is_some() + { tracing::info!(name=%self.name, id=%self.workflow_id, ?err, "workflow sleeping"); } else { tracing::error!(name=%self.name, id=%self.workflow_id, ?err, "workflow error"); @@ -299,6 +309,7 @@ impl WorkflowCtx { create_ts, input_val, Ok(output_val), + self.loop_location(), ) .await?; @@ -318,6 +329,7 @@ impl WorkflowCtx { create_ts, input_val, Err(&err.to_string()), + self.loop_location(), ) .await?; @@ -336,6 +348,7 @@ impl WorkflowCtx { create_ts, input_val, Err(&err.to_string()), + self.loop_location(), ) .await?; @@ -437,6 +450,7 @@ impl WorkflowCtx { &sub_workflow_name, tags, input_val, + self.loop_location(), ) .await .map_err(GlobalError::raw)?; @@ -579,6 +593,7 @@ impl WorkflowCtx { .chain(std::iter::once(self.location_idx)) .collect(), location_idx: 0, + loop_location: self.loop_location.clone(), msg_ctx: self.msg_ctx.clone(), }; @@ -746,6 +761,7 @@ impl WorkflowCtx { signal_id, T::NAME, input_val, + self.loop_location(), ) .await .map_err(GlobalError::raw)?; @@ -810,6 +826,7 @@ impl WorkflowCtx { signal_id, T::NAME, input_val, + self.loop_location(), ) .await .map_err(GlobalError::raw)?; @@ -1005,7 +1022,8 @@ impl WorkflowCtx { location.as_ref(), &tags, M::NAME, - body_val + body_val, + self.loop_location(), ), self.msg_ctx.message(tags.clone(), body), ); @@ -1063,7 +1081,8 @@ impl WorkflowCtx { location.as_ref(), &tags, M::NAME, - body_val + body_val, + self.loop_location(), ), self.msg_ctx.message_wait(tags.clone(), body), ); @@ -1078,6 +1097,90 @@ impl WorkflowCtx { Ok(()) } + /// Runs workflow steps in a loop. **Ensure that there are no side effects caused by the code in this + /// callback**. If you need side causes or side effects, use a native rust loop. + pub async fn repeat(&mut self, mut cb: F) -> GlobalResult + where + F: for<'a> FnMut(&'a mut WorkflowCtx) -> AsyncResult<'a, Loop>, + T: Serialize + DeserializeOwned, + { + let loop_location = self.full_location(); + let mut loop_branch = self.branch(); + + let event = { self.relevant_history().nth(self.location_idx) }; + + // Loop existed before + let output = if let Some(event) = event { + // Validate history is consistent + let Event::Loop(loop_event) = event else { + return Err(WorkflowError::HistoryDiverged(format!( + "expected {event}, found loop" + ))) + .map_err(GlobalError::raw); + }; + + let output = loop_event.parse_output().map_err(GlobalError::raw)?; + + // Shift by iteration count + loop_branch.location_idx = loop_event.iteration; + + output + } else { + None + }; + + // Loop complete + let output = if let Some(output) = output { + tracing::debug!(name=%self.name, id=%self.workflow_id, "replaying loop"); + + output + } + // Run loop + else { + tracing::info!(name=%self.name, id=%self.workflow_id, "running loop"); + + loop { + let iteration_idx = loop_branch.location_idx; + + let mut iteration_branch = loop_branch.branch(); + iteration_branch.loop_location = Some(loop_location.clone()); + + match cb(&mut iteration_branch).await? 
{ + Loop::Continue => { + self.db + .update_loop( + self.workflow_id, + loop_location.as_ref(), + iteration_idx, + None, + self.loop_location(), + ) + .await?; + } + Loop::Break(res) => { + let output_val = serde_json::to_value(&res) + .map_err(WorkflowError::SerializeLoopOutput) + .map_err(GlobalError::raw)?; + + self.db + .update_loop( + self.workflow_id, + loop_location.as_ref(), + iteration_idx, + Some(output_val), + self.loop_location(), + ) + .await?; + + break res; + } + } + } + }; + + Ok(output) + } + // TODO: sleep_for, sleep_until } @@ -1105,3 +1208,8 @@ impl WorkflowCtx { self.ts.saturating_sub(self.create_ts) } } + +pub enum Loop { + Continue, + Break(T), +} diff --git a/lib/chirp-workflow/core/src/db/mod.rs b/lib/chirp-workflow/core/src/db/mod.rs index 59815998b9..2caa1a20eb 100644 --- a/lib/chirp-workflow/core/src/db/mod.rs +++ b/lib/chirp-workflow/core/src/db/mod.rs @@ -62,6 +62,7 @@ pub trait Database: Send { create_ts: i64, input: serde_json::Value, output: Result, + loop_location: Option<&[usize]>, ) -> WorkflowResult<()>; async fn pull_next_signal( @@ -69,6 +70,7 @@ pub trait Database: Send { workflow_id: Uuid, filter: &[&str], location: &[usize], + loop_location: Option<&[usize]>, ) -> WorkflowResult>; async fn publish_signal( &self, @@ -95,6 +97,7 @@ pub trait Database: Send { signal_id: Uuid, signal_name: &str, body: serde_json::Value, + loop_location: Option<&[usize]>, ) -> WorkflowResult<()>; async fn publish_tagged_signal_from_workflow( &self, @@ -105,6 +108,7 @@ pub trait Database: Send { signal_id: Uuid, signal_name: &str, body: serde_json::Value, + loop_location: Option<&[usize]>, ) -> WorkflowResult<()>; async fn dispatch_sub_workflow( @@ -116,6 +120,7 @@ pub trait Database: Send { sub_workflow_name: &str, tags: Option<&serde_json::Value>, input: serde_json::Value, + loop_location: Option<&[usize]>, ) -> WorkflowResult<()>; /// Fetches a workflow that has the given json as a subset of its input after the given ts. 
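The `loop_location` argument threaded through this trait ties every recorded event to the loop iteration that produced it. A tiny sketch of the coordinate scheme (editor's illustration; the concrete values are assumed, not taken from the patch):

```rust
fn main() {
    // A location is a path of branch indexes from the workflow root.
    let loop_location: Vec<i64> = vec![2]; // the loop event itself lives at [2]

    // Each iteration runs in its own branch under the loop, so an activity
    // recorded during iteration 1 might live at [2, 1, 0]...
    let activity_location: Vec<i64> = vec![2, 1, 0];

    // ...and is stored with `loop_location = [2]`, which is what
    // `update_loop` matches on when it marks finished iterations as forgotten.
    assert!(activity_location.starts_with(&loop_location));
    println!("loop at {loop_location:?}, activity at {activity_location:?}");
}
```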
@@ -133,6 +138,16 @@ pub trait Database: Send { tags: &serde_json::Value, message_name: &str, body: serde_json::Value, + loop_location: Option<&[usize]>, + ) -> WorkflowResult<()>; + + async fn update_loop( + &self, + workflow_id: Uuid, + location: &[usize], + iteration: usize, + output: Option, + loop_location: Option<&[usize]>, ) -> WorkflowResult<()>; } @@ -222,3 +237,11 @@ pub struct SignalRow { pub signal_name: String, pub body: serde_json::Value, } + +#[derive(sqlx::FromRow)] +pub struct LoopEventRow { + pub workflow_id: Uuid, + pub location: Vec, + pub output: Option, + pub iteration: i64, +} diff --git a/lib/chirp-workflow/core/src/db/postgres.rs b/lib/chirp-workflow/core/src/db/postgres.rs index 4d640e1965..b03eefef35 100644 --- a/lib/chirp-workflow/core/src/db/postgres.rs +++ b/lib/chirp-workflow/core/src/db/postgres.rs @@ -1,17 +1,18 @@ use std::{sync::Arc, time::Duration}; use indoc::indoc; -use sqlx::{pool::PoolConnection, PgPool, Postgres}; +use sqlx::{pool::PoolConnection, Acquire, PgPool, Postgres}; use uuid::Uuid; use super::{ - ActivityEventRow, Database, MessageSendEventRow, PulledWorkflow, PulledWorkflowRow, - SignalEventRow, SignalRow, SignalSendEventRow, SubWorkflowEventRow, WorkflowRow, + ActivityEventRow, Database, LoopEventRow, MessageSendEventRow, PulledWorkflow, + PulledWorkflowRow, SignalEventRow, SignalRow, SignalSendEventRow, SubWorkflowEventRow, + WorkflowRow, }; use crate::{ activity::ActivityId, error::{WorkflowError, WorkflowResult}, - util, + event::combine_events, }; const MAX_QUERY_RETRIES: usize = 16; @@ -229,15 +230,16 @@ impl Database for DatabasePostgres { signal_send_events, msg_send_events, sub_workflow_events, + loop_events, ) = tokio::try_join!( async { sqlx::query_as::<_, ActivityEventRow>(indoc!( " SELECT - ev.workflow_id, - ev.location, - ev.activity_name, - ev.input_hash, + ev.workflow_id, + ev.location, + ev.activity_name, + ev.input_hash, ev.output, ev.create_ts, COUNT(err.workflow_id) AS error_count @@ -246,7 +248,7 @@ impl Database for DatabasePostgres { ON ev.workflow_id = err.workflow_id AND ev.location = err.location - WHERE ev.workflow_id = ANY($1) + WHERE ev.workflow_id = ANY($1) AND forgotten = FALSE GROUP BY ev.workflow_id, ev.location, ev.activity_name, ev.input_hash, ev.output ORDER BY ev.workflow_id, ev.location ASC ", @@ -262,7 +264,7 @@ impl Database for DatabasePostgres { SELECT workflow_id, location, signal_name, body FROM db_workflow.workflow_signal_events - WHERE workflow_id = ANY($1) + WHERE workflow_id = ANY($1) AND forgotten = FALSE ORDER BY workflow_id, location ASC ", )) @@ -277,7 +279,7 @@ impl Database for DatabasePostgres { SELECT workflow_id, location, signal_id, signal_name FROM db_workflow.workflow_signal_send_events - WHERE workflow_id = ANY($1) + WHERE workflow_id = ANY($1) AND forgotten = FALSE ORDER BY workflow_id, location ASC ", )) @@ -292,7 +294,7 @@ impl Database for DatabasePostgres { SELECT workflow_id, location, message_name FROM db_workflow.workflow_message_send_events - WHERE workflow_id = ANY($1) + WHERE workflow_id = ANY($1) AND forgotten = FALSE ORDER BY workflow_id, location ASC ", )) @@ -312,7 +314,7 @@ impl Database for DatabasePostgres { FROM db_workflow.workflow_sub_workflow_events AS sw JOIN db_workflow.workflows AS w ON sw.sub_workflow_id = w.workflow_id - WHERE sw.workflow_id = ANY($1) + WHERE sw.workflow_id = ANY($1) AND forgotten = FALSE ORDER BY sw.workflow_id, sw.location ASC ", )) @@ -320,16 +322,32 @@ impl Database for DatabasePostgres { .fetch_all(&mut *self.conn().await?) 
.await .map_err(WorkflowError::Sqlx) - } + }, + async { + sqlx::query_as::<_, LoopEventRow>(indoc!( + " + SELECT + workflow_id, location, iteration, output + FROM db_workflow.workflow_loop_events + WHERE workflow_id = ANY($1) AND forgotten = FALSE + ORDER BY workflow_id, location ASC + ", + )) + .bind(&workflow_ids) + .fetch_all(&mut *self.conn().await?) + .await + .map_err(WorkflowError::Sqlx) + }, )?; - let workflows = util::combine_events( + let workflows = combine_events( workflow_rows, activity_events, signal_events, signal_send_events, msg_send_events, sub_workflow_events, + loop_events, )?; Ok(workflows) @@ -432,19 +450,27 @@ impl Database for DatabasePostgres { create_ts: i64, input: serde_json::Value, res: Result, + loop_location: Option<&[usize]>, ) -> WorkflowResult<()> { match res { Ok(output) => { self.query(|| async { sqlx::query(indoc!( " - INSERT INTO db_workflow.workflow_activity_events ( - workflow_id, location, activity_name, input_hash, input, output, create_ts - ) - VALUES ($1, $2, $3, $4, $5, $6, $7) - ON CONFLICT (workflow_id, location) DO UPDATE - SET output = excluded.output - ", + INSERT INTO db_workflow.workflow_activity_events ( + workflow_id, + location, + activity_name, + input_hash, + input, + output, + create_ts, + loop_location + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (workflow_id, location) DO UPDATE + SET output = excluded.output + ", )) .bind(workflow_id) .bind(location.iter().map(|x| *x as i64).collect::>()) @@ -454,6 +480,7 @@ impl Database for DatabasePostgres { .bind(&output) .bind(rivet_util::timestamp::now()) .bind(create_ts) + .bind(loop_location.map(|l| l.iter().map(|x| *x as i64).collect::>())) .execute(&mut *self.conn().await?) .await .map_err(WorkflowError::Sqlx) @@ -467,9 +494,15 @@ impl Database for DatabasePostgres { WITH event AS ( INSERT INTO db_workflow.workflow_activity_events ( - workflow_id, location, activity_name, input_hash, input, create_ts + workflow_id, + location, + activity_name, + input_hash, + input, + create_ts, + loop_location ) - VALUES ($1, $2, $3, $4, $5, $7) + VALUES ($1, $2, $3, $4, $5, $7, $8) ON CONFLICT (workflow_id, location) DO NOTHING RETURNING 1 ), @@ -477,7 +510,7 @@ impl Database for DatabasePostgres { INSERT INTO db_workflow.workflow_activity_errors ( workflow_id, location, activity_name, error, ts ) - VALUES ($1, $2, $3, $6, $8) + VALUES ($1, $2, $3, $6, $9) RETURNING 1 ) SELECT 1 @@ -490,6 +523,7 @@ impl Database for DatabasePostgres { .bind(&input) .bind(err) .bind(create_ts) + .bind(loop_location.map(|l| l.iter().map(|x| *x as i64).collect::>())) .bind(rivet_util::timestamp::now()) .execute(&mut *self.conn().await?) 
.await @@ -507,6 +541,7 @@ impl Database for DatabasePostgres { workflow_id: Uuid, filter: &[&str], location: &[usize], + loop_location: Option<&[usize]>, ) -> WorkflowResult> { let signal = self .query(|| async { @@ -553,9 +588,16 @@ impl Database for DatabasePostgres { -- After acking the signal, add it to the events table insert_event AS ( INSERT INTO db_workflow.workflow_signal_events ( - workflow_id, location, signal_id, signal_name, body, ack_ts + workflow_id, location, signal_id, signal_name, body, ack_ts, loop_location ) - SELECT $1 AS workflow_id, $3 AS location, signal_id, signal_name, body, $4 AS ack_ts + SELECT + $1 AS workflow_id, + $3 AS location, + signal_id, + signal_name, + body, + $4 AS ack_ts, + $5 AS loop_location FROM next_signal RETURNING 1 ) @@ -566,6 +608,7 @@ impl Database for DatabasePostgres { .bind(filter) .bind(location.iter().map(|x| *x as i64).collect::>()) .bind(rivet_util::timestamp::now()) + .bind(loop_location.map(|l| l.iter().map(|x| *x as i64).collect::>())) .fetch_optional(&mut *self.conn().await?) .await .map_err(WorkflowError::Sqlx) @@ -644,6 +687,7 @@ impl Database for DatabasePostgres { signal_id: Uuid, signal_name: &str, body: serde_json::Value, + loop_location: Option<&[usize]>, ) -> WorkflowResult<()> { self.query(|| async { sqlx::query(indoc!( @@ -656,9 +700,9 @@ impl Database for DatabasePostgres { ), send_event AS ( INSERT INTO db_workflow.workflow_signal_send_events( - workflow_id, location, signal_id, signal_name, body + workflow_id, location, signal_id, signal_name, body, loop_location ) - VALUES($7, $8, $1, $3, $4) + VALUES($7, $8, $1, $3, $4, $9) RETURNING 1 ) SELECT 1 @@ -672,6 +716,7 @@ impl Database for DatabasePostgres { .bind(rivet_util::timestamp::now()) .bind(from_workflow_id) .bind(location.iter().map(|x| *x as i64).collect::>()) + .bind(loop_location.map(|l| l.iter().map(|x| *x as i64).collect::>())) .execute(&mut *self.conn().await?) .await .map_err(WorkflowError::Sqlx) @@ -690,6 +735,7 @@ impl Database for DatabasePostgres { signal_id: Uuid, signal_name: &str, body: serde_json::Value, + loop_location: Option<&[usize]>, ) -> WorkflowResult<()> { self.query(|| async { sqlx::query(indoc!( @@ -702,9 +748,9 @@ impl Database for DatabasePostgres { ), send_event AS ( INSERT INTO db_workflow.workflow_signal_send_events( - workflow_id, location, signal_id, signal_name, body + workflow_id, location, signal_id, signal_name, body, loop_location ) - VALUES($7, $8, $1, $3, $4) + VALUES($7, $8, $1, $3, $4, $9) RETURNING 1 ) SELECT 1 @@ -718,6 +764,7 @@ impl Database for DatabasePostgres { .bind(rivet_util::timestamp::now()) .bind(from_workflow_id) .bind(location.iter().map(|x| *x as i64).collect::>()) + .bind(loop_location.map(|l| l.iter().map(|x| *x as i64).collect::>())) .execute(&mut *self.conn().await?) 
.await .map_err(WorkflowError::Sqlx) @@ -736,6 +783,7 @@ impl Database for DatabasePostgres { sub_workflow_name: &str, tags: Option<&serde_json::Value>, input: serde_json::Value, + loop_location: Option<&[usize]>, ) -> WorkflowResult<()> { self.query(|| async { sqlx::query(indoc!( @@ -750,9 +798,9 @@ impl Database for DatabasePostgres { ), sub_workflow AS ( INSERT INTO db_workflow.workflow_sub_workflow_events( - workflow_id, location, sub_workflow_id, create_ts + workflow_id, location, sub_workflow_id, create_ts, loop_location ) - VALUES($1, $7, $8, $3) + VALUES($1, $7, $8, $3, $9) RETURNING 1 ) SELECT 1 @@ -766,6 +814,7 @@ impl Database for DatabasePostgres { .bind(&input) .bind(location.iter().map(|x| *x as i64).collect::>()) .bind(sub_workflow_id) + .bind(loop_location.map(|l| l.iter().map(|x| *x as i64).collect::>())) .execute(&mut *self.conn().await?) .await .map_err(WorkflowError::Sqlx) @@ -807,6 +856,7 @@ impl Database for DatabasePostgres { tags: &serde_json::Value, message_name: &str, body: serde_json::Value, + loop_location: Option<&[usize]>, ) -> WorkflowResult<()> { self.query(|| async { sqlx::query(indoc!( @@ -814,7 +864,7 @@ impl Database for DatabasePostgres { INSERT INTO db_workflow.workflow_message_send_events( workflow_id, location, tags, message_name, body ) - VALUES($1, $2, $3, $4, $5) + VALUES($1, $2, $3, $4, $5, $6) RETURNING 1 ", )) @@ -823,6 +873,7 @@ impl Database for DatabasePostgres { .bind(tags) .bind(message_name) .bind(&body) + .bind(loop_location.map(|l| l.iter().map(|x| *x as i64).collect::>())) .execute(&mut *self.conn().await?) .await .map_err(WorkflowError::Sqlx) @@ -831,4 +882,111 @@ impl Database for DatabasePostgres { Ok(()) } + + async fn update_loop( + &self, + workflow_id: Uuid, + location: &[usize], + iteration: usize, + output: Option, + loop_location: Option<&[usize]>, + ) -> WorkflowResult<()> { + self.query(|| async { + let mut conn = self.conn().await?; + let mut tx = conn.begin().await.map_err(WorkflowError::Sqlx)?; + + sqlx::query(indoc!( + " + INSERT INTO db_workflow.workflow_loop_events ( + workflow_id, + location, + iteration, + output, + loop_location + ) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (workflow_id, location) DO UPDATE + SET + iteration = $3, + output = $4 + RETURNING 1 + ", + )) + .bind(workflow_id) + .bind(location.iter().map(|x| *x as i64).collect::>()) + .bind(iteration as i64) + .bind(&output) + .bind(loop_location.map(|l| l.iter().map(|x| *x as i64).collect::>())) + .execute(&mut *tx) + .await + .map_err(WorkflowError::Sqlx)?; + + sqlx::query(indoc!( + " + WITH + forget_activity_events AS ( + UPDATE db_workflow.workflow_activity_events + SET forgotten = TRUE + WHERE + workflow_id = $1 AND + loop_location = $2 + RETURNING 1 + ), + forget_signal_events AS ( + UPDATE db_workflow.workflow_signal_events + SET forgotten = TRUE + WHERE + workflow_id = $1 AND + loop_location = $2 + RETURNING 1 + ), + forget_sub_workflow_events AS ( + UPDATE db_workflow.workflow_sub_workflow_events + SET forgotten = TRUE + WHERE + workflow_id = $1 AND + loop_location = $2 + RETURNING 1 + ), + forget_signal_send_events AS ( + UPDATE db_workflow.workflow_signal_send_events + SET forgotten = TRUE + WHERE + workflow_id = $1 AND + loop_location = $2 + RETURNING 1 + ), + forget_message_send_events AS ( + UPDATE db_workflow.workflow_message_send_events + SET forgotten = TRUE + WHERE + workflow_id = $1 AND + loop_location = $2 + RETURNING 1 + ), + forget_loop_events AS ( + UPDATE db_workflow.workflow_loop_events + SET forgotten = TRUE + WHERE + 
workflow_id = $1 AND + loop_location = $2 + RETURNING 1 + ) + SELECT 1 + ", + )) + .bind(workflow_id) + .bind(location.iter().map(|x| *x as i64).collect::>()) + .execute(&mut *tx) + .await + .map_err(WorkflowError::Sqlx)?; + + tx.commit().await.map_err(WorkflowError::Sqlx)?; + + Ok(()) + }) + .await?; + + Ok(()) + } } diff --git a/lib/chirp-workflow/core/src/error.rs b/lib/chirp-workflow/core/src/error.rs index 37e820ab66..50d7b181ed 100644 --- a/lib/chirp-workflow/core/src/error.rs +++ b/lib/chirp-workflow/core/src/error.rs @@ -1,7 +1,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; -use tokio::time::Instant; use global_error::GlobalError; +use tokio::time::Instant; use uuid::Uuid; use crate::ctx::workflow::RETRY_TIMEOUT_MS; @@ -19,7 +19,7 @@ pub enum WorkflowError { // Includes error count #[error("activity failure: {0:?}")] - ActivityFailure(GlobalError, u32), + ActivityFailure(GlobalError, usize), #[error("activity failure, max retries reached: {0:?}")] ActivityMaxFailuresReached(GlobalError), @@ -42,10 +42,10 @@ pub enum WorkflowError { #[error("deserialize workflow input: {0}")] DeserializeWorkflowInput(serde_json::Error), - #[error("serialize activity output: {0}")] + #[error("serialize workflow output: {0}")] SerializeWorkflowOutput(serde_json::Error), - #[error("deserialize workflow input: {0}")] + #[error("deserialize workflow output: {0}")] DeserializeWorkflowOutput(serde_json::Error), #[error("serialize activity input: {0}")] @@ -78,6 +78,12 @@ pub enum WorkflowError { #[error("serialize message tags: {0:?}")] SerializeMessageTags(cjson::Error), + #[error("serialize loop output: {0}")] + SerializeLoopOutput(serde_json::Error), + + #[error("deserialize loop input: {0}")] + DeserializeLoopOutput(serde_json::Error), + #[error("create subscription: {0}")] CreateSubscription(rivet_pools::prelude::nats::Error), @@ -129,24 +135,24 @@ impl WorkflowError { if let WorkflowError::ActivityFailure(_, error_count) = self { // NOTE: Max retry is handled in `WorkflowCtx::activity` let mut backoff = - rivet_util::Backoff::new_at(8, None, RETRY_TIMEOUT_MS, 500, *error_count as usize); + rivet_util::Backoff::new_at(8, None, RETRY_TIMEOUT_MS, 500, *error_count); let next = backoff.step().expect("should not have max retry"); // Calculate timestamp based on the backoff let duration_until = next.duration_since(Instant::now()); let deadline_ts = (SystemTime::now() + duration_until) - .duration_since(UNIX_EPOCH) - .unwrap_or_else(|err| unreachable!("time is broken: {}", err)) - .as_millis() - .try_into() - .expect("doesn't fit in i64"); + .duration_since(UNIX_EPOCH) + .unwrap_or_else(|err| unreachable!("time is broken: {}", err)) + .as_millis() + .try_into() + .expect("doesn't fit in i64"); Some(deadline_ts) } else { None } } - + pub fn is_recoverable(&self) -> bool { match self { WorkflowError::ActivityFailure(_, _) => true, diff --git a/lib/chirp-workflow/core/src/event.rs b/lib/chirp-workflow/core/src/event.rs index 10908ba755..6d56341f11 100644 --- a/lib/chirp-workflow/core/src/event.rs +++ b/lib/chirp-workflow/core/src/event.rs @@ -1,13 +1,16 @@ +use std::collections::HashMap; + use serde::de::DeserializeOwned; use uuid::Uuid; use crate::{ activity::ActivityId, db::{ - ActivityEventRow, MessageSendEventRow, SignalEventRow, SignalSendEventRow, - SubWorkflowEventRow, + ActivityEventRow, LoopEventRow, MessageSendEventRow, PulledWorkflow, PulledWorkflowRow, + SignalEventRow, SignalSendEventRow, SubWorkflowEventRow, }, error::{WorkflowError, WorkflowResult}, + util::Location, }; /// An event that happened 
in the workflow run. @@ -20,6 +23,7 @@ pub enum Event { SignalSend(SignalSendEvent), MessageSend(MessageSendEvent), SubWorkflow(SubWorkflowEvent), + Loop(LoopEvent), // Used as a placeholder for branching locations Branch, } @@ -32,6 +36,7 @@ impl std::fmt::Display for Event { Event::SignalSend(signal_send) => write!(f, "signal send {:?}", signal_send.name), Event::MessageSend(message_send) => write!(f, "message send {:?}", message_send.name), Event::SubWorkflow(sub_workflow) => write!(f, "sub workflow {:?}", sub_workflow.name), + Event::Loop(_) => write!(f, "loop"), Event::Branch => write!(f, "branch"), } } @@ -42,9 +47,9 @@ pub struct ActivityEvent { pub activity_id: ActivityId, pub create_ts: i64, - /// If activity succeeds, this will be some. + /// If the activity succeeds, this will be some. pub(crate) output: Option, - pub error_count: u32, + pub error_count: usize, } impl ActivityEvent { @@ -138,3 +143,200 @@ impl TryFrom for SubWorkflowEvent { }) } } + +#[derive(Debug)] +pub struct LoopEvent { + /// If the loop completes, this will be some. + pub(crate) output: Option, + pub iteration: usize, +} + +impl LoopEvent { + pub fn parse_output(&self) -> WorkflowResult> { + self.output + .clone() + .map(serde_json::from_value) + .transpose() + .map_err(WorkflowError::DeserializeLoopOutput) + } +} + +impl TryFrom for LoopEvent { + type Error = WorkflowError; + + fn try_from(value: LoopEventRow) -> WorkflowResult { + Ok(LoopEvent { + output: value.output, + iteration: value + .iteration + .try_into() + .map_err(|_| WorkflowError::IntegerConversion)?, + }) + } +} + +/// Takes all workflow events (each with their own location) and combines them via enum into a hashmap of the +/// following structure: +/// +/// Given the location [1, 2, 3], 3 is the index and [1, 2] is the root location +/// +/// HashMap { +/// [1, 2]: [ +/// example signal event, +/// example activity event, +/// example sub workflow event, +/// example activity event (this is [1, 2, 3]) +/// ], +/// } +pub fn combine_events( + workflow_rows: Vec, + activity_events: Vec, + signal_events: Vec, + signal_send_events: Vec, + msg_send_events: Vec, + sub_workflow_events: Vec, + loop_events: Vec, +) -> WorkflowResult> { + // Map workflow rows by workflow id + let mut workflows_by_id = workflow_rows + .into_iter() + .map(|row| { + let events_by_location: HashMap> = HashMap::new(); + + (row.workflow_id, (row, events_by_location)) + }) + .collect::>(); + + for event in activity_events { + let (_, ref mut events_by_location) = workflows_by_id + .get_mut(&event.workflow_id) + .expect("unreachable, workflow for event not found"); + let (root_location, idx) = split_location(&event.location); + + events_by_location + .entry(root_location) + .or_default() + .push((idx, Event::Activity(event.try_into()?))); + } + + for event in signal_events { + let (_, ref mut events_by_location) = workflows_by_id + .get_mut(&event.workflow_id) + .expect("unreachable, workflow for event not found"); + let (root_location, idx) = split_location(&event.location); + + events_by_location + .entry(root_location) + .or_default() + .push((idx, Event::Signal(event.try_into()?))); + } + + for event in signal_send_events { + let (_, ref mut events_by_location) = workflows_by_id + .get_mut(&event.workflow_id) + .expect("unreachable, workflow for event not found"); + let (root_location, idx) = split_location(&event.location); + + events_by_location + .entry(root_location) + .or_default() + .push((idx, Event::SignalSend(event.try_into()?))); + } + + for event in 
msg_send_events { + let (_, ref mut events_by_location) = workflows_by_id + .get_mut(&event.workflow_id) + .expect("unreachable, workflow for event not found"); + let (root_location, idx) = split_location(&event.location); + + events_by_location + .entry(root_location) + .or_default() + .push((idx, Event::MessageSend(event.try_into()?))); + } + + for event in sub_workflow_events { + let (_, ref mut events_by_location) = workflows_by_id + .get_mut(&event.workflow_id) + .expect("unreachable, workflow for event not found"); + let (root_location, idx) = split_location(&event.location); + + events_by_location + .entry(root_location) + .or_default() + .push((idx, Event::SubWorkflow(event.try_into()?))); + } + + for event in loop_events { + let (_, ref mut events_by_location) = workflows_by_id + .get_mut(&event.workflow_id) + .expect("unreachable, workflow for event not found"); + let (root_location, idx) = split_location(&event.location); + + events_by_location + .entry(root_location) + .or_default() + .push((idx, Event::Loop(event.try_into()?))); + } + + let workflows = workflows_by_id + .into_values() + .map(|(row, mut events_by_location)| { + // TODO(RVT-3754): This involves inserting, sorting, then recollecting into lists and recollecting + // into a hashmap + // Sort all of the events because we are inserting from two different lists. Both lists are + // already sorted themselves so this should be fairly cheap + for (_, list) in events_by_location.iter_mut() { + list.sort_by_key(|(idx, _)| *idx); + } + + // Remove idx from lists + let event_history = events_by_location + .into_iter() + .map(|(k, events)| { + let mut expected_idx = 0; + + // Check for missing indexes and insert a `Branch` placeholder event for each missing spot + let events = events + .into_iter() + .flat_map(|(idx, v)| { + assert!(expected_idx <= idx, "invalid history"); + + let offset = (idx - expected_idx) as usize; + expected_idx = idx + 1; + + std::iter::repeat_with(|| Event::Branch) + .take(offset) + .chain(std::iter::once(v)) + }) + .collect(); + + (k, events) + }) + .collect(); + + PulledWorkflow { + workflow_id: row.workflow_id, + workflow_name: row.workflow_name, + create_ts: row.create_ts, + ray_id: row.ray_id, + input: row.input, + wake_deadline_ts: row.wake_deadline_ts, + events: event_history, + } + }) + .collect(); + + Ok(workflows) +} + +fn split_location(location: &[i64]) -> (Location, i64) { + ( + location + .iter() + .take(location.len().saturating_sub(1)) + .map(|x| *x as usize) + .collect::(), + *location.last().unwrap(), + ) +} diff --git a/lib/chirp-workflow/core/src/executable.rs b/lib/chirp-workflow/core/src/executable.rs index 0e80b0b8bb..a7231bfa2f 100644 --- a/lib/chirp-workflow/core/src/executable.rs +++ b/lib/chirp-workflow/core/src/executable.rs @@ -74,7 +74,7 @@ struct TupleHelper { exec: T, } -// Must wrap all closured being used as executables in this function due to +// Must wrap all closured being used as executables in `WorkflowCtx::join` in this function due to // https://github.com/rust-lang/rust/issues/70263 pub fn closure(f: F) -> F where diff --git a/lib/chirp-workflow/core/src/prelude.rs b/lib/chirp-workflow/core/src/prelude.rs index 433d05503f..f3210d078b 100644 --- a/lib/chirp-workflow/core/src/prelude.rs +++ b/lib/chirp-workflow/core/src/prelude.rs @@ -15,11 +15,11 @@ pub mod util { pub use crate::{ activity::Activity, + ctx::workflow::Loop, ctx::*, db, error::{WorkflowError, WorkflowResult}, - executable::closure, - executable::Executable, + executable::{closure, Executable}, 
listen::{CustomListener, Listen}, message::Message, operation::Operation, diff --git a/lib/chirp-workflow/core/src/util.rs b/lib/chirp-workflow/core/src/util.rs index f9114bb6fd..9a106be887 100644 --- a/lib/chirp-workflow/core/src/util.rs +++ b/lib/chirp-workflow/core/src/util.rs @@ -1,21 +1,11 @@ -use std::{ - collections::HashMap, - time::{SystemTime, UNIX_EPOCH}, -}; +use std::time::{SystemTime, UNIX_EPOCH}; use global_error::{macros::*, GlobalError, GlobalResult}; use rand::Rng; use tokio::time::{self, Duration}; use uuid::Uuid; -use crate::{ - db::{ - ActivityEventRow, MessageSendEventRow, PulledWorkflow, PulledWorkflowRow, SignalEventRow, - SignalSendEventRow, SubWorkflowEventRow, - }, - error::{WorkflowError, WorkflowResult}, - event::Event, -}; +use crate::error::WorkflowError; pub type Location = Box<[usize]>; @@ -58,179 +48,6 @@ pub async fn sleep_until_ts(ts: i64) { } } -/// Takes all workflow events (each with their own location) and combines them via enum into a hashmap of the -/// following structure: -/// -/// Given the location [1, 2, 3], 3 is the index and [1, 2] is the root location -/// -/// HashMap { -/// [1, 2]: [ -/// example signal event, -/// example activity event, -/// example sub workflow event, -/// example activity event (this is [1, 2, 3]) -/// ], -/// } -pub fn combine_events( - workflow_rows: Vec, - activity_events: Vec, - signal_events: Vec, - signal_send_events: Vec, - msg_send_events: Vec, - sub_workflow_events: Vec, -) -> WorkflowResult> { - // Map workflow rows by workflow id - let mut workflows_by_id = workflow_rows - .into_iter() - .map(|row| { - let events_by_location: HashMap> = HashMap::new(); - - (row.workflow_id, (row, events_by_location)) - }) - .collect::>(); - - for event in activity_events { - let (_, ref mut events_by_location) = workflows_by_id - .get_mut(&event.workflow_id) - .expect("unreachable, workflow for event not found"); - let (root_location, idx) = split_location(&event.location); - - events_by_location - .entry(root_location) - .or_default() - .push((idx, Event::Activity(event.try_into()?))); - } - - for event in signal_events { - let (_, ref mut events_by_location) = workflows_by_id - .get_mut(&event.workflow_id) - .expect("unreachable, workflow for event not found"); - let (root_location, idx) = split_location(&event.location); - - events_by_location - .entry(root_location) - .or_default() - .push((idx, Event::Signal(event.try_into()?))); - } - - for event in signal_send_events { - let (_, ref mut events_by_location) = workflows_by_id - .get_mut(&event.workflow_id) - .expect("unreachable, workflow for event not found"); - let (root_location, idx) = split_location(&event.location); - - events_by_location - .entry(root_location) - .or_default() - .push((idx, Event::SignalSend(event.try_into()?))); - } - - for event in msg_send_events { - let (_, ref mut events_by_location) = workflows_by_id - .get_mut(&event.workflow_id) - .expect("unreachable, workflow for event not found"); - let (root_location, idx) = split_location(&event.location); - - events_by_location - .entry(root_location) - .or_default() - .push((idx, Event::MessageSend(event.try_into()?))); - } - - for event in sub_workflow_events { - let (_, ref mut events_by_location) = workflows_by_id - .get_mut(&event.workflow_id) - .expect("unreachable, workflow for event not found"); - let (root_location, idx) = split_location(&event.location); - - events_by_location - .entry(root_location) - .or_default() - .push((idx, Event::SubWorkflow(event.try_into()?))); - } - - let 
workflows = workflows_by_id - .into_values() - .map(|(row, mut events_by_location)| { - // TODO(RVT-3754): This involves inserting, sorting, then recollecting into lists and recollecting - // into a hashmap - // Sort all of the events because we are inserting from two different lists. Both lists are - // already sorted themselves so this should be fairly cheap - for (_, list) in events_by_location.iter_mut() { - list.sort_by_key(|(idx, _)| *idx); - } - - // Remove idx from lists - let event_history = events_by_location - .into_iter() - .map(|(k, events)| { - let mut expected_idx = 0; - - // Check for missing indexes and insert a `Branch` placeholder event for each missing spot - let events = events - .into_iter() - .flat_map(|(idx, v)| { - let offset = (idx - expected_idx) as usize; - expected_idx = idx + 1; - - std::iter::repeat_with(|| Event::Branch) - .take(offset) - .chain(std::iter::once(v)) - }) - .collect(); - - (k, events) - }) - .collect(); - - PulledWorkflow { - workflow_id: row.workflow_id, - workflow_name: row.workflow_name, - create_ts: row.create_ts, - ray_id: row.ray_id, - input: row.input, - wake_deadline_ts: row.wake_deadline_ts, - events: event_history, - } - }) - .collect(); - - Ok(workflows) -} - -fn split_location(location: &[i64]) -> (Location, i64) { - ( - location - .iter() - .take(location.len().saturating_sub(1)) - .map(|x| *x as usize) - .collect::(), - *location.last().unwrap(), - ) -} - -// // Insert placeholder record into parent location list (ex. for the location [4, 0], insert placeholder into -// // the [] list at the 4th index) -// fn insert_placeholder( -// events_by_location: &mut HashMap>, -// location: &[i64], -// idx: i64, -// ) { -// if idx == 0 && location.len() > 1 { -// let parent_location = location -// .iter() -// .take(location.len().saturating_sub(2)) -// .map(|x| *x as usize) -// .collect::(); -// let parent_idx = *location.get(location.len().saturating_sub(2)).unwrap(); - -// events_by_location -// .entry(parent_location) -// .or_default() -// .push((parent_idx, Event::Branch)); -// } -// } - pub fn inject_fault() -> GlobalResult<()> { if rand::thread_rng().gen_range(0..100) < FAULT_RATE { bail!("This is a random panic!"); diff --git a/lib/chirp-workflow/macros/src/lib.rs b/lib/chirp-workflow/macros/src/lib.rs index f668eb1086..74184b4222 100644 --- a/lib/chirp-workflow/macros/src/lib.rs +++ b/lib/chirp-workflow/macros/src/lib.rs @@ -8,7 +8,7 @@ use syn::{ }; struct Config { - max_retries: u32, + max_retries: usize, timeout: u64, } @@ -132,7 +132,7 @@ pub fn activity(attr: TokenStream, item: TokenStream) -> TokenStream { type Output = #output_type; const NAME: &'static str = #fn_name; - const MAX_RETRIES: u32 = #max_retries; + const MAX_RETRIES: usize = #max_retries; const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(#timeout); async fn run(#ctx_ident: #ctx_ty, #input_ident: &Self::Input) -> GlobalResult { diff --git a/svc/pkg/cluster/src/workflows/cluster.rs b/svc/pkg/cluster/src/workflows/cluster.rs index a076da7259..a2bd833ff8 100644 --- a/svc/pkg/cluster/src/workflows/cluster.rs +++ b/svc/pkg/cluster/src/workflows/cluster.rs @@ -1,4 +1,5 @@ use chirp_workflow::prelude::*; +use futures_util::FutureExt; use serde_json::json; use crate::types::{BuildDeliveryMethod, Pool, Provider}; @@ -27,49 +28,56 @@ pub async fn cluster(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> { ) .await?; - let cluster_id = input.cluster_id; - loop { - match ctx.listen::
<Main>
().await? { - Main::GameLink(sig) => { - ctx.activity(GameLinkInput { - cluster_id, - game_id: sig.game_id, - }) - .await?; - - ctx.msg( - json!({ - "cluster_id": cluster_id, - }), - GameLinkComplete {}, - ) - .await?; - } - Main::DatacenterCreate(sig) => { - ctx.dispatch_tagged_workflow( - &json!({ - "datacenter_id": sig.datacenter_id, - }), - crate::workflows::datacenter::Input { - cluster_id: input.cluster_id, - datacenter_id: sig.datacenter_id, - name_id: sig.name_id, - display_name: sig.display_name, - - provider: sig.provider, - provider_datacenter_id: sig.provider_datacenter_id, - provider_api_token: sig.provider_api_token, - - pools: sig.pools, - - build_delivery_method: sig.build_delivery_method, - prebakes_enabled: sig.prebakes_enabled, - }, - ) - .await?; + ctx.repeat(|ctx| { + let cluster_id = input.cluster_id; + + async move { + match ctx.listen::
<Main>
().await? { + Main::GameLink(sig) => { + ctx.activity(GameLinkInput { + cluster_id, + game_id: sig.game_id, + }) + .await?; + + ctx.msg( + json!({ + "cluster_id": cluster_id, + }), + GameLinkComplete {}, + ) + .await?; + } + Main::DatacenterCreate(sig) => { + ctx.dispatch_tagged_workflow( + &json!({ + "datacenter_id": sig.datacenter_id, + }), + crate::workflows::datacenter::Input { + cluster_id, + datacenter_id: sig.datacenter_id, + name_id: sig.name_id, + display_name: sig.display_name, + + provider: sig.provider, + provider_datacenter_id: sig.provider_datacenter_id, + provider_api_token: sig.provider_api_token, + + pools: sig.pools, + + build_delivery_method: sig.build_delivery_method, + prebakes_enabled: sig.prebakes_enabled, + }, + ) + .await?; + } } + + Ok(Loop::Continue) } - } + .boxed() + }) + .await } #[derive(Debug, Serialize, Deserialize, Hash)] diff --git a/svc/pkg/cluster/src/workflows/datacenter/mod.rs b/svc/pkg/cluster/src/workflows/datacenter/mod.rs index 72ef5e1599..8e18185b52 100644 --- a/svc/pkg/cluster/src/workflows/datacenter/mod.rs +++ b/svc/pkg/cluster/src/workflows/datacenter/mod.rs @@ -61,47 +61,54 @@ pub(crate) async fn cluster_datacenter(ctx: &mut WorkflowCtx, input: &Input) -> // Scale ctx.signal(ctx.workflow_id(), Scale {}).await?; - let datacenter_id = input.datacenter_id; - loop { - match ctx.listen::
<Main>
().await? { - Main::Update(sig) => { - ctx.activity(UpdateDbInput { - datacenter_id, - pools: sig.pools, - prebakes_enabled: sig.prebakes_enabled, - }) - .await?; + ctx.repeat(|ctx| { + let datacenter_id = input.datacenter_id; + let provider = input.provider.clone(); - // Scale - ctx.signal(ctx.workflow_id(), Scale {}).await?; - } - Main::Scale(_) => { - ctx.workflow(scale::Input { datacenter_id }).await?; - } - Main::ServerCreate(sig) => { - ctx.dispatch_tagged_workflow( - &json!({ - "server_id": sig.server_id, - }), - crate::workflows::server::Input { + async move { + match ctx.listen::
<Main>
().await? { + Main::Update(sig) => { + ctx.activity(UpdateDbInput { datacenter_id, - server_id: sig.server_id, - pool_type: sig.pool_type, - provider: input.provider.clone(), - tags: sig.tags, - }, - ) - .await?; - } - Main::TlsRenew(_) => { - ctx.dispatch_workflow(tls_issue::Input { - datacenter_id, - renew: true, - }) - .await?; + pools: sig.pools, + prebakes_enabled: sig.prebakes_enabled, + }) + .await?; + + // Scale + ctx.signal(ctx.workflow_id(), Scale {}).await?; + } + Main::Scale(_) => { + ctx.workflow(scale::Input { datacenter_id }).await?; + } + Main::ServerCreate(sig) => { + ctx.dispatch_tagged_workflow( + &json!({ + "server_id": sig.server_id, + }), + crate::workflows::server::Input { + datacenter_id, + server_id: sig.server_id, + pool_type: sig.pool_type, + provider: provider.clone(), + tags: sig.tags, + }, + ) + .await?; + } + Main::TlsRenew(_) => { + ctx.dispatch_workflow(tls_issue::Input { + datacenter_id, + renew: true, + }) + .await?; + } } + Ok(Loop::Continue) } - } + .boxed() + }) + .await } #[derive(Debug, Clone, Serialize, Deserialize, Hash)] diff --git a/svc/pkg/cluster/src/workflows/server/mod.rs b/svc/pkg/cluster/src/workflows/server/mod.rs index 4292721035..9f541150ab 100644 --- a/svc/pkg/cluster/src/workflows/server/mod.rs +++ b/svc/pkg/cluster/src/workflows/server/mod.rs @@ -197,6 +197,7 @@ pub(crate) async fn cluster_server(ctx: &mut WorkflowCtx, input: &Input) -> Glob bail!("failed all attempts to provision server"); }; + // NOTE: This loop has side effects (for state) so we do not use `ctx.repeat` let mut state = State::default(); loop { match state.run(ctx).await? { diff --git a/svc/pkg/cluster/standalone/gc/tests/integration.rs b/svc/pkg/cluster/standalone/gc/tests/integration.rs index 4902a87463..dbc7cb4010 100644 --- a/svc/pkg/cluster/standalone/gc/tests/integration.rs +++ b/svc/pkg/cluster/standalone/gc/tests/integration.rs @@ -204,7 +204,6 @@ async fn setup(ctx: &TestCtx) -> (Uuid, Uuid) { cluster::workflows::datacenter::ServerCreate { server_id, pool_type: pool_type.clone(), - provider: provider.clone(), tags: vec!["test".to_string()], }, ) diff --git a/svc/pkg/linode/tests/image.rs b/svc/pkg/linode/tests_old/image.rs similarity index 100% rename from svc/pkg/linode/tests/image.rs rename to svc/pkg/linode/tests_old/image.rs diff --git a/svc/pkg/linode/tests/instance_type_get.rs b/svc/pkg/linode/tests_old/instance_type_get.rs similarity index 100% rename from svc/pkg/linode/tests/instance_type_get.rs rename to svc/pkg/linode/tests_old/instance_type_get.rs diff --git a/svc/pkg/linode/tests/server_destroy.rs b/svc/pkg/linode/tests_old/server_destroy.rs similarity index 100% rename from svc/pkg/linode/tests/server_destroy.rs rename to svc/pkg/linode/tests_old/server_destroy.rs diff --git a/svc/pkg/linode/tests/server_provision.rs b/svc/pkg/linode/tests_old/server_provision.rs similarity index 100% rename from svc/pkg/linode/tests/server_provision.rs rename to svc/pkg/linode/tests_old/server_provision.rs diff --git a/svc/pkg/workflow/db/workflow/migrations/20240712221043_fix_signals_idx.up.sql b/svc/pkg/workflow/db/workflow/migrations/20240712221043_fix_signals_idx.up.sql index 6233744200..466e782b72 100644 --- a/svc/pkg/workflow/db/workflow/migrations/20240712221043_fix_signals_idx.up.sql +++ b/svc/pkg/workflow/db/workflow/migrations/20240712221043_fix_signals_idx.up.sql @@ -1,5 +1,5 @@ DROP INDEX IF EXISTS idx_signals_workflow_id; -CREATE INDEX idx_signals_workflow_id +CREATE INDEX idx_signals_workflow_id2 ON signals (workflow_id) WHERE ack_ts IS NULL; 
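Since LOOPS.md is still a TODO above, here is the shape of the new API as the cluster and datacenter workflows now use it (a sketch only; `Main`, `Tick`, `Shutdown`, and `HandleTickInput` are hypothetical stand-ins for a joined signal enum and an activity input):

```rust
use chirp_workflow::prelude::*;
use futures_util::FutureExt;

// Inside a workflow body. Per the doc comment on `WorkflowCtx::repeat`, the
// callback itself must stay side-effect free; anything effectful belongs in
// activities, signals, or messages.
let final_seq = ctx
    .repeat(|ctx| {
        async move {
            match ctx.listen::<Main>().await? {
                // Stay in the loop: the iteration count is persisted via
                // `update_loop` and prior iterations are marked forgotten.
                Main::Tick(sig) => {
                    ctx.activity(HandleTickInput { seq: sig.seq }).await?;
                    Ok(Loop::Continue)
                }
                // Leave the loop: the output is stored on the loop event,
                // so a replay skips the iterations entirely.
                Main::Shutdown(sig) => Ok(Loop::Break(sig.seq)),
            }
        }
        .boxed()
    })
    .await?;
```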
diff --git a/svc/pkg/workflow/db/workflow/migrations/20240722174355_loops.down.sql b/svc/pkg/workflow/db/workflow/migrations/20240722174355_loops.down.sql new file mode 100644 index 0000000000..e69de29bb2 diff --git a/svc/pkg/workflow/db/workflow/migrations/20240722174355_loops.up.sql b/svc/pkg/workflow/db/workflow/migrations/20240722174355_loops.up.sql new file mode 100644 index 0000000000..c62aa5ddce --- /dev/null +++ b/svc/pkg/workflow/db/workflow/migrations/20240722174355_loops.up.sql @@ -0,0 +1,58 @@ +-- Stores loops for replay +CREATE TABLE workflow_loop_events ( + workflow_id UUID NOT NULL REFERENCES workflows, + location INT[] NOT NULL, + iteration INT NOT NULL, + output JSONB, + + loop_location INT[], + forgotten BOOLEAN NOT NULL DEFAULT FALSE, + + PRIMARY KEY (workflow_id, location) +); + +-- Query by loop location +CREATE INDEX idx_workflow_loop_events_loop_location +ON workflow_loop_events (workflow_id, loop_location); + + + +ALTER TABLE workflow_activity_events + ADD COLUMN loop_location INT[], + ADD COLUMN forgotten BOOLEAN NOT NULL DEFAULT FALSE; + +-- Query by loop location +CREATE INDEX idx_workflow_activity_events_loop_location +ON workflow_activity_events (workflow_id, loop_location); + +ALTER TABLE workflow_signal_events + ADD COLUMN loop_location INT[], + ADD COLUMN forgotten BOOLEAN NOT NULL DEFAULT FALSE; + +-- Query by loop location +CREATE INDEX idx_workflow_signal_events_loop_location +ON workflow_signal_events (workflow_id, loop_location); + +ALTER TABLE workflow_sub_workflow_events + ADD COLUMN loop_location INT[], + ADD COLUMN forgotten BOOLEAN NOT NULL DEFAULT FALSE; + +-- Query by loop location +CREATE INDEX idx_workflow_sub_workflow_events_loop_location +ON workflow_sub_workflow_events (workflow_id, loop_location); + +ALTER TABLE workflow_signal_send_events + ADD COLUMN loop_location INT[], + ADD COLUMN forgotten BOOLEAN NOT NULL DEFAULT FALSE; + +-- Query by loop location +CREATE INDEX idx_workflow_signal_send_events_loop_location +ON workflow_signal_send_events (workflow_id, loop_location); + +ALTER TABLE workflow_message_send_events + ADD COLUMN loop_location INT[], + ADD COLUMN forgotten BOOLEAN NOT NULL DEFAULT FALSE; + +-- Query by loop location +CREATE INDEX idx_workflow_message_send_events_loop_location +ON workflow_message_send_events (workflow_id, loop_location);
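Finally, a sketch of what the `forgotten` flag added by this migration does at runtime (editor's illustration with in-memory stand-ins for the rows; values assumed):

```rust
fn main() {
    #[derive(Debug)]
    struct EventRow {
        location: Vec<i64>,
        loop_location: Option<Vec<i64>>,
        forgotten: bool,
    }

    // Two events recorded during iteration 0 of a loop living at [2].
    let mut events = vec![
        EventRow { location: vec![2, 0, 0], loop_location: Some(vec![2]), forgotten: false },
        EventRow { location: vec![2, 0, 1], loop_location: Some(vec![2]), forgotten: false },
    ];

    // When `update_loop` advances the loop, its second statement marks every
    // event owned by that loop as forgotten...
    let loop_loc: Vec<i64> = vec![2];
    for ev in events
        .iter_mut()
        .filter(|ev| ev.loop_location.as_deref() == Some(&loop_loc[..]))
    {
        ev.forgotten = true;
    }

    // ...and the `forgotten = FALSE` predicates added to the pull queries
    // then exclude those rows from replay.
    let replayed: Vec<&EventRow> = events.iter().filter(|ev| !ev.forgotten).collect();
    assert!(replayed.is_empty());
    println!("events replayed after the iteration: {}", replayed.len());
}
```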