Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Polish and withCompletionHandler #17

Merged
merged 20 commits into from
Aug 14, 2019
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 125 additions & 54 deletions src/main/scala/zio/interop/javaconcurrent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package zio.interop

import java.nio.channels.CompletionHandler
import java.util.concurrent.{ CompletableFuture, CompletionException, CompletionStage, Future }

import zio._
Expand All @@ -25,85 +26,155 @@ import scala.concurrent.ExecutionException

object javaconcurrent {

implicit class IOObjJavaconcurrentOps(private val taskObj: Task.type) extends AnyVal {

private def unsafeCompletionStageToIO[A](cs: CompletionStage[A]): Task[A] =
IO.effectAsync { cb =>
val _ = cs.handle[Unit] { (v: A, t: Throwable) =>
val io = t match {
case null =>
IO.succeed(v)
case e: CompletionException =>
IO.fail(e.getCause)
case t: Throwable =>
IO.fail(t)
def withCompletionHandler[T](op: CompletionHandler[T, Any] => Unit): Task[T] =
Task.effectSuspendTotalWith { p =>
Task.effectAsync { k =>
val handler = new CompletionHandler[T, Any] {

def completed(result: T, u: Any): Unit = k(Task.succeed(result))

def failed(t: Throwable, u: Any): Unit = t match {
case e if !p.fatal(e) => k(Task.fail(e))
case _ => k(Task.die(t))
}
cb(io)
}
}

def fromCompletionStage[A, E >: Throwable](csIo: IO[E, CompletionStage[A]]): IO[E, A] =
csIo.flatMap(unsafeCompletionStageToIO)

def fromCompletionStage[A](cs: () => CompletionStage[A]): Task[A] =
IO.effectSuspendTotal {
unsafeCompletionStageToIO(cs())
try {
op(handler)
} catch {
case e if !p.fatal(e) => k(Task.fail(e))
}
}
}

private def unsafeFutureJavaToIO[A](future: Future[A]): Task[A] = {
def unwrap[B](f: Future[B]): Task[B] =
IO.flatten {
IO.effectTotal {
try {
val result = f.get()
IO.succeed(result)
} catch {
case e: ExecutionException =>
IO.fail(e.getCause)
case _: InterruptedException =>
IO.interrupt
case t: Throwable => // CancellationException
IO.fail(t)
private def catchFromGet(isFatal: Throwable => Boolean): PartialFunction[Throwable, Task[Nothing]] = {
case e: CompletionException =>
Task.fail(e.getCause)
case e: ExecutionException =>
Task.fail(e.getCause)
case _: InterruptedException =>
Task.interrupt
case e if !isFatal(e) =>
Task.fail(e)
}

private def unwrapDone[A](isFatal: Throwable => Boolean)(f: Future[A]): Task[A] =
try {
Task.succeed(f.get())
} catch catchFromGet(isFatal)

def fromCompletionStage[A](csUio: UIO[CompletionStage[A]]): Task[A] =
csUio.flatMap { cs =>
Task.effectSuspendTotalWith { p =>
val cf = cs.toCompletableFuture
if (cf.isDone) {
unwrapDone(p.fatal)(cf)
} else {
Task.effectAsync { cb =>
val _ = cs.handle[Unit] { (v: A, t: Throwable) =>
val io = Option(t).fold[Task[A]](Task.succeed(v)) { t =>
catchFromGet(p.fatal).lift(t).getOrElse(Task.die(t))
}
cb(io)
}
}
}
}
}

if (future.isDone) {
unwrap(future)
} else {
blocking(unwrap(future)).provide(Blocking.Live)
/** WARNING: this uses the blocking Future#get, consider using `fromCompletionStage` */
def fromFutureJava[A](futureUio: UIO[Future[A]]): Task[A] =
futureUio.flatMap { future =>
Task.effectSuspendTotalWith { p =>
if (future.isDone) {
unwrapDone(p.fatal)(future)
} else {
blocking(Task.effectSuspend(unwrapDone(p.fatal)(future))).provide(Blocking.Live)
neko-kai marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

def fromFutureJava[A, E >: Throwable](futureIo: IO[E, Future[A]]): IO[E, A] =
futureIo.flatMap(unsafeFutureJavaToIO)
implicit class CompletionStageJavaconcurrentOps[A](private val csUio: UIO[CompletionStage[A]]) extends AnyVal {
def toZio: Task[A] = Task.fromCompletionStage(csUio)
}

implicit class FutureJavaconcurrentOps[A](private val futureUio: UIO[Future[A]]) extends AnyVal {

/** WARNING: this uses the blocking Future#get, consider using `CompletionStage` */
def toZio: Task[A] = Task.fromFutureJava(futureUio)
}

implicit class TaskObjJavaconcurrentOps(private val taskObj: Task.type) extends AnyVal {

def withCompletionHandler[T](op: CompletionHandler[T, Any] => Unit): Task[T] =
javaconcurrent.withCompletionHandler(op)

def fromCompletionStage[A](csUio: UIO[CompletionStage[A]]): Task[A] = javaconcurrent.fromCompletionStage(csUio)

/** WARNING: this uses the blocking Future#get, consider using `fromCompletionStage` */
def fromFutureJava[A](futureUio: UIO[Future[A]]): Task[A] = javaconcurrent.fromFutureJava(futureUio)

}

implicit class ZioObjJavaconcurrentOps(private val zioObj: ZIO.type) extends AnyVal {

def withCompletionHandler[T](op: CompletionHandler[T, Any] => Unit): Task[T] =
javaconcurrent.withCompletionHandler(op)

def fromCompletionStage[A](csUio: UIO[CompletionStage[A]]): Task[A] = javaconcurrent.fromCompletionStage(csUio)

/** WARNING: this uses the blocking Future#get, consider using `fromCompletionStage` */
def fromFutureJava[A](futureUio: UIO[Future[A]]): Task[A] = javaconcurrent.fromFutureJava(futureUio)

def fromFutureJava[A](future: () => Future[A]): Task[A] =
IO.effectSuspendTotal {
unsafeFutureJavaToIO(future())
}
}

implicit class FiberObjOps(private val fiberObj: Fiber.type) extends AnyVal {

def fromFutureJava[A](_ftr: () => Future[A]): Fiber[Throwable, A] = {
def fromCompletionStage[A](thunk: => CompletionStage[A]): Fiber[Throwable, A] = {

lazy val cs = thunk

new Fiber[Throwable, A] {

override def await: UIO[Exit[Throwable, A]] = Task.fromCompletionStage(UIO.effectTotal(cs)).run

override def poll: UIO[Option[Exit[Throwable, A]]] =
UIO.effectSuspendTotal {
val cf = cs.toCompletableFuture
if (cf.isDone) {
Task
.effectSuspendWith(p => unwrapDone(p.fatal)(cf))
.fold(Exit.fail, Exit.succeed)
.map(Some(_))
} else {
UIO.succeed(None)
}
}

override def interrupt: UIO[Exit[Throwable, A]] = join.fold(Exit.fail, Exit.succeed)

override def inheritFiberRefs: UIO[Unit] = UIO.unit
}
}

def fromFutureJava[A](thunk: => Future[A]): Fiber[Throwable, A] = {

lazy val ftr = _ftr()
lazy val ftr = thunk

new Fiber[Throwable, A] {

def await: UIO[Exit[Throwable, A]] =
Task.fromFutureJava(() => ftr).fold(Exit.fail, Exit.succeed)
Task.fromFutureJava(UIO.effectTotal(ftr)).run

def poll: UIO[Option[Exit[Throwable, A]]] =
IO.effectSuspendTotal {
UIO.effectSuspendTotal {
if (ftr.isDone) {
IO.effect(ftr.get())
.refineToOrDie[Exception]
Task
.effectSuspendWith(p => unwrapDone(p.fatal)(ftr))
.fold(Exit.fail, Exit.succeed)
.map(Some(_))
} else {
IO.succeed(None)
UIO.succeed(None)
}
}

Expand All @@ -126,13 +197,13 @@ object javaconcurrent {
}
}

implicit class IOThrowableOps[A](private val io: Task[A]) extends AnyVal {
implicit class TaskCompletableFutureOps[A](private val io: Task[A]) extends AnyVal {
def toCompletableFuture: UIO[CompletableFuture[A]] =
io.fold(CompletableFuture_.failedFuture, CompletableFuture.completedFuture[A])
}

implicit class IOOps[E, A](private val io: IO[E, A]) extends AnyVal {
def toCompletableFutureE(f: E => Throwable): UIO[CompletableFuture[A]] =
implicit class IOCompletableFutureOps[E, A](private val io: IO[E, A]) extends AnyVal {
def toCompletableFutureWith(f: E => Throwable): UIO[CompletableFuture[A]] =
io.mapError(f).toCompletableFuture
}

Expand Down
Loading