Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduled and repeat API + retry refactoring #172

Merged
merged 40 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
f5a0581
refactor retry* functions to scheduled*
micossow Jul 3, 2024
1c14d0e
refactor Schedule.Every -> FixedRate
micossow Jul 3, 2024
c449e30
refactor schedule -> runScheduled and add repeat*
micossow Jul 4, 2024
f526f74
fix compilation error
micossow Jul 4, 2024
c27ed5d
fix compilation error and warnings
micossow Jul 4, 2024
3728c62
fix doc compile
micossow Jul 4, 2024
d511f02
add Schedule.InitialDelay, refactor Exponential.initialDelay -> first…
micossow Jul 4, 2024
373ee38
Schedule doc cleanup and disambiguation
micossow Jul 4, 2024
c68a9d6
add todo
micossow Jul 5, 2024
686e152
remove Schedule.FixedRate and move the start-to-start concept to runS…
micossow Jul 5, 2024
bfa4da0
fix default delayPolicy for retry
micossow Jul 5, 2024
a3e71b3
add tolerance to fixedRate tests
micossow Jul 5, 2024
cc73acc
reduce sleep time in fixedRate tests
micossow Jul 5, 2024
d0d94b0
refactor runScheduled -> scheduled
micossow Jul 8, 2024
6406eff
refactor RetryPolicy -> RetryConfig
micossow Jul 8, 2024
013bbdf
refactor Schedule.fallbackTo -> andThen
micossow Jul 8, 2024
e1a5805
close todo
micossow Jul 8, 2024
3ba7005
fix compileDocumentation
micossow Jul 8, 2024
90d2164
restore private modifier to Exponential.delay
micossow Jul 8, 2024
8c3128e
[doc] replace policy with config where applicable
micossow Jul 8, 2024
3569463
[doc] repeat and RepeatConfig doc
micossow Jul 8, 2024
8834a18
add initialDelay to RepeatConfig
micossow Jul 9, 2024
60e51b6
another Schedule refactoring
micossow Jul 9, 2024
5c6ef40
refactor DelayPolicy -> SleepMode
micossow Jul 9, 2024
88e9cec
fix initialDelay combined with forever schedule
micossow Jul 9, 2024
739f1a7
add ImmediateRepeatTest
micossow Jul 9, 2024
3715c25
add adr
micossow Jul 9, 2024
732c37e
update docs
micossow Jul 9, 2024
e9d7660
fix accidentally modified generated doc
micossow Jul 9, 2024
e716e77
reformat adr
micossow Jul 9, 2024
809275d
fix doc
micossow Jul 10, 2024
8f9cbc7
refactor attempt -> iteration in Schedule
micossow Jul 10, 2024
c966a90
remove shouldContinueOnError from repeat
micossow Jul 10, 2024
a893508
[doc] update repeat doc about combination with retry
micossow Jul 10, 2024
657534a
refactor variable/parameter names in scheduled
micossow Jul 10, 2024
0483281
document to prefer to use repeat, retry instead of scheduled
micossow Jul 10, 2024
6bfb177
[doc] mention how Retry/RetryConfig are special case of ScheduledConfig
micossow Jul 10, 2024
493888f
scheduled: rename iteration -> invocation
micossow Jul 10, 2024
a25f23e
fix doc compile
micossow Jul 10, 2024
5b3290c
combining Schedules refactoring and examples
micossow Jul 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package ox.resilience

import ox.scheduling.{SleepMode, Jitter, Schedule, ScheduledConfig}

import scala.concurrent.duration.*

