Skip to content

Commit

Permalink
first rough cut
Browse files Browse the repository at this point in the history
  • Loading branch information
srijs committed Mar 5, 2017
1 parent fb05cdf commit af1a2f8
Show file tree
Hide file tree
Showing 13 changed files with 472 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
target
Cargo.lock
10 changes: 10 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[package]
name = "tokio-retry"
version = "0.0.1"
authors = ["Sam Rijs <srijs@airpost.net>"]

[dependencies]
either = "1.0.3"
futures = "0.1.9"
rand = "0.3.15"
tokio-timer = "0.1.0"
172 changes: 172 additions & 0 deletions src/future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
use either::Either;
use futures::{Async, IntoFuture, Future, Poll};
use std::error::Error;
use std::cmp;
use std::fmt;
use tokio_timer::{Sleep, Timer, TimerError};

use super::strategy::RetryStrategy;

/// Represents the errors possible during the execution of the `RetryFuture`.
#[derive(Debug)]
pub enum RetryError<E> {
OperationError(E),
TimerError(TimerError)
}

impl<E: cmp::PartialEq> cmp::PartialEq for RetryError<E> {
fn eq(&self, other: &RetryError<E>) -> bool {
match (self, other) {
(&RetryError::TimerError(_), _) => false,
(_, &RetryError::TimerError(_)) => false,
(&RetryError::OperationError(ref left_err), &RetryError::OperationError(ref right_err)) =>
left_err.eq(right_err)
}
}
}

impl<E: fmt::Display> fmt::Display for RetryError<E> {
fn fmt(&self, formatter: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match *self {
RetryError::OperationError(ref err) => err.fmt(formatter),
RetryError::TimerError(ref err) => err.fmt(formatter)
}
}
}

impl<E: Error> Error for RetryError<E> {
fn description(&self) -> &str {
match *self {
RetryError::OperationError(ref err) => err.description(),
RetryError::TimerError(ref err) => err.description()
}
}

fn cause(&self) -> Option<&Error> {
match *self {
RetryError::OperationError(ref err) => Some(err),
RetryError::TimerError(ref err) => Some(err)
}
}
}

enum RetryState<A> where A: IntoFuture {
Running(A::Future),
Sleeping(Sleep)
}

/// Future that drives multiple attempts at an action via a retry strategy.
pub struct RetryFuture<S, A, F> where S: RetryStrategy, A: IntoFuture, F: FnMut() -> A {
timer: Timer,
strategy: S,
state: RetryState<A>,
action: F
}

pub fn retry<S, A, F>(strategy: S, timer: Timer, action: F) -> RetryFuture<S, A, F> where S: RetryStrategy, A: IntoFuture, F: FnMut() -> A {
RetryFuture::spawn(strategy, timer, action)
}

impl<S, A, F> RetryFuture<S, A, F> where S: RetryStrategy, A: IntoFuture, F: FnMut() -> A {
fn spawn(strategy: S, timer: Timer, mut action: F) -> RetryFuture<S, A, F> {
RetryFuture {
timer: timer,
strategy: strategy,
state: RetryState::Running(action().into_future()),
action: action
}
}

fn attempt(&mut self) -> Poll<A::Item, RetryError<A::Error>> {
let future = (self.action)().into_future();
self.state = RetryState::Running(future);
return self.poll();
}

fn retry(&mut self, err: A::Error) -> Poll<A::Item, RetryError<A::Error>> {
match self.strategy.delay() {
None => Err(RetryError::OperationError(err)),
Some(duration) => {
let future = self.timer.sleep(duration);
self.state = RetryState::Sleeping(future);
return self.poll();
}
}
}
}

