New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Iterant#bracket operation #554

Merged
merged 16 commits into from Feb 2, 2018

Conversation

Projects
None yet
2 participants
@oleg-py
Collaborator

oleg-py commented Jan 21, 2018

Closes #479

This is just the first pass for now. I'll look into implementing bracket as its own operator instead of combining bunch of existing ones, and there are few other things to consider too.

@@ -507,6 +507,10 @@ sealed abstract class Iterant[F[_], A] extends Product with Serializable {
final def doOnFinish(f: Option[Throwable] => F[Unit])(implicit F: Sync[F]): Iterant[F, A] =
IterantStop.doOnFinish(this, f)(F)
// @TODO determine the destiny of this method
private[tail] final def doOnFullConsumption(f: Option[Throwable] => F[Unit])(implicit F: Sync[F]): Iterant[F, A] =
IterantStop.doOnFullConsumption(this, f)(F)

This comment has been minimized.

@oleg-py

oleg-py Jan 21, 2018

Collaborator

I added that operation because currently there's no way to handle early stop and full consumption separately (well, without closing over vars and checking them, or something like that)

This comment has been minimized.

@alexandru

alexandru Jan 22, 2018

Member

Actually this highlights that doOnFinish is not correctly defined.

If we are going to use the semantics of Task.doOnFinish, since after all that's where I got the name from, then doOnFinish shouldn't handle early stop.

case object Completed extends BracketResult
case object EarlyStop extends BracketResult
case class Error(e: Throwable) extends BracketResult

This comment has been minimized.

@oleg-py

oleg-py Jan 21, 2018

Collaborator

I was going to do something similar with typelevel/cats-effect#113. Tho here for BracketResult:

  • The error type parameter is not needed, since Iterant is bound to Throwable
  • Signaling the result in Completed probably does not make sense:
    • It's not possible to always signal a B from use because it might return an empty Iterant
    • It's possible to signal a Seq[B] (which would potentially require unlimited buffering, I'd rather not have an operator do that implicitly) or an Option[B] with the first element (for which I could not find any compelling use case)

This comment has been minimized.

@alexandru

alexandru Jan 22, 2018

Member

Indeed, we don't have a result for Completed. This type should be top-level in monix.tail.

But make BracketResult a sealed abstract class and the case items members of object BracketResult. Oh and that case class needs to be final. We think of them as final, but the Scala compiler doesn't do it for legacy reasons.

