New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancelable cats.effect.IO #121

Merged
merged 13 commits into from Feb 26, 2018

Conversation

@alexandru
Member

alexandru commented Jan 27, 2018

If there's consensus, this is a proof of concept for a cancelable cats.effect.IO.

It is inspired by Monix's Task, using approximately the same cancellation mechanism, minus some features.

This doesn't necessarily have to be approved, however the implementation is solid and I'm pushing it because I've come to the conclusion that the type class hierarchy in cats-effect is effectively crippled due to IO not being cancelable.

I came to this conclusion after trying to describe this data type:

trait Timer[F[_]] {
  def async: Async[F]
  def shift: F[Unit]

  def sleep(timespan: FiniteDuration): F[Unit]
}

This is basically the pure equivalent of ScheduledExecutorService and of JavaScript's setTimeout, an operation that's fundamental for any platform capable of asynchronous processing and that we need to describe in a pure way.

Implementation notes

  1. not all IO tasks are cancelable, only tasks built with IO.cancelable can cancel the evaluation (see below)
  • should go without saying (after point 1) that flatMap chains are not auto-cancelable, and compared with Monix's Task there's no way to configure an IO task to be auto-cancelable
  • if this is a problem, flatMap loops can be made cancelable by using IO.cancelBoundary, see below
  1. IO tasks that are cancelable, become non-terminating on cancel
  • they can be turned into tasks that trigger an error with onCancelRaiseError, which can be used for implementing bracket or other operations, see below
  1. this PR does NOT break API compatibility
  2. this PR does NOT break semantics for current code, given that cancelable tasks can only be built with IO.cancelable, whereas tasks built with IO.async, IO.apply or other builders are not cancelable

Versus the "async interruption" from Haskell and Scalaz 8

On "cancel" what happens is that whatever you specify in the IO.cancelable builder gets executed. And depending on the implementation of an IO.cancelable task, it can become non-terminating. And our impure cancel() is a () => Unit.

By comparison Haskell and Scalaz sends an error, a Throwable on interruption and cancelled tasks get completed with that Throwable. Their impure cancel is a Throwable => Unit.

We on the other hand have an onCancelRaiseError(e: Throwable), which can transform a task that's non-terminating on cancel, into one that raises an error on cancel. This operation also creates a race condition - cutting off the signaling to downstream, even if the source is uncancelable.

Throwable => Unit allows the task's logic to know the cancellation reason, however I have yet to see a good use case for it — usually cancellation is about cutting the connection to the producer, closing all resources as soon as possible, because you're no longer interested in the result, due to some race condition that happened.

Throwable => Unit is also a little confusing, being too broad in scope. Users might be tricked into sending messages back to the producer via this channel, in order to steer it, to change its outcome - however cancellation is cancellation, we're doing it for the purpose of releasing resources and the implementation of race conditions will end up closing the connection, disallowing the cancelled task to send anything downstream.

Therefore it's confusing for the user and the only practical use is to release resources differently, based on the received error. But that's not a use-case I feel that's worth pursuing, given the increase in complexity.

IO.cancelable builder

IO.async remains the same and we now have a new IO.cancelable builder - and in the provided function the user is expected to return a "cancelable", which is a () => Unit function that is supposed to cancel the started asynchronous computation. For example:

def sleep(duration: FiniteDuration)(implicit sc: ScheduledExecutorService): IO[Unit] =
  IO.cancelable { cb => 
    val c = sc.schedule(() => cb(Right()), duration.delay, duration.unit)
    // Returning a function that can cancel our IO evaluation
    () => c.cancel()
  }

start + Fiber + cancel as a pure action

IO values that get started in the IO context (e.g. forked) get represented with a new interface:

trait Fiber[F[+_], +A] {

  def cancel: F[Unit]

  def join: F[A]
}

And IO now has a new start method on it exposing it:

sealed abstract class IO[+A] {

  def start: IO[Fiber[IO, A]]
}

A Fiber can be cancelled or joined, representing a background process that may run concurrently.
NOTE: we don't fork threads automatically, we just start the IO and if the IO happens to run asynchronously, then we have actual parallelism, otherwise we don't.

Example:

for {
  fiber <- IO.shift *> launchMissiles.start
  _  <- runToBunker.handleErrorWith { e =>
    // Retreating to bunker failed, abort!
    fiber.cancel *> IO.raiseError(e)
  }
  aftermath <- fiber.join
} yield {
  aftermath
}

unsafeRunCancelable

