Skip to content

Commit

Permalink
Make ZStream#[halt|interrupt]When(IO) take IO[E, Unit]
Browse files Browse the repository at this point in the history
Since these IO will be evaluated, and their values ignore, if they aren't `Unit` it's essentially the same as a value discard. Since it's so easy to make that explicit by adding `.unit`, the methods should probably take `IO[E, Unit]`.
  • Loading branch information
jeremyrsmith committed Apr 10, 2020
1 parent b64ac93 commit 969b832
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 9 deletions.
Expand Up @@ -43,10 +43,10 @@ object StreamHaltWhenSpec extends ZIOBaseSpec {
latch <- Promise.make[Nothing, Unit]
halt <- Promise.make[Nothing, Unit]
_ <- ZStream
.fromEffect(latch.await.onInterrupt(interrupted.set(true)))
.haltWhen(halt.await)
.runDrain
.fork
.fromEffect(latch.await.onInterrupt(interrupted.set(true)))
.haltWhen(halt.await)
.runDrain
.fork
_ <- halt.succeed(())
_ <- latch.succeed(())
result <- interrupted.get
Expand All @@ -57,9 +57,9 @@ object StreamHaltWhenSpec extends ZIOBaseSpec {
halt <- Promise.make[String, Nothing]
_ <- halt.fail("Fail")
result <- ZStream(1)
.haltWhen(halt.await)
.runDrain
.either
.haltWhen(halt.await)
.runDrain
.either
} yield assert(result)(isLeft(equalTo("Fail")))
} @@ zioTag(errors)
)
Expand Down
4 changes: 2 additions & 2 deletions streams/shared/src/main/scala/zio/stream/ZStream.scala
Expand Up @@ -1760,7 +1760,7 @@ class ZStream[-R, +E, +A] private[stream] (private[stream] val structure: ZStrea
*
* If the IO completes with a failure, the stream will emit that failure.
*/
final def interruptWhen[E1 >: E](io: IO[E1, _]): ZStream[R, E1, A] =
final def interruptWhen[E1 >: E](io: IO[E1, Unit]): ZStream[R, E1, A] =
ZStream {
for {
as <- self.process
Expand All @@ -1786,7 +1786,7 @@ class ZStream[-R, +E, +A] private[stream] (private[stream] val structure: ZStrea
* If the promise completes with a failure, the stream will emit that failure.
*/
final def interruptWhen[E1 >: E](p: Promise[E1, _]): ZStream[R, E1, A] =
interruptWhen(p.await)
interruptWhen(p.await.unit)

/**
* Enqueues elements of this stream into a queue. Stream failure and ending will also be
Expand Down

0 comments on commit 969b832

Please sign in to comment.