Browse files

Cleaning up method implementations in Future

    Optimizations:
    1) Avoiding isDefinedAt + apply and using applyOrElse to allow for optimizations later
    2) Reducing method sizes to be more JIT + inliner friendly
    3) Reusing core combinators to reuse inliner/JIT optimizations and be more code-cache friendly
  • Loading branch information...
1 parent 70e2ead commit da54f34a6526b49b9e13e60c4fce242325c1c36e @viktorklang viktorklang committed Jun 19, 2013
Showing with 44 additions and 137 deletions.
  1. +44 −137 src/library/scala/concurrent/Future.scala
View
181 src/library/scala/concurrent/Future.scala
@@ -14,7 +14,7 @@ import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable }
import java.util.concurrent.TimeUnit.{ NANOSECONDS => NANOS, MILLISECONDS MILLIS }
import java.lang.{ Iterable => JIterable }
import java.util.{ LinkedList => JLinkedList }
-import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean }
+import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicLong, AtomicBoolean }
import scala.util.control.NonFatal
import scala.Option
@@ -101,7 +101,7 @@ trait Future[+T] extends Awaitable[T] {
// that also have an executor parameter, which
// keeps us from accidentally forgetting to use
// the executor parameter.
- private implicit def internalExecutor: ExecutionContext = Future.InternalCallbackExecutor
+ private def internalExecutor = Future.InternalCallbackExecutor
/* Callbacks */
@@ -116,9 +116,10 @@ trait Future[+T] extends Awaitable[T] {
* $callbackInContext
*/
def onSuccess[U](pf: PartialFunction[T, U])(implicit executor: ExecutionContext): Unit = onComplete {
- case Success(v) if pf isDefinedAt v => pf(v)
+ case Success(v) =>
+ pf.applyOrElse[T, Any](v, Predef.conforms[T]) // Exploiting the cached function to avoid MatchError
case _ =>
- }(executor)
+ }
/** When this future is completed with a failure (i.e. with a throwable),
* apply the provided callback to the throwable.
@@ -134,9 +135,10 @@ trait Future[+T] extends Awaitable[T] {
* $callbackInContext
*/
def onFailure[U](callback: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = onComplete {
- case Failure(t) if NonFatal(t) && callback.isDefinedAt(t) => callback(t)
+ case Failure(t) =>
+ callback.applyOrElse[Throwable, Any](t, Predef.conforms[Throwable]) // Exploiting the cached function to avoid MatchError
case _ =>
- }(executor)
+ }
/** When this future is completed, either through an exception, or a value,
* apply the provided function.
@@ -186,13 +188,12 @@ trait Future[+T] extends Awaitable[T] {
* and throws a corresponding exception if the original future fails.
*/
def failed: Future[Throwable] = {
+ implicit val ec = internalExecutor
val p = Promise[Throwable]()
-
onComplete {
case Failure(t) => p success t
case Success(v) => p failure (new NoSuchElementException("Future.failed not completed with a throwable."))
}
-
p.future
}
@@ -203,10 +204,7 @@ trait Future[+T] extends Awaitable[T] {
*
* Will not be called if the future fails.
*/
- def foreach[U](f: T => U)(implicit executor: ExecutionContext): Unit = onComplete {
- case Success(r) => f(r)
- case _ => // do nothing
- }(executor)
+ def foreach[U](f: T => U)(implicit executor: ExecutionContext): Unit = onComplete { _ foreach f }
/** Creates a new future by applying the 's' function to the successful result of
* this future, or the 'f' function to the failed result. If there is any non-fatal
@@ -221,19 +219,11 @@ trait Future[+T] extends Awaitable[T] {
*/
def transform[S](s: T => S, f: Throwable => Throwable)(implicit executor: ExecutionContext): Future[S] = {
val p = Promise[S]()
-
+ // transform on Try has the wrong shape for us here
onComplete {
- case result =>
- try {
- result match {
- case Failure(t) => p failure f(t)
- case Success(r) => p success s(r)
- }
- } catch {
- case NonFatal(t) => p failure t
- }
- }(executor)
-
+ case Success(r) => p complete Try(s(r))
+ case Failure(t) => p complete Try(throw f(t)) // will throw fatal errors!
+ }
p.future
}
@@ -245,19 +235,7 @@ trait Future[+T] extends Awaitable[T] {
*/
def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = { // transform(f, identity)
val p = Promise[S]()
-
- onComplete {
- case result =>
- try {
- result match {
- case Success(r) => p success f(r)
- case f: Failure[_] => p complete f.asInstanceOf[Failure[S]]
- }
- } catch {
- case NonFatal(t) => p failure t
- }
- }(executor)
-
+ onComplete { v => p complete (v map f) }
p.future
}
@@ -270,20 +248,10 @@ trait Future[+T] extends Awaitable[T] {
*/
def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = {
val p = Promise[S]()
-
onComplete {
case f: Failure[_] => p complete f.asInstanceOf[Failure[S]]
- case Success(v) =>
- try {
- f(v).onComplete({
- case f: Failure[_] => p complete f.asInstanceOf[Failure[S]]
- case Success(v) => p success v
- })(internalExecutor)
- } catch {
- case NonFatal(t) => p failure t
- }
- }(executor)
-
+ case Success(v) => try f(v) onComplete p.complete catch { case NonFatal(t) => p failure t }
+ }
p.future
}
@@ -303,34 +271,14 @@ trait Future[+T] extends Awaitable[T] {
* Await.result(h, Duration.Zero) // throw a NoSuchElementException
* }}}
*/
- def filter(pred: T => Boolean)(implicit executor: ExecutionContext): Future[T] = {
- val p = Promise[T]()
-
- onComplete {
- case f: Failure[_] => p complete f.asInstanceOf[Failure[T]]
- case Success(v) =>
- try {
- if (pred(v)) p success v
- else p failure new NoSuchElementException("Future.filter predicate is not satisfied")
- } catch {
- case NonFatal(t) => p failure t
- }
- }(executor)
-
- p.future
- }
+ def filter(pred: T => Boolean)(implicit executor: ExecutionContext): Future[T] =
+ map {
+ r => if (pred(r)) r else throw new NoSuchElementException("Future.filter predicate is not satisfied")
+ }
/** Used by for-comprehensions.
*/
final def withFilter(p: T => Boolean)(implicit executor: ExecutionContext): Future[T] = filter(p)(executor)
- // final def withFilter(p: T => Boolean) = new FutureWithFilter[T](this, p)
-
- // final class FutureWithFilter[+S](self: Future[S], p: S => Boolean) {
- // def foreach(f: S => Unit): Unit = self filter p foreach f
- // def map[R](f: S => R) = self filter p map f
- // def flatMap[R](f: S => Future[R]) = self filter p flatMap f
- // def withFilter(q: S => Boolean): FutureWithFilter[S] = new FutureWithFilter[S](self, x => p(x) && q(x))
- // }
/** Creates a new future by mapping the value of the current future, if the given partial function is defined at that value.
*
@@ -352,22 +300,10 @@ trait Future[+T] extends Awaitable[T] {
* Await.result(h, Duration.Zero) // throw a NoSuchElementException
* }}}
*/
- def collect[S](pf: PartialFunction[T, S])(implicit executor: ExecutionContext): Future[S] = {
- val p = Promise[S]()
-
- onComplete {
- case f: Failure[_] => p complete f.asInstanceOf[Failure[S]]
- case Success(v) =>
- try {
- if (pf.isDefinedAt(v)) p success pf(v)
- else p failure new NoSuchElementException("Future.collect partial function is not defined at: " + v)
- } catch {
- case NonFatal(t) => p failure t
- }
- }(executor)
-
- p.future
- }
+ def collect[S](pf: PartialFunction[T, S])(implicit executor: ExecutionContext): Future[S] =
+ map {
+ r => pf.applyOrElse(r, (t: T) => throw new NoSuchElementException("Future.collect partial function is not defined at: " + t))
+ }
/** Creates a new future that will handle any matching throwable that this
* future might contain. If there is no match, or if this future contains
@@ -383,9 +319,7 @@ trait Future[+T] extends Awaitable[T] {
*/
def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] = {
val p = Promise[U]()
-
- onComplete { case tr => p.complete(tr recover pf) }(executor)
-
+ onComplete { v => p complete (v recover pf) }
p.future
}
@@ -404,17 +338,10 @@ trait Future[+T] extends Awaitable[T] {
*/
def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] = {
val p = Promise[U]()
-
onComplete {
- case Failure(t) if pf isDefinedAt t =>
- try {
- p completeWith pf(t)
- } catch {
- case NonFatal(t) => p failure t
- }
- case otherwise => p complete otherwise
- }(executor)
-
+ case Failure(t) => try pf.applyOrElse(t, (_: Throwable) => this) onComplete p.complete catch { case NonFatal(t) => p failure t }
+ case other => p complete other
+ }
p.future
}
@@ -427,19 +354,12 @@ trait Future[+T] extends Awaitable[T] {
* with the throwable stored in `that`.
*/
def zip[U](that: Future[U]): Future[(T, U)] = {
+ implicit val ec = internalExecutor
val p = Promise[(T, U)]()
-
- this onComplete {
+ onComplete {
case f: Failure[_] => p complete f.asInstanceOf[Failure[(T, U)]]
- case Success(r) =>
- that onSuccess {
- case r2 => p success ((r, r2))
- }
- that onFailure {
- case f => p failure f
- }
+ case Success(s) => that onComplete { c => p.complete(c map { s2 => (s, s2) }) }
}
-
p.future
}
@@ -458,6 +378,7 @@ trait Future[+T] extends Awaitable[T] {
* }}}
*/
def fallbackTo[U >: T](that: Future[U]): Future[U] = {
+ implicit val ec = internalExecutor
val p = Promise[U]()
onComplete {
case s @ Success(_) => p complete s
@@ -470,23 +391,13 @@ trait Future[+T] extends Awaitable[T] {
* that conforms to `S`'s erased type or a `ClassCastException` otherwise.
*/
def mapTo[S](implicit tag: ClassTag[S]): Future[S] = {
- def boxedType(c: Class[_]): Class[_] = {
+ implicit val ec = internalExecutor
+ val boxedClass = {
+ val c = tag.runtimeClass
if (c.isPrimitive) Future.toBoxed(c) else c
}
-
- val p = Promise[S]()
-
- onComplete {
- case f: Failure[_] => p complete f.asInstanceOf[Failure[S]]
- case Success(t) =>
- p complete (try {
- Success(boxedType(tag.runtimeClass).cast(t).asInstanceOf[S])
- } catch {
- case e: ClassCastException => Failure(e)
- })
- }
-
- p.future
+ require(boxedClass ne null)
+ map(s => boxedClass.cast(s).asInstanceOf[S])
}
/** Applies the side-effecting function to the result of this future, and returns
@@ -514,11 +425,9 @@ trait Future[+T] extends Awaitable[T] {
*/
def andThen[U](pf: PartialFunction[Try[T], U])(implicit executor: ExecutionContext): Future[T] = {
val p = Promise[T]()
-
onComplete {
- case r => try if (pf isDefinedAt r) pf(r) finally p complete r
- }(executor)
-
+ case r => try pf.applyOrElse[Try[T], Any](r, Predef.conforms[Try[T]]) finally p complete r
+ }
p.future
}
@@ -579,14 +488,12 @@ object Future {
} map (_.result)
}
- /** Returns a `Future` to the result of the first future in the list that is completed.
+ /** Returns a new `Future` to the result of the first future in the list that is completed.
*/
def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
val p = Promise[T]()
-
val completeFirst: Try[T] => Unit = p tryComplete _
- futures.foreach(_ onComplete completeFirst)
-
+ futures foreach { _ onComplete completeFirst }
p.future
}
@@ -626,7 +533,7 @@ object Future {
* }}}
*/
def fold[T, R](futures: TraversableOnce[Future[T]])(zero: R)(foldFun: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = {
- if (futures.isEmpty) Promise.successful(zero).future
+ if (futures.isEmpty) Future.successful(zero)
else sequence(futures).map(_.foldLeft(zero)(foldFun))
}
@@ -638,7 +545,7 @@ object Future {
* }}}
*/
def reduce[T, R >: T](futures: TraversableOnce[Future[T]])(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = {
- if (futures.isEmpty) Promise[R].failure(new NoSuchElementException("reduce attempted on empty collection")).future
+ if (futures.isEmpty) Future.failed(new NoSuchElementException("reduce attempted on empty collection"))
else sequence(futures).map(_ reduceLeft op)
}

0 comments on commit da54f34

Please sign in to comment.