impl<S, A, F> Future for RetryFuture<S, A, F> where S: RetryStrategy, A: IntoFuture, F: FnMut() -> A {
type Item = A::Item;
type Error = RetryError<A::Error>;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let result = match self.state {
RetryState::Running(ref mut future) =>
Either::Left(future.poll()),
RetryState::Sleeping(ref mut future) =>
Either::Right(future.poll().map_err(RetryError::TimerError))
};

match result {
Either::Left(poll_result) => match poll_result {
Ok(async) => Ok(async),
Err(err) => self.retry(err)
},
Either::Right(poll_result) => match poll_result? {
Async::NotReady => Ok(Async::NotReady),
Async::Ready(()) => self.attempt()
}
}
}
}

#[test]
fn attempts_just_once() {
use std::default::Default;
use super::strategies::NoRetry;
let s = NoRetry{};
let mut num_calls = 0;
let res = s.run(Timer::default(), || {
num_calls += 1;
Err::<(), u64>(42)
}).wait();

assert_eq!(res, Err(RetryError::OperationError(42)));
assert_eq!(num_calls, 1);
}

#[test]
fn attempts_until_max_retries_exceeded() {
use std::default::Default;
use std::time::Duration;
use super::strategies::FixedInterval;
let s = FixedInterval::new(Duration::from_millis(100)).limit_retries(2);
let mut num_calls = 0;
let res = s.run(Timer::default(), || {
num_calls += 1;
Err::<(), u64>(42)
}).wait();

assert_eq!(res, Err(RetryError::OperationError(42)));
assert_eq!(num_calls, 3);
}

#[test]
fn attempts_until_success() {
use std::default::Default;
use std::time::Duration;
use super::strategies::FixedInterval;
let s = FixedInterval::new(Duration::from_millis(100));
let mut num_calls = 0;
let res = s.run(Timer::default(), || {
num_calls += 1;
if num_calls < 4 {
Err::<(), u64>(42)
} else {
Ok::<(), u64>(())
}
}).wait();

assert_eq!(res, Ok(()));
assert_eq!(num_calls, 4);
}
56 changes: 56 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
//! This library provides extensible asynchronous retry behaviours
//! for use with the popular [`futures`](https://crates.io/crates/futures) crate
//! and the ecosystem of [`tokio`](https://tokio.rs/) libraries.
//!
//! # Installation
//!
//! Add this to your `Cargo.toml`:
//!
//! ```toml
//! [dependencies]
//! tokio-retry = "0.1"
//! ```
//!
//! # Examples
//!
//! ```rust
//! extern crate futures;
//! extern crate tokio_timer;
//! extern crate tokio_retry;
//!
//! use std::time::Duration;
//! use std::default::Default;
//! use futures::future::Future;
//! use tokio_timer::Timer;
//! use tokio_retry::RetryStrategy;
//! use tokio_retry::strategies::ExponentialBackoff;
//!
//! fn action() -> Result<u64, ()> {
//! // do some real-world stuff here...
//! Ok(42)
//! }
//!
//! pub fn main() {
//! let retry_strategy = ExponentialBackoff::from_millis(10)
//! .limit_delay(Duration::from_millis(1000))
//! .limit_retries(3)
//! .jitter();
//! let retry_future = retry_strategy.run(Timer::default(), action);
//! let retry_result = retry_future.wait();
//!
//! assert_eq!(retry_result, Ok(42));
//! }
//! ```

extern crate either;
extern crate futures;
extern crate rand;
extern crate tokio_timer;

mod future;
mod strategy;
/// Assorted retry strategies including fixed interval and exponential back-off.
pub mod strategies;

pub use future::{RetryError, RetryFuture};
pub use strategy::*;
47 changes: 47 additions & 0 deletions src/strategies/exponential_backoff.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use std::time::Duration;
use super::super::RetryStrategy;

/// A retry strategy driven by exponential back-off.
///
/// The power corresponds to the number of past attempts.
pub struct ExponentialBackoff {
current: u64,
base: u64
}

