Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add synchronous ("parasitic") ExecutionContext #7784

Merged
merged 2 commits into from Mar 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
35 changes: 29 additions & 6 deletions src/library/scala/concurrent/ExecutionContext.scala
Expand Up @@ -139,9 +139,34 @@ object ExecutionContext {
*
* @return the global `ExecutionContext`
*/
def global: ExecutionContextExecutor = Implicits.global.asInstanceOf[ExecutionContextExecutor]
final lazy val global: ExecutionContextExecutor = impl.ExecutionContextImpl.fromExecutor(null: Executor)

object Implicits {
/**
* WARNING: Only ever execute logic which will quickly return control to the caller.
*
* This `ExecutionContext` steals execution time from other threads by having its
* `Runnable`s run on the `Thread` which calls `execute` and then yielding back control
* to the caller after *all* its `Runnable`s have been executed.
* Nested invocations of `execute` will be trampolined to prevent uncontrolled stack space growth.
*
* When using `parasitic` with abstractions such as `Future` it will in many cases be non-deterministic
* as to which `Thread` will be executing the logic, as it depends on when/if that `Future` is completed.
*
* Do *not* call any blocking code in the `Runnable`s submitted to this `ExecutionContext`
* as it will prevent progress by other enqueued `Runnable`s and the calling `Thread`.
*
* Symptoms of misuse of this `ExecutionContext` include, but are not limited to, deadlocks
* and severe performance problems.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jroper @NthPortal Is this an adequate explanation?

*
* Any `NonFatal` or `InterruptedException`s will be reported to the `defaultReporter`.
*/
final object parasitic extends ExecutionContextExecutor with BatchingExecutor {
override final def submitForExecution(runnable: Runnable): Unit = runnable.run()
override final def execute(runnable: Runnable): Unit = submitSyncBatched(runnable)
override final def reportFailure(t: Throwable): Unit = defaultReporter(t)
}

final object Implicits {
/**
* The implicit global `ExecutionContext`. Import `global` when you want to provide the global
* `ExecutionContext` implicitly.
Expand All @@ -150,7 +175,7 @@ object ExecutionContext {
* the thread pool uses a target number of worker threads equal to the number of
* [[https://docs.oracle.com/javase/8/docs/api/java/lang/Runtime.html#availableProcessors-- available processors]].
*/
implicit lazy val global: ExecutionContext = impl.ExecutionContextImpl.fromExecutor(null: Executor)
implicit final def global: ExecutionContext = ExecutionContext.global
}

/** Creates an `ExecutionContext` from the given `ExecutorService`.
Expand Down Expand Up @@ -197,7 +222,5 @@ object ExecutionContext {
*
* @return the function for error reporting
*/
def defaultReporter: Throwable => Unit = _.printStackTrace()
final val defaultReporter: Throwable => Unit = _.printStackTrace()
}


64 changes: 18 additions & 46 deletions src/library/scala/concurrent/Future.scala
Expand Up @@ -23,6 +23,8 @@ import scala.collection.BuildFrom
import scala.collection.mutable.{Builder, ArrayBuffer}
import scala.reflect.ClassTag

import scala.concurrent.ExecutionContext.parasitic

/** A `Future` represents a value which may or may not *currently* be available,
* but will be available at some point, or an exception if that value could not be made available.
*
Expand Down Expand Up @@ -97,7 +99,6 @@ import scala.reflect.ClassTag
* Completion of the Future must *happen-before* the invocation of the callback.
*/
trait Future[+T] extends Awaitable[T] {
import Future.{ InternalCallbackExecutor => internalExecutor }

/* Callbacks */

Expand Down Expand Up @@ -159,7 +160,7 @@ trait Future[+T] extends Awaitable[T] {
* @return a failed projection of this `Future`.
* @group Transformations
*/
def failed: Future[Throwable] = transform(Future.failedFun)(internalExecutor)
def failed: Future[Throwable] = transform(Future.failedFun)(parasitic)


/* Monadic operations */
Expand Down Expand Up @@ -265,7 +266,7 @@ trait Future[+T] extends Awaitable[T] {
* @tparam S the type of the returned `Future`
* @group Transformations
*/
def flatten[S](implicit ev: T <:< Future[S]): Future[S] = flatMap(ev)(internalExecutor)
def flatten[S](implicit ev: T <:< Future[S]): Future[S] = flatMap(ev)(parasitic)

/** Creates a new future by filtering the value of the current future with a predicate.
*
Expand Down Expand Up @@ -396,7 +397,7 @@ trait Future[+T] extends Awaitable[T] {
* @group Transformations
*/
def zip[U](that: Future[U]): Future[(T, U)] =
zipWith(that)(Future.zipWithTuple2Fun)(internalExecutor)
zipWith(that)(Future.zipWithTuple2Fun)(parasitic)

/** Zips the values of `this` and `that` future using a function `f`,
* and creates a new future holding the result.
Expand All @@ -416,7 +417,7 @@ trait Future[+T] extends Awaitable[T] {
* @group Transformations
*/
def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] =
flatMap(r1 => that.map(r2 => f(r1, r2)))(if (executor.isInstanceOf[BatchingExecutor]) executor else internalExecutor)
flatMap(r1 => that.map(r2 => f(r1, r2)))(if (executor.isInstanceOf[BatchingExecutor]) executor else parasitic)

/** Creates a new future which holds the result of this future if it was completed successfully, or, if not,
* the result of the `that` future if `that` is completed successfully.
Expand All @@ -440,7 +441,7 @@ trait Future[+T] extends Awaitable[T] {
def fallbackTo[U >: T](that: Future[U]): Future[U] =
if (this eq that) this
else {
implicit val ec = internalExecutor
implicit val ec = parasitic
transformWith {
t =>
if (t.isInstanceOf[Success[T]]) this
Expand All @@ -457,7 +458,7 @@ trait Future[+T] extends Awaitable[T] {
* @group Transformations
*/
def mapTo[S](implicit tag: ClassTag[S]): Future[S] = {
implicit val ec = internalExecutor
implicit val ec = parasitic
val boxedClass = {
val c = tag.runtimeClass
if (c.isPrimitive) Future.toBoxed(c) else c
Expand Down Expand Up @@ -692,7 +693,7 @@ object Future {
final def sequence[A, CC[X] <: IterableOnce[X], To](in: CC[Future[A]])(implicit bf: BuildFrom[CC[Future[A]], A, To], executor: ExecutionContext): Future[To] =
in.iterator.foldLeft(successful(bf.newBuilder(in))) {
(fr, fa) => fr.zipWith(fa)(Future.addToBuilderFun)
}.map(_.result())(if (executor.isInstanceOf[BatchingExecutor]) executor else InternalCallbackExecutor)
}.map(_.result())(if (executor.isInstanceOf[BatchingExecutor]) executor else parasitic)

/** Asynchronously and non-blockingly returns a new `Future` to the result of the first future
* in the list that is completed. This means no matter if it is completed as a success or as a failure.
Expand All @@ -707,7 +708,7 @@ object Future {
else {
val p = Promise[T]()
val firstCompleteHandler = new AtomicReference[Promise[T]](p) with (Try[T] => Unit) {
override def apply(v1: Try[T]): Unit = {
override final def apply(v1: Try[T]): Unit = {
val r = getAndSet(null)
if (r ne null)
r tryComplete v1 // tryComplete is likely to be cheaper than complete
Expand All @@ -729,13 +730,12 @@ object Future {
*/
final def find[T](futures: scala.collection.immutable.Iterable[Future[T]])(p: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = {
def searchNext(i: Iterator[Future[T]]): Future[Option[T]] =
if (!i.hasNext) successful[Option[T]](None)
else {
i.next().transformWith {
case Success(r) if p(r) => successful(Some(r))
case other => searchNext(i)
}
}
if (!i.hasNext) successful(None)
else i.next().transformWith {
case Success(r) if p(r) => successful(Some(r))
case _ => searchNext(i)
}

searchNext(futures.iterator)
}

Expand Down Expand Up @@ -783,10 +783,9 @@ object Future {
*/
@deprecated("use Future.foldLeft instead", "2.12.0")
// not removed in 2.13, to facilitate 2.11/2.12/2.13 cross-building; remove in 2.14 (see scala/scala#6319)
def fold[T, R](futures: IterableOnce[Future[T]])(zero: R)(@deprecatedName("foldFun") op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = {
def fold[T, R](futures: IterableOnce[Future[T]])(zero: R)(@deprecatedName("foldFun") op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] =
if (futures.isEmpty) successful(zero)
else sequence(futures)(ArrayBuffer, executor).map(_.foldLeft(zero)(op))
}

/** Initiates a non-blocking, asynchronous, fold over the supplied futures
* where the fold-zero is the result value of the first `Future` in the collection.
Expand Down Expand Up @@ -844,34 +843,7 @@ object Future {
final def traverse[A, B, M[X] <: IterableOnce[X]](in: M[A])(fn: A => Future[B])(implicit bf: BuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
in.iterator.foldLeft(successful(bf.newBuilder(in))) {
(fr, a) => fr.zipWith(fn(a))(Future.addToBuilderFun)
}.map(_.result())(if (executor.isInstanceOf[BatchingExecutor]) executor else InternalCallbackExecutor)


// This is used to run callbacks which are internal
// to scala.concurrent; our own callbacks are only
// ever used to eventually run another callback,
// and that other callback will have its own
// executor because all callbacks come with
// an executor. Our own callbacks never block
// and have no "expected" exceptions.
// As a result, this executor can do nothing;
// some other executor will always come after
// it (and sometimes one will be before it),
// and those will be performing the "real"
// dispatch to code outside scala.concurrent.
// Because this exists, ExecutionContext.defaultExecutionContext
// isn't instantiated by Future internals, so
// if some code for some reason wants to avoid
// ever starting up the default context, it can do so
// by just not ever using it itself. scala.concurrent
// doesn't need to create defaultExecutionContext as
// a side effect.
private[concurrent] object InternalCallbackExecutor extends ExecutionContextExecutor with BatchingExecutor {
override final def submitForExecution(runnable: Runnable): Unit = runnable.run()
final override def execute(runnable: Runnable): Unit = submitSyncBatched(runnable)
override final def reportFailure(t: Throwable): Unit =
ExecutionContext.defaultReporter(new IllegalStateException("problem in scala.concurrent internal callback", t))
NthPortal marked this conversation as resolved.
Show resolved Hide resolved
}
}.map(_.result())(if (executor.isInstanceOf[BatchingExecutor]) executor else parasitic)
}

@deprecated("Superseded by `scala.concurrent.Batchable`", "2.13.0")
Expand Down
2 changes: 1 addition & 1 deletion src/library/scala/concurrent/Promise.scala
Expand Up @@ -66,7 +66,7 @@ trait Promise[T] {
*/
def completeWith(other: Future[T]): this.type = {
if (other ne this.future) // this tryCompleteWith this doesn't make much sense
other.onComplete(this tryComplete _)(Future.InternalCallbackExecutor)
other.onComplete(this tryComplete _)(ExecutionContext.parasitic)

this
}
Expand Down
9 changes: 4 additions & 5 deletions src/library/scala/concurrent/impl/Promise.scala
Expand Up @@ -12,7 +12,6 @@

package scala.concurrent.impl
import scala.concurrent.{ Batchable, ExecutionContext, CanAwait, TimeoutException, ExecutionException, Future, OnCompleteRunnable }
import Future.InternalCallbackExecutor
import scala.concurrent.duration.Duration
import scala.annotation.{ tailrec, switch }
import scala.util.control.{ NonFatal, ControlThrowable }
Expand Down Expand Up @@ -200,7 +199,7 @@ private[concurrent] object Promise {
if (atMost <= Duration.Zero) null
else {
val l = new CompletionLatch[T]()
onComplete(l)(InternalCallbackExecutor)
onComplete(l)(ExecutionContext.parasitic)

if (atMost.isFinite)
l.tryAcquireSharedNanos(1, atMost.toNanos)
Expand Down Expand Up @@ -261,7 +260,7 @@ private[concurrent] object Promise {
if (!state.isInstanceOf[Try[T]]) {
val resolved = if (other.isInstanceOf[DefaultPromise[T]]) other.asInstanceOf[DefaultPromise[T]].value0 else other.value.orNull
if (resolved ne null) tryComplete0(state, resolved)
else other.onComplete(this)(InternalCallbackExecutor)
else other.onComplete(this)(ExecutionContext.parasitic)
}
}

Expand Down Expand Up @@ -365,7 +364,7 @@ private[concurrent] object Promise {
override final def toString: String = "ManyCallbacks"
}

private[this] final val Noop = new Transformation[Nothing, Nothing](Xform_noop, null, InternalCallbackExecutor)
private[this] final val Noop = new Transformation[Nothing, Nothing](Xform_noop, null, ExecutionContext.parasitic)

/**
* A Transformation[F, T] receives an F (it is a Callback[F]) and applies a transformation function to that F,
Expand Down Expand Up @@ -448,7 +447,7 @@ private[concurrent] object Promise {
fun(v)
null
case Xform_recover =>
resolve(v.recover(fun.asInstanceOf[PartialFunction[Throwable, F]])) //recover F=:=T
if (v.isInstanceOf[Failure[F]]) resolve(v.recover(fun.asInstanceOf[PartialFunction[Throwable, F]])) else v //recover F=:=T
case Xform_recoverWith =>
if (v.isInstanceOf[Failure[F]]) {
val f = fun.asInstanceOf[PartialFunction[Throwable, Future[T]]].applyOrElse(v.asInstanceOf[Failure[F]].exception, Future.recoverWithFailed)
Expand Down
Expand Up @@ -15,7 +15,7 @@ import scala.annotation.tailrec
@Fork(value = 1, jvmArgsAppend = Array("-Xmx1G", "-Xms1G", "-server", "-XX:+AggressiveOpts", "-XX:+UseCompressedOops", "-XX:+AlwaysPreTouch", "-XX:+UseCondCardMark"))
@Threads(value = 1)
abstract class AbstractBaseFutureBenchmark {
// fjp = ForkJoinPool, fix = FixedThreadPool, fie = FutureInternalExecutor, gbl = GlobalEC
// fjp = ForkJoinPool, fix = FixedThreadPool, fie = parasiticEC, gbl = GlobalEC
@Param(Array[String]("fjp", "fix", "fie", "gbl"))
final var pool: String = _

Expand Down Expand Up @@ -65,7 +65,7 @@ abstract class AbstractBaseFutureBenchmark {
System.setProperty("scala.concurrent.context.maxThreads", threads.toString)
ExecutionContext.global
case "fie" =>
scala.concurrent.Future.InternalCallbackExecutor
scala.concurrent.ExecutionContext.parasitic
}
}

Expand Down
2 changes: 2 additions & 0 deletions test/files/jvm/future-spec.check
@@ -1 +1,3 @@
warning: there were 5 deprecation warnings (since 2.13.0); re-run with -deprecation for details
FutureTests$$anon$2: do not rethrow
FutureTests$$anon$3: expected
34 changes: 34 additions & 0 deletions test/files/jvm/future-spec/FutureTests.scala
Expand Up @@ -4,6 +4,7 @@ import scala.concurrent.duration.Duration.Inf
import scala.collection._
import scala.runtime.NonLocalReturnControl
import scala.util.{Try,Success,Failure}
import scala.util.control.NoStackTrace
import java.util.concurrent.ForkJoinPool

class FutureTests extends MinimalScalaTest {
Expand Down Expand Up @@ -192,6 +193,39 @@ class FutureTests extends MinimalScalaTest {
}
}

"The parasitic ExecutionContext" should {
"run Runnables on the calling thread" in {
val t = Thread.currentThread
var rt: Thread = null
ExecutionContext.parasitic.execute(() => rt = Thread.currentThread)
t mustBe rt
}

"not rethrow non-fatal exceptions" in {
ExecutionContext.parasitic.execute(() => throw new RuntimeException("do not rethrow") with NoStackTrace)
}

"rethrow fatal exceptions" in {
val oome = new OutOfMemoryError("test")
intercept[OutOfMemoryError] {
ExecutionContext.parasitic.execute(() => throw oome)
} mustBe oome
}

"continue after non-fatal exceptions" in {
var value = ""
ExecutionContext.parasitic.execute(() => throw new RuntimeException("expected") with NoStackTrace)
ExecutionContext.parasitic.execute(() => value = "test")
value mustBe "test"
}

"not blow the stack" in {
def recur(i: Int): Unit = if (i > 0) ExecutionContext.parasitic.execute(() => recur(i - 1)) else ()

recur(100000)
}
}

"The default ExecutionContext" should {
import ExecutionContext.Implicits._
"report uncaught exceptions" in {
Expand Down
2 changes: 1 addition & 1 deletion test/files/neg/t8849.check
@@ -1,5 +1,5 @@
t8849.scala:8: error: ambiguous implicit values:
both lazy value global in object Implicits of type => scala.concurrent.ExecutionContext
both method global in object Implicits of type => scala.concurrent.ExecutionContext
and value dummy of type scala.concurrent.ExecutionContext
match expected type scala.concurrent.ExecutionContext
require(implicitly[ExecutionContext] eq dummy)
Expand Down