Skip to content

Commit

Permalink
Add jitter to retries
Browse files Browse the repository at this point in the history
Fixes #912
  • Loading branch information
AhmedSoliman committed Mar 7, 2024
1 parent ad55dda commit 870177c
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 55 deletions.
7 changes: 3 additions & 4 deletions crates/network/src/networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

use std::time::Duration;

use rand::Rng;
use restate_types::retries::with_jitter;
use tracing::{info, instrument, trace};

use restate_core::metadata;
Expand Down Expand Up @@ -138,10 +138,9 @@ impl NetworkSender for Networking {
}
}

// todo: replace with RetryPolicy
async fn sleep_with_jitter(duration: Duration) {
let max_jitter = duration.as_millis() * 2;
let jitter = rand::thread_rng().gen_range(1..max_jitter);
let retry_after = Duration::from_millis((duration.as_millis() + jitter) as u64);
let retry_after = with_jitter(duration, 0.3);
trace!("sleeping for {:?}", retry_after);
tokio::time::sleep(retry_after).await;
}
Expand Down
4 changes: 2 additions & 2 deletions crates/types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ publish = false
[features]
default = []

mocks = ["rand"]
mocks = []
serde = ["dep:serde", "dep:serde_with", "enumset/serde"]
serde_schema = ["serde", "dep:schemars"]
tonic_conversions = ["dep:tonic"]
Expand All @@ -29,7 +29,7 @@ http = { workspace = true }
humantime = { workspace = true }
once_cell = { workspace = true }
opentelemetry_api = { workspace = true }
rand = { workspace = true, optional = true }
rand = { workspace = true }
schemars = { workspace = true, optional = true }
serde = { workspace = true, optional = true }
serde_with = { workspace = true, optional = true }
Expand Down
133 changes: 84 additions & 49 deletions crates/types/src/retries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ use std::cmp;
use std::future::Future;
use std::time::Duration;

use rand::Rng;

const DEFAULT_JITTER_MULTIPLIER: f32 = 0.3;

