Skip to content
Permalink
Browse files

Porting akka future tests.

Fixed a bug in Future.zip.
  • Loading branch information
axel22 committed Apr 27, 2012
1 parent 8fc543b commit 9c4baa93d906b161f501ae04f1552e1b7d448436
@@ -384,18 +384,18 @@ trait Future[+T] extends Awaitable[T] {
*/
def zip[U](that: Future[U]): Future[(T, U)] = {
val p = newPromise[(T, U)]

this onComplete {
case Left(t) => p failure t
case Right(r) => that onSuccess {
case r2 => p success ((r, r2))
}
}

that onFailure {
case f => p failure f
case Right(r) =>
that onSuccess {
case r2 => p success ((r, r2))
}
that onFailure {
case f => p failure f
}
}

p.future
}

@@ -0,0 +1,364 @@



import scala.concurrent._
import scala.concurrent.util.duration._
import scala.concurrent.util.Duration.Inf



object FutureTests extends MinimalScalaTest {

/* some utils */

def testAsync(s: String): Future[String] = s match {
case "Hello" => future { "World" }
case "Failure" => Promise.failed(new RuntimeException("Expected exception; to test fault-tolerance")).future
case "NoReply" => Promise[String]().future
}

val defaultTimeout = Inf

/* future specification */

"A future" should {

"compose with for-comprehensions" in {
def async(x: Int) = future { (x * 2).toString }
val future0 = future[Any] {
"five!".length
}

val future1 = for {
a <- future0.mapTo[Int] // returns 5
b <- async(a) // returns "10"
c <- async(7) // returns "14"
} yield b + "-" + c

val future2 = for {
a <- future0.mapTo[Int]
b <- (future { (a * 2).toString }).mapTo[Int]
c <- future { (7 * 2).toString }
} yield b + "-" + c

Await.result(future1, defaultTimeout) mustBe ("10-14")
assert(checkType(future1, manifest[String]))
intercept[ClassCastException] { Await.result(future2, defaultTimeout) }
}

"support pattern matching within a for-comprehension" in {
case class Req[T](req: T)
case class Res[T](res: T)
def async[T](req: Req[T]) = req match {
case Req(s: String) => future { Res(s.length) }
case Req(i: Int) => future { Res((i * 2).toString) }
}

val future1 = for {
Res(a: Int) <- async(Req("Hello"))
Res(b: String) <- async(Req(a))
Res(c: String) <- async(Req(7))
} yield b + "-" + c

val future2 = for {
Res(a: Int) <- async(Req("Hello"))
Res(b: Int) <- async(Req(a))
Res(c: Int) <- async(Req(7))
} yield b + "-" + c

Await.result(future1, defaultTimeout) mustBe ("10-14")
intercept[NoSuchElementException] { Await.result(future2, defaultTimeout) }
}

"recover from exceptions" in {
val future1 = Future(5)
val future2 = future1 map (_ / 0)
val future3 = future2 map (_.toString)

val future4 = future1 recover {
case e: ArithmeticException => 0
} map (_.toString)

val future5 = future2 recover {
case e: ArithmeticException => 0
} map (_.toString)

val future6 = future2 recover {
case e: MatchError => 0
} map (_.toString)

val future7 = future3 recover {
case e: ArithmeticException => "You got ERROR"
}

val future8 = testAsync("Failure")
val future9 = testAsync("Failure") recover {
case e: RuntimeException => "FAIL!"
}
val future10 = testAsync("Hello") recover {
case e: RuntimeException => "FAIL!"
}
val future11 = testAsync("Failure") recover {
case _ => "Oops!"
}

Await.result(future1, defaultTimeout) mustBe (5)
intercept[ArithmeticException] { Await.result(future2, defaultTimeout) }
intercept[ArithmeticException] { Await.result(future3, defaultTimeout) }
Await.result(future4, defaultTimeout) mustBe ("5")
Await.result(future5, defaultTimeout) mustBe ("0")
intercept[ArithmeticException] { Await.result(future6, defaultTimeout) }
Await.result(future7, defaultTimeout) mustBe ("You got ERROR")
intercept[RuntimeException] { Await.result(future8, defaultTimeout) }
Await.result(future9, defaultTimeout) mustBe ("FAIL!")
Await.result(future10, defaultTimeout) mustBe ("World")
Await.result(future11, defaultTimeout) mustBe ("Oops!")
}

"recoverWith from exceptions" in {
val o = new IllegalStateException("original")
val r = new IllegalStateException("recovered")

intercept[IllegalStateException] {
val failed = Promise.failed[String](o).future recoverWith {
case _ if false == true => Promise.successful("yay!").future
}
Await.result(failed, defaultTimeout)
} mustBe (o)

val recovered = Promise.failed[String](o).future recoverWith {
case _ => Promise.successful("yay!").future
}
Await.result(recovered, defaultTimeout) mustBe ("yay!")

intercept[IllegalStateException] {
val refailed = Promise.failed[String](o).future recoverWith {
case _ => Promise.failed[String](r).future
}
Await.result(refailed, defaultTimeout)
} mustBe (r)
}

"andThen like a boss" in {
val q = new java.util.concurrent.LinkedBlockingQueue[Int]
for (i <- 1 to 1000) {
val chained = future {
q.add(1); 3
} andThen {
case _ => q.add(2)
} andThen {
case Right(0) => q.add(Int.MaxValue)
} andThen {
case _ => q.add(3);
}
Await.result(chained, defaultTimeout) mustBe (3)
q.poll() mustBe (1)
q.poll() mustBe (2)
q.poll() mustBe (3)
q.clear()
}
}

"firstCompletedOf" in {
val futures = Vector.fill[Future[Int]](10) {
Promise[Int]().future
} :+ Promise.successful[Int](5).future
Await.result(Future.firstCompletedOf(futures), defaultTimeout) mustBe (5)
}

"find" in {
val futures = for (i <- 1 to 10) yield future {
i
}

val result = Future.find[Int](futures)(_ == 3)
Await.result(result, defaultTimeout) mustBe (Some(3))

val notFound = Future.find[Int](futures)(_ == 11)
Await.result(notFound, defaultTimeout) mustBe (None)
}

"zip" in {
val timeout = 10000 millis
val f = new IllegalStateException("test")
intercept[IllegalStateException] {
val failed = Promise.failed[String](f).future zip Promise.successful("foo").future
Await.result(failed, timeout)
} mustBe (f)

intercept[IllegalStateException] {
val failed = Promise.successful("foo").future zip Promise.failed[String](f).future
Await.result(failed, timeout)
} mustBe (f)

intercept[IllegalStateException] {
val failed = Promise.failed[String](f).future zip Promise.failed[String](f).future
Await.result(failed, timeout)
} mustBe (f)

val successful = Promise.successful("foo").future zip Promise.successful("foo").future
Await.result(successful, timeout) mustBe (("foo", "foo"))
}

"fold" in {
val timeout = 10000 millis
def async(add: Int, wait: Int) = future {
Thread.sleep(wait)
add
}
def futures = (0 to 9) map {
idx => async(idx, idx * 200)
}
def folded = Future.fold(futures)(0)(_ + _)
Await.result(folded, timeout) mustBe (45)
}

"fold by composing" in {
val timeout = 10000 millis
def async(add: Int, wait: Int) = future {
Thread.sleep(wait)
add
}
def futures = (0 to 9) map {
idx => async(idx, idx * 200)
}
val folded = futures.foldLeft(Future(0)) {
case (fr, fa) => for (r <- fr; a <- fa) yield (r + a)
}
Await.result(folded, timeout) mustBe (45)
}

"fold with an exception" in {
val timeout = 10000 millis
def async(add: Int, wait: Int) = future {
Thread.sleep(wait)
if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected")
add
}
def futures = (0 to 9) map {
idx => async(idx, idx * 100)
}
val folded = Future.fold(futures)(0)(_ + _)
intercept[IllegalArgumentException] {
Await.result(folded, timeout)
}.getMessage mustBe ("shouldFoldResultsWithException: expected")
}

"fold mutable zeroes safely" in {
import scala.collection.mutable.ArrayBuffer
def test(testNumber: Int) {
val fs = (0 to 1000) map (i => Future(i))
val f = Future.fold(fs)(ArrayBuffer.empty[AnyRef]) {
case (l, i) if i % 2 == 0 => l += i.asInstanceOf[AnyRef]
case (l, _) => l
}
val result = Await.result(f.mapTo[ArrayBuffer[Int]], 10000 millis).sum

assert(result == 250500)
}

(1 to 100) foreach test //Make sure it tries to provoke the problem
}

"return zero value if folding empty list" in {
val zero = Future.fold(List[Future[Int]]())(0)(_ + _)
Await.result(zero, defaultTimeout) mustBe (0)
}

"shouldReduceResults" in {
def async(idx: Int) = future {
Thread.sleep(idx * 200)
idx
}
val timeout = 10000 millis
val futures = (0 to 9) map { async }
val reduced = Future.reduce(futures)(_ + _)
Await.result(reduced, timeout) mustBe (45)
}

"shouldReduceResultsWithException" in {
def async(add: Int, wait: Int) = future {
Thread.sleep(wait)
if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected")
else add
}
val timeout = 10000 millis
def futures = (1 to 10) map {
idx => async(idx, idx * 100)
}
val failed = Future.reduce(futures)(_ + _)
intercept[IllegalArgumentException] {
Await.result(failed, timeout)
}.getMessage mustBe ("shouldFoldResultsWithException: expected")
}

"shouldReduceThrowNSEEOnEmptyInput" in {
intercept[java.util.NoSuchElementException] {
val emptyreduced = Future.reduce(List[Future[Int]]())(_ + _)
Await.result(emptyreduced, defaultTimeout)
}
}

"shouldTraverseFutures" in {
object counter {
var count = -1
def incAndGet() = counter.synchronized {
count += 2
count
}
}

val oddFutures = List.fill(100)(future { counter.incAndGet() })
val traversed = Future.sequence(oddFutures)
Await.result(traversed, defaultTimeout).sum mustBe (10000)

val list = (1 to 100).toList
val traversedList = Future.traverse(list)(x => Future(x * 2 - 1))
Await.result(traversedList, defaultTimeout).sum mustBe (10000)
}

/* need configurable execution contexts here
"shouldHandleThrowables" in {
class ThrowableTest(m: String) extends Throwable(m)
val f1 = future[Any] {
throw new ThrowableTest("test")
}
intercept[ThrowableTest] {
Await.result(f1, defaultTimeout)
}
val latch = new TestLatch
val f2 = future {
Await.ready(latch, 5 seconds)
"success"
}
val f3 = f2 map { s => s.toUpperCase }
f2 foreach { _ => throw new ThrowableTest("dispatcher foreach") }
f2 onSuccess { case _ => throw new ThrowableTest("dispatcher receive") }
latch.open()
Await.result(f2, defaultTimeout) mustBe ("success")
f2 foreach { _ => throw new ThrowableTest("current thread foreach") }
f2 onSuccess { case _ => throw new ThrowableTest("current thread receive") }
Await.result(f3, defaultTimeout) mustBe ("SUCCESS")
}
*/
}

}










0 comments on commit 9c4baa9

Please sign in to comment.
You can’t perform that action at this time.