Permalink
Browse files

SI-7861 Don't execute internal callbacks on the user Executor

Callbacks internal to the implementation of Futures should be
executed with the `InternalCallbackExecutor`, rather than the
user supplied `Executor`.

In a refactoring da54f34, `recoverWith` and `flatMap` no longer
played by these rules. This was noticed by a persnickety test in
Play.

Before this patch, the enclosed test outputs:

    % scala-hash v2.10.3-RC2 test/files/run/future-flatmap-exec-count.scala
    mapping
    execute()
    flatmapping
    execute()
    execute()
    recovering
    execute()
    execute()
  • Loading branch information...
1 parent e176a10 commit 7f4b44b612abdc62fba9810194cee17c7d0de37e @retronym retronym committed Sep 20, 2013
@@ -96,11 +96,8 @@ trait Future[+T] extends Awaitable[T] {
// of the Future trait. Note that this will
// (modulo bugs) _never_ execute a callback
// other than those below in this same file.
- // As a nice side benefit, having this implicit
- // here forces an ambiguity in those methods
- // that also have an executor parameter, which
- // keeps us from accidentally forgetting to use
- // the executor parameter.
+ //
+ // See the documentation on `InternalCallbackExecutor` for more details.
private def internalExecutor = Future.InternalCallbackExecutor
/* Callbacks */
@@ -254,7 +251,7 @@ trait Future[+T] extends Awaitable[T] {
case Success(v) => try f(v) match {
// If possible, link DefaultPromises to avoid space leaks
case dp: DefaultPromise[_] => dp.asInstanceOf[DefaultPromise[S]].linkRootOf(p)
- case fut => fut onComplete p.complete
+ case fut => fut.onComplete(p.complete)(internalExecutor)
} catch { case NonFatal(t) => p failure t }
}
p.future
@@ -344,7 +341,7 @@ trait Future[+T] extends Awaitable[T] {
def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] = {
val p = Promise[U]()
onComplete {
- case Failure(t) => try pf.applyOrElse(t, (_: Throwable) => this) onComplete p.complete catch { case NonFatal(t) => p failure t }
+ case Failure(t) => try pf.applyOrElse(t, (_: Throwable) => this).onComplete(p.complete)(internalExecutor) catch { case NonFatal(t) => p failure t }
case other => p complete other
}
p.future
@@ -0,0 +1,6 @@
+mapping
+execute()
+flatmapping
+execute()
+recovering
+execute()
@@ -0,0 +1,61 @@
+import scala.concurrent._
+import java.util.concurrent.atomic.AtomicInteger
+
+object Test {
+ def main(args: Array[String]) {
+ test()
+ }
+
+ def test() = {
+ def await(f: Future[Any]) =
+ Await.result(f, duration.Duration.Inf)
+
+ val ec = new TestExecutionContext(ExecutionContext.Implicits.global)
+
+ {
+ val p = Promise[Int]()
+ val fp = p.future
+ println("mapping")
+ val mapped = fp.map(x => x)(ec)
+ p.success(0)
+ await(mapped)
+ }
+
+ {
+ println("flatmapping")
+ val p = Promise[Int]()
+ val fp = p.future
+ val flatMapped = fp.flatMap({ (x: Int) =>
+ Future.successful(2 * x)
+ })(ec)
+ p.success(0)
+ await(flatMapped)
+ }
+
+ {
+ println("recovering")
+ val recovered = Future.failed(new Throwable()).recoverWith {
+ case _ => Future.successful(2)
+ }(ec)
+ await(recovered)
+ }
+ }
+
+ class TestExecutionContext(delegate: ExecutionContext) extends ExecutionContext {
+ def execute(runnable: Runnable): Unit = ???
+
+ def reportFailure(t: Throwable): Unit = ???
+
+ override def prepare(): ExecutionContext = {
+ val preparedDelegate = delegate.prepare()
+ return new ExecutionContext {
+ def execute(runnable: Runnable): Unit = {
+ println("execute()")
+ preparedDelegate.execute(runnable)
+ }
+
+ def reportFailure(t: Throwable): Unit = ???
+ }
+ }
+ }
+}

0 comments on commit 7f4b44b

Please sign in to comment.