/** A policy that defines how to retry a failed operation.
/** A config that defines how to retry a failed operation.
*
* It is a special case of [[ScheduledConfig]] with [[ScheduledConfig.sleepMode]] always set to [[SleepMode.Delay]]
*
* @param schedule
* The retry schedule which determines the maximum number of retries and the delay between subsequent attempts to execute the operation.
Expand All @@ -20,57 +24,65 @@ import scala.concurrent.duration.*
* @tparam T
* The successful result type for the operation.
*/
case class RetryPolicy[E, T](
case class RetryConfig[E, T](
schedule: Schedule,
resultPolicy: ResultPolicy[E, T] = ResultPolicy.default[E, T],
onRetry: (Int, Either[E, T]) => Unit = (_: Int, _: Either[E, T]) => ()
)
) {
def toScheduledConfig: ScheduledConfig[E, T] = ScheduledConfig(
schedule,
onRetry,
shouldContinueOnError = resultPolicy.isWorthRetrying,
shouldContinueOnResult = t => !resultPolicy.isSuccess(t),
sleepMode = SleepMode.Delay
)
}

object RetryPolicy:
/** Creates a policy that retries up to a given number of times, with no delay between subsequent attempts, using a default
object RetryConfig:
/** Creates a config that retries up to a given number of times, with no delay between subsequent attempts, using a default
* [[ResultPolicy]].
*
* This is a shorthand for {{{RetryPolicy(Schedule.Immediate(maxRetries))}}}
* This is a shorthand for {{{RetryConfig(Schedule.Immediate(maxRetries))}}}
*
* @param maxRetries
* The maximum number of retries.
*/
def immediate[E, T](maxRetries: Int): RetryPolicy[E, T] = RetryPolicy(Schedule.Immediate(maxRetries))
def immediate[E, T](maxRetries: Int): RetryConfig[E, T] = RetryConfig(Schedule.Immediate(maxRetries))

/** Creates a policy that retries indefinitely, with no delay between subsequent attempts, using a default [[ResultPolicy]].
/** Creates a config that retries indefinitely, with no delay between subsequent attempts, using a default [[ResultPolicy]].
*
* This is a shorthand for {{{RetryPolicy(Schedule.Immediate.forever)}}}
* This is a shorthand for {{{RetryConfig(Schedule.Immediate.forever)}}}
*/
def immediateForever[E, T]: RetryPolicy[E, T] = RetryPolicy(Schedule.Immediate.forever)
def immediateForever[E, T]: RetryConfig[E, T] = RetryConfig(Schedule.Immediate.forever)

/** Creates a policy that retries up to a given number of times, with a fixed delay between subsequent attempts, using a default
/** Creates a config that retries up to a given number of times, with a fixed delay between subsequent attempts, using a default
* [[ResultPolicy]].
*
* This is a shorthand for {{{RetryPolicy(Schedule.Delay(maxRetries, delay))}}}
* This is a shorthand for {{{RetryConfig(Schedule.Delay(maxRetries, delay))}}}
*
* @param maxRetries
* The maximum number of retries.
* @param delay
* The delay between subsequent attempts.
*/
def delay[E, T](maxRetries: Int, delay: FiniteDuration): RetryPolicy[E, T] = RetryPolicy(Schedule.Delay(maxRetries, delay))
def delay[E, T](maxRetries: Int, delay: FiniteDuration): RetryConfig[E, T] = RetryConfig(Schedule.Fixed(maxRetries, delay))

/** Creates a policy that retries indefinitely, with a fixed delay between subsequent attempts, using a default [[ResultPolicy]].
/** Creates a config that retries indefinitely, with a fixed delay between subsequent attempts, using a default [[ResultPolicy]].
*
* This is a shorthand for {{{RetryPolicy(Schedule.Delay.forever(delay))}}}
* This is a shorthand for {{{RetryConfig(Schedule.Delay.forever(delay))}}}
*
* @param delay
* The delay between subsequent attempts.
*/
def delayForever[E, T](delay: FiniteDuration): RetryPolicy[E, T] = RetryPolicy(Schedule.Delay.forever(delay))
def delayForever[E, T](delay: FiniteDuration): RetryConfig[E, T] = RetryConfig(Schedule.Fixed.forever(delay))

