Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Fix futureAny to propagate and handle Future cancellation

  • Loading branch information...
commit f3c9ad8835268a79907d889e1f681c85ad82058e 1 parent 78e78be
Raghavendra Prabhu authored
View
23 src/main/scala/com/twitter/gizzard/shards/NodeSet.scala
@@ -1,5 +1,6 @@
package com.twitter.gizzard.shards
+import java.util.concurrent.CancellationException
import scala.annotation.tailrec
import scala.collection.generic.CanBuild
import com.twitter.util.{Try, Return, Throw, Future, Promise}
@@ -97,9 +98,7 @@ trait NodeIterable[+T] {
if (activeShards.isEmpty && blockedShards.isEmpty) {
Future.exception(new ShardBlackHoleException(rootInfo.id))
} else {
- val promise = new Promise[R]
- _futureAny(iterator, promise, f)
- promise
+ _futureAny(iterator, f)
}
}
@@ -108,20 +107,18 @@ trait NodeIterable[+T] {
}
protected final def _futureAny[T1 >: T, R](
- iter: Iterator[(ShardId, T1)],
- promise: Promise[R],
- f: (ShardId, T1) => Future[R]
- ) {
-
+ iter: Iterator[(ShardId, T1)],
+ f: (ShardId, T1) => Future[R]): Future[R] = {
if (iter.hasNext) {
val (id, s) = iter.next
-
- f(id, s) respond {
- case Return(r) => promise.setValue(r)
- case Throw(e) => _futureAny(iter, promise, f)
+ f(id, s) rescue {
+ // No point in continuing to iterate if the exception was due to
+ // Future cancellation.
+ case e: CancellationException => Future.exception(e)
+ case _ => _futureAny(iter, f)
}
} else {
- promise.setException(new ShardOfflineException(rootInfo.id))
+ Future.exception(new ShardOfflineException(rootInfo.id))
}
}
View
10 src/test/scala/com/twitter/gizzard/shards/NodeSetSpec.scala
@@ -1,8 +1,9 @@
package com.twitter.gizzard.shards
+import java.util.concurrent.CancellationException
import com.twitter.conversions.time._
import org.specs.Specification
-import com.twitter.util.{Try, Return, Throw, Future}
+import com.twitter.util.{Try, Return, Throw, Future, Promise}
object NodeSetSpec extends Specification {
@@ -87,11 +88,18 @@ object NodeSetSpec extends Specification {
(allNormal.read.futureAny { _ => Future(true) } apply()) mustEqual true
(allNormal.read.futureAny { _ => Future[Any](error("oops")) } apply()) must throwA[Exception]
(allNormal.read.futureAny { s => Future(if (s.i == 0) s.i else error("oops")) } apply()) mustEqual 0
+ // (allNormal.read.futureAny { s => Future(if (s.i == 0) s.i else throw new CancellationException) } apply()) must throwA[Exception]
(allBlocked.read.futureAny { _ => Future(true) } apply()) must throwA[ShardOfflineException]
(allBlackhole.read.futureAny { _ => Future(true) } apply()) must throwA[ShardBlackHoleException]
(mixed.read.futureAny { _ => Future(true) } apply()) mustEqual true
+
+ val p = new Promise[Unit]
+ p onCancellation { p.update(Throw(new CancellationException)) }
+ val f = allNormal.read.futureAny { s => p }
+ f cancel()
+ (f apply()) must throwA[Exception]
}
"any" in {
Please sign in to comment.
Something went wrong with that request. Please try again.