New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bracket for Task/Coeval, Task.cancelable and Task.onCancelRaiseError #561

Merged
merged 8 commits into from Jan 25, 2018

Conversation

Projects
None yet
5 participants
@alexandru
Member

alexandru commented Jan 23, 2018

Introducing:

  1. task.cancelable for activating the auto-cancelable run-loop for a certain task
  2. task.onCancelRaiseError for throwing an error in case the source task gets cancelled, in a thread-safe way
  3. task.bracket and task.bracketE for resource handling

Bracket

The bracket operation would be the equivalent of try {} finally {} in many mainstream languages.
For both Task and Coeval it comes in two flavors. One would be the simple version:

import java.io._

def readFile(file: File): Task[String] = {
  // Opening a file handle for reading text
  val acquire = Task.eval(new BufferedReader(
    new InputStreamReader(new FileInputStream(file), "utf-8"))
  )

  acquire.bracket { in =>
    // Usage part
    Task.eval {
      // Yes, ugly Java, non-FP loop;
      // side-effects are suspended though
      var line: String = null
      val buff = new StringBuilder()
      do {
        line = in.readLine()
        if (line != null) buff.append(line)
      } while (line != null)
      buff.toString()
    }
  } { in =>
    // The release part
    Task.eval(in.close())
  }
}

And bracketE which allows distinguishing between successful termination, termination in error or, in the case of Task, cancellation:

def readFile(file: File): Task[String] = {
  // Opening a file handle for reading text
  val acquire = Task.eval(new BufferedReader(
    new InputStreamReader(new FileInputStream(file), "utf-8"))
  )

  acquire.bracketE { in =>
    // Usage part
    Task.eval {
      // Yes, ugly Java, non-FP loop;
      // side-effects are suspended though
      var line: String = null
      val buff = new StringBuilder()
      do {
        line = in.readLine()
        if (line != null) buff.append(line)
      } while (line != null)
      buff.toString()
    }
  } { (in, result) =>
    // The release part
    Task.eval {
      result match {
        case Right(_) => println("Successful!")
        case Left(Some(e)) => println("Raised error: " + e)
        case Left(None) => println("Canceled!")
      }
      in.close()
    }
  }
}

Also see the discussion at: typelevel/cats-effect#113

task.onCancelRaiseError

In Monix normally tasks are non-terminating on cancellation. This method transforms a task into one that terminates with error on completion:

import java.util.concurrent.CancellationException

val tenSecs = Task.sleep(10.seconds)
  .onCancelRaiseError(new CancellationException)

val task = tenSecs.fork.flatMap { fa =>
  // Triggering pure cancellation, then trying to get its result
  fa.cancel.flatMap(_ => fa)
}

task.cancelable

This is the opposite of uncancelable, activating "auto-cancelable flatMap chains" for the given task on evaluation. Consider this loop that calculates the n-th Fibonacci number:

def fib(n: Int): Task[Long] = {
  def loop(n: Int, a: Long, b: Long): Task[Long] =
    Task.suspend {
      if (n > 0)
        loop(n - 1, b, a + b)
      else
        Task.now(a)
    }

  loop(n, 0, 1).cancelable
}

By specifying cancelable on the loop, we are ensuring that the internal cancellation flag is checked on asynchronous boundaries, automatically interrupting the evaluation if needed.

@alexandru alexandru added this to the 3.0.0 milestone Jan 23, 2018

* as input the resource that needs that needs release, along with
* the result of `use` (cancellation, error or successful result)
*/
final def bracketE[B](use: A => Task[B])(release: (A, Either[Option[Throwable], B]) => Task[Unit]): Task[B] =

This comment has been minimized.

@LukaJCB

LukaJCB Jan 23, 2018

Just to be 100% clear, Either[Option[Throwable], B] here means, that
Left(None) is cancelled,
Left(Some(ex)) is an error, and
Right(b) is success, correct? :)

This comment has been minimized.

@alexandru

alexandru Jan 23, 2018

Member

Yes. I'm also waiting on that ADT defined in cats-effect, for whenever it will happen, to introduce another overload.

This comment has been minimized.

@LukaJCB
*
* For those cases you might want to do synchronization (e.g. usage of
* locks and semaphores) and you might want to use [[bracketE]], the
* version that allows you to differentiate between nuormal termination