/** Creates a policy that retries up to a given number of times, with an increasing delay (backoff) between subsequent attempts, using a
/** Creates a config that retries up to a given number of times, with an increasing delay (backoff) between subsequent attempts, using a
* default [[ResultPolicy]].
*
* The backoff is exponential with base 2 (i.e. the next delay is twice as long as the previous one), starting at the given initial delay
* and capped at the given maximum delay.
*
* This is a shorthand for {{{RetryPolicy(Schedule.Backoff(maxRetries, initialDelay, maxDelay, jitter))}}}
* This is a shorthand for {{{RetryConfig(Schedule.Backoff(maxRetries, initialDelay, maxDelay, jitter))}}}
*
* @param maxRetries
* The maximum number of retries.
Expand All @@ -87,16 +99,16 @@ object RetryPolicy:
initialDelay: FiniteDuration,
maxDelay: FiniteDuration = 1.minute,
jitter: Jitter = Jitter.None
): RetryPolicy[E, T] =
RetryPolicy(Schedule.Backoff(maxRetries, initialDelay, maxDelay, jitter))
): RetryConfig[E, T] =
RetryConfig(Schedule.Backoff(maxRetries, initialDelay, maxDelay, jitter))

/** Creates a policy that retries indefinitely, with an increasing delay (backoff) between subsequent attempts, using a default
/** Creates a config that retries indefinitely, with an increasing delay (backoff) between subsequent attempts, using a default
* [[ResultPolicy]].
*
* The backoff is exponential with base 2 (i.e. the next delay is twice as long as the previous one), starting at the given initial delay
* and capped at the given maximum delay.
*
* This is a shorthand for {{{RetryPolicy(Schedule.Backoff.forever(initialDelay, maxDelay, jitter))}}}
* This is a shorthand for {{{RetryConfig(Schedule.Backoff.forever(initialDelay, maxDelay, jitter))}}}
*
* @param initialDelay
* The delay before the first retry.
Expand All @@ -110,5 +122,5 @@ object RetryPolicy:
initialDelay: FiniteDuration,
maxDelay: FiniteDuration = 1.minute,
jitter: Jitter = Jitter.None
): RetryPolicy[E, T] =
RetryPolicy(Schedule.Backoff.forever(initialDelay, maxDelay, jitter))
): RetryConfig[E, T] =
RetryConfig(Schedule.Backoff.forever(initialDelay, maxDelay, jitter))
144 changes: 0 additions & 144 deletions core/src/main/scala/ox/resilience/Schedule.scala

This file was deleted.

81 changes: 32 additions & 49 deletions core/src/main/scala/ox/resilience/retry.scala
Original file line number Diff line number Diff line change
@@ -1,80 +1,63 @@
package ox.resilience

import ox.{EitherMode, ErrorMode, sleep}
import ox.{EitherMode, ErrorMode}
import ox.scheduling.*

import scala.annotation.tailrec
import scala.concurrent.duration.*
import scala.util.Try

/** Retries an operation returning a direct result until it succeeds or the policy decides to stop.
/** Retries an operation returning a direct result until it succeeds or the config decides to stop.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'd also mention here that retry is a special-case of schedule, with a given set of defaults (and enumarte these defaults). Maybe this should be in the @see section?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

*
* @param policy
* The retry policy - see [[RetryPolicy]].
* [[retry]] is a special case of [[scheduled]] with a given set of defaults. See [[RetryConfig]] for more details.
*
* @param config
* The retry config - see [[RetryConfig]].
* @param operation
* The operation to retry.
* @return
* The result of the function if it eventually succeeds.
* @throws anything
* The exception thrown by the last attempt if the policy decides to stop.
* The exception thrown by the last attempt if the config decides to stop.
* @see
* [[scheduled]]
*/
def retry[T](policy: RetryPolicy[Throwable, T])(operation: => T): T =
retryEither(policy)(Try(operation).toEither).fold(throw _, identity)
def retry[T](config: RetryConfig[Throwable, T])(operation: => T): T =
retryEither(config)(Try(operation).toEither).fold(throw _, identity)

