Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ZStream#haltWhen(IO) and ZStream#interruptWhen(IO) #3306

Merged
merged 5 commits into from Apr 11, 2020

Conversation

jeremyrsmith
Copy link
Contributor

No description provided.

@jeremyrsmith
Copy link
Contributor Author

Ping @iravid

*
* If the IO completes with a failure, the stream will emit that failure.
*/
final def haltWhen[E1 >: E](io: IO[E1, Unit]): ZStream[R, E1, A] =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is probably a better way to do this one

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. Why don't we implement haltWhen(Promise) in terms of this combinator? I think it should be simpler.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, just didn't want to disrupt the existing logic (which depends on poll) too much.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I forgot it does that. In that case your implementation may be the simpler one. Let me know if so and happy to merge as is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think an alternate implementation of haltWhen(IO) could be like this, and then haltWhen(Promise) could be implemented in terms of haltWhen(p.await). But I'm probably missing some subtlety.

  final def haltWhen[E1 >: E](io: IO[E1, Unit]): ZStream[R, E1, A] =
    ZStream {
      for {
        as    <- self.process
        runIO <- (io.asSomeError *> Pull.end).forkManaged
      } yield as.raceFirst(runIO.join)
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this implementation fails the test because it doesn't explicitly check the IO first, which the test expects because it is expecting an already-successful ZStream to fail if the IO has failed. Not sure if that's more an issue with the expectation or the implementation (or maybe that's the subtlety I'm missing 😁 )

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! The implementation in this revision looks good. Let’s just generalize it to [R1 <: R, E1 >: E](zio: ZIO[R1, E1, Any]).

*
* If the promise completes with a failure, the stream will emit that failure.
*/
final def interruptWhen[E1 >: E](p: Promise[E1, _]): ZStream[R, E1, A] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you change the signature while you're here?

Suggested change
final def interruptWhen[E1 >: E](p: Promise[E1, _]): ZStream[R, E1, A] =
final def interruptWhen[E1 >: E](p: Promise[E1, Any]): ZStream[R, E1, A] =

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure this would have the desired outcome – Promise is invariant, so you'd have to exactly pass a Promise[E1, Any]. The existential is unfortunate here, but it doesn't appear to be avoidable unless Promise is made covariant.

The IO variants could certainly take IO[E, Any], but now that I think about it, that's basically tantamount to a value discard, which is usually considered a wart. So I changed them to take IO[E, Unit] (it's easy to explicitly discard the value by just calling .unit on the argument)

Copy link
Member

@iravid iravid left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @jeremyrsmith. One comment about haltWhen and this should be good to go. Great addition!

Since these IO will be evaluated, and their values ignore, if they aren't `Unit` it's essentially the same as a value discard. Since it's so easy to make that explicit by adding `.unit`, the methods should probably take `IO[E, Unit]`.
@jeremyrsmith
Copy link
Contributor Author

Thinking about this more, there are two subtly different possible behaviors for these variants, given that they take an effect.

One possibility is that they evaluate the effect on each pull (racing with the underlying pull). This would be surprising for something like stream.haltWhen(ZIO.sleep(10.seconds)) – you'd think this would halt the stream after 10 seconds, but really it would halt only when any given pull took more than 10 seconds.

The other possibility is that they fork the given effect just before the first pull, and race each pull against that fiber. I can't think of when this would be surprising, but I'm sure it also has cases where it's not what's expected.

Currently the interruptWhen behavior is the former, and the haltWhen behavior is effectively the latter (since it effectively forks the IO to a new Promise and then calls haltWhen(Promise)). I'll change them both to be the latter behavior explicitly, because it's more intuitive to me and it at least needs to be consistent.

This will make the IO variants look very similar to the Promise variants, except that they'll poll a fiber instead of a promise. I'm hesitant to change the existing Promise variants to delegate to the new IO variants, though, since these will now involve allocating a fiber while the Promise variants don't.

Copy link
Member

@iravid iravid left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great! Two small generalizations to the signatures and we can merge this.

streams/shared/src/main/scala/zio/stream/ZStream.scala Outdated Show resolved Hide resolved
*
* If the IO completes with a failure, the stream will emit that failure.
*/
final def haltWhen[E1 >: E](io: IO[E1, Unit]): ZStream[R, E1, A] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! The implementation in this revision looks good. Let’s just generalize it to [R1 <: R, E1 >: E](zio: ZIO[R1, E1, Any]).

@iravid
Copy link
Member

iravid commented Apr 10, 2020

Thinking about this more, there are two subtly different possible behaviors for these variants, given that they take an effect.

One possibility is that they evaluate the effect on each pull (racing with the underlying pull). This would be surprising for something like stream.haltWhen(ZIO.sleep(10.seconds)) – you'd think this would halt the stream after 10 seconds, but really it would halt only when any given pull took more than 10 seconds.

The other possibility is that they fork the given effect just before the first pull, and race each pull against that fiber. I can't think of when this would be surprising, but I'm sure it also has cases where it's not what's expected.

This is definitely the desired behavior. Apologies if I’ve missed this in the first review.

Currently the interruptWhen behavior is the former, and the haltWhen behavior is effectively the latter (since it effectively forks the IO to a new Promise and then calls haltWhen(Promise)). I'll change them both to be the latter behavior explicitly, because it's more intuitive to me and it at least needs to be consistent.

This will make the IO variants look very similar to the Promise variants, except that they'll poll a fiber instead of a promise. I'm hesitant to change the existing Promise variants to delegate to the new IO variants, though, since these will now involve allocating a fiber while the Promise variants don't.

I’d be fine with forwarding to the IO variants using Promise.await but we can deal with that in a future follow-up. I’ll probably do that in the ZConduit branch.

Thanks for thinking this through! These are very useful combinators.

@jeremyrsmith
Copy link
Contributor Author

OK, made those changes and removed the broken scaladoc link (I could not figure out the incantation to get a link to the overloaded interruptWhen method. Oh well)

for {
halt <- Promise.make[String, Nothing]
_ <- halt.fail("Fail")
result <- ZStream.fromIterable(scala.Stream.from(0))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@iravid just want to call this out – I had to change this from ZStream(1) because if the ZStream is already successful and is small, then there is no chance to observe the completion of halt.await (even though it's already done) and the resulting stream doesn't fail like it should. I'm not sure if that's an issue or not, in real life (what does it mean to halt/interrupt an already successful stream?)

@jeremyrsmith
Copy link
Contributor Author

Oh, FFS 😠 guess I can't use scala.Stream either.

@jeremyrsmith
Copy link
Contributor Author

@iravid finally got through the gauntlet of scalafmt and test flakiness. Please see my comment re: testing against strict (ZStream.apply-based) streams, I think the behavior there is a judgement call.

Copy link
Member

@iravid iravid left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great work @jeremyrsmith, thank you!

@iravid iravid merged commit 9d0e4d0 into zio:master Apr 11, 2020
@jeremyrsmith jeremyrsmith deleted the zstream-interrupt-io branch April 12, 2020 23:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants