From f7d021ccecdd6d25966126e0bed91c58ec5179ee Mon Sep 17 00:00:00 2001 From: MasterPtato <23087326+MasterPtato@users.noreply.github.com> Date: Fri, 9 Aug 2024 20:35:46 +0000 Subject: [PATCH] fix(clusters): fix linode cleanup logic (#1034) ## Changes --- lib/chirp-workflow/core/src/ctx/workflow.rs | 14 ++++++++++++++ lib/chirp-workflow/core/src/db/postgres.rs | 2 +- svc/pkg/linode/src/workflows/server/mod.rs | 10 ++++++---- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/lib/chirp-workflow/core/src/ctx/workflow.rs b/lib/chirp-workflow/core/src/ctx/workflow.rs index ae638f2036..e50c22ec0d 100644 --- a/lib/chirp-workflow/core/src/ctx/workflow.rs +++ b/lib/chirp-workflow/core/src/ctx/workflow.rs @@ -751,6 +751,20 @@ impl WorkflowCtx { tokio::task::spawn(async move { closure(f).execute(&mut ctx).await }) } + /// Tests if the given error is unrecoverable. If it is, allows the user to run recovery code safely. + /// Should always be used when trying to handle activity errors manually. + pub fn catch_unrecoverable(&mut self, res: GlobalResult) -> GlobalResult> { + match res { + Err(err) if !err.is_workflow_recoverable() => { + self.location_idx += 1; + + Ok(Err(err)) + } + Err(err) => Err(err), + Ok(x) => Ok(Ok(x)), + } + } + /// Sends a signal. pub async fn signal( &mut self, diff --git a/lib/chirp-workflow/core/src/db/postgres.rs b/lib/chirp-workflow/core/src/db/postgres.rs index 0a6be8d793..89b4a19b53 100644 --- a/lib/chirp-workflow/core/src/db/postgres.rs +++ b/lib/chirp-workflow/core/src/db/postgres.rs @@ -865,7 +865,7 @@ impl Database for DatabasePostgres { sqlx::query(indoc!( " INSERT INTO db_workflow.workflow_message_send_events( - workflow_id, location, tags, message_name, body + workflow_id, location, tags, message_name, body, loop_location ) VALUES($1, $2, $3, $4, $5, $6) RETURNING 1 diff --git a/svc/pkg/linode/src/workflows/server/mod.rs b/svc/pkg/linode/src/workflows/server/mod.rs index ae18d99a3b..aff5d1ec43 100644 --- a/svc/pkg/linode/src/workflows/server/mod.rs +++ b/svc/pkg/linode/src/workflows/server/mod.rs @@ -27,10 +27,13 @@ pub struct Input { #[workflow] pub async fn linode_server(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> { let mut cleanup_ctx = CleanupCtx::default(); - let provision_res = match provision(ctx, input, &mut cleanup_ctx).await { + + let res = provision(ctx, input, &mut cleanup_ctx).await; + let provision_res = match ctx.catch_unrecoverable(res)? { + Ok(x) => x, // If we cannot recover a provisioning error, send a failed signal and clean up resources - Err(err) if !err.is_workflow_recoverable() => { - tracing::warn!(?err); + Err(err) => { + tracing::warn!(?err, "unrecoverable provision, cleaning up"); ctx.dispatch_workflow(cleanup::Input { api_token: input.api_token.clone(), @@ -51,7 +54,6 @@ pub async fn linode_server(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult // Throw the original error from the provisioning activities return Err(err); } - x => x?, }; ctx.tagged_signal(