Usage with other effect systems besides Future #149

Closed
jimmydivvy opened this issue Dec 1, 2015 · 27 comments


@jimmydivvy

The README indicates that this library can be used with other implementations of the future pattern - however I couldn't find documentation on how to do this.

In particular I'm interested in using with scalaz Task. Is this possible?

@jvican
Member

jvican commented Dec 27, 2015

I am also very interested in this. Up +1

@retronym
Member

retronym commented Sep 8, 2016

See:

https://github.com/scala/async/blob/master/src/main/scala/scala/async/internal/ScalaConcurrentAsync.scala

https://github.com/scala/async/blob/master/src/main/scala/scala/async/internal/FutureSystem.scala#L79-L138

https://github.com/scala/async/blob/master/src/main/scala/scala/async/Async.scala#L45

These are the spots you would need to replicate to add an adapter for Task. You might need to model the absence of an execution context on Task as a Unit-typed value. You can see this in the example of the "not-actually-concurrent future system" in https://github.com/scala/async/blob/master/src/main/scala/scala/async/internal/AsyncId.scala

@SethTisue
Member

would someone like to turn this into a pull request with a small addition to the doc?

@adamw

adamw commented Apr 5, 2018

I think the major problem here would be that the FutureSystem assumes the presence of a Promise concept, and as far as I know the Task in scalaz, or IO in cats don't have that.

However, there's https://github.com/pelotom/effectful, which seems to do the same thing for scalaz, or is it somehow different?

@dsilvasc

@adamw effectful is effectively abandoned: pelotom/effectful#15

@adamw

adamw commented Oct 14, 2018

@dsilvasc True, though there's also https://github.com/monadless/monadless (not sure if it's maintained, but it usually works well enough)

@dsilvasc

@adamw There's a question about monadless support for scala 2.13 from over 2 months ago with no response:
monadless/monadless#9

Don't know about maintenance plans for Stateless Future:
https://github.com/qifun/stateless-future

ThoughtWorks Each doesn't seem to have a scala 2.13 version published to Maven Central, but they might actively use it in production with their customers.
https://github.com/ThoughtWorksInc/each

ThoughtWorks seem to be moving on to a compiler plugin though -- same primary author from Each:
https://javadoc.io/page/com.thoughtworks.dsl/dsl_2.12/latest/com/thoughtworks/dsl/index.html
https://github.com/ThoughtWorksInc/Dsl.scala
https://github.com/ThoughtWorksInc/Dsl.scala/wiki/Benchmarks:-Dsl.scala-vs-Monix-vs-Cats-Effect-vs-Scalaz-Concurrent-vs-Scala-Async-vs-Scala-Continuation

Sorry for the tangent -- I think that's all of the related projects :)

@adamw

adamw commented Oct 15, 2018

I would be more concerned about unmerged PRs than issues without comment ;) Anyway, all of these seem rather abandoned. Would be nice of course to combine all these efforts into a single one, working in any Cats/Scalaz monad (like Kotlin's coroutines), but ... there's a finite amount of time ;)

@SethTisue changed the title from "Usage with scalaz Task" to "Usage with other effect systems besides Future" on Feb 4, 2019
@dsilva

dsilva commented Feb 8, 2019

@SethTisue @retronym @adamw

I took a look at ScalaConcurrentAsync and AsyncId, then took a stab at implementing an adapter for a Task-like abstraction where Task[T] is a wrapper around a memoized function (ExecutionContext => Future[T]) and with an API similar to that of Future (but without an implicit ExecutionContext parameter everywhere). Here's what a naive translation would look like:

import scala.async.internal.{AsyncBase, FutureSystem}
import scala.concurrent.{Future, Promise}
import scala.reflect.macros.whitebox

import scala.language.experimental.macros

object TaskAsync extends AsyncBase {
  lazy val futureSystem = TaskFutureSystem
  type FS = TaskFutureSystem.type

  def async[T](body: => T): Task[T] = macro asyncTaskImpl[T]

  def asyncTaskImpl[T: c.WeakTypeTag](c: whitebox.Context)(body: c.Expr[T]): c.Expr[Task[T]] = {
    val u: c.Expr[Unit] = c.Expr[Unit](c.parse("()"))
    asyncImpl[T](c)(body)(u)
  }
}

object TaskFutureSystem extends FutureSystem {
  override type Prom[A] = Promise[A]
  override type Fut[A] = Task[A]
  override type ExecContext = Unit
  override type Tryy[A] = scala.util.Try[A]

  override def mkOps(c0: whitebox.Context): Ops { val c: c0.type } = new Ops {
    val c: c0.type = c0
    import c.universe._

    def promType[A](implicit evidence: c.universe.WeakTypeTag[A]): c.universe.Type = weakTypeOf[Prom[A]]
    def tryType[A](implicit evidence: c.universe.WeakTypeTag[A]): c.universe.Type = weakTypeOf[Tryy[A]]

    def execContextType: Type = weakTypeOf[ExecContext]

    def createProm[A](implicit evidence: c.universe.WeakTypeTag[A]): c.universe.Expr[Promise[A]] = reify {
      Promise[A]()
    }

    def promiseToFuture[A: WeakTypeTag](prom: Expr[Prom[A]]): c.universe.Expr[Task[A]] = reify {
      new Task(_ => prom.splice.future)
    }

    def future[A: WeakTypeTag](a: Expr[A])(execContext: Expr[ExecContext]): c.universe.Expr[Task[A]] = reify {
      new Task(implicit ec => Future { a.splice })
    }

    def onComplete[A, B](
        task: Expr[Fut[A]],
        fun: Expr[scala.util.Try[A] => B],
        execContext: Expr[ExecContext]): Expr[Unit] = reify {
      task.splice.onComplete(fun.splice)
    }

    override def continueCompletedFutureOnSameThread: Boolean = true

    override def getCompleted[A: WeakTypeTag](task: Expr[Fut[A]]): Expr[Tryy[A]] = reify {
      if (task.splice.isCompleted) task.splice.value.get else null
    }

    def completeProm[A](prom: Expr[Prom[A]], value: Expr[scala.util.Try[A]]): Expr[Unit] = reify {
      prom.splice.complete(value.splice)
      c.Expr[Unit](Literal(Constant(()))).splice
    }

    def tryyIsFailure[A](tryy: Expr[scala.util.Try[A]]): Expr[Boolean] = reify {
      tryy.splice.isFailure
    }

    def tryyGet[A](tryy: Expr[Tryy[A]]): Expr[A] = reify {
      tryy.splice.get
    }
    def tryySuccess[A: WeakTypeTag](a: Expr[A]): Expr[Tryy[A]] = reify {
      scala.util.Success[A](a.splice)
    }
    def tryyFailure[A: WeakTypeTag](a: Expr[Throwable]): Expr[Tryy[A]] = reify {
      scala.util.Failure[A](a.splice)
    }
  }
}

This doesn't work because the async macro expects the implementation of the future method to generate an expression that kicks off execution, and tasks don't start running when they're created.

To clarify, here's an example:

    val runCount = new AtomicInteger(0)

    val t = async {
      val task = async {
        runCount.incrementAndGet()
        5
      }

      assert(runCount.get == 0)

      val ten = await(task) * 2
      assert(ten == 10)

      assert(runCount.get == 1)

      ten
    }

    val future = t.run()(ExecutionContext.global)
    val result = Await.result(future, Duration.Inf)

    assert(runCount.get == 1)
    assert(result == 10)

And here is the macro expansion printed by -Ymacro-debug-lite, after some manual cleanup for legibility:

    val runCount = new AtomicInteger(0)

    class StateMachine extends AnyRef with (snapchat.concurrent.TaskFutureSystem.Tryy[Any] => Unit) with (() => Unit) {
      private[this] var await$macro$4$macro$6: Int = _
      private[this] var state: Int = 0
      /*private[this] */
      val result: snapchat.concurrent.TaskFutureSystem.Prom[Int] = Promise.apply[Int]()
      //   def result: snapchat.concurrent.TaskFutureSystem.Prom[Int] = result
      private[this] val execContext: Unit = ()
      //   def execContext: Unit = execContext

      def apply(tr: snapchat.concurrent.TaskFutureSystem.Tryy[Int]): Unit = while$macro$8() {
        try {
          state match {
            case 0 =>
              val task = new Task(implicit ec => Future {
                runCount.incrementAndGet()
                5
              })
              assert(runCount.get().==(0))
              val awaitable$macro$3
                : snapchat.concurrent.Task[Int] /* @scala.reflect.internal.annotations.uncheckedBounds */ =
                task
              state = 1
              val completed$macro$7: snapchat.concurrent.TaskFutureSystem.Tryy[Int] =
                if (awaitable$macro$3.isCompleted)
                  awaitable$macro$3.value.get
                else
                  null
              if (null.ne(completed$macro$7)) {
                if (completed$macro$7.isFailure) {
                  /*stateMachine$macro$2.this.*/
                  result.complete(completed$macro$7)
                  return ()
                } else {
                  await$macro$4$macro$6 = completed$macro$7.get
                  state = 2
                }
                ()
              } else {
                awaitable$macro$3.onComplete(this)
                return ()
              }
              ()

            case 2 => {
              /*stateMachine$macro$2.this.*/
              result.complete(Success({
                val x$macro$5: Int = 2
                val ten: Int = await$macro$4$macro$6.*(x$macro$5)
                assert(ten.==(10))
                assert(runCount.get().==(0))
                ten
              }))
              ()
            }
            return ()

          case 1 =>
              if (tr.isFailure) {
                /*stateMachine$macro$2.this.*/
                result
                  .complete(tr)
                return ()
              } else {
                await$macro$4$macro$6 = tr.get
                state = 2
              }
              ()

            case _ => throw new IllegalStateException()
          }
        } catch {
          case throwable @ (_: Throwable) =>
            result.complete(Failure[Int](throwable))
            return ()
        }
        while$macro$8()
      }
      def apply(): Unit = StateMachine.this.apply(null)
    }

    val stateMachine = new StateMachine

    new Task(implicit ec => Future { stateMachine.apply() })
    new Task(_ => stateMachine$macro$2.result.future)

Note the last two lines. The call to stateMachine.apply() is wrapped in a task that's never started, then discarded. The other task (on the last line) grabs the future from the promise that was created by new StateMachine but that won't ever be completed because stateMachine.apply() never actually runs.
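
The underlying mismatch (a Future starts running when it is created, while a task only describes work) can be shown in isolation. This is a minimal sketch, not code from the adapter above: `LazyTask` is a hypothetical stand-in for the Task wrapper described earlier (a function ExecutionContext => Future[T]), and memoization is left out.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for the Task described above: it only describes a
// computation and starts it when `run` is called. Memoization is omitted.
final class LazyTask[T](start: ExecutionContext => Future[T]) {
  def run()(implicit ec: ExecutionContext): Future[T] = start(ec)
}

object LazinessDemo {
  // Returns (count after construction, task result, count after running).
  def demo(): (Int, Int, Int) = {
    val runCount = new AtomicInteger(0)

    // Constructing the task does not execute the body...
    val task = new LazyTask[Int](implicit ec => Future { runCount.incrementAndGet(); 5 })
    val afterConstruction = runCount.get

    // ...only running it does.
    val result = Await.result(task.run()(ExecutionContext.global), 10.seconds)
    (afterConstruction, result, runCount.get)
  }
}
```

A macro that builds a task and then drops it, as in the last two lines of the expansion, therefore never executes the wrapped body.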

@adamw

adamw commented Feb 8, 2019

@dsilva I ended up using https://github.com/monadless/monadless, which has the same functionality, but is general and works for any monad. Unfortunately, it's also unmaintained.

@dsilva

dsilva commented Feb 8, 2019

@adamw Thanks, that worked for me too. Looks like it rewrites code into calls to map and flatMap instead of state machines.
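
For readers unfamiliar with that style of rewrite, here is a hand-written sketch of the idea using Option. It is not output from monadless; `fetchA` and `fetchB` are hypothetical, and `await` in the comment stands in for whatever marker a given library uses.

```scala
object FlatMapRewriteDemo {
  def fetchA: Option[Int] = Some(2)
  def fetchB: Option[Int] = Some(3)

  // Direct-style intent, roughly:
  //   val a = await(fetchA); val b = await(fetchB); a * b
  //
  // A map/flatMap-based rewrite turns each "rest of the block" into a lambda:
  val viaFlatMap: Option[Int] =
    fetchA.flatMap(a => fetchB.map(b => a * b))

  // The same thing spelled as a for-comprehension.
  val viaFor: Option[Int] =
    for { a <- fetchA; b <- fetchB } yield a * b
}
```

Because only `map` and `flatMap` are needed, this approach works for any monad, at the cost of one lambda per continuation instead of a single state machine.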

@iantabolt

For a complete example of extending async/await, we have a version using Twitter Futures here https://github.com/foursquare/twitter-util-async. I'm not sure how much it will help with the Task implementation since it also involves a Promise concept.

@SethTisue I could take a stab at the documentation update

@retronym
Member

I'm open to refactoring the internals of async to support abstractions like Task.

@Atry
Contributor

Atry commented Mar 9, 2019

> I would be more concerned about unmerged PRs than issues without comment ;) Anyway, all of these seem rather abandoned. Would be nice of course to combine all these efforts into a single one, working in any Cats/Scalaz monad (like Kotlin's coroutines), but ... there's a finite amount of time ;)

Dsl.scala is not abandoned.

@Atry
Contributor

Atry commented Mar 9, 2019

Dsl.scala is not only more general than scala.async, but also more general than monadless or any monad-based direct style DSL, because:

  1. Dsl.scala's built-in compiler plugins are name-based instead of symbol-based, can be used together with any type classes or even type class-free delimited continuations.
  2. Dsl.scala's built-in type class Dsl is more general than Monad.

@retronym
Member

retronym commented Mar 20, 2020

The integration story for other awaitable types should get easier with scala/scala#8816.

The test case in that PR includes this integration for java.util.CompletableFuture:

object CompletableFutureAwait {
  def async[T](executor: Executor)(body: T): CompletableFuture[T] = macro impl
  @compileTimeOnly("[async] `await` must be enclosed in `async`")
  def await[T](completableFuture: CompletableFuture[T]): T = ???
  def impl(c: blackbox.Context)(executor: c.Tree)(body: c.Tree): c.Tree = {
    import c.universe._
    val awaitSym = typeOf[CompletableFutureAwait.type].decl(TermName("await"))
    def mark(t: DefDef): Tree = c.internal.markForAsyncTransform(c.internal.enclosingOwner, t, awaitSym, Map.empty)
    val name = TypeName("stateMachine$$async_" + body.pos.line)
    q"""
      final class $name extends _root_.scala.tools.nsc.async.CompletableFutureStateMachine($executor) {
        ${mark(q"""override def apply(tr$$async: _root_.scala.util.Try[_root_.scala.AnyRef]) = ${body}""")}
      }
      new $name().start().asInstanceOf[${c.macroApplication.tpe}]
    """
  }
}

abstract class CompletableFutureStateMachine(executor: Executor) extends AsyncStateMachine[CompletableFuture[AnyRef], Try[AnyRef]] with Runnable with BiConsumer[AnyRef, Throwable] {
  Objects.requireNonNull(executor)

  protected var result$async: CompletableFuture[AnyRef] = new CompletableFuture[AnyRef]();

  // Adapters
  def accept(value: AnyRef, throwable: Throwable): Unit = {
    this(if (throwable != null) Failure(throwable) else Success(value))
  }
  def run(): Unit = {
    apply(null)
  }

  // FSM translated method
  def apply(tr$async: Try[AnyRef]): Unit

  // Required methods
  protected var state$async: Int = StateAssigner.Initial
  protected def completeFailure(t: Throwable): Unit = result$async.completeExceptionally(t)
  protected def completeSuccess(value: AnyRef): Unit = result$async.complete(value)
  protected def onComplete(f: CompletableFuture[AnyRef]): Unit = f.whenCompleteAsync(this)
  protected def getCompleted(f: CompletableFuture[AnyRef]): Try[AnyRef] = try {
    val r = f.getNow(this)
    if (r == this) null
    else Success(r)
  } catch {
    case t: Throwable => Failure(t)
  }
  protected def tryGet(tr: Try[AnyRef]): AnyRef = tr match {
    case Success(value) =>
      value.asInstanceOf[AnyRef]
    case Failure(throwable) =>
      result$async.completeExceptionally(throwable)
      this // sentinel value to indicate the dispatch loop should exit.
  }
  def start(): CompletableFuture[AnyRef] = {
    executor.execute(this)
    result$async
  }
}

@dsilvasc

dsilvasc commented May 4, 2020

@retronym With that change, do you have in mind what an example integration without an executor would look like? It would be excellent if moving scala-async into the compiler can be done in a way that makes it possible to integrate with task types like monix Task, scalaz Task, ZIO, and cats IO.

@retronym
Member

retronym commented May 8, 2020

Here's one way to integrate a cats-Eval-like type.

package scala.tools.nsc
package async

import scala.language.experimental.macros
import scala.reflect.macros.blackbox
import scala.annotation.compileTimeOnly
import scala.tools.partest.async.AsyncStateMachine

object EvalAwait {
  def evaluating[T](body: T): Eval[T] = macro impl
  @compileTimeOnly("[async] `value` must be enclosed in `evaluating`")
  def value[T](output: Eval[T]): T = ???
  def impl(c: blackbox.Context)(body: c.Tree): c.Tree = {
    import c.universe._
    val awaitSym = typeOf[EvalAwait.type].decl(TermName("value"))
    def mark(t: DefDef): Tree = c.internal.markForAsyncTransform(c.internal.enclosingOwner, t, awaitSym, Map.empty)
    val name = TypeName("stateMachine$$async_" + body.pos.line)
    q"""
      final class $name extends _root_.scala.tools.nsc.async.EvalStateMachine {
        ${mark(q"""override def apply(tr$$async: _root_.scala.AnyRef) = ${body}""")}
      }
      new $name().start().asInstanceOf[${c.macroApplication.tpe}]
    """
  }
}
abstract class Eval[A] {
  def value: A
}
object Eval {
  def now[T](t: T): Eval[T] = Now(t)
  def later[T](t: => T): Eval[T] = new Later(() => t)
  def always[T](t: => T): Eval[T] = new Always(() => t)
}

final case class Now[A](value: A) extends Eval[A] {
  def memoize: Eval[A] = this
}
final class Later[A](f: () => A) extends Eval[A] {
  private[this] var thunk: () => A = f
  lazy val value: A = {
    try thunk() finally thunk = null
  }
}
final class Always[A](f: () => A) extends Eval[A] {
  def value: A = f()
  def memoize: Eval[A] = new Later(f)
}

abstract class EvalStateMachine extends AsyncStateMachine[Eval[AnyRef], AnyRef] {
  var result$async: AnyRef = _

  // FSM translated method
  def apply(tr$async: AnyRef): Unit

  // Required methods
  protected var state$async: Int = 0
  protected def completeFailure(t: Throwable): Unit = throw t
  protected def completeSuccess(value: AnyRef): Unit = result$async = value
  protected def onComplete(f: Eval[AnyRef]): Unit = throw new UnsupportedOperationException()
  protected def getCompleted(f: Eval[AnyRef]): AnyRef = f.value
  protected def tryGet(tr: AnyRef): AnyRef = tr
  def start(): Eval[AnyRef] = {
    apply(null)
    Eval.now(result$async)
  }
}

The following runs in a stack-safe manner. It wouldn't integrate with the trampoline-style evaluation in cats Eval's flatMap etc., but all the composition of values in the evaluating block is done in a loop in the state machine rather than through recursion.

        import scala.tools.nsc.async._
        import EvalAwait._

        object Test {
          def v1 = Eval.now("v1")
          def v2 = Eval.now("v2")
          def v3 = Eval.later("later")
          def test: String = test0.value
          def test0 = evaluating[String] {
            var r1 = ""
            var i = 0
            while (i < 10000) {
              r1 = value(v1) + value(v2) + value(v3)
              i += 1
            }
            r1
          }
        }

@retronym
Member

retronym commented May 8, 2020

For completeness, I'll also do a prototype integration with monix.Task. A little extra indirection is needed to make the Task multi-shot (retry-able) -- the state machine class can't be a Task itself as we need a new instance of it instantiated for each call to Task.execute.

@retronym
Member

retronym commented May 8, 2020

Here's a possible monix-task integration: retronym/monad-ui@3b146ec

A new instance of the state machine is created for each execution of the Task.

As an external library, without using the built-in run loop in Monix, there doesn't seem to be an API for exploiting the fact that the Now constructor of Task has an immediate value. The scala-async state machine can use this to avoid a context switch through the scheduler and instead immediately resume the state machine loop. That could be fixed by moving the DSL implementation into monix or by exposing additional API.

Review by @alexandru, perhaps?

@alexandru

Oh wow, looking good.

Loved scala-async in the past, would be great to see it work for IO types.

@kubukoz

kubukoz commented May 8, 2020

Has a cats integration been considered? It wouldn't require a lot of knowledge from the plugin about the constructors of the effect, as long as only the interface of cats.Monad is used. That, and probably cats.Traverse to handle for comprehensions.

@djspiewak
Member

I definitely think that doing this more generally for cats/cats-effect types would make a lot of sense. Most of the machinery it looks like would only require Monad, and the few pieces which require more than that would be fine with ConcurrentEffect. I think it would require splitting the state machine into two such that the pieces which require tighter constraints are materialized only as necessary.

@retronym
Member

retronym commented May 9, 2020

The async/await DSL has two selling points:

  • less syntactic overhead than for-comprehensions (and way less than explicit flatMapping), which is particularly beneficial when you want to express control flow.
  • a more efficient translation based on knowledge of the effect. This is based on the special case that the continuation is single-shot (called zero or one times, and not called in parallel). Rather than generating a separate lambda for each continuation, a single instance of a single state machine class acts as the continuation in all places.
    • Prior to Scala 2.12, this used to be a win for generated code size. But now that lambda classes are materialized at runtime with LambdaMetafactory, that's no longer a point of difference.
    • Immediate continuations (e.g. on a future whose value is already computed) can be optimized to run on the current thread without consuming stack frames.
    • Captured state that is not referenced afterward is nulled out to reduce the retained memory size of the in-progress computation.

The difference between single-shot and multi-shot effects is a bit subtle. I thought we'd have trouble modelling monix.Task because it needs to support retries, but this was possible by having the Task instantiate a new state machine for each execution.
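
The "new state machine per execution" idea can be sketched without any library. Everything here is hypothetical and heavily simplified: `Machine` stands in for a generated single-shot state machine, and `RetryableTask` for a task type that may be run more than once. Neither reflects the actual monix integration.

```scala
object FreshMachineDemo {
  // Hypothetical single-shot state machine: it carries mutable state,
  // so one instance must not be started twice.
  final class Machine(body: () => Int) {
    private var state = 0
    def start(): Int = {
      require(state == 0, "state machine instances are single-shot")
      state = 1
      body()
    }
  }

  // Hypothetical re-runnable task: it stores a factory rather than an
  // instance, so every run gets its own fresh machine.
  final class RetryableTask(newMachine: () => Machine) {
    def run(): Int = newMachine().start()
  }

  // Runs the same task twice; each run executes the body again.
  def demo(): (Int, Int) = {
    var calls = 0
    val task = new RetryableTask(() => new Machine(() => { calls += 1; calls * 10 }))
    (task.run(), task.run())
  }
}
```

Keeping the mutable machine behind a factory is what lets a single-shot continuation back an effect that can be retried.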

Take a look at https://github.com/retronym/monad-ui/ for some experiments I did with different DSLs atop the writer Monad.

So, does an async-like DSL still make sense for effects that are really multi-shot (like List), or when the "shot-ity" is not known (like Monad)? @pelotom seems to think so with effectful:

monadically {
   val v1 = value(List(1, 2 ,3))
   val v2 = value(List(2, 3, 4))
   v1 + v2
}

Scalac's -Xasync phase could run the ANF transform over this but then have an alternative second half to the transform that emits continuations based on plain lambdas and flatMap calls. This translation could still be somewhat customizable (e.g. to use a typeclass vs assuming flatMap is a member).
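
As a hand-written sketch (not the output of any compiler phase), a lambda-and-flatMap translation of the List block above could look like the following. The counter is an addition for illustration; it shows that the continuation after the first `value(...)` runs once per element, which is what makes List multi-shot.

```scala
object MultiShotDemo {
  // Returns the combined sums and how often the first continuation ran.
  def run(): (List[Int], Int) = {
    var continuationCalls = 0
    val sums = List(1, 2, 3).flatMap { v1 =>
      continuationCalls += 1 // invoked once for each element of the first list
      List(2, 3, 4).map(v2 => v1 + v2)
    }
    (sums, continuationCalls)
  }
}
```

A single mutable state machine instance could not serve as this continuation, since it would be re-entered with different values of `v1`.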

@retronym
Member

retronym commented May 10, 2020

I don't think this DSL for Lists looks appealing to use. It's best to focus on ConcurrentEffect.

Here's a first cut at integration. Caveat: I haven't used cats-effect before! https://github.com/retronym/monad-ui/pull/1/files

I only needed Effect so far. One thing that is missing is a way to determine if a value is immediately available without needing to go through a callback. The Future implementation of async uses this to immediately move to the next iteration of the state machine loop (in a stack-safe manner) and avoid either a) the overhead of going through the execution context or b) finding that an execution context continues on this thread, and burning stack.
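
To make the fast path concrete, here is a minimal sketch for scala.concurrent.Future, which exposes an already-computed result through `value`. The `sumAll` helper is hypothetical and hand-written; it only imitates the shape of the generated loop and is not the macro's actual output.

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

object ImmediateValueDemo {
  // Sums the futures in order. Completed futures are consumed inside the
  // while loop on the current thread; a callback is registered only when a
  // future is still pending, and the loop resumes from that callback.
  def sumAll(fs: List[Future[Int]])(implicit ec: ExecutionContext): Future[Int] = {
    val result = Promise[Int]()

    def loop(start: List[Future[Int]], startAcc: Int): Unit = {
      var remaining = start
      var acc = startAcc
      while (remaining.nonEmpty) {
        remaining.head.value match {
          case Some(Success(v)) => // immediate value: no callback, no extra stack
            acc += v
            remaining = remaining.tail
          case Some(Failure(t)) =>
            result.failure(t)
            return
          case None => // not ready yet: suspend and resume later
            val rest = remaining.tail
            val soFar = acc
            remaining.head.onComplete {
              case Success(v) => loop(rest, soFar + v)
              case Failure(t) => result.failure(t)
            }
            return
        }
      }
      result.success(acc)
    }

    loop(fs, 0)
    result.future
  }
}
```

When every input is already complete, the returned future is complete by the time `sumAll` returns. An effect type with no equivalent of `value` forces every step through the callback branch.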

@retronym self-assigned this on Jun 24, 2020
@retronym added this to the 1.0 milestone on Jun 24, 2020
@fwbrasil

fwbrasil commented Jun 26, 2020

As a comparison, here's all the code necessary to support cats with Monadless:

https://github.com/monadless/monadless/blob/master/monadless-cats/src/main/scala/io/monadless/cats/MonadlessApplicative.scala
https://github.com/monadless/monadless/blob/master/monadless-cats/src/main/scala/io/monadless/cats/MonadlessMonad.scala

It can depend only on Applicative (or use Monad if necessary) and supports any implementation of the typeclasses.

@SethTisue
Member
