Skip to content

Commit

Permalink
Fix halt/interrupt tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremyrsmith committed Apr 10, 2020
1 parent e779a61 commit 128f859
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 4 deletions.
Expand Up @@ -56,7 +56,8 @@ object StreamHaltWhenSpec extends ZIOBaseSpec {
for {
halt <- Promise.make[String, Nothing]
_ <- halt.fail("Fail")
result <- ZStream(1, 2, 3)
result <- ZStream
.fromIterable(scala.Stream.from(0))
.haltWhen(halt.await)
.runDrain
.either
Expand Down
Expand Up @@ -59,7 +59,8 @@ object StreamInterruptWhenSpec extends ZIOBaseSpec {
for {
halt <- Promise.make[String, Nothing]
_ <- halt.fail("Fail")
result <- ZStream(1)
result <- ZStream
.fromIterable(scala.Stream.from(0))
.haltWhen(halt.await)
.runDrain
.either
Expand Down
5 changes: 3 additions & 2 deletions streams/shared/src/main/scala/zio/stream/ZStream.scala
Expand Up @@ -1766,14 +1766,15 @@ class ZStream[-R, +E, +A] private[stream] (private[stream] val structure: ZStrea
* IO will be forked as part of this stream, and its success will be discarded. This
* combinator will also interrupt any in-progress element being pulled from upstream.
*
* If the IO completes with a failure, the stream will emit that failure.
* If the IO completes with a failure before the stream completes, the returned stream
* will emit that failure.
*/
final def interruptWhen[R1 <: R, E1 >: E](io: ZIO[R1, E1, Any]): ZStream[R1, E1, A] =
ZStream {
for {
as <- self.process
runIO <- (io.asSomeError *> Pull.end).forkManaged
} yield as.raceFirst(runIO.join)
} yield runIO.join.disconnect.raceFirst(as)
}

/**
Expand Down

0 comments on commit 128f859

Please sign in to comment.