Skip to content

Commit

Permalink
Add error and environment combinators for ZStream
Browse files Browse the repository at this point in the history
  • Loading branch information
iravid committed Aug 21, 2019
1 parent 4136700 commit b8afd68
Show file tree
Hide file tree
Showing 3 changed files with 275 additions and 28 deletions.
24 changes: 24 additions & 0 deletions core/shared/src/main/scala/zio/ZManaged.scala
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,12 @@ final case class ZManaged[-R, +E, +A](reserve: ZIO[R, E, Reservation[R, E, A]])
final def mapError[E1](f: E => E1): ZManaged[R, E1, A] =
ZManaged(reserve.mapError(f).map(r => Reservation(r.acquire.mapError(f), r.release)))

/**
* Returns an effect whose full failure is mapped by the specified `f` function.
*/
final def mapErrorCause[E1](f: Cause[E] => Cause[E1]): ZManaged[R, E1, A] =
ZManaged(reserve.mapErrorCause(f).map(r => Reservation(r.acquire.mapErrorCause(f), r.release)))

/**
* Ensures that a cleanup function runs when this ZManaged is finalized, after
* the existing finalizers.
Expand Down Expand Up @@ -832,6 +838,24 @@ object ZManaged {
final def finalizerExit[R](f: Exit[_, _] => ZIO[R, Nothing, Any]): ZManaged[R, Nothing, Unit] =
ZManaged.reserve(Reservation(ZIO.unit, f))

/**
* Creates an effect that executes a finalizer stored in a [[Ref]]. The `Ref`
* is yielded as the result of the effect, allowing for control flows that require
* mutating finalizers.
*/
final def finalizerRef[R](
initial: Exit[_, _] => ZIO[R, Nothing, Any]
): ZManaged[R, Nothing, Ref[Exit[_, _] => ZIO[R, Nothing, Any]]] =
ZManaged {
for {
ref <- Ref.make(initial)
reservation = Reservation(
acquire = ZIO.succeed(ref),
release = e => ref.get.flatMap(_.apply(e))
)
} yield reservation
}

/**
* Returns an effect that performs the outer effect first, followed by the
* inner effect, yielding the value of the inner effect.
Expand Down
50 changes: 50 additions & 0 deletions streams-tests/jvm/src/test/scala/zio/stream/StreamSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ class ZStreamSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends TestR
buffer the Stream with Error $bufferStreamError
fast producer progress independently $bufferFastProducerSlowConsumer

Stream.catchAllCause
recovery from errors $catchAllCauseErrors
recovery from defects $catchAllCauseDefects
happy path $catchAllCauseHappyPath
executes finalizers $catchAllCauseFinalizers
failures on the scope $catchAllCauseScopeErrors

Stream.collect $collect
Stream.collectWhile
collectWhile $collectWhile
Expand Down Expand Up @@ -427,6 +434,49 @@ class ZStreamSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends TestR
} yield l.reverse must_=== (1 to 4).toList
)

private def catchAllCauseErrors =
unsafeRun {
val s1 = Stream(1, 2) ++ Stream.fail("Boom")
val s2 = Stream(3, 4)

s1.catchAllCause(_ => s2).runCollect.map(_ must_=== List(1, 2, 3, 4))
}

private def catchAllCauseDefects =
unsafeRun {
val s1 = Stream(1, 2) ++ Stream.dieMessage("Boom")
val s2 = Stream(3, 4)

s1.catchAllCause(_ => s2).runCollect.map(_ must_=== List(1, 2, 3, 4))
}

private def catchAllCauseHappyPath =
unsafeRun {
val s1 = Stream(1, 2)
val s2 = Stream(3, 4)

s1.catchAllCause(_ => s2).runCollect.map(_ must_=== List(1, 2))
}

private def catchAllCauseFinalizers =
unsafeRun {
for {
fins <- Ref.make(List[String]())
s1 = (Stream(1, 2) ++ Stream.fail("Boom")).ensuring(fins.update("s1" :: _))
s2 = (Stream(3, 4) ++ Stream.fail("Boom")).ensuring(fins.update("s2" :: _))
_ <- s1.catchAllCause(_ => s2).runCollect.run
result <- fins.get
} yield result must_=== List("s2", "s1")
}

private def catchAllCauseScopeErrors =
unsafeRun {
val s1 = Stream(1, 2) ++ ZStream.fromInputStreamManaged(ZManaged.fail("Boom"))
val s2 = Stream(3, 4)

s1.catchAllCause(_ => s2).runCollect.map(_ must_=== List(1, 2, 3, 4))
}

private def collect = unsafeRun {
Stream(Left(1), Right(2), Left(3)).collect {
case Right(n) => n
Expand Down

0 comments on commit b8afd68

Please sign in to comment.