Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Type-class hierarchy for cancelation #134

Merged
merged 38 commits into from Mar 6, 2018
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
6a5e2a5
Merge remote-tracking branch 'alexandru/cancelable' into next
alexandru Feb 24, 2018
63bde6b
New type-class hierarchy
alexandru Feb 24, 2018
6a14b0d
Remove junk
alexandru Feb 24, 2018
bb61546
Add more tests
alexandru Feb 24, 2018
a145cc7
Fix comments, remove type junk
alexandru Feb 24, 2018
e81f67a
Fix compilation error
alexandru Feb 25, 2018
30536ea
Fix mima report
alexandru Feb 25, 2018
fef0a32
Merge remote-tracking branch 'upstream/master' into cancel-types1
alexandru Feb 25, 2018
9292181
Merge branch 'cancelable' into cancel-types1
alexandru Feb 25, 2018
9633ed6
Generate proper headers
alexandru Feb 25, 2018
8a23485
Merge remote-tracking branch 'upstream/master' into cancel-types1
alexandru Feb 26, 2018
e6cc6c7
Merge remote-tracking branch 'upstream/master' into cancel-types1
alexandru Feb 26, 2018
e25bfd2
Merge remote-tracking branch 'upstream/master' into cancel-types1
alexandru Mar 1, 2018
a38145f
Fix compilation error
alexandru Mar 1, 2018
38da3f5
Fix compilation error
alexandru Mar 1, 2018
02e9086
Merge branch 'cancel-types1' into cancel-types2
alexandru Mar 1, 2018
8aa56ed
Add new hierarchy
alexandru Mar 2, 2018
0f9371e
Add new hierarchy
alexandru Mar 2, 2018
70db58e
Change laws
alexandru Mar 2, 2018
d0c3fb2
Add ScalaDoc
alexandru Mar 2, 2018
484fe4e
Remove junk
alexandru Mar 2, 2018
d34d4a0
Fix compiler crash by reorganizing implicits :-(
alexandru Mar 2, 2018
81d9bbb
Rename CAsync -> CancelableAsync, CEffect -> CancelableEffect
alexandru Mar 3, 2018
308703c
Activate instance tests
alexandru Mar 4, 2018
5226ff9
Fix naming
alexandru Mar 4, 2018
6d2f25f
Rename CancelableAsync -> Concurrent, CancelableEffect -> ConcurrentE…
alexandru Mar 4, 2018
c200585
Scaladoc update
alexandru Mar 4, 2018
0f00908
Fix tests
alexandru Mar 5, 2018
39c22ea
Replace Cogen[IO], fix tests
alexandru Mar 5, 2018
6066683
Fix comment
alexandru Mar 5, 2018
2f9a2b1
Get rid of mima filters, as they are not relevant for BaseVersion=0.10
alexandru Mar 5, 2018
52c2f12
Activate ConcurrentEffectTests for IO(defaults)
alexandru Mar 5, 2018
1658988
ScalaDocs changes
alexandru Mar 5, 2018
076c409
Add Concurrent#uncancelable and onCancelRaiseError, plus laws
alexandru Mar 5, 2018
a586fa9
Remove unused param in law
alexandru Mar 5, 2018
917bd12
Make project binary compatible with 0.9
alexandru Mar 6, 2018
17c9d33
Address bincompat issues, take 2
alexandru Mar 6, 2018
e88692e
Make Async#shift private[effect]
alexandru Mar 6, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -7,3 +7,4 @@ target/
tags
.idea
benchmarks/results
.java-version
29 changes: 1 addition & 28 deletions build.sbt
Expand Up @@ -144,37 +144,10 @@ val mimaSettings = Seq(
val TagBase(major, minor) = BaseVersion

val tags = "git tag --list".!! split "\n" map { _.trim }

val versions =
tags filter { _ startsWith s"v$major.$minor" } map { _ substring 1 }

versions map { v => organization.value %% name.value % v } toSet
},
mimaBinaryIssueFilters ++= {
import com.typesafe.tools.mima.core._
import com.typesafe.tools.mima.core.ProblemFilters._
Seq(
exclude[MissingTypesProblem]("cats.effect.Sync$"),
exclude[IncompatibleTemplateDefProblem]("cats.effect.EffectInstances"),
exclude[IncompatibleTemplateDefProblem]("cats.effect.SyncInstances"),
exclude[IncompatibleTemplateDefProblem]("cats.effect.IOLowPriorityInstances"),
exclude[MissingTypesProblem]("cats.effect.Async$"),
exclude[MissingTypesProblem]("cats.effect.IO$"),
exclude[IncompatibleTemplateDefProblem]("cats.effect.LiftIOInstances"),
exclude[MissingTypesProblem]("cats.effect.LiftIO$"),
exclude[MissingTypesProblem]("cats.effect.Effect$"),
exclude[IncompatibleTemplateDefProblem]("cats.effect.AsyncInstances"),
exclude[IncompatibleTemplateDefProblem]("cats.effect.IOInstances"),
// Work on cancelable IO
exclude[IncompatibleMethTypeProblem]("cats.effect.IO#Async.apply"),
exclude[IncompatibleResultTypeProblem]("cats.effect.IO#Async.k"),
exclude[IncompatibleMethTypeProblem]("cats.effect.IO#Async.copy"),
exclude[IncompatibleResultTypeProblem]("cats.effect.IO#Async.copy$default$1"),
exclude[IncompatibleMethTypeProblem]("cats.effect.IO#Async.this"),
exclude[DirectMissingMethodProblem]("cats.effect.internals.IORunLoop#RestartCallback.this"),
exclude[DirectMissingMethodProblem]("cats.effect.internals.IOPlatform.onceOnly"),
exclude[MissingClassProblem]("cats.effect.internals.IORunLoop$RestartCallback$")
)
versions.map { v => organization.value %% name.value % v }.toSet
}
)

