Skip to content

Commit

Permalink
Merge pull request #137 from alexandru/race
Browse files Browse the repository at this point in the history
Race
  • Loading branch information
ChristopherDavenport committed Mar 7, 2018
2 parents ea8fbc9 + 0331776 commit a236954
Show file tree
Hide file tree
Showing 12 changed files with 540 additions and 51 deletions.
7 changes: 7 additions & 0 deletions core/shared/src/main/scala/cats/effect/Async.scala
Expand Up @@ -144,6 +144,13 @@ trait Async[F[_]] extends Sync[F] with LiftIO[F] {
}

object Async {
/**
* Returns an non-terminating `F[_]`, that never completes
* with a result, being equivalent with `async(_ => ())`.
*/
def never[F[_], A](implicit F: Async[F]): F[A] =
F.async(_ => ())

/**
* Generic shift operation, defined for any `Async` data type.
*
Expand Down
181 changes: 158 additions & 23 deletions core/shared/src/main/scala/cats/effect/Concurrent.scala
Expand Up @@ -314,6 +314,74 @@ trait Concurrent[F[_]] extends Async[F] {
*/
def start[A](fa: F[A]): F[Fiber[F, A]]

/**
* Run two tasks concurrently, creating a race between them and returns a
* pair containing both the winner's successful value and the loser
* represented as a still-unfinished fiber.
*
* If the first task completes in error, then the result will
* complete in error, the other task being cancelled.
*
* On usage the user has the option of cancelling the losing task,
* this being equivalent with plain [[race]]:
*
* {{{
* val ioA: IO[A] = ???
* val ioB: IO[B] = ???
*
* Concurrent[IO].racePair(ioA, ioB).flatMap {
* case Left((a, fiberB)) =>
* fiberB.cancel.map(_ => a)
* case Right((fiberA, b)) =>
* fiberA.cancel.map(_ => b)
* }
* }}}
*
* See [[race]] for a simpler version that cancels the loser
* immediately.
*/
def racePair[A,B](fa: F[A], fb: F[B]): F[Either[(A, Fiber[F, B]), (Fiber[F, A], B)]]

/**
* Run two tasks concurrently and return the first to finish,
* either in success or error. The loser of the race is cancelled.
*
* The two tasks are potentially executed in parallel, the winner
* being the first that signals a result.
*
* As an example, this would be the implementation of a "timeout"
* operation:
*
* {{{
* import cats.effect._
* import scala.concurrent.duration._
*
* def timeoutTo[F[_], A](fa: F[A], after: FiniteDuration, fallback: F[A])
* (implicit F: Concurrent[F], timer: Timer[F]): F[A] = {
*
* F.race(fa, timer.sleep(timer)).flatMap {
* case Left((a, _)) => F.pure(a)
* case Right((_, _)) => fallback
* }
* }
*
* def timeout[F[_], A](fa: F[A], after: FiniteDuration)
* (implicit F: Concurrent[F], timer: Timer[F]): F[A] = {
*
* timeoutTo(fa, after,
* F.raiseError(new TimeoutException(after.toString)))
* }
* }}}
*
* Also see [[racePair]] for a version that does not cancel
* the loser automatically on successful results.
*/
def race[A, B](fa: F[A], fb: F[B]): F[Either[A, B]] =
flatMap(racePair(fa, fb)) {
case Left((a, fiberB)) => map(fiberB.cancel)(_ => Left(a))
case Right((fiberA, b)) => map(fiberA.cancel)(_ => Right(b))
}

/**
* Inherited from [[LiftIO]], defines a conversion from [[IO]]
* in terms of the `Concurrent` type class.
Expand Down Expand Up @@ -374,14 +442,14 @@ object Concurrent {
* [[Concurrent]] instance built for `cats.data.StateT` values initialized
* with any `F` data type that also implements `Concurrent`.
*/
implicit def catsStateTAsync[F[_]: Concurrent, S]: Concurrent[StateT[F, S, ?]] =
implicit def catsStateTConcurrent[F[_]: Concurrent, S]: Concurrent[StateT[F, S, ?]] =
new StateTConcurrent[F, S] { def F = Concurrent[F] }

/**
* [[Concurrent]] instance built for `cats.data.WriterT` values initialized
* with any `F` data type that also implements `Concurrent`.
*/
implicit def catsWriterTAsync[F[_]: Concurrent, L: Monoid]: Concurrent[WriterT[F, L, ?]] =
implicit def catsWriterTConcurrent[F[_]: Concurrent, L: Monoid]: Concurrent[WriterT[F, L, ?]] =
new WriterTConcurrent[F, L] { def F = Concurrent[F]; def L = Monoid[L] }

private[effect] trait EitherTConcurrent[F[_], L] extends Async.EitherTAsync[F, L]
Expand All @@ -390,6 +458,10 @@ object Concurrent {
override protected implicit def F: Concurrent[F]
override protected def FF = F

// Needed to drive static checks, otherwise the
// compiler will choke on type inference :-(
type Fiber[A] = cats.effect.Fiber[EitherT[F, L, ?], A]

def cancelable[A](k: (Either[Throwable, A] => Unit) => IO[Unit]): EitherT[F, L, A] =
EitherT.liftF(F.cancelable(k))(F)

Expand All @@ -400,12 +472,28 @@ object Concurrent {
EitherT(F.onCancelRaiseError(fa.value, e))

def start[A](fa: EitherT[F, L, A]) =
EitherT.liftF(
F.start(fa.value).map { fiber =>
Fiber(
EitherT(fiber.join),
EitherT.liftF(fiber.cancel))
})
EitherT.liftF(F.start(fa.value).map(fiberT))

def racePair[A, B](fa: EitherT[F, L, A], fb: EitherT[F, L, B]): EitherT[F, L, Either[(A, Fiber[B]), (Fiber[A], B)]] =
EitherT(F.racePair(fa.value, fb.value).flatMap {
case Left((value, fiberB)) =>
value match {
case Left(_) =>
fiberB.cancel.map(_ => value.asInstanceOf[Left[L, Nothing]])
case Right(r) =>
F.pure(Right(Left((r, fiberT[B](fiberB)))))
}
case Right((fiberA, value)) =>
value match {
case Left(_) =>
fiberA.cancel.map(_ => value.asInstanceOf[Left[L, Nothing]])
case Right(r) =>
F.pure(Right(Right((fiberT[A](fiberA), r))))
}
})

protected def fiberT[A](fiber: effect.Fiber[F, Either[L, A]]): Fiber[A] =
Fiber(EitherT(fiber.join), EitherT.liftF(fiber.cancel))
}

private[effect] trait OptionTConcurrent[F[_]] extends Async.OptionTAsync[F]
Expand All @@ -414,21 +502,42 @@ object Concurrent {
override protected implicit def F: Concurrent[F]
override protected def FF = F

// Needed to drive static checks, otherwise the
// compiler will choke on type inference :-(
type Fiber[A] = cats.effect.Fiber[OptionT[F, ?], A]

def cancelable[A](k: (Either[Throwable, A] => Unit) => IO[Unit]): OptionT[F, A] =
OptionT.liftF(F.cancelable(k))(F)

def start[A](fa: OptionT[F, A]) =
OptionT.liftF(F.start(fa.value).map(fiberT))

def racePair[A, B](fa: OptionT[F, A], fb: OptionT[F, B]): OptionT[F, Either[(A, Fiber[B]), (Fiber[A], B)]] =
OptionT(F.racePair(fa.value, fb.value).flatMap {
case Left((value, fiberB)) =>
value match {
case None =>
fiberB.cancel.map(_ => None)
case Some(r) =>
F.pure(Some(Left((r, fiberT[B](fiberB)))))
}
case Right((fiberA, value)) =>
value match {
case None =>
fiberA.cancel.map(_ => None)
case Some(r) =>
F.pure(Some(Right((fiberT[A](fiberA), r))))
}
})

def uncancelable[A](fa: OptionT[F, A]): OptionT[F, A] =
OptionT(F.uncancelable(fa.value))

def onCancelRaiseError[A](fa: OptionT[F, A], e: Throwable): OptionT[F, A] =
OptionT(F.onCancelRaiseError(fa.value, e))

def start[A](fa: OptionT[F, A]) = {
OptionT.liftF(
F.start(fa.value).map { fiber =>
Fiber(OptionT(fiber.join), OptionT.liftF(fiber.cancel))
})
}
protected def fiberT[A](fiber: effect.Fiber[F, Option[A]]): Fiber[A] =
Fiber(OptionT(fiber.join), OptionT.liftF(fiber.cancel))
}

private[effect] trait StateTConcurrent[F[_], S] extends Async.StateTAsync[F, S]
Expand All @@ -437,21 +546,34 @@ object Concurrent {
override protected implicit def F: Concurrent[F]
override protected def FA = F

// Needed to drive static checks, otherwise the
// compiler will choke on type inference :-(
type Fiber[A] = cats.effect.Fiber[StateT[F, S, ?], A]

def cancelable[A](k: (Either[Throwable, A] => Unit) => IO[Unit]): StateT[F, S, A] =
StateT.liftF(F.cancelable(k))(F)

def start[A](fa: StateT[F, S, A]): StateT[F, S, Fiber[A]] =
StateT(s => F.start(fa.run(s)).map { fiber => (s, fiberT(fiber)) })

def racePair[A, B](fa: StateT[F, S, A], fb: StateT[F, S, B]): StateT[F, S, Either[(A, Fiber[B]), (Fiber[A], B)]] =
StateT { startS =>
F.racePair(fa.run(startS), fb.run(startS)).map {
case Left(((s, value), fiber)) =>
(s, Left((value, fiberT(fiber))))
case Right((fiber, (s, value))) =>
(s, Right((fiberT(fiber), value)))
}
}

def uncancelable[A](fa: StateT[F, S, A]): StateT[F, S, A] =
fa.transformF(F.uncancelable)

def onCancelRaiseError[A](fa: StateT[F, S, A], e: Throwable): StateT[F, S, A] =
fa.transformF(F.onCancelRaiseError(_, e))

def start[A](fa: StateT[F, S, A]) =
StateT(s => F.start(fa.run(s)).map { fiber =>
(s, Fiber(
StateT(_ => fiber.join),
StateT.liftF(fiber.cancel)))
})
protected def fiberT[A](fiber: effect.Fiber[F, (S, A)]): Fiber[A] =
Fiber(StateT(_ => fiber.join), StateT.liftF(fiber.cancel))
}

private[effect] trait WriterTConcurrent[F[_], L] extends Async.WriterTAsync[F, L]
Expand All @@ -460,6 +582,10 @@ object Concurrent {
override protected implicit def F: Concurrent[F]
override protected def FA = F

// Needed to drive static checks, otherwise the
// compiler will choke on type inference :-(
type Fiber[A] = cats.effect.Fiber[WriterT[F, L, ?], A]

def cancelable[A](k: (Either[Throwable, A] => Unit) => IO[Unit]): WriterT[F, L, A] =
WriterT.liftF(F.cancelable(k))(L, F)

Expand All @@ -471,9 +597,18 @@ object Concurrent {

def start[A](fa: WriterT[F, L, A]) =
WriterT(F.start(fa.run).map { fiber =>
(L.empty, Fiber(
WriterT(fiber.join),
WriterT.liftF(fiber.cancel)))
(L.empty, fiberT[A](fiber))
})

def racePair[A, B](fa: WriterT[F, L, A], fb: WriterT[F, L, B]): WriterT[F, L, Either[(A, Fiber[B]), (Fiber[A], B)]] =
WriterT(F.racePair(fa.run, fb.run).map {
case Left(((l, value), fiber)) =>
(l, Left((value, fiberT(fiber))))
case Right((fiber, (l, value))) =>
(l, Right((fiberT(fiber), value)))
})

protected def fiberT[A](fiber: effect.Fiber[F, (L, A)]): Fiber[A] =
Fiber(WriterT(fiber.join), WriterT.liftF(fiber.cancel))
}
}
21 changes: 13 additions & 8 deletions core/shared/src/main/scala/cats/effect/ConcurrentEffect.scala
Expand Up @@ -23,8 +23,7 @@ import scala.annotation.implicitNotFound
import scala.util.Either

/**
* Type class describing effect data types that are cancelable and
* can be evaluated concurrently.
* Type class describing effect data types that are cancelable.
*
* In addition to the algebras of [[Concurrent]] and of
* [[Effect]], instances must also implement a
Expand Down Expand Up @@ -85,35 +84,41 @@ object ConcurrentEffect {
implicit def catsWriterTConcurrentEffect[F[_]: ConcurrentEffect, L: Monoid]: ConcurrentEffect[WriterT[F, L, ?]] =
new WriterTConcurrentEffect[F, L] { def F = ConcurrentEffect[F]; def L = Monoid[L] }

private[effect] trait EitherTConcurrentEffect[F[_]] extends ConcurrentEffect[EitherT[F, Throwable, ?]]
private[effect] trait EitherTConcurrentEffect[F[_]]
extends ConcurrentEffect[EitherT[F, Throwable, ?]]
with Concurrent.EitherTConcurrent[F, Throwable]
with Effect.EitherTEffect[F] {

protected def F: ConcurrentEffect[F]

def runCancelable[A](fa: EitherT[F, Throwable, A])(cb: Either[Throwable, A] => IO[Unit]): IO[IO[Unit]] =
def runCancelable[A](fa: EitherT[F, Throwable, A])
(cb: Either[Throwable, A] => IO[Unit]): IO[IO[Unit]] =
F.runCancelable(fa.value)(cb.compose(_.right.flatMap(x => x)))
}

private[effect] trait StateTConcurrentEffect[F[_], S] extends ConcurrentEffect[StateT[F, S, ?]]
private[effect] trait StateTConcurrentEffect[F[_], S]
extends ConcurrentEffect[StateT[F, S, ?]]
with Concurrent.StateTConcurrent[F, S]
with Effect.StateTEffect[F, S] {

protected def F: ConcurrentEffect[F]
protected def S: Monoid[S]

def runCancelable[A](fa: StateT[F, S, A])(cb: Either[Throwable, A] => IO[Unit]): IO[IO[Unit]] =
def runCancelable[A](fa: StateT[F, S, A])
(cb: Either[Throwable, A] => IO[Unit]): IO[IO[Unit]] =
F.runCancelable(fa.runA(S.empty)(F))(cb)
}

private[effect] trait WriterTConcurrentEffect[F[_], L] extends ConcurrentEffect[WriterT[F, L, ?]]
private[effect] trait WriterTConcurrentEffect[F[_], L]
extends ConcurrentEffect[WriterT[F, L, ?]]
with Concurrent.WriterTConcurrent[F, L]
with Effect.WriterTEffect[F, L] {

protected def F: ConcurrentEffect[F]
protected def L: Monoid[L]

def runCancelable[A](fa: WriterT[F, L, A])(cb: Either[Throwable, A] => IO[Unit]): IO[IO[Unit]] =
def runCancelable[A](fa: WriterT[F, L, A])
(cb: Either[Throwable, A] => IO[Unit]): IO[IO[Unit]] =
F.runCancelable(fa.run)(cb.compose(_.right.map(_._2)))
}
}

0 comments on commit a236954

Please sign in to comment.