Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor ZStream to use ZManaged #906

Merged
merged 1 commit into from May 30, 2019
Merged

Conversation

iravid
Copy link
Member

@iravid iravid commented May 26, 2019

This is passing tests, so opening this up for a review.

This pull request contains a refactor to ZStream that utilizes ZManaged as the final representation. Previously, folding a stream using a function (S, A) => ZIO[R, E, S] would result in a value of ZIO[R, E, S]. The problem with this encoding is that composing streams required not only composing their computations, but also requires constructing a tree of finalizers.

On this branch, folding now returns a ZManaged[R, E, S]. Constructors that acquire resources (such as ZStream.bracket) will acquire the resources, uninterruptibly, in the reservation step of ZManaged. The folding of the stream happens, interruptibly, in the acquisition step of ZManaged. And naturally, resource release happens in the release step.

By utilizing ZManaged, we can now create a tree of finalization scopes when composing streams. For example, in the zipWith combinator, the finalizers for the streams feeding the composite stream will run strictly after the finalizer for the composite stream has run.

Over the next few weeks I'll add more tests that exercise finalization scoping and short circuiting. Also, I'll add the concurrency combinators that triggered this refactor.

All comments welcome!

@iravid
Copy link
Member Author

iravid commented May 27, 2019

The concurrency combinators now behave correctly with regard to finalization 🎉

acquire = UIO.succeed(fiber),
release = fiber.interrupt *> finalizer.get.flatMap(identity(_))
)
} yield reservation
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! 👍

} yield (cancelled must_=== true) and (result must beLeft("Ouch"))
}

