Permalink
Browse files

Reimplementing much of the DefaultPromise methods

Optimizations:
1) Avoiding to call 'synchronized' in tryComplete and in tryAwait
2) Implementing blocking by using an optimized latch so no blocking ops for non-blockers
3) Reducing method size of isCompleted to be cheaper to inline
4) 'result' to use Try.get instead of patmat
  • Loading branch information...
1 parent 4f8c306 commit 73d494dd4b4b45277c447b774570c52b4df67869 @viktorklang viktorklang committed Feb 18, 2013
Showing with 46 additions and 43 deletions.
  1. +4 −0 bincompat-forward.whitelist.conf
  2. +42 −43 src/library/scala/concurrent/impl/Promise.scala
@@ -467,6 +467,10 @@ filter {
{
matchName="scala.concurrent.forkjoin.ForkJoinPool.helpJoinOnce"
problemName=IncompatibleResultTypeProblem
+ },
+ {
+ matchName="scala.concurrent.impl.Promise$CompletionLatch"
+ problemName=MissingClassProblem
}
]
}
@@ -8,11 +8,14 @@
package scala.concurrent.impl
-import scala.concurrent.{ ExecutionContext, CanAwait, OnCompleteRunnable, TimeoutException, ExecutionException }
+import scala.concurrent.{ ExecutionContext, CanAwait, OnCompleteRunnable, TimeoutException, ExecutionException, blocking }
+import scala.concurrent.Future.InternalCallbackExecutor
import scala.concurrent.duration.{ Duration, Deadline, FiniteDuration, NANOSECONDS }
import scala.annotation.tailrec
import scala.util.control.NonFatal
import scala.util.{ Try, Success, Failure }
+import java.io.ObjectInputStream
+import java.util.concurrent.locks.AbstractQueuedSynchronizer
private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with scala.concurrent.Future[T] {
def future: this.type = this
@@ -52,70 +55,69 @@ private[concurrent] object Promise {
case e: Error => Failure(new ExecutionException("Boxed Error", e))
case t => Failure(t)
}
+
+ /*
+ * Inspired by: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/locks/AbstractQueuedSynchronizer.java
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/publicdomain/zero/1.0/
+ */
+ private final class CompletionLatch[T] extends AbstractQueuedSynchronizer with (Try[T] => Unit) {
+ override protected def tryAcquireShared(ignored: Int): Int = if (getState != 0) 1 else -1
+ override protected def tryReleaseShared(ignore: Int): Boolean = {
+ setState(1)
+ true
+ }
+ override def apply(ignored: Try[T]): Unit = releaseShared(1)
+ }
+
/** Default promise implementation.
*/
class DefaultPromise[T] extends AbstractPromise with Promise[T] { self =>
updateState(null, Nil) // Start at "No callbacks"
- protected final def tryAwait(atMost: Duration): Boolean = {
- @tailrec
- def awaitUnsafe(deadline: Deadline, nextWait: FiniteDuration): Boolean = {
- if (!isCompleted && nextWait > Duration.Zero) {
- val ms = nextWait.toMillis
- val ns = (nextWait.toNanos % 1000000l).toInt // as per object.wait spec
-
- synchronized { if (!isCompleted) wait(ms, ns) }
-
- awaitUnsafe(deadline, deadline.timeLeft)
- } else
- isCompleted
- }
- @tailrec
- def awaitUnbounded(): Boolean = {
- if (isCompleted) true
- else {
- synchronized { if (!isCompleted) wait() }
- awaitUnbounded()
- }
- }
-
+ protected final def tryAwait(atMost: Duration): Boolean = if (!isCompleted) {
import Duration.Undefined
+ import scala.concurrent.Future.InternalCallbackExecutor
atMost match {
- case u if u eq Undefined => throw new IllegalArgumentException("cannot wait for Undefined period")
- case Duration.Inf => awaitUnbounded
- case Duration.MinusInf => isCompleted
- case f: FiniteDuration => if (f > Duration.Zero) awaitUnsafe(f.fromNow, f) else isCompleted
+ case e if e eq Undefined => throw new IllegalArgumentException("cannot wait for Undefined period")
+ case Duration.Inf =>
+ val l = new CompletionLatch[T]()
+ onComplete(l)(InternalCallbackExecutor)
+ l.acquireSharedInterruptibly(1)
+ case Duration.MinusInf => // Drop out
+ case f: FiniteDuration =>
+ if (f > Duration.Zero) {
+ val l = new CompletionLatch[T]()
+ onComplete(l)(InternalCallbackExecutor)
+ l.tryAcquireSharedNanos(1, f.toNanos)
+ }
}
- }
+
+ isCompleted
+ } else true // Already completed
@throws(classOf[TimeoutException])
@throws(classOf[InterruptedException])
def ready(atMost: Duration)(implicit permit: CanAwait): this.type =
- if (isCompleted || tryAwait(atMost)) this
+ if (tryAwait(atMost)) this
else throw new TimeoutException("Futures timed out after [" + atMost + "]")
@throws(classOf[Exception])
def result(atMost: Duration)(implicit permit: CanAwait): T =
- ready(atMost).value.get match {
- case Failure(e) => throw e
- case Success(r) => r
- }
+ ready(atMost).value.get.get // ready throws TimeoutException if timeout so value.get is safe here
jrudolph
jrudolph Nov 5, 2014 Member

Isn't this a bug? If the future has failed this throws away the stacktrace from the calling thread and only rethrows the (probably rather useless) exception from the execution thread. This is so confusing.

jrudolph
jrudolph Nov 5, 2014 Member

(And I hope you apologize grabbing your attention by commenting on this antique commit, @viktorklang)

jrudolph
jrudolph Nov 5, 2014 Member

Arguably the original bug is in scala.util.Failure.get which just rethrows an exception from somewhere else.

viktorklang
viktorklang Nov 6, 2014 Contributor

Hi @jrudolph, I'm not sure I understand what you think the problem is. Could you show what you get and what you would expect?

jrudolph
jrudolph Nov 6, 2014 Member

I created an issue to track this properly: https://issues.scala-lang.org/browse/SI-8963

viktorklang
viktorklang Nov 6, 2014 Contributor

Thanks, but I still don't understand: What should it be wrapped in?

jrudolph
jrudolph Nov 6, 2014 Member

Just another exception that contains the original one as cause:

override def get: T = throw new NoSuchElementException("Failure.get", exception)
viktorklang
viktorklang Nov 6, 2014 Contributor

This a breaking change (all try-blocks around the invocation may now behave differently) and also it hides the exception that happened.

I don't think this is an improvement, tbh.

jrudolph
jrudolph Nov 6, 2014 Member

I added some comments motivating the change to the ticket.

def value: Option[Try[T]] = getState match {
case c: Try[_] => Some(c.asInstanceOf[Try[T]])
case _ => None
}
- override def isCompleted: Boolean = getState match { // Cheaper than boxing result into Option due to "def value"
- case _: Try[_] => true
- case _ => false
- }
+ override def isCompleted: Boolean = getState.isInstanceOf[Try[_]]
def tryComplete(value: Try[T]): Boolean = {
val resolved = resolveTry(value)
- (try {
- @tailrec
+ @tailrec
def tryComplete(v: Try[T]): List[CallbackRunnable[T]] = {
getState match {
case raw: List[_] =>
@@ -124,10 +126,7 @@ private[concurrent] object Promise {
case _ => null
}
}
- tryComplete(resolved)
- } finally {
- synchronized { notifyAll() } //Notify any evil blockers
- }) match {
+ tryComplete(resolved) match {
case null => false
case rs if rs.isEmpty => true
case rs => rs.foreach(r => r.executeWithValue(resolved)); true

0 comments on commit 73d494d

Please sign in to comment.