Skip to content

Commit

Permalink
Scaladoc + polishing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Baccata committed May 29, 2021
1 parent ed3c88a commit 8971556
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 28 deletions.
76 changes: 54 additions & 22 deletions std/shared/src/main/scala-2/AsyncAwait.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,27 @@ import cats.effect.kernel.Outcome.Canceled
import cats.effect.kernel.Outcome.Errored
import cats.effect.kernel.Outcome.Succeeded

/**
* WARNING: This construct currently only works on scala 2 (2.12.12+ / 2.13.3+),
* relies on an experimental compiler feature enabled by the -Xasync
* scalac option, and should absolutely be considered unstable with
* regards to backward compatibility guarantees (be that source or binary).
*
* Partially applied construct allowing for async/await semantics,
* popularised in other programming languages.
*
* {{{
* object ioAsyncAwait extends AsyncAwaitDsl[IO]
* import ioAsyncAwait._
*
* val io : IO[Int] = ???
* async { await(io) + await(io) }
* }}}
*
* The code is transformed at compile time into a state machine
* that sequentially calls upon a [[Dispatcher]] every time it reaches
* an "await" block.
*/
class AsyncAwaitDsl[F[_]](implicit F: Async[F]) {

/**
Expand All @@ -41,25 +62,24 @@ class AsyncAwaitDsl[F[_]](implicit F: Async[F]) {
/**
* Non-blocking await the on result of `awaitable`. This may only be used directly within an enclosing `async` block.
*
* Internally, this will register the remainder of the code in enclosing `async` block as a callback
* in the `onComplete` handler of `awaitable`, and will *not* block a thread.
* Internally, this transforms the remainder of the code in enclosing `async` block into a callback
* that triggers upon a successful computation outcome. It does *not* block a thread.
*/
@compileTimeOnly("[async] `await` must be enclosed in an `async` block")
def await[T](awaitable: F[T]): T =
??? // No implementation here, as calls to this are translated to `onComplete` by the macro.
??? // No implementation here, as calls to this are translated by the macro.

/**
* Run the block of code `body` asynchronously. `body` may contain calls to `await` when the results of
* a `Future` are needed; this is translated into non-blocking code.
* a `F` are needed; this is translated into non-blocking code.
*/
def async[T](body: => T): F[T] = macro AsyncAwaitDsl.asyncImpl[F, T]

}