/// This struct represents the policy to execute retries.
///
/// It can be used in components which needs to configure policies to execute retries.
Expand Down Expand Up @@ -191,6 +195,7 @@ pub struct RetryIter {
impl Iterator for RetryIter {
type Item = Duration;

/// adds up to 1/3 target duration as jitter
fn next(&mut self) -> Option<Self::Item> {
self.attempts += 1;
match self.policy {
Expand All @@ -202,7 +207,7 @@ impl Iterator for RetryIter {
if self.attempts > max_attempts {
None
} else {
Some(interval.into())
Some(with_jitter(interval.into(), DEFAULT_JITTER_MULTIPLIER))
}
}
RetryPolicy::Exponential {
Expand All @@ -219,10 +224,10 @@ impl Iterator for RetryIter {
max_interval.map(Into::into).unwrap_or(Duration::MAX),
);
self.last_retry = Some(new_retry);
return Some(new_retry);
return Some(with_jitter(new_retry, DEFAULT_JITTER_MULTIPLIER));
} else {
self.last_retry = Some(*initial_interval);
return Some(*initial_interval);
return Some(with_jitter(*initial_interval, DEFAULT_JITTER_MULTIPLIER));
}
}
}
Expand All @@ -241,6 +246,23 @@ impl Iterator for RetryIter {
}
}

// Jitter is a random duration added to the desired target, it ranges from 3ms to
// (max_multiplier * duration) of the original requested delay. The minimum of +2ms
// is to avoid falling into common zero-ending values (0, 10, 100, etc.) which are
// common cause of harmonics in systems (avoiding resonance frequencies)
static MIN_JITTER: Duration = Duration::from_millis(3);

pub fn with_jitter(duration: Duration, max_multiplier: f32) -> Duration {
let max_jitter = duration.mul_f32(max_multiplier);
if max_jitter <= MIN_JITTER {
// We can't get a random value unless max_jitter is higher than MIN_JITTER.
duration + MIN_JITTER
} else {
let jitter = rand::thread_rng().gen_range(MIN_JITTER..max_jitter);
duration + jitter
}
}

impl ExactSizeIterator for RetryIter {}

#[cfg(test)]
Expand All @@ -257,60 +279,73 @@ mod tests {

#[test]
fn fixed_delay_retry_policy() {
assert_eq!(
vec![Duration::from_millis(100); 10],
RetryPolicy::fixed_delay(Duration::from_millis(100), 10)
.into_iter()
.collect::<Vec<_>>()
)
let expected = vec![Duration::from_millis(100); 10];
let actuals = RetryPolicy::fixed_delay(Duration::from_millis(100), 10)
.into_iter()
.collect::<Vec<_>>();
assert_eq!(actuals.len(), expected.len());
for (expected, actual) in expected.iter().zip(actuals.iter()) {
assert!(within_jitter(*expected, *actual, DEFAULT_JITTER_MULTIPLIER));
}
}

#[test]
fn exponential_retry_policy() {
assert_eq!(
vec![
// Manually building this powers to avoid rounding issues :)
Duration::from_millis(100),
Duration::from_millis(100).mul_f32(2.0),
Duration::from_millis(100).mul_f32(2.0).mul_f32(2.0),
Duration::from_millis(100)
.mul_f32(2.0)
.mul_f32(2.0)
.mul_f32(2.0),
Duration::from_millis(100)
.mul_f32(2.0)
.mul_f32(2.0)
.mul_f32(2.0)
.mul_f32(2.0)
],
RetryPolicy::exponential(Duration::from_millis(100), 2.0, 5, None)
.into_iter()
.collect::<Vec<_>>()
)
let expected = vec![
// Manually building this powers to avoid rounding issues :)
Duration::from_millis(100),
Duration::from_millis(100).mul_f32(2.0),
Duration::from_millis(100).mul_f32(2.0).mul_f32(2.0),
Duration::from_millis(100)
.mul_f32(2.0)
.mul_f32(2.0)
.mul_f32(2.0),
Duration::from_millis(100)
.mul_f32(2.0)
.mul_f32(2.0)
.mul_f32(2.0)
.mul_f32(2.0),
];

let actuals = RetryPolicy::exponential(Duration::from_millis(100), 2.0, 5, None)
.into_iter()
.collect::<Vec<_>>();
assert_eq!(actuals.len(), expected.len());
for (expected, actual) in expected.iter().zip(actuals.iter()) {
assert!(within_jitter(*expected, *actual, DEFAULT_JITTER_MULTIPLIER));
}
}

#[test]
fn exponential_retry_policy_with_max_interval() {
assert_eq!(
vec![
// Manually building this powers to avoid rounding issues :)
Duration::from_millis(100),
Duration::from_millis(100).mul_f32(2.0),
Duration::from_millis(100).mul_f32(2.0).mul_f32(2.0),
Duration::from_millis(100)
.mul_f32(2.0)
.mul_f32(2.0)
.mul_f32(2.0),
Duration::from_secs(1)
],
RetryPolicy::exponential(
Duration::from_millis(100),
2.0,
5,
Some(Duration::from_secs(1))
)
.into_iter()
.collect::<Vec<_>>()
let expected = vec![
// Manually building this powers to avoid rounding issues :)
Duration::from_millis(100),
Duration::from_millis(100).mul_f32(2.0),
Duration::from_millis(100).mul_f32(2.0).mul_f32(2.0),
Duration::from_millis(100)
.mul_f32(2.0)
.mul_f32(2.0)
.mul_f32(2.0),
Duration::from_secs(1),
];
let actuals = RetryPolicy::exponential(
Duration::from_millis(100),
2.0,
5,
Some(Duration::from_secs(1)),
)
.into_iter()
.collect::<Vec<_>>();
assert_eq!(actuals.len(), expected.len());
for (expected, actual) in expected.iter().zip(actuals.iter()) {
assert!(within_jitter(*expected, *actual, DEFAULT_JITTER_MULTIPLIER));
}
}

fn within_jitter(expected: Duration, actual: Duration, max_multiplier: f32) -> bool {
let min_inc_jitter = expected + MIN_JITTER;
let max_inc_jitter = expected + expected.mul_f32(max_multiplier);
actual >= min_inc_jitter && actual <= max_inc_jitter
}
}

0 comments on commit 870177c

Please sign in to comment.