Skip to content
Browse files

Fix SI-6932 by enabling linearization of callback execution for the …

…internal execution context of Future
  • Loading branch information...
1 parent 53d4ec0 commit 5713c1b9b5642bb1813f9e5816d31a1bc5d70c60 @viktorklang viktorklang committed with phaller Jan 14, 2013
View
126 src/library/scala/concurrent/BatchingExecutor.scala
@@ -0,0 +1,126 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent
+
+import java.util.concurrent.{ Executor }
+import scala.concurrent._
+import scala.annotation.tailrec
+
+/**
+ * All Batchables are automatically batched when submitted to a BatchingExecutor
+ */
+private[concurrent] trait Batchable extends Runnable {
+ def isBatchable: Boolean
+}
+
+/**
+ * Mixin trait for an Executor
+ * which groups multiple nested `Runnable.run()` calls
+ * into a single Runnable passed to the original
+ * Executor. This can be a useful optimization
+ * because it bypasses the original context's task
+ * queue and keeps related (nested) code on a single
+ * thread which may improve CPU affinity. However,
+ * if tasks passed to the Executor are blocking
+ * or expensive, this optimization can prevent work-stealing
+ * and make performance worse. Also, some ExecutionContext
+ * may be fast enough natively that this optimization just
+ * adds overhead.
+ * The default ExecutionContext.global is already batching
+ * or fast enough not to benefit from it; while
+ * `fromExecutor` and `fromExecutorService` do NOT add
+ * this optimization since they don't know whether the underlying
+ * executor will benefit from it.
+ * A batching executor can create deadlocks if code does
+ * not use `scala.concurrent.blocking` when it should,
+ * because tasks created within other tasks will block
+ * on the outer task completing.
+ * This executor may run tasks in any order, including LIFO order.
+ * There are no ordering guarantees.
+ *
+ * WARNING: The underlying Executor's execute-method must not execute the submitted Runnable
+ * in the calling thread synchronously. It must enqueue/handoff the Runnable.
+ */
+private[concurrent] trait BatchingExecutor extends Executor {
+
+ // invariant: if "_tasksLocal.get ne null" then we are inside BatchingRunnable.run; if it is null, we are outside
+ private val _tasksLocal = new ThreadLocal[List[Runnable]]()
+
+ private class Batch(val initial: List[Runnable]) extends Runnable with BlockContext {
+ private var parentBlockContext: BlockContext = _
+ // this method runs in the delegate ExecutionContext's thread
+ override def run(): Unit = {
+ require(_tasksLocal.get eq null)
+
+ val prevBlockContext = BlockContext.current
+ BlockContext.withBlockContext(this) {
+ try {
+ parentBlockContext = prevBlockContext
+
+ @tailrec def processBatch(batch: List[Runnable]): Unit = batch match {
+ case Nil ()
+ case head :: tail
+ _tasksLocal set tail
+ try {
+ head.run()
+ } catch {
+ case t: Throwable
+ // if one task throws, move the
+ // remaining tasks to another thread
+ // so we can throw the exception
+ // up to the invoking executor
+ val remaining = _tasksLocal.get
+ _tasksLocal set Nil
+ unbatchedExecute(new Batch(remaining)) //TODO what if this submission fails?
+ throw t // rethrow
+ }
+ processBatch(_tasksLocal.get) // since head.run() can add entries, always do _tasksLocal.get here
+ }
+
+ processBatch(initial)
+ } finally {
+ _tasksLocal.remove()
+ parentBlockContext = null
+ }
+ }
+ }
+
+ override def blockOn[T](thunk: T)(implicit permission: CanAwait): T = {
+ // if we know there will be blocking, we don't want to keep tasks queued up because it could deadlock.
+ {
+ val tasks = _tasksLocal.get
+ _tasksLocal set Nil
+ if ((tasks ne null) && tasks.nonEmpty)
+ unbatchedExecute(new Batch(tasks))
+ }
+
+ // now delegate the blocking to the previous BC
+ require(parentBlockContext ne null)
+ parentBlockContext.blockOn(thunk)
+ }
+ }
+
+ protected def unbatchedExecute(r: Runnable): Unit
+
+ override def execute(runnable: Runnable): Unit = {
+ if (batchable(runnable)) { // If we can batch the runnable
+ _tasksLocal.get match {
+ case null unbatchedExecute(new Batch(List(runnable))) // If we aren't in batching mode yet, enqueue batch
+ case some _tasksLocal.set(runnable :: some) // If we are already in batching mode, add to batch
+ }
+ } else unbatchedExecute(runnable) // If not batchable, just delegate to underlying
+ }
+
+ /** Override this to define which runnables will be batched. */
+ def batchable(runnable: Runnable): Boolean = runnable match {
+ case b: Batchable b.isBatchable
+ case _: scala.concurrent.OnCompleteRunnable true
+ case _ false
+ }
+}
View
6 src/library/scala/concurrent/Future.scala
@@ -672,9 +672,9 @@ object Future {
// by just not ever using it itself. scala.concurrent
// doesn't need to create defaultExecutionContext as
// a side effect.
- private[concurrent] object InternalCallbackExecutor extends ExecutionContext {
- override def execute(runnable: Runnable): Unit =
- runnable.run()
+ private[concurrent] object InternalCallbackExecutor extends ExecutionContext with BatchingExecutor {
+ override protected def unbatchedExecute(r: Runnable): Unit =
+ r.run()
override def reportFailure(t: Throwable): Unit =
throw new IllegalStateException("problem in scala.concurrent internal callback", t)
}
View
7 test/files/jvm/scala-concurrent-tck.scala
@@ -141,6 +141,12 @@ trait FutureCallbacks extends TestBase {
assert(false)
}
}
+
+ def testThatNestedCallbacksDoNotYieldStackOverflow(): Unit = {
+ val promise = Promise[Int]
+ (0 to 10000).map(Future(_)).foldLeft(promise.future)((f1, f2) => f2.flatMap(i => f1))
+ promise.success(-1)
+ }
testOnSuccess()
testOnSuccessWhenCompleted()
@@ -150,6 +156,7 @@ trait FutureCallbacks extends TestBase {
// testOnFailureWhenSpecialThrowable(6, new scala.util.control.ControlThrowable { })
//TODO: this test is currently problematic, because NonFatal does not match InterruptedException
//testOnFailureWhenSpecialThrowable(7, new InterruptedException)
+ testThatNestedCallbacksDoNotYieldStackOverflow()
testOnFailureWhenTimeoutException()
}

0 comments on commit 5713c1b

Please sign in to comment.
Something went wrong with that request. Please try again.