object AsyncAwaitDsl {

type CallbackTarget[F[_]] = F[AnyRef]
type Callback[F[_]] = Either[Throwable, CallbackTarget[F]] => Unit
type AwaitCallback[F[_]] = Either[Throwable, F[AnyRef]] => Unit

// Outcome of an await block. Either a failed algebraic computation,
// or a successful value accompanied by a "summary" computation.
Expand Down Expand Up @@ -101,7 +121,7 @@ object AsyncAwaitDsl {
val name = TypeName("stateMachine$async")
// format: off
q"""
final class $name(dispatcher: _root_.cats.effect.std.Dispatcher[${c.prefix}._AsyncContext], callback: _root_.cats.effect.std.AsyncAwaitDsl.Callback[${c.prefix}._AsyncContext]) extends _root_.cats.effect.std.AsyncAwaitStateMachine(dispatcher, callback) {
final class $name(dispatcher: _root_.cats.effect.std.Dispatcher[${c.prefix}._AsyncContext], callback: _root_.cats.effect.std.AsyncAwaitDsl.AwaitCallback[${c.prefix}._AsyncContext]) extends _root_.cats.effect.std.AsyncAwaitStateMachine(dispatcher, callback) {
${mark(q"""override def apply(tr$$async: _root_.cats.effect.std.AsyncAwaitDsl.AwaitOutcome[${c.prefix}._AsyncContext]): _root_.scala.Unit = ${body}""")}
}
${c.prefix}._AsyncInstance.flatten {
Expand All @@ -110,11 +130,14 @@ object AsyncAwaitDsl {
}
}.asInstanceOf[${c.macroApplication.tpe}]
"""
// format: on
} catch {
case e: ReflectiveOperationException =>
c.abort(
c.macroApplication.pos,
"-Xasync is provided as a Scala compiler option, but the async macro is unable to call c.internal.markForAsyncTransform. " + e.getClass.getName + " " + e.getMessage
"-Xasync is provided as a Scala compiler option, but the async macro is unable to call c.internal.markForAsyncTransform. " + e
.getClass
.getName + " " + e.getMessage
)
}
}
Expand All @@ -123,45 +146,54 @@ object AsyncAwaitDsl {

abstract class AsyncAwaitStateMachine[F[_]](
dispatcher: Dispatcher[F],
callback: AsyncAwaitDsl.Callback[F]
)(implicit F: Async[F]) extends Function1[AsyncAwaitDsl.AwaitOutcome[F], Unit] {
callback: AsyncAwaitDsl.AwaitCallback[F]
)(implicit F: Async[F])
extends Function1[AsyncAwaitDsl.AwaitOutcome[F], Unit] {

// FSM translated method
//def apply(v1: AsyncAwaitDsl.AwaitOutcome[F]): Unit = ???

// Resorting to mutation to track algebraic product effects (like WriterT),
// since the information they carry would otherwise get lost on every dispatch.
private[this] var summary : F[Unit] = F.unit
private[this] var summary: F[Unit] = F.unit

private[this] var state$async: Int = 0

/** Retrieve the current value of the state variable */
/**
* Retrieve the current value of the state variable
*/
protected def state: Int = state$async

/** Assign `i` to the state variable */
/**
* Assign `i` to the state variable
*/
protected def state_=(s: Int): Unit = state$async = s

protected def completeFailure(t: Throwable): Unit =
callback(Left(t))

protected def completeSuccess(value: AnyRef): Unit = {
protected def completeSuccess(value: AnyRef): Unit =
callback(Right(F.as(summary, value)))
}

protected def onComplete(f: F[AnyRef]): Unit = {
dispatcher.unsafeRunAndForget {
// Resorting to mutation to extract the "happy path" value from the monadic context,
// as inspecting the Succeeded outcome using dispatcher is risky on algebraic sums,
// such as OptionT, EitherT, ...
var awaitedValue: Option[AnyRef] = None
(summary *> f).flatTap(r => F.delay{awaitedValue = Some(r)}).start.flatMap(_.join).flatMap {
case Canceled() => F.delay(this(Left(F.canceled.asInstanceOf[F[AnyRef]])))
case Errored(e) => F.delay(this(Left(F.raiseError(e))))
case Succeeded(awaitOutcome) => awaitedValue match {
case Some(v) => F.delay(this(Right(awaitOutcome.void -> v)))
case None => F.delay(this(Left(awaitOutcome)))
(summary *> f)
.flatTap(r => F.delay { awaitedValue = Some(r) })
.start
.flatMap(_.join)
.flatMap {
case Canceled() => F.delay(this(Left(F.canceled.asInstanceOf[F[AnyRef]])))
case Errored(e) => F.delay(this(Left(F.raiseError(e))))
case Succeeded(awaitOutcome) =>
awaitedValue match {
case Some(v) => F.delay(this(Right(awaitOutcome.void -> v)))
case None => F.delay(this(Left(awaitOutcome)))
}
}
}
}
}

Expand Down
27 changes: 21 additions & 6 deletions tests/shared/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,19 @@ class AsyncAwaitSpec extends BaseSpec {
}
}

"propagate uncaught errors outward" in real {

case object Boom extends Throwable

val program = async(throw Boom)

program.attempt.flatMap { res =>
IO {
res must beEqualTo(Left(Boom))
}
}
}

"propagate canceled outcomes outward" in real {

val io = IO.canceled
Expand All @@ -73,10 +86,10 @@ class AsyncAwaitSpec extends BaseSpec {

val program = for {
ref <- Ref[IO].of(0)
_ <- async { ioAwait(IO.sleep(100.millis) *> ref.update(_ + 1)) }
.start
.flatMap(_.cancel)
_ <- IO.sleep(200.millis)
_ <- async {
ioAwait(IO.never)
ioAwait(ref.update(_ + 1))
}.start.flatMap(_.cancel)
result <- ref.get
} yield {
result
Expand All @@ -97,8 +110,10 @@ class AsyncAwaitSpec extends BaseSpec {
for {
before <- IO(x must beEqualTo(0))
_ <- program
after <- IO(x must beEqualTo(1))
} yield before && after
_ <- IO(x must beEqualTo(1))
_ <- program
_ <- IO(x must beEqualTo(2))
} yield ok
}
}

Expand Down

0 comments on commit 8971556

Please sign in to comment.