Skip to content

Commit 480b641

Browse files
committed
fix(workflow): fix sleep logic
1 parent d5a1504 commit 480b641

File tree

6 files changed

+37
-28
lines changed

6 files changed

+37
-28
lines changed

lib/chirp-workflow/core/src/ctx/workflow.rs

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1279,45 +1279,52 @@ impl WorkflowCtx {
12791279
let event = self.relevant_history().nth(self.location_idx);
12801280

12811281
// Slept before
1282-
if let Some(event) = event {
1282+
let (deadline_ts, replay) = if let Some(event) = event {
12831283
// Validate history is consistent
1284-
let Event::Sleep(_) = event else {
1284+
let Event::Sleep(sleep) = event else {
12851285
return Err(WorkflowError::HistoryDiverged(format!(
12861286
"expected {event} at {}, found sleep",
12871287
self.loc(),
12881288
)))
12891289
.map_err(GlobalError::raw);
12901290
};
12911291

1292-
tracing::debug!(name=%self.name, id=%self.workflow_id, "skipping replayed sleep");
1292+
tracing::debug!(name=%self.name, id=%self.workflow_id, "replaying sleep");
1293+
1294+
(sleep.deadline_ts, true)
12931295
}
12941296
// Sleep
12951297
else {
1296-
let ts = time.to_millis()?;
1298+
let deadline_ts = time.to_millis()?;
12971299

12981300
self.db
12991301
.commit_workflow_sleep_event(
13001302
self.workflow_id,
13011303
self.full_location().as_ref(),
1302-
ts,
1304+
deadline_ts,
13031305
self.loop_location(),
13041306
)
13051307
.await?;
13061308

1307-
let duration = ts - rivet_util::timestamp::now();
1308-
if duration < 0 {
1309-
// No-op
1310-
tracing::warn!("tried to sleep for a negative duration");
1311-
} else if duration < worker::TICK_INTERVAL.as_millis() as i64 + 1 {
1312-
tracing::info!(name=%self.name, id=%self.workflow_id, until_ts=%ts, "sleeping in memory");
1309+
(deadline_ts, false)
1310+
};
13131311

1314-
// Sleep in memory if duration is shorter than the worker tick
1315-
tokio::time::sleep(std::time::Duration::from_millis(duration.try_into()?)).await;
1316-
} else {
1317-
tracing::info!(name=%self.name, id=%self.workflow_id, until_ts=%ts, "sleeping");
1312+
let duration = deadline_ts.saturating_sub(rivet_util::timestamp::now());
13181313

1319-
return Err(WorkflowError::Sleep(ts)).map_err(GlobalError::raw);
1314+
// No-op
1315+
if duration < 0 {
1316+
if !replay {
1317+
tracing::warn!("tried to sleep for a negative duration");
13201318
}
1319+
} else if duration < worker::TICK_INTERVAL.as_millis() as i64 + 1 {
1320+
tracing::info!(name=%self.name, id=%self.workflow_id, %deadline_ts, "sleeping in memory");
1321+
1322+
// Sleep in memory if duration is shorter than the worker tick
1323+
tokio::time::sleep(std::time::Duration::from_millis(duration.try_into()?)).await;
1324+
} else {
1325+
tracing::info!(name=%self.name, id=%self.workflow_id, %deadline_ts, "sleeping");
1326+
1327+
return Err(WorkflowError::Sleep(deadline_ts)).map_err(GlobalError::raw);
13211328
}
13221329

13231330
// Move to next event

lib/chirp-workflow/core/src/db/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,4 +280,5 @@ pub struct LoopEventRow {
280280
pub struct SleepEventRow {
281281
pub workflow_id: Uuid,
282282
pub location: Vec<i64>,
283+
pub deadline_ts: i64,
283284
}

lib/chirp-workflow/core/src/db/pg_nats.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ impl Database for DatabasePgNats {
362362
sqlx::query_as::<_, SleepEventRow>(indoc!(
363363
"
364364
SELECT
365-
workflow_id, location
365+
workflow_id, location, deadline_ts
366366
FROM db_workflow.workflow_sleep_events
367367
WHERE workflow_id = ANY($1) AND forgotten = FALSE
368368
ORDER BY workflow_id, location ASC
@@ -1048,22 +1048,22 @@ impl Database for DatabasePgNats {
10481048
&self,
10491049
from_workflow_id: Uuid,
10501050
location: &[usize],
1051-
until_ts: i64,
1051+
deadline_ts: i64,
10521052
loop_location: Option<&[usize]>,
10531053
) -> WorkflowResult<()> {
10541054
self.query(|| async {
10551055
sqlx::query(indoc!(
10561056
"
10571057
INSERT INTO db_workflow.workflow_sleep_events(
1058-
workflow_id, location, until_ts, loop_location
1058+
workflow_id, location, deadline_ts, loop_location
10591059
)
10601060
VALUES($1, $2, $3, $4)
10611061
RETURNING 1
10621062
",
10631063
))
10641064
.bind(from_workflow_id)
10651065
.bind(location.iter().map(|x| *x as i64).collect::<Vec<_>>())
1066-
.bind(until_ts)
1066+
.bind(deadline_ts)
10671067
.bind(loop_location.map(|l| l.iter().map(|x| *x as i64).collect::<Vec<_>>()))
10681068
.execute(&mut *self.conn().await?)
10691069
.await

lib/chirp-workflow/core/src/event.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,13 +178,17 @@ impl TryFrom<LoopEventRow> for LoopEvent {
178178
}
179179

180180
#[derive(Debug)]
181-
pub struct SleepEvent {}
181+
pub struct SleepEvent {
182+
pub deadline_ts: i64,
183+
}
182184

183185
impl TryFrom<SleepEventRow> for SleepEvent {
184186
type Error = WorkflowError;
185187

186-
fn try_from(_value: SleepEventRow) -> WorkflowResult<Self> {
187-
Ok(SleepEvent {})
188+
fn try_from(value: SleepEventRow) -> WorkflowResult<Self> {
189+
Ok(SleepEvent {
190+
deadline_ts: value.deadline_ts,
191+
})
188192
}
189193
}
190194

svc/pkg/linode/src/workflows/server/mod.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -344,10 +344,7 @@ struct WaitDiskReadyInput {
344344
}
345345

346346
#[activity(WaitDiskReady)]
347-
async fn wait_disk_ready(
348-
ctx: &ActivityCtx,
349-
input: &WaitDiskReadyInput,
350-
) -> GlobalResult<()> {
347+
async fn wait_disk_ready(ctx: &ActivityCtx, input: &WaitDiskReadyInput) -> GlobalResult<()> {
351348
// Build HTTP client
352349
let client = client::Client::new(input.api_token.clone()).await?;
353350

svc/pkg/workflow/db/workflow/migrations/20240816203112_add_sleep.up.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
CREATE TABLE workflow_sleep_events (
22
workflow_id UUID NOT NULL REFERENCES workflows,
33
location INT[] NOT NULL,
4-
until_ts INT NOT NULL,
4+
deadline_ts INT NOT NULL,
55

66
loop_location INT[],
77
forgotten BOOLEAN NOT NULL DEFAULT FALSE,

0 commit comments

Comments
 (0)