diff --git a/docs/libraries/workflow/GOTCHAS.md b/docs/libraries/workflow/GOTCHAS.md index 6478aad49b..7391237de4 100644 --- a/docs/libraries/workflow/GOTCHAS.md +++ b/docs/libraries/workflow/GOTCHAS.md @@ -134,3 +134,9 @@ None -> "null" -> None ``` Be careful when writing your struct definitions. + +## Force waking a sleeping workflow + +When force waking a sleeping workflow by setting `wake_immediate = true`, know that if the workflow is +currently on a `sleep` step it will go back to sleep if it has not reached its `wake_deadline` yet. For all +other steps, the workflow will continue normally (usually just go back to sleep). diff --git a/lib/chirp-workflow/core/src/ctx/workflow.rs b/lib/chirp-workflow/core/src/ctx/workflow.rs index 77ff2060ee..d39349901f 100644 --- a/lib/chirp-workflow/core/src/ctx/workflow.rs +++ b/lib/chirp-workflow/core/src/ctx/workflow.rs @@ -1279,9 +1279,9 @@ impl WorkflowCtx { let event = self.relevant_history().nth(self.location_idx); // Slept before - if let Some(event) = event { + let (deadline_ts, replay) = if let Some(event) = event { // Validate history is consistent - let Event::Sleep(_) = event else { + let Event::Sleep(sleep) = event else { return Err(WorkflowError::HistoryDiverged(format!( "expected {event} at {}, found sleep", self.loc(), @@ -1289,35 +1289,42 @@ impl WorkflowCtx { .map_err(GlobalError::raw); }; - tracing::debug!(name=%self.name, id=%self.workflow_id, "skipping replayed sleep"); + tracing::debug!(name=%self.name, id=%self.workflow_id, "replaying sleep"); + + (sleep.deadline_ts, true) } // Sleep else { - let ts = time.to_millis()?; + let deadline_ts = time.to_millis()?; self.db .commit_workflow_sleep_event( self.workflow_id, self.full_location().as_ref(), - ts, + deadline_ts, self.loop_location(), ) .await?; - let duration = ts - rivet_util::timestamp::now(); - if duration < 0 { - // No-op - tracing::warn!("tried to sleep for a negative duration"); - } else if duration < worker::TICK_INTERVAL.as_millis() as i64 + 1 { - tracing::info!(name=%self.name, id=%self.workflow_id, until_ts=%ts, "sleeping in memory"); + (deadline_ts, false) + }; - // Sleep in memory if duration is shorter than the worker tick - tokio::time::sleep(std::time::Duration::from_millis(duration.try_into()?)).await; - } else { - tracing::info!(name=%self.name, id=%self.workflow_id, until_ts=%ts, "sleeping"); + let duration = deadline_ts.saturating_sub(rivet_util::timestamp::now()); - return Err(WorkflowError::Sleep(ts)).map_err(GlobalError::raw); + // No-op + if duration < 0 { + if !replay { + tracing::warn!("tried to sleep for a negative duration"); } + } else if duration < worker::TICK_INTERVAL.as_millis() as i64 + 1 { + tracing::info!(name=%self.name, id=%self.workflow_id, %deadline_ts, "sleeping in memory"); + + // Sleep in memory if duration is shorter than the worker tick + tokio::time::sleep(std::time::Duration::from_millis(duration.try_into()?)).await; + } else { + tracing::info!(name=%self.name, id=%self.workflow_id, %deadline_ts, "sleeping"); + + return Err(WorkflowError::Sleep(deadline_ts)).map_err(GlobalError::raw); } // Move to next event diff --git a/lib/chirp-workflow/core/src/db/mod.rs b/lib/chirp-workflow/core/src/db/mod.rs index e8f7b10b2c..8f7e5f2c35 100644 --- a/lib/chirp-workflow/core/src/db/mod.rs +++ b/lib/chirp-workflow/core/src/db/mod.rs @@ -280,4 +280,5 @@ pub struct LoopEventRow { pub struct SleepEventRow { pub workflow_id: Uuid, pub location: Vec, + pub deadline_ts: i64, } diff --git a/lib/chirp-workflow/core/src/db/pg_nats.rs b/lib/chirp-workflow/core/src/db/pg_nats.rs index bfd21f0b58..a81628f24e 100644 --- a/lib/chirp-workflow/core/src/db/pg_nats.rs +++ b/lib/chirp-workflow/core/src/db/pg_nats.rs @@ -362,7 +362,7 @@ impl Database for DatabasePgNats { sqlx::query_as::<_, SleepEventRow>(indoc!( " SELECT - workflow_id, location + workflow_id, location, deadline_ts FROM db_workflow.workflow_sleep_events WHERE workflow_id = ANY($1) AND forgotten = FALSE ORDER BY workflow_id, location ASC @@ -1048,14 +1048,14 @@ impl Database for DatabasePgNats { &self, from_workflow_id: Uuid, location: &[usize], - until_ts: i64, + deadline_ts: i64, loop_location: Option<&[usize]>, ) -> WorkflowResult<()> { self.query(|| async { sqlx::query(indoc!( " INSERT INTO db_workflow.workflow_sleep_events( - workflow_id, location, until_ts, loop_location + workflow_id, location, deadline_ts, loop_location ) VALUES($1, $2, $3, $4) RETURNING 1 @@ -1063,7 +1063,7 @@ impl Database for DatabasePgNats { )) .bind(from_workflow_id) .bind(location.iter().map(|x| *x as i64).collect::>()) - .bind(until_ts) + .bind(deadline_ts) .bind(loop_location.map(|l| l.iter().map(|x| *x as i64).collect::>())) .execute(&mut *self.conn().await?) .await diff --git a/lib/chirp-workflow/core/src/event.rs b/lib/chirp-workflow/core/src/event.rs index 9d8a3ffca2..cf4fd90317 100644 --- a/lib/chirp-workflow/core/src/event.rs +++ b/lib/chirp-workflow/core/src/event.rs @@ -178,13 +178,17 @@ impl TryFrom for LoopEvent { } #[derive(Debug)] -pub struct SleepEvent {} +pub struct SleepEvent { + pub deadline_ts: i64, +} impl TryFrom for SleepEvent { type Error = WorkflowError; - fn try_from(_value: SleepEventRow) -> WorkflowResult { - Ok(SleepEvent {}) + fn try_from(value: SleepEventRow) -> WorkflowResult { + Ok(SleepEvent { + deadline_ts: value.deadline_ts, + }) } } diff --git a/svc/pkg/linode/src/workflows/server/mod.rs b/svc/pkg/linode/src/workflows/server/mod.rs index f28a916f5a..aef25b7763 100644 --- a/svc/pkg/linode/src/workflows/server/mod.rs +++ b/svc/pkg/linode/src/workflows/server/mod.rs @@ -344,10 +344,7 @@ struct WaitDiskReadyInput { } #[activity(WaitDiskReady)] -async fn wait_disk_ready( - ctx: &ActivityCtx, - input: &WaitDiskReadyInput, -) -> GlobalResult<()> { +async fn wait_disk_ready(ctx: &ActivityCtx, input: &WaitDiskReadyInput) -> GlobalResult<()> { // Build HTTP client let client = client::Client::new(input.api_token.clone()).await?; diff --git a/svc/pkg/workflow/db/workflow/migrations/20240816203112_add_sleep.up.sql b/svc/pkg/workflow/db/workflow/migrations/20240816203112_add_sleep.up.sql index 1b625fdadf..06fc5bf14d 100644 --- a/svc/pkg/workflow/db/workflow/migrations/20240816203112_add_sleep.up.sql +++ b/svc/pkg/workflow/db/workflow/migrations/20240816203112_add_sleep.up.sql @@ -1,7 +1,7 @@ CREATE TABLE workflow_sleep_events ( workflow_id UUID NOT NULL REFERENCES workflows, location INT[] NOT NULL, - until_ts INT NOT NULL, + deadline_ts INT NOT NULL, loop_location INT[], forgotten BOOLEAN NOT NULL DEFAULT FALSE,