This comment has been minimized.

@oleg-py

oleg-py Jan 23, 2018

Collaborator

Typo: nuormal 😛

This comment has been minimized.

@alexandru
* if (n > 0)
* loop(n - 1, b, a + b)
* else
* a

This comment has been minimized.

@oleg-py

oleg-py Jan 23, 2018

Collaborator

Shouldn't it be Task.now(a)?

This comment has been minimized.

@alexandru
* {{{
* Task(println("Hello ..."))
* .cancelable
* .flatMap(_ => println("World!"))

This comment has been minimized.

@oleg-py

oleg-py Jan 23, 2018

Collaborator

I think println needs to be wrapped in Task

This comment has been minimized.

@alexandru
* {{{
* import java.util.concurrent.CancellationException
*
* val tenSecs = Task.sleep(10.seconds).onCancelRaiseError

This comment has been minimized.

@oleg-py

oleg-py Jan 23, 2018

Collaborator

Forgotten parameter for onCancelRaiseError

This comment has been minimized.

@alexandru
acquire.flatMap { a =>
val next = try use(a) catch { case NonFatal(e) => Task.raiseError(e) }
next.onCancelRaiseError(isCancel).transformWith[B](
b => release(a, Right(b)).map(_ => b),

This comment has been minimized.

@oleg-py

oleg-py Jan 23, 2018

Collaborator

So as I understand that correctly, if release throws or return a failed task, it will shadow the exception from use. Maybe it should use reportFailure from current scheduler instead?

alexandru added some commits Jan 23, 2018

@oleg-py

This comment has been minimized.

Collaborator

oleg-py commented Jan 24, 2018

My last comment got hidden, but it kinda was important: what happens if both use and release result in a failure?

@alexandru alexandru changed the title from WIP: Task.cancelable, Task.onCancelRaiseError and Task.bracket to Bracket for Task/Coeval, Task.cancelable and Task.onCancelRaiseError Jan 24, 2018

@codecov

This comment has been minimized.

codecov bot commented Jan 24, 2018

Codecov Report

Merging #561 into master will increase coverage by 0.06%.
The diff coverage is 98.33%.

@@            Coverage Diff            @@
##           master    #561      +/-   ##
=========================================
+ Coverage   90.53%   90.6%   +0.06%     
=========================================
  Files         367     369       +2     
  Lines        9742    9782      +40     
  Branches     1838    1841       +3     
=========================================
+ Hits         8820    8863      +43     
+ Misses        922     919       -3
@alexandru

This comment has been minimized.

Member

alexandru commented Jan 25, 2018

@oleg-py that's a very good point.

I added a detailed reply here: typelevel/cats-effect#113 (comment)

Currently the behavior, if both use and release throw, is this:

  1. the error sent downstream is the one in release
  2. the other error is completely silent

I'll modify the behavior to be consistent with the one in Haskell, that I just tested:

  1. the error sent downstream should be the one in use
  2. the error in release should be reported using Scheduler.reportError
@alexandru

This comment has been minimized.

Member

alexandru commented Jan 25, 2018

@oleg-py I have implemented the changes.

@alexandru alexandru referenced this pull request Jan 25, 2018

Merged

Add Bracket type class #113

@alexandru

This comment has been minimized.

Member

alexandru commented Jan 25, 2018

@oleg-py given your involvement in bracket, I'd really like your approval for merging this PR.

So what say you? Does it look good after the latest changes?

UPDATE: corrected the username

@alexandru

This comment has been minimized.

Member

alexandru commented Jan 25, 2018

Oh, above I misspelled the name, sorry @olafurpg, I wanted to mention @oleg-py, both of you have usernames starting with "ol" 😜

@oleg-py

This comment has been minimized.

Collaborator

oleg-py commented Jan 25, 2018

@alexandru looks great 👍

@alexandru alexandru merged commit 827d2cc into monix:master Jan 25, 2018

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed
Details
@ngbinh

This comment has been minimized.

Contributor

ngbinh commented Jan 25, 2018

wow! This is awesome!

@olafurpg

This comment has been minimized.

Contributor

olafurpg commented Jan 26, 2018

bracket looks amazing, looking forward to give it a try!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment