-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Convert more ZStream combinators to use ZStream#process #1401
Conversation
foldInner(s, cont, f) | ||
foldDefault | ||
|
||
override def process: ZManaged[R1, E1, ZStream.InputStream[R1, E1, B]] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jdegoes could you review this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
About as terrible as I suspected flatMap
would be... 😆 not all of course will increase, on average I expect we'll come out with much less code.
if (_) InputStream.end | ||
else done.set(true) *> InputStream.emit(a) | ||
} | ||
doneRef <- Ref.make(false).toManaged_ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jdegoes and this
ref <- Ref.make[Option[(InputStream[R1, E1, B], Exit[_, _] => ZIO[R1, Nothing, Any])]](None).toManaged_ | ||
as <- self.process | ||
_ <- ZManaged.finalizerExit(e => ref.get.flatMap(_.map(_._2).getOrElse((_: Exit[_, _]) => UIO.unit).apply(e))) | ||
pullOuter = ZIO.uninterruptibleMask { restore => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need to put the pull of a
inside uninterruptible
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The pull of a
is restored, so it it'll run interruptibly. The rest has some regions that have to run uninterruptibly (e.g. storing the finalizer)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, sounds good.
case Some((isB, finalizer)) => | ||
isB.catchAll { | ||
case e @ Some(e1) => (finalizer(Exit.fail(e1)) *> ref.set(None)).uninterruptible *> ZIO.fail(e) | ||
case None => (finalizer(Exit.succeed(())) *> ref.set(None)).uninterruptible *> pullOuter *> go |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ensuring the finalizer
is not interrupted is one thing, we also really need to ensure the finalizer
will be run in all cases, i.e. that go
will not be interrupted right before it even begins execution of the finalizer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ouch, good point. Will use uninterruptibleMask
here too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jdegoes Oh, I actually handled this. If go
gets interrupted before running the finalizer, the ZManaged
will execute the stored finalizer (line 697).
doneRef <- Ref.make(false).toManaged_ | ||
finalizerRef <- Ref.make[Exit[_, _] => ZIO[R, Nothing, Any]](_ => UIO.unit).toManaged_ | ||
_ <- ZManaged.finalizerExit(e => finalizerRef.get.flatMap(_.apply(e))) | ||
pull = ZIO.uninterruptibleMask { restore => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we had a helper:
object InputStream {
def singleton(a: A): UIO[InputStream[Any, Nothing, A]] =
Ref.make[Option[A]](Some(a)).map(ref =>
ref.modify {
case None => ZIO.fail(()) -> None
case Some(a) => ZIO.succeed(a) -> None
}.flatten)
Then it might make this one easier to verify.
Thinking what you want to do is map the A
inside the managed into the input stream. The current version seems quite aggressive about uninterruptibility...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That helper looks useful!
The logic for this constructor is almost what you wrote; I want the managed to allocate the A
only when the InputStream is pulled. Does that make sense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It does, but I can't convince myself there's a difference because of the contract on the original ZManaged[..., A]
that is being lifted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think (not sure yet!) it matters when composing streams together. You're lifting a managed resource to a single-element stream and then flatMap it to create a multiple-element stream. So it should only allocate when you pull the composed stream; not when you enter the scope of the composed stream.
Linking to the original discussion. #1388 |
We should rebase and run the full test suite. Some possible regressions discussed here: #1356 |
de45aae
to
06c91ed
Compare
06c91ed
to
2e05811
Compare
I feel like the scope of this PR is getting a bit broad. I'm taking a look. |
Agreed about the scope @vasilmkd - will get this in today :-) |
Not sure this change is responsible but |
Most likely. Will look into that now.
…On 16 Aug 2019, 8:07 +0300, Pierre Ricadat ***@***.***>, wrote:
Not sure this change is responsible but fast producer progress independently seems flaky now:
https://circleci.com/gh/zio/zio/16869
https://circleci.com/gh/zio/zio/16868
—
You are receiving this because you modified the open/close state.
Reply to this email directly, view it on GitHub, or mute the thread.
|
@ghostdogpr should be fixed in #1409 |
@iravid masted build after that merge had no failures 👏 |
The
buffer
test changed but that's benign because there's now an additional buffer introduced.@vasilmkd