move ExecutionContext from Promise to app callbacks #16

Open
wants to merge 4 commits into
from
@@ -82,9 +82,29 @@ import language.higherKinds
* {{{
* f flatMap { (x: Int) => g map { (y: Int) => x + y } }
* }}}
+ *
+ * @define callbackInContext
+ * The provided callback always runs in the provided implicit
+ *`ExecutionContext`, though there is no guarantee that the
+ * `execute()` method on the `ExecutionContext` will be called once
+ * per callback or that `execute()` will be called in the current
+ * thread. That is, the implementation may run multiple callbacks
+ * in a batch within a single `execute()` and it may run
+ * `execute()` either immediately or asynchronously.
*/
trait Future[+T] extends Awaitable[T] {
+ // The executor within the lexical scope
+ // of the Future trait. Note that this will
+ // (modulo bugs) _never_ execute a callback
+ // other than those below in this same file.
+ // As a nice side benefit, having this implicit
+ // here forces an ambiguity in those methods
+ // that also have an executor parameter, which
+ // keeps us from accidentally forgetting to use
+ // the executor parameter.
+ private implicit def internalExecutor: ExecutionContext = Future.InternalCallbackExecutor
+
/* Callbacks */
/** When this future is completed successfully (i.e. with a value),
@@ -95,11 +115,12 @@ trait Future[+T] extends Awaitable[T] {
* this will either be applied immediately or be scheduled asynchronously.
*
* $multipleCallbacks
+ * $callbackInContext
*/
- def onSuccess[U](pf: PartialFunction[T, U]): this.type = onComplete {
+ def onSuccess[U](pf: PartialFunction[T, U])(implicit executor: ExecutionContext): this.type = onComplete {
case Left(t) => // do nothing
case Right(v) => if (pf isDefinedAt v) pf(v) else { /*do nothing*/ }
- }
+ } (executor)
/** When this future is completed with a failure (i.e. with a throwable),
* apply the provided callback to the throwable.
@@ -112,11 +133,12 @@ trait Future[+T] extends Awaitable[T] {
* Will not be called in case that the future is completed with a value.
*
* $multipleCallbacks
+ * $callbackInContext
*/
- def onFailure[U](callback: PartialFunction[Throwable, U]): this.type = onComplete {
+ def onFailure[U](callback: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): this.type = onComplete {
@viktorklang

viktorklang May 2, 2012

Alright, and what do with the Java-API (i.e. don't have to explicitly provide ec (when you don't have one))?

@havocp

havocp May 2, 2012

Needs some thought.

Option 1: Java just passes extra parameter

I guess the default situation is that it's the same as the existing calls in Akka 2.0 that need a dispatcher, like:

Futures.successful(4, system.dispatcher());

So the solution space should be the same as it is for that. With this patch you wouldn't have to provide system.dispatcher() creating a Promise anymore but you would for onComplete(), etc. Typing , system.dispatcher() isn't that horrible given all the crap you already have to type to write an anonymous callback class in Java, but maybe we can do better.

Option 2: Differently-named version of each method using a default EC

From the Java perspective, there's no reason you couldn't have an overload with and without the executor, using a default executor for the one without. But of course that breaks Scala since the overloads would be distinguishable in bytecode but not in Scala source.

Simple if not 100% pretty solution: name the overloads different things,

onFailure(callback, executor)
onFailureDefaultExecutor(callback) // brainstorm better name

On the crazier side, maybe there's some language extension that would let us add an overload to the class file that Java would see and Scala would ignore; it looks like right now the generated class file from onFailure(callback)(executor) has an onFailure(callback, executor) with one combined parameter list. From Java we'd like to add plain onFailure(callback), but hide that from Scala. I can imagine some language feature to allow that, like @javaOnly or something. But clearly not a short-term solution.

Option 3: withDefaultExecutor()

Another possible option, just to expand the possibilities, might be to add:

// Future with an ExecutionContext associated with callbacks you add to it,
// intended for use from Java; not needed in Scala
trait FutureWithExecutor[T] {
    def future: Future[T]
    def onComplete(callback: Either[Throwable, T] => U): this.type
}

// existing Future trait gets new methods
trait Future[T] {
   def withExecutor(executor: ExecutionContext): FutureWithExecutor[T]
   def withDefaultExecutor: FutureWithExecutor[T]
}

So the idea would be that your Java code is like:

future.withDefaultExecutor().map(...).onComplete(...).future()

Where ... expands to the Java anonymous callback class.

I kind of like this solution since it avoids adding a lot of methods to Future itself that don't make sense in Scala, and it's more DRY in Java.

It does allow Java programmers to shoot themselves in the foot in the way this patch is intended to fix, if they start passing FutureWithExecutor across module boundaries instead of passing Future around, but they'll hopefully not do that. Anyway anytime they go through scala.concurrent or Akka APIs they'd have to convert back to regular Future.

Maybe we can think of more options, these are what I thought of so far.

@havocp

havocp May 2, 2012

Here is option 3 in untested code form: https://gist.github.com/2578478

It's not mutually exclusive with Option 1 of course. I'm pretty ambivalent on whether adding 3 to 1 is worth the code, it sort of depends on how often Java developers chain Future methods.

Option 3 seems nicer than Option 2 to me.

@axel22

axel22 May 11, 2012

Collaborator

Of these 3 options option no.2 seems most favorable to me. Here's why.

Imagine a Java client writes a couple of methods like this:

public FutureWithExecutor<T> jFlatMap(f1: FutureWithExecutor[T], f2: FutureWithExecutor[T]) {
  return f1..flatMap(f2);
}

public FutureWithExecutor<T> jForeach(f1: Future[T], f: Foreach[T, Unit]) {
  FutureWithExefcutor[T] f2 = f1.withDefaultExecutor();
  f2.foreach(f);
  return f2;
}

and used within his Java code. He sets the return types to FutureWithExecutor because this way he avoids having to call withDefaultExecutor in every other method, like jFlatMap. Then a Scala client comes and calls a Java method jForeach:

implicit val ec = myEc
def scalaMethod(f: Future[T]) {
  jForeach(f, x => println(x)) map {
    y => println(y.hashCode)
  }
} 

One might expect that the map in the scalaMethod would pick up the implicit execution context in scope, but it does not, because jForeach returns a FutureWithExecutor instead of Future. This might result in subtle bugs which can be a pain.

For this reason I would be explicit in the Java code - rename these methods to something else.

@axel22

axel22 May 11, 2012

Collaborator

Or simply require Java users to specify the additional parameter.

@viktorklang

viktorklang May 11, 2012

Yeah, I think this is preferable actually.

@havocp

havocp May 11, 2012

fwiw I would lean toward option 1 (require Java users to specify the extra param) as well. The other solutions don't seem like they pull their weight, they add a bunch of methods and some cognitive overhead for a pretty small gain in convenience.

@viktorklang

viktorklang May 11, 2012

Yeah, that was exactly our consensus as well. Also, it's easier to solve a perceived problem than to retract a bad solution after the fact...

case Left(t) => if (isFutureThrowable(t) && callback.isDefinedAt(t)) callback(t) else { /*do nothing*/ }
case Right(v) => // do nothing
- }
+ } (executor)
/** When this future is completed, either through an exception, or a value,
* apply the provided function.
@@ -125,16 +147,12 @@ trait Future[+T] extends Awaitable[T] {
* this will either be applied immediately or be scheduled asynchronously.
*
* $multipleCallbacks
+ * $callbackInContext
*/
- def onComplete[U](func: Either[Throwable, T] => U): this.type
-
+ def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): this.type
/* Miscellaneous */
- /** Creates a new promise.
- */
- protected def newPromise[S]: Promise[S]
-
/** Returns whether the future has already been completed with
* a value or an exception.
*
@@ -169,7 +187,7 @@ trait Future[+T] extends Awaitable[T] {
* and throws a corresponding exception if the original future fails.
*/
def failed: Future[Throwable] = {
- val p = newPromise[Throwable]
+ val p = Promise[Throwable]
onComplete {
case Left(t) => p success t
@@ -186,19 +204,19 @@ trait Future[+T] extends Awaitable[T] {
*
* Will not be called if the future fails.
*/
- def foreach[U](f: T => U): Unit = onComplete {
+ def foreach[U](f: T => U)(implicit executor: ExecutionContext): Unit = onComplete {
case Right(r) => f(r)
case Left(_) => // do nothing
- }
+ } (executor)
/** Creates a new future by applying a function to the successful result of
* this future. If this future is completed with an exception then the new
* future will also contain this exception.
*
* $forComprehensionExample
*/
- def map[S](f: T => S): Future[S] = {
- val p = newPromise[S]
+ def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = {
+ val p = Promise[S]
onComplete {
case Left(t) => p failure t
@@ -207,7 +225,7 @@ trait Future[+T] extends Awaitable[T] {
catch {
case NonFatal(t) => p complete resolver(t)
}
- }
+ } (executor)
p.future
}
@@ -219,21 +237,21 @@ trait Future[+T] extends Awaitable[T] {
*
* $forComprehensionExample
*/
- def flatMap[S](f: T => Future[S]): Future[S] = {
- val p = newPromise[S]
+ def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = {
+ val p = Promise[S]
onComplete {
case Left(t) => p failure t
case Right(v) =>
try {
- f(v) onComplete {
+ f(v).onComplete({
case Left(t) => p failure t
case Right(v) => p success v
- }
+ })(internalExecutor)
@phaller

phaller May 2, 2012

Owner

I think it would be better to also use executor here. It would be more regular to always complete promise p on the same EC. And I don't see a reason not to use executor. If executor is good enough to run f(v) and p complete resolver(t) on, then it should be OK to also just complete p on it.

@havocp

havocp May 2, 2012

Here using internalExecutor is just an optimization, you're right it's fine to remove it here.

But internalExecutor doesn't exist because of this, this is just the one place where it's used when we already have an app executor. So it's also the one place it's specified explicitly instead of implicitly, because the app executor's presence makes the implicits ambiguous.

The reason internalExecutor exists is to cover all the spots where we call onComplete, onSuccess, etc. but don't have an app executor. Look for example at the implementation of def failed, but there are a lot of places we use it.

The pattern is almost always:

val p = Promise[T]

onComplete {
    // whatever
}

p.future

The internal executor is just running some quick internal code that converts Either[Throwable, T] to Throwable (for example in failed), or whatever thing like that. Then the app callback if any will have its own executor.

So in most places internalExecutor is not just an optimization, it's needed to call onComplete. The alternative I tried first was to add an onCompleteInternal onSuccessInternal etc. that don't take an executor, to be implemented by DefaultPromise, KeptPromise by running the callback immediately... but it's more code, and ends up being equivalent.

@havocp

havocp May 2, 2012

Oh, worth mentioning, it would also be semantically correct to use defaultExecutionContext here, but since the internal executor is only 8 lines of code I figured why not avoid the overhead; there's no reason to dispatch to a thread pool when we know the app executor will dispatch to the app anyway. We know this internal executor is always just an intermediary.

} catch {
case NonFatal(t) => p complete resolver(t)
}
- }
+ } (executor)
p.future
}
@@ -254,8 +272,8 @@ trait Future[+T] extends Awaitable[T] {
* await(h, 0) // throw a NoSuchElementException
* }}}
*/
- def filter(pred: T => Boolean): Future[T] = {
- val p = newPromise[T]
+ def filter(pred: T => Boolean)(implicit executor: ExecutionContext): Future[T] = {
+ val p = Promise[T]
onComplete {
case Left(t) => p failure t
@@ -266,14 +284,14 @@ trait Future[+T] extends Awaitable[T] {
} catch {
case NonFatal(t) => p complete resolver(t)
}
- }
+ } (executor)
p.future
}
/** Used by for-comprehensions.
*/
- final def withFilter(p: T => Boolean): Future[T] = filter(p)
+ final def withFilter(p: T => Boolean)(implicit executor: ExecutionContext): Future[T] = filter(p)(executor)
// final def withFilter(p: T => Boolean) = new FutureWithFilter[T](this, p)
// final class FutureWithFilter[+S](self: Future[S], p: S => Boolean) {
@@ -303,8 +321,8 @@ trait Future[+T] extends Awaitable[T] {
* await(h, 0) // throw a NoSuchElementException
* }}}
*/
- def collect[S](pf: PartialFunction[T, S]): Future[S] = {
- val p = newPromise[S]
+ def collect[S](pf: PartialFunction[T, S])(implicit executor: ExecutionContext): Future[S] = {
+ val p = Promise[S]
onComplete {
case Left(t) => p failure t
@@ -315,7 +333,7 @@ trait Future[+T] extends Awaitable[T] {
} catch {
case NonFatal(t) => p complete resolver(t)
}
- }
+ } (executor)
p.future
}
@@ -332,15 +350,15 @@ trait Future[+T] extends Awaitable[T] {
* future (6 / 2) recover { case e: ArithmeticException ⇒ 0 } // result: 3
* }}}
*/
- def recover[U >: T](pf: PartialFunction[Throwable, U]): Future[U] = {
- val p = newPromise[U]
+ def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] = {
+ val p = Promise[U]
onComplete {
case Left(t) if pf isDefinedAt t =>
try { p success pf(t) }
catch { case NonFatal(t) => p complete resolver(t) }
case otherwise => p complete otherwise
- }
+ } (executor)
p.future
}
@@ -358,8 +376,8 @@ trait Future[+T] extends Awaitable[T] {
* future (6 / 0) recoverWith { case e: ArithmeticException => f } // result: Int.MaxValue
* }}}
*/
- def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]]): Future[U] = {
- val p = newPromise[U]
+ def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] = {
+ val p = Promise[U]
onComplete {
case Left(t) if pf isDefinedAt t =>
@@ -369,7 +387,7 @@ trait Future[+T] extends Awaitable[T] {
case NonFatal(t) => p complete resolver(t)
}
case otherwise => p complete otherwise
- }
+ } (executor)
p.future
}
@@ -383,16 +401,16 @@ trait Future[+T] extends Awaitable[T] {
* with the throwable stored in `that`.
*/
def zip[U](that: Future[U]): Future[(T, U)] = {
- val p = newPromise[(T, U)]
+ val p = Promise[(T, U)]
- this onComplete {
+ this.onComplete {
case Left(t) => p failure t
- case Right(r) => that onSuccess {
+ case Right(r) => that.onSuccess {
case r2 => p success ((r, r2))
}
}
- that onFailure {
+ that.onFailure {
case f => p failure f
}
@@ -414,7 +432,7 @@ trait Future[+T] extends Awaitable[T] {
* }}}
*/
def fallbackTo[U >: T](that: Future[U]): Future[U] = {
- val p = newPromise[U]
+ val p = Promise[U]
onComplete {
case r @ Right(_) p complete r
case _ p completeWith that
@@ -443,7 +461,7 @@ trait Future[+T] extends Awaitable[T] {
if (c.isPrimitive) toBoxed(c) else c
}
- val p = newPromise[S]
+ val p = Promise[S]
onComplete {
case l: Left[Throwable, _] => p complete l.asInstanceOf[Either[Throwable, S]]
@@ -481,14 +499,14 @@ trait Future[+T] extends Awaitable[T] {
* }
* }}}
*/
- def andThen[U](pf: PartialFunction[Either[Throwable, T], U]): Future[T] = {
- val p = newPromise[T]
+ def andThen[U](pf: PartialFunction[Either[Throwable, T], U])(implicit executor: ExecutionContext): Future[T] = {
+ val p = Promise[T]
onComplete {
case r =>
try if (pf isDefinedAt r) pf(r)
finally p complete r
- }
+ } (executor)
p.future
}
@@ -507,7 +525,7 @@ trait Future[+T] extends Awaitable[T] {
* }}}
*/
def either[U >: T](that: Future[U]): Future[U] = {
- val p = newPromise[U]
+ val p = Promise[U]
val completePromise: PartialFunction[Either[Throwable, U], _] = {
case Left(t) => p tryFailure t
@@ -536,10 +554,10 @@ object Future {
*
* @tparam T the type of the result
* @param body the asychronous computation
- * @param execctx the execution context on which the future is run
+ * @param executor the execution context which runs the body
* @return the `Future` holding the result of the computation
*/
- def apply[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = impl.Future(body)
+ def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = impl.Future(body)
import scala.collection.mutable.Builder
import scala.collection.generic.CanBuildFrom
@@ -628,6 +646,33 @@ object Future {
for (r <- fr; b <- fb) yield (r += b)
}.map(_.result)
+ // This is used to run callbacks which are internal
+ // to scala.concurrent; our own callbacks are only
+ // ever used to eventually run another callback,
+ // and that other callback will have its own
+ // executor because all callbacks come with
+ // an executor. Our own callbacks never block
+ // and have no "expected" exceptions.
+ // As a result, this executor can do nothing;
+ // some other executor will always come after
+ // it (and sometimes one will be before it),
+ // and those will be performing the "real"
+ // dispatch to code outside scala.concurrent.
+ // Because this exists, ExecutionContext.defaultExecutionContext
+ // isn't instantiated by Future internals, so
+ // if some code for some reason wants to avoid
+ // ever starting up the default context, it can do so
+ // by just not ever using it itself. scala.concurrent
+ // doesn't need to create defaultExecutionContext as
+ // a side effect.
+ private[concurrent] object InternalCallbackExecutor extends ExecutionContext {
+ def execute(runnable: Runnable): Unit =
+ runnable.run()
+ def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T =
+ throw new IllegalStateException("bug in scala.concurrent, called blocking() from internal callback")
+ def reportFailure(t: Throwable): Unit =
+ throw new IllegalStateException("problem in scala.concurrent internal callback", t)
+ }
}
@@ -25,6 +25,11 @@ package scala.concurrent
*/
trait Promise[T] {
+ // used for internal callbacks defined in
+ // the lexical scope of this trait;
+ // _never_ for application callbacks.
+ private implicit def internalExecutor: ExecutionContext = Future.InternalCallbackExecutor
+
/** Future containing the value of this promise.
*/
def future: Future[T]
@@ -106,26 +111,23 @@ object Promise {
/** Creates a promise object which can be completed with a value.
*
* @tparam T the type of the value in the promise
- * @param execctx the execution context on which the promise is created on
* @return the newly created `Promise` object
*/
- def apply[T]()(implicit executor: ExecutionContext): Promise[T] = new impl.Promise.DefaultPromise[T]()
+ def apply[T](): Promise[T] = new impl.Promise.DefaultPromise[T]()
/** Creates an already completed Promise with the specified exception.
*
* @tparam T the type of the value in the promise
- * @param execctx the execution context on which the promise is created on
* @return the newly created `Promise` object
*/
- def failed[T](exception: Throwable)(implicit executor: ExecutionContext): Promise[T] = new impl.Promise.KeptPromise[T](Left(exception))
+ def failed[T](exception: Throwable): Promise[T] = new impl.Promise.KeptPromise[T](Left(exception))
/** Creates an already completed Promise with the specified result.
*
* @tparam T the type of the value in the promise
- * @param execctx the execution context on which the promise is created on
* @return the newly created `Promise` object
*/
- def successful[T](result: T)(implicit executor: ExecutionContext): Promise[T] = new impl.Promise.KeptPromise[T](Right(result))
+ def successful[T](result: T): Promise[T] = new impl.Promise.KeptPromise[T](Right(result))
}
@@ -17,8 +17,6 @@ import scala.collection.mutable.Stack
private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awaitable[T] {
- implicit def executor: ExecutionContext
-
}
private[concurrent] object Future {
Oops, something went wrong.