diff --git a/core/src/main/scala/ox/resilience/RetryPolicy.scala b/core/src/main/scala/ox/resilience/RetryConfig.scala similarity index 68% rename from core/src/main/scala/ox/resilience/RetryPolicy.scala rename to core/src/main/scala/ox/resilience/RetryConfig.scala index 69359a50..dc9d8d24 100644 --- a/core/src/main/scala/ox/resilience/RetryPolicy.scala +++ b/core/src/main/scala/ox/resilience/RetryConfig.scala @@ -1,8 +1,12 @@ package ox.resilience +import ox.scheduling.{SleepMode, Jitter, Schedule, ScheduledConfig} + import scala.concurrent.duration.* -/** A policy that defines how to retry a failed operation. +/** A config that defines how to retry a failed operation. + * + * It is a special case of [[ScheduledConfig]] with [[ScheduledConfig.sleepMode]] always set to [[SleepMode.Delay]] * * @param schedule * The retry schedule which determines the maximum number of retries and the delay between subsequent attempts to execute the operation. @@ -20,57 +24,65 @@ import scala.concurrent.duration.* * @tparam T * The successful result type for the operation. */ -case class RetryPolicy[E, T]( +case class RetryConfig[E, T]( schedule: Schedule, resultPolicy: ResultPolicy[E, T] = ResultPolicy.default[E, T], onRetry: (Int, Either[E, T]) => Unit = (_: Int, _: Either[E, T]) => () -) +) { + def toScheduledConfig: ScheduledConfig[E, T] = ScheduledConfig( + schedule, + onRetry, + shouldContinueOnError = resultPolicy.isWorthRetrying, + shouldContinueOnResult = t => !resultPolicy.isSuccess(t), + sleepMode = SleepMode.Delay + ) +} -object RetryPolicy: - /** Creates a policy that retries up to a given number of times, with no delay between subsequent attempts, using a default +object RetryConfig: + /** Creates a config that retries up to a given number of times, with no delay between subsequent attempts, using a default * [[ResultPolicy]]. * - * This is a shorthand for {{{RetryPolicy(Schedule.Immediate(maxRetries))}}} + * This is a shorthand for {{{RetryConfig(Schedule.Immediate(maxRetries))}}} * * @param maxRetries * The maximum number of retries. */ - def immediate[E, T](maxRetries: Int): RetryPolicy[E, T] = RetryPolicy(Schedule.Immediate(maxRetries)) + def immediate[E, T](maxRetries: Int): RetryConfig[E, T] = RetryConfig(Schedule.Immediate(maxRetries)) - /** Creates a policy that retries indefinitely, with no delay between subsequent attempts, using a default [[ResultPolicy]]. + /** Creates a config that retries indefinitely, with no delay between subsequent attempts, using a default [[ResultPolicy]]. * - * This is a shorthand for {{{RetryPolicy(Schedule.Immediate.forever)}}} + * This is a shorthand for {{{RetryConfig(Schedule.Immediate.forever)}}} */ - def immediateForever[E, T]: RetryPolicy[E, T] = RetryPolicy(Schedule.Immediate.forever) + def immediateForever[E, T]: RetryConfig[E, T] = RetryConfig(Schedule.Immediate.forever) - /** Creates a policy that retries up to a given number of times, with a fixed delay between subsequent attempts, using a default + /** Creates a config that retries up to a given number of times, with a fixed delay between subsequent attempts, using a default * [[ResultPolicy]]. * - * This is a shorthand for {{{RetryPolicy(Schedule.Delay(maxRetries, delay))}}} + * This is a shorthand for {{{RetryConfig(Schedule.Delay(maxRetries, delay))}}} * * @param maxRetries * The maximum number of retries. * @param delay * The delay between subsequent attempts. */ - def delay[E, T](maxRetries: Int, delay: FiniteDuration): RetryPolicy[E, T] = RetryPolicy(Schedule.Delay(maxRetries, delay)) + def delay[E, T](maxRetries: Int, delay: FiniteDuration): RetryConfig[E, T] = RetryConfig(Schedule.Fixed(maxRetries, delay)) - /** Creates a policy that retries indefinitely, with a fixed delay between subsequent attempts, using a default [[ResultPolicy]]. + /** Creates a config that retries indefinitely, with a fixed delay between subsequent attempts, using a default [[ResultPolicy]]. * - * This is a shorthand for {{{RetryPolicy(Schedule.Delay.forever(delay))}}} + * This is a shorthand for {{{RetryConfig(Schedule.Delay.forever(delay))}}} * * @param delay * The delay between subsequent attempts. */ - def delayForever[E, T](delay: FiniteDuration): RetryPolicy[E, T] = RetryPolicy(Schedule.Delay.forever(delay)) + def delayForever[E, T](delay: FiniteDuration): RetryConfig[E, T] = RetryConfig(Schedule.Fixed.forever(delay)) - /** Creates a policy that retries up to a given number of times, with an increasing delay (backoff) between subsequent attempts, using a + /** Creates a config that retries up to a given number of times, with an increasing delay (backoff) between subsequent attempts, using a * default [[ResultPolicy]]. * * The backoff is exponential with base 2 (i.e. the next delay is twice as long as the previous one), starting at the given initial delay * and capped at the given maximum delay. * - * This is a shorthand for {{{RetryPolicy(Schedule.Backoff(maxRetries, initialDelay, maxDelay, jitter))}}} + * This is a shorthand for {{{RetryConfig(Schedule.Backoff(maxRetries, initialDelay, maxDelay, jitter))}}} * * @param maxRetries * The maximum number of retries. @@ -87,16 +99,16 @@ object RetryPolicy: initialDelay: FiniteDuration, maxDelay: FiniteDuration = 1.minute, jitter: Jitter = Jitter.None - ): RetryPolicy[E, T] = - RetryPolicy(Schedule.Backoff(maxRetries, initialDelay, maxDelay, jitter)) + ): RetryConfig[E, T] = + RetryConfig(Schedule.Backoff(maxRetries, initialDelay, maxDelay, jitter)) - /** Creates a policy that retries indefinitely, with an increasing delay (backoff) between subsequent attempts, using a default + /** Creates a config that retries indefinitely, with an increasing delay (backoff) between subsequent attempts, using a default * [[ResultPolicy]]. * * The backoff is exponential with base 2 (i.e. the next delay is twice as long as the previous one), starting at the given initial delay * and capped at the given maximum delay. * - * This is a shorthand for {{{RetryPolicy(Schedule.Backoff.forever(initialDelay, maxDelay, jitter))}}} + * This is a shorthand for {{{RetryConfig(Schedule.Backoff.forever(initialDelay, maxDelay, jitter))}}} * * @param initialDelay * The delay before the first retry. @@ -110,5 +122,5 @@ object RetryPolicy: initialDelay: FiniteDuration, maxDelay: FiniteDuration = 1.minute, jitter: Jitter = Jitter.None - ): RetryPolicy[E, T] = - RetryPolicy(Schedule.Backoff.forever(initialDelay, maxDelay, jitter)) + ): RetryConfig[E, T] = + RetryConfig(Schedule.Backoff.forever(initialDelay, maxDelay, jitter)) diff --git a/core/src/main/scala/ox/resilience/Schedule.scala b/core/src/main/scala/ox/resilience/Schedule.scala deleted file mode 100644 index 83416b2c..00000000 --- a/core/src/main/scala/ox/resilience/Schedule.scala +++ /dev/null @@ -1,144 +0,0 @@ -package ox.resilience - -import scala.concurrent.duration.* -import scala.util.Random - -private[resilience] sealed trait Schedule: - def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration - -object Schedule: - - private[resilience] sealed trait Finite extends Schedule: - def maxRetries: Int - def fallbackTo(fallback: Finite): Finite = FallingBack(this, fallback) - def fallbackTo(fallback: Infinite): Infinite = FallingBack.forever(this, fallback) - - private[resilience] sealed trait Infinite extends Schedule - - /** A schedule that retries up to a given number of times, with no delay between subsequent attempts. - * - * @param maxRetries - * The maximum number of retries. - */ - case class Immediate(maxRetries: Int) extends Finite: - override def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration = Duration.Zero - - object Immediate: - /** A schedule that retries indefinitely, with no delay between subsequent attempts. */ - def forever: Infinite = ImmediateForever - - private case object ImmediateForever extends Infinite: - override def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration = Duration.Zero - - /** A schedule that retries up to a given number of times, with a fixed delay between subsequent attempts. - * - * @param maxRetries - * The maximum number of retries. - * @param delay - * The delay between subsequent attempts. - */ - case class Delay(maxRetries: Int, delay: FiniteDuration) extends Finite: - override def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration = delay - - object Delay: - /** A schedule that retries indefinitely, with a fixed delay between subsequent attempts. - * - * @param delay - * The delay between subsequent attempts. - */ - def forever(delay: FiniteDuration): Infinite = DelayForever(delay) - - case class DelayForever private[resilience] (delay: FiniteDuration) extends Infinite: - override def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration = delay - - /** A schedule that retries up to a given number of times, with an increasing delay (backoff) between subsequent attempts. - * - * The backoff is exponential with base 2 (i.e. the next delay is twice as long as the previous one), starting at the given initial delay - * and capped at the given maximum delay. - * - * @param maxRetries - * The maximum number of retries. - * @param initialDelay - * The delay before the first retry. - * @param maxDelay - * The maximum delay between subsequent retries. - * @param jitter - * A random factor used for calculating the delay between subsequent retries. See [[Jitter]] for more details. Defaults to no jitter, - * i.e. an exponential backoff with no adjustments. - */ - case class Backoff( - maxRetries: Int, - initialDelay: FiniteDuration, - maxDelay: FiniteDuration = 1.minute, - jitter: Jitter = Jitter.None - ) extends Finite: - override def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration = - Backoff.nextDelay(attempt, initialDelay, maxDelay, jitter, lastDelay) - - object Backoff: - private[resilience] def delay(attempt: Int, initialDelay: FiniteDuration, maxDelay: FiniteDuration): FiniteDuration = - // converting Duration <-> Long back and forth to avoid exceeding maximum duration - (initialDelay.toMillis * Math.pow(2, attempt)).toLong.min(maxDelay.toMillis).millis - - private[resilience] def nextDelay( - attempt: Int, - initialDelay: FiniteDuration, - maxDelay: FiniteDuration, - jitter: Jitter, - lastDelay: Option[FiniteDuration] - ): FiniteDuration = - def backoffDelay = Backoff.delay(attempt, initialDelay, maxDelay) - - jitter match - case Jitter.None => backoffDelay - case Jitter.Full => Random.between(0, backoffDelay.toMillis).millis - case Jitter.Equal => - val backoff = backoffDelay.toMillis - (backoff / 2 + Random.between(0, backoff / 2)).millis - case Jitter.Decorrelated => - val last = lastDelay.getOrElse(initialDelay).toMillis - Random.between(initialDelay.toMillis, last * 3).millis - - /** A schedule that retries indefinitely, with an increasing delay (backoff) between subsequent attempts. - * - * The backoff is exponential with base 2 (i.e. the next delay is twice as long as the previous one), starting at the given initial - * delay and capped at the given maximum delay. - * - * @param initialDelay - * The delay before the first retry. - * @param maxDelay - * The maximum delay between subsequent retries. - * @param jitter - * A random factor used for calculating the delay between subsequent retries. See [[Jitter]] for more details. Defaults to no jitter, - * i.e. an exponential backoff with no adjustments. - */ - def forever(initialDelay: FiniteDuration, maxDelay: FiniteDuration = 1.minute, jitter: Jitter = Jitter.None): Infinite = - BackoffForever(initialDelay, maxDelay, jitter) - - case class BackoffForever private[resilience] ( - initialDelay: FiniteDuration, - maxDelay: FiniteDuration = 1.minute, - jitter: Jitter = Jitter.None - ) extends Infinite: - override def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration = - Backoff.nextDelay(attempt, initialDelay, maxDelay, jitter, lastDelay) - - private[resilience] sealed trait WithFallback extends Schedule: - def base: Finite - def fallback: Schedule - override def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration = - if base.maxRetries > attempt then base.nextDelay(attempt, lastDelay) - else fallback.nextDelay(attempt - base.maxRetries, lastDelay) - - /** A schedule that combines two schedules, using [[base]] first [[base.maxRetries]] number of times, and then using [[fallback]] - * [[fallback.maxRetries]] number of times. - */ - case class FallingBack(base: Finite, fallback: Finite) extends WithFallback, Finite: - override def maxRetries: Int = base.maxRetries + fallback.maxRetries - - object FallingBack: - /** A schedule that retries indefinitely, using [[base]] first [[base.maxRetries]] number of times, and then always using [[fallback]]. - */ - def forever(base: Finite, fallback: Infinite): Infinite = FallingBackForever(base, fallback) - - case class FallingBackForever private[resilience] (base: Finite, fallback: Infinite) extends WithFallback, Infinite diff --git a/core/src/main/scala/ox/resilience/retry.scala b/core/src/main/scala/ox/resilience/retry.scala index 18aa3881..8e35d6f7 100644 --- a/core/src/main/scala/ox/resilience/retry.scala +++ b/core/src/main/scala/ox/resilience/retry.scala @@ -1,80 +1,63 @@ package ox.resilience -import ox.{EitherMode, ErrorMode, sleep} +import ox.{EitherMode, ErrorMode} +import ox.scheduling.* -import scala.annotation.tailrec -import scala.concurrent.duration.* import scala.util.Try -/** Retries an operation returning a direct result until it succeeds or the policy decides to stop. +/** Retries an operation returning a direct result until it succeeds or the config decides to stop. * - * @param policy - * The retry policy - see [[RetryPolicy]]. + * [[retry]] is a special case of [[scheduled]] with a given set of defaults. See [[RetryConfig]] for more details. + * + * @param config + * The retry config - see [[RetryConfig]]. * @param operation * The operation to retry. * @return * The result of the function if it eventually succeeds. * @throws anything - * The exception thrown by the last attempt if the policy decides to stop. + * The exception thrown by the last attempt if the config decides to stop. + * @see + * [[scheduled]] */ -def retry[T](policy: RetryPolicy[Throwable, T])(operation: => T): T = - retryEither(policy)(Try(operation).toEither).fold(throw _, identity) +def retry[T](config: RetryConfig[Throwable, T])(operation: => T): T = + retryEither(config)(Try(operation).toEither).fold(throw _, identity) -/** Retries an operation returning an [[scala.util.Either]] until it succeeds or the policy decides to stop. Note that any exceptions thrown +/** Retries an operation returning an [[scala.util.Either]] until it succeeds or the config decides to stop. Note that any exceptions thrown * by the operation aren't caught and don't cause a retry to happen. * - * @param policy - * The retry policy - see [[RetryPolicy]]. + * [[retryEither]] is a special case of [[scheduledEither]] with a given set of defaults. See [[RetryConfig]] for more details. + * + * @param config + * The retry config - see [[RetryConfig]]. * @param operation * The operation to retry. * @return * A [[scala.util.Right]] if the function eventually succeeds, or, otherwise, a [[scala.util.Left]] with the error from the last attempt. + * @see + * [[scheduledEither]] */ -def retryEither[E, T](policy: RetryPolicy[E, T])(operation: => Either[E, T]): Either[E, T] = - retryWithErrorMode(EitherMode[E])(policy)(operation) +def retryEither[E, T](config: RetryConfig[E, T])(operation: => Either[E, T]): Either[E, T] = + retryWithErrorMode(EitherMode[E])(config)(operation) -/** Retries an operation using the given error mode until it succeeds or the policy decides to stop. Note that any exceptions thrown by the +/** Retries an operation using the given error mode until it succeeds or the config decides to stop. Note that any exceptions thrown by the * operation aren't caught (unless the operation catches them as part of its implementation) and don't cause a retry to happen. * + * [[retryWithErrorMode]] is a special case of [[scheduledWithErrorMode]] with a given set of defaults. See [[RetryConfig]] for more + * details. + * * @param em * The error mode to use, which specifies when a result value is considered success, and when a failure. - * @param policy - * The retry policy - see [[RetryPolicy]]. + * @param config + * The retry config - see [[RetryConfig]]. * @param operation * The operation to retry. * @return * Either: * - the result of the function if it eventually succeeds, in the context of `F`, as dictated by the error mode. - * - the error `E` in context `F` as returned by the last attempt if the policy decides to stop. + * - the error `E` in context `F` as returned by the last attempt if the config decides to stop. + * @see + * [[scheduledWithErrorMode]] */ -def retryWithErrorMode[E, F[_], T](em: ErrorMode[E, F])(policy: RetryPolicy[E, T])(operation: => F[T]): F[T] = - @tailrec - def loop(attempt: Int, remainingAttempts: Option[Int], lastDelay: Option[FiniteDuration]): F[T] = - def sleepIfNeeded = - val delay = policy.schedule.nextDelay(attempt, lastDelay) - if delay.toMillis > 0 then sleep(delay) - delay - - operation match - case v if em.isError(v) => - val error = em.getError(v) - policy.onRetry(attempt, Left(error)) - - if policy.resultPolicy.isWorthRetrying(error) && remainingAttempts.forall(_ > 0) then - val delay = sleepIfNeeded - loop(attempt + 1, remainingAttempts.map(_ - 1), Some(delay)) - else v - case v => - val result = em.getT(v) - policy.onRetry(attempt, Right(result)) - - if !policy.resultPolicy.isSuccess(result) && remainingAttempts.forall(_ > 0) then - val delay = sleepIfNeeded - loop(attempt + 1, remainingAttempts.map(_ - 1), Some(delay)) - else v - - val remainingAttempts = policy.schedule match - case finiteSchedule: Schedule.Finite => Some(finiteSchedule.maxRetries) - case _ => None - - loop(1, remainingAttempts, None) +def retryWithErrorMode[E, F[_], T](em: ErrorMode[E, F])(config: RetryConfig[E, T])(operation: => F[T]): F[T] = + scheduledWithErrorMode(em)(config.toScheduledConfig)(operation) diff --git a/core/src/main/scala/ox/resilience/Jitter.scala b/core/src/main/scala/ox/scheduling/Jitter.scala similarity index 98% rename from core/src/main/scala/ox/resilience/Jitter.scala rename to core/src/main/scala/ox/scheduling/Jitter.scala index 60bbde44..6dafd127 100644 --- a/core/src/main/scala/ox/resilience/Jitter.scala +++ b/core/src/main/scala/ox/scheduling/Jitter.scala @@ -1,4 +1,4 @@ -package ox.resilience +package ox.scheduling /** A random factor used for calculating the delay between subsequent retries when a backoff strategy is used for calculating the delay. * diff --git a/core/src/main/scala/ox/scheduling/RepeatConfig.scala b/core/src/main/scala/ox/scheduling/RepeatConfig.scala new file mode 100644 index 00000000..79ea5e47 --- /dev/null +++ b/core/src/main/scala/ox/scheduling/RepeatConfig.scala @@ -0,0 +1,138 @@ +package ox.scheduling + +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.DurationInt + +/** A config that defines how to repeat an operation. + * + * [[Schedule]] provides the interval between subsequent invocations, which guarantees that the next operation will start no sooner than + * the specified duration after the previous operations has finished. If the previous operation takes longer than the interval, the next + * operation will start immediately after the previous one has finished. + * + * It is a special case of [[ScheduledConfig]] with [[ScheduledConfig.sleepMode]] always set to [[SleepMode.Interval]] and + * [[ScheduledConfig.shouldContinueOnError]] always returning `false`. + * + * @param schedule + * The repeat schedule which determines the maximum number of invocations and the interval between subsequent invocations. See + * [[Schedule]] for more details. + * @param shouldContinueOnResult + * A function that determines whether to continue the loop after a success. The function receives the value that was emitted by the last + * invocation. Defaults to [[_ => true]]. + * @tparam E + * The error type of the operation. For operations returning a `T` or a `Try[T]`, this is fixed to `Throwable`. For operations returning + * an `Either[E, T]`, this can be any `E`. + * @tparam T + * The successful result type for the operation. + */ +case class RepeatConfig[E, T]( + schedule: Schedule, + shouldContinueOnResult: T => Boolean = (_: T) => true +) { + def toScheduledConfig: ScheduledConfig[E, T] = ScheduledConfig( + schedule, + shouldContinueOnResult = shouldContinueOnResult, + sleepMode = SleepMode.Interval + ) +} + +object RepeatConfig: + /** Creates a config that repeats up to a given number of times, with no interval between subsequent invocations and optional initial + * delay. + * + * @param maxInvocations + * The maximum number of invocations. + * @param initialDelay + * The initial delay before the first invocation. + */ + def immediate[E, T](maxInvocations: Int, initialDelay: Option[FiniteDuration] = None): RepeatConfig[E, T] = + initialDelay match + case Some(delay) => RepeatConfig(Schedule.InitialDelay(delay).andThen(Schedule.Immediate(maxInvocations))) + case None => RepeatConfig(Schedule.Immediate(maxInvocations)) + + /** Creates a config that repeats indefinitely, with no interval between subsequent invocations and optional initial delay. + * + * @param initialDelay + * The initial delay before the first invocation. + */ + def immediateForever[E, T](initialDelay: Option[FiniteDuration] = None): RepeatConfig[E, T] = + initialDelay match + case Some(delay) => RepeatConfig(Schedule.InitialDelay(delay).andThen(Schedule.Immediate.forever)) + case None => RepeatConfig(Schedule.Immediate.forever) + + /** Creates a config that repeats up to a given number of times, with a fixed interval between subsequent invocations and optional initial + * delay. + * + * @param maxInvocations + * The maximum number of invocations. + * @param interval + * The interval between subsequent attempts. + * @param initialDelay + * The initial delay before the first invocation. + */ + def fixedRate[E, T](maxInvocations: Int, interval: FiniteDuration, initialDelay: Option[FiniteDuration] = None): RepeatConfig[E, T] = + initialDelay match + case Some(delay) => RepeatConfig(Schedule.InitialDelay(delay).andThen(Schedule.Fixed(maxInvocations, interval))) + case None => RepeatConfig(Schedule.Fixed(maxInvocations, interval)) + + /** Creates a config that repeats indefinitely, with a fixed interval between subsequent invocations and optional initial delay. + * + * @param interval + * The interval between subsequent invocations. + */ + def fixedRateForever[E, T](interval: FiniteDuration, initialDelay: Option[FiniteDuration] = None): RepeatConfig[E, T] = + initialDelay match + case Some(delay) => RepeatConfig(Schedule.InitialDelay(delay).andThen(Schedule.Fixed.forever(interval))) + case None => RepeatConfig(Schedule.Fixed.forever(interval)) + + /** Creates a config that repeats up to a given number of times, with an increasing interval (backoff) between subsequent attempts and + * optional initial delay. + * + * The backoff is exponential with base 2 (i.e. the next interval is twice as long as the previous one), starting at the given first + * interval and capped at the given maximum interval. + * + * @param maxInvocations + * The maximum number of invocations. + * @param firstInterval + * The interval between the first and the second operation. + * @param maxInterval + * The maximum interval between subsequent invocations. Defaults to 1 minute. + * @param jitter + * A random factor used for calculating the interval between subsequent retries. See [[Jitter]] for more details. Defaults to no + * jitter, i.e. an exponential backoff with no adjustments. + */ + def backoff[E, T]( + maxInvocations: Int, + firstInterval: FiniteDuration, + maxInterval: FiniteDuration = 1.minute, + jitter: Jitter = Jitter.None, + initialDelay: Option[FiniteDuration] = None + ): RepeatConfig[E, T] = + initialDelay match + case Some(delay) => + RepeatConfig(Schedule.InitialDelay(delay).andThen(Schedule.Backoff(maxInvocations, firstInterval, maxInterval, jitter))) + case None => RepeatConfig(Schedule.Backoff(maxInvocations, firstInterval, maxInterval, jitter)) + + /** Creates a config that repeats indefinitely, with an increasing interval (backoff) between subsequent invocations and optional initial + * delay. + * + * The backoff is exponential with base 2 (i.e. the next interval is twice as long as the previous one), starting at the given first + * interval and capped at the given maximum interval. + * + * @param firstInterval + * The interval between the first and the second operation. + * @param maxInterval + * The maximum interval between subsequent invocations. Defaults to 1 minute. + * @param jitter + * A random factor used for calculating the interval between subsequent retries. See [[Jitter]] for more details. Defaults to no + * jitter, i.e. an exponential backoff with no adjustments. + */ + def backoffForever[E, T]( + firstInterval: FiniteDuration, + maxInterval: FiniteDuration = 1.minute, + jitter: Jitter = Jitter.None, + initialDelay: Option[FiniteDuration] = None + ): RepeatConfig[E, T] = + initialDelay match + case Some(delay) => + RepeatConfig(Schedule.InitialDelay(delay).andThen(Schedule.Backoff.forever(firstInterval, maxInterval, jitter))) + case None => RepeatConfig(Schedule.Backoff.forever(firstInterval, maxInterval, jitter)) diff --git a/core/src/main/scala/ox/scheduling/Schedule.scala b/core/src/main/scala/ox/scheduling/Schedule.scala new file mode 100644 index 00000000..db0fd319 --- /dev/null +++ b/core/src/main/scala/ox/scheduling/Schedule.scala @@ -0,0 +1,162 @@ +package ox.scheduling + +import scala.concurrent.duration.* +import scala.util.Random + +sealed trait Schedule: + def nextDuration(invocation: Int, lastDuration: Option[FiniteDuration]): FiniteDuration + def initialDelay: FiniteDuration = Duration.Zero + +object Schedule: + + private[scheduling] sealed trait Finite extends Schedule: + def maxRepeats: Int + def andThen(nextSchedule: Finite): Finite = FiniteAndFiniteSchedules(this, nextSchedule) + def andThen(nextSchedule: Infinite): Infinite = FiniteAndInfiniteSchedules(this, nextSchedule) + + private[scheduling] sealed trait Infinite extends Schedule + + /** A schedule that represents an initial delay applied before the first invocation of operation being scheduled. Usually used in + * combination with other schedules using [[andThen]] + * + * @param delay + * The initial delay. + * @example + * {{{ + * Schedule.InitialDelay(1.second).andThen(Schedule.Delay.forever(100.millis)) + * }}} + */ + case class InitialDelay(delay: FiniteDuration) extends Finite: + override def maxRepeats: Int = 0 + override def nextDuration(invocation: Int, lastDuration: Option[FiniteDuration]): FiniteDuration = + Duration.Zero + override def initialDelay: FiniteDuration = delay + + /** A schedule that represents an immediate invocation, up to a given number of times. + * + * @param maxRepeats + * The maximum number of invocations. + */ + case class Immediate(maxRepeats: Int) extends Finite: + override def nextDuration(invocation: Int, lastDuration: Option[FiniteDuration]): FiniteDuration = + Duration.Zero + + object Immediate: + /** A schedule that represents an immediate invocation without any invocations limit */ + def forever: Infinite = ImmediateForever + + private case object ImmediateForever extends Infinite: + override def nextDuration(invocation: Int, lastDuration: Option[FiniteDuration]): FiniteDuration = + Duration.Zero + + /** A schedule that represents a fixed duration between invocations, up to a given number of times. + * + * @param maxRepeats + * The maximum number of repeats. + * @param duration + * The duration between subsequent invocations. + */ + case class Fixed(maxRepeats: Int, duration: FiniteDuration) extends Finite: + override def nextDuration(invocation: Int, lastDuration: Option[FiniteDuration]): FiniteDuration = duration + + object Fixed: + /** A schedule that represents a fixed duration between invocations without any invocations limit. + * + * @param duration + * The duration between subsequent invocations. + */ + def forever(duration: FiniteDuration): Infinite = FixedForever(duration) + + private[scheduling] case class FixedForever(duration: FiniteDuration) extends Infinite: + override def nextDuration(invocation: Int, lastDuration: Option[FiniteDuration]): FiniteDuration = duration + + /** A schedule that represents an increasing duration between invocations (backoff), up to a given number of times. + * + * The backoff is exponential with base 2 (i.e. the next duration is twice as long as the previous one), starting at the given first + * duration and capped at the given maximum duration. + * + * @param maxRepeats + * The maximum number of repeats. + * @param firstDuration + * The duration between the first and the second invocations. + * @param maxDuration + * The maximum duration between subsequent invocations. + * @param jitter + * A random factor used for calculating the duration between subsequent repeats. See [[Jitter]] for more details. Defaults to no + * jitter, i.e. an exponential backoff with no adjustments. + */ + case class Backoff( + maxRepeats: Int, + firstDuration: FiniteDuration, + maxDuration: FiniteDuration = 1.minute, + jitter: Jitter = Jitter.None + ) extends Finite: + override def nextDuration(invocation: Int, lastDuration: Option[FiniteDuration]): FiniteDuration = + Backoff.nextDuration(invocation, firstDuration, maxDuration, jitter, lastDuration) + + object Backoff: + private[scheduling] def calculateDuration(invocation: Int, firstDuration: FiniteDuration, maxDuration: FiniteDuration): FiniteDuration = + // converting Duration <-> Long back and forth to avoid exceeding maximum duration + (firstDuration.toMillis * Math.pow(2, invocation)).toLong.min(maxDuration.toMillis).millis + + private[scheduling] def nextDuration( + invocation: Int, + firstDuration: FiniteDuration, + maxDuration: FiniteDuration, + jitter: Jitter, + lastDuration: Option[FiniteDuration] + ): FiniteDuration = + def exponentialDuration = Backoff.calculateDuration(invocation, firstDuration, maxDuration) + + jitter match + case Jitter.None => exponentialDuration + case Jitter.Full => Random.between(0, exponentialDuration.toMillis).millis + case Jitter.Equal => + val backoff = exponentialDuration.toMillis + (backoff / 2 + Random.between(0, backoff / 2)).millis + case Jitter.Decorrelated => + val last = lastDuration.getOrElse(firstDuration).toMillis + Random.between(firstDuration.toMillis, last * 3).millis + + /** A schedule that represents an increasing duration between invocations (backoff) without any invocations limit. + * + * The backoff is exponential with base 2 (i.e. the next duration is twice as long as the previous one), starting at the given first + * duration and capped at the given maximum duration. + * + * @param firstDuration + * The duration between the first and the second invocations. + * @param maxDuration + * The maximum duration between subsequent repeats. + * @param jitter + * A random factor used for calculating the duration between subsequent repeats. See [[Jitter]] for more details. Defaults to no + * jitter, i.e. an exponential backoff with no adjustments. + */ + def forever(firstDuration: FiniteDuration, maxDuration: FiniteDuration = 1.minute, jitter: Jitter = Jitter.None): Infinite = + BackoffForever(firstDuration, maxDuration, jitter) + + private[scheduling] case class BackoffForever( + firstDuration: FiniteDuration, + maxDuration: FiniteDuration = 1.minute, + jitter: Jitter = Jitter.None + ) extends Infinite: + override def nextDuration(invocation: Int, lastDuration: Option[FiniteDuration]): FiniteDuration = + Backoff.nextDuration(invocation, firstDuration, maxDuration, jitter, lastDuration) + + private[scheduling] sealed trait CombinedSchedules extends Schedule: + def first: Finite + def second: Schedule + override def nextDuration(invocation: Int, lastDuration: Option[FiniteDuration]): FiniteDuration = + if first.maxRepeats > invocation then first.nextDuration(invocation, lastDuration) + else second.nextDuration(invocation - first.maxRepeats, lastDuration) + + /** A schedule that combines two schedules, using [[first]] first [[first.maxRepeats]] number of times, and then using [[second]] + * [[second.maxRepeats]] number of times. + */ + private[scheduling] case class FiniteAndFiniteSchedules(first: Finite, second: Finite) extends CombinedSchedules, Finite: + override def maxRepeats: Int = first.maxRepeats + second.maxRepeats + override def initialDelay: FiniteDuration = first.initialDelay + + /** A schedule that combines two schedules, using [[first]] first [[first.maxRepeats]] number of times, and then always using [[second]]. + */ + private[scheduling] case class FiniteAndInfiniteSchedules(first: Finite, second: Infinite) extends CombinedSchedules, Infinite: + override def initialDelay: FiniteDuration = first.initialDelay diff --git a/core/src/main/scala/ox/scheduling/repeat.scala b/core/src/main/scala/ox/scheduling/repeat.scala new file mode 100644 index 00000000..958850b4 --- /dev/null +++ b/core/src/main/scala/ox/scheduling/repeat.scala @@ -0,0 +1,61 @@ +package ox.scheduling + +import ox.scheduling.* +import ox.{EitherMode, ErrorMode} + +import scala.util.Try + +/** Repeats an operation returning a direct result until it succeeds or the config decides to stop. + * + * [[repeat]] is a special case of [[scheduled]] with a given set of defaults. See [[RepeatConfig]] for more details. + * + * @param config + * The repeat config - see [[RepeatConfig]]. + * @param operation + * The operation to repeat. + * @return + * The result of the last invocation if the config decides to stop. + * @throws anything + * The exception thrown by the last invocation if the config decides to stop. + * @see + * [[scheduled]] + */ +def repeat[T](config: RepeatConfig[Throwable, T])(operation: => T): T = + repeatEither(config)(Try(operation).toEither).fold(throw _, identity) + +/** Repeats an operation returning an [[scala.util.Either]] until the config decides to stop. Note that any exceptions thrown by the + * operation aren't caught and effectively interrupt the repeat loop. + * + * [[repeatEither]] is a special case of [[scheduledEither]] with a given set of defaults. See [[RepeatConfig]] for more details. + * + * @param config + * The repeat config - see [[RepeatConfig]]. + * @param operation + * The operation to repeat. + * @return + * The result of the last invocation if the config decides to stop. + * @see + * [[scheduledEither]] + */ +def repeatEither[E, T](config: RepeatConfig[E, T])(operation: => Either[E, T]): Either[E, T] = + repeatWithErrorMode(EitherMode[E])(config)(operation) + +/** Repeats an operation using the given error mode until the config decides to stop. Note that any exceptions thrown by the operation + * aren't caught and effectively interrupt the repeat loop. + * + * [[repeatWithErrorMode]] is a special case of [[scheduledWithErrorMode]] with a given set of defaults. See [[RepeatConfig]] for more + * details. + * + * @param em + * The error mode to use, which specifies when a result value is considered success, and when a failure. + * @param config + * The repeat config - see [[RepeatConfig]]. + * @param operation + * The operation to repeat. + * @return + * The result of the last invocation if the config decides to stop. + * @see + * [[scheduledWithErrorMode]] + */ +def repeatWithErrorMode[E, F[_], T](em: ErrorMode[E, F])(config: RepeatConfig[E, T])(operation: => F[T]): F[T] = + scheduledWithErrorMode[E, F, T](em)(config.toScheduledConfig)(operation) diff --git a/core/src/main/scala/ox/scheduling/scheduled.scala b/core/src/main/scala/ox/scheduling/scheduled.scala new file mode 100644 index 00000000..645c2164 --- /dev/null +++ b/core/src/main/scala/ox/scheduling/scheduled.scala @@ -0,0 +1,130 @@ +package ox.scheduling + +import ox.{EitherMode, ErrorMode, sleep} + +import scala.annotation.tailrec +import scala.concurrent.duration.{Duration, FiniteDuration, DurationLong} +import scala.util.Try + +/** The mode that specifies how to interpret the duration provided by the schedule. */ +enum SleepMode: + /** Interval (since the start of the last operation), i.e. guarantees that the next operation will start no sooner than the specified + * duration after the previous operation has started. If the previous operation takes longer than the interval, the next operation will + * be started immediately after the previous one has finished. + */ + case Interval + + /** Delay (since the end of the last operation), i.e. sleeps the duration provided by the schedule before the next operation starts. + */ + case Delay + +/** A config that defines how to schedule an operation. + * + * @param schedule + * The schedule which determines the maximum number of invocations and the duration between subsequent invocations. See [[Schedule]] for + * more details. + * @param onOperationResult + * A function that is invoked after each invocation. The callback receives the number of the current invocations number (starting from 1) + * and the result of the operation. The result is either a successful value or an error. + * @param shouldContinueOnError + * A function that determines whether to continue the loop after an error. The function receives the error that was emitted by the last + * invocation. Defaults to [[_ => false]]. + * @param shouldContinueOnResult + * A function that determines whether to continue the loop after a success. The function receives the value that was emitted by the last + * invocation. Defaults to [[_ => true]]. + * @param sleepMode + * The mode that specifies how to interpret the duration provided by the schedule. See [[SleepMode]] for more details. + * @tparam E + * The error type of the operation. For operations returning a `T` or a `Try[T]`, this is fixed to `Throwable`. For operations returning + * an `Either[E, T]`, this can be any `E`. + * @tparam T + * The successful result type for the operation. + */ +case class ScheduledConfig[E, T]( + schedule: Schedule, + onOperationResult: (Int, Either[E, T]) => Unit = (_: Int, _: Either[E, T]) => (), + shouldContinueOnError: E => Boolean = (_: E) => false, + shouldContinueOnResult: T => Boolean = (_: T) => true, + sleepMode: SleepMode = SleepMode.Interval +) + +/** Schedules an operation returning a direct result until it succeeds or the config decides to stop. + * + * @param config + * The repeat config - see [[ScheduledConfig]]. + * @param operation + * The operation to schedule. + * @return + * The result of the last invocation if the config decides to stop. + * @throws anything + * The exception thrown by the last invocation if the config decides to stop. + */ +def scheduled[T](config: ScheduledConfig[Throwable, T])(operation: => T): T = + scheduledEither(config)(Try(operation).toEither).fold(throw _, identity) + +/** Schedules an operation returning an [[scala.util.Either]] until the config decides to stop. Note that any exceptions thrown by the + * operation aren't caught and effectively interrupt the schedule loop. + * + * @param config + * The schedule config - see [[ScheduledConfig]]. + * @param operation + * The operation to schedule. + * @return + * The result of the last invocation if the config decides to stop. + */ +def scheduledEither[E, T](config: ScheduledConfig[E, T])(operation: => Either[E, T]): Either[E, T] = + scheduledWithErrorMode(EitherMode[E])(config)(operation) + +/** Schedules an operation using the given error mode until the config decides to stop. Note that any exceptions thrown by the operation + * aren't caught and effectively interrupt the schedule loop. + * + * @param em + * The error mode to use, which specifies when a result value is considered success, and when a failure. + * @param config + * The schedule config - see [[ScheduledConfig]]. + * @param operation + * The operation to schedule. + * @return + * The result of the last invocation if the config decides to stop. + */ +def scheduledWithErrorMode[E, F[_], T](em: ErrorMode[E, F])(config: ScheduledConfig[E, T])(operation: => F[T]): F[T] = + @tailrec + def loop(invocation: Int, remainingInvocations: Option[Int], lastDuration: Option[FiniteDuration]): F[T] = + def sleepIfNeeded(startTimestamp: Long) = + val nextDuration = config.schedule.nextDuration(invocation, lastDuration) + val delay = config.sleepMode match + case SleepMode.Interval => + val elapsed = System.nanoTime() - startTimestamp + val remaining = nextDuration.toNanos - elapsed + remaining.nanos + case SleepMode.Delay => nextDuration + if delay.toMillis > 0 then sleep(delay) + delay + + val startTimestamp = System.nanoTime() + operation match + case v if em.isError(v) => + val error = em.getError(v) + config.onOperationResult(invocation, Left(error)) + + if config.shouldContinueOnError(error) && remainingInvocations.forall(_ > 0) then + val delay = sleepIfNeeded(startTimestamp) + loop(invocation + 1, remainingInvocations.map(_ - 1), Some(delay)) + else v + case v => + val result = em.getT(v) + config.onOperationResult(invocation, Right(result)) + + if config.shouldContinueOnResult(result) && remainingInvocations.forall(_ > 0) then + val delay = sleepIfNeeded(startTimestamp) + loop(invocation + 1, remainingInvocations.map(_ - 1), Some(delay)) + else v + + val remainingInvocations = config.schedule match + case finiteSchedule: Schedule.Finite => Some(finiteSchedule.maxRepeats) + case _ => None + + val initialDelay = config.schedule.initialDelay + if initialDelay.toMillis > 0 then sleep(initialDelay) + + loop(1, remainingInvocations, None) diff --git a/core/src/test/scala/ox/resilience/BackoffRetryTest.scala b/core/src/test/scala/ox/resilience/BackoffRetryTest.scala index 916ddc11..8a6ba005 100644 --- a/core/src/test/scala/ox/resilience/BackoffRetryTest.scala +++ b/core/src/test/scala/ox/resilience/BackoffRetryTest.scala @@ -5,6 +5,7 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.{EitherValues, TryValues} import ox.ElapsedTime import ox.resilience.* +import ox.scheduling.{Jitter, Schedule} import scala.concurrent.duration.* @@ -22,11 +23,11 @@ class BackoffRetryTest extends AnyFlatSpec with Matchers with EitherValues with if true then throw new RuntimeException("boom") // when - val (result, elapsedTime) = measure(the[RuntimeException] thrownBy retry(RetryPolicy.backoff(maxRetries, initialDelay))(f)) + val (result, elapsedTime) = measure(the[RuntimeException] thrownBy retry(RetryConfig.backoff(maxRetries, initialDelay))(f)) // then result should have message "boom" - elapsedTime.toMillis should be >= expectedTotalBackoffTimeMillis(maxRetries, initialDelay) + elapsedTime.toMillis should be >= (initialDelay.toMillis + 2 * initialDelay.toMillis + 4 * initialDelay.toMillis) counter shouldBe 4 } @@ -42,7 +43,7 @@ class BackoffRetryTest extends AnyFlatSpec with Matchers with EitherValues with if counter <= retriesUntilSuccess then throw new RuntimeException("boom") else successfulResult // when - val result = retry(RetryPolicy.backoffForever(initialDelay, maxDelay = 2.millis))(f) + val result = retry(RetryConfig.backoffForever(initialDelay, maxDelay = 2.millis))(f) // then result shouldBe successfulResult @@ -60,11 +61,11 @@ class BackoffRetryTest extends AnyFlatSpec with Matchers with EitherValues with if true then throw new RuntimeException("boom") // when - val (result, elapsedTime) = measure(the[RuntimeException] thrownBy retry(RetryPolicy.backoff(maxRetries, initialDelay, maxDelay))(f)) + val (result, elapsedTime) = measure(the[RuntimeException] thrownBy retry(RetryConfig.backoff(maxRetries, initialDelay, maxDelay))(f)) // then result should have message "boom" - elapsedTime.toMillis should be >= expectedTotalBackoffTimeMillis(maxRetries, initialDelay, maxDelay) + elapsedTime.toMillis should be >= (initialDelay.toMillis + maxDelay.toMillis + maxDelay.toMillis) elapsedTime.toMillis should be < initialDelay.toMillis + maxRetries * maxDelay.toMillis counter shouldBe 4 } @@ -81,11 +82,11 @@ class BackoffRetryTest extends AnyFlatSpec with Matchers with EitherValues with // when val (result, elapsedTime) = - measure(the[RuntimeException] thrownBy retry(RetryPolicy.backoff(maxRetries, initialDelay, maxDelay, Jitter.Equal))(f)) + measure(the[RuntimeException] thrownBy retry(RetryConfig.backoff(maxRetries, initialDelay, maxDelay, Jitter.Equal))(f)) // then result should have message "boom" - elapsedTime.toMillis should be >= expectedTotalBackoffTimeMillis(maxRetries, initialDelay, maxDelay) / 2 + elapsedTime.toMillis should be >= (initialDelay.toMillis + maxDelay.toMillis + maxDelay.toMillis) / 2 elapsedTime.toMillis should be < initialDelay.toMillis + maxRetries * maxDelay.toMillis counter shouldBe 4 } @@ -102,13 +103,10 @@ class BackoffRetryTest extends AnyFlatSpec with Matchers with EitherValues with Left(errorMessage) // when - val (result, elapsedTime) = measure(retryEither(RetryPolicy.backoff(maxRetries, initialDelay))(f)) + val (result, elapsedTime) = measure(retryEither(RetryConfig.backoff(maxRetries, initialDelay))(f)) // then result.left.value shouldBe errorMessage - elapsedTime.toMillis should be >= expectedTotalBackoffTimeMillis(maxRetries, initialDelay) + elapsedTime.toMillis should be >= (initialDelay.toMillis + 2 * initialDelay.toMillis + 4 * initialDelay.toMillis) counter shouldBe 4 } - - private def expectedTotalBackoffTimeMillis(maxRetries: Int, initialDelay: FiniteDuration, maxDelay: FiniteDuration = 1.day): Long = - (0 until maxRetries).map(Schedule.Backoff.delay(_, initialDelay, maxDelay).toMillis).sum diff --git a/core/src/test/scala/ox/resilience/DelayedRetryTest.scala b/core/src/test/scala/ox/resilience/DelayedRetryTest.scala index ffe417e5..c2e59278 100644 --- a/core/src/test/scala/ox/resilience/DelayedRetryTest.scala +++ b/core/src/test/scala/ox/resilience/DelayedRetryTest.scala @@ -22,7 +22,7 @@ class DelayedRetryTest extends AnyFlatSpec with Matchers with EitherValues with if true then throw new RuntimeException("boom") // when - val (result, elapsedTime) = measure(the[RuntimeException] thrownBy retry(RetryPolicy.delay(maxRetries, sleep))(f)) + val (result, elapsedTime) = measure(the[RuntimeException] thrownBy retry(RetryConfig.delay(maxRetries, sleep))(f)) // then result should have message "boom" @@ -42,7 +42,7 @@ class DelayedRetryTest extends AnyFlatSpec with Matchers with EitherValues with if counter <= retriesUntilSuccess then throw new RuntimeException("boom") else successfulResult // when - val result = retry(RetryPolicy.delayForever(sleep))(f) + val result = retry(RetryConfig.delayForever(sleep))(f) // then result shouldBe successfulResult @@ -61,7 +61,7 @@ class DelayedRetryTest extends AnyFlatSpec with Matchers with EitherValues with Left(errorMessage) // when - val (result, elapsedTime) = measure(retryEither(RetryPolicy.delay(maxRetries, sleep))(f)) + val (result, elapsedTime) = measure(retryEither(RetryConfig.delay(maxRetries, sleep))(f)) // then result.left.value shouldBe errorMessage diff --git a/core/src/test/scala/ox/resilience/ImmediateRetryTest.scala b/core/src/test/scala/ox/resilience/ImmediateRetryTest.scala index a736d6be..a424bfc1 100644 --- a/core/src/test/scala/ox/resilience/ImmediateRetryTest.scala +++ b/core/src/test/scala/ox/resilience/ImmediateRetryTest.scala @@ -4,6 +4,7 @@ import org.scalatest.{EitherValues, TryValues} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import ox.resilience.* +import ox.scheduling.Schedule class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues with Matchers: @@ -19,7 +20,7 @@ class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues wi successfulResult // when - val result = retry(RetryPolicy.immediate(3))(f) + val result = retry(RetryConfig.immediate(3))(f) // then result shouldBe successfulResult @@ -30,7 +31,7 @@ class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues wi // given var counter = 0 val errorMessage = "boom" - val policy = RetryPolicy[Throwable, Unit](Schedule.Immediate(3), ResultPolicy.retryWhen(_.getMessage != errorMessage)) + val policy = RetryConfig[Throwable, Unit](Schedule.Immediate(3), ResultPolicy.retryWhen(_.getMessage != errorMessage)) def f = counter += 1 @@ -45,7 +46,7 @@ class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues wi // given var counter = 0 val unsuccessfulResult = -1 - val policy = RetryPolicy[Throwable, Int](Schedule.Immediate(3), ResultPolicy.successfulWhen(_ > 0)) + val policy = RetryConfig[Throwable, Int](Schedule.Immediate(3), ResultPolicy.successfulWhen(_ > 0)) def f = counter += 1 @@ -69,7 +70,7 @@ class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues wi if true then throw new RuntimeException(errorMessage) // when/then - the[RuntimeException] thrownBy retry(RetryPolicy.immediate(3))(f) should have message errorMessage + the[RuntimeException] thrownBy retry(RetryConfig.immediate(3))(f) should have message errorMessage counter shouldBe 4 } @@ -84,7 +85,7 @@ class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues wi if counter <= retriesUntilSuccess then throw new RuntimeException("boom") else successfulResult // when - val result = retry(RetryPolicy.immediateForever)(f) + val result = retry(RetryConfig.immediateForever)(f) // then result shouldBe successfulResult @@ -101,7 +102,7 @@ class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues wi Right(successfulResult) // when - val result = retryEither(RetryPolicy.immediate(3))(f) + val result = retryEither(RetryConfig.immediate(3))(f) // then result.value shouldBe successfulResult @@ -112,7 +113,7 @@ class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues wi // given var counter = 0 val errorMessage = "boom" - val policy: RetryPolicy[String, Int] = RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != errorMessage)) + val policy: RetryConfig[String, Int] = RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != errorMessage)) def f: Either[String, Int] = counter += 1 @@ -130,7 +131,7 @@ class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues wi // given var counter = 0 val unsuccessfulResult = -1 - val policy: RetryPolicy[String, Int] = RetryPolicy(Schedule.Immediate(3), ResultPolicy.successfulWhen(_ > 0)) + val policy: RetryConfig[String, Int] = RetryConfig(Schedule.Immediate(3), ResultPolicy.successfulWhen(_ > 0)) def f: Either[String, Int] = counter += 1 @@ -154,7 +155,7 @@ class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues wi Left(errorMessage) // when - val result = retryEither(RetryPolicy.immediate(3))(f) + val result = retryEither(RetryConfig.immediate(3))(f) // then result.left.value shouldBe errorMessage diff --git a/core/src/test/scala/ox/resilience/OnRetryTest.scala b/core/src/test/scala/ox/resilience/OnRetryTest.scala index 1e5293c8..d7c69239 100644 --- a/core/src/test/scala/ox/resilience/OnRetryTest.scala +++ b/core/src/test/scala/ox/resilience/OnRetryTest.scala @@ -4,6 +4,7 @@ import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import org.scalatest.{EitherValues, TryValues} import ox.resilience.* +import ox.scheduling.Schedule class OnRetryTest extends AnyFlatSpec with Matchers with EitherValues with TryValues: behavior of "RetryPolicy onRetry callback" @@ -25,7 +26,7 @@ class OnRetryTest extends AnyFlatSpec with Matchers with EitherValues with TryVa returnedResult = result // when - val result = retry(RetryPolicy(Schedule.Immediate(3), onRetry = onRetry))(f) + val result = retry(RetryConfig(Schedule.Immediate(3), onRetry = onRetry))(f) // then result shouldBe successfulResult @@ -52,7 +53,7 @@ class OnRetryTest extends AnyFlatSpec with Matchers with EitherValues with TryVa returnedResult = result // when - val result = the[RuntimeException] thrownBy retry(RetryPolicy(Schedule.Immediate(3), onRetry = onRetry))(f) + val result = the[RuntimeException] thrownBy retry(RetryConfig(Schedule.Immediate(3), onRetry = onRetry))(f) // then result shouldBe failedResult diff --git a/core/src/test/scala/ox/resilience/ScheduleFallingBackRetryTest.scala b/core/src/test/scala/ox/resilience/ScheduleFallingBackRetryTest.scala index fa9f335d..6ff8ee13 100644 --- a/core/src/test/scala/ox/resilience/ScheduleFallingBackRetryTest.scala +++ b/core/src/test/scala/ox/resilience/ScheduleFallingBackRetryTest.scala @@ -3,6 +3,7 @@ package ox.resilience import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import ox.ElapsedTime +import ox.scheduling.Schedule import scala.concurrent.duration.* @@ -20,10 +21,10 @@ class ScheduleFallingBackRetryTest extends AnyFlatSpec with Matchers with Elapse counter += 1 throw new RuntimeException("boom") - val schedule = Schedule.Immediate(immediateRetries).fallbackTo(Schedule.Delay(delayedRetries, sleep)) + val schedule = Schedule.Immediate(immediateRetries).andThen(Schedule.Fixed(delayedRetries, sleep)) // when - val (result, elapsedTime) = measure(the[RuntimeException] thrownBy retry(RetryPolicy(schedule))(f)) + val (result, elapsedTime) = measure(the[RuntimeException] thrownBy retry(RetryConfig(schedule))(f)) // then result should have message "boom" @@ -41,10 +42,10 @@ class ScheduleFallingBackRetryTest extends AnyFlatSpec with Matchers with Elapse counter += 1 if counter <= retriesUntilSuccess then throw new RuntimeException("boom") else successfulResult - val schedule = Schedule.Immediate(100).fallbackTo(Schedule.Delay.forever(2.millis)) + val schedule = Schedule.Immediate(100).andThen(Schedule.Fixed.forever(2.millis)) // when - val result = retry(RetryPolicy(schedule))(f) + val result = retry(RetryConfig(schedule))(f) // then result shouldBe successfulResult diff --git a/core/src/test/scala/ox/scheduling/FixedRateRepeatTest.scala b/core/src/test/scala/ox/scheduling/FixedRateRepeatTest.scala new file mode 100644 index 00000000..b530d5d6 --- /dev/null +++ b/core/src/test/scala/ox/scheduling/FixedRateRepeatTest.scala @@ -0,0 +1,95 @@ +package ox.scheduling + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import org.scalatest.{EitherValues, TryValues} +import ox.{ElapsedTime, sleep} + +import scala.concurrent.duration.* + +class FixedRateRepeatTest extends AnyFlatSpec with Matchers with EitherValues with TryValues with ElapsedTime: + + behavior of "repeat" + + it should "repeat a function at fixed rate" in { + // given + val repeats = 3 + val funcSleepTime = 30.millis + val interval = 100.millis + var counter = 0 + def f = + counter += 1 + sleep(funcSleepTime) + counter + + // when + val (result, elapsedTime) = measure(repeat(RepeatConfig.fixedRate(repeats, interval))(f)) + + // then + elapsedTime.toMillis should be >= 3 * interval.toMillis + funcSleepTime.toMillis - 5 // tolerance + elapsedTime.toMillis should be < 4 * interval.toMillis + result shouldBe 4 + counter shouldBe 4 + } + + it should "repeat a function at fixed rate with initial delay" in { + // given + val repeats = 3 + val initialDelay = 50.millis + val interval = 100.millis + var counter = 0 + + def f = + counter += 1 + counter + + // when + val (result, elapsedTime) = measure(repeat(RepeatConfig.fixedRate(repeats, interval, Some(initialDelay)))(f)) + + // then + elapsedTime.toMillis should be >= 3 * interval.toMillis + initialDelay.toMillis - 5 // tolerance + elapsedTime.toMillis should be < 4 * interval.toMillis + result shouldBe 4 + counter shouldBe 4 + } + + it should "repeat a function forever at fixed rate" in { + // given + val interval = 100.millis + var counter = 0 + + def f = + counter += 1 + if counter == 4 then throw new RuntimeException("boom") + counter + + // when + val (ex, elapsedTime) = measure(the[RuntimeException] thrownBy repeat(RepeatConfig.fixedRateForever(interval))(f)) + + // then + elapsedTime.toMillis should be >= 3 * interval.toMillis - 5 // tolerance + elapsedTime.toMillis should be < 4 * interval.toMillis + ex.getMessage shouldBe "boom" + counter shouldBe 4 + } + + it should "repeat a function forever at fixed rate with initial delay" in { + // given + val initialDelay = 50.millis + val interval = 100.millis + var counter = 0 + + def f = + counter += 1 + if counter == 4 then throw new RuntimeException("boom") + counter + + // when + val (ex, elapsedTime) = measure(the[RuntimeException] thrownBy repeat(RepeatConfig.fixedRateForever(interval, Some(initialDelay)))(f)) + + // then + elapsedTime.toMillis should be >= 3 * interval.toMillis + initialDelay.toMillis - 5 // tolerance + elapsedTime.toMillis should be < 4 * interval.toMillis + ex.getMessage shouldBe "boom" + counter shouldBe 4 + } diff --git a/core/src/test/scala/ox/scheduling/ImmediateRepeatTest.scala b/core/src/test/scala/ox/scheduling/ImmediateRepeatTest.scala new file mode 100644 index 00000000..f922bd25 --- /dev/null +++ b/core/src/test/scala/ox/scheduling/ImmediateRepeatTest.scala @@ -0,0 +1,87 @@ +package ox.scheduling + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import org.scalatest.{EitherValues, TryValues} +import ox.{ElapsedTime, sleep} + +import scala.concurrent.duration.* + +class ImmediateRepeatTest extends AnyFlatSpec with Matchers with EitherValues with TryValues with ElapsedTime: + + behavior of "repeat" + + it should "repeat a function immediately" in { + // given + val repeats = 3 + var counter = 0 + def f = + counter += 1 + counter + + // when + val (result, elapsedTime) = measure(repeat(RepeatConfig.immediate(repeats))(f)) + + // then + elapsedTime.toMillis should be < 20L + result shouldBe 4 + counter shouldBe 4 + } + + it should "repeat a function immediately with initial delay" in { + // given + val repeats = 3 + val initialDelay = 50.millis + var counter = 0 + + def f = + counter += 1 + counter + + // when + val (result, elapsedTime) = measure(repeat(RepeatConfig.immediate(repeats, Some(initialDelay)))(f)) + + // then + elapsedTime.toMillis should be >= initialDelay.toMillis + elapsedTime.toMillis should be < initialDelay.toMillis + 20 + result shouldBe 4 + counter shouldBe 4 + } + + it should "repeat a function immediately forever" in { + // given + var counter = 0 + + def f = + counter += 1 + if counter == 4 then throw new RuntimeException("boom") + counter + + // when + val (ex, elapsedTime) = measure(the[RuntimeException] thrownBy repeat(RepeatConfig.immediateForever())(f)) + + // then + elapsedTime.toMillis should be < 20L + ex.getMessage shouldBe "boom" + counter shouldBe 4 + } + + it should "repeat a function immediately forever with initial delay" in { + // given + val initialDelay = 50.millis + var counter = 0 + + def f = + counter += 1 + if counter == 4 then throw new RuntimeException("boom") + counter + + // when + val (ex, elapsedTime) = measure(the[RuntimeException] thrownBy repeat(RepeatConfig.immediateForever(Some(initialDelay)))(f)) + + // then + elapsedTime.toMillis should be >= initialDelay.toMillis + elapsedTime.toMillis should be < initialDelay.toMillis + 20 + ex.getMessage shouldBe "boom" + counter shouldBe 4 + } diff --git a/core/src/test/scala/ox/resilience/JitterTest.scala b/core/src/test/scala/ox/scheduling/JitterTest.scala similarity index 65% rename from core/src/test/scala/ox/resilience/JitterTest.scala rename to core/src/test/scala/ox/scheduling/JitterTest.scala index 35c65ca7..e1fe788b 100644 --- a/core/src/test/scala/ox/resilience/JitterTest.scala +++ b/core/src/test/scala/ox/scheduling/JitterTest.scala @@ -1,8 +1,9 @@ -package ox.resilience +package ox.scheduling import org.scalatest.Inspectors import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers +import ox.scheduling.{Jitter, Schedule} import scala.concurrent.duration.* @@ -10,14 +11,14 @@ class JitterTest extends AnyFlatSpec with Matchers { behavior of "Jitter" - private val baseSchedule = Schedule.Backoff(maxRetries = 3, initialDelay = 100.millis) + private val baseSchedule = Schedule.Backoff(maxRepeats = 3, firstDuration = 100.millis) it should "use no jitter" in { // given val schedule = baseSchedule // when - val delays = (1 to 5).map(schedule.nextDelay(_, None)) + val delays = (1 to 5).map(schedule.nextDuration(_, None)) // then delays should contain theSameElementsInOrderAs Seq(200, 400, 800, 1600, 3200).map(_.millis) @@ -28,11 +29,11 @@ class JitterTest extends AnyFlatSpec with Matchers { val schedule = baseSchedule.copy(jitter = Jitter.Full) // when - val delays = (1 to 5).map(schedule.nextDelay(_, None)) + val delays = (1 to 5).map(schedule.nextDuration(_, None)) // then Inspectors.forEvery(delays.zipWithIndex) { case (delay, i) => - val backoffDelay = Schedule.Backoff.delay(i + 1, schedule.initialDelay, schedule.maxDelay) + val backoffDelay = Schedule.Backoff.calculateDuration(i + 1, schedule.firstDuration, schedule.maxDuration) delay should (be >= 0.millis and be <= backoffDelay) } } @@ -42,11 +43,11 @@ class JitterTest extends AnyFlatSpec with Matchers { val schedule = baseSchedule.copy(jitter = Jitter.Equal) // when - val delays = (1 to 5).map(schedule.nextDelay(_, None)) + val delays = (1 to 5).map(schedule.nextDuration(_, None)) // then Inspectors.forEvery(delays.zipWithIndex) { case (delay, i) => - val backoffDelay = Schedule.Backoff.delay(i + 1, schedule.initialDelay, schedule.maxDelay) + val backoffDelay = Schedule.Backoff.calculateDuration(i + 1, schedule.firstDuration, schedule.maxDuration) delay should (be >= backoffDelay / 2 and be <= backoffDelay) } } @@ -56,12 +57,12 @@ class JitterTest extends AnyFlatSpec with Matchers { val schedule = baseSchedule.copy(jitter = Jitter.Decorrelated) // when - val delays = (1 to 5).map(schedule.nextDelay(_, None)) + val delays = (1 to 5).map(schedule.nextDuration(_, None)) // then Inspectors.forEvery(delays.sliding(2).toList) { case Seq(previousDelay, delay) => - delay should (be >= schedule.initialDelay and be <= previousDelay * 3) + delay should (be >= schedule.firstDuration and be <= previousDelay * 3) case _ => fail("should never happen") // so that the match is exhaustive } } diff --git a/doc/adr/0008-scheduled-repeat-retry.md b/doc/adr/0008-scheduled-repeat-retry.md new file mode 100644 index 00000000..f3faca16 --- /dev/null +++ b/doc/adr/0008-scheduled-repeat-retry.md @@ -0,0 +1,18 @@ +# 8. Retries + +Date: 2024-07-09 + +## Context + +How should the [retries](../retries.md) and [repeat](../repeat.md) APIs have the common implementation. + +## Decision + +We're introducing [scheduled](../scheduled.md) as a common API for both retries and repeats. + +In addition, `Schedule` trait and its implementations are decoupled from the retry DSL, so that they can be used for repeating as well. +`retry` API remains unchanged, but it now uses `scheduled` underneath. + +Also, `repeat` functions has been added as a sugar for `scheduled` with DSL focused on repeating. + +The main difference between `retry` and `repeat` is about interpretation of the duration provided by the `Schedule` (delay vs interval). diff --git a/doc/basics/quick-example.md b/doc/basics/quick-example.md index 28514ec6..0b31c033 100644 --- a/doc/basics/quick-example.md +++ b/doc/basics/quick-example.md @@ -8,6 +8,7 @@ import ox.* import ox.either.ok import ox.channels.* import ox.resilience.* +import ox.scheduling.* import scala.concurrent.duration.* // run two computations in parallel @@ -35,7 +36,7 @@ supervised { // retry a computation def computationR: Int = ??? -retry(RetryPolicy.backoff(3, 100.millis, 5.minutes, Jitter.Equal))(computationR) +retry(RetryConfig.backoff(3, 100.millis, 5.minutes, Jitter.Equal))(computationR) // create channels & transform them using high-level operations supervised { diff --git a/doc/index.md b/doc/index.md index 84943737..5a1a3c78 100644 --- a/doc/index.md +++ b/doc/index.md @@ -49,6 +49,8 @@ In addition to this documentation, ScalaDocs can be browsed at [https://javadoc. io retries + repeat + scheduled resources control-flow utility diff --git a/doc/repeat.md b/doc/repeat.md new file mode 100644 index 00000000..78c6f4e4 --- /dev/null +++ b/doc/repeat.md @@ -0,0 +1,80 @@ +# Repeat + +The `repeat` functions allow to repeat an operation according to a given schedule (e.g. repeat 3 times with a 100ms +interval and 50ms of initial delay). + +## API + +The basic syntax for `repeat` is: + +```scala +import ox.scheduling.repeat + +repeat(config)(operation) +``` + +The `repeat` API uses `scheduled` underneath with DSL focused on repeats. See [scheduled](scheduled.md) for more details. + +## Operation definition + +Similarly to the `retry` API, the `operation` can be defined: +* directly using a by-name parameter, i.e. `f: => T` +* using a by-name `Either[E, T]` +* or using an arbitrary [error mode](basics/error-handling.md), accepting the computation in an `F` context: `f: => F[T]`. + +## Configuration + +The `repeat` config requires a `Schedule`, which indicates how many times and with what interval should the `operation` +be repeated. + +In addition, it is possible to define a custom `shouldContinueOnSuccess` strategy for deciding if the operation +should continue to be repeated after a successful result returned by the previous operation (defaults to `_: T => true`). + +If an operation returns an error, the repeat loop will always be stopped. If an error handling within the operation +is needed, you can use a `retry` inside it (see an example below) or use `scheduled` instead of `repeat`, which allows +full customization. + +### API shorthands + +You can use one of the following shorthands to define a `RepeatConfig` with a given schedule with an optional initial delay: +- `RepeatConfig.immediate(maxInvocations: Int, initialDelay: Option[FiniteDuration] = None)` +- `RepeatConfig.immediateForever[E, T](initialDelay: Option[FiniteDuration] = None)` +- `RepeatConfig.fixedRate[E, T](maxInvocations: Int, interval: FiniteDuration, initialDelay: Option[FiniteDuration] = None)` +- `RepeatConfig.fixedRateForever[E, T](interval: FiniteDuration, initialDelay: Option[FiniteDuration] = None)` +- `RepeatConfig.backoff[E, T](maxInvocations: Int, firstInterval: FiniteDuration, maxInterval: FiniteDuration = 1.minute, jitter: Jitter = Jitter.None, initialDelay: Option[FiniteDuration] = None)` +- `RepeatConfig.backoffForever[E, T](firstInterval: FiniteDuration, maxInterval: FiniteDuration = 1.minute, jitter: Jitter = Jitter.None, initialDelay: Option[FiniteDuration] = None)` + +See [scheduled](scheduled.md) for details on how to create custom schedules. + +## Examples + +```scala mdoc:compile-only +import ox.UnionMode +import ox.scheduling.{Jitter, Schedule, repeat, repeatEither, repeatWithErrorMode, RepeatConfig} +import ox.resilience.{retry, RetryConfig} +import scala.concurrent.duration.* + +def directOperation: Int = ??? +def eitherOperation: Either[String, Int] = ??? +def unionOperation: String | Int = ??? + +// various operation definitions - same syntax +repeat(RepeatConfig.immediate(3))(directOperation) +repeatEither(RepeatConfig.immediate(3))(eitherOperation) + +// various schedules +repeat(RepeatConfig.fixedRate(3, 100.millis))(directOperation) +repeat(RepeatConfig.fixedRate(3, 100.millis, Some(50.millis)))(directOperation) + +// infinite repeats with a custom strategy +def customStopStrategy: Int => Boolean = ??? +repeat(RepeatConfig.fixedRateForever(100.millis).copy(shouldContinueOnResult = customStopStrategy))(directOperation) + +// custom error mode +repeatWithErrorMode(UnionMode[String])(RepeatConfig.fixedRate(3, 100.millis))(unionOperation) + +// repeat with retry inside +repeat(RepeatConfig.fixedRate(3, 100.millis)) { + retry(RetryConfig.backoff(3, 100.millis))(directOperation) +} +``` diff --git a/doc/retries.md b/doc/retries.md index 0bb4d696..ded83bc4 100644 --- a/doc/retries.md +++ b/doc/retries.md @@ -1,6 +1,6 @@ # Retries -The retries mechanism allows to retry a failing operation according to a given policy (e.g. retry 3 times with a 100ms +The retries mechanism allows to retry a failing operation according to a given configuration (e.g. retry 3 times with a 100ms delay between attempts). ## API @@ -10,9 +10,11 @@ The basic syntax for retries is: ```scala import ox.resilience.retry -retry(policy)(operation) +retry(config)(operation) ``` +The `retry` API uses `scheduled` underneath with DSL focused on retries. See [scheduled](scheduled.md) for more details. + ## Operation definition The `operation` can be provided directly using a by-name parameter, i.e. `f: => T`. @@ -20,9 +22,9 @@ The `operation` can be provided directly using a by-name parameter, i.e. `f: => There's also a `retryEither` variant which accepts a by-name `Either[E, T]`, i.e. `f: => Either[E, T]`, as well as one which accepts arbitrary [error modes](basics/error-handling.md), accepting the computation in an `F` context: `f: => F[T]`. -## Policies +## Configuration -A retry policy consists of three parts: +A retry config consists of three parts: - a `Schedule`, which indicates how many times and with what delay should we retry the `operation` after an initial failure, @@ -33,48 +35,6 @@ A retry policy consists of three parts: - a `onRetry`, which is a callback function that is invoked after each attempt to execute the operation. It is used to perform any necessary actions or checks after each attempt, regardless of whether the attempt was successful or not. -The available schedules are defined in the `Schedule` object. Each schedule has a finite and an infinite variant. - -### Finite schedules - -Finite schedules have a common `maxRetries: Int` parameter, which determines how many times the `operation` would be -retried after an initial failure. This means that the operation could be executed at most `maxRetries + 1` times. - -### Infinite schedules - -Each finite schedule has an infinite variant, whose settings are similar to those of the respective finite schedule, but -without the `maxRetries` setting. Using the infinite variant can lead to a possibly infinite number of retries (unless -the `operation` starts to succeed again at some point). The infinite schedules are created by calling `.forever` on the -companion object of the respective finite schedule (see examples below). - -### Schedule types - -The supported schedules (specifically - their finite variants) are: - -- `Immediate(maxRetries: Int)` - retries up to `maxRetries` times without any delay between subsequent attempts. -- `Delay(maxRetries: Int, delay: FiniteDuration)` - retries up to `maxRetries` times , sleeping for `delay` between - subsequent attempts. -- `Backoff(maxRetries: Int, initialDelay: FiniteDuration, maxDelay: FiniteDuration, jitter: Jitter)` - retries up - to `maxRetries` times , sleeping for `initialDelay` before the first retry, increasing the sleep between subsequent - attempts exponentially (with base `2`) up to an optional `maxDelay` (default: 1 minute). - - Optionally, a random factor (jitter) can be used when calculating the delay before the next attempt. The purpose of - jitter is to avoid clustering of subsequent retries, i.e. to reduce the number of clients calling a service exactly at - the same time, which can result in subsequent failures, contrary to what you would expect from retrying. By - introducing randomness to the delays, the retries become more evenly distributed over time. - - See - the [AWS Architecture Blog article on backoff and jitter](https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/) - for a more in-depth explanation. - - The following jitter strategies are available (defined in the `Jitter` enum): - - `None` - the default one, when no randomness is added, i.e. a pure exponential backoff is used, - - `Full` - picks a random value between `0` and the exponential backoff calculated for the current attempt, - - `Equal` - similar to `Full`, but prevents very short delays by always using a half of the original backoff and - adding a random value between `0` and the other half, - - `Decorrelated` - uses the delay from the previous attempt (`lastDelay`) and picks a random value between - the `initalAttempt` and `3 * lastDelay`. - ### Result policies A result policy allows to customize how the results of the `operation` are treated. It consists of two predicates: @@ -107,16 +67,17 @@ Where: ### API shorthands -When you don't need to customize the result policy (i.e. use the default one), you can use one of the following -shorthands to define a retry policy with a given schedule (note that the parameters are the same as when manually -creating the respective `Schedule`): +When you don't need to customize the result policy (i.e. use the default one) or use complex schedules, +you can use one of the following shorthands to define a retry config with a given schedule: + +- `RetryConfig.immediate(maxRetries: Int)`, +- `RetryConfig.immediateForever`, +- `RetryConfig.delay(maxRetries: Int, delay: FiniteDuration)`, +- `RetryConfig.delayForever(delay: FiniteDuration)`, +- `RetryConfig.backoff(maxRetries: Int, initialDelay: FiniteDuration, maxDelay: FiniteDuration, jitter: Jitter)`, +- `RetryConfig.backoffForever(initialDelay: FiniteDuration, maxDelay: FiniteDuration, jitter: Jitter)`. -- `RetryPolicy.immediate(maxRetries: Int)`, -- `RetryPolicy.immediateForever`, -- `RetryPolicy.delay(maxRetries: Int, delay: FiniteDuration)`, -- `RetryPolicy.delayForever(delay: FiniteDuration)`, -- `RetryPolicy.backoff(maxRetries: Int, initialDelay: FiniteDuration, maxDelay: FiniteDuration, jitter: Jitter)`, -- `RetryPolicy.backoffForever(initialDelay: FiniteDuration, maxDelay: FiniteDuration, jitter: Jitter)`. +See [scheduled](scheduled.md) for details on how to create custom schedules. If you want to customize a part of the result policy, you can use the following shorthands: @@ -131,8 +92,8 @@ If you want to customize a part of the result policy, you can use the following ```scala mdoc:compile-only import ox.UnionMode -import ox.resilience.{retry, retryEither, retryWithErrorMode} -import ox.resilience.{Jitter, ResultPolicy, RetryPolicy, Schedule} +import ox.resilience.{retry, retryEither, retryWithErrorMode, ResultPolicy, RetryConfig} +import ox.scheduling.{Jitter, Schedule} import scala.concurrent.duration.* def directOperation: Int = ??? @@ -140,27 +101,27 @@ def eitherOperation: Either[String, Int] = ??? def unionOperation: String | Int = ??? // various operation definitions - same syntax -retry(RetryPolicy.immediate(3))(directOperation) -retryEither(RetryPolicy.immediate(3))(eitherOperation) +retry(RetryConfig.immediate(3))(directOperation) +retryEither(RetryConfig.immediate(3))(eitherOperation) -// various policies with custom schedules and default ResultPolicy -retry(RetryPolicy.delay(3, 100.millis))(directOperation) -retry(RetryPolicy.backoff(3, 100.millis))(directOperation) // defaults: maxDelay = 1.minute, jitter = Jitter.None -retry(RetryPolicy.backoff(3, 100.millis, 5.minutes, Jitter.Equal))(directOperation) +// various configs with custom schedules and default ResultPolicy +retry(RetryConfig.delay(3, 100.millis))(directOperation) +retry(RetryConfig.backoff(3, 100.millis))(directOperation) // defaults: maxDelay = 1.minute, jitter = Jitter.None +retry(RetryConfig.backoff(3, 100.millis, 5.minutes, Jitter.Equal))(directOperation) // infinite retries with a default ResultPolicy -retry(RetryPolicy.delayForever(100.millis))(directOperation) -retry(RetryPolicy.backoffForever(100.millis, 5.minutes, Jitter.Full))(directOperation) +retry(RetryConfig.delayForever(100.millis))(directOperation) +retry(RetryConfig.backoffForever(100.millis, 5.minutes, Jitter.Full))(directOperation) // result policies // custom success -retry[Int](RetryPolicy(Schedule.Immediate(3), ResultPolicy.successfulWhen(_ > 0)))(directOperation) +retry[Int](RetryConfig(Schedule.Immediate(3), ResultPolicy.successfulWhen(_ > 0)))(directOperation) // fail fast on certain errors -retry(RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen(_.getMessage != "fatal error")))(directOperation) -retryEither(RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")))(eitherOperation) +retry(RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_.getMessage != "fatal error")))(directOperation) +retryEither(RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")))(eitherOperation) // custom error mode -retryWithErrorMode(UnionMode[String])(RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")))(unionOperation) +retryWithErrorMode(UnionMode[String])(RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")))(unionOperation) ``` See the tests in `ox.resilience.*` for more. diff --git a/doc/scheduled.md b/doc/scheduled.md new file mode 100644 index 00000000..5eca66ce --- /dev/null +++ b/doc/scheduled.md @@ -0,0 +1,94 @@ +# Scheduled + +The `scheduled` functions allow to run an operation according to a given schedule. +It is preferred to use `repeat`, `retry`, or combination of both functions for most use cases, as they provide a more convenient DSL. +In fact `retry` and `repeat` use `scheduled` internally. + +## Operation definition + +Similarly to the `retry` and `repeat` APIs, the `operation` can be defined: +* directly using a by-name parameter, i.e. `f: => T` +* using a by-name `Either[E, T]` +* or using an arbitrary [error mode](basics/error-handling.md), accepting the computation in an `F` context: `f: => F[T]`. + +## Configuration + +The `scheduled` config consists of: +- a `Schedule`, which indicates how many times the `operation` should be run, provides a duration based on which + a sleep is calculated and provides an initial delay if configured. +- a `SleepMode`, which determines how the sleep between subsequent operations should be calculated: + - `Interval` - default for `repeat` operations, where the sleep is calculated as the duration provided by schedule + minus the duration of the last operation (can be negative, in which case the next operation occurs immediately). + - `Delay` - default for `retry` operations, where the sleep is just the duration provided by schedule. +- `onOperationResult` - a callback function that is invoked after each operation. Used primarily for `onRetry` in `retry` API. + +In addition, it is possible to define strategies for handling the results and errors returned by the `operation`: +- `shouldContinueOnError` - defaults to `_: E => false`, which allows to decide if the scheduler loop should continue + after an error returned by the previous operation. +- `shouldContinueOnSuccess` - defaults to `_: T => true`, which allows to decide if the scheduler loop should continue + after a successful result returned by the previous operation. + +## Schedule + +### Finite schedules + +Finite schedules have a common `maxRepeats: Int` parameter, which determines how many times the `operation` can be +repeated. This means that the operation could be executed at most `maxRepeats + 1` times. + +### Infinite schedules + +Each finite schedule has an infinite variant, whose settings are similar to those of the respective finite schedule, but +without the `maxRepeats` setting. Using the infinite variant can lead to a possibly infinite number of retries (unless +the `operation` starts to succeed again at some point). The infinite schedules are created by calling `.forever` on the +companion object of the respective finite schedule (see examples below). + +### Schedule types + +The supported schedules (specifically - their finite variants) are: + +- `InitialDelay(delay: FiniteDuration)` - used to configure the initial delay (the delay before the first invocation of + the operation, used in `repeat` API only). +- `Immediate(maxRepeats: Int)` - repeats up to `maxRepeats` times, always returning duration equal to 0. +- `Fixed(maxRepeats: Int, duration: FiniteDuration)` - repeats up to `maxRepeats` times, always returning + the provided `duration`. +- `Backoff(maxRepeats: Int, firstDuration: FiniteDuration, maxDuration: FiniteDuration, jitter: Jitter)` - repeats up + to `maxRepeats` times, returning `firstDuration` after the first invocation, increasing the duration between subsequent + invocations exponentially (with base `2`) up to an optional `maxDuration` (default: 1 minute). + + Optionally, a random factor (jitter) can be used when calculating the delay before the next invocation. The purpose of + jitter is to avoid clustering of subsequent invocations, i.e. to reduce the number of clients calling a service exactly at + the same time, which can result in subsequent failures, contrary to what you would expect from retrying. By + introducing randomness to the delays, the retries become more evenly distributed over time. + + See + the [AWS Architecture Blog article on backoff and jitter](https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/) + for a more in-depth explanation. + + The following jitter strategies are available (defined in the `Jitter` enum): + - `None` - the default one, when no randomness is added, i.e. a pure exponential backoff is used, + - `Full` - picks a random value between `0` and the exponential backoff calculated for the current attempt, + - `Equal` - similar to `Full`, but prevents very short delays by always using a half of the original backoff and + adding a random value between `0` and the other half, + - `Decorrelated` - uses the delay from the previous attempt (`lastDelay`) and picks a random value between + the `initalAttempt` and `3 * lastDelay`. + +## Combining schedules + +It is possible to combine schedules using `.andThen` method. The left side schedule must be a `Finite` +or `InitialDelay` schedule and the right side can be any schedule. + +### Examples + +```scala mdoc:compile-only +import ox.scheduling.Schedule +import scala.concurrent.duration.* + +// schedule 3 times immediately and then 3 times with fixed duration +Schedule.Immediate(3).andThen(Schedule.Fixed(3, 100.millis)) + +// schedule 3 times immediately and then forever with fixed duration +Schedule.Immediate(3).andThen(Schedule.Fixed.forever(100.millis)) + +// schedule with an initial delay and then forever with fixed duration +Schedule.InitialDelay(100.millis).andThen(Schedule.Fixed.forever(100.millis)) +```