Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/libraries/workflow/GOTCHAS.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,9 @@ None -> "null" -> None
```

Be careful when writing your struct definitions.

## Force waking a sleeping workflow

When force waking a sleeping workflow by setting `wake_immediate = true`, know that if the workflow is
currently on a `sleep` step it will go back to sleep if it has not reached its `wake_deadline` yet. For all
other steps, the workflow will continue normally (usually just go back to sleep).
39 changes: 23 additions & 16 deletions lib/chirp-workflow/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1279,45 +1279,52 @@ impl WorkflowCtx {
let event = self.relevant_history().nth(self.location_idx);

// Slept before
if let Some(event) = event {
let (deadline_ts, replay) = if let Some(event) = event {
// Validate history is consistent
let Event::Sleep(_) = event else {
let Event::Sleep(sleep) = event else {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event} at {}, found sleep",
self.loc(),
)))
.map_err(GlobalError::raw);
};

tracing::debug!(name=%self.name, id=%self.workflow_id, "skipping replayed sleep");
tracing::debug!(name=%self.name, id=%self.workflow_id, "replaying sleep");

(sleep.deadline_ts, true)
}
// Sleep
else {
let ts = time.to_millis()?;
let deadline_ts = time.to_millis()?;

self.db
.commit_workflow_sleep_event(
self.workflow_id,
self.full_location().as_ref(),
ts,
deadline_ts,
self.loop_location(),
)
.await?;

let duration = ts - rivet_util::timestamp::now();
if duration < 0 {
// No-op
tracing::warn!("tried to sleep for a negative duration");
} else if duration < worker::TICK_INTERVAL.as_millis() as i64 + 1 {
tracing::info!(name=%self.name, id=%self.workflow_id, until_ts=%ts, "sleeping in memory");
(deadline_ts, false)
};

// Sleep in memory if duration is shorter than the worker tick
tokio::time::sleep(std::time::Duration::from_millis(duration.try_into()?)).await;
} else {
tracing::info!(name=%self.name, id=%self.workflow_id, until_ts=%ts, "sleeping");
let duration = deadline_ts.saturating_sub(rivet_util::timestamp::now());

return Err(WorkflowError::Sleep(ts)).map_err(GlobalError::raw);
// No-op
if duration < 0 {
if !replay {
tracing::warn!("tried to sleep for a negative duration");
}
} else if duration < worker::TICK_INTERVAL.as_millis() as i64 + 1 {
tracing::info!(name=%self.name, id=%self.workflow_id, %deadline_ts, "sleeping in memory");

// Sleep in memory if duration is shorter than the worker tick
tokio::time::sleep(std::time::Duration::from_millis(duration.try_into()?)).await;
} else {
tracing::info!(name=%self.name, id=%self.workflow_id, %deadline_ts, "sleeping");

return Err(WorkflowError::Sleep(deadline_ts)).map_err(GlobalError::raw);
}

// Move to next event
Expand Down
1 change: 1 addition & 0 deletions lib/chirp-workflow/core/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,4 +280,5 @@ pub struct LoopEventRow {
pub struct SleepEventRow {
pub workflow_id: Uuid,
pub location: Vec<i64>,
pub deadline_ts: i64,
}
8 changes: 4 additions & 4 deletions lib/chirp-workflow/core/src/db/pg_nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ impl Database for DatabasePgNats {
sqlx::query_as::<_, SleepEventRow>(indoc!(
"
SELECT
workflow_id, location
workflow_id, location, deadline_ts
FROM db_workflow.workflow_sleep_events
WHERE workflow_id = ANY($1) AND forgotten = FALSE
ORDER BY workflow_id, location ASC
Expand Down Expand Up @@ -1048,22 +1048,22 @@ impl Database for DatabasePgNats {
&self,
from_workflow_id: Uuid,
location: &[usize],
until_ts: i64,
deadline_ts: i64,
loop_location: Option<&[usize]>,
) -> WorkflowResult<()> {
self.query(|| async {
sqlx::query(indoc!(
"
INSERT INTO db_workflow.workflow_sleep_events(
workflow_id, location, until_ts, loop_location
workflow_id, location, deadline_ts, loop_location
)
VALUES($1, $2, $3, $4)
RETURNING 1
",
))
.bind(from_workflow_id)
.bind(location.iter().map(|x| *x as i64).collect::<Vec<_>>())
.bind(until_ts)
.bind(deadline_ts)
.bind(loop_location.map(|l| l.iter().map(|x| *x as i64).collect::<Vec<_>>()))
.execute(&mut *self.conn().await?)
.await
Expand Down
10 changes: 7 additions & 3 deletions lib/chirp-workflow/core/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,17 @@ impl TryFrom<LoopEventRow> for LoopEvent {
}

#[derive(Debug)]
pub struct SleepEvent {}
pub struct SleepEvent {
pub deadline_ts: i64,
}

impl TryFrom<SleepEventRow> for SleepEvent {
type Error = WorkflowError;

fn try_from(_value: SleepEventRow) -> WorkflowResult<Self> {
Ok(SleepEvent {})
fn try_from(value: SleepEventRow) -> WorkflowResult<Self> {
Ok(SleepEvent {
deadline_ts: value.deadline_ts,
})
}
}

Expand Down
5 changes: 1 addition & 4 deletions svc/pkg/linode/src/workflows/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,10 +344,7 @@ struct WaitDiskReadyInput {
}

#[activity(WaitDiskReady)]
async fn wait_disk_ready(
ctx: &ActivityCtx,
input: &WaitDiskReadyInput,
) -> GlobalResult<()> {
async fn wait_disk_ready(ctx: &ActivityCtx, input: &WaitDiskReadyInput) -> GlobalResult<()> {
// Build HTTP client
let client = client::Client::new(input.api_token.clone()).await?;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
CREATE TABLE workflow_sleep_events (
workflow_id UUID NOT NULL REFERENCES workflows,
location INT[] NOT NULL,
until_ts INT NOT NULL,
deadline_ts INT NOT NULL,

loop_location INT[],
forgotten BOOLEAN NOT NULL DEFAULT FALSE,
Expand Down