Skip to content

Commit

Permalink
Make Task auto-cancelable (#724)
Browse files Browse the repository at this point in the history
* Make Task auto-cancelable, fix tests, make memoize uninterruptible

* Make Fiber.join yielded by Task.start uncancelable, cleanups

* Fix monix-reactive

* Fix Mima report
  • Loading branch information
alexandru committed Sep 21, 2018
1 parent 48159b4 commit 88f6fa8
Show file tree
Hide file tree
Showing 31 changed files with 194 additions and 363 deletions.
5 changes: 5 additions & 0 deletions build.sbt
Expand Up @@ -361,6 +361,11 @@ def mimaSettings(projectName: String) = Seq(
// Breakage - PR #700: renamed methods
exclude[DirectMissingMethodProblem]("monix.reactive.Observable.delaySubscriptionWith"),
exclude[DirectMissingMethodProblem]("monix.reactive.Observable.delaySubscription"),
// Breakage — PR 724: https://github.com/monix/monix/pull/724
exclude[MissingClassProblem]("monix.eval.Fiber$Impl"),
exclude[DirectMissingMethodProblem]("monix.eval.Fiber.apply"),
exclude[DirectMissingMethodProblem]("monix.eval.internal.TaskFromFuture.lightBuild"),
exclude[DirectMissingMethodProblem]("monix.eval.internal.TaskCancellation.signal"),
// Internals ...
exclude[DirectMissingMethodProblem]("monix.eval.Task#MaterializeTask.recover"),
exclude[DirectMissingMethodProblem]("monix.eval.Coeval#MaterializeCoeval.recover"),
Expand Down
19 changes: 7 additions & 12 deletions monix-eval/shared/src/main/scala/monix/eval/Fiber.scala
Expand Up @@ -17,7 +17,7 @@

package monix.eval

import monix.eval.internal.TaskCancellation
import cats.effect.CancelToken

/** `Fiber` represents the (pure) result of a [[Task]] being started concurrently
* and that can be either joined or cancelled.
Expand Down Expand Up @@ -63,7 +63,7 @@ trait Fiber[A] extends cats.effect.Fiber[Task, A] {
* of the underlying fiber is already complete, then there's nothing
* to cancel.
*/
def cancel: Task[Unit]
def cancel: CancelToken[Task]

/** Returns a new task that will await for the completion of the
* underlying fiber, (asynchronously) blocking the current run-loop
Expand All @@ -74,16 +74,11 @@ trait Fiber[A] extends cats.effect.Fiber[Task, A] {

object Fiber {
/**
* Wraps a [[Task]] value in a `Fiber` interface.
*
* This is usually done when the given `Task` reference is linked
* to some mutable variable and thus something that's worth cancelling.
* Builds a [[Fiber]] value out of a `task` and its cancelation token.
*/
def apply[A](task: Task[A]): Fiber[A] =
new Impl(task)
def apply[A](task: Task[A], cancel: CancelToken[Task]): Fiber[A] =
new Tuple(task, cancel)

private final class Impl[A](val join: Task[A]) extends Fiber[A] {
def cancel: Task[Unit] =
TaskCancellation.signal(join)
}
private final case class Tuple[A](join: Task[A], cancel: CancelToken[Task])
extends Fiber[A]
}
68 changes: 4 additions & 64 deletions monix-eval/shared/src/main/scala/monix/eval/Task.scala
Expand Up @@ -1289,67 +1289,6 @@ sealed abstract class Task[+A] extends TaskBinCompat[A] with Serializable {
final def executeWithOptions(f: Options => Options): Task[A] =
TaskExecuteWithOptions(this, f)

/** Returns a new task that is cancelable.
*
* Normally Monix Tasks have these characteristics:
*
* - `flatMap` chains are not cancelable by default
* - when creating [[Task.cancelable0[A](register* async tasks]]
* the user has to specify explicit cancellation logic
*
* This operation returns a task that has [[Task.Options.autoCancelableRunLoops]]
* enabled upon evaluation, thus being equivalent with:
* {{{
* Task(1 + 1).executeWithOptions(_.enableAutoCancelableRunLoops)
* }}}
*
* What this does is two-fold:
*
* - `flatMap` chains become cancelable on async boundaries, which works in
* combination with [[monix.execution.ExecutionModel.BatchedExecution BatchedExecution]]
* that's enabled by default (injected by [[monix.execution.Scheduler Scheduler]],
* but can also be changed with [[executeWithModel]])
* - even if the source task cannot be cancelled, upon completion the result
* is not allowed to be streamed and the continuation is not allowed to execute
*
* For example this is a function that calculates the n-th Fibonacci element:
* {{{
* def fib(n: Int): Task[Long] = {
* def loop(n: Int, a: Long, b: Long): Task[Long] =
* Task.suspend {
* if (n > 0)
* loop(n - 1, b, a + b)
* else
* Task.now(a)
* }
*
* loop(n, 0, 1).autoCancelable
* }
* }}}
*
* Normally this isn't cancelable and it might take a long time, but
* by calling `autoCancelable` on the result, we ensure that when cancellation
* is observed, at async boundaries, the loop will stop with the task
* becoming a non-terminating one.
*
* This operation represents the opposite of [[uncancelable]]. And note
* that it works even for tasks that are uncancelable / atomic, because
* it blocks the rest of the `flatMap` loop from executing, functioning
* like a sort of cancellation boundary:
*
* {{{
* Task.evalAsync(println("Hello ..."))
* .autoCancelable
* .flatMap(_ => Task.eval(println("World!")))
* }}}
*
* Normally [[Task.apply]] does not yield a cancelable task, but by applying
* the `autoCancelable` transformation to it, the `println` will execute,
* but not the subsequent `flatMap` operation.
*/
def autoCancelable: Task[A] =
TaskCancellation.autoCancelable(this)

/** Returns a failed projection of this task.
*
* The failed projection is a `Task` holding a value of type `Throwable`,
Expand Down Expand Up @@ -2743,8 +2682,9 @@ object Task extends TaskInstancesLevel1 {
* }
* }}}
*
* Note that an alternative to doing this would be usage of
* [[Task.autoCancelable]], which does the same thing, but automatically.
* NOTE: that by default `Task` is configured to be auto-cancelable
* (see [[Task.Options]]), so this isn't strictly needed, unless you
* want to fine tune the cancelation boundaries.
*/
val cancelBoundary: Task[Unit] =
Task.Async { (ctx, cb) => if (!ctx.connection.isCanceled) cb.onSuccess(()) }
Expand Down Expand Up @@ -2792,7 +2732,7 @@ object Task extends TaskInstancesLevel1 {
* }
* }}}
*
* Passed function can also return IO[Unit]` as a task that
* Passed function can also return `IO[Unit]` as a task that
* describes a cancelation action:
*
* {{{
Expand Down
Expand Up @@ -127,13 +127,16 @@ private[eval] abstract class TaskBinCompat[+A] { self: Task[A] =>
}

/**
* DEPRECATED - renamed to [[autoCancelable]], in order to differentiate
* it from the `Task.cancelable` builder.
* DEPRECATED - since Monix 3.0 the `Task` implementation has switched
* to auto-cancelable run-loops by default (which can still be turned off
* in its configuration).
*
* For ensuring the old behavior, you can use [[executeWithOptions]].
*/
@deprecated("Renamed to autoCancelable", "3.0.0")
@deprecated("Switch to executeWithOptions(_.enableAutoCancelableRunLoops)", "3.0.0")
def cancelable: Task[A] = {
// $COVERAGE-OFF$
autoCancelable
executeWithOptions(_.enableAutoCancelableRunLoops)
// $COVERAGE-ON$
}

Expand Down
Expand Up @@ -25,33 +25,6 @@ import monix.execution.schedulers.TrampolinedRunnable
import monix.execution.{Cancelable, Scheduler}

private[eval] object TaskCancellation {
/**
* Implementation for `Task.cancel`.
*/
def signal[A](fa: Task[A]): Task[Unit] = {
val start = (ctx: Context, cb: Callback[Unit]) => {
implicit val sc = ctx.scheduler
// Continues the execution of `fa` using an already cancelled
// cancelable, which will ensure that all future registrations
// will be cancelled immediately and that `isCanceled == false`
val ctx2 = ctx.withConnection(StackedCancelable.alreadyCanceled)
// Starting task
Task.unsafeStartNow(fa, ctx2, Callback.empty)
// Signaling that cancellation has been triggered; given
// the synchronous execution of `fa`, what this means is that
// cancellation succeeded or an asynchronous boundary has
// been hit in `fa`
cb.onSuccess(())
}
Async(start, trampolineBefore = true, trampolineAfter = false)
}

/**
* Implementation for `Task#cancelable`.
*/
def autoCancelable[A](fa: Task[A]): Task[A] =
Task.ContextSwitch(fa, enableAutoCancelableRunLoops, disableAutoCancelableRunLoops)

