From dd7aa87d74b77135d99a253ad69b846489e00c0b Mon Sep 17 00:00:00 2001 From: Olivier Melois Date: Thu, 22 Apr 2021 22:50:02 +0200 Subject: [PATCH 01/19] Polymorphic AsyncAwait implementation This preliminary work adds an async/await implementation based off the now built-in mechanism in the Scala 2 compiler. The grittiest details of the implementation are borrowed from : * https://github.com/scala/scala/pull/8816 * https://github.com/retronym/monad-ui/tree/master/src/main/scala/monadui * https://github.com/scala/scala-async Due to the reliance on Dispatcher#unsafeRunSync, the implementation currently only works on JVM. Error propagation / cancellation seems to behave as it should. NB : it is worth noting that despite it compiling, using this with OptionT/EitherT/IorT is currently unsafe, for two reasons: * what seems to be a bug in the MonadCancel instance tied to those Error-able types: See https://gitter.im/typelevel/cats-effect-dev?at=60818cf4ae90f3684098c042 * The fact that calling `unsafeRunSync` is `F[A] => A`, which obviously doesn't work for types that have an error channel that isn't accounted for by the CE typeclasses. --- build.sbt | 23 ++- std/jvm/src/main/scala-2/AsyncAwait.scala | 175 ++++++++++++++++++ .../cats/effect/std/AsyncAwaitSpec.scala | 121 ++++++++++++ 3 files changed, 310 insertions(+), 9 deletions(-) create mode 100644 std/jvm/src/main/scala-2/AsyncAwait.scala create mode 100644 tests/jvm/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala diff --git a/build.sbt b/build.sbt index 7a2bfd9b34..e3c765f479 100644 --- a/build.sbt +++ b/build.sbt @@ -222,13 +222,12 @@ lazy val kernel = crossProject(JSPlatform, JVMPlatform) libraryDependencies += "org.specs2" %%% "specs2-core" % Specs2Version % Test) .settings(dottyLibrarySettings) .settings(libraryDependencies += "org.typelevel" %%% "cats-core" % CatsVersion) - .jsSettings( - Compile / doc / sources := { - if (isDotty.value) - Seq() - else - (Compile / doc / sources).value - }) + .jsSettings(Compile / doc / sources := { + if (isDotty.value) + Seq() + else + (Compile / doc / sources).value + }) /** * Reference implementations (including a pure ConcurrentBracket), generic ScalaCheck @@ -303,7 +302,8 @@ lazy val tests = crossProject(JSPlatform, JVMPlatform) name := "cats-effect-tests", libraryDependencies ++= Seq( "org.typelevel" %%% "discipline-specs2" % DisciplineVersion % Test, - "org.typelevel" %%% "cats-kernel-laws" % CatsVersion % Test) + "org.typelevel" %%% "cats-kernel-laws" % CatsVersion % Test), + scalacOptions ++= List("-Xasync") ) .jvmSettings( Test / fork := true, @@ -329,7 +329,12 @@ lazy val std = crossProject(JSPlatform, JVMPlatform) else "org.specs2" %%% "specs2-scalacheck" % Specs2Version % Test }, - libraryDependencies += "org.scalacheck" %%% "scalacheck" % ScalaCheckVersion % Test + libraryDependencies += "org.scalacheck" %%% "scalacheck" % ScalaCheckVersion % Test, + libraryDependencies ++= { + if (!isDotty.value) + Seq("org.scala-lang" % "scala-reflect" % scalaVersion.value % "provided") + else Seq() + } ) /** diff --git a/std/jvm/src/main/scala-2/AsyncAwait.scala b/std/jvm/src/main/scala-2/AsyncAwait.scala new file mode 100644 index 0000000000..2516447988 --- /dev/null +++ b/std/jvm/src/main/scala-2/AsyncAwait.scala @@ -0,0 +1,175 @@ +/* + * Copyright 2020-2021 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.std + +import scala.annotation.compileTimeOnly +import scala.reflect.macros.whitebox + +import cats.effect.std.Dispatcher +import cats.effect.kernel.Outcome +import cats.effect.kernel.Sync +import cats.effect.kernel.Async +import cats.effect.kernel.syntax.all._ + +class AsyncAwaitDsl[F[_]](implicit F: Async[F]) { + + /** + * Type member used by the macro expansion to recover what `F` is without typetags + */ + type _AsyncContext[A] = F[A] + + /** + * Value member used by the macro expansion to recover the Async instance associated to the block. + */ + implicit val _AsyncInstance: Async[F] = F + + /** + * Non-blocking await the on result of `awaitable`. This may only be used directly within an enclosing `async` block. + * + * Internally, this will register the remainder of the code in enclosing `async` block as a callback + * in the `onComplete` handler of `awaitable`, and will *not* block a thread. + */ + @compileTimeOnly("[async] `await` must be enclosed in an `async` block") + def await[T](awaitable: F[T]): T = + ??? // No implementation here, as calls to this are translated to `onComplete` by the macro. + + /** + * Run the block of code `body` asynchronously. `body` may contain calls to `await` when the results of + * a `Future` are needed; this is translated into non-blocking code. + */ + def async[T](body: => T): F[T] = macro AsyncAwaitDsl.asyncImpl[F, T] + +} + +object AsyncAwaitDsl { + + type Callback = Either[Throwable, AnyRef] => Unit + + def asyncImpl[F[_], T]( + c: whitebox.Context + )(body: c.Tree): c.Tree = { + import c.universe._ + if (!c.compilerSettings.contains("-Xasync")) { + c.abort( + c.macroApplication.pos, + "The async requires the compiler option -Xasync (supported only by Scala 2.12.12+ / 2.13.3+)" + ) + } else + try { + val awaitSym = typeOf[AsyncAwaitDsl[Any]].decl(TermName("await")) + def mark(t: DefDef): Tree = { + import language.reflectiveCalls + c.internal + .asInstanceOf[{ + def markForAsyncTransform( + owner: Symbol, + method: DefDef, + awaitSymbol: Symbol, + config: Map[String, AnyRef] + ): DefDef + }] + .markForAsyncTransform( + c.internal.enclosingOwner, + t, + awaitSym, + Map.empty + ) + } + val name = TypeName("stateMachine$async") + // format: off + q""" + final class $name(dispatcher: _root_.cats.effect.std.Dispatcher[${c.prefix}._AsyncContext], callback: _root_.cats.effect.std.AsyncAwaitDsl.Callback) extends _root_.cats.effect.std.AsyncAwaitStateMachine(dispatcher, callback) { + ${mark(q"""override def apply(tr$$async: _root_.cats.effect.kernel.Outcome[${c.prefix}._AsyncContext, _root_.scala.Throwable, _root_.scala.AnyRef]): _root_.scala.Unit = ${body}""")} + } + ${c.prefix}._AsyncInstance.recoverWith { + _root_.cats.effect.std.Dispatcher[${c.prefix}._AsyncContext].use { dispatcher => + ${c.prefix}._AsyncInstance.async_[_root_.scala.AnyRef](cb => new $name(dispatcher, cb).start()) + } + }{ + case _root_.cats.effect.std.AsyncAwaitDsl.CancelBridge => + ${c.prefix}._AsyncInstance.map(${c.prefix}._AsyncInstance.canceled)(_ => null.asInstanceOf[AnyRef]) + }.asInstanceOf[${c.macroApplication.tpe}] + """ + } catch { + case e: ReflectiveOperationException => + c.abort( + c.macroApplication.pos, + "-Xasync is provided as a Scala compiler option, but the async macro is unable to call c.internal.markForAsyncTransform. " + e.getClass.getName + " " + e.getMessage + ) + } + } + + // A marker exception to communicate cancellation through the async runtime. + object CancelBridge extends Throwable with scala.util.control.NoStackTrace +} + +abstract class AsyncAwaitStateMachine[F[_]]( + dispatcher: Dispatcher[F], + callback: AsyncAwaitDsl.Callback +)(implicit F: Sync[F]) extends Function1[Outcome[F, Throwable, AnyRef], Unit] { + + // FSM translated method + //def apply(v1: Outcome[IO, Throwable, AnyRef]): Unit = ??? + + private[this] var state$async: Int = 0 + + /** Retrieve the current value of the state variable */ + protected def state: Int = state$async + + /** Assign `i` to the state variable */ + protected def state_=(s: Int): Unit = state$async = s + + protected def completeFailure(t: Throwable): Unit = + callback(Left(t)) + + protected def completeSuccess(value: AnyRef): Unit = { + callback(Right(value)) + } + + protected def onComplete(f: F[AnyRef]): Unit = { + dispatcher.unsafeRunAndForget(f.guaranteeCase(outcome => F.delay(this(outcome)))) + } + + protected def getCompleted(f: F[AnyRef]): Outcome[F, Throwable, AnyRef] = { + val _ = f + null + } + + protected def tryGet(tr: Outcome[F, Throwable, AnyRef]): AnyRef = + tr match { + case Outcome.Succeeded(value) => + // TODO discuss how to propagate "errors"" from other + // error channels than the Async's, such as None + // in OptionT. Maybe some ad-hoc polymorphic construct + // with a custom path-dependent "bridge" exception type... + // ... or something + dispatcher.unsafeRunSync(value) + case Outcome.Errored(e) => + callback(Left(e)) + this // sentinel value to indicate the dispatch loop should exit. + case Outcome.Canceled() => + callback(Left(AsyncAwaitDsl.CancelBridge)) + this + } + + def start(): Unit = { + // Required to kickstart the async state machine. + // `def apply` does not consult its argument when `state == 0`. + apply(null) + } + +} diff --git a/tests/jvm/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala b/tests/jvm/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala new file mode 100644 index 0000000000..3950e7656d --- /dev/null +++ b/tests/jvm/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala @@ -0,0 +1,121 @@ +/* + * Copyright 2020-2021 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package std + +import scala.concurrent.duration._ +import cats.syntax.all._ +import cats.data.Kleisli + +class AsyncAwaitSpec extends BaseSpec { + + "IOAsyncAwait" should { + object IOAsyncAwait extends cats.effect.std.AsyncAwaitDsl[IO] + import IOAsyncAwait.{await => ioAwait, _} + + "work on success" in real { + + val io = IO.sleep(100.millis) >> IO.pure(1) + + val program = async(ioAwait(io) + ioAwait(io)) + + program.flatMap { res => + IO { + res must beEqualTo(2) + } + } + } + + "propagate errors outward" in real { + + case object Boom extends Throwable + val io = IO.raiseError[Int](Boom) + + val program = async(ioAwait(io)) + + program.attempt.flatMap { res => + IO { + res must beEqualTo(Left(Boom)) + } + } + } + + "propagate canceled outcomes outward" in real { + + val io = IO.canceled + + val program = async(ioAwait(io)) + + program.start.flatMap(_.join).flatMap { res => + IO { + res must beEqualTo(Outcome.canceled[IO, Throwable, Unit]) + } + } + } + + "be cancellable" in real { + + val program = for { + ref <- Ref[IO].of(0) + _ <- async { ioAwait(IO.sleep(100.millis) *> ref.update(_ + 1)) } + .start + .flatMap(_.cancel) + _ <- IO.sleep(200.millis) + result <- ref.get + } yield { + result + } + + program.flatMap { res => + IO { + res must beEqualTo(0) + } + } + + } + + "suspend side effects" in real { + var x = 0 + val program = async(x += 1) + + for { + before <- IO(x must beEqualTo(0)) + _ <- program + after <- IO(x must beEqualTo(1)) + } yield before && after + } + } + + "KleisliAsyncAwait" should { + type F[A] = Kleisli[IO, Int, A] + object KleisliAsyncAwait extends cats.effect.std.AsyncAwaitDsl[F] + import KleisliAsyncAwait.{await => kAwait, _} + + "work on successes" in real { + val io = Temporal[F].sleep(100.millis) >> Kleisli(x => IO.pure(x + 1)) + + val program = async(kAwait(io) + kAwait(io)) + + program.run(0).flatMap { res => + IO { + res must beEqualTo(2) + } + } + } + } + +} From 5490c0698d2f56e6a6c0908af2ca67bced2841d1 Mon Sep 17 00:00:00 2001 From: Olivier Melois Date: Fri, 23 Apr 2021 23:34:24 +0200 Subject: [PATCH 02/19] Remove useless import --- std/jvm/src/main/scala-2/AsyncAwait.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/std/jvm/src/main/scala-2/AsyncAwait.scala b/std/jvm/src/main/scala-2/AsyncAwait.scala index 2516447988..59939340fa 100644 --- a/std/jvm/src/main/scala-2/AsyncAwait.scala +++ b/std/jvm/src/main/scala-2/AsyncAwait.scala @@ -19,7 +19,6 @@ package cats.effect.std import scala.annotation.compileTimeOnly import scala.reflect.macros.whitebox -import cats.effect.std.Dispatcher import cats.effect.kernel.Outcome import cats.effect.kernel.Sync import cats.effect.kernel.Async From 47883bdceb57de8566a41eac3dd1a3bad87f795b Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Fri, 23 Apr 2021 16:18:56 -0600 Subject: [PATCH 03/19] Removed unused import --- std/jvm/src/main/scala-2/AsyncAwait.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/std/jvm/src/main/scala-2/AsyncAwait.scala b/std/jvm/src/main/scala-2/AsyncAwait.scala index 59939340fa..9922ce1c31 100644 --- a/std/jvm/src/main/scala-2/AsyncAwait.scala +++ b/std/jvm/src/main/scala-2/AsyncAwait.scala @@ -71,7 +71,6 @@ object AsyncAwaitDsl { try { val awaitSym = typeOf[AsyncAwaitDsl[Any]].decl(TermName("await")) def mark(t: DefDef): Tree = { - import language.reflectiveCalls c.internal .asInstanceOf[{ def markForAsyncTransform( From d6b9f990f7137dbacb70209fcaecdcc18612bfaa Mon Sep 17 00:00:00 2001 From: Olivier Melois Date: Sat, 24 Apr 2021 10:51:19 +0200 Subject: [PATCH 04/19] Add a "DipatchCloak" construct This aims at solving the problem of OptionT/EitherT/IorT traversing dispatchers --- .../scala/cats/effect/std/DispatchCloak.scala | 85 +++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 std/shared/src/main/scala/cats/effect/std/DispatchCloak.scala diff --git a/std/shared/src/main/scala/cats/effect/std/DispatchCloak.scala b/std/shared/src/main/scala/cats/effect/std/DispatchCloak.scala new file mode 100644 index 0000000000..0fb43d44d3 --- /dev/null +++ b/std/shared/src/main/scala/cats/effect/std/DispatchCloak.scala @@ -0,0 +1,85 @@ +/* + * Copyright 2020-2021 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.std + +import cats.effect.kernel.Outcome +import cats.effect.kernel.Spawn +import cats.effect.kernel.syntax.all._ +import cats.syntax.all._ + +import cats.data.OptionT + +import scala.util.control.NoStackTrace + +/** + * Construct that "clocks" non-successful outcomes as Throwables + * to traverse dispatcher layers. + */ +abstract class DispatchCloak[F[_]](implicit F: Spawn[F]) { + + def cloak[A](fa: F[A]): F[Either[Throwable, A]] + + def uncloak[A](t: Throwable): Option[F[A]] + + def guaranteeCloak[A](fa: F[A]): F[Either[Throwable, A]] = { + fa.start.flatMap(_.join).flatMap { + case Outcome.Succeeded(fa) => cloak(fa) + case Outcome.Errored(e) => F.pure(Left(e)) + case Outcome.Canceled() => F.pure(Left(DispatchCloak.CancelCloak)) + } + } + + def recoverCloaked[A](fa: F[A]) = { + fa.handleErrorWith { + case DispatchCloak.CancelCloak => F.canceled.asInstanceOf[F[A]] + case other => uncloak[A](other).getOrElse(F.raiseError(other)) + } + } + +} + +object DispatchCloak extends LowPriorityDispatchCloakImplicits { + + def apply[F[_]](implicit instance: DispatchCloak[F]): DispatchCloak[F] = instance + + case object CancelCloak extends Throwable with NoStackTrace + + implicit def cloackForOptionT[F[_]: Spawn]: DispatchCloak[OptionT[F, *]] = + new DispatchCloak[OptionT[F, *]] { + // Path dependant class to ensure this instance doesn't intercept + // instances belonging to other transformer layers. + private case object NoneCloak extends Throwable with NoStackTrace + + def cloak[A](fa: OptionT[F, A]): OptionT[F, Either[Throwable, A]] = + fa.map(_.asRight[Throwable]).orElse(OptionT.pure(NoneCloak.asLeft[A])) + + def uncloak[A](t: Throwable): Option[OptionT[F, A]] = + if (t == NoneCloak) Some(OptionT.none[F, A]) else None + } + +} + +private[std] trait LowPriorityDispatchCloakImplicits { + + implicit def cloakForSpawn[F[_]: Spawn]: DispatchCloak[F] = new DispatchCloak[F] { + def cloak[A](fa: F[A]): F[Either[Throwable, A]] = + fa.map(a => Right(a)) + + def uncloak[A](t: Throwable): Option[F[A]] = None + } + +} From ec051cd28a974618e1a71ae93c0ec44026073cfc Mon Sep 17 00:00:00 2001 From: Olivier Melois Date: Sat, 24 Apr 2021 11:50:28 +0200 Subject: [PATCH 05/19] Fixed implementation of the OptionT dispatch cloak * Leveraged DispatchCloak in the async/await implementation * Added tests to show that it works for OptionT --- std/jvm/src/main/scala-2/AsyncAwait.scala | 45 +++++-------- .../scala/cats/effect/std/DispatchCloak.scala | 35 +++++++--- .../cats/effect/std/AsyncAwaitSpec.scala | 64 +++++++++++++++++++ 3 files changed, 106 insertions(+), 38 deletions(-) diff --git a/std/jvm/src/main/scala-2/AsyncAwait.scala b/std/jvm/src/main/scala-2/AsyncAwait.scala index 9922ce1c31..45e3251932 100644 --- a/std/jvm/src/main/scala-2/AsyncAwait.scala +++ b/std/jvm/src/main/scala-2/AsyncAwait.scala @@ -19,12 +19,10 @@ package cats.effect.std import scala.annotation.compileTimeOnly import scala.reflect.macros.whitebox -import cats.effect.kernel.Outcome import cats.effect.kernel.Sync import cats.effect.kernel.Async -import cats.effect.kernel.syntax.all._ -class AsyncAwaitDsl[F[_]](implicit F: Async[F]) { +class AsyncAwaitDsl[F[_]](implicit F: Async[F], Cloak: DispatchCloak[F]) { /** * Type member used by the macro expansion to recover what `F` is without typetags @@ -36,6 +34,8 @@ class AsyncAwaitDsl[F[_]](implicit F: Async[F]) { */ implicit val _AsyncInstance: Async[F] = F + implicit val _CloakInstance: DispatchCloak[F] = Cloak + /** * Non-blocking await the on result of `awaitable`. This may only be used directly within an enclosing `async` block. * @@ -90,16 +90,13 @@ object AsyncAwaitDsl { val name = TypeName("stateMachine$async") // format: off q""" - final class $name(dispatcher: _root_.cats.effect.std.Dispatcher[${c.prefix}._AsyncContext], callback: _root_.cats.effect.std.AsyncAwaitDsl.Callback) extends _root_.cats.effect.std.AsyncAwaitStateMachine(dispatcher, callback) { - ${mark(q"""override def apply(tr$$async: _root_.cats.effect.kernel.Outcome[${c.prefix}._AsyncContext, _root_.scala.Throwable, _root_.scala.AnyRef]): _root_.scala.Unit = ${body}""")} + final class $name(dispatcher: _root_.cats.effect.std.Dispatcher[${c.prefix}._AsyncContext], cloak: _root_.cats.effect.std.DispatchCloak[${c.prefix}._AsyncContext], callback: _root_.cats.effect.std.AsyncAwaitDsl.Callback) extends _root_.cats.effect.std.AsyncAwaitStateMachine(dispatcher, cloak, callback) { + ${mark(q"""override def apply(tr$$async: _root_.scala.Either[_root_.scala.Throwable, _root_.scala.AnyRef]): _root_.scala.Unit = ${body}""")} } - ${c.prefix}._AsyncInstance.recoverWith { + ${c.prefix}._CloakInstance.recoverCloaked { _root_.cats.effect.std.Dispatcher[${c.prefix}._AsyncContext].use { dispatcher => - ${c.prefix}._AsyncInstance.async_[_root_.scala.AnyRef](cb => new $name(dispatcher, cb).start()) + ${c.prefix}._AsyncInstance.async_[_root_.scala.AnyRef](cb => new $name(dispatcher, ${c.prefix}._CloakInstance, cb).start()) } - }{ - case _root_.cats.effect.std.AsyncAwaitDsl.CancelBridge => - ${c.prefix}._AsyncInstance.map(${c.prefix}._AsyncInstance.canceled)(_ => null.asInstanceOf[AnyRef]) }.asInstanceOf[${c.macroApplication.tpe}] """ } catch { @@ -111,17 +108,16 @@ object AsyncAwaitDsl { } } - // A marker exception to communicate cancellation through the async runtime. - object CancelBridge extends Throwable with scala.util.control.NoStackTrace } abstract class AsyncAwaitStateMachine[F[_]]( dispatcher: Dispatcher[F], + cloak: DispatchCloak[F], callback: AsyncAwaitDsl.Callback -)(implicit F: Sync[F]) extends Function1[Outcome[F, Throwable, AnyRef], Unit] { +)(implicit F: Sync[F]) extends Function1[Either[Throwable, AnyRef], Unit] { // FSM translated method - //def apply(v1: Outcome[IO, Throwable, AnyRef]): Unit = ??? + //def apply(v1: Either[Throwable, AnyRef]): Unit = ??? private[this] var state$async: Int = 0 @@ -139,29 +135,22 @@ abstract class AsyncAwaitStateMachine[F[_]]( } protected def onComplete(f: F[AnyRef]): Unit = { - dispatcher.unsafeRunAndForget(f.guaranteeCase(outcome => F.delay(this(outcome)))) + dispatcher.unsafeRunAndForget { + F.flatMap(cloak.guaranteeCloak(f))(either => F.delay(this(either))) + } } - protected def getCompleted(f: F[AnyRef]): Outcome[F, Throwable, AnyRef] = { + protected def getCompleted(f: F[AnyRef]): Either[Throwable, AnyRef] = { val _ = f null } - protected def tryGet(tr: Outcome[F, Throwable, AnyRef]): AnyRef = + protected def tryGet(tr: Either[Throwable, AnyRef]): AnyRef = tr match { - case Outcome.Succeeded(value) => - // TODO discuss how to propagate "errors"" from other - // error channels than the Async's, such as None - // in OptionT. Maybe some ad-hoc polymorphic construct - // with a custom path-dependent "bridge" exception type... - // ... or something - dispatcher.unsafeRunSync(value) - case Outcome.Errored(e) => + case Right(value) => value + case Left(e) => callback(Left(e)) this // sentinel value to indicate the dispatch loop should exit. - case Outcome.Canceled() => - callback(Left(AsyncAwaitDsl.CancelBridge)) - this } def start(): Unit = { diff --git a/std/shared/src/main/scala/cats/effect/std/DispatchCloak.scala b/std/shared/src/main/scala/cats/effect/std/DispatchCloak.scala index 0fb43d44d3..62dc8c3f67 100644 --- a/std/shared/src/main/scala/cats/effect/std/DispatchCloak.scala +++ b/std/shared/src/main/scala/cats/effect/std/DispatchCloak.scala @@ -26,8 +26,13 @@ import cats.data.OptionT import scala.util.control.NoStackTrace /** - * Construct that "clocks" non-successful outcomes as Throwables - * to traverse dispatcher layers. + * Construct that "cloaks" non-successful outcomes as path-dependant Throwables, + * allowing to traverse dispatcher layers and be recovered in effect-land on + * the other side of the unsafe region. + * + * In particular, side error channels that the kernel typeclasses do not know about, + * such as the ones of OptionT/EitherT/IorT, can be accounted for using this construct, + * making it possible to interoperate with impure semantics, in polymorphic ways. */ abstract class DispatchCloak[F[_]](implicit F: Spawn[F]) { @@ -56,26 +61,36 @@ object DispatchCloak extends LowPriorityDispatchCloakImplicits { def apply[F[_]](implicit instance: DispatchCloak[F]): DispatchCloak[F] = instance + // A marker exception to communicate cancellation through dispatch runtimes. case object CancelCloak extends Throwable with NoStackTrace - implicit def cloackForOptionT[F[_]: Spawn]: DispatchCloak[OptionT[F, *]] = + implicit def cloakForOptionT[F[_]: Spawn: DispatchCloak]: DispatchCloak[OptionT[F, *]] = new DispatchCloak[OptionT[F, *]] { - // Path dependant class to ensure this instance doesn't intercept - // instances belonging to other transformer layers. + // Path dependant object to ensure this doesn't intercept instances + // cloaked Nones belonging to other OptionT layers. private case object NoneCloak extends Throwable with NoStackTrace - def cloak[A](fa: OptionT[F, A]): OptionT[F, Either[Throwable, A]] = - fa.map(_.asRight[Throwable]).orElse(OptionT.pure(NoneCloak.asLeft[A])) + def cloak[A](fa: OptionT[F, A]): OptionT[F, Either[Throwable, A]] = { + OptionT.liftF( + DispatchCloak[F].cloak(fa.value).map(_.sequence.getOrElse(Left(this.NoneCloak)))) + } - def uncloak[A](t: Throwable): Option[OptionT[F, A]] = - if (t == NoneCloak) Some(OptionT.none[F, A]) else None + def uncloak[A](t: Throwable): Option[OptionT[F, A]] = t match { + case this.NoneCloak => Some(OptionT.none[F, A]) + case other => DispatchCloak[F].uncloak(other).map(OptionT.liftF[F, A]) + } } } private[std] trait LowPriorityDispatchCloakImplicits { - implicit def cloakForSpawn[F[_]: Spawn]: DispatchCloak[F] = new DispatchCloak[F] { + /** + * This should be the default instance for anything that does not have a side + * error channel that might prevent calls such `Dispatcher#unsafeRunSync` from + * terminating due to the kernel typeclasses not having knowledge of. + */ + implicit def defaultCloak[F[_]: Spawn]: DispatchCloak[F] = new DispatchCloak[F] { def cloak[A](fa: F[A]): F[Either[Throwable, A]] = fa.map(a => Right(a)) diff --git a/tests/jvm/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala b/tests/jvm/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala index 3950e7656d..1978037108 100644 --- a/tests/jvm/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala +++ b/tests/jvm/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala @@ -20,6 +20,7 @@ package std import scala.concurrent.duration._ import cats.syntax.all._ import cats.data.Kleisli +import cats.data.OptionT class AsyncAwaitSpec extends BaseSpec { @@ -118,4 +119,67 @@ class AsyncAwaitSpec extends BaseSpec { } } + "OptionTAsyncAwait" should { + type F[A] = OptionT[IO, A] + object OptionTAsyncAwait extends cats.effect.std.AsyncAwaitDsl[F] + import OptionTAsyncAwait.{await => oAwait, _} + + "work on successes" in real { + val io = Temporal[F].sleep(100.millis) >> OptionT.pure[IO](1) + + val program = async(oAwait(io) + oAwait(io)) + + program.value.flatMap { res => + IO { + res must beEqualTo(Some(2)) + } + } + } + + "work on None" in real { + val io1 = OptionT.pure[IO](1) + val io2 = OptionT.none[IO, Int] + + val program = async(oAwait(io1) + oAwait(io2)) + + program.value.flatMap { res => + IO { + res must beEqualTo(None) + } + } + } + } + + "Nested OptionT AsyncAwait" should { + type F[A] = OptionT[OptionT[IO, *], A] + object NestedAsyncAwait extends cats.effect.std.AsyncAwaitDsl[F] + import NestedAsyncAwait.{await => oAwait, _} + + "surface None at the right layer (1)" in real { + // val io1 = 1.pure[F] + val io2 = OptionT.liftF(OptionT.none[IO, Int]) + + val program = async(oAwait(io2)) + + program.value.value.flatMap { res => + IO { + res must beEqualTo(None) + } + } + } + + "surface None at the right layer (2)" in real { + val io1 = 1.pure[F] + val io2 = OptionT.none[OptionT[IO, *], Int] + + val program = async(oAwait(io1) + oAwait(io2)) + + program.value.value.flatMap { res => + IO { + res must beEqualTo(Some(None)) + } + } + } + } + } From 911083178b0c81157e7da54fe015b135c1ee1d55 Mon Sep 17 00:00:00 2001 From: Olivier Melois Date: Sun, 25 Apr 2021 11:57:46 +0200 Subject: [PATCH 06/19] Adds a "Resume typeclass construct" This allows Async/Await to work with any monadic effect. --- std/jvm/src/main/scala-2/AsyncAwait.scala | 54 ++++++---- .../scala/cats/effect/std/DispatchCloak.scala | 100 ------------------ .../main/scala/cats/effect/std/Resume.scala | 81 ++++++++++++++ .../cats/effect/std/AsyncAwaitSpec.scala | 21 ++++ 4 files changed, 137 insertions(+), 119 deletions(-) delete mode 100644 std/shared/src/main/scala/cats/effect/std/DispatchCloak.scala create mode 100644 std/shared/src/main/scala/cats/effect/std/Resume.scala diff --git a/std/jvm/src/main/scala-2/AsyncAwait.scala b/std/jvm/src/main/scala-2/AsyncAwait.scala index 45e3251932..451546c243 100644 --- a/std/jvm/src/main/scala-2/AsyncAwait.scala +++ b/std/jvm/src/main/scala-2/AsyncAwait.scala @@ -19,10 +19,15 @@ package cats.effect.std import scala.annotation.compileTimeOnly import scala.reflect.macros.whitebox -import cats.effect.kernel.Sync import cats.effect.kernel.Async +import cats.effect.kernel.Outcome +import cats.effect.kernel.syntax.all._ +import cats.syntax.all._ +import cats.effect.kernel.Outcome.Canceled +import cats.effect.kernel.Outcome.Errored +import cats.effect.kernel.Outcome.Succeeded -class AsyncAwaitDsl[F[_]](implicit F: Async[F], Cloak: DispatchCloak[F]) { +class AsyncAwaitDsl[F[_]](implicit F: Async[F], R: Resume[F]) { /** * Type member used by the macro expansion to recover what `F` is without typetags @@ -34,7 +39,7 @@ class AsyncAwaitDsl[F[_]](implicit F: Async[F], Cloak: DispatchCloak[F]) { */ implicit val _AsyncInstance: Async[F] = F - implicit val _CloakInstance: DispatchCloak[F] = Cloak + implicit val _ResumeInstance: Resume[F] = R /** * Non-blocking await the on result of `awaitable`. This may only be used directly within an enclosing `async` block. @@ -56,7 +61,9 @@ class AsyncAwaitDsl[F[_]](implicit F: Async[F], Cloak: DispatchCloak[F]) { object AsyncAwaitDsl { - type Callback = Either[Throwable, AnyRef] => Unit + type CallbackTarget[F[_]] = Outcome[F, Throwable, AnyRef] + type Callback[F[_]] = Either[Throwable, CallbackTarget[F]] => Unit + type ResumeOutcome[F[_]] = Either[F[AnyRef], (F[Unit], AnyRef)] def asyncImpl[F[_], T]( c: whitebox.Context @@ -90,13 +97,15 @@ object AsyncAwaitDsl { val name = TypeName("stateMachine$async") // format: off q""" - final class $name(dispatcher: _root_.cats.effect.std.Dispatcher[${c.prefix}._AsyncContext], cloak: _root_.cats.effect.std.DispatchCloak[${c.prefix}._AsyncContext], callback: _root_.cats.effect.std.AsyncAwaitDsl.Callback) extends _root_.cats.effect.std.AsyncAwaitStateMachine(dispatcher, cloak, callback) { - ${mark(q"""override def apply(tr$$async: _root_.scala.Either[_root_.scala.Throwable, _root_.scala.AnyRef]): _root_.scala.Unit = ${body}""")} + final class $name(dispatcher: _root_.cats.effect.std.Dispatcher[${c.prefix}._AsyncContext], resume: _root_.cats.effect.std.Resume[${c.prefix}._AsyncContext], callback: _root_.cats.effect.std.AsyncAwaitDsl.Callback[${c.prefix}._AsyncContext]) extends _root_.cats.effect.std.AsyncAwaitStateMachine(dispatcher, resume, callback) { + ${mark(q"""override def apply(tr$$async: _root_.cats.effect.std.AsyncAwaitDsl.ResumeOutcome[${c.prefix}._AsyncContext]): _root_.scala.Unit = ${body}""")} } - ${c.prefix}._CloakInstance.recoverCloaked { + ${c.prefix}._AsyncInstance.flatMap { _root_.cats.effect.std.Dispatcher[${c.prefix}._AsyncContext].use { dispatcher => - ${c.prefix}._AsyncInstance.async_[_root_.scala.AnyRef](cb => new $name(dispatcher, ${c.prefix}._CloakInstance, cb).start()) + ${c.prefix}._AsyncInstance.async_[_root_.cats.effect.kernel.Outcome[${c.prefix}._AsyncContext, Throwable, AnyRef]](cb => new $name(dispatcher, ${c.prefix}._ResumeInstance, cb).start()) } + }{ outcome => + outcome.embedNever }.asInstanceOf[${c.macroApplication.tpe}] """ } catch { @@ -112,13 +121,14 @@ object AsyncAwaitDsl { abstract class AsyncAwaitStateMachine[F[_]]( dispatcher: Dispatcher[F], - cloak: DispatchCloak[F], - callback: AsyncAwaitDsl.Callback -)(implicit F: Sync[F]) extends Function1[Either[Throwable, AnyRef], Unit] { + resume: Resume[F], + callback: AsyncAwaitDsl.Callback[F] +)(implicit F: Async[F]) extends Function1[AsyncAwaitDsl.ResumeOutcome[F], Unit] { // FSM translated method - //def apply(v1: Either[Throwable, AnyRef]): Unit = ??? + //def apply(v1: AsyncAwaitDsl.ResumeOutcome[F]): Unit = ??? + private[this] var recordedEffect : F[Unit] = F.unit private[this] var state$async: Int = 0 /** Retrieve the current value of the state variable */ @@ -131,25 +141,31 @@ abstract class AsyncAwaitStateMachine[F[_]]( callback(Left(t)) protected def completeSuccess(value: AnyRef): Unit = { - callback(Right(value)) + callback(Right(Outcome.Succeeded(F.as(recordedEffect, value)))) } protected def onComplete(f: F[AnyRef]): Unit = { dispatcher.unsafeRunAndForget { - F.flatMap(cloak.guaranteeCloak(f))(either => F.delay(this(either))) + (recordedEffect *> f).start.flatMap(_.join).flatMap { + case Canceled() => F.delay(this(Left(F.canceled.asInstanceOf[F[AnyRef]]))) + case Errored(e) => F.delay(this(Left(F.raiseError(e)))) + case Succeeded(fa) => resume.resume(fa).flatMap(r => F.delay(this(r))) + } } } - protected def getCompleted(f: F[AnyRef]): Either[Throwable, AnyRef] = { + protected def getCompleted(f: F[AnyRef]): AsyncAwaitDsl.ResumeOutcome[F] = { val _ = f null } - protected def tryGet(tr: Either[Throwable, AnyRef]): AnyRef = + protected def tryGet(tr: AsyncAwaitDsl.ResumeOutcome[F]): AnyRef = tr match { - case Right(value) => value - case Left(e) => - callback(Left(e)) + case Right((newEffect, value)) => + recordedEffect = newEffect + value + case Left(monadicStop) => + callback(Right(Outcome.succeeded(monadicStop))) this // sentinel value to indicate the dispatch loop should exit. } diff --git a/std/shared/src/main/scala/cats/effect/std/DispatchCloak.scala b/std/shared/src/main/scala/cats/effect/std/DispatchCloak.scala deleted file mode 100644 index 62dc8c3f67..0000000000 --- a/std/shared/src/main/scala/cats/effect/std/DispatchCloak.scala +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright 2020-2021 Typelevel - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cats.effect.std - -import cats.effect.kernel.Outcome -import cats.effect.kernel.Spawn -import cats.effect.kernel.syntax.all._ -import cats.syntax.all._ - -import cats.data.OptionT - -import scala.util.control.NoStackTrace - -/** - * Construct that "cloaks" non-successful outcomes as path-dependant Throwables, - * allowing to traverse dispatcher layers and be recovered in effect-land on - * the other side of the unsafe region. - * - * In particular, side error channels that the kernel typeclasses do not know about, - * such as the ones of OptionT/EitherT/IorT, can be accounted for using this construct, - * making it possible to interoperate with impure semantics, in polymorphic ways. - */ -abstract class DispatchCloak[F[_]](implicit F: Spawn[F]) { - - def cloak[A](fa: F[A]): F[Either[Throwable, A]] - - def uncloak[A](t: Throwable): Option[F[A]] - - def guaranteeCloak[A](fa: F[A]): F[Either[Throwable, A]] = { - fa.start.flatMap(_.join).flatMap { - case Outcome.Succeeded(fa) => cloak(fa) - case Outcome.Errored(e) => F.pure(Left(e)) - case Outcome.Canceled() => F.pure(Left(DispatchCloak.CancelCloak)) - } - } - - def recoverCloaked[A](fa: F[A]) = { - fa.handleErrorWith { - case DispatchCloak.CancelCloak => F.canceled.asInstanceOf[F[A]] - case other => uncloak[A](other).getOrElse(F.raiseError(other)) - } - } - -} - -object DispatchCloak extends LowPriorityDispatchCloakImplicits { - - def apply[F[_]](implicit instance: DispatchCloak[F]): DispatchCloak[F] = instance - - // A marker exception to communicate cancellation through dispatch runtimes. - case object CancelCloak extends Throwable with NoStackTrace - - implicit def cloakForOptionT[F[_]: Spawn: DispatchCloak]: DispatchCloak[OptionT[F, *]] = - new DispatchCloak[OptionT[F, *]] { - // Path dependant object to ensure this doesn't intercept instances - // cloaked Nones belonging to other OptionT layers. - private case object NoneCloak extends Throwable with NoStackTrace - - def cloak[A](fa: OptionT[F, A]): OptionT[F, Either[Throwable, A]] = { - OptionT.liftF( - DispatchCloak[F].cloak(fa.value).map(_.sequence.getOrElse(Left(this.NoneCloak)))) - } - - def uncloak[A](t: Throwable): Option[OptionT[F, A]] = t match { - case this.NoneCloak => Some(OptionT.none[F, A]) - case other => DispatchCloak[F].uncloak(other).map(OptionT.liftF[F, A]) - } - } - -} - -private[std] trait LowPriorityDispatchCloakImplicits { - - /** - * This should be the default instance for anything that does not have a side - * error channel that might prevent calls such `Dispatcher#unsafeRunSync` from - * terminating due to the kernel typeclasses not having knowledge of. - */ - implicit def defaultCloak[F[_]: Spawn]: DispatchCloak[F] = new DispatchCloak[F] { - def cloak[A](fa: F[A]): F[Either[Throwable, A]] = - fa.map(a => Right(a)) - - def uncloak[A](t: Throwable): Option[F[A]] = None - } - -} diff --git a/std/shared/src/main/scala/cats/effect/std/Resume.scala b/std/shared/src/main/scala/cats/effect/std/Resume.scala new file mode 100644 index 0000000000..57b50a3420 --- /dev/null +++ b/std/shared/src/main/scala/cats/effect/std/Resume.scala @@ -0,0 +1,81 @@ +/* + * Copyright 2020-2021 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.std + +import cats.Monad +import cats.syntax.all._ +import cats.data.OptionT +import cats.data.WriterT +import cats.kernel.Monoid + +/** + * Encodes the ability to peek into a computation's product/coproduct + * structure to decide whether it can be resumed or should be short-stopped. + * + * If the computation can be resumed (typically when all monadic layers + * have been applied successfully), Right is yielded, + * with a `F[Unit]` keeping track of some potential product components + * in the monadic effect. + * + * If the computation cannot be resumed because of some non-successful + * coproduct component in the monadic effect, Left is yielded. + */ +trait Resume[F[_]] { + + def resume[A](fa: F[A]): F[Either[F[A], (F[Unit], A)]] + +} + +object Resume extends LowPriorityResumeInstances { + + implicit def optionTResume[F[_]](implicit F: Monad[F], R: Resume[F]): Resume[OptionT[F, *]] = + new Resume[OptionT[F, *]] { + def resume[A]( + fa: OptionT[F, A]): OptionT[F, Either[OptionT[F, A], (OptionT[F, Unit], A)]] = + OptionT.liftF { + R.resume(fa.value).map { + case Left(fa) => Left(OptionT(fa)) + case Right((funit, None)) => Left(OptionT(funit.as(None))) + case Right((funit, Some(value))) => Right(OptionT.liftF(funit), value) + } + } + } + + implicit def writerTResume[F[_]: Monad, L: Monoid]( + implicit R: Resume[F]): Resume[WriterT[F, L, *]] = + new Resume[WriterT[F, L, *]] { + def resume[A](fa: WriterT[F, L, A]) + : WriterT[F, L, Either[WriterT[F, L, A], (WriterT[F, L, Unit], A)]] = + WriterT.liftF { + R.resume(fa.run).map { + case Left(value) => Left(WriterT(value)) + case Right((funit, (log, value))) => + val w = WriterT(funit.map(log -> _)) + Right((w, value)) + } + } + } + +} + +trait LowPriorityResumeInstances { + + implicit def defaultMonadicResume[F[_]](implicit F: Monad[F]): Resume[F] = new Resume[F] { + def resume[A](fa: F[A]): F[Either[F[A], (F[Unit], A)]] = fa.map { a => Right((F.unit, a)) } + } + +} diff --git a/tests/jvm/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala b/tests/jvm/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala index 1978037108..b63ecfd072 100644 --- a/tests/jvm/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala +++ b/tests/jvm/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala @@ -21,6 +21,7 @@ import scala.concurrent.duration._ import cats.syntax.all._ import cats.data.Kleisli import cats.data.OptionT +import cats.data.WriterT class AsyncAwaitSpec extends BaseSpec { @@ -182,4 +183,24 @@ class AsyncAwaitSpec extends BaseSpec { } } + "WriteT AsyncAwait" should { + type F[A] = WriterT[IO, Int, A] + object NestedAsyncAwait extends cats.effect.std.AsyncAwaitDsl[F] + import NestedAsyncAwait.{await => wAwait, _} + + "surface logged " in real { + // val io1 = 1.pure[F] + val io1 = WriterT(IO(1, 3)) + + val program = async(wAwait(io1) * wAwait(io1)) + + program.run.flatMap { res => + IO { + res must beEqualTo((2, 9)) + } + } + } + + } + } From 3d92776cd48e3e10562da48ee2e7e017324dc0c0 Mon Sep 17 00:00:00 2001 From: Olivier Melois Date: Sun, 25 Apr 2021 12:19:12 +0200 Subject: [PATCH 07/19] Remove unnecessary outcome wrapping --- std/jvm/src/main/scala-2/AsyncAwait.scala | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/std/jvm/src/main/scala-2/AsyncAwait.scala b/std/jvm/src/main/scala-2/AsyncAwait.scala index 451546c243..6173ba26da 100644 --- a/std/jvm/src/main/scala-2/AsyncAwait.scala +++ b/std/jvm/src/main/scala-2/AsyncAwait.scala @@ -20,7 +20,6 @@ import scala.annotation.compileTimeOnly import scala.reflect.macros.whitebox import cats.effect.kernel.Async -import cats.effect.kernel.Outcome import cats.effect.kernel.syntax.all._ import cats.syntax.all._ import cats.effect.kernel.Outcome.Canceled @@ -61,7 +60,7 @@ class AsyncAwaitDsl[F[_]](implicit F: Async[F], R: Resume[F]) { object AsyncAwaitDsl { - type CallbackTarget[F[_]] = Outcome[F, Throwable, AnyRef] + type CallbackTarget[F[_]] = F[AnyRef] type Callback[F[_]] = Either[Throwable, CallbackTarget[F]] => Unit type ResumeOutcome[F[_]] = Either[F[AnyRef], (F[Unit], AnyRef)] @@ -100,12 +99,10 @@ object AsyncAwaitDsl { final class $name(dispatcher: _root_.cats.effect.std.Dispatcher[${c.prefix}._AsyncContext], resume: _root_.cats.effect.std.Resume[${c.prefix}._AsyncContext], callback: _root_.cats.effect.std.AsyncAwaitDsl.Callback[${c.prefix}._AsyncContext]) extends _root_.cats.effect.std.AsyncAwaitStateMachine(dispatcher, resume, callback) { ${mark(q"""override def apply(tr$$async: _root_.cats.effect.std.AsyncAwaitDsl.ResumeOutcome[${c.prefix}._AsyncContext]): _root_.scala.Unit = ${body}""")} } - ${c.prefix}._AsyncInstance.flatMap { + ${c.prefix}._AsyncInstance.flatten { _root_.cats.effect.std.Dispatcher[${c.prefix}._AsyncContext].use { dispatcher => - ${c.prefix}._AsyncInstance.async_[_root_.cats.effect.kernel.Outcome[${c.prefix}._AsyncContext, Throwable, AnyRef]](cb => new $name(dispatcher, ${c.prefix}._ResumeInstance, cb).start()) + ${c.prefix}._AsyncInstance.async_[${c.prefix}._AsyncContext[AnyRef]](cb => new $name(dispatcher, ${c.prefix}._ResumeInstance, cb).start()) } - }{ outcome => - outcome.embedNever }.asInstanceOf[${c.macroApplication.tpe}] """ } catch { @@ -141,7 +138,7 @@ abstract class AsyncAwaitStateMachine[F[_]]( callback(Left(t)) protected def completeSuccess(value: AnyRef): Unit = { - callback(Right(Outcome.Succeeded(F.as(recordedEffect, value)))) + callback(Right(F.as(recordedEffect, value))) } protected def onComplete(f: F[AnyRef]): Unit = { @@ -165,7 +162,7 @@ abstract class AsyncAwaitStateMachine[F[_]]( recordedEffect = newEffect value case Left(monadicStop) => - callback(Right(Outcome.succeeded(monadicStop))) + callback(Right(monadicStop)) this // sentinel value to indicate the dispatch loop should exit. } From 41d00ed51170dd0fa710c5107b7189f1aa8fe09f Mon Sep 17 00:00:00 2001 From: Olivier Melois Date: Sun, 25 Apr 2021 17:31:48 +0200 Subject: [PATCH 08/19] Added more resume instances --- .../main/scala/cats/effect/std/Resume.scala | 55 +++++++++++++++++-- 1 file changed, 49 insertions(+), 6 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Resume.scala b/std/shared/src/main/scala/cats/effect/std/Resume.scala index 57b50a3420..f77b6e4577 100644 --- a/std/shared/src/main/scala/cats/effect/std/Resume.scala +++ b/std/shared/src/main/scala/cats/effect/std/Resume.scala @@ -20,7 +20,11 @@ import cats.Monad import cats.syntax.all._ import cats.data.OptionT import cats.data.WriterT +import cats.data.Kleisli import cats.kernel.Monoid +import cats.data.IorT +import cats.data.Ior +import cats.data.EitherT /** * Encodes the ability to peek into a computation's product/coproduct @@ -42,13 +46,13 @@ trait Resume[F[_]] { object Resume extends LowPriorityResumeInstances { - implicit def optionTResume[F[_]](implicit F: Monad[F], R: Resume[F]): Resume[OptionT[F, *]] = + implicit def optionTResume[F[_]](implicit F: Monad[F], RF: Resume[F]): Resume[OptionT[F, *]] = new Resume[OptionT[F, *]] { def resume[A]( fa: OptionT[F, A]): OptionT[F, Either[OptionT[F, A], (OptionT[F, Unit], A)]] = OptionT.liftF { - R.resume(fa.value).map { - case Left(fa) => Left(OptionT(fa)) + RF.resume(fa.value).map { + case Left(stopped) => Left(OptionT(stopped)) case Right((funit, None)) => Left(OptionT(funit.as(None))) case Right((funit, Some(value))) => Right(OptionT.liftF(funit), value) } @@ -56,13 +60,13 @@ object Resume extends LowPriorityResumeInstances { } implicit def writerTResume[F[_]: Monad, L: Monoid]( - implicit R: Resume[F]): Resume[WriterT[F, L, *]] = + implicit RF: Resume[F]): Resume[WriterT[F, L, *]] = new Resume[WriterT[F, L, *]] { def resume[A](fa: WriterT[F, L, A]) : WriterT[F, L, Either[WriterT[F, L, A], (WriterT[F, L, Unit], A)]] = WriterT.liftF { - R.resume(fa.run).map { - case Left(value) => Left(WriterT(value)) + RF.resume(fa.run).map { + case Left(stopped) => Left(WriterT(stopped)) case Right((funit, (log, value))) => val w = WriterT(funit.map(log -> _)) Right((w, value)) @@ -70,6 +74,45 @@ object Resume extends LowPriorityResumeInstances { } } + implicit def kleisliResume[F[_]: Monad, R](implicit RF: Resume[F]): Resume[Kleisli[F, R, *]] = + new Resume[Kleisli[F, R, *]] { + def resume[A](fa: Kleisli[F, R, A]) + : Kleisli[F, R, Either[Kleisli[F, R, A], (Kleisli[F, R, Unit], A)]] = Kleisli { r => + RF.resume(fa.run(r)).map { + case Left(stopped) => Left(Kleisli.liftF[F, R, A](stopped)) + case Right((funit, value)) => Right((Kleisli.liftF(funit), value)) + } + } + } + + implicit def iorTResume[F[_]: Monad, E](implicit RF: Resume[F]): Resume[IorT[F, E, *]] = + new Resume[IorT[F, E, *]] { + def resume[A]( + fa: IorT[F, E, A]): IorT[F, E, Either[IorT[F, E, A], (IorT[F, E, Unit], A)]] = + IorT.liftF { + RF.resume(fa.value).map { + case Left(stopped) => Left(IorT(stopped)) + case Right((funit, Ior.Right(value))) => Right((IorT.liftF(funit), value)) + case Right((funit, left @ Ior.Left(_))) => Left(IorT(funit.as(left))) + case Right((funit, Ior.Both(e, value))) => + Right(IorT(funit.as(Ior.Both(e, ()))), value) + } + } + } + + implicit def eitherTResume[F[_]: Monad, E](implicit RF: Resume[F]): Resume[EitherT[F, E, *]] = + new Resume[EitherT[F, E, *]] { + def resume[A](fa: EitherT[F, E, A]) + : EitherT[F, E, Either[EitherT[F, E, A], (EitherT[F, E, Unit], A)]] = + EitherT.liftF { + RF.resume(fa.value).map { + case Left(stopped) => Left(EitherT(stopped)) + case Right((funit, Right(value))) => Right((EitherT.liftF(funit), value)) + case Right((funit, left @ Left(_))) => Left(EitherT(funit.as(left))) + } + } + } + } trait LowPriorityResumeInstances { From cea18d8c77ac216c23d81327a31b10b7834cdd05 Mon Sep 17 00:00:00 2001 From: Olivier Melois Date: Sun, 25 Apr 2021 21:40:59 +0200 Subject: [PATCH 09/19] Make AsyncAwait cross-compile to JS --- build.sbt | 18 +++++++++++++++++- .../src/main/scala-2/AsyncAwait.scala | 0 .../cats/effect/std/AsyncAwaitSpec.scala | 0 3 files changed, 17 insertions(+), 1 deletion(-) rename std/{jvm => shared}/src/main/scala-2/AsyncAwait.scala (100%) rename tests/{jvm => shared}/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala (100%) diff --git a/build.sbt b/build.sbt index e3c765f479..d329f59015 100644 --- a/build.sbt +++ b/build.sbt @@ -303,7 +303,15 @@ lazy val tests = crossProject(JSPlatform, JVMPlatform) libraryDependencies ++= Seq( "org.typelevel" %%% "discipline-specs2" % DisciplineVersion % Test, "org.typelevel" %%% "cats-kernel-laws" % CatsVersion % Test), - scalacOptions ++= List("-Xasync") + scalacOptions ++= List("-Xasync"), + Test / unmanagedSourceDirectories ++= { + if (!isDotty.value) + Seq( + (Compile / baseDirectory) + .value + .getParentFile() / "shared" / "src" / "test" / "scala-2") + else Seq() + } ) .jvmSettings( Test / fork := true, @@ -334,6 +342,14 @@ lazy val std = crossProject(JSPlatform, JVMPlatform) if (!isDotty.value) Seq("org.scala-lang" % "scala-reflect" % scalaVersion.value % "provided") else Seq() + }, + Compile / unmanagedSourceDirectories ++= { + if (!isDotty.value) + Seq( + (Compile / baseDirectory) + .value + .getParentFile() / "shared" / "src" / "main" / "scala-2") + else Seq() } ) diff --git a/std/jvm/src/main/scala-2/AsyncAwait.scala b/std/shared/src/main/scala-2/AsyncAwait.scala similarity index 100% rename from std/jvm/src/main/scala-2/AsyncAwait.scala rename to std/shared/src/main/scala-2/AsyncAwait.scala diff --git a/tests/jvm/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala b/tests/shared/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala similarity index 100% rename from tests/jvm/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala rename to tests/shared/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala From e544cb9ecede07b44f3b76749fd9148aeaabd9ff Mon Sep 17 00:00:00 2001 From: Olivier Melois Date: Mon, 26 Apr 2021 10:53:40 +0200 Subject: [PATCH 10/19] Add Resume laws and tests --- .../scala/cats/effect/std/ResumeSpec.scala | 119 ++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 tests/shared/src/test/scala/cats/effect/std/ResumeSpec.scala diff --git a/tests/shared/src/test/scala/cats/effect/std/ResumeSpec.scala b/tests/shared/src/test/scala/cats/effect/std/ResumeSpec.scala new file mode 100644 index 0000000000..9df8fdaf40 --- /dev/null +++ b/tests/shared/src/test/scala/cats/effect/std/ResumeSpec.scala @@ -0,0 +1,119 @@ +package cats.effect +package std + +import cats.data._ +import cats.effect.kernel.Async +import cats.effect.laws._ +import cats.effect.syntax.all._ +import cats.kernel.Eq +import cats.laws.discipline.arbitrary._ +import cats.syntax.all._ +import org.scalacheck._ +import org.specs2.ScalaCheck +import org.specs2.mutable.Specification +import org.typelevel.discipline.Laws +import org.typelevel.discipline.specs2.mutable.Discipline + +import Prop.forAll + +trait ResumeLaws[F[_]] { + + implicit val async: Async[F] + implicit val resume: Resume[F] + + // Running the effect through unsafe async boundary to ensure + // that non-CE semantics are accounted for by the Resume. + // + // In case of unaccounted coproduct semantics (side error channels), the unsafe + // run will hang, making the test eventually timeout. + // + // In case of unaccounted product semantics (WriterT, IorT.Both), the unsafe + // run will forget/lose the log channel. + // + // This law therefore verifies that the Resume materialises all monadic, + // but non CE-related monadic information so that the outer effect contains only + // CE semantics, which can be ran through a dispatcher without hanging. + // The law also verifies that the absorption of the materialised effects + // after the dispatch contains the same amount of information as the initial + // effect. + def accountsForAllButSideEffects[A](fa: F[A]) = { + val throughUnsafeBoundary = Dispatcher[F].use { dispatcher => + val resumed = resume.resume(fa) + + resumed.start.flatMap(_.join).flatMap { + case Outcome.Succeeded(ffa) => + val ffuture = async.delay(dispatcher.unsafeToFuture(ffa)) + async.fromFuture(ffuture).flatMap { + case Left(fa) => fa + case Right((funit, a)) => funit.as(a) + } + case Outcome.Canceled() => async.canceled *> async.never[A] + case Outcome.Errored(e) => async.raiseError[A](e) + } + } + throughUnsafeBoundary <-> fa + } + + def resumePureIsUnitAndA[A](a: A) = { + resume.resume(async.pure(a)) <-> async.pure(Right((async.unit, a))) + } + +} + +object ResumeLaws { + def apply[F[_]](implicit F0: Async[F], R: Resume[F]): ResumeLaws[F] = new ResumeLaws[F] { + val async: Async[F] = F0 + val resume: Resume[F] = R + } +} + +trait ResumeTests[F[_]] extends Laws { + + val laws: ResumeLaws[F] + + def resume[A]( + implicit ArbFA: Arbitrary[F[A]], + ArbA: Arbitrary[A], + EqFA: Eq[F[A]], + EqResume: Eq[F[Either[F[A], (F[Unit], A)]]]): RuleSet = { + new RuleSet { + val name = "resume" + val bases = Nil + val parents = Seq() + + val props = Seq( + "accountsForAllButSideEffects" -> forAll(laws.accountsForAllButSideEffects[A] _), + "resumePureIsUnitAndA" -> forAll(laws.resumePureIsUnitAndA[A] _) + ) + } + } +} + +object ResumeTests { + def apply[F[_]](implicit F0: Async[F], R: Resume[F]): ResumeTests[F] = + new ResumeTests[F] { + val laws = ResumeLaws[F] + } +} + +class ResumeSpec extends Specification with Discipline with ScalaCheck with BaseSpec { + outer => + + // we just need this because of the laws testing, since the prop runs can interfere with each other + sequential + + implicit def kleisliEq[A](implicit eqIOA: Eq[IO[A]]): Eq[Kleisli[IO, Int, A]] = + Eq.by[Kleisli[IO, Int, A], IO[A]](_.run(0)) + + { + implicit val ticker = Ticker() + + checkAll("OptionT[IO, *]", ResumeTests[OptionT[IO, *]].resume[Int]) + checkAll("IorT[IO, Int, *]", ResumeTests[IorT[IO, Int, *]].resume[Int]) + checkAll("WriterT[IO, Int, *]", ResumeTests[WriterT[IO, Int, *]].resume[Int]) + checkAll("EitherT[IO, Int, *]", ResumeTests[EitherT[IO, Int, *]].resume[Int]) + checkAll("Kleisli[IO, Int, *]", ResumeTests[Kleisli[IO, Int, *]].resume[Int]) + checkAll("IO", ResumeTests[IO].resume[Int]) + } + +} From 48fa4320c772241db764263db6b94142cdf4f2a2 Mon Sep 17 00:00:00 2001 From: Olivier Melois Date: Thu, 29 Apr 2021 11:42:55 +0200 Subject: [PATCH 11/19] Changed location of resume call to work with Resource --- std/shared/src/main/scala-2/AsyncAwait.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/std/shared/src/main/scala-2/AsyncAwait.scala b/std/shared/src/main/scala-2/AsyncAwait.scala index 6173ba26da..6be8e3a41a 100644 --- a/std/shared/src/main/scala-2/AsyncAwait.scala +++ b/std/shared/src/main/scala-2/AsyncAwait.scala @@ -143,10 +143,10 @@ abstract class AsyncAwaitStateMachine[F[_]]( protected def onComplete(f: F[AnyRef]): Unit = { dispatcher.unsafeRunAndForget { - (recordedEffect *> f).start.flatMap(_.join).flatMap { + resume.resume((recordedEffect *> f)).start.flatMap(_.join).flatMap { case Canceled() => F.delay(this(Left(F.canceled.asInstanceOf[F[AnyRef]]))) case Errored(e) => F.delay(this(Left(F.raiseError(e)))) - case Succeeded(fa) => resume.resume(fa).flatMap(r => F.delay(this(r))) + case Succeeded(resumed) => resumed.flatMap(r => F.delay(this(r))) } } } From e73d8ed67e45c8bdf3c9a5a5044227dbbf63a1fa Mon Sep 17 00:00:00 2001 From: Olivier Melois Date: Thu, 29 Apr 2021 12:11:51 +0200 Subject: [PATCH 12/19] Fix compilation, prevent Resume instance for Resource --- .../main/scala/cats/effect/std/Resume.scala | 33 +++++++++++++++++-- .../scala/cats/effect/std/ResumeSpec.scala | 30 ++++++++++------- 2 files changed, 49 insertions(+), 14 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Resume.scala b/std/shared/src/main/scala/cats/effect/std/Resume.scala index f77b6e4577..6f20865232 100644 --- a/std/shared/src/main/scala/cats/effect/std/Resume.scala +++ b/std/shared/src/main/scala/cats/effect/std/Resume.scala @@ -25,6 +25,8 @@ import cats.kernel.Monoid import cats.data.IorT import cats.data.Ior import cats.data.EitherT +import cats.effect.kernel.Resource +import cats.effect.kernel.MonadCancel /** * Encodes the ability to peek into a computation's product/coproduct @@ -54,7 +56,7 @@ object Resume extends LowPriorityResumeInstances { RF.resume(fa.value).map { case Left(stopped) => Left(OptionT(stopped)) case Right((funit, None)) => Left(OptionT(funit.as(None))) - case Right((funit, Some(value))) => Right(OptionT.liftF(funit), value) + case Right((funit, Some(value))) => Right((OptionT.liftF(funit), value)) } } } @@ -95,7 +97,7 @@ object Resume extends LowPriorityResumeInstances { case Right((funit, Ior.Right(value))) => Right((IorT.liftF(funit), value)) case Right((funit, left @ Ior.Left(_))) => Left(IorT(funit.as(left))) case Right((funit, Ior.Both(e, value))) => - Right(IorT(funit.as(Ior.Both(e, ()))), value) + Right((IorT(funit.as(Ior.Both(e, ()))), value)) } } } @@ -113,6 +115,33 @@ object Resume extends LowPriorityResumeInstances { } } + @scala.annotation.implicitAmbiguous( + "cats.effect.std.Resume cannot be implemented safely for cats.effect.kernel.Resource") + implicit def preventResourceResume[F[_]]( + implicit F: MonadCancel[F, Throwable], + RF: Resume[F]): Resume[Resource[F, *]] = ??? + + // This implementation breaks laws breaks on first attempt with seed + // 0HGJ9GYf2lulgEEt7ykq8Mknz0uqEm-gkp_FKLoXiMJ= + implicit def resourceResume[F[_]]( + implicit F: MonadCancel[F, Throwable], + RF: Resume[F]): Resume[Resource[F, *]] = + new Resume[Resource[F, *]] { + def resume[A]( + fa: Resource[F, A]): Resource[F, Either[Resource[F, A], (Resource[F, Unit], A)]] = { + Resource[F, Either[Resource[F, A], (Resource[F, Unit], A)]] { + RF.resume(fa.allocated[A]).map { + case Left(stopped) => + // A is never yielded, so the finalizer is guaranteed to be ran + // (according to `allocated`'s docs) + Left(Resource(stopped)) -> F.unit + case Right((funit, (a, finalizer))) => + (Right(Resource.eval(funit) -> a), finalizer) + } + } + } + } + } trait LowPriorityResumeInstances { diff --git a/tests/shared/src/test/scala/cats/effect/std/ResumeSpec.scala b/tests/shared/src/test/scala/cats/effect/std/ResumeSpec.scala index 9df8fdaf40..c84ac6086e 100644 --- a/tests/shared/src/test/scala/cats/effect/std/ResumeSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/ResumeSpec.scala @@ -22,7 +22,7 @@ trait ResumeLaws[F[_]] { implicit val resume: Resume[F] // Running the effect through unsafe async boundary to ensure - // that non-CE semantics are accounted for by the Resume. + // that algebraic semantics are accounted for by the Resume. // // In case of unaccounted coproduct semantics (side error channels), the unsafe // run will hang, making the test eventually timeout. @@ -38,19 +38,22 @@ trait ResumeLaws[F[_]] { // effect. def accountsForAllButSideEffects[A](fa: F[A]) = { val throughUnsafeBoundary = Dispatcher[F].use { dispatcher => - val resumed = resume.resume(fa) - - resumed.start.flatMap(_.join).flatMap { - case Outcome.Succeeded(ffa) => - val ffuture = async.delay(dispatcher.unsafeToFuture(ffa)) - async.fromFuture(ffuture).flatMap { - case Left(fa) => fa - case Right((funit, a)) => funit.as(a) - } - case Outcome.Canceled() => async.canceled *> async.never[A] - case Outcome.Errored(e) => async.raiseError[A](e) + resume.resume(fa).background.use { + _.flatMap { + case Outcome.Canceled() => + async.canceled *> async.never[A] + case Outcome.Errored(e) => + async.raiseError[A](e) + case Outcome.Succeeded(fa_) => + val ffuture = async.delay(dispatcher.unsafeToFuture(fa_)) + async.fromFuture(ffuture).flatMap { + case Left(fa) => fa + case Right((funit, a)) => funit.as(a) + } + } } } + throughUnsafeBoundary <-> fa } @@ -114,6 +117,9 @@ class ResumeSpec extends Specification with Discipline with ScalaCheck with Base checkAll("EitherT[IO, Int, *]", ResumeTests[EitherT[IO, Int, *]].resume[Int]) checkAll("Kleisli[IO, Int, *]", ResumeTests[Kleisli[IO, Int, *]].resume[Int]) checkAll("IO", ResumeTests[IO].resume[Int]) + // Laws breaks on first attempt with seed + // 0HGJ9GYf2lulgEEt7ykq8Mknz0uqEm-gkp_FKLoXiMJ= + // checkAll("Resource[IO, *]", ResumeTests[Resource[IO, *]].resume[Int]) } } From 104a27c17ddea40828ece2764aa09448acd816d8 Mon Sep 17 00:00:00 2001 From: Olivier Melois Date: Fri, 28 May 2021 10:17:38 +0200 Subject: [PATCH 13/19] Remove Resume in favour of a mutable variable Allows to decide whether we're in a happy path or not, without knowing anything about the underlying monad. Knowning whether we're still in the happy path is crucial to signal to the Async/Await state machine whether it should continue or have the async block yield. --- std/shared/src/main/scala-2/AsyncAwait.scala | 23 ++- .../main/scala/cats/effect/std/Resume.scala | 153 ------------------ .../scala/cats/effect/std/ResumeSpec.scala | 125 -------------- 3 files changed, 15 insertions(+), 286 deletions(-) delete mode 100644 std/shared/src/main/scala/cats/effect/std/Resume.scala delete mode 100644 tests/shared/src/test/scala/cats/effect/std/ResumeSpec.scala diff --git a/std/shared/src/main/scala-2/AsyncAwait.scala b/std/shared/src/main/scala-2/AsyncAwait.scala index 6be8e3a41a..ee2d9268c6 100644 --- a/std/shared/src/main/scala-2/AsyncAwait.scala +++ b/std/shared/src/main/scala-2/AsyncAwait.scala @@ -26,7 +26,7 @@ import cats.effect.kernel.Outcome.Canceled import cats.effect.kernel.Outcome.Errored import cats.effect.kernel.Outcome.Succeeded -class AsyncAwaitDsl[F[_]](implicit F: Async[F], R: Resume[F]) { +class AsyncAwaitDsl[F[_]](implicit F: Async[F]) { /** * Type member used by the macro expansion to recover what `F` is without typetags @@ -38,8 +38,6 @@ class AsyncAwaitDsl[F[_]](implicit F: Async[F], R: Resume[F]) { */ implicit val _AsyncInstance: Async[F] = F - implicit val _ResumeInstance: Resume[F] = R - /** * Non-blocking await the on result of `awaitable`. This may only be used directly within an enclosing `async` block. * @@ -96,12 +94,12 @@ object AsyncAwaitDsl { val name = TypeName("stateMachine$async") // format: off q""" - final class $name(dispatcher: _root_.cats.effect.std.Dispatcher[${c.prefix}._AsyncContext], resume: _root_.cats.effect.std.Resume[${c.prefix}._AsyncContext], callback: _root_.cats.effect.std.AsyncAwaitDsl.Callback[${c.prefix}._AsyncContext]) extends _root_.cats.effect.std.AsyncAwaitStateMachine(dispatcher, resume, callback) { + final class $name(dispatcher: _root_.cats.effect.std.Dispatcher[${c.prefix}._AsyncContext], callback: _root_.cats.effect.std.AsyncAwaitDsl.Callback[${c.prefix}._AsyncContext]) extends _root_.cats.effect.std.AsyncAwaitStateMachine(dispatcher, callback) { ${mark(q"""override def apply(tr$$async: _root_.cats.effect.std.AsyncAwaitDsl.ResumeOutcome[${c.prefix}._AsyncContext]): _root_.scala.Unit = ${body}""")} } ${c.prefix}._AsyncInstance.flatten { _root_.cats.effect.std.Dispatcher[${c.prefix}._AsyncContext].use { dispatcher => - ${c.prefix}._AsyncInstance.async_[${c.prefix}._AsyncContext[AnyRef]](cb => new $name(dispatcher, ${c.prefix}._ResumeInstance, cb).start()) + ${c.prefix}._AsyncInstance.async_[${c.prefix}._AsyncContext[AnyRef]](cb => new $name(dispatcher, cb).start()) } }.asInstanceOf[${c.macroApplication.tpe}] """ @@ -118,14 +116,16 @@ object AsyncAwaitDsl { abstract class AsyncAwaitStateMachine[F[_]]( dispatcher: Dispatcher[F], - resume: Resume[F], callback: AsyncAwaitDsl.Callback[F] )(implicit F: Async[F]) extends Function1[AsyncAwaitDsl.ResumeOutcome[F], Unit] { // FSM translated method //def apply(v1: AsyncAwaitDsl.ResumeOutcome[F]): Unit = ??? + // Resorting to mutation to track algebraic product effects (like WriterT), + // since the information they carry would otherwise get lost on every dispatch. private[this] var recordedEffect : F[Unit] = F.unit + private[this] var state$async: Int = 0 /** Retrieve the current value of the state variable */ @@ -143,10 +143,17 @@ abstract class AsyncAwaitStateMachine[F[_]]( protected def onComplete(f: F[AnyRef]): Unit = { dispatcher.unsafeRunAndForget { - resume.resume((recordedEffect *> f)).start.flatMap(_.join).flatMap { + // Resorting to mutation to extract the "happy path" value from the monadic context, + // as inspecting the Succeeded outcome using dispatcher is risky on algebraic sums, + // such as OptionT, EitherT, ... + var value: Option[AnyRef] = None + (recordedEffect *> f).flatTap(r => F.delay{value = Some(r)}).start.flatMap(_.join).flatMap { case Canceled() => F.delay(this(Left(F.canceled.asInstanceOf[F[AnyRef]]))) case Errored(e) => F.delay(this(Left(F.raiseError(e)))) - case Succeeded(resumed) => resumed.flatMap(r => F.delay(this(r))) + case Succeeded(resumed) => value match { + case Some(value) => F.delay(this(Right(resumed.void -> value))) + case None => F.delay(this(Left(resumed))) + } } } } diff --git a/std/shared/src/main/scala/cats/effect/std/Resume.scala b/std/shared/src/main/scala/cats/effect/std/Resume.scala deleted file mode 100644 index 6f20865232..0000000000 --- a/std/shared/src/main/scala/cats/effect/std/Resume.scala +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Copyright 2020-2021 Typelevel - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cats.effect.std - -import cats.Monad -import cats.syntax.all._ -import cats.data.OptionT -import cats.data.WriterT -import cats.data.Kleisli -import cats.kernel.Monoid -import cats.data.IorT -import cats.data.Ior -import cats.data.EitherT -import cats.effect.kernel.Resource -import cats.effect.kernel.MonadCancel - -/** - * Encodes the ability to peek into a computation's product/coproduct - * structure to decide whether it can be resumed or should be short-stopped. - * - * If the computation can be resumed (typically when all monadic layers - * have been applied successfully), Right is yielded, - * with a `F[Unit]` keeping track of some potential product components - * in the monadic effect. - * - * If the computation cannot be resumed because of some non-successful - * coproduct component in the monadic effect, Left is yielded. - */ -trait Resume[F[_]] { - - def resume[A](fa: F[A]): F[Either[F[A], (F[Unit], A)]] - -} - -object Resume extends LowPriorityResumeInstances { - - implicit def optionTResume[F[_]](implicit F: Monad[F], RF: Resume[F]): Resume[OptionT[F, *]] = - new Resume[OptionT[F, *]] { - def resume[A]( - fa: OptionT[F, A]): OptionT[F, Either[OptionT[F, A], (OptionT[F, Unit], A)]] = - OptionT.liftF { - RF.resume(fa.value).map { - case Left(stopped) => Left(OptionT(stopped)) - case Right((funit, None)) => Left(OptionT(funit.as(None))) - case Right((funit, Some(value))) => Right((OptionT.liftF(funit), value)) - } - } - } - - implicit def writerTResume[F[_]: Monad, L: Monoid]( - implicit RF: Resume[F]): Resume[WriterT[F, L, *]] = - new Resume[WriterT[F, L, *]] { - def resume[A](fa: WriterT[F, L, A]) - : WriterT[F, L, Either[WriterT[F, L, A], (WriterT[F, L, Unit], A)]] = - WriterT.liftF { - RF.resume(fa.run).map { - case Left(stopped) => Left(WriterT(stopped)) - case Right((funit, (log, value))) => - val w = WriterT(funit.map(log -> _)) - Right((w, value)) - } - } - } - - implicit def kleisliResume[F[_]: Monad, R](implicit RF: Resume[F]): Resume[Kleisli[F, R, *]] = - new Resume[Kleisli[F, R, *]] { - def resume[A](fa: Kleisli[F, R, A]) - : Kleisli[F, R, Either[Kleisli[F, R, A], (Kleisli[F, R, Unit], A)]] = Kleisli { r => - RF.resume(fa.run(r)).map { - case Left(stopped) => Left(Kleisli.liftF[F, R, A](stopped)) - case Right((funit, value)) => Right((Kleisli.liftF(funit), value)) - } - } - } - - implicit def iorTResume[F[_]: Monad, E](implicit RF: Resume[F]): Resume[IorT[F, E, *]] = - new Resume[IorT[F, E, *]] { - def resume[A]( - fa: IorT[F, E, A]): IorT[F, E, Either[IorT[F, E, A], (IorT[F, E, Unit], A)]] = - IorT.liftF { - RF.resume(fa.value).map { - case Left(stopped) => Left(IorT(stopped)) - case Right((funit, Ior.Right(value))) => Right((IorT.liftF(funit), value)) - case Right((funit, left @ Ior.Left(_))) => Left(IorT(funit.as(left))) - case Right((funit, Ior.Both(e, value))) => - Right((IorT(funit.as(Ior.Both(e, ()))), value)) - } - } - } - - implicit def eitherTResume[F[_]: Monad, E](implicit RF: Resume[F]): Resume[EitherT[F, E, *]] = - new Resume[EitherT[F, E, *]] { - def resume[A](fa: EitherT[F, E, A]) - : EitherT[F, E, Either[EitherT[F, E, A], (EitherT[F, E, Unit], A)]] = - EitherT.liftF { - RF.resume(fa.value).map { - case Left(stopped) => Left(EitherT(stopped)) - case Right((funit, Right(value))) => Right((EitherT.liftF(funit), value)) - case Right((funit, left @ Left(_))) => Left(EitherT(funit.as(left))) - } - } - } - - @scala.annotation.implicitAmbiguous( - "cats.effect.std.Resume cannot be implemented safely for cats.effect.kernel.Resource") - implicit def preventResourceResume[F[_]]( - implicit F: MonadCancel[F, Throwable], - RF: Resume[F]): Resume[Resource[F, *]] = ??? - - // This implementation breaks laws breaks on first attempt with seed - // 0HGJ9GYf2lulgEEt7ykq8Mknz0uqEm-gkp_FKLoXiMJ= - implicit def resourceResume[F[_]]( - implicit F: MonadCancel[F, Throwable], - RF: Resume[F]): Resume[Resource[F, *]] = - new Resume[Resource[F, *]] { - def resume[A]( - fa: Resource[F, A]): Resource[F, Either[Resource[F, A], (Resource[F, Unit], A)]] = { - Resource[F, Either[Resource[F, A], (Resource[F, Unit], A)]] { - RF.resume(fa.allocated[A]).map { - case Left(stopped) => - // A is never yielded, so the finalizer is guaranteed to be ran - // (according to `allocated`'s docs) - Left(Resource(stopped)) -> F.unit - case Right((funit, (a, finalizer))) => - (Right(Resource.eval(funit) -> a), finalizer) - } - } - } - } - -} - -trait LowPriorityResumeInstances { - - implicit def defaultMonadicResume[F[_]](implicit F: Monad[F]): Resume[F] = new Resume[F] { - def resume[A](fa: F[A]): F[Either[F[A], (F[Unit], A)]] = fa.map { a => Right((F.unit, a)) } - } - -} diff --git a/tests/shared/src/test/scala/cats/effect/std/ResumeSpec.scala b/tests/shared/src/test/scala/cats/effect/std/ResumeSpec.scala deleted file mode 100644 index c84ac6086e..0000000000 --- a/tests/shared/src/test/scala/cats/effect/std/ResumeSpec.scala +++ /dev/null @@ -1,125 +0,0 @@ -package cats.effect -package std - -import cats.data._ -import cats.effect.kernel.Async -import cats.effect.laws._ -import cats.effect.syntax.all._ -import cats.kernel.Eq -import cats.laws.discipline.arbitrary._ -import cats.syntax.all._ -import org.scalacheck._ -import org.specs2.ScalaCheck -import org.specs2.mutable.Specification -import org.typelevel.discipline.Laws -import org.typelevel.discipline.specs2.mutable.Discipline - -import Prop.forAll - -trait ResumeLaws[F[_]] { - - implicit val async: Async[F] - implicit val resume: Resume[F] - - // Running the effect through unsafe async boundary to ensure - // that algebraic semantics are accounted for by the Resume. - // - // In case of unaccounted coproduct semantics (side error channels), the unsafe - // run will hang, making the test eventually timeout. - // - // In case of unaccounted product semantics (WriterT, IorT.Both), the unsafe - // run will forget/lose the log channel. - // - // This law therefore verifies that the Resume materialises all monadic, - // but non CE-related monadic information so that the outer effect contains only - // CE semantics, which can be ran through a dispatcher without hanging. - // The law also verifies that the absorption of the materialised effects - // after the dispatch contains the same amount of information as the initial - // effect. - def accountsForAllButSideEffects[A](fa: F[A]) = { - val throughUnsafeBoundary = Dispatcher[F].use { dispatcher => - resume.resume(fa).background.use { - _.flatMap { - case Outcome.Canceled() => - async.canceled *> async.never[A] - case Outcome.Errored(e) => - async.raiseError[A](e) - case Outcome.Succeeded(fa_) => - val ffuture = async.delay(dispatcher.unsafeToFuture(fa_)) - async.fromFuture(ffuture).flatMap { - case Left(fa) => fa - case Right((funit, a)) => funit.as(a) - } - } - } - } - - throughUnsafeBoundary <-> fa - } - - def resumePureIsUnitAndA[A](a: A) = { - resume.resume(async.pure(a)) <-> async.pure(Right((async.unit, a))) - } - -} - -object ResumeLaws { - def apply[F[_]](implicit F0: Async[F], R: Resume[F]): ResumeLaws[F] = new ResumeLaws[F] { - val async: Async[F] = F0 - val resume: Resume[F] = R - } -} - -trait ResumeTests[F[_]] extends Laws { - - val laws: ResumeLaws[F] - - def resume[A]( - implicit ArbFA: Arbitrary[F[A]], - ArbA: Arbitrary[A], - EqFA: Eq[F[A]], - EqResume: Eq[F[Either[F[A], (F[Unit], A)]]]): RuleSet = { - new RuleSet { - val name = "resume" - val bases = Nil - val parents = Seq() - - val props = Seq( - "accountsForAllButSideEffects" -> forAll(laws.accountsForAllButSideEffects[A] _), - "resumePureIsUnitAndA" -> forAll(laws.resumePureIsUnitAndA[A] _) - ) - } - } -} - -object ResumeTests { - def apply[F[_]](implicit F0: Async[F], R: Resume[F]): ResumeTests[F] = - new ResumeTests[F] { - val laws = ResumeLaws[F] - } -} - -class ResumeSpec extends Specification with Discipline with ScalaCheck with BaseSpec { - outer => - - // we just need this because of the laws testing, since the prop runs can interfere with each other - sequential - - implicit def kleisliEq[A](implicit eqIOA: Eq[IO[A]]): Eq[Kleisli[IO, Int, A]] = - Eq.by[Kleisli[IO, Int, A], IO[A]](_.run(0)) - - { - implicit val ticker = Ticker() - - checkAll("OptionT[IO, *]", ResumeTests[OptionT[IO, *]].resume[Int]) - checkAll("IorT[IO, Int, *]", ResumeTests[IorT[IO, Int, *]].resume[Int]) - checkAll("WriterT[IO, Int, *]", ResumeTests[WriterT[IO, Int, *]].resume[Int]) - checkAll("EitherT[IO, Int, *]", ResumeTests[EitherT[IO, Int, *]].resume[Int]) - checkAll("Kleisli[IO, Int, *]", ResumeTests[Kleisli[IO, Int, *]].resume[Int]) - checkAll("IO", ResumeTests[IO].resume[Int]) - // Laws breaks on first attempt with seed - // 0HGJ9GYf2lulgEEt7ykq8Mknz0uqEm-gkp_FKLoXiMJ= - // checkAll("Resource[IO, *]", ResumeTests[Resource[IO, *]].resume[Int]) - } - -} From 8971556acc0586e90f0c9f3283cfd2ea1eb203c0 Mon Sep 17 00:00:00 2001 From: Olivier Melois Date: Sat, 29 May 2021 09:47:12 +0200 Subject: [PATCH 14/19] Scaladoc + polishing tests --- std/shared/src/main/scala-2/AsyncAwait.scala | 76 +++++++++++++------ .../cats/effect/std/AsyncAwaitSpec.scala | 27 +++++-- 2 files changed, 75 insertions(+), 28 deletions(-) diff --git a/std/shared/src/main/scala-2/AsyncAwait.scala b/std/shared/src/main/scala-2/AsyncAwait.scala index 2d742b4124..a80ef54314 100644 --- a/std/shared/src/main/scala-2/AsyncAwait.scala +++ b/std/shared/src/main/scala-2/AsyncAwait.scala @@ -26,6 +26,27 @@ import cats.effect.kernel.Outcome.Canceled import cats.effect.kernel.Outcome.Errored import cats.effect.kernel.Outcome.Succeeded +/** + * WARNING: This construct currently only works on scala 2 (2.12.12+ / 2.13.3+), + * relies on an experimental compiler feature enabled by the -Xasync + * scalac option, and should absolutely be considered unstable with + * regards to backward compatibility guarantees (be that source or binary). + * + * Partially applied construct allowing for async/await semantics, + * popularised in other programming languages. + * + * {{{ + * object ioAsyncAwait extends AsyncAwaitDsl[IO] + * import ioAsyncAwait._ + * + * val io : IO[Int] = ??? + * async { await(io) + await(io) } + * }}} + * + * The code is transformed at compile time into a state machine + * that sequentially calls upon a [[Dispatcher]] every time it reaches + * an "await" block. + */ class AsyncAwaitDsl[F[_]](implicit F: Async[F]) { /** @@ -41,16 +62,16 @@ class AsyncAwaitDsl[F[_]](implicit F: Async[F]) { /** * Non-blocking await the on result of `awaitable`. This may only be used directly within an enclosing `async` block. * - * Internally, this will register the remainder of the code in enclosing `async` block as a callback - * in the `onComplete` handler of `awaitable`, and will *not* block a thread. + * Internally, this transforms the remainder of the code in enclosing `async` block into a callback + * that triggers upon a successful computation outcome. It does *not* block a thread. */ @compileTimeOnly("[async] `await` must be enclosed in an `async` block") def await[T](awaitable: F[T]): T = - ??? // No implementation here, as calls to this are translated to `onComplete` by the macro. + ??? // No implementation here, as calls to this are translated by the macro. /** * Run the block of code `body` asynchronously. `body` may contain calls to `await` when the results of - * a `Future` are needed; this is translated into non-blocking code. + * a `F` are needed; this is translated into non-blocking code. */ def async[T](body: => T): F[T] = macro AsyncAwaitDsl.asyncImpl[F, T] @@ -58,8 +79,7 @@ class AsyncAwaitDsl[F[_]](implicit F: Async[F]) { object AsyncAwaitDsl { - type CallbackTarget[F[_]] = F[AnyRef] - type Callback[F[_]] = Either[Throwable, CallbackTarget[F]] => Unit + type AwaitCallback[F[_]] = Either[Throwable, F[AnyRef]] => Unit // Outcome of an await block. Either a failed algebraic computation, // or a successful value accompanied by a "summary" computation. @@ -101,7 +121,7 @@ object AsyncAwaitDsl { val name = TypeName("stateMachine$async") // format: off q""" - final class $name(dispatcher: _root_.cats.effect.std.Dispatcher[${c.prefix}._AsyncContext], callback: _root_.cats.effect.std.AsyncAwaitDsl.Callback[${c.prefix}._AsyncContext]) extends _root_.cats.effect.std.AsyncAwaitStateMachine(dispatcher, callback) { + final class $name(dispatcher: _root_.cats.effect.std.Dispatcher[${c.prefix}._AsyncContext], callback: _root_.cats.effect.std.AsyncAwaitDsl.AwaitCallback[${c.prefix}._AsyncContext]) extends _root_.cats.effect.std.AsyncAwaitStateMachine(dispatcher, callback) { ${mark(q"""override def apply(tr$$async: _root_.cats.effect.std.AsyncAwaitDsl.AwaitOutcome[${c.prefix}._AsyncContext]): _root_.scala.Unit = ${body}""")} } ${c.prefix}._AsyncInstance.flatten { @@ -110,11 +130,14 @@ object AsyncAwaitDsl { } }.asInstanceOf[${c.macroApplication.tpe}] """ + // format: on } catch { case e: ReflectiveOperationException => c.abort( c.macroApplication.pos, - "-Xasync is provided as a Scala compiler option, but the async macro is unable to call c.internal.markForAsyncTransform. " + e.getClass.getName + " " + e.getMessage + "-Xasync is provided as a Scala compiler option, but the async macro is unable to call c.internal.markForAsyncTransform. " + e + .getClass + .getName + " " + e.getMessage ) } } @@ -123,30 +146,34 @@ object AsyncAwaitDsl { abstract class AsyncAwaitStateMachine[F[_]]( dispatcher: Dispatcher[F], - callback: AsyncAwaitDsl.Callback[F] -)(implicit F: Async[F]) extends Function1[AsyncAwaitDsl.AwaitOutcome[F], Unit] { + callback: AsyncAwaitDsl.AwaitCallback[F] +)(implicit F: Async[F]) + extends Function1[AsyncAwaitDsl.AwaitOutcome[F], Unit] { // FSM translated method //def apply(v1: AsyncAwaitDsl.AwaitOutcome[F]): Unit = ??? // Resorting to mutation to track algebraic product effects (like WriterT), // since the information they carry would otherwise get lost on every dispatch. - private[this] var summary : F[Unit] = F.unit + private[this] var summary: F[Unit] = F.unit private[this] var state$async: Int = 0 - /** Retrieve the current value of the state variable */ + /** + * Retrieve the current value of the state variable + */ protected def state: Int = state$async - /** Assign `i` to the state variable */ + /** + * Assign `i` to the state variable + */ protected def state_=(s: Int): Unit = state$async = s protected def completeFailure(t: Throwable): Unit = callback(Left(t)) - protected def completeSuccess(value: AnyRef): Unit = { + protected def completeSuccess(value: AnyRef): Unit = callback(Right(F.as(summary, value))) - } protected def onComplete(f: F[AnyRef]): Unit = { dispatcher.unsafeRunAndForget { @@ -154,14 +181,19 @@ abstract class AsyncAwaitStateMachine[F[_]]( // as inspecting the Succeeded outcome using dispatcher is risky on algebraic sums, // such as OptionT, EitherT, ... var awaitedValue: Option[AnyRef] = None - (summary *> f).flatTap(r => F.delay{awaitedValue = Some(r)}).start.flatMap(_.join).flatMap { - case Canceled() => F.delay(this(Left(F.canceled.asInstanceOf[F[AnyRef]]))) - case Errored(e) => F.delay(this(Left(F.raiseError(e)))) - case Succeeded(awaitOutcome) => awaitedValue match { - case Some(v) => F.delay(this(Right(awaitOutcome.void -> v))) - case None => F.delay(this(Left(awaitOutcome))) + (summary *> f) + .flatTap(r => F.delay { awaitedValue = Some(r) }) + .start + .flatMap(_.join) + .flatMap { + case Canceled() => F.delay(this(Left(F.canceled.asInstanceOf[F[AnyRef]]))) + case Errored(e) => F.delay(this(Left(F.raiseError(e)))) + case Succeeded(awaitOutcome) => + awaitedValue match { + case Some(v) => F.delay(this(Right(awaitOutcome.void -> v))) + case None => F.delay(this(Left(awaitOutcome))) + } } - } } } diff --git a/tests/shared/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala b/tests/shared/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala index 9239d286c6..6daaca0ce4 100644 --- a/tests/shared/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala +++ b/tests/shared/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala @@ -56,6 +56,19 @@ class AsyncAwaitSpec extends BaseSpec { } } + "propagate uncaught errors outward" in real { + + case object Boom extends Throwable + + val program = async(throw Boom) + + program.attempt.flatMap { res => + IO { + res must beEqualTo(Left(Boom)) + } + } + } + "propagate canceled outcomes outward" in real { val io = IO.canceled @@ -73,10 +86,10 @@ class AsyncAwaitSpec extends BaseSpec { val program = for { ref <- Ref[IO].of(0) - _ <- async { ioAwait(IO.sleep(100.millis) *> ref.update(_ + 1)) } - .start - .flatMap(_.cancel) - _ <- IO.sleep(200.millis) + _ <- async { + ioAwait(IO.never) + ioAwait(ref.update(_ + 1)) + }.start.flatMap(_.cancel) result <- ref.get } yield { result @@ -97,8 +110,10 @@ class AsyncAwaitSpec extends BaseSpec { for { before <- IO(x must beEqualTo(0)) _ <- program - after <- IO(x must beEqualTo(1)) - } yield before && after + _ <- IO(x must beEqualTo(1)) + _ <- program + _ <- IO(x must beEqualTo(2)) + } yield ok } } From 0220ebf37d608c91b09e76bdff36500f4b5bbb7b Mon Sep 17 00:00:00 2001 From: Olivier Melois Date: Sat, 29 May 2021 10:07:37 +0200 Subject: [PATCH 15/19] making the fork/join sequence uncancelable --- std/shared/src/main/scala-2/AsyncAwait.scala | 27 ++++++++++--------- .../cats/effect/std/AsyncAwaitSpec.scala | 2 +- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/std/shared/src/main/scala-2/AsyncAwait.scala b/std/shared/src/main/scala-2/AsyncAwait.scala index a80ef54314..67d2f7411c 100644 --- a/std/shared/src/main/scala-2/AsyncAwait.scala +++ b/std/shared/src/main/scala-2/AsyncAwait.scala @@ -181,19 +181,20 @@ abstract class AsyncAwaitStateMachine[F[_]]( // as inspecting the Succeeded outcome using dispatcher is risky on algebraic sums, // such as OptionT, EitherT, ... var awaitedValue: Option[AnyRef] = None - (summary *> f) - .flatTap(r => F.delay { awaitedValue = Some(r) }) - .start - .flatMap(_.join) - .flatMap { - case Canceled() => F.delay(this(Left(F.canceled.asInstanceOf[F[AnyRef]]))) - case Errored(e) => F.delay(this(Left(F.raiseError(e)))) - case Succeeded(awaitOutcome) => - awaitedValue match { - case Some(v) => F.delay(this(Right(awaitOutcome.void -> v))) - case None => F.delay(this(Left(awaitOutcome))) - } - } + F.uncancelable { poll => + poll(summary *> f) + .flatTap(r => F.delay { awaitedValue = Some(r) }) + .start + .flatMap(_.join) + }.flatMap { + case Canceled() => F.delay(this(Left(F.canceled.asInstanceOf[F[AnyRef]]))) + case Errored(e) => F.delay(this(Left(F.raiseError(e)))) + case Succeeded(awaitOutcome) => + awaitedValue match { + case Some(v) => F.delay(this(Right(awaitOutcome.void -> v))) + case None => F.delay(this(Left(awaitOutcome))) + } + } } } diff --git a/tests/shared/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala b/tests/shared/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala index 6daaca0ce4..c78222683e 100644 --- a/tests/shared/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala +++ b/tests/shared/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala @@ -108,7 +108,7 @@ class AsyncAwaitSpec extends BaseSpec { val program = async(x += 1) for { - before <- IO(x must beEqualTo(0)) + _ <- IO(x must beEqualTo(0)) _ <- program _ <- IO(x must beEqualTo(1)) _ <- program From 259b7234d5f1633e5a2caf4548e468afa798f76d Mon Sep 17 00:00:00 2001 From: Olivier Melois Date: Sat, 29 May 2021 10:24:44 +0200 Subject: [PATCH 16/19] whitebox => blackbox --- std/shared/src/main/scala-2/AsyncAwait.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/std/shared/src/main/scala-2/AsyncAwait.scala b/std/shared/src/main/scala-2/AsyncAwait.scala index 67d2f7411c..a8d7b227e0 100644 --- a/std/shared/src/main/scala-2/AsyncAwait.scala +++ b/std/shared/src/main/scala-2/AsyncAwait.scala @@ -17,7 +17,7 @@ package cats.effect.std import scala.annotation.compileTimeOnly -import scala.reflect.macros.whitebox +import scala.reflect.macros.blackbox import cats.effect.kernel.Async import cats.effect.kernel.syntax.all._ @@ -90,8 +90,8 @@ object AsyncAwaitDsl { type AwaitOutcome[F[_]] = Either[F[AnyRef], (F[Unit], AnyRef)] def asyncImpl[F[_], T]( - c: whitebox.Context - )(body: c.Tree): c.Tree = { + c: blackbox.Context + )(body: c.Expr[T]): c.Expr[F[T]] = { import c.universe._ if (!c.compilerSettings.contains("-Xasync")) { c.abort( @@ -120,7 +120,7 @@ object AsyncAwaitDsl { } val name = TypeName("stateMachine$async") // format: off - q""" + val tree = q""" final class $name(dispatcher: _root_.cats.effect.std.Dispatcher[${c.prefix}._AsyncContext], callback: _root_.cats.effect.std.AsyncAwaitDsl.AwaitCallback[${c.prefix}._AsyncContext]) extends _root_.cats.effect.std.AsyncAwaitStateMachine(dispatcher, callback) { ${mark(q"""override def apply(tr$$async: _root_.cats.effect.std.AsyncAwaitDsl.AwaitOutcome[${c.prefix}._AsyncContext]): _root_.scala.Unit = ${body}""")} } @@ -131,6 +131,7 @@ object AsyncAwaitDsl { }.asInstanceOf[${c.macroApplication.tpe}] """ // format: on + c.Expr(tree) } catch { case e: ReflectiveOperationException => c.abort( From 133094b368561c95c009c96bd53a53788c2a9bbd Mon Sep 17 00:00:00 2001 From: Olivier Melois Date: Mon, 31 May 2021 09:41:15 +0200 Subject: [PATCH 17/19] polling the Fiber#join call --- std/shared/src/main/scala-2/AsyncAwait.scala | 2 +- .../src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/std/shared/src/main/scala-2/AsyncAwait.scala b/std/shared/src/main/scala-2/AsyncAwait.scala index a8d7b227e0..13990a040e 100644 --- a/std/shared/src/main/scala-2/AsyncAwait.scala +++ b/std/shared/src/main/scala-2/AsyncAwait.scala @@ -186,7 +186,7 @@ abstract class AsyncAwaitStateMachine[F[_]]( poll(summary *> f) .flatTap(r => F.delay { awaitedValue = Some(r) }) .start - .flatMap(_.join) + .flatMap(fiber => poll(fiber.join).onCancel(fiber.cancel)) }.flatMap { case Canceled() => F.delay(this(Left(F.canceled.asInstanceOf[F[AnyRef]]))) case Errored(e) => F.delay(this(Left(F.raiseError(e)))) diff --git a/tests/shared/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala b/tests/shared/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala index c78222683e..4a34cc8178 100644 --- a/tests/shared/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala +++ b/tests/shared/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala @@ -60,7 +60,8 @@ class AsyncAwaitSpec extends BaseSpec { case object Boom extends Throwable - val program = async(throw Boom) + def boom: Unit = throw Boom + val program = async(boom) program.attempt.flatMap { res => IO { @@ -87,7 +88,7 @@ class AsyncAwaitSpec extends BaseSpec { val program = for { ref <- Ref[IO].of(0) _ <- async { - ioAwait(IO.never) + ioAwait(IO.never[Unit]) ioAwait(ref.update(_ + 1)) }.start.flatMap(_.cancel) result <- ref.get From 7cf2ad5702601c9eec74dd05ff331ed64a6aab83 Mon Sep 17 00:00:00 2001 From: Olivier Melois Date: Mon, 31 May 2021 10:06:29 +0200 Subject: [PATCH 18/19] parenthesis around unit-returning method --- .../src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/shared/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala b/tests/shared/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala index 4a34cc8178..e2832065ab 100644 --- a/tests/shared/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala +++ b/tests/shared/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala @@ -60,8 +60,8 @@ class AsyncAwaitSpec extends BaseSpec { case object Boom extends Throwable - def boom: Unit = throw Boom - val program = async(boom) + def boom(): Unit = throw Boom + val program = async(boom()) program.attempt.flatMap { res => IO { From a2a9cb73abe00977470af722c8eaf9e35ffc7c6e Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Mon, 31 May 2021 14:36:09 -0500 Subject: [PATCH 19/19] Update std/shared/src/main/scala-2/AsyncAwait.scala --- std/shared/src/main/scala-2/AsyncAwait.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/std/shared/src/main/scala-2/AsyncAwait.scala b/std/shared/src/main/scala-2/AsyncAwait.scala index 13990a040e..30eaa9e7c6 100644 --- a/std/shared/src/main/scala-2/AsyncAwait.scala +++ b/std/shared/src/main/scala-2/AsyncAwait.scala @@ -36,10 +36,10 @@ import cats.effect.kernel.Outcome.Succeeded * popularised in other programming languages. * * {{{ - * object ioAsyncAwait extends AsyncAwaitDsl[IO] - * import ioAsyncAwait._ + * object dsl extends AsyncAwaitDsl[IO] + * import dsl._ * - * val io : IO[Int] = ??? + * val io: IO[Int] = ??? * async { await(io) + await(io) } * }}} *