Skip to content
This repository has been archived by the owner on May 22, 2019. It is now read-only.

Commit

Permalink
implement futureAny on NodeSet
Browse files Browse the repository at this point in the history
  • Loading branch information
freels committed Sep 2, 2011
1 parent 5a1a354 commit 2d74926
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 4 deletions.
44 changes: 40 additions & 4 deletions src/main/scala/com/twitter/gizzard/shards/NodeSet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.twitter.gizzard.shards

import scala.annotation.tailrec
import scala.collection.generic.CanBuild
import com.twitter.util.{Try, Return, Throw, Future}
import com.twitter.util.{Try, Return, Throw, Future, Promise}

// For read or write, three node states:
// - normal: should apply normally
Expand Down Expand Up @@ -67,28 +67,64 @@ trait NodeIterable[+T] {
if (activeShards.isEmpty && blockedShards.isEmpty) {
Throw(new ShardBlackHoleException(rootInfo.id))
} else {
_any(iterator)(f)
_any(iterator, f)
}
}

def tryAny[R](f: T => Try[R]): Try[R] = {
this tryAny { (_, t) => f(t) }
}

@tailrec
protected final def _any[T1 >: T, R](
iter: Iterator[(ShardId, T1)],
f: (ShardId, T1) => Try[R]
): Try[R] = {

@tailrec protected final def _any[T1 >: T, R](iter: Iterator[(ShardId, T1)])(f: (ShardId, T1) => Try[R]): Try[R] = {
if (iter.hasNext) {
val (id, s) = iter.next

f(id, s) match {
case rv if rv.isReturn => rv
case _ => _any(iter)(f)
case _ => _any(iter, f)
}
} else {
Throw(new ShardOfflineException(rootInfo.id))
}
}

def futureAny[R](f: (ShardId, T) => Future[R]): Future[R] = {
if (activeShards.isEmpty && blockedShards.isEmpty) {
Future.exception(new ShardBlackHoleException(rootInfo.id))
} else {
val promise = new Promise[R]
_futureAny(iterator, promise, f)
promise
}
}

def futureAny[R](f: T => Future[R]): Future[R] = {
this futureAny { (_, t) => f(t) }
}

protected final def _futureAny[T1 >: T, R](
iter: Iterator[(ShardId, T1)],
promise: Promise[R],
f: (ShardId, T1) => Future[R]
) {

if (iter.hasNext) {
val (id, s) = iter.next

f(id, s) respond {
case Return(r) => promise.setValue(r)
case Throw(e) => _futureAny(iter, promise, f)
}
} else {
promise.setException(new ShardOfflineException(rootInfo.id))
}
}


// XXX: it would be nice to have a way to implement all in terms of fmap. :(
def fmap[R, That](f: (ShardId, T) => Future[R])(implicit bf: CanBuild[Future[R], That]): That = {
Expand Down
11 changes: 11 additions & 0 deletions src/test/scala/com/twitter/gizzard/shards/NodeSetSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,17 @@ object NodeSetSpec extends Specification {
mixed.read.tryAny { _ => Return(true) } must beLike { case Return(_) => true }
}

"futureAny" in {
(allNormal.read.futureAny { _ => Future(true) } apply()) mustEqual true
(allNormal.read.futureAny { _ => Future[Any](error("oops")) } apply()) must throwA[Exception]
(allNormal.read.futureAny { s => Future(if (s.i == 0) s.i else error("oops")) } apply()) mustEqual 0

(allBlocked.read.futureAny { _ => Future(true) } apply()) must throwA[ShardOfflineException]
(allBlackhole.read.futureAny { _ => Future(true) } apply()) must throwA[ShardBlackHoleException]

(mixed.read.futureAny { _ => Future(true) } apply()) mustEqual true
}

"any" in {
allNormal.read.any { _ => true } mustEqual true
allNormal.read.any { _ => error("oops"); true } must throwA[Exception]
Expand Down

0 comments on commit 2d74926

Please sign in to comment.