Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add side effect callbacks during retry #106

Merged
merged 8 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions core/src/main/scala/ox/retry/RetryLifecycle.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package ox.retry

import ox.retry.RetryLifecycle.*

/** A case class representing the lifecycle of a retry operation. It contains two optional callbacks: `beforeEachAttempt` and
* `afterEachAttempt`.
*
* @param beforeEachAttempt
* A function that is executed before each retry attempt. It takes the attempt number as a parameter. By default, it's an empty function.
* @param afterEachAttempt
* A function that is executed after each retry attempt. It takes the attempt number and the result of the attempt as parameters. The
* result is represented as an `Either` type, where `Left` represents an error and `Right` represents a successful result. By default,
* it's an empty function.
* @tparam E
* The type of the error in case the retry operation fails.
* @tparam T
* The type of the successful result in case the retry operation succeeds.
*/
case class RetryLifecycle[E, T](
beforeEachAttempt: BeforeEachAttempt = _ => (),
afterEachAttempt: AfterEachAttempt[E, T] = (_, _: Either[E, T]) => ()
)

object RetryLifecycle:
type BeforeEachAttempt = Int => Unit
type AfterEachAttempt[E, T] = (Int, Either[E, T]) => Unit

/** Creates a `RetryLifecycle` instance with default empty callbacks.
*
* @tparam E
* The type of the error in case the retry operation fails.
* @tparam T
* The type of the successful result in case the retry operation succeeds.
* @return
* A `RetryLifecycle` instance with default empty callbacks.
*/
def default[E, T]: RetryLifecycle[E, T] = RetryLifecycle()

/** Creates a `RetryLifecycle` instance with a specific `beforeEachAttempt` callback.
*
* @param f
* The function to be used as `beforeEachAttempt`.
* @tparam E
* The type of the error in case the retry operation fails.
* @tparam T
* The type of the successful result in case the retry operation succeeds.
* @return
* A `RetryLifecycle` instance with the specified `beforeEachAttempt`.
*/
def beforeEachAttempt[E, T](f: BeforeEachAttempt): RetryLifecycle[E, T] = RetryLifecycle(beforeEachAttempt = f)

/** Creates a `RetryLifecycle` instance with a specific `afterEachAttempt` callback.
*
* @param f
* The function to be used as `afterEachAttempt`.
* @tparam E
* The type of the error in case the retry operation fails.
* @tparam T
* The type of the successful result in case the retry operation succeeds.
* @return
* A `RetryLifecycle` instance with the specified `afterEachAttempt`.
*/
def afterEachAttempt[E, T](f: AfterEachAttempt[E, T]): RetryLifecycle[E, T] = RetryLifecycle(afterEachAttempt = f)
27 changes: 23 additions & 4 deletions core/src/main/scala/ox/retry/retry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,18 @@ import scala.util.Try
* The operation to retry.
* @param policy
* The retry policy - see [[RetryPolicy]].
* @param lifecycle
* The retry lifecycle - see [[RetryLifecycle]]. Defaults to an empty lifecycle.
* @return
* The result of the function if it eventually succeeds.
* @throws anything
* The exception thrown by the last attempt if the policy decides to stop.
*/
def retry[T](operation: => T)(policy: RetryPolicy[Throwable, T]): T =
retryEither(Try(operation).toEither)(policy).fold(throw _, identity)
def retry[T](operation: => T)(
policy: RetryPolicy[Throwable, T],
lifecycle: RetryLifecycle[Throwable, T] = RetryLifecycle.default[Throwable, T]
): T =
retryEither(Try(operation).toEither)(policy, lifecycle).fold(throw _, identity)

/** Retries an operation returning an [[scala.util.Either]] until it succeeds or the policy decides to stop. Note that any exceptions thrown
* by the operation aren't caught and don't cause a retry to happen.
Expand All @@ -27,10 +32,16 @@ def retry[T](operation: => T)(policy: RetryPolicy[Throwable, T]): T =
* The operation to retry.
* @param policy
* The retry policy - see [[RetryPolicy]].
* @param lifecycle
* The retry lifecycle - see [[RetryLifecycle]]. Defaults to an empty lifecycle.
* @return
* A [[scala.util.Right]] if the function eventually succeeds, or, otherwise, a [[scala.util.Left]] with the error from the last attempt.
*/
def retryEither[E, T](operation: => Either[E, T])(policy: RetryPolicy[E, T]): Either[E, T] = retry(EitherMode[E])(operation)(policy)
def retryEither[E, T](operation: => Either[E, T])(
policy: RetryPolicy[E, T],
lifecycle: RetryLifecycle[E, T] = RetryLifecycle.default[E, T]
): Either[E, T] =
retry(EitherMode[E])(operation)(policy, lifecycle)

