Skip to content

Commit

Permalink
resolves #826 Cats Interop for ZManaged
Browse files Browse the repository at this point in the history
  • Loading branch information
mschuwalow committed May 9, 2019
1 parent cce1851 commit 22cd558
Show file tree
Hide file tree
Showing 5 changed files with 290 additions and 2 deletions.
81 changes: 81 additions & 0 deletions core/jvm/src/test/scala/scalaz/zio/ZManagedSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ class ZManagedSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends Test
Invokes cleanups in reverse order of acquisition. $traverse
ZManaged.reserve
Interruption is possible when using this form. $interruptible
ZManaged.foldM
Runs onFailure on failure $foldMFailure
Runs onSucess on success $foldMSuccess
Invokes cleanups $foldMCleanup
Invokes cleanups on interrupt 1 $foldMCleanupInterrupt1
Invokes cleanups on interrupt 2 $foldMCleanupInterrupt2
Invokes cleanups on interrupt 3 $foldMCleanupInterrupt3
"""

private def invokesCleanupsInReverse = {
Expand Down Expand Up @@ -90,4 +97,78 @@ class ZManagedSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends Test
unsafeRun(program) must be_===(Right(expected))
}

private def foldMFailure = {
val effects = new mutable.ListBuffer[Int]
def res(x: Int): ZManaged[Any, Unit, Unit] =
ZManaged.make(IO.effectTotal { effects += x; () })(_ => IO.effectTotal { effects += x; () })

val resource = ZManaged.fromEffect(ZIO.fail(())).foldM(_ => res(1), _ => ZManaged.unit)

unsafeRun(resource.use(_ => IO.unit))

effects must be_===(List(1, 1))
}

private def foldMSuccess = {
val effects = new mutable.ListBuffer[Int]
def res(x: Int): ZManaged[Any, Unit, Unit] =
ZManaged.make(IO.effectTotal { effects += x; () })(_ => IO.effectTotal { effects += x; () })

val resource = ZManaged.succeed(()).foldM(_ => ZManaged.unit, _ => res(1))

unsafeRun(resource.use(_ => IO.unit))

effects must be_===(List(1, 1))
}

private def foldMCleanup = {
val effects = new mutable.ListBuffer[Int]
def res(x: Int): ZManaged[Any, Unit, Unit] =
ZManaged.make(IO.effectTotal { effects += x; () })(_ => IO.effectTotal { effects += x; () })

val resource = res(1).flatMap(_ => ZManaged.fail(())).foldM(_ => res(2), _ => res(3))

unsafeRun(resource.use(_ => IO.unit).orElse(ZIO.unit))

effects must be_===(List(1, 2, 2, 1))
}

private def foldMCleanupInterrupt1 = {
val effects = new mutable.ListBuffer[Int]
def res(x: Int): ZManaged[Any, Unit, Unit] =
ZManaged.make(IO.effectTotal { effects += x; () })(_ => IO.effectTotal { effects += x; () })

val resource = res(1).flatMap(_ => ZManaged.fromEffect(ZIO.interrupt)).foldM(_ => res(2), _ => res(3))

unsafeRun(resource.use(_ => IO.unit).orElse(ZIO.unit))

effects must be_===(List(1, 1))
}

private def foldMCleanupInterrupt2 = {
val effects = new mutable.ListBuffer[Int]
def res(x: Int): ZManaged[Any, Unit, Unit] =
ZManaged.make(IO.effectTotal { effects += x; () })(_ => IO.effectTotal { effects += x; () })

val resource = res(1).flatMap(_ => ZManaged.fail(())).foldM(_ => res(2), _ => res(3))

unsafeRun(resource.use(_ => IO.interrupt.unit).orElse(ZIO.unit))

effects must be_===(List(1, 2, 2, 1))
}

private def foldMCleanupInterrupt3 = {
val effects = new mutable.ListBuffer[Int]
def res(x: Int): ZManaged[Any, Unit, Unit] =
ZManaged.make(IO.effectTotal { effects += x; () })(_ => IO.effectTotal { effects += x; () })

val resource = res(1)
.flatMap(_ => ZManaged.fail(()))
.foldM(_ => res(2).flatMap(_ => ZManaged.fromEffect(ZIO.interrupt)), _ => res(3))

unsafeRun(resource.use(_ => IO.unit).orElse(ZIO.unit))

effects must be_===(List(1, 2, 2, 1))
}

}
52 changes: 52 additions & 0 deletions core/shared/src/main/scala/scalaz/zio/ZManaged.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,51 @@ final case class ZManaged[-R, +E, +A](reserve: ZIO[R, E, Reservation[R, E, A]])
}
}

final def foldM[R1 <: R, E2, B](
failure: E => ZManaged[R1, E2, B],
success: A => ZManaged[R1, E2, B]
): ZManaged[R1, E2, B] =
ZManaged[R1, E2, B] {
Ref.make[ZIO[R1, Nothing, Any]](IO.unit).map { finalizers =>
Reservation(
acquire = {
val direct =
ZIO.uninterruptibleMask { restore =>
self.reserve
.flatMap(res => finalizers.update(fs => res.release *> fs).const(res))
.flatMap(res => restore(res.acquire))
}
val onFailure = (e: E) =>
ZIO.uninterruptibleMask { restore =>
failure(e).reserve
.flatMap(res => finalizers.update(fs => res.release *> fs).const(res))
.flatMap(res => restore(res.acquire))
}
val onSuccess = (a: A) =>
ZIO.uninterruptibleMask { restore =>
success(a).reserve
.flatMap(res => finalizers.update(fs => res.release *> fs).const(res))
.flatMap(res => restore(res.acquire))
}
direct.foldM(onFailure, onSuccess)
},
release = ZIO.flatten(finalizers.get)
)
}
}

final def catchAll[R1 <: R, E2, A1 >: A](h: E => ZManaged[R1, E2, A1]) =
self.foldM(h, ZManaged.succeed)

final def orElse[R1 <: R, E2, A1 >: A](that: => ZManaged[R1, E2, A1]): ZManaged[R1, E2, A1] =
self.foldM(_ => that, ZManaged.succeed)

/**
* Operator alias for `orElse`.
*/
final def <>[R1 <: R, E2, A1 >: A](that: => ZManaged[R1, E2, A1]): ZManaged[R1, E2, A1] =
self.orElse(that)

final def *>[R1 <: R, E1 >: E, A1](that: ZManaged[R1, E1, A1]): ZManaged[R1, E1, A1] =
flatMap(_ => that)

Expand Down Expand Up @@ -145,6 +190,7 @@ final case class ZManaged[-R, +E, +A](reserve: ZIO[R, E, Reservation[R, E, A]])
*/
final def zipParRight[R1 <: R, E1 >: E, A1](that: ZManaged[R1, E1, A1]): ZManaged[R1, E1, A1] =
self &> that

}

object ZManaged {
Expand Down Expand Up @@ -193,4 +239,10 @@ object ZManaged {

final def collectAll[R, E, A1, A2](ms: Iterable[ZManaged[R, E, A2]]): ZManaged[R, E, List[A2]] =
foreach(ms)(identity)

final def fail[E](cause: E): ZManaged[Any, E, Nothing] =
ZManaged.fromEffect(ZIO.fail(cause))

final val unit: ZManaged[Any, Nothing, Unit] =
ZManaged.succeed(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ package interop

import cats.effect.{ Concurrent, ContextShift, ExitCase }
import cats.{ effect, _ }
import scalaz.zio.{ App, clock => zioClock, ZIO }
import scalaz.zio.{ clock => zioClock }
import scalaz.zio.clock.Clock

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.{ FiniteDuration, NANOSECONDS, TimeUnit }

abstract class CatsPlatform extends CatsInstances {
abstract class CatsPlatform extends CatsInstances with CatsZManagedInstances with CatsZManagedSyntax {
val console = interop.console.cats

trait CatsApp extends App {
Expand All @@ -50,6 +50,7 @@ abstract class CatsPlatform extends CatsInstances {
}

abstract class CatsInstances extends CatsInstances1 {

implicit def zioContextShift[R, E]: ContextShift[ZIO[R, E, ?]] = new ContextShift[ZIO[R, E, ?]] {
override def shift: ZIO[R, E, Unit] =
ZIO.yieldNow
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright 2017-2019 John A. De Goes and the ZIO Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package scalaz.zio
package interop

import cats.effect.Resource.{ Allocate, Bind, Suspend }
import cats.effect.{ ExitCase, LiftIO, Resource, Sync, IO => CIO }
import cats.{ effect, Bifunctor, Monad, MonadError, Monoid, Semigroup, SemigroupK }

trait CatsZManagedSyntax {

/**
* Convert a cats Resource into a ZManaged.
* Beware that unhandled error during release of the resource will result in the fiber dying.
*/
implicit class ResourceSyntax[A](resource: Resource[CIO, A]) {
def toManaged[R, E](
implicit l: LiftIO[ZIO[R, Throwable, ?]]
): ZManaged[R, Throwable, A] = {
def convert[A1](resource: Resource[CIO, A1]): ZManaged[R, Throwable, A1] =
resource match {
case Allocate(res) =>
ZManaged.unwrap(
l.liftIO(res.bracketCase {
case (a, r) =>
CIO.delay(
ZManaged.reserve(Reservation(ZIO.succeed(a), l.liftIO(r(ExitCase.Completed)).orDie.uninterruptible))
)
} {
case (_, ExitCase.Completed) =>
CIO.unit
case ((_, release), ec) =>
release(ec)
})
)
case Bind(source, fs) =>
convert(source).flatMap(s => convert(fs(s)))
case Suspend(res) =>
ZManaged.unwrap(l.liftIO(res).map(convert))
}
convert(resource)
}
}

implicit class ZManagedSyntax[R, E, A](managed: ZManaged[R, E, A]) {
def toResource[F[_]](implicit r: Runtime[R], S: Sync[F]): Resource[F, A] =
Resource.suspend(S.delay {
r.unsafeRun(managed.reserve) match {
case Reservation(acquire, release) =>
Resource.make(S.delay(r.unsafeRun(acquire)))(_ => S.delay(r.unsafeRun(release.unit)))
}
})

}

}

trait CatsZManagedInstances extends CatsZManagedInstances1 {

implicit def monadErrorZManagedInstances[R, E]: MonadError[ZManaged[R, E, ?], E] =
new CatsZManagedMonadError

implicit def monoidZManagedInstances[R, E, A](implicit ev: Monoid[A]): Monoid[ZManaged[R, E, A]] =
new Monoid[ZManaged[R, E, A]] {
override def empty: ZManaged[R, E, A] = ZManaged.succeed(ev.empty)

override def combine(x: ZManaged[R, E, A], y: ZManaged[R, E, A]): ZManaged[R, E, A] = x.zipWith(y)(ev.combine)
}

implicit def liftIOZManagedInstances[R, A](
implicit ev: LiftIO[ZIO[R, Throwable, ?]]
): LiftIO[ZManaged[R, Throwable, ?]] =
new LiftIO[ZManaged[R, Throwable, ?]] {
override def liftIO[A](ioa: effect.IO[A]): ZManaged[R, Throwable, A] =
ZManaged.fromEffect(ev.liftIO(ioa))
}

}

sealed trait CatsZManagedInstances1 {

implicit def monadZManagedInstances[R, E]: Monad[ZManaged[R, E, ?]] = new CatsZManagedMonad

implicit def semigroupZManagedInstances[R, E, A](implicit ev: Semigroup[A]): Semigroup[ZManaged[R, E, A]] =
(x: ZManaged[R, E, A], y: ZManaged[R, E, A]) => x.zipWith(y)(ev.combine)

implicit def semigroupKZManagedInstances[R, E]: SemigroupK[ZManaged[R, E, ?]] = new CatsZManagedSemigroupK

implicit def bifunctorZManagedInstances[R]: Bifunctor[ZManaged[R, ?, ?]] = new Bifunctor[ZManaged[R, ?, ?]] {
override def bimap[A, B, C, D](fab: ZManaged[R, A, B])(f: A => C, g: B => D): ZManaged[R, C, D] =
fab.mapError(f).map(g)
}

}

private class CatsZManagedMonad[R, E] extends Monad[ZManaged[R, E, ?]] {
override def pure[A](x: A): ZManaged[R, E, A] = ZManaged.succeed(x)

override def flatMap[A, B](fa: ZManaged[R, E, A])(f: A => ZManaged[R, E, B]): ZManaged[R, E, B] = fa.flatMap(f)

override def tailRecM[A, B](a: A)(f: A => ZManaged[R, E, Either[A, B]]): ZManaged[R, E, B] =
ZManaged.succeedLazy(f(a)).flatMap(identity).flatMap {
case Left(nextA) => tailRecM(nextA)(f)
case Right(b) => ZManaged.succeed(b)
}
}

private class CatsZManagedMonadError[R, E] extends CatsZManagedMonad[R, E] with MonadError[ZManaged[R, E, ?], E] {
override def raiseError[A](e: E): ZManaged[R, E, A] = ZManaged.fromEffect(ZIO.fail(e))

override def handleErrorWith[A](fa: ZManaged[R, E, A])(f: E => ZManaged[R, E, A]): ZManaged[R, E, A] =
fa.catchAll(f)
}

/** lossy, throws away errors using the "first success" interpretation of SemigroupK */
private class CatsZManagedSemigroupK[R, E] extends SemigroupK[ZManaged[R, E, ?]] {
override def combineK[A](x: ZManaged[R, E, A], y: ZManaged[R, E, A]): ZManaged[R, E, A] =
x.orElse(y)
}
21 changes: 21 additions & 0 deletions interop-cats/jvm/src/test/scala/scalaz/zio/interop/catzSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ class catzSpec

checkAllAsync("Monad[UIO]", (_) => ExtraMonadTests[UIO].monadExtras[Int])

// ZManaged Tests
checkAllAsync("Monad[ZManaged]", (_) => MonadTests[ZManaged[Any, Throwable, ?]].apply[Int, Int, Int])
checkAllAsync("Monad[ZManaged]", (_) => ExtraMonadTests[ZManaged[Any, Throwable, ?]].monadExtras[Int])
checkAllAsync("SemigroupK[ZManaged]", (_) => SemigroupKTests[ZManaged[Any, Throwable, ?]].semigroupK[Int])
checkAllAsync("MonadError[ZManaged]", (_) => MonadErrorTests[ZManaged[Any, Int, ?], Int].monadError[Int, Int, Int])

object summoningInstancesTest {
import cats._, cats.effect._
Concurrent[TaskR[String, ?]]
Expand All @@ -139,6 +145,12 @@ class catzSpec
Parallel[TaskR[String, ?], ParIO[String, Throwable, ?]]
SemigroupK[TaskR[String, ?]]
Apply[UIO]
LiftIO[ZManaged[String, Throwable, ?]]
MonadError[ZManaged[String, Throwable, ?], Throwable]
Monad[ZManaged[String, Throwable, ?]]
Applicative[ZManaged[String, Throwable, ?]]
Functor[ZManaged[String, Throwable, ?]]
SemigroupK[ZManaged[String, Throwable, ?]]

def concurrentEffect[R: Runtime] = ConcurrentEffect[TaskR[R, ?]]
def effect[R: Runtime] = Effect[TaskR[R, ?]]
Expand Down Expand Up @@ -179,6 +191,12 @@ class catzSpec
rts.unsafeRun(Par.unwrap(io1).either) === rts.unsafeRun(Par.unwrap(io2).either)
}

implicit def catsZManagedEQ[E: Eq, A: Eq]: Eq[ZManaged[Any, E, A]] =
new Eq[ZManaged[Any, E, A]] {
def eqv(io1: ZManaged[Any, E, A], io2: ZManaged[Any, E, A]): Boolean =
rts.unsafeRun(io1.reserve.flatMap(_.acquire).either) === rts.unsafeRun(io2.reserve.flatMap(_.acquire).either)
}

implicit def params: Parameters =
Parameters.default.copy(allowNonTerminationLaws = false)

Expand All @@ -190,4 +208,7 @@ class catzSpec

implicit def ioParArbitrary[E, A: Arbitrary: Cogen, R <: Any]: Arbitrary[ParIO[R, E, A]] =
Arbitrary(genSuccess[E, A].map(Par.apply))

implicit def zManagedArbitrary[E, A: Arbitrary: Cogen, R <: Any]: Arbitrary[ZManaged[R, E, A]] =
Arbitrary(genSuccess[E, A].map(ZManaged.fromEffect(_)))
}

0 comments on commit 22cd558

Please sign in to comment.