impl ExponentialBackoff {
/// Constructs a new exponential back-off strategy,
/// given a base duration in milliseconds.
///
/// The resulting duration is calculated by taking the base to the `n`-th power,
/// where `n` denotes the number of past attempts.
pub fn from_millis(base: u64) -> ExponentialBackoff {
ExponentialBackoff{current: base, base: base}
}
}

impl RetryStrategy for ExponentialBackoff {
fn delay(&mut self) -> Option<Duration> {
let duration = Duration::from_millis(self.current);
self.current = self.current * self.base;
return Some(duration);
}
}

#[test]
fn returns_some_exponential_base_10() {
let mut s = ExponentialBackoff::from_millis(10);

assert_eq!(s.delay(), Some(Duration::from_millis(10)));
assert_eq!(s.delay(), Some(Duration::from_millis(100)));
assert_eq!(s.delay(), Some(Duration::from_millis(1000)));
}

#[test]
fn returns_some_exponential_base_2() {
let mut s = ExponentialBackoff::from_millis(2);

assert_eq!(s.delay(), Some(Duration::from_millis(2)));
assert_eq!(s.delay(), Some(Duration::from_millis(4)));
assert_eq!(s.delay(), Some(Duration::from_millis(8)));
}
29 changes: 29 additions & 0 deletions src/strategies/fixed_interval.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use std::time::Duration;
use super::super::RetryStrategy;

/// A retry strategy driven by a fixed interval.
pub struct FixedInterval {
duration: Duration
}

impl FixedInterval {
/// Constructs a new fixed interval strategy.
pub fn new(duration: Duration) -> FixedInterval {
FixedInterval{duration: duration}
}
}

impl RetryStrategy for FixedInterval {
fn delay(&mut self) -> Option<Duration> {
Some(self.duration)
}
}

#[test]
fn returns_some_fixed() {
let mut s = FixedInterval::new(Duration::from_millis(123));

assert_eq!(s.delay(), Some(Duration::from_millis(123)));
assert_eq!(s.delay(), Some(Duration::from_millis(123)));
assert_eq!(s.delay(), Some(Duration::from_millis(123)));
}
7 changes: 7 additions & 0 deletions src/strategies/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
mod no_retry;
mod fixed_interval;
mod exponential_backoff;

pub use self::no_retry::NoRetry;
pub use self::fixed_interval::FixedInterval;
pub use self::exponential_backoff::ExponentialBackoff;
18 changes: 18 additions & 0 deletions src/strategies/no_retry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use std::time::Duration;
use super::super::RetryStrategy;

/// A retry strategy that will not attempt any retries.
pub struct NoRetry {}

impl RetryStrategy for NoRetry {
fn delay(&mut self) -> Option<Duration> {
None
}
}

#[test]
fn returns_none() {
let mut s = NoRetry{};

assert_eq!(s.delay(), None);
}
3 changes: 3 additions & 0 deletions src/strategy/decorators.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub use super::jittered::Jittered;
pub use super::limited_retries::LimitedRetries;
pub use super::limited_delay::LimitedDelay;
24 changes: 24 additions & 0 deletions src/strategy/jittered.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use std::time::Duration;
use rand::{random, Closed01};

use super::RetryStrategy;

/// A decorator adding full random jitter to a retry strategy.
pub struct Jittered<S: RetryStrategy> {
inner: S
}

pub fn jitter<S: RetryStrategy>(inner: S) -> Jittered<S> {
Jittered{inner: inner}
}

impl<S: RetryStrategy> RetryStrategy for Jittered<S> {
fn delay(&mut self) -> Option<Duration> {
self.inner.delay().map(|duration| {
let Closed01(jitter) = random::<Closed01<f64>>();
let secs = ((duration.as_secs() as f64) * jitter).ceil() as u64;
let nanos = ((duration.subsec_nanos() as f64) * jitter).ceil() as u32;
return Duration::new(secs, nanos);
})
}
}
Loading

0 comments on commit af1a2f8

Please sign in to comment.