Permalink
Browse files

SIP-14: clean ups and fixes

  • Loading branch information...
phaller committed May 16, 2012
1 parent 9fe251e commit 2a36246342c17044bf5aafbf71fe1f2147ffe60a
@@ -541,7 +541,7 @@ trait ExecutionContextTasks extends Tasks {
// this part is a hack which allows switching
val driver: Tasks = executionContext match {
- case eci: scala.concurrent.impl.ExecutionContextImpl => eci.executorService match {
+ case eci: scala.concurrent.impl.ExecutionContextImpl => eci.executor match {
case fjp: ForkJoinPool => new ForkJoinTaskSupport(fjp)
case tpe: ThreadPoolExecutor => new ThreadPoolTaskSupport(tpe)
case _ => ???
@@ -8,7 +8,7 @@
package scala.concurrent
-import java.util.concurrent.{ Executors, ExecutorService, ThreadFactory }
+import java.util.concurrent.{ Executors, Executor, ThreadFactory }
import scala.concurrent.forkjoin.{ ForkJoinPool, ForkJoinWorkerThread }
import scala.concurrent.util.Duration
import language.implicitConversions
@@ -19,7 +19,7 @@ import language.implicitConversions
abstract class ConcurrentPackageObject {
/** A global execution environment for executing lightweight tasks.
*/
- lazy val defaultExecutionContext = new impl.ExecutionContextImpl(null)
+ lazy val defaultExecutionContext: ExecutionContext with Executor = impl.ExecutionContextImpl.fromExecutor(null: Executor)
val currentExecutionContext = new ThreadLocal[ExecutionContext]
@@ -47,11 +47,13 @@ object ExecutionContext {
/** Creates an `ExecutionContext` from the given `ExecutorService`.
*/
- def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit = defaultReporter): ExecutionContext with Executor = new impl.ExecutionContextImpl(e, reporter)
+ def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit = defaultReporter): ExecutionContext with ExecutorService =
+ impl.ExecutionContextImpl.fromExecutorService(e, reporter)
/** Creates an `ExecutionContext` from the given `Executor`.
*/
- def fromExecutor(e: Executor, reporter: Throwable => Unit = defaultReporter): ExecutionContext with Executor = new impl.ExecutionContextImpl(e, reporter)
+ def fromExecutor(e: Executor, reporter: Throwable => Unit = defaultReporter): ExecutionContext with Executor =
+ impl.ExecutionContextImpl.fromExecutor(e, reporter)
def defaultReporter: Throwable => Unit = {
// re-throwing `Error`s here causes an exception handling test to fail.
@@ -96,9 +96,9 @@ trait Future[+T] extends Awaitable[T] {
*
* $multipleCallbacks
*/
- def onSuccess[U](pf: PartialFunction[T, U]): this.type = onComplete {
- case Left(t) => // do nothing
- case Right(v) => if (pf isDefinedAt v) pf(v) else { /*do nothing*/ }
+ def onSuccess[U](pf: PartialFunction[T, U]): Unit = onComplete {
+ case Right(v) if pf isDefinedAt v => pf(v)
+ case _ =>
}
/** When this future is completed with a failure (i.e. with a throwable),
@@ -113,9 +113,9 @@ trait Future[+T] extends Awaitable[T] {
*
* $multipleCallbacks
*/
- def onFailure[U](callback: PartialFunction[Throwable, U]): this.type = onComplete {
- case Left(t) => if (isFutureThrowable(t) && callback.isDefinedAt(t)) callback(t) else { /*do nothing*/ }
- case Right(v) => // do nothing
+ def onFailure[U](callback: PartialFunction[Throwable, U]): Unit = onComplete {
+ case Left(t) if (isFutureThrowable(t) && callback.isDefinedAt(t)) => callback(t)
+ case _ =>
}
/** When this future is completed, either through an exception, or a value,
@@ -126,7 +126,7 @@ trait Future[+T] extends Awaitable[T] {
*
* $multipleCallbacks
*/
- def onComplete[U](func: Either[Throwable, T] => U): this.type
+ def onComplete[U](func: Either[Throwable, T] => U): Unit
/* Miscellaneous */
@@ -169,7 +169,7 @@ trait Future[+T] extends Awaitable[T] {
onComplete {
case Left(t) => p success t
- case Right(v) => p failure (new NoSuchElementException("Future.failed not completed with a throwable. Instead completed with: " + v))
+ case Right(v) => p failure (new NoSuchElementException("Future.failed not completed with a throwable."))
}
p.future
@@ -184,7 +184,36 @@ trait Future[+T] extends Awaitable[T] {
*/
def foreach[U](f: T => U): Unit = onComplete {
case Right(r) => f(r)
- case Left(_) => // do nothing
+ case _ => // do nothing
+ }
+
+ /** Creates a new future by applying the 's' function to the successful result of
+ * this future, or the 'f' function to the failed result. If there is any non-fatal
+ * exception thrown when 's' or 'f' is applied, that exception will be propagated
+ * to the resulting future.
+ *
+ * @param s function that transforms a successful result of the receiver into a
+ * successful result of the returned future
+ * @param f function that transforms a failure of the receiver into a failure of
+ * the returned future
+ * @return a future that will be completed with the transformed value
+ */
+ def transform[S](s: T => S, f: Throwable => Throwable): Future[S] = {
+ val p = Promise[S]()
+
+ onComplete {
+ case result =>
+ try {
+ result match {
+ case Left(t) => p failure f(t)
+ case Right(r) => p success s(r)
+ }
+ } catch {
+ case NonFatal(t) => p failure t
+ }
+ }
+
+ p.future
}
/** Creates a new future by applying a function to the successful result of
@@ -193,14 +222,17 @@ trait Future[+T] extends Awaitable[T] {
*
* $forComprehensionExamples
*/
- def map[S](f: T => S): Future[S] = {
+ def map[S](f: T => S): Future[S] = { // transform(f, identity)
val p = Promise[S]()
onComplete {
- case Left(t) => p failure t
- case Right(v) =>
- try p success f(v)
- catch {
+ case result =>
+ try {
+ result match {
+ case Right(r) => p success f(r)
+ case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]]
+ }
+ } catch {
case NonFatal(t) => p failure t
}
}
@@ -219,11 +251,11 @@ trait Future[+T] extends Awaitable[T] {
val p = Promise[S]()
onComplete {
- case Left(t) => p failure t
+ case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]]
case Right(v) =>
try {
f(v) onComplete {
- case Left(t) => p failure t
+ case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]]
case Right(v) => p success v
}
} catch {
@@ -254,7 +286,7 @@ trait Future[+T] extends Awaitable[T] {
val p = Promise[T]()
onComplete {
- case Left(t) => p failure t
+ case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, T]]
case Right(v) =>
try {
if (pred(v)) p success v
@@ -303,7 +335,7 @@ trait Future[+T] extends Awaitable[T] {
val p = Promise[S]()
onComplete {
- case Left(t) => p failure t
+ case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]]
case Right(v) =>
try {
if (pf.isDefinedAt(v)) p success pf(v)
@@ -384,7 +416,7 @@ trait Future[+T] extends Awaitable[T] {
val p = Promise[(T, U)]()
this onComplete {
- case Left(t) => p failure t
+ case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, (T, U)]]
case Right(r) =>
that onSuccess {
case r2 => p success ((r, r2))
@@ -431,7 +463,7 @@ trait Future[+T] extends Awaitable[T] {
val p = Promise[S]()
onComplete {
- case l: Left[Throwable, _] => p complete l.asInstanceOf[Either[Throwable, S]]
+ case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]]
case Right(t) =>
p complete (try {
Right(boxedType(tag.erasure).cast(t).asInstanceOf[S])
@@ -470,9 +502,7 @@ trait Future[+T] extends Awaitable[T] {
val p = Promise[T]()
onComplete {
- case r =>
- try if (pf isDefinedAt r) pf(r)
- finally p complete r
+ case r => try if (pf isDefinedAt r) pf(r) finally p complete r
}
p.future
@@ -493,11 +523,7 @@ trait Future[+T] extends Awaitable[T] {
*/
def either[U >: T](that: Future[U]): Future[U] = {
val p = Promise[U]()
-
- val completePromise: PartialFunction[Either[Throwable, U], _] = {
- case Left(t) => p tryFailure t
- case Right(v) => p trySuccess v
- }
+ val completePromise: PartialFunction[Either[Throwable, U], _] = { case result => p tryComplete result }
this onComplete completePromise
that onComplete completePromise
@@ -10,18 +10,20 @@ package scala.concurrent.impl
-import java.util.concurrent.{ Callable, Executor, ExecutorService, Executors, ThreadFactory }
+import java.util.concurrent.{ Callable, Executor, ExecutorService, Executors, ThreadFactory, TimeUnit }
+import java.util.Collection
import scala.concurrent.forkjoin._
import scala.concurrent.{ ExecutionContext, Awaitable }
import scala.concurrent.util.Duration
-private[scala] class ExecutionContextImpl(es: AnyRef, reporter: Throwable => Unit = ExecutionContext.defaultReporter)
-extends ExecutionContext with Executor {
- import ExecutionContextImpl._
-
- val executorService: AnyRef = if (es eq null) getExecutorService else es
+private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: Throwable => Unit) extends ExecutionContext with Executor {
+
+ val executor: Executor = es match {
+ case null => createExecutorService
+ case some => some
+ }
// to ensure that the current execution context thread local is properly set
def executorsThreadFactory = new ThreadFactory {
@@ -42,64 +44,76 @@ extends ExecutionContext with Executor {
}
}
- def getExecutorService: AnyRef =
- if (scala.util.Properties.isJavaAtLeast("1.6")) {
- val vendor = scala.util.Properties.javaVmVendor
- if ((vendor contains "Oracle") || (vendor contains "Sun") || (vendor contains "Apple"))
- new ForkJoinPool(
- Runtime.getRuntime.availableProcessors(),
+ def createExecutorService: ExecutorService = try { new ForkJoinPool(
+ Runtime.getRuntime.availableProcessors(), //FIXME from config
forkJoinPoolThreadFactory,
- null,
- false)
- else
- Executors.newCachedThreadPool(executorsThreadFactory)
- } else Executors.newCachedThreadPool(executorsThreadFactory)
+ null, //FIXME we should have an UncaughtExceptionHandler, see what Akka does
+ true) //FIXME I really think this should be async...
+ } catch {
+ case NonFatal(t) =>
+ System.err.println("Failed to create ForkJoinPool for the default ExecutionContext, falling back to Executors.newCachedThreadPool")
+ t.printStackTrace(System.err)
+ Executors.newCachedThreadPool(executorsThreadFactory)
+ }
- def execute(runnable: Runnable): Unit = executorService match {
+ def execute(runnable: Runnable): Unit = executor match {
case fj: ForkJoinPool =>
Thread.currentThread match {
case fjw: ForkJoinWorkerThread if fjw.getPool eq fj =>
- val fjtask = runnable match {
+ (runnable match {
case fjt: ForkJoinTask[_] => fjt
case _ => ForkJoinTask.adapt(runnable)
- }
- fjtask.fork
- case _ =>
- fj.execute(runnable)
+ }).fork
+ case _ => fj.execute(runnable)
}
- case executor: Executor =>
- executor execute runnable
+ case generic => generic execute runnable
}
def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
Future.releaseStack(this)
- executorService match {
+ executor match {
case fj: ForkJoinPool =>
var result: T = null.asInstanceOf[T]
- val managedBlocker = new ForkJoinPool.ManagedBlocker {
+ ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
@volatile var isdone = false
- def block() = {
- result = awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence)
+ def block(): Boolean = {
+ result = awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) // FIXME what happens if there's an exception thrown here?
isdone = true
true
}
def isReleasable = isdone
- }
- ForkJoinPool.managedBlock(managedBlocker)
+ })
result
case _ =>
awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence)
}
}
def reportFailure(t: Throwable) = reporter(t)
-
}
private[concurrent] object ExecutionContextImpl {
-
+
+ def fromExecutor(e: Executor, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl = new ExecutionContextImpl(e, reporter)
+ def fromExecutorService(es: ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl with ExecutorService =
+ new ExecutionContextImpl(es, reporter) with ExecutorService {
+ final def asExecutorService: ExecutorService = executor.asInstanceOf[ExecutorService]
+ override def execute(command: Runnable) = executor.execute(command)
+ override def shutdown() { asExecutorService.shutdown() }
+ override def shutdownNow() = asExecutorService.shutdownNow()
+ override def isShutdown = asExecutorService.isShutdown
+ override def isTerminated = asExecutorService.isTerminated
+ override def awaitTermination(l: Long, timeUnit: TimeUnit) = asExecutorService.awaitTermination(l, timeUnit)
+ override def submit[T](callable: Callable[T]) = asExecutorService.submit(callable)
+ override def submit[T](runnable: Runnable, t: T) = asExecutorService.submit(runnable, t)
+ override def submit(runnable: Runnable) = asExecutorService.submit(runnable)
+ override def invokeAll[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAll(callables)
+ override def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAll(callables, l, timeUnit)
+ override def invokeAny[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAny(callables)
+ override def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAny(callables, l, timeUnit)
+ }
}
@@ -65,6 +65,8 @@ private[concurrent] object Future {
promise.future
}
+ private[impl] val throwableId: Throwable => Throwable = identity _
+
// an optimization for batching futures
// TODO we should replace this with a public queue,
// so that it can be stolen from
@@ -112,15 +112,14 @@ object Promise {
}
}
- def onComplete[U](func: Either[Throwable, T] => U): this.type = {
+ def onComplete[U](func: Either[Throwable, T] => U): Unit = {
@tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed
def dispatchOrAddCallback(): Unit =
getState match {
case r: Either[_, _] => Future.dispatchFuture(executor, () => notifyCompleted(func, r.asInstanceOf[Either[Throwable, T]]))
case listeners: List[_] => if (updateState(listeners, func :: listeners)) () else dispatchOrAddCallback()
}
dispatchOrAddCallback()
- this
}
private final def notifyCompleted(func: Either[Throwable, T] => Any, result: Either[Throwable, T]) {
@@ -144,10 +143,9 @@ object Promise {
def tryComplete(value: Either[Throwable, T]): Boolean = false
- def onComplete[U](func: Either[Throwable, T] => U): this.type = {
- val completedAs = value.get
+ def onComplete[U](func: Either[Throwable, T] => U): Unit = {
+ val completedAs = value.get // Avoid closing over "this"
Future.dispatchFuture(executor, () => func(completedAs))
- this
}
def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this
Oops, something went wrong.

0 comments on commit 2a36246

Please sign in to comment.