Expand Down
202 changes: 150 additions & 52 deletions core/shared/src/main/scala/cats/effect/Async.scala
Expand Up @@ -19,118 +19,216 @@ package effect

import simulacrum._
import cats.data.{EitherT, OptionT, StateT, WriterT}
import cats.effect.internals.Callback
import cats.effect.IO.{Delay, Pure, RaiseError}
import cats.effect.internals.{Callback, IORunLoop}

import scala.annotation.implicitNotFound
import scala.concurrent.ExecutionContext
import scala.util.Either

/**
* A monad that can describe asynchronous or synchronous computations that
* produce exactly one result.
* A monad that can describe asynchronous or synchronous computations
* that produce exactly one result.
*
* ==On Asynchrony==
*
* An asynchronous task represents logic that executes independent of
* the main program flow, or current callstack. It can be a task whose
* result gets computed on another thread, or on some other machine on
* the network.
*
* In terms of types, normally asynchronous processes are represented as:
* {{{
* (A => Unit) => Unit
* }}}
*
* This signature can be recognized in the "Observer pattern" described
* in the "Gang of Four", although it should be noted that without
* an `onComplete` event (like in the Rx Observable pattern) you can't
* detect completion in case this callback can be called zero or
* multiple times.
*
* Some abstractions allow for signaling an error condition
* (e.g. `MonadError` data types), so this would be a signature
* that's closer to Scala's `Future#onComplete`:
*
* {{{
* (Either[Throwable, A] => Unit) => Unit
* }}}
*
* And many times the abstractions built to deal with asynchronous tasks
* also provide a way to cancel such processes, to be used in race
* conditions in order to cleanup resources early:
*
* {{{
* (A => Unit) => Cancelable
* }}}
*
* This is approximately the signature of JavaScript's `setTimeout`,
* which will return a "task ID" that can be used to cancel it.
*
* N.B. this type class in particular is NOT describing cancelable
* async processes, see the [[Concurrent]] type class for that.
*
* ==Async Type class==
*
* This type class allows the modeling of data types that:
*
* 1. can start asynchronous processes
* 1. can emit one result on completion
* 1. can end in error
*
* N.B. on the "one result" signaling, this is not an ''exactly once''
* requirement. At this point streaming types can implement `Async`
* and such an ''exactly once'' requirement is only clear in [[Effect]].
*
* Therefore the signature exposed by the [[Async!.async async]]
* builder is this:
*
* {{{
* (Either[Throwable, A] => Unit) => Unit
* }}}
*
* N.B. such asynchronous processes are not cancelable.
* See the [[Concurrent]] alternative for that.
*/
@typeclass
@implicitNotFound("""Cannot find implicit value for Async[${F}].
Building this implicit value might depend on having an implicit
s.c.ExecutionContext in scope, a Strategy or some equivalent type.""")
s.c.ExecutionContext in scope, a Scheduler or some equivalent type.""")
trait Async[F[_]] extends Sync[F] with LiftIO[F] {

/**
* Creates an `F[A]` instance from a provided function
* that will have a callback injected for signaling the
* final result of an asynchronous process.
* Creates a simple, noncancelable `F[A]` instance that
* executes an asynchronous process on evaluation.
*
* The given function is being injected with a side-effectful
* callback for signaling the final result of an asynchronous
* process.
*
* @param k is a function that should be called with a
* callback for signaling the result once it is ready
*/
def async[A](k: (Either[Throwable, A] => Unit) => Unit): F[A]

/**
* @see [[IO.shift(ec* IO#shift]]
* Inherited from [[LiftIO]], defines a conversion from [[IO]]
* in terms of the `Async` type class.
*
* N.B. expressing this conversion in terms of `Async` and its
* capabilities means that the resulting `F` is not cancelable.
* [[Concurrent]] then overrides this with an implementation
* that is.
*
* To access this implementation as a standalone function, you can
* use [[Async$.liftIO Async.liftIO]] (on the object companion).
*/
override def liftIO[A](ioa: IO[A]): F[A] =
Async.liftIO(ioa)(this)
}

object Async {
/**
* Generic shift operation, defined for any `Async` data type.
*
* Shifts the bind continuation onto the specified thread pool.
* Analogous with [[IO.shift(ec* IO.shift]].
*/
def shift(implicit ec: ExecutionContext): F[Unit] = {
async { (cb: Either[Throwable, Unit] => Unit) =>
def shift[F[_]](ec: ExecutionContext)(implicit F: Async[F]): F[Unit] =
F.async { cb =>
ec.execute(new Runnable {
def run() = cb(Callback.rightUnit)
def run(): Unit = cb(Callback.rightUnit)
})
}
}

override def liftIO[A](ioa: IO[A]): F[A] = {
// Able to provide default with `IO#to`, given this `Async[F]`
ioa.to[F](this)
}
}

private[effect] abstract class AsyncInstances {
/**
* Lifts any `IO` value into any data type implementing [[Async]].
*
* This is the default `Async.liftIO` implementation.
*/
def liftIO[F[_], A](io: IO[A])(implicit F: Async[F]): F[A] =
io match {
case Pure(a) => F.pure(a)
case RaiseError(e) => F.raiseError(e)
case Delay(thunk) => F.delay(thunk())
case _ =>
F.suspend {
IORunLoop.step(io) match {
case Pure(a) => F.pure(a)
case RaiseError(e) => F.raiseError(e)
case async => F.async(async.unsafeRunAsync)
}
}
}

/**
* [[Async]] instance built for `cats.data.EitherT` values initialized
* with any `F` data type that also implements `Async`.
*/
implicit def catsEitherTAsync[F[_]: Async, L]: Async[EitherT[F, L, ?]] =
new EitherTAsync[F, L] { def F = Async[F] }

/**
* [[Async]] instance built for `cats.data.OptionT` values initialized
* with any `F` data type that also implements `Async`.
*/
implicit def catsOptionTAsync[F[_]: Async]: Async[OptionT[F, ?]] =
new OptionTAsync[F] { def F = Async[F] }

/**
* [[Async]] instance built for `cats.data.StateT` values initialized
* with any `F` data type that also implements `Async`.
*/
implicit def catsStateTAsync[F[_]: Async, S]: Async[StateT[F, S, ?]] =
new StateTAsync[F, S] { def F = Async[F] }

/**
* [[Async]] instance built for `cats.data.WriterT` values initialized
* with any `F` data type that also implements `Async`.
*/
implicit def catsWriterTAsync[F[_]: Async, L: Monoid]: Async[WriterT[F, L, ?]] =
new WriterTAsync[F, L] { def F = Async[F]; def L = Monoid[L] }

private[effect] trait EitherTAsync[F[_], L]
extends Async[EitherT[F, L, ?]]
with Sync.EitherTSync[F, L]
with LiftIO.EitherTLiftIO[F, L] {
private[effect] trait EitherTAsync[F[_], L] extends Async[EitherT[F, L, ?]]
with Sync.EitherTSync[F, L]
with LiftIO.EitherTLiftIO[F, L] {

override protected def F: Async[F]
private implicit def _F = F
override implicit protected def F: Async[F]
protected def FF = F

def async[A](k: (Either[Throwable, A] => Unit) => Unit): EitherT[F, L, A] =
EitherT.liftF(F.async(k))
}

private[effect] trait OptionTAsync[F[_]]
extends Async[OptionT[F, ?]]
with Sync.OptionTSync[F]
with LiftIO.OptionTLiftIO[F] {

override protected def F: Async[F]
private implicit def _F = F
private[effect] trait OptionTAsync[F[_]] extends Async[OptionT[F, ?]]
with Sync.OptionTSync[F]
with LiftIO.OptionTLiftIO[F] {

override protected implicit def F: Async[F]
protected def FF = F

def async[A](k: (Either[Throwable, A] => Unit) => Unit): OptionT[F, A] =
OptionT.liftF(F.async(k))
}

private[effect] trait StateTAsync[F[_], S]
extends Async[StateT[F, S, ?]]
with Sync.StateTSync[F, S]
with LiftIO.StateTLiftIO[F, S] {

override protected def F: Async[F]
private implicit def _F = F
private[effect] trait StateTAsync[F[_], S] extends Async[StateT[F, S, ?]]
with Sync.StateTSync[F, S]
with LiftIO.StateTLiftIO[F, S] {

override protected implicit def F: Async[F]
protected def FA = F

def async[A](k: (Either[Throwable, A] => Unit) => Unit): StateT[F, S, A] =
StateT.liftF(F.async(k))
}

private[effect] trait WriterTAsync[F[_], L]
extends Async[WriterT[F, L, ?]]
with Sync.WriterTSync[F, L]
with LiftIO.WriterTLiftIO[F, L] {

override protected def F: Async[F]
private implicit def _F = F
private[effect] trait WriterTAsync[F[_], L] extends Async[WriterT[F, L, ?]]
with Sync.WriterTSync[F, L]
with LiftIO.WriterTLiftIO[F, L] {

override protected implicit def F: Async[F]
protected def FA = F

private implicit def _L = L

def async[A](k: (Either[Throwable, A] => Unit) => Unit): WriterT[F, L, A] =
WriterT.liftF(F.async(k))
WriterT.liftF(F.async(k))(L, FA)
}
}

object Async extends AsyncInstances