Skip to content

Commit

Permalink
implement retry (#3566)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamgfraser committed May 10, 2020
1 parent 811dc1b commit 2aa4ba2
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 72 deletions.
9 changes: 9 additions & 0 deletions core-tests/shared/src/test/scala/zio/ZLayerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,15 @@ object ZLayerSpec extends ZIOBaseSpec {
_ <- layer.build.use(ref => ref.update(_ :+ "test"))
result <- testRef.get
} yield assert(result)(equalTo(Vector("test")))
},
testM("retry") {
for {
ref <- Ref.make(0)
effect = ref.update(_ + 1) *> ZIO.fail("fail")
layer = ZLayer.fromEffectMany(effect).retry(Schedule.recurs(3))
_ <- layer.build.useNow.ignore
result <- ref.get
} yield assert(result)(equalTo(4))
}
)
}
205 changes: 133 additions & 72 deletions core/shared/src/main/scala/zio/ZLayer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,32 @@ import zio.internal.Platform
*/
sealed trait ZLayer[-RIn, +E, +ROut] { self =>

final def +!+[E1 >: E, RIn2, ROut2](that: ZLayer[RIn2, E1, ROut2])(implicit ev: Has.AreHas[ROut, ROut2]): ZLayer[RIn with RIn2, E1, ROut with ROut2] =
final def +!+[E1 >: E, RIn2, ROut2](
that: ZLayer[RIn2, E1, ROut2]
)(implicit ev: Has.AreHas[ROut, ROut2]): ZLayer[RIn with RIn2, E1, ROut with ROut2] =
self.zipWithPar(that)(ev.unionAll[ROut, ROut2])

/**
* Combines this layer with the specified layer, producing a new layer that
* has the inputs of both layers, and the outputs of both layers.
*/
final def ++[E1 >: E, RIn2, ROut1 >: ROut, ROut2](that: ZLayer[RIn2, E1, ROut2])(implicit ev: Has.AreHas[ROut1, ROut2], tag: Tag[ROut2]): ZLayer[RIn with RIn2, E1, ROut1 with ROut2] =
final def ++[E1 >: E, RIn2, ROut1 >: ROut, ROut2](
that: ZLayer[RIn2, E1, ROut2]
)(implicit ev: Has.AreHas[ROut1, ROut2], tag: Tag[ROut2]): ZLayer[RIn with RIn2, E1, ROut1 with ROut2] =
self.zipWithPar(that)(ev.union[ROut1, ROut2])

/**
* A symbolic alias for `zipPar`.
*/
final def <&>[E1 >: E, RIn2, ROut2](that: ZLayer[RIn2, E1, ROut2]): ZLayer[RIn with RIn2, E1, (ROut, ROut2)] =
zipWithPar(that)((_, _))

/**
* A symbolic alias for `orElse`.
*/
def <>[RIn1 <: RIn, E1, ROut1 >: ROut](that: => ZLayer[RIn1, E1, ROut1])(implicit ev: CanFail[E]): ZLayer[RIn1, E1, ROut1] =
def <>[RIn1 <: RIn, E1, ROut1 >: ROut](
that: => ZLayer[RIn1, E1, ROut1]
)(implicit ev: CanFail[E]): ZLayer[RIn1, E1, ROut1] =
self.orElse(that)

/**
Expand All @@ -60,7 +72,7 @@ sealed trait ZLayer[-RIn, +E, +ROut] { self =>
* outputs of both this layer and the specified layer.
*/
final def >+>[E1 >: E, RIn2 >: ROut, ROut1 >: ROut, ROut2](
that: ZLayer[RIn2, E1, ROut2]
that: => ZLayer[RIn2, E1, ROut2]
)(implicit ev: Has.AreHas[ROut1, ROut2], tagged: Tag[ROut2]): ZLayer[RIn, E1, ROut1 with ROut2] =
self ++ (self >>> that)

Expand All @@ -69,8 +81,8 @@ sealed trait ZLayer[-RIn, +E, +ROut] { self =>
* layer, resulting in a new layer with the inputs of this layer, and the
* outputs of the specified layer.
*/
final def >>>[E1 >: E, ROut2](that: ZLayer[ROut, E1, ROut2]): ZLayer[RIn, E1, ROut2] =
fold(ZLayer.fromFunctionManyM { case (_, cause) => ZIO.halt(cause) }, that)
final def >>>[E1 >: E, ROut2](that: => ZLayer[ROut, E1, ROut2]): ZLayer[RIn, E1, ROut2] =
fold(ZLayer.second >>> ZLayer.fromFunctionManyM(ZIO.halt(_)), that)

/**
* A named alias for `++`.
Expand All @@ -84,7 +96,7 @@ sealed trait ZLayer[-RIn, +E, +ROut] { self =>
* A named alias for `>+>`.
*/
final def andTo[E1 >: E, RIn2 >: ROut, ROut1 >: ROut, ROut2](
that: ZLayer[RIn2, E1, ROut2]
that: => ZLayer[RIn2, E1, ROut2]
)(implicit ev: Has.AreHas[ROut1, ROut2], tagged: Tag[ROut2]): ZLayer[RIn, E1, ROut1 with ROut2] =
self >+> that

Expand All @@ -98,16 +110,33 @@ sealed trait ZLayer[-RIn, +E, +ROut] { self =>
value <- run(memoMap)
} yield value

/**
* Recovers from all errors.
*/
final def catchAll[RIn1 <: RIn, E1, ROut1 >: ROut](
handler: => ZLayer[(RIn1, E), E1, ROut1]
): ZLayer[RIn1, E1, ROut1] = {
val failureOrDie: ZLayer[(RIn1, Cause[E]), Nothing, (RIn1, E)] =
ZLayer.fromFunctionManyM {
case (r, cause) =>
cause.failureOrCause.fold(
e => ZIO.succeed((r, e)),
c => ZIO.halt(c)
)
}
fold(failureOrDie >>> handler, ZLayer.identity)
}

/**
* Feeds the error or output services of this layer into the input of either
* the specified `failure` or `success` layers, resulting in a new layer with
* the inputs of this layer, and the error or outputs of the specified layer.
*/
final def fold[E1, RIn1 <: RIn, ROut2](
failure: ZLayer[(RIn1, Cause[E]), E1, ROut2],
success: ZLayer[ROut, E1, ROut2]
failure: => ZLayer[(RIn1, Cause[E]), E1, ROut2],
success: => ZLayer[ROut, E1, ROut2]
)(implicit ev: CanFail[E]): ZLayer[RIn1, E1, ROut2] =
ZLayer.Fold(self, failure, success)
ZLayer.Fold(self, () => failure, () => success)

/**
* Creates a fresh version of this layer that will not be shared.
Expand All @@ -133,7 +162,7 @@ sealed trait ZLayer[-RIn, +E, +ROut] { self =>
* function.
*/
final def mapError[E1](f: E => E1)(implicit ev: CanFail[E]): ZLayer[RIn, E1, ROut] =
fold(ZLayer.fromFunctionManyM { case (_, cause) => ZIO.halt(cause.map(f)) }, ZLayer.identity)
catchAll(ZLayer.second >>> ZLayer.fromFunctionManyM(e => ZIO.fail(f(e))))

/**
* Returns a managed effect that, if evaluated, will return the lazily
Expand All @@ -147,33 +176,50 @@ sealed trait ZLayer[-RIn, +E, +ROut] { self =>
* unchecked and not a part of the type of the layer.
*/
final def orDie(implicit ev1: E <:< Throwable, ev2: CanFail[E]): ZLayer[RIn, Nothing, ROut] =
fold(ZLayer.fromFunctionManyM(_._2.failureOrCause.fold(ZIO.die(_), ZIO.halt(_))), ZLayer.identity)
catchAll(ZLayer.second >>> ZLayer.fromFunctionManyM(ZIO.die(_)))

/**
* Executes this layer and returns its output, if it succeeds, but otherwise
* executes the specified layer.
*/
final def orElse[RIn1 <: RIn, E1, ROut1 >: ROut](that: => ZLayer[RIn1, E1, ROut1])(implicit ev: CanFail[E]): ZLayer[RIn1, E1, ROut1] =
fold(ZLayer.first >>> that, ZLayer.identity)
final def orElse[RIn1 <: RIn, E1, ROut1 >: ROut](
that: => ZLayer[RIn1, E1, ROut1]
)(implicit ev: CanFail[E]): ZLayer[RIn1, E1, ROut1] =
catchAll(ZLayer.first >>> that)

/**
* Retries constructing this layer according to the specified schedule.
*/
final def retry[RIn1 <: RIn](schedule: Schedule[RIn1, E, Any]): ZLayer[RIn1, E, ROut] = {
type S = schedule.State
lazy val loop: ZLayer[(RIn1, S), E, ROut] =
(ZLayer.first >>> self).catchAll {
val update: ZLayer[((RIn1, S), E), E, (RIn1, S)] =
ZLayer.fromFunctionManyM {
case ((r, s), e) =>
schedule
.update(e, s)
.provide(r)
.foldM(
_ => ZIO.fail(e),
s => ZIO.succeed((r, s))
)
}
update >>> loop.fresh
}
ZLayer.identity <&> ZLayer.fromEffectMany(schedule.initial) >>> loop
}

/**
* Performs the specified effect if this layer fails.
*/
final def tapError[RIn1 <: RIn, E1 >: E](f: E => ZIO[RIn1, E1, Any]): ZLayer[RIn1, E1, ROut] =
fold(
ZLayer.fromFunctionManyM {
case (r, cause) => cause.failureOrCause.fold(
e => f(e).provide(r) *> ZIO.fail(e),
c => ZIO.halt(c)
)
},
ZLayer.identity
)
catchAll(ZLayer.fromFunctionManyM { case (r, e) => f(e).provide(r) *> ZIO.fail(e) })

/**
* A named alias for `>>>`.
*/
final def to[E1 >: E, ROut2](that: ZLayer[ROut, E1, ROut2]): ZLayer[RIn, E1, ROut2] =
final def to[E1 >: E, ROut2](that: => ZLayer[ROut, E1, ROut2]): ZLayer[RIn, E1, ROut2] =
self >>> that

/**
Expand All @@ -189,6 +235,14 @@ sealed trait ZLayer[-RIn, +E, +ROut] { self =>
final def update[A: Tag](f: A => A)(implicit ev1: Has.IsHas[ROut], ev2: ROut <:< Has[A]): ZLayer[RIn, E, ROut] =
self >>> ZLayer.fromFunctionMany(ev1.update[ROut, A](_, f))

/**
* Combines this layer with the specified layer, prodicing a new layer that
* has the inputs of both layers, and the outputs of both layers combined
* into a tuple.
*/
final def zipPar[E1 >: E, RIn2, ROut2](that: ZLayer[RIn2, E1, ROut2]): ZLayer[RIn with RIn2, E1, (ROut, ROut2)] =
zipWithPar(that)((_, _))

/**
* Combines this layer with the specified layer, producing a new layer that
* has the inputs of both layers, and the outputs of both layers combined
Expand All @@ -206,29 +260,35 @@ sealed trait ZLayer[-RIn, +E, +ROut] { self =>
memoMap
.getOrElseMemoize(self)
.foldCauseM(
e => ZManaged.environment[RIn].flatMap(r => memoMap.getOrElseMemoize(failure).provide((r, e))),
r => memoMap.getOrElseMemoize(success).provide(r)(NeedsEnv.needsEnv)
e => ZManaged.environment[RIn].flatMap(r => memoMap.getOrElseMemoize(failure()).provide((r, e))),
r => memoMap.getOrElseMemoize(success()).provide(r)(NeedsEnv.needsEnv)
)
)
case ZLayer.Fresh(self) =>
Managed.succeed(_ => self.build)
case ZLayer.Managed(self) =>
Managed.succeed(_ => self)
case ZLayer.ZipWithPar(self, that, f) =>
ZManaged.succeed { memoMap =>
memoMap.getOrElseMemoize(self).zipWith(memoMap.getOrElseMemoize(that))(f)
}
ZManaged.succeed(memoMap => memoMap.getOrElseMemoize(self).zipWith(memoMap.getOrElseMemoize(that))(f))
}
}

object ZLayer {
@deprecated("use Layer", "1.0.0")
type NoDeps[+E, +B] = ZLayer[Any, E, B]

private final case class Fold[RIn, E, E1, ROut, ROut1](self: ZLayer[RIn, E, ROut], failure: ZLayer[(RIn, Cause[E]), E1, ROut1], success: ZLayer[ROut, E1, ROut1]) extends ZLayer[RIn, E1, ROut1]
private final case class Fresh[RIn, E, ROut](self: ZLayer[RIn, E, ROut]) extends ZLayer[RIn, E, ROut]
private final case class Fold[RIn, E, E1, ROut, ROut1](
self: ZLayer[RIn, E, ROut],
failure: () => ZLayer[(RIn, Cause[E]), E1, ROut1],
success: () => ZLayer[ROut, E1, ROut1]
) extends ZLayer[RIn, E1, ROut1]
private final case class Fresh[RIn, E, ROut](self: ZLayer[RIn, E, ROut]) extends ZLayer[RIn, E, ROut]
private final case class Managed[-RIn, +E, +ROut](self: ZManaged[RIn, E, ROut]) extends ZLayer[RIn, E, ROut]
private final case class ZipWithPar[-RIn, +E, ROut, ROut2, ROut3](self: ZLayer[RIn, E, ROut], that: ZLayer[RIn, E, ROut2], f: (ROut, ROut2) => ROut3) extends ZLayer[RIn, E, ROut3]
private final case class ZipWithPar[-RIn, +E, ROut, ROut2, ROut3](
self: ZLayer[RIn, E, ROut],
that: ZLayer[RIn, E, ROut2],
f: (ROut, ROut2) => ROut3
) extends ZLayer[RIn, E, ROut3]

/**
* Constructs a layer from a managed resource.
Expand All @@ -242,11 +302,11 @@ object ZLayer {
def fail[E](e: => E): Layer[E, Nothing] =
ZLayer(ZManaged.fail(e))

/**
* A layer that passes along the first element of a tuple.
*/
def first[A]: ZLayer[(A, Any), Nothing, A] =
ZLayer.fromFunctionMany(_._1)
/**
* A layer that passes along the first element of a tuple.
*/
def first[A]: ZLayer[(A, Any), Nothing, A] =
ZLayer.fromFunctionMany(_._1)

/**
* Constructs a layer from acquire and release actions. The acquire and
Expand Down Expand Up @@ -2131,8 +2191,7 @@ object ZLayer {
* returns it. Otherwise, obtains the dependency, stores it in the memo map,
* and adds a finalizer to the outer `Managed`.
*/
def getOrElseMemoize[E, A, B](
layer: ZLayer[A, E, B]): ZManaged[A, E, B]
def getOrElseMemoize[E, A, B](layer: ZLayer[A, E, B]): ZManaged[A, E, B]
}

private object MemoMap {
Expand All @@ -2151,47 +2210,49 @@ object ZLayer {
map.get(layer) match {
case Some((acquire, release)) =>
val cached =
ZIO.accessM[(A, ReleaseMap)] { case (_, releaseMap) =>
acquire
.asInstanceOf[IO[E, B]]
.onExit {
case Exit.Success(_) => releaseMap.add(release)
case Exit.Failure(_) => UIO.unit
}
.map((release, _))
ZIO.accessM[(A, ReleaseMap)] {
case (_, releaseMap) =>
acquire
.asInstanceOf[IO[E, B]]
.onExit {
case Exit.Success(_) => releaseMap.add(release)
case Exit.Failure(_) => UIO.unit
}
.map((release, _))
}

UIO.succeed((cached, map))

case None =>
for {
observers <- Ref.make(0)
promise <- Promise.make[E, B]
observers <- Ref.make(0)
promise <- Promise.make[E, B]
finalizerRef <- Ref.make[ZManaged.Finalizer](ZManaged.Finalizer.noop)

resource =
ZIO.uninterruptibleMask { restore =>
for {
env <- ZIO.environment[(A, ReleaseMap)]
(a, outerReleaseMap) = env
innerReleaseMap <- ZManaged.ReleaseMap.make
tp <- restore(layer.scope.flatMap(_.apply(self)).zio.provide((a, innerReleaseMap))).run.flatMap {
case e @ Exit.Failure(cause) =>
promise.halt(cause) *> innerReleaseMap.releaseAll(e, ExecutionStrategy.Sequential) *> ZIO.halt(cause)

case Exit.Success((_, b)) =>
for {
_ <- finalizerRef.set { (e: Exit[Any, Any]) =>
ZIO.whenM(observers.modify(n => (n == 1, n - 1) ))(
innerReleaseMap.releaseAll(e, ExecutionStrategy.Sequential))
}
_ <- observers.update(_ + 1)
outerFinalizer <- outerReleaseMap.add(e => finalizerRef.get.flatMap(_.apply(e)))
_ <- promise.succeed(b)
} yield (outerFinalizer, b)
}
} yield tp
}
resource = ZIO.uninterruptibleMask { restore =>
for {
env <- ZIO.environment[(A, ReleaseMap)]
(a, outerReleaseMap) = env
innerReleaseMap <- ZManaged.ReleaseMap.make
tp <- restore(layer.scope.flatMap(_.apply(self)).zio.provide((a, innerReleaseMap))).run.flatMap {
case e @ Exit.Failure(cause) =>
promise.halt(cause) *> innerReleaseMap.releaseAll(e, ExecutionStrategy.Sequential) *> ZIO
.halt(cause)

case Exit.Success((_, b)) =>
for {
_ <- finalizerRef.set { (e: Exit[Any, Any]) =>
ZIO.whenM(observers.modify(n => (n == 1, n - 1)))(
innerReleaseMap.releaseAll(e, ExecutionStrategy.Sequential)
)
}
_ <- observers.update(_ + 1)
outerFinalizer <- outerReleaseMap.add(e => finalizerRef.get.flatMap(_.apply(e)))
_ <- promise.succeed(b)
} yield (outerFinalizer, b)
}
} yield tp
}

memoized = (
promise.await.onExit {
Expand Down

0 comments on commit 2aa4ba2

Please sign in to comment.