Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

* allow for exponential backoff exponent to be configured [#5](https://github.com/softprops/again/pull/5)
* add `collect` [#6](https://github.com/softprops/again/pull/6)
* add `collect_and_retry` [#7](https://github.com/softprops/again/pull/7)

# 0.1.2

Expand Down
167 changes: 166 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,35 @@ where
.await
}

/// Reruns and collects the results of a `Future`, if successful, with a default `RetryPolicy`
/// under a certain provided success condition. Also retries the `Future`, if
/// not successful under the same policy configuration and the provided error condition.
///
/// ```
/// again::collect_and_retry(
/// |input: u32| async move { Ok::<u32, u32>(input + 1) },
/// |result: &u32| if *result < 2 { Some(*result) } else { None },
/// |err: &u32| *err > 1,
/// 0 as u32,
/// );
/// ```
pub async fn collect_and_retry<T, C, D, S>(
task: T,
success_condition: C,
error_condition: D,
start_value: S,
) -> Result<Vec<T::Item>, T::Error>
where
T: TaskWithParameter<S>,
C: SuccessCondition<T::Item, S>,
D: Condition<T::Error>,
S: Clone,
{
RetryPolicy::default()
.collect_and_retry(task, success_condition, error_condition, start_value)
.await
}

#[derive(Clone, Copy)]
enum Backoff {
Fixed,
Expand Down Expand Up @@ -370,6 +399,77 @@ impl RetryPolicy {
}
}

/// Reruns and collects the results of a `Future`, if successful, with this policy's
/// configuration under a certain provided success condition. Also retries the `Future`, if
/// not successful under the same policy configuration and the provided error condition.
pub async fn collect_and_retry<T, C, D, S>(
&self,
task: T,
success_condition: C,
error_condition: D,
start_value: S,
) -> Result<Vec<T::Item>, T::Error>
where
T: TaskWithParameter<S>,
C: SuccessCondition<T::Item, S>,
D: Condition<T::Error>,
S: Clone,
{
let mut success_backoffs = self.backoffs();
let mut error_backoffs = self.backoffs();
let mut success_condition = success_condition;
let mut error_condition = error_condition;
let mut task = task;
let mut results = vec![];
let mut input = start_value.clone();
let mut last_result = start_value;

loop {
match task.call(input).await {
Ok(result) => {
let maybe_new_input = success_condition.retry_with(&result);
results.push(result);

if let Some(new_input) = maybe_new_input {
if let Some(delay) = success_backoffs.next() {
#[cfg(feature = "log")]
{
log::trace!(
"task succeeded and condition is met. will run again in {:?}",
delay
);
}
let _ = Delay::new(delay).await;
input = new_input.clone();
last_result = new_input;
continue;
}
}

return Ok(results);
}
Err(err) => {
if error_condition.is_retryable(&err) {
if let Some(delay) = error_backoffs.next() {
#[cfg(feature = "log")]
{
log::trace!(
"task failed with error {:?}. will try again in {:?}",
err,
delay
);
}
let _ = Delay::new(delay).await;
input = last_result.clone();
continue;
}
}
return Err(err);
}
}
}
}

/// Retries a fallible `Future` with this policy's configuration under certain provided conditions
pub async fn retry_if<T, C>(
&self,
Expand Down Expand Up @@ -635,7 +735,7 @@ mod tests {
}

#[tokio::test]
async fn collect_retries_when_condition_is_not_met() -> Result<(), Box<dyn Error>> {
async fn collect_does_not_retry_when_condition_is_not_met() -> Result<(), Box<dyn Error>> {
let result = RetryPolicy::fixed(Duration::from_millis(1))
.collect(
|input: u32| async move { Ok::<u32, ()>(input + 1) },
Expand All @@ -647,6 +747,71 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn collect_and_retry_retries_when_success_condition_is_met() -> Result<(), Box<dyn Error>>
{
let result = RetryPolicy::fixed(Duration::from_millis(1))
.collect_and_retry(
|input: u32| async move { Ok::<u32, u32>(input + 1) },
|result: &u32| if *result < 2 { Some(*result) } else { None },
|err: &u32| *err > 1,
0 as u32,
)
.await;
assert_eq!(result, Ok(vec![1, 2]));
Ok(())
}

#[tokio::test]
async fn collect_and_retry_does_not_retry_when_success_condition_is_not_met(
) -> Result<(), Box<dyn Error>> {
let result = RetryPolicy::fixed(Duration::from_millis(1))
.collect_and_retry(
|input: u32| async move { Ok::<u32, u32>(input + 1) },
|result: &u32| if *result < 1 { Some(*result) } else { None },
|err: &u32| *err > 1,
0 as u32,
)
.await;
assert_eq!(result, Ok(vec![1]));
Ok(())
}

#[tokio::test]
async fn collect_and_retry_retries_when_error_condition_is_met() -> Result<(), Box<dyn Error>> {
let mut task_ran = 0;
let _ = RetryPolicy::fixed(Duration::from_millis(1))
.collect_and_retry(
|_input: u32| {
task_ran += 1;
async move { Err::<u32, u32>(0) }
},
|result: &u32| if *result < 2 { Some(*result) } else { None },
|err: &u32| *err == 0,
0 as u32,
)
.await;
// Default for retry policy is 5, so we end up with the task being
// retries 5 times and being run 6 times.
assert_eq!(task_ran, 6);
Ok(())
}

#[tokio::test]
async fn collect_and_retry_does_not_retry_when_error_condition_is_not_met(
) -> Result<(), Box<dyn Error>> {
let result = RetryPolicy::fixed(Duration::from_millis(1))
.collect_and_retry(
|input: u32| async move { Err::<u32, u32>(input + 1) },
|result: &u32| if *result < 1 { Some(*result) } else { None },
|err: &u32| *err > 1,
0 as u32,
)
.await;
assert_eq!(result, Err(1));
Ok(())
}

#[tokio::test]
async fn ok_futures_yield_ok() -> Result<(), Box<dyn Error>> {
let result = RetryPolicy::default()
Expand Down