Permalink
Browse files

Move implicit ExecutionContext to be determined by lexical scope

Port of a pull request originally submitted by @havocp.

- declare the invariant that all app callbacks have an
  associated ExecutionContext provided at the place
  the callback is passed to a method on Future
- always run callbacks in their associated EC
- since all callbacks have their own EC, Promise
  does not need one
- "internal" callbacks don't need to defer execution either
  since we know the ultimate app callback will do so,
  therefore we can use an immediate executor for these
  • Loading branch information...
1 parent e490b02 commit 1dfce90246f7d334e34d110afb8b1517180995fc @phaller phaller committed May 22, 2012
@@ -82,9 +82,29 @@ import language.higherKinds
* {{{
* f flatMap { (x: Int) => g map { (y: Int) => x + y } }
* }}}
+ *
+ * @define callbackInContext
+ * The provided callback always runs in the provided implicit
+ *`ExecutionContext`, though there is no guarantee that the
+ * `execute()` method on the `ExecutionContext` will be called once
+ * per callback or that `execute()` will be called in the current
+ * thread. That is, the implementation may run multiple callbacks
+ * in a batch within a single `execute()` and it may run
+ * `execute()` either immediately or asynchronously.
*/
trait Future[+T] extends Awaitable[T] {
+ // The executor within the lexical scope
+ // of the Future trait. Note that this will
+ // (modulo bugs) _never_ execute a callback
+ // other than those below in this same file.
+ // As a nice side benefit, having this implicit
+ // here forces an ambiguity in those methods
+ // that also have an executor parameter, which
+ // keeps us from accidentally forgetting to use
+ // the executor parameter.
+ private implicit def internalExecutor: ExecutionContext = Future.InternalCallbackExecutor
+
/* Callbacks */
/** When this future is completed successfully (i.e. with a value),
@@ -95,11 +115,12 @@ trait Future[+T] extends Awaitable[T] {
* this will either be applied immediately or be scheduled asynchronously.
*
* $multipleCallbacks
+ * $callbackInContext
*/
- def onSuccess[U](pf: PartialFunction[T, U]): Unit = onComplete {
+ def onSuccess[U](pf: PartialFunction[T, U])(implicit executor: ExecutionContext): Unit = onComplete {
case Right(v) if pf isDefinedAt v => pf(v)
case _ =>
- }
+ }(executor)
/** When this future is completed with a failure (i.e. with a throwable),
* apply the provided callback to the throwable.
@@ -112,11 +133,12 @@ trait Future[+T] extends Awaitable[T] {
* Will not be called in case that the future is completed with a value.
*
* $multipleCallbacks
+ * $callbackInContext
*/
- def onFailure[U](callback: PartialFunction[Throwable, U]): Unit = onComplete {
+ def onFailure[U](callback: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = onComplete {
case Left(t) if (isFutureThrowable(t) && callback.isDefinedAt(t)) => callback(t)
case _ =>
- }
+ }(executor)
/** When this future is completed, either through an exception, or a value,
* apply the provided function.
@@ -125,8 +147,9 @@ trait Future[+T] extends Awaitable[T] {
* this will either be applied immediately or be scheduled asynchronously.
*
* $multipleCallbacks
+ * $callbackInContext
*/
- def onComplete[U](func: Either[Throwable, T] => U): Unit
+ def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit
/* Miscellaneous */
@@ -182,10 +205,10 @@ trait Future[+T] extends Awaitable[T] {
*
* Will not be called if the future fails.
*/
- def foreach[U](f: T => U): Unit = onComplete {
+ def foreach[U](f: T => U)(implicit executor: ExecutionContext): Unit = onComplete {
case Right(r) => f(r)
case _ => // do nothing
- }
+ }(executor)
/** Creates a new future by applying the 's' function to the successful result of
* this future, or the 'f' function to the failed result. If there is any non-fatal
@@ -198,7 +221,7 @@ trait Future[+T] extends Awaitable[T] {
* the returned future
* @return a future that will be completed with the transformed value
*/
- def transform[S](s: T => S, f: Throwable => Throwable): Future[S] = {
+ def transform[S](s: T => S, f: Throwable => Throwable)(implicit executor: ExecutionContext): Future[S] = {
val p = Promise[S]()
onComplete {
@@ -211,7 +234,7 @@ trait Future[+T] extends Awaitable[T] {
} catch {
case NonFatal(t) => p failure t
}
- }
+ }(executor)
p.future
}
@@ -222,7 +245,7 @@ trait Future[+T] extends Awaitable[T] {
*
* $forComprehensionExamples
*/
- def map[S](f: T => S): Future[S] = { // transform(f, identity)
+ def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = { // transform(f, identity)
val p = Promise[S]()
onComplete {
@@ -235,7 +258,7 @@ trait Future[+T] extends Awaitable[T] {
} catch {
case NonFatal(t) => p failure t
}
- }
+ }(executor)
p.future
}
@@ -247,21 +270,21 @@ trait Future[+T] extends Awaitable[T] {
*
* $forComprehensionExamples
*/
- def flatMap[S](f: T => Future[S]): Future[S] = {
+ def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = {
val p = Promise[S]()
onComplete {
case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]]
case Right(v) =>
try {
- f(v) onComplete {
+ f(v).onComplete({
case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]]
case Right(v) => p success v
- }
+ })(internalExecutor)
} catch {
case NonFatal(t) => p failure t
}
- }
+ }(executor)
p.future
}
@@ -282,7 +305,7 @@ trait Future[+T] extends Awaitable[T] {
* await(h, 0) // throw a NoSuchElementException
* }}}
*/
- def filter(pred: T => Boolean): Future[T] = {
+ def filter(pred: T => Boolean)(implicit executor: ExecutionContext): Future[T] = {
val p = Promise[T]()
onComplete {
@@ -294,14 +317,14 @@ trait Future[+T] extends Awaitable[T] {
} catch {
case NonFatal(t) => p failure t
}
- }
+ }(executor)
p.future
}
/** Used by for-comprehensions.
*/
- final def withFilter(p: T => Boolean): Future[T] = filter(p)
+ final def withFilter(p: T => Boolean)(implicit executor: ExecutionContext): Future[T] = filter(p)(executor)
// final def withFilter(p: T => Boolean) = new FutureWithFilter[T](this, p)
// final class FutureWithFilter[+S](self: Future[S], p: S => Boolean) {
@@ -331,7 +354,7 @@ trait Future[+T] extends Awaitable[T] {
* await(h, 0) // throw a NoSuchElementException
* }}}
*/
- def collect[S](pf: PartialFunction[T, S]): Future[S] = {
+ def collect[S](pf: PartialFunction[T, S])(implicit executor: ExecutionContext): Future[S] = {
val p = Promise[S]()
onComplete {
@@ -343,7 +366,7 @@ trait Future[+T] extends Awaitable[T] {
} catch {
case NonFatal(t) => p failure t
}
- }
+ }(executor)
p.future
}
@@ -360,7 +383,7 @@ trait Future[+T] extends Awaitable[T] {
* future (6 / 2) recover { case e: ArithmeticException ⇒ 0 } // result: 3
* }}}
*/
- def recover[U >: T](pf: PartialFunction[Throwable, U]): Future[U] = {
+ def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] = {
val p = Promise[U]()
onComplete {
@@ -370,7 +393,7 @@ trait Future[+T] extends Awaitable[T] {
case NonFatal(t) => p failure t
}
case otherwise => p complete otherwise
- }
+ }(executor)
p.future
}
@@ -388,7 +411,7 @@ trait Future[+T] extends Awaitable[T] {
* future (6 / 0) recoverWith { case e: ArithmeticException => f } // result: Int.MaxValue
* }}}
*/
- def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]]): Future[U] = {
+ def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] = {
val p = Promise[U]()
onComplete {
@@ -399,7 +422,7 @@ trait Future[+T] extends Awaitable[T] {
case NonFatal(t) => p failure t
}
case otherwise => p complete otherwise
- }
+ }(executor)
p.future
}
@@ -498,12 +521,12 @@ trait Future[+T] extends Awaitable[T] {
* }
* }}}
*/
- def andThen[U](pf: PartialFunction[Either[Throwable, T], U]): Future[T] = {
+ def andThen[U](pf: PartialFunction[Either[Throwable, T], U])(implicit executor: ExecutionContext): Future[T] = {
val p = Promise[T]()
onComplete {
case r => try if (pf isDefinedAt r) pf(r) finally p complete r
- }
+ }(executor)
p.future
}
@@ -656,6 +679,33 @@ object Future {
for (r <- fr; b <- fb) yield (r += b)
}.map(_.result)
+ // This is used to run callbacks which are internal
+ // to scala.concurrent; our own callbacks are only
+ // ever used to eventually run another callback,
+ // and that other callback will have its own
+ // executor because all callbacks come with
+ // an executor. Our own callbacks never block
+ // and have no "expected" exceptions.
+ // As a result, this executor can do nothing;
+ // some other executor will always come after
+ // it (and sometimes one will be before it),
+ // and those will be performing the "real"
+ // dispatch to code outside scala.concurrent.
+ // Because this exists, ExecutionContext.defaultExecutionContext
+ // isn't instantiated by Future internals, so
+ // if some code for some reason wants to avoid
+ // ever starting up the default context, it can do so
+ // by just not ever using it itself. scala.concurrent
+ // doesn't need to create defaultExecutionContext as
+ // a side effect.
+ private[concurrent] object InternalCallbackExecutor extends ExecutionContext {
+ def execute(runnable: Runnable): Unit =
+ runnable.run()
+ def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T =
+ throw new IllegalStateException("bug in scala.concurrent, called blocking() from internal callback")
+ def reportFailure(t: Throwable): Unit =
+ throw new IllegalStateException("problem in scala.concurrent internal callback", t)
+ }
}
@@ -25,6 +25,11 @@ package scala.concurrent
*/
trait Promise[T] {
+ // used for internal callbacks defined in
+ // the lexical scope of this trait;
+ // _never_ for application callbacks.
+ private implicit def internalExecutor: ExecutionContext = Future.InternalCallbackExecutor
+
/** Future containing the value of this promise.
*/
def future: Future[T]
@@ -106,26 +111,23 @@ object Promise {
/** Creates a promise object which can be completed with a value.
*
* @tparam T the type of the value in the promise
- * @param executor the execution context on which the promise is created on
* @return the newly created `Promise` object
*/
- def apply[T]()(implicit executor: ExecutionContext): Promise[T] = new impl.Promise.DefaultPromise[T]()
+ def apply[T](): Promise[T] = new impl.Promise.DefaultPromise[T]()
/** Creates an already completed Promise with the specified exception.
*
* @tparam T the type of the value in the promise
- * @param executor the execution context on which the promise is created on
* @return the newly created `Promise` object
*/
- def failed[T](exception: Throwable)(implicit executor: ExecutionContext): Promise[T] = new impl.Promise.KeptPromise[T](Left(exception))
+ def failed[T](exception: Throwable): Promise[T] = new impl.Promise.KeptPromise[T](Left(exception))
/** Creates an already completed Promise with the specified result.
*
* @tparam T the type of the value in the promise
- * @param executor the execution context on which the promise is created on
* @return the newly created `Promise` object
*/
- def successful[T](result: T)(implicit executor: ExecutionContext): Promise[T] = new impl.Promise.KeptPromise[T](Right(result))
+ def successful[T](result: T): Promise[T] = new impl.Promise.KeptPromise[T](Right(result))
}
@@ -17,8 +17,6 @@ import scala.collection.mutable.Stack
private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awaitable[T] {
- implicit def executor: ExecutionContext
-
}
private[concurrent] object Future {
@@ -42,7 +42,7 @@ object Promise {
/** Default promise implementation.
*/
- class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] { self =>
+ class DefaultPromise[T] extends AbstractPromise with Promise[T] { self =>
updateState(null, Nil) // Start at "No callbacks" //FIXME switch to Unsafe instead of ARFU
protected final def tryAwait(atMost: Duration): Boolean = {
@@ -108,21 +108,26 @@ object Promise {
}) match {
case null => false
case cs if cs.isEmpty => true
- case cs => Future.dispatchFuture(executor, () => cs.foreach(f => notifyCompleted(f, resolved))); true
+ // this assumes that f(resolved) will go via dispatchFuture
+ // and notifyCompleted (see onComplete below)
+ case cs => cs.foreach(f => f(resolved)); true
}
}
- def onComplete[U](func: Either[Throwable, T] => U): Unit = {
+ def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = {
+ val bound: Either[Throwable, T] => Unit = (either: Either[Throwable, T]) =>
+ Future.dispatchFuture(executor, () => notifyCompleted(func, either))
+
@tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed
def dispatchOrAddCallback(): Unit =
getState match {
- case r: Either[_, _] => Future.dispatchFuture(executor, () => notifyCompleted(func, r.asInstanceOf[Either[Throwable, T]]))
- case listeners: List[_] => if (updateState(listeners, func :: listeners)) () else dispatchOrAddCallback()
+ case r: Either[_, _] => bound(r.asInstanceOf[Either[Throwable, T]])
+ case listeners: List[_] => if (updateState(listeners, bound :: listeners)) () else dispatchOrAddCallback()
}
dispatchOrAddCallback()
}
- private final def notifyCompleted(func: Either[Throwable, T] => Any, result: Either[Throwable, T]) {
+ private final def notifyCompleted(func: Either[Throwable, T] => Any, result: Either[Throwable, T])(implicit executor: ExecutionContext) {
try {
func(result)
} catch {
@@ -135,15 +140,15 @@ object Promise {
*
* Useful in Future-composition when a value to contribute is already available.
*/
- final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val executor: ExecutionContext) extends Promise[T] {
+ final class KeptPromise[T](suppliedValue: Either[Throwable, T]) extends Promise[T] {
val value = Some(resolveEither(suppliedValue))
override def isCompleted(): Boolean = true
def tryComplete(value: Either[Throwable, T]): Boolean = false
- def onComplete[U](func: Either[Throwable, T] => U): Unit = {
+ def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = {
val completedAs = value.get // Avoid closing over "this"
Future.dispatchFuture(executor, () => func(completedAs))
}
Oops, something went wrong.

0 comments on commit 1dfce90

Please sign in to comment.