Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding ZSink#zipPar #1344

Merged
merged 3 commits into from Aug 7, 2019
Merged

Adding ZSink#zipPar #1344

merged 3 commits into from Aug 7, 2019

Conversation

simpadjo
Copy link
Contributor

@simpadjo simpadjo commented Aug 4, 2019

#1310

Added a combinator for aggregating a stream into two sinks at the same time.
Please review my approach. I'll add more tests and squash commits afterwards.

@simpadjo
Copy link
Contributor Author

simpadjo commented Aug 4, 2019

@iravid could you please take a look?

Copy link
Member

@iravid iravid left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great work @simpadjo! I left some comments inline. The logic looks correct and most comments are just about aesthetics. We need a bit more coverage and then this is good to go!

ZIO.succeed(Right(Step.state(st)))
} else {
self
.extract(Step.state(st))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious, why not defer the extraction to extract?

streams/shared/src/main/scala/zio/stream/ZSink.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZSink.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZSink.scala Outdated Show resolved Hide resolved
}

override def initial: ZIO[R1, E1, Step[State, Nothing]] =
self.initial.flatMap(s1 => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 minor notes:

  • these types of blocks usually read nicer as self.initial.flatMap { s1 =>
  • whenever you have multiple consecutive flatMaps, those usually read nicer as a for comprehension

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do agree about using for and initially tried to write it like that. But I faced no method withFilter error and then continued with flatMaps

streams/shared/src/main/scala/zio/stream/ZSink.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZSink.scala Outdated Show resolved Hide resolved
r2 match {
case Left((c, rem2)) => {
val minLeftover =
if (rem1.isEmpty && rem2.isEmpty) Chunk.empty else (rem1.toList ++ rem2.toList).minBy(_.length)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Converting the remainders to a list could be potentially costly, so let's do this the old fashioned way by comparing the lengths with an if

Copy link
Contributor Author

@simpadjo simpadjo Aug 4, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's not obvious but rem1 and rem2 here are Option[Chunk], not Chunk. Option#toList it's relatively cheap and this code runs only once.

The logic is following: if one sink terminated strictly before the other I drop remainder of the former anyway. I compare remainders' length only if sinks finished simultaneously.

But if you still think it's costly - I'll change it.

@@ -167,6 +167,9 @@ class SinkSpec(implicit ee: org.specs2.concurrent.ExecutionEnv)
zipLeft (<*)
happy path $zipLeftHappyPath

zipPar
zipPar somehow works $zipParSanity
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here are some additional test cases I suggest we add:

  • left sink finishes earlier
  • right sink finishes earlier
  • both sinks finish at the same time
  • left sink errors during step
  • left sink errors during extract
  • right sink errors during step
  • right sink errors during extract

Copy link
Contributor Author

@simpadjo simpadjo Aug 4, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. Must cover cases of non-terminating sinks as well. And also it would be nice to explicitly check the logic of choosing the shorter leftover when sinks terminated at the same time.
Haven't found test sinks with non-trivial leftovers to use here. Will invent one.

=====
Actually there is no need to manually test sink1.zipPar(sink2) and then sink2.zipPar(sink1). Swapping the input swaps the output. Just need to verify this law.

@simpadjo
Copy link
Contributor Author

simpadjo commented Aug 4, 2019

@iravid thank you for kind words!

@simpadjo
Copy link
Contributor Author

simpadjo commented Aug 5, 2019

@iravid I think I'm done with it.
I found few laws that together more or less specify zipPar. But since we can't generate Sink I run them manually.

@simpadjo simpadjo changed the title [WIP] Adding ZSink#zipPar Adding ZSink#zipPar Aug 5, 2019
@@ -780,6 +818,116 @@ class SinkSpec(implicit ee: org.specs2.concurrent.ExecutionEnv)
unsafeRun(sinkIteration(sink, 1).map(_ must_=== "1Hello"))
}

private object ZipParLaws {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is excellent, comprehensive work! Big like :-)

Copy link
Member

@iravid iravid left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @simpadjo, this is a great addition.

Copy link
Member

@iravid iravid left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, one last request!

streams/shared/src/main/scala/zio/stream/ZSink.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZSink.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZSink.scala Outdated Show resolved Hide resolved
@iravid iravid merged commit 5e8c9ae into zio:master Aug 7, 2019
@simpadjo simpadjo deleted the zippar branch August 13, 2019 08:06
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants