Skip to content

Commit

Permalink
Handle interrupts
Browse files Browse the repository at this point in the history
  • Loading branch information
mijicd committed May 8, 2019
1 parent a7ee13a commit dbc12ee
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package scalaz.zio.interop

import com.twitter.util.{ Future, Return, Throw }
import scalaz.zio.Task
import com.twitter.util.{ Future, FutureCancelledException, Return, Throw }
import scalaz.zio.{ Task, UIO }

package object twitter {
implicit class TaskObjOps(private val obj: Task.type) extends AnyVal {
final def fromTwitterFuture[A](future: => Future[A]): Task[A] =
Task.effectAsync { cb =>
Task.effectAsyncInterrupt { cb =>
future.respond {
case Return(a) => cb(Task.succeed(a))
case Throw(e) => cb(Task.fail(e))
}

()
Left(UIO(future.raise(new FutureCancelledException)))
}
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
package scalaz.zio.interop

import com.twitter.util.Future
import java.util.concurrent.atomic.AtomicInteger

import com.twitter.util.{ Duration => TwitterDuration, Future, JavaTimer }
import org.specs2.concurrent.ExecutionEnv
import scalaz.zio.{ FiberFailure, Task, TestRuntime }
import scalaz.zio.Exit.Cause.Fail
import scalaz.zio.duration.Duration
import scalaz.zio.interop.twitter._

import scala.concurrent.duration._

class TwitterSpec(implicit ee: ExecutionEnv) extends TestRuntime {
def is =
"Twitter spec".title ^ s2"""
`Task.fromTwitterFuture` must return
a failing `Task` if the future failed. $propagateFailures
a successful `Task` that produces the value from future. $propagateResults
`Task.fromTwitterFuture` must
return failing `Task` if future failed. $propagateFailures
return successful `Task` if future succeeded. $propagateResults
ensure future is interrupted together with task. $propagateInterrupts
"""

private def propagateFailures = {
Expand All @@ -29,4 +35,22 @@ class TwitterSpec(implicit ee: ExecutionEnv) extends TestRuntime {

unsafeRun(task) ==== value
}

private def propagateInterrupts = {
implicit val timer = new JavaTimer(true)

val futureTimeout = TwitterDuration.fromSeconds(3)
val taskTimeout = Duration.fromScala(1.second)
val value = new AtomicInteger(0)

lazy val future = Future.sleep(futureTimeout).map(_ => value.incrementAndGet())

val task = Task.fromTwitterFuture(future).timeout(taskTimeout)

unsafeRun(task) must beNone

SECONDS.sleep(5)

value.get() ==== 0
}
}

0 comments on commit dbc12ee

Please sign in to comment.