Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Stream#flatMapPar, Stream.flatten and Stream.mergeAll #840

Closed
wants to merge 7 commits into from

Conversation

iravid
Copy link
Member

@iravid iravid commented May 8, 2019

Resolves #836.

I'm going to add a bunch of more tests that exercise errors, interruption and other issues, but I'd love to get comments on the implementation and possible simplifications.

Pending fixes:

  • an error in an inner fiber would enqueue a Take.Fail, which would cause the output stream to fail; however nothing in this case would stop the driver stream
  • currently the outer stream ends completely before the inner fibers; which means that an outer stream created using Stream.bracket would release its resources before the inner fibers complete

@iravid iravid force-pushed the flatmap-par branch 2 times, most recently from efe7af8 to 86dcb49 Compare May 8, 2019 18:35
@iravid iravid changed the title Add Stream#flatMapPar and Stream.flatten Add Stream#flatMapPar, Stream.flatten and Stream.mergeAll May 9, 2019
)
} yield ()
)
.onInterrupt(ZIO.children.flatMap(Fiber.interruptAll) *> out.shutdown)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this redundant with the line below?

Copy link
Member Author

@iravid iravid May 10, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

supervise is the one that adds a supervisor that interrupts all children; supervised just enables supervision so children are tracked and introspectable by ZIO.children :-)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😰 ah yeah, I remember. I have some catching up to do!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably need better names too... like enableSupervision and cleanupChildren or something...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah. I am constantly confused between the two ;-)

@iravid
Copy link
Member Author

iravid commented May 10, 2019

Deferring the outer stream's completion to after the inner fibers complete actually seems quite difficult with the current encoding of Fold, because we can't know when the fold has ended.

For example, say we have the following stream:

Stream
  .bracket(acquire)(release) { resource =>
    resource.read
  }
  .mapM { element =>
    process(element).fork
  }
  .foldLeft(List[Fiber[Nothing, Element]]())((fibers, fiber) => fiber :: fibers)
  .flatMap(Fiber.joinAll)

By the time foldLeft returns the list of fibers, the finalizer for the bracket has already run.

If anyone has a clever idea, I'd love to hear it!

@iravid
Copy link
Member Author

iravid commented May 17, 2019

I tried some approaches of encoding a "finalization" step into the stream's fold; e.g. something like S => ZIO[R, Nothing, _]. This seems promising and works in the simple combinators (bracket, map) but I haven't explored it further as it has one discouraging facet: when used with a flatMap'd stream, the finalization step would run an arbitrary number of times. Not sure this is a good direction.

Another thought I had: if Stream#fold would return a ZManaged, this could enable the arbitrary manipulation of finalizers.

@jdegoes
Copy link
Member

jdegoes commented May 18, 2019

@iravid All right, I began reviewing this today. At a high level, the strategy for flatMapPar seems correct. I'll do a more thorough review tomorrow.

Can you explain this:

currently the outer stream ends completely before the inner fibers; which means that an outer stream created using Stream.bracket would release its resources before the inner fibers complete

Specifically, it's clear this is the case because the data passes through a queue, which will be out-of-sync with the input stream; buffered by some number of elements. But since the child fibers only read from the queue, it should not be an issue; and if it is, we can always avoid exiting the flatMapPar stream until after all the child fibers are done reading from the queue.

Or is there another more fundamental problem that I'm not seeing?

@iravid
Copy link
Member Author

iravid commented May 18, 2019

Yeah that’s the fundamental issue. The problem is that I could not find a way to delay the outer stream from completing until the child fibers are empty.

When folding, there’s no way to know that you’re at the last element and join the remaining fibers :-/

@iravid
Copy link
Member Author

iravid commented May 24, 2019

Going to revisit these combinators after restructuring ZStream around ZManaged.

@iravid iravid closed this May 24, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add Stream#flatMapPar and Stream#flattenPar
3 participants