// This is the sleep 
val io: IO[Unit] = sleep(10.seconds) *> IO.eval(println("Hello!")

val cancel: () => Unit =
  io.unsafeRunCancelable(r => println(s"Done: $r"))

// ... if a race condition happens, we can cancel it ...
cancel()

uncancelable marker

Given a cancelable IO, we can turn it into an IO that cannot be cancelled:

// Our reference from above
val io: IO[Unit] = sleep(10.seconds) *> IO.eval(println("Hello!")

// This IO can't be canceled, even if we try
io.uncancelable

onCancelRaiseError

Normally IO tasks that are cancelable, like our sleep above, become non-terminating on cancellation.

Given any IO (cancelable or not), the onCancelRaiseError operator creates a race condition between the source and cancellation ... and if cancellation happens, the cancel signal gets sent to the source and the resulting IO completes with the specified error.

import java.util.concurrent.CancellationException 

// Our reference from above
val io: IO[Unit] = sleep(10.seconds) *> IO.eval(println("Hello!")

// In case of cancellation, complete in error
val io2 = io.onCancelRaiseError(new CancellationException("Boo"))

val cancel: () => Unit =
  io.unsafeRunCancelable(r => println(s"Done: $r"))

// ... if a race condition happens, we can cancel it ...
cancel()
// Prints:
// Done: Left(CancellationException: Boo)

IO.cancelBoundary

Returns a cancelable boundary — an IO task that checks for the cancellation status of the run-loop and does not allow for the bind continuation to keep executing in case cancellation happened.

This operation is very similar to IO.shift, as it can be dropped in flatMap chains in order to make loops cancelable.

def fib(n: Int, a: Long, b: Long): IO[Long] =
  IO.suspend {
    if (n <= 0) IO.pure(a) else {
      val next = fib(n - 1, b, a + b)

      // Every 100-th cycle, check cancellation status
      if (n % 100 == 0)
        IO.cancelBoundary *> next
      else
        next
    }
  }

parMap2

This PR updates the parMap2 implementation to cancel the other task if any of them fails — thus no longer waiting on both to complete in order to signal an error.

@codecov-io

This comment has been minimized.

codecov-io commented Jan 27, 2018

Codecov Report

Merging #121 into master will decrease coverage by 4.06%.
The diff coverage is 75.7%.

@@            Coverage Diff             @@
##           master     #121      +/-   ##
==========================================
- Coverage   89.77%   85.71%   -4.07%     
==========================================
  Files          23       31       +8     
  Lines         489      630     +141     
  Branches       36       59      +23     
==========================================
+ Hits          439      540     +101     
- Misses         50       90      +40

alexandru added some commits Jan 27, 2018

@edmundnoble

This comment has been minimized.

edmundnoble commented Jan 28, 2018

Can you explain how having cancel as a pure function attached to an IO[A] works?

Here's an example of what I mean. Say I have an IO action here:
def launchMissiles: IO[Unit]

I run two copies of launchMissiles concurrently; I want to cancel one of the missile launches. I use launchMissiles.cancel; is it not the case that because launchMissiles here is a def and not a val, that doesn't actually cancel anything, because it's a new IO? Is it not also the case that if I used a val, all launches have to be cancelled? So I have to thread the unexecuted IO actions through my program now, to make sure I can cancel them individually later, and I cannot store IO in a val (or at least, I have to know how it will be used by clients before doing so)?

It just seems wrong to me to have a cancellation handle, which is morally state that is generated by the starting of an IO action, be accessible before the IO action has actually begun.

Because there not being a difference between using a val and a def is actually one of the reasons why IO is important. Because if there is a difference then it is not pure. That is the exact definition of referential transparency.

Edit: Perhaps an example would help?

Edit 2: Also, what happens when I cancel an IO action before running it? What happens when I try to re-run an IO action that has already been cancelled?

@alexandru

This comment has been minimized.

Member

alexandru commented Jan 28, 2018

@edmundnoble the pure cancel action makes sense in the context of operations creating race conditions, which haven't been described in this PR - because I wanted to keep it focused on cancellation only. The test however describes an operation such as this:

  def fork[A](fa: IO[A])(implicit ec: ExecutionContext): IO[IO[A]] =
    IO.async { cb =>
      val p = Promise[A]()
      val conn = ForwardCancelable()
      // Executing `fa` on another thread
      ec.execute(new Runnable {
        def run(): Unit = {
          conn := fa.unsafeRunCancelable {
            case Right(a) => p.success(a)
            case Left(e) => p.failure(e)
          }
        }
      })
      // Signaling a new, memoized IO reference
      cb(Right(
        IO.cancelable { cb =>
          p.future.onComplete {
            case Success(a) => cb(Right(a))
            case Failure(e) => cb(Left(e))
          }(immediate)
          // Returning cancellation reference
          conn
        }))
    }

So now, we can create a race condition:

for {
  fa <- fork(launchMissiles)
  _  <- runToBunker.handleErrorWith { e =>
    // Retreating to bunker failed, abort!
    fa.cancel *> IO.raiseError(e)
  },
  result <- fa
} yield result

Note that in this case we are talking of an IO[A] reference attached to a running process.

@johnynek

This comment has been minimized.

johnynek commented Jan 28, 2018

@alexandru I admire the work here. Twitter futures are cancelable (but not referentially transparent of course) and I have worked with them a fair amount.

That said I have never liked cancellation since it seems hard to reason about: we can’t be sure a cancelation takes (in for instance the example you give above with the missiles). How do you think about this? I know you have a lot of experience with Monix with this.

In an ideal world we could have atomic IO: the whole thing works or state doesn’t change. That is obviously very hard since you need distributed transactions in the general case. Maybe cancelation is no worse than general failure since failure leaves you in a state where you can’t be sure where exactly the IO failed.

Anyway, I’d be interested in your thoughts about cancelation in general and maybe some real world examples where you really want it. Is it about recovering computational resources? Stop working on things that won’t be useful?

@edmundnoble

This comment has been minimized.

edmundnoble commented Jan 28, 2018

I don't believe that your comment answers any of my questions. I'll try to phrase them more clearly.

  1. Are you or are you not telling me that functions that create IO actions are impure with this cancellation strategy, because they cannot be substituted with the IO actions themselves safely?

  2. Is it not the case that start should actually provide an IO[(IO[A], IO[Unit])]? Why not?

  3. How can we actually abstract over this cancel action; i.e., write a meaningful method that takes an IO[A] as a parameter, and calls cancel on it.

  4. What happens with the edge cases I provided earlier: cancelling an IO action before running it, and rerunning an IO action after cancelling it. Perhaps even more importantly, cancelling an IO action while running it multiple times concurrently.

  5. Is there no way to make a cancellable IO[A] that isn't attached to a running process (that was produced inside IO itself)? Because if there isn't, you're essentially using the Scalaz 8 model of fiber cancellation but with much weaker types. Providing an example and usage of a cancellable IO[A] which is not generated in IO itself (i.e. IO[IO[A]]) would help clear this up. If that isn't practically useful, then there's no getting around the fact that the types here are more loose than they need to be.

You can totally provide a pure cancel, but I'm not convinced in the slightest that it makes sense as a member on IO and not Fiber (or whatever would be returned by IO.fork; hell, call it CancellationHandle if you want).

@alexandru

This comment has been minimized.

Member

alexandru commented Jan 29, 2018

@edmundnoble I understand your point, but for one It's easier to get answers by taking a look at the implementation of cancel, as there's nothing special about it.

If I were to introduce a "Fiber" type, it would literally be this:

final case class Fiber[+A](val join: IO[A]) {
  def cancel: IO[Unit] = IOCancel.signal(join)
}

Now it might be more intuitive from a types perspective on usage, but from an implementation perspective it's just pretending something that isn't there, as this cancel is defined for IO. And in fact users can define this operation themselves:

def cancel[A](fa: IO[A]): IO[Unit] =
  IO.async { cb =>
    // Yes, from an implementation perspective, it's the equivalent of 
    // executing the source, followed by a cancel ...
    val cancel = fa.unsafeRunCancelable(onlyReportErrors)
    cancel() // triggering cancel
    cb(Right(()))
  }

Are you or are you not telling me that functions that create IO actions are impure with this cancellation strategy

What I'm telling you is that the operation becomes useful for IO actions attached to an already running process, like what happens with fork above. But its definition simply exist and given that it can be defined by users themselves, for as long as they get an unsafeRunCancelable, I didn't see the point in inventing a new type.

That said, the pure cancel is a red herring — I wouldn't mind adding a new Forked or Fiber type, plus at this point without an operation like start, fork or racePair it's basically a distraction that I could remove from the PR.

You drive a good argument though. If others feel the same way, then Forked / Fiber will happen.

@alexandru

This comment has been minimized.

Member

alexandru commented Jan 29, 2018

@johnynek I hear you. Will think of some samples where cancellation is useful, can't write another long message right now, just dropping a note that I'll compile something.

@LukaJCB

This comment has been minimized.

Collaborator

LukaJCB commented Jan 29, 2018

FWIW, as a Person not super well versed with most of the internals of this repo, I think adding a Forked makes the API much more clear in how to use it. I understand the current formulation is more honest, but maybe some times it's better to be less honest and hide the implementation details.

@alexandru

This comment has been minimized.

Member

alexandru commented Jan 29, 2018

Very well then, I've now added the Fiber interface, along with the start method (otherwise there's no way to expose it).

/cc @edmundnoble @LukaJCB

@alexandru alexandru referenced this pull request Feb 6, 2018

Closed

IO#memoize #120

@bkirwi

This comment has been minimized.

bkirwi commented Feb 9, 2018

I'm not sure I follow... if flatMap chains aren't automatically cancellable, doesn't that break the right identity law for Monad?

cancellable.flatMap(IO.pure) != cancellable
@alexandru

This comment has been minimized.

Member

alexandru commented Feb 9, 2018

@bkirwi that sample isn’t true, no.

@alexandru

This comment has been minimized.

Member

alexandru commented Feb 9, 2018

@bkirwi to add more details - the implementation doesn't make flatMap chains to be auto-cancelable, but the cancellation signal does get propagated until a boundary that can do something with it, so applying flatMap on a cancellable IO will yield a cancellable IO.

Also see the sample for IO.cancelBoundary.

@mpilquist

This comment has been minimized.

Member

mpilquist commented Feb 9, 2018

This looks great to me. We need to add cancelation to the type class hierarchy somewhere in order to allow middleware to take advantage of cancelation. Are you planning on adding that?

@pchlupacek

This comment has been minimized.

Collaborator

pchlupacek commented Feb 15, 2018

@alexandru I am completely lost how the "backpressure" is related to IO/cancellation itself. Can you please explain to me what do you mean by backpressure?

@alexandru

This comment has been minimized.

Member

alexandru commented Feb 16, 2018

@SystemFw @pchlupacek

It's hard to explain more than what I already did. I'm not happy about a () => Unit either, due to not signaling precisely the semantics we need, but then we can't really put an impure Cancelable in cats-effect.

Therefore I realized this is not a hill I'm willing to die on. So I changed the signature:

def cancelable[A](k: (Either[Throwable, A] => Unit) => IO[Unit]): IO[A]

Is this OK with you?

@pchlupacek

This comment has been minimized.

Collaborator

pchlupacek commented Feb 19, 2018

@alexandru thanks a lot for patience. Yeah that signature is ok with me.

@pchlupacek

This comment has been minimized.

Collaborator

pchlupacek commented Feb 19, 2018

@alexandru wait, just to understand this completely. To build the IO, that is cancellable, I need to use IO.cancellable, and then how I precisely cancel this ? I think we have sort of dicussed that cancel shall return IO, instead of pure function, but this is just a constructor? I am also puzzled with api cancellable/uncancellable.

Could you perhaps sum it up now ?

Thank you.

@alexandru

This comment has been minimized.

Member

alexandru commented Feb 19, 2018

@pchlupacek

To build the IO, that is cancellable, I need to use IO.cancellable, and then how I precisely cancel this ?

As a side-effectful action you get unsafeRunCancelable, which gives you a () => Unit back that can be cancelled. This is a low-level API for IO specifically and won't get exposed in type classes.

Usually cancellation will happen under the hood on race conditions. For example the implementation of parMap2 was changed in this PR to cancel the active task in case one of them finishes in error.

At some point we'll also have an IO.race that will cancel the loser. Based on that you could then express a timeout operation that cancels the source in case a certain timespan was exceeded.
All of these are race conditions for which cancellation happens automatically.

N.B. whether a cancellation action is exposed to the user or not is not very relevant, as the benefits of cancellation will be noticeable on race conditions.

Now there are instances in which an IO value gets exposed that's attached to a running process. For example this would be the result of a start operation, which in FS2 currently produces an IO[IO[A]]. These are IO values that are running concurrently and that the user might wish to cancel manually, therefore an operation like start is producing an IO[Fiber[A]], where Fiber has a cancel: IO[Unit] on it.

I think the best thing you can do right now is to download this branch and play with it.

I am also puzzled with api cancellable/uncancelable.

What exactly is confusing?

The uncancelable operation does what it says. Consider this very simple analogy:

type Run = () => (() => Unit)

def buildCancelable(): Run = {
  ???
}

def uncancelable(run: Run): Run = {
  () => {
    // execution
    run()
    // make cancellation a no-op
   () => {}
  }
}
@alexandru

This comment has been minimized.

Member

alexandru commented Feb 19, 2018

NOTE: I'm leaving this PR active until the end of this month.

In absence of any hard objections, it will get merged, then work can proceed for the type class hierarchy.

The plan I'm thinking of is to not do a new release unless we reach agreement on what happens to the current type class hierarchy.

@pchlupacek

This comment has been minimized.

Collaborator

pchlupacek commented Feb 19, 2018

@alexandru ok lets go with this further. I still don't understand why we need uncancellable, but likely it won't hurt. Lets see how this fits in typeclases and then we could see.

Thanks for explanation.

@alexandru

This comment has been minimized.

Member

alexandru commented Feb 19, 2018

@pchlupacek uncancelable is useful in cases where you really need to ensure that an IO has an atomic execution (either all of it, or none of it), effectively undoing any cancellation logic, needed sometimes as to not leave the program in an inconsistent state. This tends to happen when dealing with concurrent stuff a lot.

For example with Monix or with FS2 streams, the user could specify logic to be executed at the end of the stream, via the stream's own finalization protocol. And then when you end up doing something like:

// Pseudocode
val result: IO[A] = stream.fold(???)

Well, there will be cases in which the user will not know whether this resulting IO can be safely cancelled or not. It will also depend on the stream's own implementation, although making cancelation automatic is always problematic, because its logic is going to be concurrent, versus a stream's own interruption mechanism that's properly ordered (well I can speak more for Monix's Iterant, but I'm fairly sure this applies to FS2's Stream as well).

Of course an IO.bracket operation will be available too, but that won't be sufficient, this being IMO a weakness in Haskell's IO, because bracket forces you to design solutions around it, it forces you to deal with resource handling within the context of IO, which isn't very compatible with streaming approaches that are doing higher-level resource handling, like FS2's Stream or Monix's Iterant.

So uncancelable can ensure that an IO value has an atomic execution and in that regard it never hurts for it to be there.

@pchlupacek

This comment has been minimized.

Collaborator

pchlupacek commented Feb 19, 2018

@alexandru ok got it. Thanks.

@durban

This comment has been minimized.

Contributor

durban commented Feb 20, 2018

This program:

object CancellableIOExample {

  val promise = Promise[Int]()

  val task: IO[Unit] = IO.cancelable[Int] { cb =>
    println("starting")
    promise.future.foreach { i =>
      println("calling callback")
      cb(Right(i))
    }
    IO { println("cancelling") }
  }.flatMap { i =>
    IO { println(s"completed with $i") }
  }

  def main(args: Array[String]): Unit = {
    val fib = task.start.unsafeRunSync()
    Thread.sleep(100)
    fib.cancel.unsafeRunSync()
    Thread.sleep(100)
    promise.success(42)
    Thread.sleep(100)
    println("end")
  }
}

prints this:

starting
cancelling
calling callback
completed with 42
end

That is, calling the callback takes effect even after cancellation. Is this intentional?

@alexandru alexandru changed the title from Proof of concept: cancelable cats.effect.IO to Cancelable cats.effect.IO Feb 24, 2018

@alexandru

This comment has been minimized.

Member

alexandru commented Feb 24, 2018

@durban

That is, calling the callback takes effect even after cancellation. Is this intentional?

Yes. In regards to cancellation, nothing happens automatically that the user doesn't intend, cancellation being a signal that the producer must notice and act on it, explicitly:

// Version one, which triggers error on cancel:
IO.cancelable[Int] { cb =>
  println("starting")
  promise.future.foreach { i =>
    println("calling callback")
    cb(Right(i))
  }
  IO { 
    promise.tryFailure(new CancellationException)
  }
}

// Version two, which makes it non-terminating on cancel:
IO.cancelable[Int] { cb =>
  // For synchronizing, because cancellation is concurrent ;-)
  val active = new AtomicBoolean(true)
  println("starting")
  promise.future.foreach { i =>
    if (active.getAndSet(false)) {
      println("calling callback")
      cb(Right(i))
    }
  }
  IO { 
    if (active.getAndSet(false))
      println("cancelling")
  }
}
@alexandru

This comment has been minimized.

Member

alexandru commented Feb 24, 2018

If nobody will mind, I'd like to merge this on Monday because I have some time off and would like to focus on #126

No release will be made, so we can still backtrack if needed.

@alexandru alexandru added this to the 0.10 milestone Feb 25, 2018

@alexandru alexandru merged commit 9547961 into typelevel:master Feb 26, 2018

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed
Details

This was referenced Mar 2, 2018

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment