Skip to content

Commit

Permalink
Added a test for retriable tasks, and accumulating errors.
Browse files Browse the repository at this point in the history
  • Loading branch information
Runar committed Jul 24, 2013
1 parent b8f2fd1 commit 88d1325
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 37 deletions.
37 changes: 25 additions & 12 deletions concurrent/src/main/scala/scalaz/concurrent/Task.scala
Expand Up @@ -140,25 +140,38 @@ class Task[+A](val get: Future[Throwable \/ A]) {

def timed(timeout: Duration): Task[A] = timed(timeout.toMillis)

/**
* Retries this task if it fails, once for each element in `delays`,
* each retry delayed by the corresponding duration, accumulating
* errors into a list.
* A retriable failure is one for which the predicate `p` returns `true`.
*/
def retryAccumulating(delays: Seq[Duration], p: (Throwable => Boolean) = _.isInstanceOf[Exception]): Task[(A, List[Throwable])] =
retryInternal(delays, p, true)

/**
* Retries this task if it fails, once for each element in `delays`,
* each retry delayed by the corresponding duration.
* A retriable failure is one for which the predicate `p` returns `true`.
*/
def retry(delays: Seq[Duration],
p: (Throwable => Boolean) = {
case (e: Exception) => true
case _ => false
}): Task[A] = {
def help(ds: Seq[Duration]): Future[Throwable \/ A] = ds match {
case Seq() => get
case Seq(t, ts @_*) => get flatMap {
case -\/(e) if p(e) =>
help(ts) after t
case x => Future.now(x)
def retry(delays: Seq[Duration], p: (Throwable => Boolean) = _.isInstanceOf[Exception]): Task[A] =
retryInternal(delays, p, false).map(_._1)

private def retryInternal(delays: Seq[Duration],
p: (Throwable => Boolean),
accumulateErrors: Boolean): Task[(A, List[Throwable])] = {
def help(ds: Seq[Duration], es: => Stream[Throwable]): Future[Throwable \/ (A, List[Throwable])] = {
def acc = if (accumulateErrors) es.toList else Nil
ds match {
case Seq() => get map (_. map(_ -> acc))
case Seq(t, ts @_*) => get flatMap {
case -\/(e) if p(e) =>
help(ts, e #:: es) after t
case x => Future.now(x.map(_ -> acc))
}
}
}
Task.async { help(delays).runAsync }
Task.async { help(delays, Stream()).runAsync }
}
}

Expand Down
57 changes: 32 additions & 25 deletions tests/src/test/scala/scalaz/concurrent/TaskTest.scala
Expand Up @@ -11,16 +11,16 @@ import java.util.concurrent.atomic._


class TaskTest extends Spec {

val N = 10000
val correct = (0 to N).sum
val LM = Monad[List]; import LM.monadSyntax._;
val LM = Monad[List]; import LM.monadSyntax._;
val LT = Traverse[List]; import LT.traverseSyntax._

// standard worst case scenario for trampolining -
// standard worst case scenario for trampolining -
// huge series of left associated binds
def leftAssociatedBinds(seed: (=> Int) => Task[Int],
cur: (=> Int) => Task[Int]): Task[Int] =
def leftAssociatedBinds(seed: (=> Int) => Task[Int],
cur: (=> Int) => Task[Int]): Task[Int] =
(0 to N).map(cur(_)).foldLeft(seed(0))(Task.taskInstance.lift2(_ + _))

val options = List[(=> Int) => Task[Int]](n => Task.now(n), Task.delay _ , Task.apply _)
Expand All @@ -30,26 +30,26 @@ class TaskTest extends Spec {
combinations.forall { case (seed, cur) => leftAssociatedBinds(seed, cur).run == correct }
}

"traverse-based map == sequential map" ! prop { (xs: List[Int]) =>
xs.map(_ + 1) == xs.traverse(x => Task(x + 1)).run
"traverse-based map == sequential map" ! prop { (xs: List[Int]) =>
xs.map(_ + 1) == xs.traverse(x => Task(x + 1)).run
}

"gather-based map == sequential map" ! prop { (xs: List[Int]) =>
"gather-based map == sequential map" ! prop { (xs: List[Int]) =>
xs.map(_ + 1) == Nondeterminism[Task].gather(xs.map(x => Task(x + 1))).run
}

case object FailWhale extends RuntimeException {
override def fillInStackTrace = this
override def fillInStackTrace = this
}

"catches exceptions" ! check {
Task { Thread.sleep(10); throw FailWhale; 42 }.map(_ + 1).attemptRun ==
-\/(FailWhale)
}

"catches exceptions in parallel execution" ! prop { (x: Int, y: Int) =>
val t1 = Task { Thread.sleep(10); throw FailWhale; 42 }
val t2 = Task { 43 }
"catches exceptions in parallel execution" ! prop { (x: Int, y: Int) =>
val t1 = Task { Thread.sleep(10); throw FailWhale; 42 }
val t2 = Task { 43 }
Nondeterminism[Task].both(t1, t2).attemptRun == -\/(FailWhale)
}

Expand Down Expand Up @@ -85,23 +85,23 @@ class TaskTest extends Spec {
"early terminate once any of the tasks failed" in {
import Thread._
val ex = new RuntimeException("expected")

val t1v = new AtomicInteger(0)
val t3v = new AtomicInteger(0)

val es3 = Executors.newFixedThreadPool(3)

// NB: Task can only be interrupted in between steps (before the `map`)
val t1 = fork { sleep(1000); now(()) }.map { _ => t1v.set(1) }
val t2 = fork { now(throw ex) }
val t3 = fork { sleep(1000); now(()) }.map { _ => t3v.set(3) }

val t = fork(Task.gatherUnordered(Seq(t1,t2,t3), exceptionCancels = true))(es3)

t.attemptRun match {
case -\/(e) => e must_== ex
case -\/(e) => e must_== ex
}

t1v.get must_== 0
t3v.get must_== 0
}
Expand All @@ -123,9 +123,9 @@ class TaskTest extends Spec {
val t = fork(Task.gatherUnordered(Seq(t1,t2,t3), exceptionCancels = true))(es3)

t.attemptRun match {
case -\/(e) => e must_== ex
case -\/(e) => e must_== ex
}

sleep(3000)

t1v.get must_== 0
Expand All @@ -136,21 +136,21 @@ class TaskTest extends Spec {
"correctly exit when timeout is exceeded on runFor" in {

val es = Executors.newFixedThreadPool(1)

val t = fork { Thread.sleep(3000); now(1) }(es)

t.attemptRunFor(100) match {
case -\/(ex:TimeoutException) => //ok
}
}

es.shutdown()
}

"correctly cancels scheduling of all tasks once first task hit timeout" in {
val es = Executors.newFixedThreadPool(1)

@volatile var bool = false

val t = fork { Thread.sleep(1000); now(1) }(es).map(_=> bool = true)

t.attemptRunFor(100) match {
Expand All @@ -164,5 +164,12 @@ class TaskTest extends Spec {
es.shutdown()
}
}

"retries a retriable task n times" ! prop { xs: List[Byte] =>
import scala.concurrent.duration._
var x = 0
Task.delay {x += 1; sys.error("oops")}.retry(xs.map(_ => 0.milliseconds)).attempt.run
x == (xs.length + 1)
}
}

0 comments on commit 88d1325

Please sign in to comment.