New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Race #137

Merged
merged 9 commits into from Mar 7, 2018

Conversation

5 participants
@alexandru
Member

alexandru commented Mar 6, 2018

Adding these two operations in the new Concurrent type class (see #134) for creating a race condition between two tasks.

The first that finishes wins the race, emitting its result. And on a successful first result, racePair provides the ability to cancel or join the loser (Fiber, see #121):

trait Concurrent[F[_]] extends Async[F] {
  //...

  // Advanced race that can join the loser
  def racePair[A,B](fa: F[A], fb: F[B]): F[Either[(A, Fiber[F, B]), (Fiber[F, A], B)]]

  // Simpler, derived race
  def race[A, B](fa: F[A], fb: F[B]): F[Either[A, B]] =
    flatMap(racePair(fa, fb)) {
      case Left((a, fiberB)) => map(fiberB.cancel)(_ => Left(a))
      case Right((fiberA, b)) => map(fiberA.cancel)(_ => Right(b))
    }
}

This raises the total of abstract methods needing implementation in the Concurrent type class to 5, these being:

  1. cancelable[A](k: (Either[Throwable, A] => Unit) => IO[Unit]): F[A]
  2. uncancelable[A](fa: F[A]): F[A]
  3. onCancelRaiseError[A](fa: F[A], e: Throwable): F[A]
  4. start[A](fa: F[A]): F[Fiber[F, A]]
  5. racePair[A,B](fa: F[A], fb: F[B]): F[Either[(A, Fiber[F, B]), (Fiber[F, A], B)]]

There is a connection between them — for data types participating in race conditions you need the cancelable builder, this being the FFI needed to wrap any side effecting API, but then uncancelable is necessary for yielding atomic tasks and onCancelRaiseError is necessary for materialising cancellation, so all 3 are a group needed to deal with cancellation.

Then without start we cannot observe cancelation, so the laws described can't be very useful.
And if we have start, we might as well have race, since it's basically a different side of the same coin.

Example from the ScalaDoc, here's how a timeout operation could be implemented:

import cats.effect._
import scala.concurrent.duration._
   
def timeoutTo[F[_], A](fa: F[A], after: FiniteDuration, fallback: F[A])
  (implicit F: Concurrent[F], timer: Timer[F]): F[A] = {
   
   F.race(fa, timer.sleep(timer)).flatMap {
     case Left((a, _)) => F.pure(a)
     case Right((_, _)) => fallback
   }
}
   
def timeout[F[_], A](fa: F[A], after: FiniteDuration)
  (implicit F: Concurrent[F], timer: Timer[F]): F[A] = {
   
   timeoutTo(fa, after,
     F.raiseError(new TimeoutException(after.toString)))
}

IO also implements race and racePair:

object IO {
  // ...
  // simple version
  def race[A, B](lh: IO[A], rh: IO[B]): IO[Either[A, B]]
  
  // advanced version
  def racePair[A, B](lh: IO[A], rh: IO[B]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]]
}

And a sample timeout operation described directly for IO:

def timeout[A](io: IO[A], after: FiniteDuration)(implicit t: Timer[IO]): IO[A] =
  IO.race(io, t.sleep(after)).flatMap {
    case Left(a) => IO.pure(a)
    case Right(_) => 
      IO.raiseError(new TimeoutException(s"$after"))
  }

Future Improvements

Given how difficult it was to describe the laws for the Concurrent instances (e.g. EitherT, OptionT, StateT and WriterT) and that the implementation of racePair has to look at the underlying element, to deconstruct it in order to decide what to do next (e.g. Left vs Right, None vs Some), I'm getting the feeling that race might add restrictions to Concurrent.

In the future we might realise that we need a Concurrent but without race. At that point we can extract a Cancelable (or something) that doesn't have race in it. Similar with Monad vs FlatMap vs Applicative vs Apply, which have developed after some use cases emerged (e.g. the need of instances for Map[K, V]).

Extracting a lighter Concurrent, that doesn't have race, in the future is doable without breaking backwards compatibility. Even post 1.0.

alexandru added some commits Mar 4, 2018

@alexandru alexandru added this to the 0.10 milestone Mar 6, 2018


def raceCancelsLeftLoser[A, B](fa: F[A], b: B, f: (B, B) => B) = {
val received = F.suspend {
var effect = b

This comment has been minimized.

@alexandru

alexandru Mar 6, 2018

Member

TODO: replace var with cats.effect.laws.util.Pledge.

@codecov-io

This comment has been minimized.

codecov-io commented Mar 6, 2018

Codecov Report

Merging #137 into master will increase coverage by 0.79%.
The diff coverage is 95.89%.

@@            Coverage Diff             @@
##           master     #137      +/-   ##
==========================================
+ Coverage   89.14%   89.94%   +0.79%     
==========================================
  Files          44       45       +1     
  Lines         811      935     +124     
  Branches       59       67       +8     
==========================================
+ Hits          723      841     +118     
- Misses         88       94       +6

alexandru added some commits Mar 6, 2018

@alexandru alexandru changed the title from WIP: Race to Race Mar 6, 2018

@alexandru alexandru changed the title from Race to WIP: Race Mar 6, 2018

@alexandru alexandru changed the title from WIP: Race to Race Mar 6, 2018

* with a result, being equivalent with `async(_ => ())`.
*/
def never[F[_], A](implicit F: Async[F]): F[A] =
F.async(_ => ())

This comment has been minimized.

@johnynek

johnynek Mar 6, 2018

should we put this on Async? since some implementations might be better than using async and never calling (for instance we could avoid an allocation for a covariant F that keeps a never value around.

This comment has been minimized.

@alexandru

alexandru Mar 6, 2018

Member

I initially wanted to do it, but adding it on Async breaks compatibility and I think we might want to keep binary compatibility for 0.10. Maybe we'll create a ticket for it and after 0.10 we can do it — the plans are set in stone though, all this is just tentative.

* Also see [[racePair]] for a version that does not cancel
* the loser automatically on successful results.
*/
def race[A, B](fa: F[A], fb: F[B]): F[Either[A, B]] =

This comment has been minimized.

@johnynek

johnynek Mar 6, 2018

i wonder if this can actually be on Async. I think you can do this one without cancellation.

This comment has been minimized.

@alexandru

alexandru Mar 6, 2018

Member

You can do it without cancelation, but it's going to be unsafe because you have no way to cancel the loser and release resources early. It's an optimisation, but a very important one.

Like Future.firstCompletedOf for example — people use it, but it's what drove me away from Future, because I was doing timeouts with it and my process was crashing.

I think Async should keep its original spirit, being meant for things that don't deal with preemption or race conditions. And race is the definition of race conditions 🙂

case Left((value, fiberB)) =>
value match {
case Left(_) =>
fiberB.cancel.map(_ => value.asInstanceOf[Left[L, Nothing]])

This comment has been minimized.

@johnynek

johnynek Mar 6, 2018

maybe you want .leftCast here. https://github.com/typelevel/cats/blob/master/core/src/main/scala/cats/syntax/either.scala#L341 so we have an easier time seeing the cast is safe. if you bind left@Left(_) => you can use leftCast.

This comment has been minimized.

@alexandru

alexandru Mar 6, 2018

Member

Ah, nice, thanks.

/**
* A non-terminating `IO`, alias for `async(_ => ())`.
*/
val never: IO[Nothing] = async(_ => ())

This comment has been minimized.

@johnynek

johnynek Mar 6, 2018

we could reuse this if we put this on Async.

This comment has been minimized.

@alexandru

alexandru Mar 6, 2018

Member

👍 but maybe after 0.10 or after we decide on the milestones?

Tried drafting a plan today, so the timeline I was thinking of looks like this: https://github.com/typelevel/cats-effect/milestones

@mpilquist mpilquist self-requested a review Mar 6, 2018

@ChristopherDavenport ChristopherDavenport merged commit a236954 into typelevel:master Mar 7, 2018

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed
Details
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment