Skip to content

Commit

Permalink
Refactor ZStream to use ZManaged as the carrier for the fold result
Browse files Browse the repository at this point in the history
  • Loading branch information
iravid committed May 30, 2019
1 parent 6ef36b1 commit 44ae7f2
Show file tree
Hide file tree
Showing 15 changed files with 880 additions and 588 deletions.
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ lazy val coreJS = core.js
lazy val streams = crossProject(JSPlatform, JVMPlatform)
.in(file("streams"))
.settings(stdSettings("zio-streams"))
.settings(replSettings)
.settings(buildInfoSettings)
.settings(replSettings)
.enablePlugins(BuildInfoPlugin)
.dependsOn(core % "test->test;compile->compile")

Expand Down
37 changes: 37 additions & 0 deletions core/jvm/src/test/scala/scalaz/zio/ZManagedSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class ZManagedSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends Test
Uses at most n fibers for reservation $foreachParN_ReservePar
Uses at most n fibers for acquisition $foreachParN_AcquirePar
Runs finalizers $foreachParN_Finalizers
ZManaged.fork
Runs finalizers properly $forkFinalizer
Acquires interruptibly $forkAcquisitionIsInterruptible
ZManaged.mergeAll
Merges elements in the correct order $mergeAllOrder
Runs finalizers $mergeAllFinalizers
Expand Down Expand Up @@ -504,6 +507,40 @@ class ZManagedSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends Test
testReservePar(2, res => ZManaged.foreachParN_(2)(List(1, 2, 3, 4))(_ => res))
}

private def forkFinalizer = unsafeRun {
for {
finalized <- Ref.make(false)
latch <- Promise.make[Nothing, Unit]
_ <- ZManaged
.reserve(Reservation(latch.succeed(()) *> ZIO.never, finalized.set(true)))
.fork
.use_(latch.await)
result <- finalized.get
} yield result must_=== true
}

private def forkAcquisitionIsInterruptible = unsafeRun {
for {
finalized <- Ref.make(false)
acquireLatch <- Promise.make[Nothing, Unit]
useLatch <- Promise.make[Nothing, Unit]
fib <- ZManaged
.reserve(
Reservation(
acquireLatch.succeed(()) *> ZIO.never,
finalized.set(true)
)
)
.fork
.use_(useLatch.succeed(()) *> ZIO.never)
.fork
_ <- acquireLatch.await
_ <- useLatch.await
_ <- fib.interrupt
result <- finalized.get
} yield result must_=== true
}

private def mergeAllOrder = {
def res(int: Int) =
ZManaged.succeed(int)
Expand Down
6 changes: 6 additions & 0 deletions core/shared/src/main/scala/scalaz/zio/ZIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,12 @@ sealed trait ZIO[-R, +E, +A] extends Serializable { self =>
final def toManaged[R1 <: R](release: A => ZIO[R1, Nothing, _]): ZManaged[R1, E, A] =
ZManaged.make[R1, E, A](this)(release)

/**
* Converts this ZIO to [[scalaz.zio.ZManaged]] with no release action.
*/
final def toManaged_ : ZManaged[R, E, A] =
ZManaged.fromEffect[R, E, A](this)

/**
* Runs the specified effect if this effect fails, providing the error to the
* effect if it exists. The provided effect will not be interrupted.
Expand Down
51 changes: 50 additions & 1 deletion core/shared/src/main/scala/scalaz/zio/ZManaged.scala
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,33 @@ final case class ZManaged[-R, +E, +A](reserve: ZIO[R, E, Reservation[R, E, A]])
}
}

/**
* Creates a `ZManaged` value that acquires the original resource in a fiber,
* and provides that fiber. The finalizer for this value will interrupt the fiber
* and run the original finalizer.
*/
final def fork: ZManaged[R, Nothing, Fiber[E, A]] =
ZManaged {
for {
finalizer <- Ref.make[ZIO[R, Nothing, _]](UIO.unit)
// The reservation phase of the new `ZManaged` runs uninterruptibly;
// so to make sure the acquire phase of the original `ZManaged` runs
// interruptibly, we need to create an interruptible hole in the region.
fiber <- ZIO.interruptibleMask { restore =>
restore {
for {
reservation <- self.reserve
_ <- finalizer.set(reservation.release)
} yield reservation
} >>= (_.acquire)
}.fork
reservation = Reservation(
acquire = UIO.succeed(fiber),
release = fiber.interrupt *> finalizer.get.flatMap(identity(_))
)
} yield reservation
}

/**
* Unwraps the optional success of this effect, but can fail with unit value.
*/
Expand All @@ -313,6 +340,16 @@ final case class ZManaged[-R, +E, +A](reserve: ZIO[R, E, Reservation[R, E, A]])
reserve.map(token => token.copy(acquire = token.acquire.map(f0)))
}

/**
* Effectfully maps the resource acquired by this value.
*/
final def mapM[R1 <: R, E1 >: E, B](f: A => ZIO[R1, E1, B]): ZManaged[R1, E1, B] =
ZManaged[R1, E1, B] {
reserve.map { token =>
token.copy(acquire = token.acquire.flatMap(f))
}
}

/**
* Returns an effect whose failure is mapped by the specified `f` function.
*/
Expand Down Expand Up @@ -682,6 +719,13 @@ object ZManaged {
final def fail[E](error: E): ZManaged[Any, E, Nothing] =
halt(Cause.fail(error))

/**
* Creates an effect that only executes the `UIO` value as its
* release action.
*/
final def finalizer(f: UIO[_]): ZManaged[Any, Nothing, Unit] =
ZManaged.reserve(Reservation(ZIO.unit, f))

/**
* Returns an effect that performs the outer effect first, followed by the
* inner effect, yielding the value of the inner effect.
Expand Down Expand Up @@ -883,6 +927,11 @@ object ZManaged {
}
}

/**
* Returns a `ZManaged` that never acquires a resource.
*/
val never: ZManaged[Any, Nothing, Nothing] = ZManaged.fromEffect(ZIO.never)

/**
* Reduces an `Iterable[IO]` to a single `IO`, working sequentially.
*/
Expand Down Expand Up @@ -1007,7 +1056,7 @@ object ZManaged {
* Unwraps a `ZManaged` that is inside a `ZIO`.
*/
final def unwrap[R, E, A](fa: ZIO[R, E, ZManaged[R, E, A]]): ZManaged[R, E, A] =
ZManaged(fa.flatMap(_.reserve))
ZManaged.fromEffect(fa).flatten

/**
* The moral equivalent of `if (p) exp`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import org.reactivestreams.{ Subscriber, Subscription }
import scalaz.zio.stream.ZStream
import scalaz.zio.stream.ZStream.Fold
import scalaz.zio.{ Promise, Queue, Runtime, UIO, ZIO }
import scalaz.zio.ZManaged

private[reactiveStreams] object QueueSubscriber {

Expand Down Expand Up @@ -34,7 +35,11 @@ private[reactiveStreams] object QueueSubscriber {
completion.await.ensuring(q.size.flatMap(n => if (n <= 0) q.shutdown else UIO.unit)).fork

override def fold[R1 <: Any, E1 >: Throwable, A1 >: A, S]: Fold[R1, E1, A1, S] =
forkQShutdownHook *> subscription.await.map { sub => (s: S, cont: S => Boolean, f: (S, A1) => ZIO[R1, E1, S]) =>
for {
_ <- ZManaged.finalizer(q.shutdown)
_ <- forkQShutdownHook.toManaged_
sub <- subscription.await.toManaged_
} yield { (s: S, cont: S => Boolean, f: (S, A1) => ZIO[R1, E1, S]) =>
def loop(s: S, demand: Long): ZIO[R1, E1, S] =
if (!cont(s)) UIO.succeed(s)
else {
Expand All @@ -49,8 +54,9 @@ private[reactiveStreams] object QueueSubscriber {
} else takeAndLoop
} <> completeWithS
}
loop(s, 0).ensuring(UIO(sub.cancel()).whenM(completion.isDone.map(!_)) *> q.shutdown)
}.onInterrupt(q.shutdown)

loop(s, 0).ensuring(UIO(sub.cancel()).whenM(completion.isDone.map(!_)) *> q.shutdown).toManaged_
}
}

private def subscriber[A](
Expand Down
3 changes: 2 additions & 1 deletion project/ScalazBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ object Scalaz {
|import scalaz._
|import scalaz.zio._
|import scalaz.zio.console._
|import scalaz.zio.duration._
|object replRTS extends DefaultRuntime {}
|import replRTS._
|implicit class RunSyntax[E, A](io: IO[E, A]){ def unsafeRun: A = replRTS.unsafeRun(io) }
|implicit class RunSyntax[R >: replRTS.Environment, E, A](io: ZIO[R, E, A]){ def unsafeRun: A = replRTS.unsafeRun(io) }
""".stripMargin
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,7 @@ object ArbitraryStream {
Arbitrary.arbitrary[Iterable[T]].map(StreamPure.fromIterable)

def genSucceededStream[T: ClassTag: Arbitrary]: Gen[Stream[Nothing, T]] =
Arbitrary.arbitrary[List[T]].map { xs =>
ZStream.unfoldM[Any, List[T], Nothing, T](xs) {
case head :: tail => IO.succeed(Some(head -> tail))
case _ => IO.succeed(None)
}
}
Arbitrary.arbitrary[List[T]].map(ZStream.fromIterable)

def genFailingStream[T: ClassTag: Arbitrary]: Gen[Stream[String, T]] =
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class SinkSpec(implicit ee: org.specs2.concurrent.ExecutionEnv)
val sinkResult = unsafeRunSync(s.run(ZSink.foldM(z)(ff)))
val foldResult = unsafeRunSync {
s.foldLeft(List[Int]())((acc, el) => el :: acc)
.use(IO.succeed)
.map(_.reverse)
.flatMap(_.foldLeft(z)((acc, el) => acc.flatMap(f(_, el))))
}
Expand Down
18 changes: 11 additions & 7 deletions streams/jvm/src/test/scala/scalaz/zio/stream/StreamChunkSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class StreamChunkSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends T
StreamChunk.monadLaw3 $monadLaw3
StreamChunk.tap $tap
StreamChunk.foldLeft $foldLeft
StreamChunk.foldLazy $foldLazy
StreamChunk.fold $fold
StreamChunk.flattenChunks $flattenChunks
"""

Expand All @@ -48,7 +48,9 @@ class StreamChunkSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends T

private def slurpM[E, A](s: StreamChunk[E, A]): Exit[E, Seq[A]] =
unsafeRunSync {
s.foldLazyChunks(Chunk.empty: Chunk[A])(_ => true)((acc, el) => IO.succeed(acc ++ el)).map(_.toSeq)
s.foldChunks(Chunk.empty: Chunk[A])(_ => true)((acc, el) => IO.succeed(acc ++ el))
.use(IO.succeed)
.map(_.toSeq)
}

private def map =
Expand Down Expand Up @@ -168,13 +170,15 @@ class StreamChunkSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends T

private def foldLeft =
prop { (s: StreamChunk[String, String], zero: Int, f: (Int, String) => Int) =>
unsafeRunSync(s.foldLeft(zero)(f)) must_=== slurp(s).map(_.foldLeft(zero)(f))
unsafeRunSync(s.foldLeft(zero)(f).use(IO.succeed)) must_=== slurp(s).map(_.foldLeft(zero)(f))
}

private def foldLazy =
private def fold =
prop { (s: StreamChunk[Nothing, String], zero: Int, cont: Int => Boolean, f: (Int, String) => Int) =>
val streamResult = unsafeRunSync(s.foldLazy(zero)(cont)((acc, a) => IO.succeed(f(acc, a))))
val listResult = slurp(s).map(l => foldLazyList(l.toList, zero)(cont)(f))
val streamResult = unsafeRunSync(
s.fold[Any, Nothing, String, Int].flatMap(_(zero, cont, (acc, a) => IO.succeed(f(acc, a)))).use(IO.succeed)
)
val listResult = slurp(s).map(l => foldLazyList(l.toList, zero)(cont)(f))
streamResult must_=== listResult
}

Expand All @@ -190,7 +194,7 @@ class StreamChunkSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends T
private def flattenChunks =
prop { (s: StreamChunk[String, String]) =>
val result = unsafeRunSync {
s.flattenChunks.foldLeft[String, List[String]](Nil)((acc, a) => a :: acc).map(_.reverse)
s.flattenChunks.foldLeft[String, List[String]](Nil)((acc, a) => a :: acc).use(IO.succeed).map(_.reverse)
}
result must_== slurp(s)
}
Expand Down

0 comments on commit 44ae7f2

Please sign in to comment.