/** Retries an operation using the given error mode until it succeeds or the policy decides to stop. Note that any exceptions thrown by the
* operation aren't caught (unless the operation catches them as part of its implementation) and don't cause a retry to happen.
Expand All @@ -41,28 +52,36 @@ def retryEither[E, T](operation: => Either[E, T])(policy: RetryPolicy[E, T]): Ei
* The operation to retry.
* @param policy
* The retry policy - see [[RetryPolicy]].
* @param lifecycle
* The retry lifecycle - see [[RetryLifecycle]].
* @return
* Either:
* - the result of the function if it eventually succeeds, in the context of `F`, as dictated by the error mode.
* - the error `E` in context `F` as returned by the last attempt if the policy decides to stop.
*/
def retry[E, F[_], T](em: ErrorMode[E, F])(operation: => F[T])(policy: RetryPolicy[E, T]): F[T] =
def retry[E, F[_], T](em: ErrorMode[E, F])(operation: => F[T])(policy: RetryPolicy[E, T], lifecycle: RetryLifecycle[E, T]): F[T] =
@tailrec
def loop(attempt: Int, remainingAttempts: Option[Int], lastDelay: Option[FiniteDuration]): F[T] =
def sleepIfNeeded =
val delay = policy.schedule.nextDelay(attempt + 1, lastDelay).toMillis
if (delay > 0) Thread.sleep(delay)
delay