/**
* Implementation for `Task.uncancelable`.
*/
Expand Down Expand Up @@ -134,18 +107,15 @@ private[eval] object TaskCancellation {
}
}

private[this] val enableAutoCancelableRunLoops: Context => Context =
ctx => {
if (ctx.options.autoCancelableRunLoops) ctx
else ctx.withOptions(ctx.options.enableAutoCancelableRunLoops)
}

private[this] val disableAutoCancelableRunLoops: (Any, Throwable, Context, Context) => Context =
(_, _, old, current) => current.withOptions(old.options)

private[this] val withConnectionUncancelable: Context => Context =
ct => ct.withConnection(StackedCancelable.uncancelable)
ct => {
ct.withConnection(StackedCancelable.uncancelable)
.withOptions(ct.options.disableAutoCancelableRunLoops)
}

private[this] val restoreConnection: (Any, Throwable, Context, Context) => Context =
(_, _, old, current) => current.withConnection(old.connection)
(_, _, old, ct) => {
ct.withConnection(old.connection)
.withOptions(old.options)
}
}
Expand Up @@ -17,35 +17,23 @@

package monix.eval.internal

import monix.eval.{Callback, Task}
import monix.eval.Task.{Async, Context, Options}
import scala.util.control.NonFatal
import monix.eval.Task
import monix.eval.Task.{Context, ContextSwitch, Options}

