Skip to content

Commit

Permalink
Scheduled and repeat API + retry refactoring (#172)
Browse files Browse the repository at this point in the history
  • Loading branch information
micossow committed Jul 10, 2024
1 parent 3910012 commit c61beae
Show file tree
Hide file tree
Showing 22 changed files with 1,009 additions and 327 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package ox.resilience

import ox.scheduling.{SleepMode, Jitter, Schedule, ScheduledConfig}

import scala.concurrent.duration.*

/** A policy that defines how to retry a failed operation.
/** A config that defines how to retry a failed operation.
*
* It is a special case of [[ScheduledConfig]] with [[ScheduledConfig.sleepMode]] always set to [[SleepMode.Delay]]
*
* @param schedule
* The retry schedule which determines the maximum number of retries and the delay between subsequent attempts to execute the operation.
Expand All @@ -20,57 +24,65 @@ import scala.concurrent.duration.*
* @tparam T
* The successful result type for the operation.
*/
case class RetryPolicy[E, T](
case class RetryConfig[E, T](
schedule: Schedule,
resultPolicy: ResultPolicy[E, T] = ResultPolicy.default[E, T],
onRetry: (Int, Either[E, T]) => Unit = (_: Int, _: Either[E, T]) => ()
)
) {
def toScheduledConfig: ScheduledConfig[E, T] = ScheduledConfig(
schedule,
onRetry,
shouldContinueOnError = resultPolicy.isWorthRetrying,
shouldContinueOnResult = t => !resultPolicy.isSuccess(t),
sleepMode = SleepMode.Delay
)
}

object RetryPolicy:
/** Creates a policy that retries up to a given number of times, with no delay between subsequent attempts, using a default
object RetryConfig:
/** Creates a config that retries up to a given number of times, with no delay between subsequent attempts, using a default
* [[ResultPolicy]].
*
* This is a shorthand for {{{RetryPolicy(Schedule.Immediate(maxRetries))}}}
* This is a shorthand for {{{RetryConfig(Schedule.Immediate(maxRetries))}}}
*
* @param maxRetries
* The maximum number of retries.
*/
def immediate[E, T](maxRetries: Int): RetryPolicy[E, T] = RetryPolicy(Schedule.Immediate(maxRetries))
def immediate[E, T](maxRetries: Int): RetryConfig[E, T] = RetryConfig(Schedule.Immediate(maxRetries))

/** Creates a policy that retries indefinitely, with no delay between subsequent attempts, using a default [[ResultPolicy]].
/** Creates a config that retries indefinitely, with no delay between subsequent attempts, using a default [[ResultPolicy]].
*
* This is a shorthand for {{{RetryPolicy(Schedule.Immediate.forever)}}}
* This is a shorthand for {{{RetryConfig(Schedule.Immediate.forever)}}}
*/
def immediateForever[E, T]: RetryPolicy[E, T] = RetryPolicy(Schedule.Immediate.forever)
def immediateForever[E, T]: RetryConfig[E, T] = RetryConfig(Schedule.Immediate.forever)

/** Creates a policy that retries up to a given number of times, with a fixed delay between subsequent attempts, using a default
/** Creates a config that retries up to a given number of times, with a fixed delay between subsequent attempts, using a default
* [[ResultPolicy]].
*
* This is a shorthand for {{{RetryPolicy(Schedule.Delay(maxRetries, delay))}}}
* This is a shorthand for {{{RetryConfig(Schedule.Delay(maxRetries, delay))}}}
*
* @param maxRetries
* The maximum number of retries.
* @param delay
* The delay between subsequent attempts.
*/
def delay[E, T](maxRetries: Int, delay: FiniteDuration): RetryPolicy[E, T] = RetryPolicy(Schedule.Delay(maxRetries, delay))
def delay[E, T](maxRetries: Int, delay: FiniteDuration): RetryConfig[E, T] = RetryConfig(Schedule.Fixed(maxRetries, delay))

/** Creates a policy that retries indefinitely, with a fixed delay between subsequent attempts, using a default [[ResultPolicy]].
/** Creates a config that retries indefinitely, with a fixed delay between subsequent attempts, using a default [[ResultPolicy]].
*
* This is a shorthand for {{{RetryPolicy(Schedule.Delay.forever(delay))}}}
* This is a shorthand for {{{RetryConfig(Schedule.Delay.forever(delay))}}}
*
* @param delay
* The delay between subsequent attempts.
*/
def delayForever[E, T](delay: FiniteDuration): RetryPolicy[E, T] = RetryPolicy(Schedule.Delay.forever(delay))
def delayForever[E, T](delay: FiniteDuration): RetryConfig[E, T] = RetryConfig(Schedule.Fixed.forever(delay))

/** Creates a policy that retries up to a given number of times, with an increasing delay (backoff) between subsequent attempts, using a
/** Creates a config that retries up to a given number of times, with an increasing delay (backoff) between subsequent attempts, using a
* default [[ResultPolicy]].
*
* The backoff is exponential with base 2 (i.e. the next delay is twice as long as the previous one), starting at the given initial delay
* and capped at the given maximum delay.
*
* This is a shorthand for {{{RetryPolicy(Schedule.Backoff(maxRetries, initialDelay, maxDelay, jitter))}}}
* This is a shorthand for {{{RetryConfig(Schedule.Backoff(maxRetries, initialDelay, maxDelay, jitter))}}}
*
* @param maxRetries
* The maximum number of retries.
Expand All @@ -87,16 +99,16 @@ object RetryPolicy:
initialDelay: FiniteDuration,
maxDelay: FiniteDuration = 1.minute,
jitter: Jitter = Jitter.None
): RetryPolicy[E, T] =
RetryPolicy(Schedule.Backoff(maxRetries, initialDelay, maxDelay, jitter))
): RetryConfig[E, T] =
RetryConfig(Schedule.Backoff(maxRetries, initialDelay, maxDelay, jitter))

/** Creates a policy that retries indefinitely, with an increasing delay (backoff) between subsequent attempts, using a default
/** Creates a config that retries indefinitely, with an increasing delay (backoff) between subsequent attempts, using a default
* [[ResultPolicy]].
*
* The backoff is exponential with base 2 (i.e. the next delay is twice as long as the previous one), starting at the given initial delay
* and capped at the given maximum delay.
*
* This is a shorthand for {{{RetryPolicy(Schedule.Backoff.forever(initialDelay, maxDelay, jitter))}}}
* This is a shorthand for {{{RetryConfig(Schedule.Backoff.forever(initialDelay, maxDelay, jitter))}}}
*
* @param initialDelay
* The delay before the first retry.
Expand All @@ -110,5 +122,5 @@ object RetryPolicy:
initialDelay: FiniteDuration,
maxDelay: FiniteDuration = 1.minute,
jitter: Jitter = Jitter.None
): RetryPolicy[E, T] =
RetryPolicy(Schedule.Backoff.forever(initialDelay, maxDelay, jitter))
): RetryConfig[E, T] =
RetryConfig(Schedule.Backoff.forever(initialDelay, maxDelay, jitter))
144 changes: 0 additions & 144 deletions core/src/main/scala/ox/resilience/Schedule.scala

This file was deleted.

81 changes: 32 additions & 49 deletions core/src/main/scala/ox/resilience/retry.scala
Original file line number Diff line number Diff line change
@@ -1,80 +1,63 @@
package ox.resilience

import ox.{EitherMode, ErrorMode, sleep}
import ox.{EitherMode, ErrorMode}
import ox.scheduling.*

import scala.annotation.tailrec
import scala.concurrent.duration.*
import scala.util.Try

/** Retries an operation returning a direct result until it succeeds or the policy decides to stop.
/** Retries an operation returning a direct result until it succeeds or the config decides to stop.
*
* @param policy
* The retry policy - see [[RetryPolicy]].
* [[retry]] is a special case of [[scheduled]] with a given set of defaults. See [[RetryConfig]] for more details.
*
* @param config
* The retry config - see [[RetryConfig]].
* @param operation
* The operation to retry.
* @return
* The result of the function if it eventually succeeds.
* @throws anything
* The exception thrown by the last attempt if the policy decides to stop.
* The exception thrown by the last attempt if the config decides to stop.
* @see
* [[scheduled]]
*/
def retry[T](policy: RetryPolicy[Throwable, T])(operation: => T): T =
retryEither(policy)(Try(operation).toEither).fold(throw _, identity)
def retry[T](config: RetryConfig[Throwable, T])(operation: => T): T =
retryEither(config)(Try(operation).toEither).fold(throw _, identity)

/** Retries an operation returning an [[scala.util.Either]] until it succeeds or the policy decides to stop. Note that any exceptions thrown
/** Retries an operation returning an [[scala.util.Either]] until it succeeds or the config decides to stop. Note that any exceptions thrown
* by the operation aren't caught and don't cause a retry to happen.
*
* @param policy
* The retry policy - see [[RetryPolicy]].
* [[retryEither]] is a special case of [[scheduledEither]] with a given set of defaults. See [[RetryConfig]] for more details.
*
* @param config
* The retry config - see [[RetryConfig]].
* @param operation
* The operation to retry.
* @return
* A [[scala.util.Right]] if the function eventually succeeds, or, otherwise, a [[scala.util.Left]] with the error from the last attempt.
* @see
* [[scheduledEither]]
*/
def retryEither[E, T](policy: RetryPolicy[E, T])(operation: => Either[E, T]): Either[E, T] =
retryWithErrorMode(EitherMode[E])(policy)(operation)
def retryEither[E, T](config: RetryConfig[E, T])(operation: => Either[E, T]): Either[E, T] =
retryWithErrorMode(EitherMode[E])(config)(operation)

/** Retries an operation using the given error mode until it succeeds or the policy decides to stop. Note that any exceptions thrown by the
/** Retries an operation using the given error mode until it succeeds or the config decides to stop. Note that any exceptions thrown by the
* operation aren't caught (unless the operation catches them as part of its implementation) and don't cause a retry to happen.
*
* [[retryWithErrorMode]] is a special case of [[scheduledWithErrorMode]] with a given set of defaults. See [[RetryConfig]] for more
* details.
*
* @param em
* The error mode to use, which specifies when a result value is considered success, and when a failure.
* @param policy
* The retry policy - see [[RetryPolicy]].
* @param config
* The retry config - see [[RetryConfig]].
* @param operation
* The operation to retry.
* @return
* Either:
* - the result of the function if it eventually succeeds, in the context of `F`, as dictated by the error mode.
* - the error `E` in context `F` as returned by the last attempt if the policy decides to stop.
* - the error `E` in context `F` as returned by the last attempt if the config decides to stop.
* @see
* [[scheduledWithErrorMode]]
*/
def retryWithErrorMode[E, F[_], T](em: ErrorMode[E, F])(policy: RetryPolicy[E, T])(operation: => F[T]): F[T] =
@tailrec
def loop(attempt: Int, remainingAttempts: Option[Int], lastDelay: Option[FiniteDuration]): F[T] =
def sleepIfNeeded =
val delay = policy.schedule.nextDelay(attempt, lastDelay)
if delay.toMillis > 0 then sleep(delay)
delay

operation match
case v if em.isError(v) =>
val error = em.getError(v)
policy.onRetry(attempt, Left(error))

if policy.resultPolicy.isWorthRetrying(error) && remainingAttempts.forall(_ > 0) then
val delay = sleepIfNeeded
loop(attempt + 1, remainingAttempts.map(_ - 1), Some(delay))
else v
case v =>
val result = em.getT(v)
policy.onRetry(attempt, Right(result))

if !policy.resultPolicy.isSuccess(result) && remainingAttempts.forall(_ > 0) then
val delay = sleepIfNeeded
loop(attempt + 1, remainingAttempts.map(_ - 1), Some(delay))
else v

val remainingAttempts = policy.schedule match
case finiteSchedule: Schedule.Finite => Some(finiteSchedule.maxRetries)
case _ => None

loop(1, remainingAttempts, None)
def retryWithErrorMode[E, F[_], T](em: ErrorMode[E, F])(config: RetryConfig[E, T])(operation: => F[T]): F[T] =
scheduledWithErrorMode(em)(config.toScheduledConfig)(operation)
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package ox.resilience
package ox.scheduling

/** A random factor used for calculating the delay between subsequent retries when a backoff strategy is used for calculating the delay.
*
Expand Down
Loading

0 comments on commit c61beae

Please sign in to comment.