lifecycle.beforeEachAttempt(attempt + 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we could just number the attempts starting from 1 instead of 0 - WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. In my opinion, it is more intuitive to read the code if we get rid of attempt + 1 in some places.


operation match
case v if em.isError(v) =>
val error = em.getError(v)
lifecycle.afterEachAttempt(attempt + 1, Left(error))

if policy.resultPolicy.isWorthRetrying(error) && remainingAttempts.forall(_ > 0) then
val delay = sleepIfNeeded
loop(attempt + 1, remainingAttempts.map(_ - 1), Some(delay.millis))
else v
case v =>
val result = em.getT(v)
lifecycle.afterEachAttempt(attempt + 1, Right(result))

if !policy.resultPolicy.isSuccess(result) && remainingAttempts.forall(_ > 0) then
val delay = sleepIfNeeded
loop(attempt + 1, remainingAttempts.map(_ - 1), Some(delay.millis))
Expand Down
13 changes: 8 additions & 5 deletions core/src/main/scala/ox/syntax.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package ox

import ox.retry.RetryPolicy
import ox.retry.{RetryLifecycle, RetryPolicy}

import scala.concurrent.duration.FiniteDuration

object syntax:
extension [T](f: => T) def forever: Fork[Nothing] = ox.forever(f)

extension [T](f: => T) def retry(policy: RetryPolicy[Throwable, T]): T = ox.retry.retry(f)(policy)
extension [E, T](f: => Either[E, T]) def retryEither(policy: RetryPolicy[E, T]): Either[E, T] = ox.retry.retryEither(f)(policy)
extension [T](f: => T)
def retry(policy: RetryPolicy[Throwable, T], lifecycle: RetryLifecycle[Throwable, T] = RetryLifecycle[Throwable, T]()): T =
ox.retry.retry(f)(policy, lifecycle)
extension [E, T](f: => Either[E, T])
def retryEither(policy: RetryPolicy[E, T], lifecycle: RetryLifecycle[E, T] = RetryLifecycle[E, T]()): Either[E, T] =
ox.retry.retryEither(f)(policy, lifecycle)

extension [T](f: => T)(using Ox)
def forkUser: Fork[T] = ox.forkUser(f)
Expand All @@ -25,8 +29,7 @@ object syntax:
def race(f2: => T): T = ox.race(f, f2)
def raceResultWith(f2: => T): T = ox.raceResult(f, f2)

extension [T <: AutoCloseable](f: => T)(using Ox)
def useInScope: T = ox.useCloseableInScope(f)
extension [T <: AutoCloseable](f: => T)(using Ox) def useInScope: T = ox.useCloseableInScope(f)

extension [I, C[E] <: Iterable[E]](f: => C[I])
def mapPar[O](parallelism: Int)(transform: I => O): C[O] = ox.mapPar(parallelism)(f)(transform)
Expand Down
95 changes: 95 additions & 0 deletions core/src/test/scala/ox/retry/RetryLifecycleTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package ox.retry

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{EitherValues, TryValues}
import ox.retry.*

class RetryLifecycleTest extends AnyFlatSpec with Matchers with EitherValues with TryValues:
behavior of "Retry lifecycle"

it should "retry a succeeding function with lifecycle callbacks" in {
// given
var beforeEachAttemptInvoked = false
var beforeEachAttemptInvocationCount = 0

var afterEachAttemptInvoked = false
var afterEachAttemptInvocationCount = 0

var counter = 0
val successfulResult = 42

def f =
counter += 1
successfulResult

def beforeEachAttempt(attempt: Int): Unit =
beforeEachAttemptInvoked = true
beforeEachAttemptInvocationCount += 1

var returnedResult: Either[Throwable, Int] = null
def afterEachAttempt(attempt: Int, result: Either[Throwable, Int]): Unit =
afterEachAttemptInvoked = true
afterEachAttemptInvocationCount += 1
returnedResult = result

// when
val result = retry(f)(
RetryPolicy.immediate(3),
RetryLifecycle(beforeEachAttempt, afterEachAttempt)
)

// then
result shouldBe successfulResult
counter shouldBe 1

beforeEachAttemptInvoked shouldBe true
beforeEachAttemptInvocationCount shouldBe 1

afterEachAttemptInvoked shouldBe true
afterEachAttemptInvocationCount shouldBe 1
returnedResult shouldBe Right(successfulResult)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't those assertions redundant? If you verify that the callback was invoked once, I guess one of them would be sufficient. Same for the "before" callback.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • onRetryInvoked shouldBe true can be removed, since checking the invocation count would be sufficient enough.
  • returnedResult shouldBe Right(successfulResult) should not be removed. It checks whether the second parameter of the onRetry function has been given the correct result, in this case, Right(successfulResult). If this assertion fails, it indicates that the onRetry function wasn't given the correct result, and this oversight could potentially allow bugs to go unnoticed.

}

it should "retry a failing function with lifecycle callbacks" in {
// given
var beforeEachAttemptInvoked = false
var beforeEachAttemptInvocationCount = 0

var afterEachAttemptInvoked = false
var afterEachAttemptInvocationCount = 0

var counter = 0
val failedResult = new RuntimeException("boom")

def f =
counter += 1
if true then throw failedResult

def beforeEachAttempt(attempt: Int): Unit =
beforeEachAttemptInvoked = true
beforeEachAttemptInvocationCount += 1

var returnedResult: Either[Throwable, Unit] = null
def afterEachAttempt(attempt: Int, result: Either[Throwable, Unit]): Unit =
afterEachAttemptInvoked = true
afterEachAttemptInvocationCount += 1
returnedResult = result

// when
val result = the[RuntimeException] thrownBy retry(f)(
RetryPolicy.immediate(3),
RetryLifecycle(beforeEachAttempt, afterEachAttempt)
)

// then
result shouldBe failedResult
counter shouldBe 4

beforeEachAttemptInvoked shouldBe true
beforeEachAttemptInvocationCount shouldBe 4

afterEachAttemptInvoked shouldBe true
afterEachAttemptInvocationCount shouldBe 4
returnedResult shouldBe Left(failedResult)
DybekK marked this conversation as resolved.
Show resolved Hide resolved
}
44 changes: 40 additions & 4 deletions doc/retries.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ The basic syntax for retries is:
```scala
import ox.retry.retry

retry(operation)(policy)
retry(operation)(policy, lifecycle)
```

or, using syntax sugar:

```scala
import ox.syntax.*

operation.retry(policy)
operation.retry(policy, lifecycle)
```

## Operation definition
Expand Down Expand Up @@ -92,7 +92,7 @@ If you want to customize a part of the result policy, you can use the following
```scala mdoc:compile-only
import ox.UnionMode
import ox.retry.{retry, retryEither}
import ox.retry.{Jitter, ResultPolicy, RetryPolicy, Schedule}
import ox.retry.{Jitter, ResultPolicy, RetryPolicy, RetryLifecycle, Schedule}
import scala.concurrent.duration.*

def directOperation: Int = ???
Expand Down Expand Up @@ -120,7 +120,43 @@ retry(directOperation)(RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen
retryEither(eitherOperation)(RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")))

// custom error mode
retry(UnionMode[String])(unionOperation)(RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")))
retry(UnionMode[String])(unionOperation)(
RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")),
RetryLifecycle.default
)
```

## RetryLifecycle

The `RetryLifecycle` is a case class that represents the lifecycle of a retry operation. It contains two optional actions: `beforeEachAttempt` and `afterEachAttempt`.

- `beforeEachAttempt` is executed before each retry attempt. It takes the attempt number as a parameter. By default, it's an empty function.
- `afterEachAttempt` is executed after each retry attempt. It takes the attempt number and the result of the attempt as parameters. The result is represented as an `Either` type, where `Left` represents an error and `Right` represents a successful result. By default, it's an empty function.

The `RetryLifecycle` class is generic over the error (`E`) and result (`T`) type.

## Examples

```scala mdoc:compile-only
import ox.retry.{RetryLifecycle, RetryPolicy, retry}

val policy: RetryPolicy[Throwable, Int] = ???
def operation: Int = ???

// using only beforeEachAttempt callback
val beforeLifecycleOnly = RetryLifecycle.beforeEachAttempt[Throwable, Int](attempt => println(s"Attempt $attempt"))
retry(operation)(policy, beforeLifecycleOnly)

// using only afterEachAttempt callback
val afterLifecycleOnly = RetryLifecycle.afterEachAttempt[Throwable, Int]((attempt, result) => println(s"Attempt $attempt returned $result"))
retry(operation)(policy, afterLifecycleOnly)

// using both beforeEachAttempt and afterEachAttempt callbacks
val fullLifecycle = RetryLifecycle[Throwable, Int](
attempt => println(s"Attempt $attempt"),
(attempt, result) => println(s"Attempt $attempt returned $result")
)
retry(operation)(policy, fullLifecycle)
```

See the tests in `ox.retry.*` for more.
Loading