private[eval] object TaskExecuteWithOptions {
/**
* Implementation for `Task.executeWithOptions`
*/
def apply[A](self: Task[A], f: Options => Options): Task[A] = {
val start = (context: Context, cb: Callback[A]) => {
implicit val s = context.scheduler
var streamErrors = true
try {
val context2 = context.withOptions(f(context.options))
streamErrors = false
Task.unsafeStartNow[A](self, context2, cb)
} catch {
case ex if NonFatal(ex) =>
if (streamErrors) cb.onError(ex) else {
// $COVERAGE-OFF$
context.scheduler.reportFailure(ex)
// $COVERAGE-ON$
}
}
def apply[A](self: Task[A], f: Options => Options): Task[A] =
ContextSwitch(self, enable(f), disable)

private[this] def enable(f: Options => Options): Context => Context =
ctx => {
val opts2 = f(ctx.options)
if (opts2 != ctx.options) ctx.withOptions(opts2)
else ctx
}
Async(
start,
trampolineBefore = false,
trampolineAfter = true,
restoreLocals = false)
}

private[this] val disable: (Any, Throwable, Context, Context) => Context =
(_, _, old, current) => current.withOptions(old.options)
}
Expand Up @@ -59,6 +59,7 @@ private[eval] object TaskFromFuture {
// Already completed future, streaming value immediately,
// but with light async boundary to prevent stack overflows
callback(value)

case None =>
future match {
case cf: CancelableFuture[A] @unchecked =>
Expand All @@ -74,18 +75,6 @@ private[eval] object TaskFromFuture {
}
}

/** Internal implementation used in `Task.start`. */
def lightBuild[A](f: Future[A], c: Cancelable): Task[A] = {
// The task could have been a strict or easily computed value
// in which case we're already there
f.value match {
case None =>
rawAsync(startCancelable(_, _, f, c))
case Some(value) =>
Task.fromTry(value)
}
}

private def rawAsync[A](start: (Context, Callback[A]) => Unit): Task[A] =
Task.Async(
start,
Expand Down
Expand Up @@ -75,7 +75,7 @@ private[eval] object TaskMemoize {
// or only successful results?
if (self.cacheErrors || value.isSuccess) {
state.getAndSet(value) match {
case (p: Promise[A] @unchecked, _) =>
case p: Promise[A] @unchecked =>
if (!p.tryComplete(value)) {
// $COVERAGE-OFF$
if (value.isFailure)
Expand All @@ -90,10 +90,10 @@ private[eval] object TaskMemoize {
} else {
// Error happened and we are not caching errors!
state.get match {
case current @ (p: Promise[A] @unchecked, _) =>
case p: Promise[A] @unchecked =>
// Resetting the state to `null` will trigger the
// execution again on next `runAsync`
if (state.compareAndSet(current, null)) {
if (state.compareAndSet(p, null)) {
p.tryComplete(value)
} else {
// Race condition, retry
Expand Down Expand Up @@ -122,17 +122,17 @@ private[eval] object TaskMemoize {
* that will receive the result once the task is complete.
*/
private def registerListener(
ref: (Promise[A], StackedCancelable),
p: Promise[A],
context: Context,
cb: Callback[A])
(implicit ec: ExecutionContext): Unit = {

val (p, c) = ref
context.connection.push(c)
p.future.onComplete { r =>
context.connection.pop()
context.frameRef.reset()
startFull(Task.fromTry(r), context, cb, null, null, null, 1)
// Listener is cancelable: we simply ensure that the result isn't streamed
if (!context.connection.isCanceled) {
context.frameRef.reset()
startFull(Task.fromTry(r), context, cb, null, null, null, 1)
}
}
}

Expand All @@ -143,19 +143,26 @@ private[eval] object TaskMemoize {
implicit val sc: Scheduler = context.scheduler
self.state.get match {
case null =>
val update = (Promise[A](), context.connection)
val update = Promise[A]()

if (!self.state.compareAndSet(null, update)) {
// $COVERAGE-OFF$
start(context, cb) // retry
// $COVERAGE-ON$
} else {
// Registering listener callback for when listener is ready
self.registerListener(update, context, cb)
// With light async boundary to prevent stack-overflows!
Task.unsafeStartTrampolined(self.thunk, context, self.complete)

// Running main task in `uncancelable` model
val ctx2 = context
.withOptions(context.options.disableAutoCancelableRunLoops)
.withConnection(StackedCancelable.uncancelable)

// Start with light async boundary to prevent stack-overflows!
Task.unsafeStartTrampolined(self.thunk, ctx2, self.complete)
}

case ref: (Promise[A], StackedCancelable) @unchecked =>
case ref: Promise[A] @unchecked =>
self.registerListener(ref, context, cb)

case ref: Try[A] @unchecked =>
Expand Down
Expand Up @@ -60,7 +60,7 @@ private[eval] object TaskRacePair {
Task.unsafeStartEnsureAsync(fa, contextA, new Callback[A] {
def onSuccess(valueA: A): Unit =
if (isActive.getAndSet(false)) {
val fiberB = Fiber(TaskFromFuture.lightBuild(pb.future, connB))
val fiberB = Fiber(TaskFromFuture.strict(pb.future), Task(connB.cancel()))
conn.pop()
cb.onSuccess(Left((valueA, fiberB)))
} else {
Expand All @@ -81,7 +81,7 @@ private[eval] object TaskRacePair {
Task.unsafeStartEnsureAsync(fb, contextB, new Callback[B] {
def onSuccess(valueB: B): Unit =
if (isActive.getAndSet(false)) {
val fiberA = Fiber(TaskFromFuture.lightBuild(pa.future, connA))
val fiberA = Fiber(TaskFromFuture.strict(pa.future), Task(connA.cancel()))
conn.pop()
cb.onSuccess(Right((fiberA, valueB)))
} else {
Expand Down

0 comments on commit 88f6fa8

Please sign in to comment.