private def flatMapParFinalizerOrdering = unsafeRun {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Love all the new tests! ❤️

* and should almost never require specification of any type parameters.
*
*/
trait ZStreamOld[-R, +E, +A] extends Serializable { self =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this around, do we?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope! Will remove it.

jdegoes
jdegoes previously approved these changes May 28, 2019
Copy link
Member

@jdegoes jdegoes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything is fitting together nicely! I can also see how we can do things like:

acquire ++ use ++ release

in an easy & safe fashion now (something I've needed to do for a while).

Appreciate the work towards alphabetizing the methods (it's almost there).

Let me know if you want me to take a close look at anything in particular, otherwise I'd say this is a clear win and completes the story of resource management for left-fold-style streams! 🎉

@iravid
Copy link
Member Author

iravid commented May 28, 2019

Thanks @jdegoes! Yes, acquire ++ use ++ release "just works" now, as the RHS of ++ gets folded before the finalizer of LHS runs.

We can also add Stream.ensuring, onComplete and so forth. I'll add them in subsequent PRs.

I'm pretty happy with the encoding too, the only thing I'm wondering about is the ergonomics - lots of ZManaged.fromEffect and ZManaged.unwrap are needed everywhere and this smells of transformer-ness. Is there something more foundational we can do here?

@jdegoes
Copy link
Member

jdegoes commented May 28, 2019

Thanks @jdegoes! Yes, acquire ++ use ++ release "just works" now, as the RHS of ++ gets folded before the finalizer of LHS runs.

Superb! I suppose that means with infinite ++, the resources will never be released, but then again, infinite concatenation is an anti-pattern with lazy left folds, anyway (and may leak heap, too).

@iravid
Copy link
Member Author

iravid commented May 28, 2019

Yup, that's correct

@iravid
Copy link
Member Author

iravid commented May 29, 2019

@jdegoes would love to get this in for the next RC so we can use it in zio-kafka. Will resolve the conflicts now

@jdegoes
Copy link
Member

jdegoes commented May 29, 2019

@iravid Sounds good, resolve the conflicts, point me to any tricky bits, and we'll get this in for testing in the next RC. 👍

// The reservation phase of the new `ZManaged` runs uninterruptibly;
// so to make sure the acquire phase of the original `ZManaged` runs
// interruptibly, we need to create an interruptible hole in the region.
fiber <- ZIO.interruptibleMask { restore =>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jdegoes would love another look at this operator for validation. I used interruptibleMask to allow the caller to run the reserve step interruptibly rather than .interruptible

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is clever and looks correct to me.

One thing to keep in mind is that this fiber will inherit the interruptibility settings of its parent, at the moment when it is forked.

Since this is done in reserve, I believe we can therefore simplify the code to just use interruptible rather than interruptibleMask, because we know what the parent interruptibility status will be (right?).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we can assume that 99% of the cases would use an uninterruptible reserve, but, that's not always true, as reserve can also be run manually for advanced manipulations. Which is why I thought interruptibleMask is more compositional.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, ok. Works for me. 👍

def collectWhile[B](pred: PartialFunction[A, B]): ZStream[R, E, B] = new ZStream[R, E, B] {
override def fold[R1 <: R, E1 >: E, B1 >: B, S]: Fold[R1, E1, B1, S] =
IO.succeedLazy { (s, cont, f) =>
ZManaged.unwrap {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jdegoes ZManaged.unwrap { fold.map(...) } repeats itself all over the file which is unfortunate

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw that. Any way to factor out that duplication?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could have this as a private helper:

  private def foldUnwrap[R1 <: R, E1 >: E, A1 >: A, S] =
    ZManaged.fromEffect(self.fold[R1, E1, A1, S])

And then the combinators look like this:

  final def ++[R1 <: R, E1 >: E, A1 >: A](other: ZStream[R1, E1, A1]): ZStream[R1, E1, A1] =
    new ZStream[R1, E1, A1] {
      def fold[R2 <: R1, E2 >: E1, A2 >: A1, S]: Fold[R2, E2, A2, S] =
        ZIO.succeedLazy { (s, cont, f) =>
          self.foldUnwrap[R2, E2, A2, S].flatMap { foldLeft =>
            foldLeft(s, cont, f).flatMap { s =>
              if (!cont(s)) ZManaged.succeed(s)
              else
                ZManaged.unwrap {
                  other.fold[R2, E2, A2, S].map(foldRight => foldRight(s, cont, f))
                }
            }
          }
        }
    }

I guess that's decent enough!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another possibility is to use ZManaged directly in the Fold wrapper type. That could work too.

.toManaged_
)
.fork
supervisingDriver = ZManaged {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jdegoes This step is pretty clunky; we need to prepend a step to the finalizer of driver here, which makes me feel like we're missing an operator for that on ZManaged.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think so. We're missing finalizer, something like:

def finalizer(f: UIO[_]): ZManaged[Any, Nothing, Unit]

Then we can do something like zmanaged.flatMap(a => ZManaged.finalizer(fin).const(a))

That would do it, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! Thank you. That's what I was looking for.

@iravid iravid force-pushed the stream-zmanaged branch 2 times, most recently from 45a12bc to 9bc5a68 Compare May 29, 2019 20:53
@iravid
Copy link
Member Author

iravid commented May 29, 2019

@jdegoes all done here. thanks for the review!

@iravid
Copy link
Member Author

iravid commented May 29, 2019

Hmm one of the new RTS tests is flaky:

[error]     x executor is hereditble
[error]  None ==> Some(...) (RTSSpec.scala:986)
[error] Actual:   None
[error] Expected: Some(scalaz.zio.internal.Executor$$anon$2@2155e0c)

@jdegoes
Copy link
Member

jdegoes commented May 30, 2019

@iravid Let's see if #914 gets the job done.

@iravid
Copy link
Member Author

iravid commented May 30, 2019

@jdegoes Looks good this time.

StreamPure.super.takeWhile(pred).fold[R, E, A1, S].flatMap(f0 => f0(s, cont, f))
}
override def fold[R, E, A1 >: A, S]: Fold[R, E, A1, S] =
StreamPure.super.takeWhile(pred).fold[R, E, A1, S]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice cleanups! ❤️

@jdegoes
Copy link
Member

jdegoes commented May 30, 2019

@iravid Looks great! 🎉

@iravid iravid merged commit 8dc72df into zio:master May 30, 2019
@iravid
Copy link
Member Author

iravid commented May 30, 2019

🎉

@iravid iravid deleted the stream-zmanaged branch May 30, 2019 10:24
@jdegoes
Copy link
Member

jdegoes commented May 30, 2019

giphy

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants