Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge pull request #2969 from retronym/ticket/7861

SI-7861 Don't execute internal callbacks on the user Executor
  • Loading branch information...
commit 892aa93cf7b74f9bf1c3194930186b33f892fc02 2 parents e176a10 + 7f4b44b
Jason Zaugg retronym authored
11 src/library/scala/concurrent/Future.scala
View
@@ -96,11 +96,8 @@ trait Future[+T] extends Awaitable[T] {
// of the Future trait. Note that this will
// (modulo bugs) _never_ execute a callback
// other than those below in this same file.
- // As a nice side benefit, having this implicit
- // here forces an ambiguity in those methods
- // that also have an executor parameter, which
- // keeps us from accidentally forgetting to use
- // the executor parameter.
+ //
+ // See the documentation on `InternalCallbackExecutor` for more details.
private def internalExecutor = Future.InternalCallbackExecutor
/* Callbacks */
@@ -254,7 +251,7 @@ trait Future[+T] extends Awaitable[T] {
case Success(v) => try f(v) match {
// If possible, link DefaultPromises to avoid space leaks
case dp: DefaultPromise[_] => dp.asInstanceOf[DefaultPromise[S]].linkRootOf(p)
- case fut => fut onComplete p.complete
+ case fut => fut.onComplete(p.complete)(internalExecutor)
} catch { case NonFatal(t) => p failure t }
}
p.future
@@ -344,7 +341,7 @@ trait Future[+T] extends Awaitable[T] {
def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] = {
val p = Promise[U]()
onComplete {
- case Failure(t) => try pf.applyOrElse(t, (_: Throwable) => this) onComplete p.complete catch { case NonFatal(t) => p failure t }
+ case Failure(t) => try pf.applyOrElse(t, (_: Throwable) => this).onComplete(p.complete)(internalExecutor) catch { case NonFatal(t) => p failure t }
case other => p complete other
}
p.future
6 test/files/run/future-flatmap-exec-count.check
View
@@ -0,0 +1,6 @@
+mapping
+execute()
+flatmapping
+execute()
+recovering
+execute()
61 test/files/run/future-flatmap-exec-count.scala
View
@@ -0,0 +1,61 @@
+import scala.concurrent._
+import java.util.concurrent.atomic.AtomicInteger
+
+object Test {
+ def main(args: Array[String]) {
+ test()
+ }
+
+ def test() = {
+ def await(f: Future[Any]) =
+ Await.result(f, duration.Duration.Inf)
+
+ val ec = new TestExecutionContext(ExecutionContext.Implicits.global)
+
+ {
+ val p = Promise[Int]()
+ val fp = p.future
+ println("mapping")
+ val mapped = fp.map(x => x)(ec)
+ p.success(0)
+ await(mapped)
+ }
+
+ {
+ println("flatmapping")
+ val p = Promise[Int]()
+ val fp = p.future
+ val flatMapped = fp.flatMap({ (x: Int) =>
+ Future.successful(2 * x)
+ })(ec)
+ p.success(0)
+ await(flatMapped)
+ }
+
+ {
+ println("recovering")
+ val recovered = Future.failed(new Throwable()).recoverWith {
+ case _ => Future.successful(2)
+ }(ec)
+ await(recovered)
+ }
+ }
+
+ class TestExecutionContext(delegate: ExecutionContext) extends ExecutionContext {
+ def execute(runnable: Runnable): Unit = ???
+
+ def reportFailure(t: Throwable): Unit = ???
+
+ override def prepare(): ExecutionContext = {
+ val preparedDelegate = delegate.prepare()
+ return new ExecutionContext {
+ def execute(runnable: Runnable): Unit = {
+ println("execute()")
+ preparedDelegate.execute(runnable)
+ }
+
+ def reportFailure(t: Throwable): Unit = ???
+ }
+ }
+ }
+}
Please sign in to comment.
Something went wrong with that request. Please try again.