Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add ZStream#{groupBy, split, fanOut} #1387

Merged
merged 28 commits into from Aug 20, 2019

Conversation

mschuwalow
Copy link
Member

@mschuwalow mschuwalow commented Aug 11, 2019

Resolves #1378
Resolves #1102

@iravid Would be great if you could take a look at this.

Based on the toQueuesManaged (naming things is hard, I'm very open to suggestions :D) one can implent quite a bit more powerful stuff. In fact I had a version with a partioner using a consitent hashing scheme that allowed adhoc up and down scaling. I don't think this is paticularly useful as is, but might be interesting to explore later

@iravid iravid changed the title add ZSink#{groupBy, split, fanOut} add ZStream#{groupBy, split, fanOut} Aug 12, 2019
Copy link
Member

@iravid iravid left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Amazing work @mschuwalow. I left a first round of comments below. Need to spend some more time with groupBy.

Few notes:

  • not entirely sold on flatMapParUnbounded and flatMapParSema- these need a bit more justification
  • we'll wait with getting this in until we address the encoding, so you'll have a chance to kick the tires on a simpler representation of stream
  • the infrastructure in toQueuesBalanced0 is really impressive. I'd be very interested in seeing the consistent hashing implementation. This is very close to a good general purpose PubSub implementation; the only thing it is currently missing is dynamic subscribe/unsubscribe.

streams/shared/src/main/scala/zio/stream/ZStream.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZStream.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZStream.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZStream.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZStream.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZStream.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZStream.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZStream.scala Outdated Show resolved Hide resolved
@mschuwalow
Copy link
Member Author

i think this is ready for another review

Copy link
Member

@iravid iravid left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's rebase this and give this another look afterwards. Sorry for taking so long @mschuwalow !

streams/shared/src/main/scala/zio/stream/ZStream.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZStream.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZStream.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZStream.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZStream.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZStream.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZStream.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZStream.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZStream.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZStream.scala Outdated Show resolved Hide resolved
@@ -791,7 +791,7 @@ object ZManaged {
* specified text message. This method can be used for terminating a fiber
* because a defect has been detected in the code.
*/
final def dieMessage(message: String): ZManaged[Any, Throwable, Nothing] = die(new RuntimeException(message))
final def dieMessage(message: String): ZManaged[Any, Nothing, Nothing] = die(new RuntimeException(message))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! 👍

queues <- queues.get
_ <- ZIO.foreach_(queues) { q =>
q.offer(a).catchAllCause {
case c if (c.interrupted) => ZIO.unit
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partial function used where total one expected...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugh that was close. Shameful 😨
I brainfarted and thought this was defined as a partialfunction on causes, halting with any that don't match

@mschuwalow
Copy link
Member Author

@iravid please take a look at the new signature of groupby.
I think much of the reason for the questionable new combinators was that groupby exposed an api that was very easy to deadlock with. I think this new one is better and makes them obsolete

Copy link
Member

@iravid iravid left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few minor changes and this is good to go. Amazing work @mschuwalow!

streams/shared/src/main/scala/zio/stream/ZStream.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZStream.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZStream.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZStream.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZStream.scala Outdated Show resolved Hide resolved
@mschuwalow
Copy link
Member Author

A few minor changes and this is good to go. Amazing work @mschuwalow!

Great. I think you can take another look when you have the time :)

Copy link
Member

@iravid iravid left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @mschuwalow! Merging this now; could you follow up with the scaladoc adjustments? Thanks!

@@ -542,6 +569,75 @@ trait ZStream[-R, +E, +A] extends Serializable { self =>
final def concat[R1 <: R, E1 >: E, A1 >: A](other: => ZStream[R1, E1, A1]): ZStream[R1, E1, A1] =
ZStream(UIO.succeed(self), UIO(other)).flatMap(ZStream.unwrap)

/**
* More powerful version of `ZStream#toQueues`. Allows to provide a function that determines what
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scaladoc needs adjusting here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Will try to do it tomorrow :)

}

/**
* More powerful version of `ZStream#toQueuesBalanced`. This returns a function that will produce
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and here

@iravid iravid merged commit a74e4dc into zio:master Aug 20, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add ZStream#fanOut? Add ZStream#partition
4 participants