From 72b160d9ea9058cc0ddb64a06b0b128faefafb22 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 22 Feb 2019 17:29:07 +0100 Subject: [PATCH 1/2] Introduces ExecutionContext.parasitic This is a specific ExecutionContext which executes submitted Runnables on the calling thread, utilizing a combination of limited stack growth with on-heap trampolining to achieve its functionality and prevents unbounded stack growth. It is introduced as it is too easy for end-users to implement equivalent functionality but it is tricky to get right. --- .../scala/concurrent/ExecutionContext.scala | 26 +++++++- src/library/scala/concurrent/Future.scala | 64 ++++++------------- src/library/scala/concurrent/Promise.scala | 2 +- .../scala/concurrent/impl/Promise.scala | 7 +- .../scala/concurrent/FutureBenchmark.scala | 4 +- test/files/jvm/future-spec.check | 2 + test/files/jvm/future-spec/FutureTests.scala | 34 ++++++++++ 7 files changed, 83 insertions(+), 56 deletions(-) diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index 87a4a815c32d..aaef0c2b7e26 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -139,9 +139,29 @@ object ExecutionContext { * * @return the global `ExecutionContext` */ - def global: ExecutionContextExecutor = Implicits.global.asInstanceOf[ExecutionContextExecutor] + final def global: ExecutionContextExecutor = Implicits.global.asInstanceOf[ExecutionContextExecutor] - object Implicits { + /** + * This `ExecutionContext` steals execution time from other threads, by having its + * `Runnable`s run on the `Thread` which calls `execute` and then yielding back control + * to the caller after *all* its `Runnables` have been executed. + * Nested invocations of `execute` will be trampolined to prevent uncontrolled stack space growth. + * + * WARNING: Do *not* call any blocking code in the `Runnable`s submitted to this `ExecutionContext` + * as it will prevent progress by other enqueued `Runnable`s and the calling `Thread`. + * Only execute logic which will quickly return control to the caller. + * Symptoms of misuse of this `ExecutionContext` include, but are not limited to, deadlocks + * and severe performance problems. + * + * Any `NonFatal` or `InterruptedException`s will be reported to the `defaultReporter`. + */ + final object parasitic extends ExecutionContextExecutor with BatchingExecutor { + override final def submitForExecution(runnable: Runnable): Unit = runnable.run() + override final def execute(runnable: Runnable): Unit = submitSyncBatched(runnable) + override final def reportFailure(t: Throwable): Unit = defaultReporter(t) + } + + final object Implicits { /** * The implicit global `ExecutionContext`. Import `global` when you want to provide the global * `ExecutionContext` implicitly. @@ -150,7 +170,7 @@ object ExecutionContext { * the thread pool uses a target number of worker threads equal to the number of * [[https://docs.oracle.com/javase/8/docs/api/java/lang/Runtime.html#availableProcessors-- available processors]]. */ - implicit lazy val global: ExecutionContext = impl.ExecutionContextImpl.fromExecutor(null: Executor) + implicit lazy final val global: ExecutionContext = impl.ExecutionContextImpl.fromExecutor(null: Executor) } /** Creates an `ExecutionContext` from the given `ExecutorService`. diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 5a650e07aa3f..06271c4c7497 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -23,6 +23,8 @@ import scala.collection.BuildFrom import scala.collection.mutable.{Builder, ArrayBuffer} import scala.reflect.ClassTag +import scala.concurrent.ExecutionContext.parasitic + /** A `Future` represents a value which may or may not *currently* be available, * but will be available at some point, or an exception if that value could not be made available. * @@ -97,7 +99,6 @@ import scala.reflect.ClassTag * Completion of the Future must *happen-before* the invocation of the callback. */ trait Future[+T] extends Awaitable[T] { - import Future.{ InternalCallbackExecutor => internalExecutor } /* Callbacks */ @@ -159,7 +160,7 @@ trait Future[+T] extends Awaitable[T] { * @return a failed projection of this `Future`. * @group Transformations */ - def failed: Future[Throwable] = transform(Future.failedFun)(internalExecutor) + def failed: Future[Throwable] = transform(Future.failedFun)(parasitic) /* Monadic operations */ @@ -265,7 +266,7 @@ trait Future[+T] extends Awaitable[T] { * @tparam S the type of the returned `Future` * @group Transformations */ - def flatten[S](implicit ev: T <:< Future[S]): Future[S] = flatMap(ev)(internalExecutor) + def flatten[S](implicit ev: T <:< Future[S]): Future[S] = flatMap(ev)(parasitic) /** Creates a new future by filtering the value of the current future with a predicate. * @@ -396,7 +397,7 @@ trait Future[+T] extends Awaitable[T] { * @group Transformations */ def zip[U](that: Future[U]): Future[(T, U)] = - zipWith(that)(Future.zipWithTuple2Fun)(internalExecutor) + zipWith(that)(Future.zipWithTuple2Fun)(parasitic) /** Zips the values of `this` and `that` future using a function `f`, * and creates a new future holding the result. @@ -416,7 +417,7 @@ trait Future[+T] extends Awaitable[T] { * @group Transformations */ def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] = - flatMap(r1 => that.map(r2 => f(r1, r2)))(if (executor.isInstanceOf[BatchingExecutor]) executor else internalExecutor) + flatMap(r1 => that.map(r2 => f(r1, r2)))(if (executor.isInstanceOf[BatchingExecutor]) executor else parasitic) /** Creates a new future which holds the result of this future if it was completed successfully, or, if not, * the result of the `that` future if `that` is completed successfully. @@ -440,7 +441,7 @@ trait Future[+T] extends Awaitable[T] { def fallbackTo[U >: T](that: Future[U]): Future[U] = if (this eq that) this else { - implicit val ec = internalExecutor + implicit val ec = parasitic transformWith { t => if (t.isInstanceOf[Success[T]]) this @@ -457,7 +458,7 @@ trait Future[+T] extends Awaitable[T] { * @group Transformations */ def mapTo[S](implicit tag: ClassTag[S]): Future[S] = { - implicit val ec = internalExecutor + implicit val ec = parasitic val boxedClass = { val c = tag.runtimeClass if (c.isPrimitive) Future.toBoxed(c) else c @@ -692,7 +693,7 @@ object Future { final def sequence[A, CC[X] <: IterableOnce[X], To](in: CC[Future[A]])(implicit bf: BuildFrom[CC[Future[A]], A, To], executor: ExecutionContext): Future[To] = in.iterator.foldLeft(successful(bf.newBuilder(in))) { (fr, fa) => fr.zipWith(fa)(Future.addToBuilderFun) - }.map(_.result())(if (executor.isInstanceOf[BatchingExecutor]) executor else InternalCallbackExecutor) + }.map(_.result())(if (executor.isInstanceOf[BatchingExecutor]) executor else parasitic) /** Asynchronously and non-blockingly returns a new `Future` to the result of the first future * in the list that is completed. This means no matter if it is completed as a success or as a failure. @@ -707,7 +708,7 @@ object Future { else { val p = Promise[T]() val firstCompleteHandler = new AtomicReference[Promise[T]](p) with (Try[T] => Unit) { - override def apply(v1: Try[T]): Unit = { + override final def apply(v1: Try[T]): Unit = { val r = getAndSet(null) if (r ne null) r tryComplete v1 // tryComplete is likely to be cheaper than complete @@ -729,13 +730,12 @@ object Future { */ final def find[T](futures: scala.collection.immutable.Iterable[Future[T]])(p: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = { def searchNext(i: Iterator[Future[T]]): Future[Option[T]] = - if (!i.hasNext) successful[Option[T]](None) - else { - i.next().transformWith { - case Success(r) if p(r) => successful(Some(r)) - case other => searchNext(i) - } - } + if (!i.hasNext) successful(None) + else i.next().transformWith { + case Success(r) if p(r) => successful(Some(r)) + case _ => searchNext(i) + } + searchNext(futures.iterator) } @@ -783,10 +783,9 @@ object Future { */ @deprecated("use Future.foldLeft instead", "2.12.0") // not removed in 2.13, to facilitate 2.11/2.12/2.13 cross-building; remove in 2.14 (see scala/scala#6319) - def fold[T, R](futures: IterableOnce[Future[T]])(zero: R)(@deprecatedName("foldFun") op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = { + def fold[T, R](futures: IterableOnce[Future[T]])(zero: R)(@deprecatedName("foldFun") op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = if (futures.isEmpty) successful(zero) else sequence(futures)(ArrayBuffer, executor).map(_.foldLeft(zero)(op)) - } /** Initiates a non-blocking, asynchronous, fold over the supplied futures * where the fold-zero is the result value of the first `Future` in the collection. @@ -844,34 +843,7 @@ object Future { final def traverse[A, B, M[X] <: IterableOnce[X]](in: M[A])(fn: A => Future[B])(implicit bf: BuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] = in.iterator.foldLeft(successful(bf.newBuilder(in))) { (fr, a) => fr.zipWith(fn(a))(Future.addToBuilderFun) - }.map(_.result())(if (executor.isInstanceOf[BatchingExecutor]) executor else InternalCallbackExecutor) - - - // This is used to run callbacks which are internal - // to scala.concurrent; our own callbacks are only - // ever used to eventually run another callback, - // and that other callback will have its own - // executor because all callbacks come with - // an executor. Our own callbacks never block - // and have no "expected" exceptions. - // As a result, this executor can do nothing; - // some other executor will always come after - // it (and sometimes one will be before it), - // and those will be performing the "real" - // dispatch to code outside scala.concurrent. - // Because this exists, ExecutionContext.defaultExecutionContext - // isn't instantiated by Future internals, so - // if some code for some reason wants to avoid - // ever starting up the default context, it can do so - // by just not ever using it itself. scala.concurrent - // doesn't need to create defaultExecutionContext as - // a side effect. - private[concurrent] object InternalCallbackExecutor extends ExecutionContextExecutor with BatchingExecutor { - override final def submitForExecution(runnable: Runnable): Unit = runnable.run() - final override def execute(runnable: Runnable): Unit = submitSyncBatched(runnable) - override final def reportFailure(t: Throwable): Unit = - ExecutionContext.defaultReporter(new IllegalStateException("problem in scala.concurrent internal callback", t)) - } + }.map(_.result())(if (executor.isInstanceOf[BatchingExecutor]) executor else parasitic) } @deprecated("Superseded by `scala.concurrent.Batchable`", "2.13.0") diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala index 9d70c60700f8..5c0cb1f6468c 100644 --- a/src/library/scala/concurrent/Promise.scala +++ b/src/library/scala/concurrent/Promise.scala @@ -66,7 +66,7 @@ trait Promise[T] { */ def completeWith(other: Future[T]): this.type = { if (other ne this.future) // this tryCompleteWith this doesn't make much sense - other.onComplete(this tryComplete _)(Future.InternalCallbackExecutor) + other.onComplete(this tryComplete _)(ExecutionContext.parasitic) this } diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index 45d3b1a21070..f1b314e5927f 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -12,7 +12,6 @@ package scala.concurrent.impl import scala.concurrent.{ Batchable, ExecutionContext, CanAwait, TimeoutException, ExecutionException, Future, OnCompleteRunnable } -import Future.InternalCallbackExecutor import scala.concurrent.duration.Duration import scala.annotation.{ tailrec, switch } import scala.util.control.{ NonFatal, ControlThrowable } @@ -200,7 +199,7 @@ private[concurrent] object Promise { if (atMost <= Duration.Zero) null else { val l = new CompletionLatch[T]() - onComplete(l)(InternalCallbackExecutor) + onComplete(l)(ExecutionContext.parasitic) if (atMost.isFinite) l.tryAcquireSharedNanos(1, atMost.toNanos) @@ -261,7 +260,7 @@ private[concurrent] object Promise { if (!state.isInstanceOf[Try[T]]) { val resolved = if (other.isInstanceOf[DefaultPromise[T]]) other.asInstanceOf[DefaultPromise[T]].value0 else other.value.orNull if (resolved ne null) tryComplete0(state, resolved) - else other.onComplete(this)(InternalCallbackExecutor) + else other.onComplete(this)(ExecutionContext.parasitic) } } @@ -365,7 +364,7 @@ private[concurrent] object Promise { override final def toString: String = "ManyCallbacks" } - private[this] final val Noop = new Transformation[Nothing, Nothing](Xform_noop, null, InternalCallbackExecutor) + private[this] final val Noop = new Transformation[Nothing, Nothing](Xform_noop, null, ExecutionContext.parasitic) /** * A Transformation[F, T] receives an F (it is a Callback[F]) and applies a transformation function to that F, diff --git a/test/benchmarks/src/main/scala/scala/concurrent/FutureBenchmark.scala b/test/benchmarks/src/main/scala/scala/concurrent/FutureBenchmark.scala index c54187940832..5cf7d3eba9c0 100644 --- a/test/benchmarks/src/main/scala/scala/concurrent/FutureBenchmark.scala +++ b/test/benchmarks/src/main/scala/scala/concurrent/FutureBenchmark.scala @@ -15,7 +15,7 @@ import scala.annotation.tailrec @Fork(value = 1, jvmArgsAppend = Array("-Xmx1G", "-Xms1G", "-server", "-XX:+AggressiveOpts", "-XX:+UseCompressedOops", "-XX:+AlwaysPreTouch", "-XX:+UseCondCardMark")) @Threads(value = 1) abstract class AbstractBaseFutureBenchmark { - // fjp = ForkJoinPool, fix = FixedThreadPool, fie = FutureInternalExecutor, gbl = GlobalEC + // fjp = ForkJoinPool, fix = FixedThreadPool, fie = parasiticEC, gbl = GlobalEC @Param(Array[String]("fjp", "fix", "fie", "gbl")) final var pool: String = _ @@ -65,7 +65,7 @@ abstract class AbstractBaseFutureBenchmark { System.setProperty("scala.concurrent.context.maxThreads", threads.toString) ExecutionContext.global case "fie" => - scala.concurrent.Future.InternalCallbackExecutor + scala.concurrent.ExecutionContext.parasitic } } diff --git a/test/files/jvm/future-spec.check b/test/files/jvm/future-spec.check index 160ff427a80f..9465f0abd261 100644 --- a/test/files/jvm/future-spec.check +++ b/test/files/jvm/future-spec.check @@ -1 +1,3 @@ warning: there were 5 deprecation warnings (since 2.13.0); re-run with -deprecation for details +FutureTests$$anon$2: do not rethrow +FutureTests$$anon$3: expected diff --git a/test/files/jvm/future-spec/FutureTests.scala b/test/files/jvm/future-spec/FutureTests.scala index 43efbd34ab7a..cc31ce463bea 100644 --- a/test/files/jvm/future-spec/FutureTests.scala +++ b/test/files/jvm/future-spec/FutureTests.scala @@ -4,6 +4,7 @@ import scala.concurrent.duration.Duration.Inf import scala.collection._ import scala.runtime.NonLocalReturnControl import scala.util.{Try,Success,Failure} +import scala.util.control.NoStackTrace import java.util.concurrent.ForkJoinPool class FutureTests extends MinimalScalaTest { @@ -192,6 +193,39 @@ class FutureTests extends MinimalScalaTest { } } + "The parasitic ExecutionContext" should { + "run Runnables on the calling thread" in { + val t = Thread.currentThread + var rt: Thread = null + ExecutionContext.parasitic.execute(() => rt = Thread.currentThread) + t mustBe rt + } + + "not rethrow non-fatal exceptions" in { + ExecutionContext.parasitic.execute(() => throw new RuntimeException("do not rethrow") with NoStackTrace) + } + + "rethrow fatal exceptions" in { + val oome = new OutOfMemoryError("test") + intercept[OutOfMemoryError] { + ExecutionContext.parasitic.execute(() => throw oome) + } mustBe oome + } + + "continue after non-fatal exceptions" in { + var value = "" + ExecutionContext.parasitic.execute(() => throw new RuntimeException("expected") with NoStackTrace) + ExecutionContext.parasitic.execute(() => value = "test") + value mustBe "test" + } + + "not blow the stack" in { + def recur(i: Int): Unit = if (i > 0) ExecutionContext.parasitic.execute(() => recur(i - 1)) else () + + recur(100000) + } + } + "The default ExecutionContext" should { import ExecutionContext.Implicits._ "report uncaught exceptions" in { From bc11322da0933d9f7ef5acfda607ced932efcaf3 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 3 Mar 2019 00:03:55 +0100 Subject: [PATCH 2/2] Switches the implementation order of ExecutionContext.global and Implicits.global --- .../scala/concurrent/ExecutionContext.scala | 21 +++++++++++-------- .../scala/concurrent/impl/Promise.scala | 2 +- test/files/neg/t8849.check | 2 +- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index aaef0c2b7e26..c1612461317e 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -139,17 +139,22 @@ object ExecutionContext { * * @return the global `ExecutionContext` */ - final def global: ExecutionContextExecutor = Implicits.global.asInstanceOf[ExecutionContextExecutor] + final lazy val global: ExecutionContextExecutor = impl.ExecutionContextImpl.fromExecutor(null: Executor) /** - * This `ExecutionContext` steals execution time from other threads, by having its + * WARNING: Only ever execute logic which will quickly return control to the caller. + * + * This `ExecutionContext` steals execution time from other threads by having its * `Runnable`s run on the `Thread` which calls `execute` and then yielding back control - * to the caller after *all* its `Runnables` have been executed. + * to the caller after *all* its `Runnable`s have been executed. * Nested invocations of `execute` will be trampolined to prevent uncontrolled stack space growth. * - * WARNING: Do *not* call any blocking code in the `Runnable`s submitted to this `ExecutionContext` + * When using `parasitic` with abstractions such as `Future` it will in many cases be non-deterministic + * as to which `Thread` will be executing the logic, as it depends on when/if that `Future` is completed. + * + * Do *not* call any blocking code in the `Runnable`s submitted to this `ExecutionContext` * as it will prevent progress by other enqueued `Runnable`s and the calling `Thread`. - * Only execute logic which will quickly return control to the caller. + * * Symptoms of misuse of this `ExecutionContext` include, but are not limited to, deadlocks * and severe performance problems. * @@ -170,7 +175,7 @@ object ExecutionContext { * the thread pool uses a target number of worker threads equal to the number of * [[https://docs.oracle.com/javase/8/docs/api/java/lang/Runtime.html#availableProcessors-- available processors]]. */ - implicit lazy final val global: ExecutionContext = impl.ExecutionContextImpl.fromExecutor(null: Executor) + implicit final def global: ExecutionContext = ExecutionContext.global } /** Creates an `ExecutionContext` from the given `ExecutorService`. @@ -217,7 +222,5 @@ object ExecutionContext { * * @return the function for error reporting */ - def defaultReporter: Throwable => Unit = _.printStackTrace() + final val defaultReporter: Throwable => Unit = _.printStackTrace() } - - diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index f1b314e5927f..07ce2e4636a9 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -447,7 +447,7 @@ private[concurrent] object Promise { fun(v) null case Xform_recover => - resolve(v.recover(fun.asInstanceOf[PartialFunction[Throwable, F]])) //recover F=:=T + if (v.isInstanceOf[Failure[F]]) resolve(v.recover(fun.asInstanceOf[PartialFunction[Throwable, F]])) else v //recover F=:=T case Xform_recoverWith => if (v.isInstanceOf[Failure[F]]) { val f = fun.asInstanceOf[PartialFunction[Throwable, Future[T]]].applyOrElse(v.asInstanceOf[Failure[F]].exception, Future.recoverWithFailed) diff --git a/test/files/neg/t8849.check b/test/files/neg/t8849.check index 1d5b4164b205..eefff110b890 100644 --- a/test/files/neg/t8849.check +++ b/test/files/neg/t8849.check @@ -1,5 +1,5 @@ t8849.scala:8: error: ambiguous implicit values: - both lazy value global in object Implicits of type => scala.concurrent.ExecutionContext + both method global in object Implicits of type => scala.concurrent.ExecutionContext and value dummy of type scala.concurrent.ExecutionContext match expected type scala.concurrent.ExecutionContext require(implicitly[ExecutionContext] eq dummy)