Skip to content

Commit

Permalink
Add side effect callbacks during retry (#106)
Browse files Browse the repository at this point in the history
Co-authored-by: Adam Warski <adam@warski.org>
  • Loading branch information
DybekK and adamw authored Mar 25, 2024
1 parent 18d79f2 commit 072f61c
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 35 deletions.
10 changes: 9 additions & 1 deletion core/src/main/scala/ox/retry/RetryPolicy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,21 @@ import scala.concurrent.duration.*
* @param resultPolicy
* A policy that allows to customize when a non-erroneous result is considered successful and when an error is worth retrying (which
* allows for failing fast on certain errors). See [[ResultPolicy]] for more details.
* @param onRetry
* A function that is invoked after each retry attempt. The callback receives the number of the current retry attempt (starting from 1)
* and the result of the operation that was attempted. The result is either a successful value or an error. The callback can be used to
* log information about the retry attempts, or to perform other side effects. By default, the callback does nothing.
* @tparam E
* The error type of the operation. For operations returning a `T` or a `Try[T]`, this is fixed to `Throwable`. For operations returning
* an `Either[E, T]`, this can be any `E`.
* @tparam T
* The successful result type for the operation.
*/
case class RetryPolicy[E, T](schedule: Schedule, resultPolicy: ResultPolicy[E, T] = ResultPolicy.default[E, T])
case class RetryPolicy[E, T](
schedule: Schedule,
resultPolicy: ResultPolicy[E, T] = ResultPolicy.default[E, T],
onRetry: (Int, Either[E, T]) => Unit = (_: Int, _: Either[E, T]) => ()
)

object RetryPolicy:
/** Creates a policy that retries up to a given number of times, with no delay between subsequent attempts, using a default
Expand Down
13 changes: 9 additions & 4 deletions core/src/main/scala/ox/retry/retry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ def retry[T](operation: => T)(policy: RetryPolicy[Throwable, T]): T =
* @return
* A [[scala.util.Right]] if the function eventually succeeds, or, otherwise, a [[scala.util.Left]] with the error from the last attempt.
*/
def retryEither[E, T](operation: => Either[E, T])(policy: RetryPolicy[E, T]): Either[E, T] = retry(EitherMode[E])(operation)(policy)
def retryEither[E, T](operation: => Either[E, T])(policy: RetryPolicy[E, T]): Either[E, T] =
retryWithErrorMode(EitherMode[E])(operation)(policy)

/** Retries an operation using the given error mode until it succeeds or the policy decides to stop. Note that any exceptions thrown by the
* operation aren't caught (unless the operation catches them as part of its implementation) and don't cause a retry to happen.
Expand All @@ -46,23 +47,27 @@ def retryEither[E, T](operation: => Either[E, T])(policy: RetryPolicy[E, T]): Ei
* - the result of the function if it eventually succeeds, in the context of `F`, as dictated by the error mode.
* - the error `E` in context `F` as returned by the last attempt if the policy decides to stop.
*/
def retry[E, F[_], T](em: ErrorMode[E, F])(operation: => F[T])(policy: RetryPolicy[E, T]): F[T] =
def retryWithErrorMode[E, F[_], T](em: ErrorMode[E, F])(operation: => F[T])(policy: RetryPolicy[E, T]): F[T] =
@tailrec
def loop(attempt: Int, remainingAttempts: Option[Int], lastDelay: Option[FiniteDuration]): F[T] =
def sleepIfNeeded =
val delay = policy.schedule.nextDelay(attempt + 1, lastDelay).toMillis
val delay = policy.schedule.nextDelay(attempt, lastDelay).toMillis
if (delay > 0) Thread.sleep(delay)
delay

operation match
case v if em.isError(v) =>
val error = em.getError(v)
policy.onRetry(attempt, Left(error))

if policy.resultPolicy.isWorthRetrying(error) && remainingAttempts.forall(_ > 0) then
val delay = sleepIfNeeded
loop(attempt + 1, remainingAttempts.map(_ - 1), Some(delay.millis))
else v
case v =>
val result = em.getT(v)
policy.onRetry(attempt, Right(result))

if !policy.resultPolicy.isSuccess(result) && remainingAttempts.forall(_ > 0) then
val delay = sleepIfNeeded
loop(attempt + 1, remainingAttempts.map(_ - 1), Some(delay.millis))
Expand All @@ -72,4 +77,4 @@ def retry[E, F[_], T](em: ErrorMode[E, F])(operation: => F[T])(policy: RetryPoli
case finiteSchedule: Schedule.Finite => Some(finiteSchedule.maxRetries)
case _ => None

loop(0, remainingAttempts, None)
loop(1, remainingAttempts, None)
3 changes: 1 addition & 2 deletions core/src/main/scala/ox/syntax.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ object syntax:
def race(f2: => T): T = ox.race(f, f2)
def raceResultWith(f2: => T): T = ox.raceResult(f, f2)

extension [T <: AutoCloseable](f: => T)(using Ox)
def useInScope: T = ox.useCloseableInScope(f)
extension [T <: AutoCloseable](f: => T)(using Ox) def useInScope: T = ox.useCloseableInScope(f)

extension [I, C[E] <: Iterable[E]](f: => C[I])
def mapPar[O](parallelism: Int)(transform: I => O): C[O] = ox.mapPar(parallelism)(f)(transform)
Expand Down
63 changes: 63 additions & 0 deletions core/src/test/scala/ox/retry/OnRetryTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package ox.retry

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{EitherValues, TryValues}
import ox.retry.*

class OnRetryTest extends AnyFlatSpec with Matchers with EitherValues with TryValues:
behavior of "RetryPolicy onRetry callback"

it should "retry a succeeding function with onRetry callback" in {
// given
var onRetryInvocationCount = 0

var counter = 0
val successfulResult = 42

def f =
counter += 1
successfulResult

var returnedResult: Either[Throwable, Int] = null
def onRetry(attempt: Int, result: Either[Throwable, Int]): Unit =
onRetryInvocationCount += 1
returnedResult = result

// when
val result = retry(f)(RetryPolicy(Schedule.Immediate(3), onRetry = onRetry))

// then
result shouldBe successfulResult
counter shouldBe 1

onRetryInvocationCount shouldBe 1
returnedResult shouldBe Right(successfulResult)
}

it should "retry a failing function with onRetry callback" in {
// given
var onRetryInvocationCount = 0

var counter = 0
val failedResult = new RuntimeException("boom")

def f =
counter += 1
if true then throw failedResult

var returnedResult: Either[Throwable, Unit] = null
def onRetry(attempt: Int, result: Either[Throwable, Unit]): Unit =
onRetryInvocationCount += 1
returnedResult = result

// when
val result = the[RuntimeException] thrownBy retry(f)(RetryPolicy(Schedule.Immediate(3), onRetry = onRetry))

// then
result shouldBe failedResult
counter shouldBe 4

onRetryInvocationCount shouldBe 4
returnedResult shouldBe Left(failedResult)
}
96 changes: 72 additions & 24 deletions doc/retries.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# Retries

The retries mechanism allows to retry a failing operation according to a given policy (e.g. retry 3 times with a 100ms delay between attempts).
The retries mechanism allows to retry a failing operation according to a given policy (e.g. retry 3 times with a 100ms
delay between attempts).

## API

The basic syntax for retries is:

```scala
Expand All @@ -28,52 +30,95 @@ which accepts arbitrary [error modes](error-handling.md), accepting the computat

## Policies

A retry policy consists of two parts:
- a `Schedule`, which indicates how many times and with what delay should we retry the `operation` after an initial failure,
A retry policy consists of three parts:

- a `Schedule`, which indicates how many times and with what delay should we retry the `operation` after an initial
failure,
- a `ResultPolicy`, which indicates whether:
- a non-erroneous outcome of the `operation` should be considered a success (if not, the `operation` would be retried),
- an erroneous outcome of the `operation` should be retried or fail fast.
- a non-erroneous outcome of the `operation` should be considered a success (if not, the `operation` would be
retried),
- an erroneous outcome of the `operation` should be retried or fail fast.
- a `onRetry`, which is a callback function that is invoked after each attempt to execute the operation. It is used to
perform any necessary actions or checks after each attempt, regardless of whether the attempt was successful or not.

The available schedules are defined in the `Schedule` object. Each schedule has a finite and an infinite variant.

### Finite schedules

Finite schedules have a common `maxRetries: Int` parameter, which determines how many times the `operation` would be retried after an initial failure. This means that the operation could be executed at most `maxRetries + 1` times.
Finite schedules have a common `maxRetries: Int` parameter, which determines how many times the `operation` would be
retried after an initial failure. This means that the operation could be executed at most `maxRetries + 1` times.

### Infinite schedules

Each finite schedule has an infinite variant, whose settings are similar to those of the respective finite schedule, but without the `maxRetries` setting. Using the infinite variant can lead to a possibly infinite number of retries (unless the `operation` starts to succeed again at some point). The infinite schedules are created by calling `.forever` on the companion object of the respective finite schedule (see examples below).
Each finite schedule has an infinite variant, whose settings are similar to those of the respective finite schedule, but
without the `maxRetries` setting. Using the infinite variant can lead to a possibly infinite number of retries (unless
the `operation` starts to succeed again at some point). The infinite schedules are created by calling `.forever` on the
companion object of the respective finite schedule (see examples below).

### Schedule types

The supported schedules (specifically - their finite variants) are:

- `Immediate(maxRetries: Int)` - retries up to `maxRetries` times without any delay between subsequent attempts.
- `Delay(maxRetries: Int, delay: FiniteDuration)` - retries up to `maxRetries` times , sleeping for `delay` between subsequent attempts.
- `Backoff(maxRetries: Int, initialDelay: FiniteDuration, maxDelay: FiniteDuration, jitter: Jitter)` - retries up to `maxRetries` times , sleeping for `initialDelay` before the first retry, increasing the sleep between subsequent attempts exponentially (with base `2`) up to an optional `maxDelay` (default: 1 minute).
- `Delay(maxRetries: Int, delay: FiniteDuration)` - retries up to `maxRetries` times , sleeping for `delay` between
subsequent attempts.
- `Backoff(maxRetries: Int, initialDelay: FiniteDuration, maxDelay: FiniteDuration, jitter: Jitter)` - retries up
to `maxRetries` times , sleeping for `initialDelay` before the first retry, increasing the sleep between subsequent
attempts exponentially (with base `2`) up to an optional `maxDelay` (default: 1 minute).

Optionally, a random factor (jitter) can be used when calculating the delay before the next attempt. The purpose of jitter is to avoid clustering of subsequent retries, i.e. to reduce the number of clients calling a service exactly at the same time, which can result in subsequent failures, contrary to what you would expect from retrying. By introducing randomness to the delays, the retries become more evenly distributed over time.
Optionally, a random factor (jitter) can be used when calculating the delay before the next attempt. The purpose of
jitter is to avoid clustering of subsequent retries, i.e. to reduce the number of clients calling a service exactly at
the same time, which can result in subsequent failures, contrary to what you would expect from retrying. By
introducing randomness to the delays, the retries become more evenly distributed over time.

See the [AWS Architecture Blog article on backoff and jitter](https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/) for a more in-depth explanation.
See
the [AWS Architecture Blog article on backoff and jitter](https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/)
for a more in-depth explanation.

The following jitter strategies are available (defined in the `Jitter` enum):
- `None` - the default one, when no randomness is added, i.e. a pure exponential backoff is used,
- `Full` - picks a random value between `0` and the exponential backoff calculated for the current attempt,
- `Equal` - similar to `Full`, but prevents very short delays by always using a half of the original backoff and adding a random value between `0` and the other half,
- `Decorrelated` - uses the delay from the previous attempt (`lastDelay`) and picks a random value between the `initalAttempt` and `3 * lastDelay`.
- `None` - the default one, when no randomness is added, i.e. a pure exponential backoff is used,
- `Full` - picks a random value between `0` and the exponential backoff calculated for the current attempt,
- `Equal` - similar to `Full`, but prevents very short delays by always using a half of the original backoff and
adding a random value between `0` and the other half,
- `Decorrelated` - uses the delay from the previous attempt (`lastDelay`) and picks a random value between
the `initalAttempt` and `3 * lastDelay`.

### Result policies

A result policy allows to customize how the results of the `operation` are treated. It consists of two predicates:
- `isSuccess: T => Boolean` (default: `true`) - determines whether a non-erroneous result of the `operation` should be considered a success. When it evaluates to `true` - no further attempts would be made, otherwise - we'd keep retrying.

With finite schedules (i.e. those with `maxRetries` defined), if `isSuccess` keeps returning `false` when `maxRetries` are reached, the result is returned as-is, even though it's considered "unsuccessful",
- `isWorthRetrying: E => Boolean` (default: `true`) - determines whether another attempt would be made if the `operation` results in an error `E`. When it evaluates to `true` - we'd keep retrying, otherwise - we'd fail fast with the error.
- `isSuccess: T => Boolean` (default: `true`) - determines whether a non-erroneous result of the `operation` should be
considered a success. When it evaluates to `true` - no further attempts would be made, otherwise - we'd keep retrying.

With finite schedules (i.e. those with `maxRetries` defined), if `isSuccess` keeps returning `false` when `maxRetries`
are reached, the result is returned as-is, even though it's considered "unsuccessful",
- `isWorthRetrying: E => Boolean` (default: `true`) - determines whether another attempt would be made if
the `operation` results in an error `E`. When it evaluates to `true` - we'd keep retrying, otherwise - we'd fail fast
with the error.

The `ResultPolicy[E, T]` is generic both over the error (`E`) and result (`T`) type. Note, however, that for the direct
variant `retry`, the error type `E` is fixed to `Throwable`, while for the `Either` and error-mode variants, `E` can ba
an arbitrary type.

### On retry

The `ResultPolicy[E, T]` is generic both over the error (`E`) and result (`T`) type. Note, however, that for the direct variant `retry`, the error type `E` is fixed to `Throwable`, while for the `Either` and error-mode variants, `E` can ba an arbitrary type.
The callback function has the following signature:

```
(Int, Either[E, T]) => Unit
```

Where:
- The first parameter, an `Int`, represents the attempt number of the retry operation.
- The second parameter is an `Either[E, T]` type, representing the result of the retry operation. Left represents an
error and Right represents a successful result.

### API shorthands

When you don't need to customize the result policy (i.e. use the default one), you can use one of the following shorthands to define a retry policy with a given schedule (note that the parameters are the same as when manually creating the respective `Schedule`):
When you don't need to customize the result policy (i.e. use the default one), you can use one of the following
shorthands to define a retry policy with a given schedule (note that the parameters are the same as when manually
creating the respective `Schedule`):

- `RetryPolicy.immediate(maxRetries: Int)`,
- `RetryPolicy.immediateForever`,
- `RetryPolicy.delay(maxRetries: Int, delay: FiniteDuration)`,
Expand All @@ -82,16 +127,19 @@ When you don't need to customize the result policy (i.e. use the default one), y
- `RetryPolicy.backoffForever(initialDelay: FiniteDuration, maxDelay: FiniteDuration, jitter: Jitter)`.

If you want to customize a part of the result policy, you can use the following shorthands:

- `ResultPolicy.default[E, T]` - uses the default settings,
- `ResultPolicy.successfulWhen[E, T](isSuccess: T => Boolean)` - uses the default `isWorthRetrying` and the provided `isSuccess`,
- `ResultPolicy.retryWhen[E, T](isWorthRetrying: E => Boolean)` - uses the default `isSuccess` and the provided `isWorthRetrying`,
- `ResultPolicy.successfulWhen[E, T](isSuccess: T => Boolean)` - uses the default `isWorthRetrying` and the
provided `isSuccess`,
- `ResultPolicy.retryWhen[E, T](isWorthRetrying: E => Boolean)` - uses the default `isSuccess` and the
provided `isWorthRetrying`,
- `ResultPolicy.neverRetry[E, T]` - uses the default `isSuccess` and fails fast on any error.

## Examples

```scala mdoc:compile-only
import ox.UnionMode
import ox.retry.{retry, retryEither}
import ox.retry.{retry, retryEither, retryWithErrorMode}
import ox.retry.{Jitter, ResultPolicy, RetryPolicy, Schedule}
import scala.concurrent.duration.*

Expand Down Expand Up @@ -120,7 +168,7 @@ retry(directOperation)(RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen
retryEither(eitherOperation)(RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")))

// custom error mode
retry(UnionMode[String])(unionOperation)(RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")))
retryWithErrorMode(UnionMode[String])(unionOperation)(RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")))
```

See the tests in `ox.retry.*` for more.
8 changes: 4 additions & 4 deletions generated-doc/out/retries.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ The retries mechanism allows to retry a failing operation according to a given p
The basic syntax for retries is:

```scala
import ox.retry.retry
import ox.retry.retryWithErrorMode

retry(operation)(policy)
retryWithErrorMode(operation)(policy)
```

or, using syntax sugar:
Expand Down Expand Up @@ -91,7 +91,7 @@ If you want to customize a part of the result policy, you can use the following

```scala
import ox.UnionMode
import ox.retry.{retry, retryEither}
import ox.retry.{retryWithErrorMode, retryEither}
import ox.retry.{Jitter, ResultPolicy, RetryPolicy, Schedule}
import scala.concurrent.duration.*

Expand Down Expand Up @@ -120,7 +120,7 @@ retry(directOperation)(RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen
retryEither(eitherOperation)(RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")))

// custom error mode
retry(UnionMode[String])(unionOperation)(RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")))
retryWithErrorMode(UnionMode[String])(unionOperation)(RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")))
```

See the tests in `ox.retry.*` for more.

0 comments on commit 072f61c

Please sign in to comment.