Add side effect callbacks during retry #106

Merged
merged 8 commits into from
Mar 25, 2024
Changes from 6 commits
43 changes: 37 additions & 6 deletions core/src/main/scala/ox/retry/retry.scala
@@ -10,27 +10,49 @@ import scala.util.Try
*
* @param operation
* The operation to retry.
* @param onRetry
* The function to call on each retry - see [[OnRetry]].
* @param policy
* The retry policy - see [[RetryPolicy]].
* @return
* The result of the function if it eventually succeeds.
* @throws anything
* The exception thrown by the last attempt if the policy decides to stop.
*/
def retry[T](operation: => T)(policy: RetryPolicy[Throwable, T]): T =
retryEither(Try(operation).toEither)(policy).fold(throw _, identity)
def retry[T](operation: => T, onRetry: OnRetry[Throwable, T] = emptyOnRetry)(policy: RetryPolicy[Throwable, T]): T =
retryEither(Try(operation).toEither, onRetry)(policy).fold(throw _, identity)

/** Retries an operation returning an [[scala.util.Either]] until it succeeds or the policy decides to stop. Note that any exceptions thrown
* by the operation aren't caught and don't cause a retry to happen.
*
* @param operation
* The operation to retry.
* @param onRetry
* The function to call on each retry - see [[OnRetry]].
* @param policy
* The retry policy - see [[RetryPolicy]].
* @return
* A [[scala.util.Right]] if the function eventually succeeds, or, otherwise, a [[scala.util.Left]] with the error from the last attempt.
*/
def retryEither[E, T](operation: => Either[E, T])(policy: RetryPolicy[E, T]): Either[E, T] = retry(EitherMode[E])(operation)(policy)
def retryEither[E, T](operation: => Either[E, T], onRetry: OnRetry[E, T] = emptyOnRetry)(policy: RetryPolicy[E, T]): Either[E, T] =
retryWithErrorMode(EitherMode[E])(operation, onRetry)(policy)

/** This is a type alias for a function that represents an action to be performed on each retry attempt.
*
* @tparam E
* The type of the error that can be returned by the operation.
* @tparam T
* The type of the successful result that can be returned by the operation.
* @param Int
* The attempt number of the retry operation.
* @param Either[E,
* T] The result of the retry operation, represented as an `Either` type. `Left` represents an error and `Right` represents a successful
* result.
* @return
* Unit
*/
private[ox] type OnRetry[E, T] = (Int, Either[E, T]) => Unit
private[ox] def emptyOnRetry[E, T] = (_: Int, _: Either[E, T]) => ()

/** Retries an operation using the given error mode until it succeeds or the policy decides to stop. Note that any exceptions thrown by the
* operation aren't caught (unless the operation catches them as part of its implementation) and don't cause a retry to happen.
@@ -39,30 +61,39 @@ def retryEither[E, T](operation: => Either[E, T])(policy: RetryPolicy[E, T]): Ei
* The error mode to use, which specifies when a result value is considered success, and when a failure.
* @param operation
* The operation to retry.
* @param onRetry
* The function to call on each retry - see [[OnRetry]].
* @param policy
* The retry policy - see [[RetryPolicy]].
* @return
* Either:
* - the result of the function if it eventually succeeds, in the context of `F`, as dictated by the error mode.
* - the error `E` in context `F` as returned by the last attempt if the policy decides to stop.
*/
def retry[E, F[_], T](em: ErrorMode[E, F])(operation: => F[T])(policy: RetryPolicy[E, T]): F[T] =
def retryWithErrorMode[E, F[_], T](em: ErrorMode[E, F])(
operation: => F[T],
onRetry: OnRetry[E, T] = emptyOnRetry
)(policy: RetryPolicy[E, T]): F[T] =
@tailrec
def loop(attempt: Int, remainingAttempts: Option[Int], lastDelay: Option[FiniteDuration]): F[T] =
def sleepIfNeeded =
val delay = policy.schedule.nextDelay(attempt + 1, lastDelay).toMillis
val delay = policy.schedule.nextDelay(attempt, lastDelay).toMillis
if (delay > 0) Thread.sleep(delay)
delay

operation match
case v if em.isError(v) =>
val error = em.getError(v)
onRetry(attempt, Left(error))

if policy.resultPolicy.isWorthRetrying(error) && remainingAttempts.forall(_ > 0) then
val delay = sleepIfNeeded
loop(attempt + 1, remainingAttempts.map(_ - 1), Some(delay.millis))
else v
case v =>
val result = em.getT(v)
onRetry(attempt, Right(result))

if !policy.resultPolicy.isSuccess(result) && remainingAttempts.forall(_ > 0) then
val delay = sleepIfNeeded
loop(attempt + 1, remainingAttempts.map(_ - 1), Some(delay.millis))
@@ -72,4 +103,4 @@ def retry[E, F[_], T](em: ErrorMode[E, F])(operation: => F[T])(policy: RetryPoli
case finiteSchedule: Schedule.Finite => Some(finiteSchedule.maxRetries)
case _ => None

loop(0, remainingAttempts, None)
loop(1, remainingAttempts, None)
13 changes: 8 additions & 5 deletions core/src/main/scala/ox/syntax.scala
@@ -1,14 +1,18 @@
package ox

import ox.retry.RetryPolicy
import ox.retry.{OnRetry, emptyOnRetry, RetryPolicy}

import scala.concurrent.duration.FiniteDuration

object syntax:
extension [T](f: => T) def forever: Fork[Nothing] = ox.forever(f)

extension [T](f: => T) def retry(policy: RetryPolicy[Throwable, T]): T = ox.retry.retry(f)(policy)
extension [E, T](f: => Either[E, T]) def retryEither(policy: RetryPolicy[E, T]): Either[E, T] = ox.retry.retryEither(f)(policy)
extension [T](f: => T)
def retry(policy: RetryPolicy[Throwable, T], onRetry: OnRetry[Throwable, T] = emptyOnRetry): T =
ox.retry.retry(f, onRetry)(policy)
extension [E, T](f: => Either[E, T])
def retryEither(policy: RetryPolicy[E, T], onRetry: OnRetry[E, T] = emptyOnRetry): Either[E, T] =
ox.retry.retryEither(f, onRetry)(policy)

extension [T](f: => T)(using Ox)
def forkUser: Fork[T] = ox.forkUser(f)
@@ -25,8 +29,7 @@ object syntax:
def race(f2: => T): T = ox.race(f, f2)
def raceResultWith(f2: => T): T = ox.raceResult(f, f2)

extension [T <: AutoCloseable](f: => T)(using Ox)
def useInScope: T = ox.useCloseableInScope(f)
extension [T <: AutoCloseable](f: => T)(using Ox) def useInScope: T = ox.useCloseableInScope(f)

extension [I, C[E] <: Iterable[E]](f: => C[I])
def mapPar[O](parallelism: Int)(transform: I => O): C[O] = ox.mapPar(parallelism)(f)(transform)
63 changes: 63 additions & 0 deletions core/src/test/scala/ox/retry/OnRetryTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package ox.retry

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{EitherValues, TryValues}
import ox.retry.*

class OnRetryTest extends AnyFlatSpec with Matchers with EitherValues with TryValues:
behavior of "OnRetry callback"

it should "retry a succeeding function with onRetry callback" in {
// given
var onRetryInvocationCount = 0

var counter = 0
val successfulResult = 42

def f =
counter += 1
successfulResult

var returnedResult: Either[Throwable, Int] = null
def onRetry(attempt: Int, result: Either[Throwable, Int]): Unit =
onRetryInvocationCount += 1
returnedResult = result

// when
val result = retry(f, onRetry)(RetryPolicy.immediate(3))

// then
result shouldBe successfulResult
counter shouldBe 1

onRetryInvocationCount shouldBe 1
returnedResult shouldBe Right(successfulResult)
}

it should "retry a failing function with onRetry callback" in {
// given
var onRetryInvocationCount = 0

var counter = 0
val failedResult = new RuntimeException("boom")

def f =
counter += 1
if true then throw failedResult

var returnedResult: Either[Throwable, Unit] = null
def onRetry(attempt: Int, result: Either[Throwable, Unit]): Unit =
onRetryInvocationCount += 1
returnedResult = result

// when
val result = the[RuntimeException] thrownBy retry(f, onRetry)(RetryPolicy.immediate(3))

// then
result shouldBe failedResult
counter shouldBe 4

onRetryInvocationCount shouldBe 4
returnedResult shouldBe Left(failedResult)
}
101 changes: 76 additions & 25 deletions doc/retries.md
@@ -1,22 +1,24 @@
# Retries

The retries mechanism allows to retry a failing operation according to a given policy (e.g. retry 3 times with a 100ms delay between attempts).
The retries mechanism allows to retry a failing operation according to a given policy (e.g. retry 3 times with a 100ms
delay between attempts).

## API

The basic syntax for retries is:

```scala
import ox.retry.retry

retry(operation)(policy)
retry(operation, onRetry)(policy)
```

or, using syntax sugar:

```scala
import ox.syntax.*

operation.retry(policy)
operation.retry(policy, onRetry)
```

## Operation definition
@@ -26,54 +28,100 @@ The `operation` can be provided directly using a by-name parameter, i.e. `f: =>
There's also a `retryEither` variant which accepts a by-name `Either[E, T]`, i.e. `f: => Either[E, T]`, as well as one
which accepts arbitrary [error modes](error-handling.md), accepting the computation in an `F` context: `f: => F[T]`.

## OnRetry definition

The `onRetry` callback is a function that is invoked after each attempt to execute the operation. It is used to perform
any necessary actions or checks after each attempt, regardless of whether the attempt was successful or not.

The callback function has the following signature:

```
(Int, Either[E, T]) => Unit
```

Where:

- The first parameter, an `Int`, represents the attempt number of the retry operation.
- The second parameter is an `Either[E, T]` type, representing the result of the retry operation. `Left` represents an
error and `Right` represents a successful result.
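
As an illustration of the callback shape described above, here is a minimal sketch of a callback that records one line
per attempt. `AttemptLog` is a hypothetical helper, and the `OnRetry` alias is redeclared locally (an assumption made
for self-containment, since the alias added in this PR is `private[ox]`).

```scala
import scala.collection.mutable.ListBuffer

// Local stand-in for the PR's `OnRetry[E, T]` alias, which is `private[ox]`.
type OnRetry[E, T] = (Int, Either[E, T]) => Unit

// Hypothetical helper: records a one-line summary of every attempt it is told about.
final class AttemptLog:
  val lines: ListBuffer[String] = ListBuffer.empty

  val callback: OnRetry[Throwable, Int] = (attempt, result) =>
    result match
      case Left(error)  => lines += s"attempt $attempt failed: ${error.getMessage}"
      case Right(value) => lines += s"attempt $attempt succeeded: $value"

@main def onRetryCallbackDemo(): Unit =
  val log = AttemptLog()
  // Simulate what a retry loop would report: one failed attempt, then a success.
  log.callback(1, Left(new RuntimeException("boom")))
  log.callback(2, Right(42))
  log.lines.foreach(println)
```

Going by the signatures in this PR, such a function value would be passed as the second argument, as in
`retry(operation, log.callback)(RetryPolicy.immediate(3))`.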

## Policies

A retry policy consists of two parts:
- a `Schedule`, which indicates how many times and with what delay should we retry the `operation` after an initial failure,

- a `Schedule`, which indicates how many times and with what delay should we retry the `operation` after an initial
failure,
- a `ResultPolicy`, which indicates whether:
- a non-erroneous outcome of the `operation` should be considered a success (if not, the `operation` would be retried),
- an erroneous outcome of the `operation` should be retried or fail fast.
- a non-erroneous outcome of the `operation` should be considered a success (if not, the `operation` would be
retried),
- an erroneous outcome of the `operation` should be retried or fail fast.

The available schedules are defined in the `Schedule` object. Each schedule has a finite and an infinite variant.

### Finite schedules

Finite schedules have a common `maxRetries: Int` parameter, which determines how many times the `operation` would be retried after an initial failure. This means that the operation could be executed at most `maxRetries + 1` times.
Finite schedules have a common `maxRetries: Int` parameter, which determines how many times the `operation` would be
retried after an initial failure. This means that the operation could be executed at most `maxRetries + 1` times.
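
The `maxRetries + 1` bound can be checked with a simplified model of the retry loop. This is only a sketch:
`runWithRetries` is a hypothetical name, it retries every failure without delay, and it assumes (as the tests in this PR
suggest) that attempts are numbered from 1 and that the callback fires after every attempt.

```scala
import scala.annotation.tailrec
import scala.collection.mutable.ListBuffer

// Simplified model of a retry loop: no delays, every failure is considered worth retrying.
// `runWithRetries` is a hypothetical name and not part of ox.
def runWithRetries[T](maxRetries: Int, onRetry: (Int, Either[Throwable, T]) => Unit)(
    operation: => Either[Throwable, T]
): Either[Throwable, T] =
  @tailrec
  def loop(attempt: Int, remaining: Int): Either[Throwable, T] =
    val outcome = operation
    onRetry(attempt, outcome) // invoked after every attempt, successful or not
    outcome match
      case Left(_) if remaining > 0 => loop(attempt + 1, remaining - 1)
      case other                    => other

  loop(1, maxRetries) // attempts are numbered from 1

@main def maxRetriesDemo(): Unit =
  val attempts = ListBuffer.empty[Int]
  val result = runWithRetries[Int](3, (attempt, _) => attempts += attempt)(Left(new RuntimeException("boom")))
  // With maxRetries = 3 and an operation that always fails, four attempts are recorded.
  println(s"attempts: ${attempts.toList}, failed: ${result.isLeft}")
```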

### Infinite schedules

Each finite schedule has an infinite variant, whose settings are similar to those of the respective finite schedule, but without the `maxRetries` setting. Using the infinite variant can lead to a possibly infinite number of retries (unless the `operation` starts to succeed again at some point). The infinite schedules are created by calling `.forever` on the companion object of the respective finite schedule (see examples below).
Each finite schedule has an infinite variant, whose settings are similar to those of the respective finite schedule, but
without the `maxRetries` setting. Using the infinite variant can lead to a possibly infinite number of retries (unless
the `operation` starts to succeed again at some point). The infinite schedules are created by calling `.forever` on the
companion object of the respective finite schedule (see examples below).

### Schedule types

The supported schedules (specifically - their finite variants) are:

- `Immediate(maxRetries: Int)` - retries up to `maxRetries` times without any delay between subsequent attempts.
- `Delay(maxRetries: Int, delay: FiniteDuration)` - retries up to `maxRetries` times, sleeping for `delay` between subsequent attempts.
- `Backoff(maxRetries: Int, initialDelay: FiniteDuration, maxDelay: FiniteDuration, jitter: Jitter)` - retries up to `maxRetries` times, sleeping for `initialDelay` before the first retry, increasing the sleep between subsequent attempts exponentially (with base `2`) up to an optional `maxDelay` (default: 1 minute).
- `Delay(maxRetries: Int, delay: FiniteDuration)` - retries up to `maxRetries` times, sleeping for `delay` between
subsequent attempts.
- `Backoff(maxRetries: Int, initialDelay: FiniteDuration, maxDelay: FiniteDuration, jitter: Jitter)` - retries up
to `maxRetries` times, sleeping for `initialDelay` before the first retry, increasing the sleep between subsequent
attempts exponentially (with base `2`) up to an optional `maxDelay` (default: 1 minute).
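
To make the `Backoff` description concrete, the following sketch computes the delay before the n-th retry as
`initialDelay * 2^(n - 1)`, capped at `maxDelay`. `backoffDelay` is a hypothetical helper, and the exact indexing used
inside ox is an assumption here; only the "`initialDelay` before the first retry, base `2`, capped at `maxDelay`"
behavior comes from the text above.

```scala
import scala.concurrent.duration.*

// Hypothetical helper: delay before the n-th retry (n starts at 1), without jitter.
def backoffDelay(retry: Int, initialDelay: FiniteDuration, maxDelay: FiniteDuration = 1.minute): FiniteDuration =
  require(retry >= 1, "retry numbers start at 1")
  // Computed in Double so that very large exponents saturate instead of overflowing a Long.
  val uncapped = initialDelay.toMillis * math.pow(2, retry - 1)
  math.min(uncapped, maxDelay.toMillis.toDouble).toLong.millis

@main def backoffDemo(): Unit =
  // Delays before the first five retries, starting from 100ms and capped at 500ms.
  (1 to 5).foreach(n => println(s"retry $n: ${backoffDelay(n, 100.millis, maxDelay = 500.millis)}"))
```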

Optionally, a random factor (jitter) can be used when calculating the delay before the next attempt. The purpose of jitter is to avoid clustering of subsequent retries, i.e. to reduce the number of clients calling a service exactly at the same time, which can result in subsequent failures, contrary to what you would expect from retrying. By introducing randomness to the delays, the retries become more evenly distributed over time.
Optionally, a random factor (jitter) can be used when calculating the delay before the next attempt. The purpose of
jitter is to avoid clustering of subsequent retries, i.e. to reduce the number of clients calling a service exactly at
the same time, which can result in subsequent failures, contrary to what you would expect from retrying. By
introducing randomness to the delays, the retries become more evenly distributed over time.

See the [AWS Architecture Blog article on backoff and jitter](https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/) for a more in-depth explanation.
See
the [AWS Architecture Blog article on backoff and jitter](https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/)
for a more in-depth explanation.

The following jitter strategies are available (defined in the `Jitter` enum):
- `None` - the default one, when no randomness is added, i.e. a pure exponential backoff is used,
- `Full` - picks a random value between `0` and the exponential backoff calculated for the current attempt,
- `Equal` - similar to `Full`, but prevents very short delays by always using a half of the original backoff and adding a random value between `0` and the other half,
- `Decorrelated` - uses the delay from the previous attempt (`lastDelay`) and picks a random value between the `initalAttempt` and `3 * lastDelay`.
- `None` - the default one, when no randomness is added, i.e. a pure exponential backoff is used,
- `Full` - picks a random value between `0` and the exponential backoff calculated for the current attempt,
- `Equal` - similar to `Full`, but prevents very short delays by always using a half of the original backoff and
adding a random value between `0` and the other half,
- `Decorrelated` - uses the delay from the previous attempt (`lastDelay`) and picks a random value between
the `initalAttempt` and `3 * lastDelay`.
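
The jitter strategies listed above can be sketched as follows. This is not ox's implementation: `JitterSketch` and
`jittered` are hypothetical names, all values are plain milliseconds, and the lower bound for `Decorrelated` is assumed
to be the schedule's initial delay.

```scala
import scala.util.Random

// Local stand-in for the strategies listed above; not ox's own `Jitter` enum.
enum JitterSketch:
  case None, Full, Equal, Decorrelated

def jittered(
    strategy: JitterSketch,
    backoffMillis: Long,   // exponential backoff computed for the current attempt
    initialMillis: Long,   // initial delay of the schedule (assumed lower bound for Decorrelated)
    lastDelayMillis: Long, // delay used before the previous attempt
    random: Random = Random
): Long = strategy match
  // Pure exponential backoff, no randomness.
  case JitterSketch.None => backoffMillis
  // Random value between 0 and the computed backoff (inclusive).
  case JitterSketch.Full => random.between(0L, backoffMillis + 1)
  // Always half of the backoff, plus a random value between 0 and the other half.
  case JitterSketch.Equal =>
    val half = backoffMillis / 2
    half + random.between(0L, half + 1)
  // Random value between the initial delay and three times the previous delay.
  case JitterSketch.Decorrelated =>
    val upper = math.max(initialMillis, 3 * lastDelayMillis)
    random.between(initialMillis, upper + 1)

@main def jitterDemo(): Unit =
  JitterSketch.values.foreach { strategy =>
    println(s"$strategy: ${jittered(strategy, backoffMillis = 800, initialMillis = 100, lastDelayMillis = 400)} ms")
  }
```

Passing a seeded `Random` makes the randomized strategies reproducible in tests.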

### Result policies

A result policy allows to customize how the results of the `operation` are treated. It consists of two predicates:
- `isSuccess: T => Boolean` (default: `true`) - determines whether a non-erroneous result of the `operation` should be considered a success. When it evaluates to `true` - no further attempts would be made, otherwise - we'd keep retrying.

With finite schedules (i.e. those with `maxRetries` defined), if `isSuccess` keeps returning `false` when `maxRetries` are reached, the result is returned as-is, even though it's considered "unsuccessful",
- `isWorthRetrying: E => Boolean` (default: `true`) - determines whether another attempt would be made if the `operation` results in an error `E`. When it evaluates to `true` - we'd keep retrying, otherwise - we'd fail fast with the error.
- `isSuccess: T => Boolean` (default: `true`) - determines whether a non-erroneous result of the `operation` should be
considered a success. When it evaluates to `true` - no further attempts would be made, otherwise - we'd keep retrying.

The `ResultPolicy[E, T]` is generic both over the error (`E`) and result (`T`) type. Note, however, that for the direct variant `retry`, the error type `E` is fixed to `Throwable`, while for the `Either` and error-mode variants, `E` can be an arbitrary type.
With finite schedules (i.e. those with `maxRetries` defined), if `isSuccess` keeps returning `false` when `maxRetries`
are reached, the result is returned as-is, even though it's considered "unsuccessful",
- `isWorthRetrying: E => Boolean` (default: `true`) - determines whether another attempt would be made if
the `operation` results in an error `E`. When it evaluates to `true` - we'd keep retrying, otherwise - we'd fail fast
with the error.

The `ResultPolicy[E, T]` is generic both over the error (`E`) and result (`T`) type. Note, however, that for the direct
variant `retry`, the error type `E` is fixed to `Throwable`, while for the `Either` and error-mode variants, `E` can be
an arbitrary type.
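
How the two predicates interact with the remaining retry budget can be sketched as a single decision function. The
names `ResultPolicySketch` and `shouldRetry` are hypothetical; the logic follows the loop shown in `retry.scala` in this
PR, where `None` for the remaining retries models an infinite schedule.

```scala
// Local stand-in mirroring the two predicates described above; not ox's own `ResultPolicy`.
final case class ResultPolicySketch[E, T](
    isSuccess: T => Boolean = (_: T) => true,
    isWorthRetrying: E => Boolean = (_: E) => true
)

// Decides whether another attempt should follow the given outcome.
// `retriesLeft = None` models an infinite schedule.
def shouldRetry[E, T](policy: ResultPolicySketch[E, T], outcome: Either[E, T], retriesLeft: Option[Int]): Boolean =
  val wantsRetry = outcome match
    case Left(error)  => policy.isWorthRetrying(error) // errors are retried only if worth retrying
    case Right(value) => !policy.isSuccess(value)      // results are retried only if not a success
  wantsRetry && retriesLeft.forall(_ > 0)

@main def resultPolicyDemo(): Unit =
  val onlyTransient = ResultPolicySketch[String, Int](isWorthRetrying = _ != "fatal error")
  println(shouldRetry(onlyTransient, Left("timeout"), Some(2)))
  println(shouldRetry(onlyTransient, Left("fatal error"), Some(2)))
```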

### API shorthands

When you don't need to customize the result policy (i.e. use the default one), you can use one of the following shorthands to define a retry policy with a given schedule (note that the parameters are the same as when manually creating the respective `Schedule`):
When you don't need to customize the result policy (i.e. use the default one), you can use one of the following
shorthands to define a retry policy with a given schedule (note that the parameters are the same as when manually
creating the respective `Schedule`):

- `RetryPolicy.immediate(maxRetries: Int)`,
- `RetryPolicy.immediateForever`,
- `RetryPolicy.delay(maxRetries: Int, delay: FiniteDuration)`,
@@ -82,16 +130,19 @@ When you don't need to customize the result policy (i.e. use the default one), y
- `RetryPolicy.backoffForever(initialDelay: FiniteDuration, maxDelay: FiniteDuration, jitter: Jitter)`.

If you want to customize a part of the result policy, you can use the following shorthands:

- `ResultPolicy.default[E, T]` - uses the default settings,
- `ResultPolicy.successfulWhen[E, T](isSuccess: T => Boolean)` - uses the default `isWorthRetrying` and the provided `isSuccess`,
- `ResultPolicy.retryWhen[E, T](isWorthRetrying: E => Boolean)` - uses the default `isSuccess` and the provided `isWorthRetrying`,
- `ResultPolicy.successfulWhen[E, T](isSuccess: T => Boolean)` - uses the default `isWorthRetrying` and the
provided `isSuccess`,
- `ResultPolicy.retryWhen[E, T](isWorthRetrying: E => Boolean)` - uses the default `isSuccess` and the
provided `isWorthRetrying`,
- `ResultPolicy.neverRetry[E, T]` - uses the default `isSuccess` and fails fast on any error.

## Examples

```scala mdoc:compile-only
import ox.UnionMode
import ox.retry.{retry, retryEither}
import ox.retry.{retry, retryEither, retryWithErrorMode}
import ox.retry.{Jitter, ResultPolicy, RetryPolicy, Schedule}
import scala.concurrent.duration.*

@@ -120,7 +171,7 @@ retry(directOperation)(RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen
retryEither(eitherOperation)(RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")))

// custom error mode
retry(UnionMode[String])(unionOperation)(RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")))
retryWithErrorMode(UnionMode[String])(unionOperation)(RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")))
```

See the tests in `ox.retry.*` for more.