
Commit f55f356

feat(workflows): implement retry backoff for activity errors
1 parent 0e452fe commit f55f356

File tree

5 files changed: 87 additions, 22 deletions

lib/chirp-workflow/core/src/ctx/workflow.rs

Lines changed: 12 additions & 9 deletions

@@ -21,8 +21,8 @@ use crate::{
     workflow::{Workflow, WorkflowInput},
 };

-// Time to delay a worker from retrying after an error
-const RETRY_TIMEOUT: Duration = Duration::from_millis(2000);
+// Time to delay a workflow from retrying after an error
+pub const RETRY_TIMEOUT_MS: usize = 2000;
 // Poll interval when polling for signals in-process
 const SIGNAL_RETRY: Duration = Duration::from_millis(100);
 // Most in-process signal poll tries
@@ -200,8 +200,10 @@ impl WorkflowCtx {
             }
             Err(err) => {
                 // Retry the workflow if its recoverable
-                let deadline = if err.is_recoverable() {
-                    Some(rivet_util::timestamp::now() + RETRY_TIMEOUT.as_millis() as i64)
+                let deadline_ts = if let Some(deadline_ts) = err.backoff() {
+                    Some(deadline_ts)
+                } else if err.is_recoverable() {
+                    Some(rivet_util::timestamp::now() + RETRY_TIMEOUT_MS as i64)
                 } else {
                     None
                 };
@@ -214,7 +216,7 @@ impl WorkflowCtx {
                 // finish. This workflow will be retried when the sub workflow completes
                 let wake_sub_workflow = err.sub_workflow();

-                if deadline.is_some() || !wake_signals.is_empty() || wake_sub_workflow.is_some() {
+                if deadline_ts.is_some() || !wake_signals.is_empty() || wake_sub_workflow.is_some() {
                     tracing::info!(name=%self.name, id=%self.workflow_id, ?err, "workflow sleeping");
                 } else {
                     tracing::error!(name=%self.name, id=%self.workflow_id, ?err, "workflow error");
@@ -236,7 +238,7 @@ impl WorkflowCtx {
                     .fail_workflow(
                         self.workflow_id,
                         false,
-                        deadline,
+                        deadline_ts,
                         wake_signals,
                         wake_sub_workflow,
                         &err_str,
@@ -319,7 +321,7 @@ impl WorkflowCtx {
                     )
                     .await?;

-                Err(WorkflowError::ActivityFailure(err))
+                Err(WorkflowError::ActivityFailure(err, 0))
             }
             Err(err) => {
                 tracing::debug!("activity timeout");
@@ -643,11 +645,12 @@ impl WorkflowCtx {
                 // Convert error in the case of max retries exceeded. This will only act on retryable
                 // errors
                 let err = match err {
-                    WorkflowError::ActivityFailure(err) => {
+                    WorkflowError::ActivityFailure(err, _) => {
                         if error_count + 1 >= I::Activity::MAX_RETRIES {
                             WorkflowError::ActivityMaxFailuresReached(err)
                         } else {
-                            WorkflowError::ActivityFailure(err)
+                            // Add error count to the error
+                            WorkflowError::ActivityFailure(err, error_count)
                         }
                     }
                     WorkflowError::ActivityTimeout | WorkflowError::OperationTimeout => {
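
In short, the engine now asks the error itself for a backoff deadline before falling back to the flat retry delay. A minimal standalone sketch of that selection logic (the function and parameter names below are illustrative, not part of the commit):

// Sketch of the deadline selection above. `backoff_deadline_ts` stands in
// for `err.backoff()` and `recoverable` for `err.is_recoverable()`.
const RETRY_TIMEOUT_MS: usize = 2000;

fn retry_deadline(backoff_deadline_ts: Option<i64>, recoverable: bool, now_ms: i64) -> Option<i64> {
    if let Some(deadline_ts) = backoff_deadline_ts {
        // Exponential backoff derived from the activity's error count
        Some(deadline_ts)
    } else if recoverable {
        // Flat 2s delay for recoverable errors without a computed backoff
        Some(now_ms + RETRY_TIMEOUT_MS as i64)
    } else {
        // Unrecoverable: no deadline, the workflow fails permanently
        None
    }
}

fn main() {
    let now_ms = 1_700_000_000_000;
    assert_eq!(retry_deadline(None, true, now_ms), Some(now_ms + 2000));
    assert_eq!(retry_deadline(Some(now_ms + 16_000), true, now_ms), Some(now_ms + 16_000));
    assert_eq!(retry_deadline(None, false, now_ms), None);
}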

lib/chirp-workflow/core/src/error.rs

Lines changed: 30 additions & 2 deletions

@@ -1,6 +1,11 @@
+use std::time::{SystemTime, UNIX_EPOCH};
+
+use tokio::time::Instant;
 use global_error::GlobalError;
 use uuid::Uuid;

+use crate::ctx::workflow::RETRY_TIMEOUT_MS;
+
 pub type WorkflowResult<T> = Result<T, WorkflowError>;

 /// Throwing this will eject from the workflow scope back in to the engine.
@@ -12,8 +17,9 @@ pub enum WorkflowError {
     #[error("workflow failure: {0:?}")]
     WorkflowFailure(GlobalError),

+    // Includes error count
     #[error("activity failure: {0:?}")]
-    ActivityFailure(GlobalError),
+    ActivityFailure(GlobalError, u32),

     #[error("activity failure, max retries reached: {0:?}")]
     ActivityMaxFailuresReached(GlobalError),
@@ -119,9 +125,31 @@ pub enum WorkflowError {
 }

 impl WorkflowError {
+    pub fn backoff(&self) -> Option<i64> {
+        if let WorkflowError::ActivityFailure(_, error_count) = self {
+            // NOTE: Max retry is handled in `WorkflowCtx::activity`
+            let mut backoff =
+                rivet_util::Backoff::new_at(8, None, RETRY_TIMEOUT_MS, 500, *error_count as usize);
+            let next = backoff.step().expect("should not have max retry");
+
+            // Calculate timestamp based on the backoff
+            let duration_until = next.duration_since(Instant::now());
+            let deadline_ts = (SystemTime::now() + duration_until)
+                .duration_since(UNIX_EPOCH)
+                .unwrap_or_else(|err| unreachable!("time is broken: {}", err))
+                .as_millis()
+                .try_into()
+                .expect("doesn't fit in i64");
+
+            Some(deadline_ts)
+        } else {
+            None
+        }
+    }
+
     pub fn is_recoverable(&self) -> bool {
         match self {
-            WorkflowError::ActivityFailure(_) => true,
+            WorkflowError::ActivityFailure(_, _) => true,
             WorkflowError::ActivityTimeout => true,
             WorkflowError::OperationTimeout => true,
             _ => false,
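
The `Instant` returned by `step()` is monotonic and has no relation to the Unix epoch, so `backoff()` bridges it through `SystemTime`. A self-contained sketch of just that conversion (assuming only tokio; names are illustrative):

use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::time::Instant;

/// Converts a monotonic deadline into a Unix timestamp in milliseconds.
fn instant_to_unix_ms(deadline: Instant) -> i64 {
    // Remaining time until the deadline (zero if it has already passed)
    let duration_until = deadline.saturating_duration_since(Instant::now());
    (SystemTime::now() + duration_until)
        .duration_since(UNIX_EPOCH)
        .expect("system clock before Unix epoch")
        .as_millis() as i64
}

#[tokio::main]
async fn main() {
    let deadline = Instant::now() + Duration::from_millis(2000);
    println!("retry at unix ms: {}", instant_to_unix_ms(deadline));
}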

lib/chirp/client/src/client.rs

Lines changed: 4 additions & 4 deletions

@@ -411,17 +411,17 @@
                 {
                     tracing::warn!(tick_index = %service_unavailable_backoff.tick_index(), "service unavailable");
                     if service_unavailable_backoff.tick().await {
-                        return Err(ClientError::NatsResponseStatus(status));
-                    } else {
                         continue 'req;
+                    } else {
+                        return Err(ClientError::NatsResponseStatus(status));
                     }
                 }
                 Err(ClientError::RpcAckTimedOut) => {
                     tracing::warn!(tick_index = %service_unavailable_backoff.tick_index(), "rpc ack timed out");
                     if service_unavailable_backoff.tick().await {
-                        return Err(ClientError::RpcAckTimedOut);
-                    } else {
                         continue 'req;
+                    } else {
+                        return Err(ClientError::RpcAckTimedOut);
                     }
                 }
                 Err(err) => {
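
Both branches flip because `tick()`'s return value is inverted in lib/util below: true now means "another attempt is allowed". A hedged sketch of the resulting loop shape, with a stub standing in for rivet_util::Backoff and for the real NATS call:

// `FakeBackoff` models only the inverted tick() contract:
// true = retry budget remains, false = give up.
struct FakeBackoff {
    attempts_left: u32,
}

impl FakeBackoff {
    async fn tick(&mut self) -> bool {
        if self.attempts_left == 0 {
            return false;
        }
        self.attempts_left -= 1;
        true
    }
}

// Stub request that always fails, to exercise the retry path.
async fn try_once() -> Result<(), &'static str> {
    Err("service unavailable")
}

async fn call_with_retry(mut backoff: FakeBackoff) -> Result<(), &'static str> {
    'req: loop {
        match try_once().await {
            Ok(()) => return Ok(()),
            Err(err) => {
                if backoff.tick().await {
                    continue 'req; // budget remains: retry
                } else {
                    return Err(err); // budget exhausted: surface the error
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let res = call_with_retry(FakeBackoff { attempts_left: 3 }).await;
    assert_eq!(res, Err("service unavailable"));
}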

lib/chirp/worker/src/manager.rs

Lines changed: 3 additions & 3 deletions

@@ -890,10 +890,10 @@
                     tracing::info!("ticking request retry backoff");

                     if backoff.tick().await {
+                        tracing::info!("retrying request");
+                    } else {
                         tracing::warn!("retry request failed too many times");
                         return res;
-                    } else {
-                        tracing::info!("retrying request");
                     }
                 } else {
                     // Return result immediately
@@ -1040,7 +1040,7 @@
     async fn consumer_ack(self: Arc<Self>, msg_meta: RedisMessageMeta) {
         // let mut backoff = rivet_util::Backoff::default();
         // loop {
-        //     if backoff.tick().await {
+        //     if !backoff.tick().await {
         //         tracing::error!("acking stream message failed too many times, aborting");
         //         return;
        //     }

lib/util/core/src/lib.rs

Lines changed: 38 additions & 4 deletions

@@ -150,17 +150,34 @@ impl Backoff {
             sleep_until: Instant::now(),
         }
     }
+
+    pub fn new_at(
+        max_exponent: usize,
+        max_retries: Option<usize>,
+        wait: usize,
+        randomness: usize,
+        i: usize,
+    ) -> Backoff {
+        Backoff {
+            max_exponent,
+            max_retries,
+            wait,
+            randomness,
+            i,
+            sleep_until: Instant::now(),
+        }
+    }

     pub fn tick_index(&self) -> usize {
         self.i
     }

     /// Waits for the next backoff tick.
     ///
-    /// Returns true if the index is greater than `max_retries`.
+    /// Returns false if the index is greater than `max_retries`.
     pub async fn tick(&mut self) -> bool {
         if self.max_retries.map_or(false, |x| self.i > x) {
-            return true;
+            return false;
         }

         tokio::time::sleep_until(self.sleep_until.into()).await;
@@ -171,7 +188,24 @@ impl Backoff {

         self.i += 1;

-        false
+        true
+    }
+
+    /// Returns the instant of the next backoff tick. Does not wait.
+    ///
+    /// Returns None if the index is greater than `max_retries`.
+    pub fn step(&mut self) -> Option<Instant> {
+        if self.max_retries.map_or(false, |x| self.i > x) {
+            return None;
+        }
+
+        let next_wait = self.wait * 2usize.pow(self.i.min(self.max_exponent) as u32)
+            + rand::thread_rng().gen_range(0..self.randomness);
+        self.sleep_until += Duration::from_millis(next_wait as u64);
+
+        self.i += 1;
+
+        Some(self.sleep_until)
     }

     pub fn default_infinite() -> Backoff {
@@ -202,7 +236,7 @@ mod tests {
             last_tick = now;
             println!("tick: {}", dt.as_secs_f64());

-            if backoff.tick().await {
+            if !backoff.tick().await {
                 println!("cancelling");
                 break;
             }
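
Taken together, `new_at` and `step` let a caller resume a backoff schedule at an arbitrary attempt index and read the next deadline without sleeping, which is how `WorkflowError::backoff()` uses them above. A small usage sketch (assuming rivet_util is on the dependency path and, matching the `use tokio::time::Instant` in error.rs, that `step()` returns tokio's `Instant`):

use tokio::time::Instant;

fn main() {
    // Resume the schedule as if 3 ticks had already elapsed:
    // max exponent 8, no retry cap, 2000ms base wait, up to 500ms of jitter.
    let mut backoff = rivet_util::Backoff::new_at(8, None, 2000, 500, 3);

    // step() advances the schedule and returns the next deadline without sleeping.
    if let Some(next) = backoff.step() {
        // With i = 3 this lands near 2000ms * 2^3 = 16s out, plus jitter.
        println!("next retry in ~{}ms", next.duration_since(Instant::now()).as_millis());
    }
}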