/** Retries an operation returning an [[scala.util.Either]] until it succeeds or the policy decides to stop. Note that any exceptions thrown
/** Retries an operation returning an [[scala.util.Either]] until it succeeds or the config decides to stop. Note that any exceptions thrown
* by the operation aren't caught and don't cause a retry to happen.
*
* @param policy
* The retry policy - see [[RetryPolicy]].
* [[retryEither]] is a special case of [[scheduledEither]] with a given set of defaults. See [[RetryConfig]] for more details.
*
* @param config
* The retry config - see [[RetryConfig]].
* @param operation
* The operation to retry.
* @return
* A [[scala.util.Right]] if the function eventually succeeds, or, otherwise, a [[scala.util.Left]] with the error from the last attempt.
* @see
* [[scheduledEither]]
*/
def retryEither[E, T](policy: RetryPolicy[E, T])(operation: => Either[E, T]): Either[E, T] =
retryWithErrorMode(EitherMode[E])(policy)(operation)
def retryEither[E, T](config: RetryConfig[E, T])(operation: => Either[E, T]): Either[E, T] =
retryWithErrorMode(EitherMode[E])(config)(operation)

/** Retries an operation using the given error mode until it succeeds or the policy decides to stop. Note that any exceptions thrown by the
/** Retries an operation using the given error mode until it succeeds or the config decides to stop. Note that any exceptions thrown by the
* operation aren't caught (unless the operation catches them as part of its implementation) and don't cause a retry to happen.
*
* [[retryWithErrorMode]] is a special case of [[scheduledWithErrorMode]] with a given set of defaults. See [[RetryConfig]] for more
* details.
*
* @param em
* The error mode to use, which specifies when a result value is considered success, and when a failure.
* @param policy
* The retry policy - see [[RetryPolicy]].
* @param config
* The retry config - see [[RetryConfig]].
* @param operation
* The operation to retry.
* @return
* Either:
* - the result of the function if it eventually succeeds, in the context of `F`, as dictated by the error mode.
* - the error `E` in context `F` as returned by the last attempt if the policy decides to stop.
* - the error `E` in context `F` as returned by the last attempt if the config decides to stop.
* @see
* [[scheduledWithErrorMode]]
*/
def retryWithErrorMode[E, F[_], T](em: ErrorMode[E, F])(policy: RetryPolicy[E, T])(operation: => F[T]): F[T] =
@tailrec
def loop(attempt: Int, remainingAttempts: Option[Int], lastDelay: Option[FiniteDuration]): F[T] =
def sleepIfNeeded =
val delay = policy.schedule.nextDelay(attempt, lastDelay)
if delay.toMillis > 0 then sleep(delay)
delay

operation match
case v if em.isError(v) =>
val error = em.getError(v)
policy.onRetry(attempt, Left(error))

if policy.resultPolicy.isWorthRetrying(error) && remainingAttempts.forall(_ > 0) then
val delay = sleepIfNeeded
loop(attempt + 1, remainingAttempts.map(_ - 1), Some(delay))
else v
case v =>
val result = em.getT(v)
policy.onRetry(attempt, Right(result))

if !policy.resultPolicy.isSuccess(result) && remainingAttempts.forall(_ > 0) then
val delay = sleepIfNeeded
loop(attempt + 1, remainingAttempts.map(_ - 1), Some(delay))
else v

val remainingAttempts = policy.schedule match
case finiteSchedule: Schedule.Finite => Some(finiteSchedule.maxRetries)
case _ => None

loop(1, remainingAttempts, None)
def retryWithErrorMode[E, F[_], T](em: ErrorMode[E, F])(config: RetryConfig[E, T])(operation: => F[T]): F[T] =
scheduledWithErrorMode(em)(config.toScheduledConfig)(operation)
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package ox.resilience
package ox.scheduling

/** A random factor used for calculating the delay between subsequent retries when a backoff strategy is used for calculating the delay.
*
Expand Down
Loading
Loading