Add IO#unsafeRunSyncOrFuture #47

Closed


alexandru
Member

@alexandru alexandru commented May 6, 2017

This PR adds a new evaluation strategy called unsafeRunSyncOrFuture.

It evaluates the effect and tries to return the result immediately: Right(a) in case of success, or a thrown exception in case of failure. If it hits an asynchronous boundary instead, it returns the pending result as a Left(future).

sealed abstract class IO[+A] {

  final def unsafeRunSyncOrFuture(): Either[Future[A], A] = ???

}
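To make the intended semantics concrete, here is a minimal sketch built on the standard library only. MiniIO, Sync and Async are hypothetical stand-ins invented for illustration; they are not the cats-effect encoding, and the real IO has more cases than these two.

```scala
import scala.concurrent.{Future, Promise}

object SyncOrFutureSketch {
  // Hypothetical two-case stand-in for IO (illustration only).
  sealed trait MiniIO[+A]
  final case class Sync[+A](thunk: () => A) extends MiniIO[A]
  final case class Async[+A](register: (Either[Throwable, A] => Unit) => Unit)
      extends MiniIO[A]

  // Returns Right(a) when the value is available immediately; exceptions
  // thrown by a synchronous thunk propagate to the caller. On an
  // asynchronous boundary, returns Left(future) without blocking.
  def unsafeRunSyncOrFuture[A](io: MiniIO[A]): Either[Future[A], A] =
    io match {
      case Sync(thunk) =>
        Right(thunk())
      case Async(register) =>
        // The Promise is only allocated on the asynchronous path.
        val p = Promise[A]()
        register(r => p.complete(r.toTry))
        Left(p.future)
    }
}
```

With this sketch, `unsafeRunSyncOrFuture(Sync(() => 42))` yields `Right(42)`, while any `Async` value yields a `Left` holding a `Future`.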

The primary use-case for me is that in Monix I would like to express an efficient conversion from IO to Task:

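import monix.eval._
import cats.effect._
import scala.util.control.NonFatal

def fromIO[A](io: IO[A]): Task[A] =
  Task.suspend {
    // Synchronous results become Task.now; pending ones are wrapped via Task.fromFuture
    try io.unsafeRunSyncOrFuture() match {
      case Right(a) => Task.now(a)
      case Left(f) => Task.fromFuture(f)
    }
    catch {
      case NonFatal(e) => Task.raiseError(e)
    }
  }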

In absence of this, what I can do currently is this:

def fromIO[A](io: IO[A]): Task[A] =
  Task.create { (scheduler, callback) =>
    io.unsafeRunAsync {
      case Right(a) => callback.onSuccess(a)
      case Left(e) => callback.onError(e)
    }
    Cancelable.empty
  }

Well, this is less efficient than it could be, because it forces 2 "light" asynchronous boundaries for thread-safety reasons: one before triggering io.unsafeRunAsync and one before triggering the final callback ("light" meaning a trampolined Runnable managed by the scheduler). In other words, no matter what policy we have in cats-effect for Async.async, this conversion in Monix will have to be stack-safe. The handling of Cancelable also means unnecessary work gets executed on the async path.
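For readers unfamiliar with the term, a "light" boundary can be pictured with the single-threaded trampoline below. This is a sketch under simplifying assumptions, not Monix's scheduler: Trampoline and countDown are hypothetical names, and the class is not thread-safe.

```scala
object TrampolineSketch {
  // Hypothetical, single-threaded trampoline (illustration only).
  final class Trampoline {
    private[this] val queue = new java.util.ArrayDeque[Runnable]()
    private[this] var running = false

    def execute(r: Runnable): Unit = {
      queue.addLast(r)
      // Only the outermost call drains the queue; nested calls just
      // enqueue and return, so the call stack does not grow.
      if (!running) {
        running = true
        try {
          var next = queue.pollFirst()
          while (next != null) {
            next.run()
            next = queue.pollFirst()
          }
        } finally running = false
      }
    }
  }

  // A recursive loop that reschedules itself through the trampoline.
  def countDown(t: Trampoline, n: Int, done: () => Unit): Unit =
    t.execute(new Runnable {
      def run(): Unit =
        if (n <= 0) done() else countDown(t, n - 1, done)
    })
}
```

Each step costs a queue insertion and a Runnable allocation rather than a thread hop, which is why the boundary is "light" but still not free.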

In other words, it would be great if I could evaluate IO until either (1) an immediate result is available or (2) an asynchronous boundary is hit. And without this new operation, I cannot do it, because everything that I need is private (and for good reasons).

Alternative (without Future)

This is also one of those cases where usage of Future in the signature is superior to the alternatives and why I've told you before that I actually love Future. We could have described that function like this:

def unsafeRunSyncOrAsync(cb: Either[Throwable, A] => Unit): Option[A] = ???

Unfortunately this doesn't help my use-case, because in this case you need to build your callback before that function gets invoked.

In which case we can either describe our conversion like this:

def fromIO[A](io: IO[A]): Task[A] =
  Task.create { (scheduler, callback) =>
    try {
      val result = io.unsafeRunSyncOrAsync {
        case Right(a) => callback.onSuccess(a)
        case Left(e) => callback.onError(e)
      }

      result match {
        case Some(a) => callback.onSuccess(a)
        case None => ()
      }
    }
    catch {
      case NonFatal(e) => callback.onError(e)
    }
    Cancelable.empty
  }

But this is awful, isn't it? And ... it doesn't help us in optimizing anything.

Or option number 2, we use some kind of placeholder, some synchronized variable meant as a temporary location for our asynchronous results:

def fromIO[A](io: IO[A]): Task[A] =
  Task.suspend {
    try {
      val p = Promise[A]()
      val result = io.unsafeRunSyncOrAsync {
        case Right(a) => p.success(a)
        case Left(e) => p.failure(e)
      }

      result match {
        case Some(a) => Task.now(a)
        case None => Task.fromFuture(p.future)
      }
    }
    catch {
      case NonFatal(e) => Task.raiseError(e)
    }
  }

Well, it's slightly better, but we end up creating a Promise reference on each invocation, whether we want it or not. And we end up using Futures anyway.
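The asymmetry between the two signatures can be sketched with plain functions. The names toCallbackStyle and toFutureStyle are hypothetical, and the run parameters stand in for the two proposed methods; nothing here is part of cats-effect.

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}

object SignatureComparison {
  // Callback style derived from Future style: the callback is only
  // touched after an asynchronous boundary has actually been hit.
  def toCallbackStyle[A](run: () => Either[Future[A], A])(
      cb: Either[Throwable, A] => Unit
  )(implicit ec: ExecutionContext): Option[A] =
    run() match {
      case Right(a) => Some(a)
      case Left(f) =>
        f.onComplete(t => cb(t.toEither))
        None
    }

  // Future style derived from callback style: the Promise must exist
  // before the call, even if the result turns out to be synchronous.
  def toFutureStyle[A](
      run: (Either[Throwable, A] => Unit) => Option[A]
  ): Either[Future[A], A] = {
    val p = Promise[A]()
    run(r => p.complete(r.toTry)) match {
      case Some(a) => Right(a)
      case None    => Left(p.future)
    }
  }
}
```

In the first direction the synchronous path does no extra work; in the second, the allocation happens unconditionally, which is the cost described above.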

In other words, please don't dismiss this just because it has Future in its signature 😃

@codecov-io

codecov-io commented May 6, 2017

Codecov Report

Merging #47 into master will increase coverage by 0.19%.
The diff coverage is 100%.

@@            Coverage Diff             @@
##           master      #47      +/-   ##
==========================================
+ Coverage   86.94%   87.13%   +0.19%     
==========================================
  Files          17       17              
  Lines         268      272       +4     
  Branches       10       10              
==========================================
+ Hits          233      237       +4     
  Misses         35       35

@djspiewak
Member

I don't object to the fact that Future is in the signature, but I don't really see a strong need to add an additional unsafe function for this one specific case. A couple of points…

First, the boxing of the closure for a signature like unsafeSomething(Either[Throwable, A] => Unit): Option[A] is actually less overhead than the boxing for the Either[Future[A], A] on Scala 2.12, and also amenable to escape analysis due to the use of MethodHandle. Closures are much, much more efficient than they once were.

Second, I'm assuming you have a Task.internalCreate operation which doesn't trampoline through the thread pool? The reason I ask is the fact that IO.async's construction is already not stack-safe, meaning that I don't see why there would be an expectation of stack-safety specifically at the boundary coming from IO to Task (obviously, you would expect stack-safety at create usage beyond that point). So you can avoid that thread bounce penalty if you want to, you just push the guarantees back one stage.

Finally, this seems like a relatively small thing to optimize, overall. Even if it's not possible to avoid the active thread bounce in Monix, the only thing you're optimizing is the limited case where an IO is entirely synchronous and the overhead of the thread bounce exceeds the overhead of the added boxing (Either + Option vs closure instantiation). That seems like a very small benefit relative to the cognitive cost of adding an additional unsafe function and increasing the API surface area.

@alexandru
Member Author

alexandru commented May 7, 2017

First, the boxing of the closure for a signature like unsafeSomething(Either[Throwable, A] => Unit): Option[A] is actually less overhead than the boxing for the Either[Future[A], A] on Scala 2.12, and also amenable to escape analysis due to the use of MethodHandle. Closures are much, much more efficient than they once were.

I was not talking about the boxing, which is irrelevant here. I'm saying that within that version you have to build your callback before making the call, which is a problem in case that IO reference is synchronous. When receiving a Future however, you can build your callback after making that call.

In other words you don't have to be in an Async.async context before triggering the evaluation, you can do that afterwards and avoid any overhead that an async context would have.

I'm assuming you have a Task.internalCreate operation which doesn't trampoline through the thread pool? The reason I ask is the fact that IO.async's construction is already not stack-safe

We do have that, however I cannot expose this stack unsafety in Monix. Here's the reason why:

import monix.eval._
import cats.effect._

def fromIO[A](io: IO[A]): Task[A] =
  Task.unsafeCreate[A] { (context, callback) =>
    io.unsafeRunAsync {
      case Right(a) => callback.onSuccess(a)
      case Left(e) => callback.onError(e)
    }
  }

def signalIO[A](a: A): IO[A] =
  IO(a)

def signalTask[A](a: A): Task[A] =
  fromIO(signalIO(a))

def sumN(n: Int, acc: Long = 0): Task[Long] =
  signalTask(n).flatMap { x =>
    if (x <= 0) Task.now(acc) 
    else sumN(n-1, acc+n)
  }

sumN(10000).runAsync.onComplete(println)
//=> java.lang.StackOverflowError
//=>   ...
//=>   at cats.effect.IO$$anonfun$async$1.apply(IO.scala:502)
//=>   at cats.effect.IO$$anonfun$async$1.apply(IO.scala:500)
//=>   at cats.effect.IO.unsafeRunAsync(IO.scala:296)
//=>   ...
//=>   at monix.eval.internal.TaskRunLoop$.executeOnFinish$1(TaskRunLoop.scala:158)
//=>   at monix.eval.internal.TaskRunLoop$.monix$eval$internal$TaskRunLoop$$loop$1(TaskRunLoop.scala:224)
//=>   at monix.eval.internal.TaskRunLoop$RestartCallback$1.onSuccess(TaskRunLoop.scala:119)
//=>   ...

Notice here how the IO reference is synchronous and entirely safe to use in flatMap loops, whereas the resulting Task reference is not. I definitely cannot convert from an IO instance in a stack-unsafe way.

In my opinion this kind of mistake is a booby trap for users that the library should not expose without big warning signs with "here be dragons". This is why Monix's stack-unsafe async operation is called unsafeCreate - because this kind of unsafe is actually more unsafe than the unsafe from unsafeRunAsync. That's because at the very least unsafeRunAsync has a signature that does not lie.

Finally, this seems like a relatively small thing to optimize, overall. Even if it's not possible to avoid the active thread bounce in Monix, the only thing you're optimizing is the limited case where an IO is entirely synchronous and the overhead of the thread bounce exceeds the overhead of the added boxing (Either + Option vs closure instantiation). That seems like a very small benefit relative to the cognitive cost of adding an additional unsafe function and increasing the API surface area.

This isn't a small thing to optimize: every implementation that wants to convert from an IO reference will hit this problem. And it's not about boxing either, but about triggering that async context, which can bring the overhead of doing things asynchronously. If the synchronous case were a small thing to optimize for, then Scala's Future wouldn't lose points in every "Task versus Future" comparison ever written.

unsafePerformSync is also an optimization and compared with that one, this one isn't actually unsafe. And it also increases the API surface area. If I were to pick, I would choose this one, because this one is usable on top of Javascript as well.

And if indeed IO references being entirely synchronous is a limited case that doesn't need optimization, then unsafePerformSync should go away too. But I don't think that's the case. I think most IO references will be synchronous.

I do understand the concerns that this increases the API surface, but in this case it's worth it because IO will also play the role of middleware and efficient back and forth conversions should be possible.

@djspiewak
Member

We do have that, however I cannot expose this stack unsafety in Monix. Here's the reason why

Ok, that's compelling.

unsafePerformSync is also an optimization and compared with that one, this one isn't actually unsafe. And it also increases the API surface area. If I were to pick, I would choose this one, because this one is usable on top of Javascript as well.

Except unsafePerformSync isn't an optimization. I don't care how fast or slow it is; it's not intended to be used in a production code path, only in tests. It is a convenience optimization, not a performance optimization.

I'll give this some thought. I think you've convinced me that we need some way to externally decompose the special case of a fully synchronous IO for conversion purposes; it's mostly a question of the best way to expose it.

@djspiewak
Member

How about something like this? (avoids adding another unsafe function, and exposes exactly and only as much internals as necessary to distinguish the meaningful cases)

// name tbd
def fold[R](async: ((Either[Throwable, A] => IO[Unit]) => IO[Unit]) => R, sync: Either[Throwable, A] => R): IO[R]

As a sidebar, we could do even better if you wanted to deconstruct the full internals in a safe way:

trait Deconstructor[F[_]] {
  def error[A](t: Throwable): F[A]
  def sync[A](thunk: => A): F[A]
  def async[A](k: (Either[Throwable, A] => Unit) => Unit): F[A]
}

def foldMap[F[_]](d: Deconstructor[F]): IO[F[A]]

That's obviously more powerful, but it has the advantage of allowing you to retrampoline the internals, converting a stack-unsafe IO into a stack-safe Task, even when the IO would have blown the stack under normal execution.

@djspiewak
Member

Oh, actually an even better idea!

// name tbd; I would accept something like `to`
def foldMap[F[_]: Effect]: F[A]

We can define this safely, because we know that Effect is lawful. So it doesn't even need to be boxed behind IO.
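A rough sketch of how such a conversion could be shaped, using only the standard library. MiniEffect is a hypothetical, pared-down stand-in for Effect, and MiniIO is again an invented two-case type. The Future instance is included only to make the example runnable: Future is eager, so it would not be a lawful Effect.

```scala
import scala.concurrent.{Future, Promise}
import scala.util.Try

object ToSketch {
  // Hypothetical stand-in for the Effect type class (illustration only).
  trait MiniEffect[F[_]] {
    def delay[A](thunk: => A): F[A]
    def async[A](k: (Either[Throwable, A] => Unit) => Unit): F[A]
  }

  // Hypothetical two-case stand-in for IO.
  sealed trait MiniIO[+A]
  final case class Sync[+A](thunk: () => A) extends MiniIO[A]
  final case class Async[+A](register: (Either[Throwable, A] => Unit) => Unit)
      extends MiniIO[A]

  // Each internal case maps onto the matching constructor of the target,
  // so a synchronous value never goes through the async constructor.
  def to[F[_], A](io: MiniIO[A])(implicit F: MiniEffect[F]): F[A] =
    io match {
      case Sync(thunk)     => F.delay[A](thunk())
      case Async(register) => F.async[A](register)
    }

  // Demo-only instance; Future evaluates eagerly.
  implicit val futureEffect: MiniEffect[Future] = new MiniEffect[Future] {
    def delay[A](thunk: => A): Future[A] = Future.fromTry(Try(thunk))
    def async[A](k: (Either[Throwable, A] => Unit) => Unit): Future[A] = {
      val p = Promise[A]()
      k(r => p.complete(r.toTry))
      p.future
    }
  }
}
```

The point of the shape is that the target type decides how each case is represented, so a Task instance could pick its own stack-safe encoding for the asynchronous case.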

@alexandru
Member Author

OK, will modify the PR.

@alexandru
Member Author

Or I'll just create a new one and close this.

3 participants