Iterant.liftF(acquire).flatMap { a =>
Iterant.suspend(F.map(F.attempt(F.delay(use(a))))(_.fold(
ex => Iterant.raiseError[F, B](ex),
iterant => iterant

This comment has been minimized.

@oleg-py

oleg-py Jan 21, 2018

Collaborator

This weird machinery is required for lifting errors use might throw directly into Iterant. As a result, it will be reported as Halt(Some(ex)) inside Iterant[F, ?] context . Alternatively, release could be called directly after use fails, and the error would be reported inside F itself. I went for the former.

See the test

Bracket releases resource if use throws

case object Completed extends BracketResult
case object EarlyStop extends BracketResult
case class Error(e: Throwable) extends BracketResult

This comment has been minimized.

@alexandru

alexandru Jan 22, 2018

Member

Indeed, we don't have a result for Completed. This type should be top-level in monix.tail.

But make BracketResult a sealed abstract class and the case items members of object BracketResult. Oh and that case class needs to be final. We think of them as final, but the Scala compiler doesn't do it for legacy reasons.

.doOnFullConsumption {
case None => F.suspend(release(a, Completed))
case Some(ex) => F.suspend(release(a, Error(ex)))
}

This comment has been minimized.

@alexandru

alexandru Jan 22, 2018

Member

I think you're trying too much to describe bracket with available operations.

bracket is kind of a primitive operation. I would do an ugly tail-recursive loop for it because you'd then have full control over error handling and so on.

This comment has been minimized.

@oleg-py

oleg-py Jan 22, 2018

Collaborator

Yeah, I kinda said that 😄 Just wanted to get tests right on the first pass

@@ -507,6 +507,10 @@ sealed abstract class Iterant[F[_], A] extends Product with Serializable {
final def doOnFinish(f: Option[Throwable] => F[Unit])(implicit F: Sync[F]): Iterant[F, A] =
IterantStop.doOnFinish(this, f)(F)
// @TODO determine the destiny of this method
private[tail] final def doOnFullConsumption(f: Option[Throwable] => F[Unit])(implicit F: Sync[F]): Iterant[F, A] =
IterantStop.doOnFullConsumption(this, f)(F)

This comment has been minimized.

@alexandru

alexandru Jan 22, 2018

Member

Actually this highlights that doOnFinish is not correctly defined.

If we are going to use the semantics of Task.doOnFinish, since after all that's where I got the name from, then doOnFinish shouldn't handle early stop.

@codecov

This comment has been minimized.

codecov bot commented Jan 22, 2018

Codecov Report

Merging #554 into master will increase coverage by 0.03%.
The diff coverage is 91.66%.

@@            Coverage Diff             @@
##           master     #554      +/-   ##
==========================================
+ Coverage   90.65%   90.68%   +0.03%     
==========================================
  Files         370      371       +1     
  Lines        9844     9868      +24     
  Branches     1851     1846       -5     
==========================================
+ Hits         8924     8949      +25     
+ Misses        920      919       -1
@oleg-py

This comment has been minimized.

Collaborator

oleg-py commented Jan 22, 2018

Everything should be better now 😅 . Tho doc command seems to not like my example, failing the build:

/home/travis/build/monix/monix/monix-tail/shared/src/main/scala/monix/tail/Iterant.scala:1906:58: Variable i undefined in comment for method bracket in object Iterant
 *         .mapEval(i => IO { writer.println(s"Line #$i") }),
                                                      ^

EDIT: nvm, figured that out

@oleg-py oleg-py changed the title from WIP: Add Iterant#bracket operation to Add Iterant#bracket operation Jan 22, 2018

def loop(a: A)(source: Iterant[F, B]): Iterant[F, B] =
source match {
case Next(item, rest, stop) =>
Next(item, rest.map(loop(a)), earlyRelease(a) *> stop)

This comment has been minimized.

@alexandru

alexandru Jan 22, 2018

Member

N.B. a design decision in Iterant is that we don't care about F[_] itself throwing exceptions on evaluation for these operations, because checking F[_] for errors at each step is insane — and we can probably do it somewhere else.

I wonder how FS2 handles it though and I'm mentioning FS2 because they represent the prior art. If their F[_] throws, does it get handled in bracket?

This comment has been minimized.

@oleg-py

oleg-py Jan 22, 2018

Collaborator

@alexandru you mean, if methods of Sync[F] instance throw? That is not expected, as far as I can tell by fs2 sources & my own experiments. If F has no guarantees, neither does bracket (tho it works sometimes)

This comment has been minimized.

@oleg-py

oleg-py Jan 22, 2018

Collaborator

Ah, I think I get it now, after encountering a bug with my implementation 😞.

Seems the error check is unavoidable at least in Suspend case if we want guarantees to hold, which I assume we do.
I'll see tomorrow if earlyStop requires similar treatment

This comment has been minimized.

@alexandru

alexandru Jan 23, 2018

Member

So the thing is what happens if F[_] evaluates to F.raiseError(e)?

I think we need a global policy for dealing with error.
But we can't be defensive about it everywhere, such an implementation isn't feasible.

I also worry that the type classes in Cats do not allow for usage of our Task.transformWith, which is much better than succumbing to attempt.

oleg-py added some commits Jan 22, 2018

@oleg-py oleg-py changed the title from Add Iterant#bracket operation to WIP: Add Iterant#bracket operation Jan 22, 2018

@oleg-py

This comment has been minimized.

Collaborator

oleg-py commented Jan 24, 2018

I went ahead and made everything more defensive in bracket, but it seems it requires #562 / #563 to handle broken batches or cursors 🤕 . Tho more tests won't hurt and error handling can be removed later if not really needed.

I also worry that the type classes in Cats do not allow for usage of our Task.transformWith, which is much better than succumbing to attempt.

If it's so much better, and would not benefit only Monix, maybe it's worth proposing to add it to cats.MonadError with a default implementation based on attempt, so it can be overriden in Task and Coeval instances?

@alexandru

This comment has been minimized.

Member

alexandru commented Jan 25, 2018

Yes, I think a proposal for a transformWith on MonadError would be worth it, but we have to take care of #562 and #563 in order to prove the need for it.

object BracketResult {
case object Completed extends BracketResult
case object EarlyStop extends BracketResult
final case class Error(e: Throwable) extends BracketResult

This comment has been minimized.

@alexandru

alexandru Jan 29, 2018

Member

I am very conflicted about this ADT because the same type might happen in that Bracket PR.

Are you OK with waiting to see what becomes of that PR?

This comment has been minimized.

@alexandru

alexandru Jan 29, 2018

Member

I am referring to cats-effect of course.

This comment has been minimized.

@oleg-py

oleg-py Jan 29, 2018

Collaborator

Yeah, no problem in waiting.

I could add a weaker version of bracket that does not provide an ADT, and make the ADT version private for the time being (it should probably be renamed to be consistent with Task/Coeval).

This comment has been minimized.

@alexandru

alexandru Jan 30, 2018

Member

@oleg-py a weaker version, along with the private full version / ADT sounds good.

@oleg-py

This comment has been minimized.

Collaborator

oleg-py commented Jan 29, 2018

Per my testing, handling errors in left folds removes the need for doing it in bracket. Although I found few folds where it was not added yet (#577) - these still work with current version, but break if bracket does not handle errors.

*/
def bracket[F[_], A, B](acquire: F[A])(
use: A => Iterant[F, B],
release: (A, BracketResult) => F[Unit]

This comment has been minimized.

@alexandru

alexandru Jan 30, 2018

Member

Btw, do you think it would make sense for use and release to be curried?
Maybe it's too much, but for Task it works with curly brackets, e.g..

task.bracket { in =>
  // use part
  ???
} { in =>
  // release part
  ???
}

Sort of resembles a try/finally statement to me and right now in Jan 2018 doesn't look ugly, but maybe I'll change my opinion in a month or so 😀

What do you think?

This comment has been minimized.

@oleg-py

oleg-py Jan 30, 2018

Collaborator

I copied over the currying on acquire from fs2 because without it inference does not work.

Semantics of this Iterant.bracket are different from try/finally and bracket discussed at cats-effect, as here you can continue flatMapping safely. Actually, because of this property, my use blocks in fs2 bracket are often one-liners:

fs2.Stream.bracket(delay { file.open() })(
  file => fs2.Stream(file),
  file => delay { file.close() }
)

I think it's nicer this way than

fs2.Stream.bracket(delay { file.open() }) { file =>
  fs2.Stream(file)
} { file =>
  delay { file.close() }
}

although I would not mind having it curried just for the sake of consistency, as Task has it this way and it seems to be where cats-effect is going too.

This comment has been minimized.

@alexandru

alexandru Jan 30, 2018

Member

I think you're right that the semantics are different and that the use function gets to be very light.

So leave it as it is.

@alexandru

This comment has been minimized.

Member

alexandru commented Jan 31, 2018

@oleg-py when you think it's ready for merge / review, pull it out of WIP state.

@oleg-py

This comment has been minimized.

Collaborator

oleg-py commented Jan 31, 2018

@alexandru I'm pretty much done (or at least lacking more ideas for improvements / tests). #581 needs to be merged to ensure safety, as I already removed error handling on each step.

@oleg-py oleg-py changed the title from WIP: Add Iterant#bracket operation to Add Iterant#bracket operation Jan 31, 2018

@alexandru alexandru merged commit 38284c9 into monix:master Feb 2, 2018

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed
Details
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment