Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor ZStream to use ZManaged #906

Merged
merged 1 commit into from
May 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ lazy val coreJS = core.js
lazy val streams = crossProject(JSPlatform, JVMPlatform)
.in(file("streams"))
.settings(stdSettings("zio-streams"))
.settings(replSettings)
.settings(buildInfoSettings)
.settings(replSettings)
.enablePlugins(BuildInfoPlugin)
.dependsOn(core % "test->test;compile->compile")

Expand Down
37 changes: 37 additions & 0 deletions core/jvm/src/test/scala/scalaz/zio/ZManagedSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class ZManagedSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends Test
Uses at most n fibers for reservation $foreachParN_ReservePar
Uses at most n fibers for acquisition $foreachParN_AcquirePar
Runs finalizers $foreachParN_Finalizers
ZManaged.fork
Runs finalizers properly $forkFinalizer
Acquires interruptibly $forkAcquisitionIsInterruptible
ZManaged.mergeAll
Merges elements in the correct order $mergeAllOrder
Runs finalizers $mergeAllFinalizers
Expand Down Expand Up @@ -504,6 +507,40 @@ class ZManagedSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends Test
testReservePar(2, res => ZManaged.foreachParN_(2)(List(1, 2, 3, 4))(_ => res))
}

private def forkFinalizer = unsafeRun {
for {
finalized <- Ref.make(false)
latch <- Promise.make[Nothing, Unit]
_ <- ZManaged
.reserve(Reservation(latch.succeed(()) *> ZIO.never, finalized.set(true)))
.fork
.use_(latch.await)
result <- finalized.get
} yield result must_=== true
}

private def forkAcquisitionIsInterruptible = unsafeRun {
for {
finalized <- Ref.make(false)
acquireLatch <- Promise.make[Nothing, Unit]
useLatch <- Promise.make[Nothing, Unit]
fib <- ZManaged
.reserve(
Reservation(
acquireLatch.succeed(()) *> ZIO.never,
finalized.set(true)
)
)
.fork
.use_(useLatch.succeed(()) *> ZIO.never)
.fork
_ <- acquireLatch.await
_ <- useLatch.await
_ <- fib.interrupt
result <- finalized.get
} yield result must_=== true
}

private def mergeAllOrder = {
def res(int: Int) =
ZManaged.succeed(int)
Expand Down
6 changes: 6 additions & 0 deletions core/shared/src/main/scala/scalaz/zio/ZIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,12 @@ sealed trait ZIO[-R, +E, +A] extends Serializable { self =>
final def toManaged[R1 <: R](release: A => ZIO[R1, Nothing, _]): ZManaged[R1, E, A] =
ZManaged.make[R1, E, A](this)(release)

/**
* Converts this ZIO to [[scalaz.zio.ZManaged]] with no release action.
*/
final def toManaged_ : ZManaged[R, E, A] =
ZManaged.fromEffect[R, E, A](this)

/**
* Runs the specified effect if this effect fails, providing the error to the
* effect if it exists. The provided effect will not be interrupted.
Expand Down
51 changes: 50 additions & 1 deletion core/shared/src/main/scala/scalaz/zio/ZManaged.scala
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,33 @@ final case class ZManaged[-R, +E, +A](reserve: ZIO[R, E, Reservation[R, E, A]])
}
}

/**
* Creates a `ZManaged` value that acquires the original resource in a fiber,
* and provides that fiber. The finalizer for this value will interrupt the fiber
* and run the original finalizer.
*/
final def fork: ZManaged[R, Nothing, Fiber[E, A]] =
ZManaged {
for {
finalizer <- Ref.make[ZIO[R, Nothing, _]](UIO.unit)
// The reservation phase of the new `ZManaged` runs uninterruptibly;
// so to make sure the acquire phase of the original `ZManaged` runs
// interruptibly, we need to create an interruptible hole in the region.
fiber <- ZIO.interruptibleMask { restore =>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jdegoes would love another look at this operator for validation. I used interruptibleMask to allow the caller to run the reserve step interruptibly rather than .interruptible

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is clever and looks correct to me.

One thing to keep in mind is that this fiber will inherit the interruptibility settings of its parent, at the moment when it is forked.

Since this is done in reserve, I believe we can therefore simplify the code to just use interruptible rather than interruptibleMask, because we know what the parent interruptibility status will be (right?).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we can assume that 99% of the cases would use an uninterruptible reserve, but, that's not always true, as reserve can also be run manually for advanced manipulations. Which is why I thought interruptibleMask is more compositional.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, ok. Works for me. 👍

restore {
for {
reservation <- self.reserve
_ <- finalizer.set(reservation.release)
} yield reservation
} >>= (_.acquire)
}.fork
reservation = Reservation(
acquire = UIO.succeed(fiber),
release = fiber.interrupt *> finalizer.get.flatMap(identity(_))
)
} yield reservation
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! 👍

}

/**
* Unwraps the optional success of this effect, but can fail with unit value.
*/
Expand All @@ -313,6 +340,16 @@ final case class ZManaged[-R, +E, +A](reserve: ZIO[R, E, Reservation[R, E, A]])
reserve.map(token => token.copy(acquire = token.acquire.map(f0)))
}

/**
* Effectfully maps the resource acquired by this value.
*/
final def mapM[R1 <: R, E1 >: E, B](f: A => ZIO[R1, E1, B]): ZManaged[R1, E1, B] =
ZManaged[R1, E1, B] {
reserve.map { token =>
token.copy(acquire = token.acquire.flatMap(f))
}
}

/**
* Returns an effect whose failure is mapped by the specified `f` function.
*/
Expand Down Expand Up @@ -682,6 +719,13 @@ object ZManaged {
final def fail[E](error: E): ZManaged[Any, E, Nothing] =
halt(Cause.fail(error))

/**
* Creates an effect that only executes the `UIO` value as its
* release action.
*/
final def finalizer(f: UIO[_]): ZManaged[Any, Nothing, Unit] =
ZManaged.reserve(Reservation(ZIO.unit, f))

/**
* Returns an effect that performs the outer effect first, followed by the
* inner effect, yielding the value of the inner effect.
Expand Down Expand Up @@ -883,6 +927,11 @@ object ZManaged {
}
}

/**
* Returns a `ZManaged` that never acquires a resource.
*/
val never: ZManaged[Any, Nothing, Nothing] = ZManaged.fromEffect(ZIO.never)

/**
* Reduces an `Iterable[IO]` to a single `IO`, working sequentially.
*/
Expand Down Expand Up @@ -1007,7 +1056,7 @@ object ZManaged {
* Unwraps a `ZManaged` that is inside a `ZIO`.
*/
final def unwrap[R, E, A](fa: ZIO[R, E, ZManaged[R, E, A]]): ZManaged[R, E, A] =
ZManaged(fa.flatMap(_.reserve))
ZManaged.fromEffect(fa).flatten

/**
* The moral equivalent of `if (p) exp`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import org.reactivestreams.{ Subscriber, Subscription }
import scalaz.zio.stream.ZStream
import scalaz.zio.stream.ZStream.Fold
import scalaz.zio.{ Promise, Queue, Runtime, UIO, ZIO }
import scalaz.zio.ZManaged

private[reactiveStreams] object QueueSubscriber {

Expand Down Expand Up @@ -34,7 +35,11 @@ private[reactiveStreams] object QueueSubscriber {
completion.await.ensuring(q.size.flatMap(n => if (n <= 0) q.shutdown else UIO.unit)).fork

override def fold[R1 <: Any, E1 >: Throwable, A1 >: A, S]: Fold[R1, E1, A1, S] =
forkQShutdownHook *> subscription.await.map { sub => (s: S, cont: S => Boolean, f: (S, A1) => ZIO[R1, E1, S]) =>
for {
_ <- ZManaged.finalizer(q.shutdown)
_ <- forkQShutdownHook.toManaged_
sub <- subscription.await.toManaged_
} yield { (s: S, cont: S => Boolean, f: (S, A1) => ZIO[R1, E1, S]) =>
def loop(s: S, demand: Long): ZIO[R1, E1, S] =
if (!cont(s)) UIO.succeed(s)
else {
Expand All @@ -49,8 +54,9 @@ private[reactiveStreams] object QueueSubscriber {
} else takeAndLoop
} <> completeWithS
}
loop(s, 0).ensuring(UIO(sub.cancel()).whenM(completion.isDone.map(!_)) *> q.shutdown)
}.onInterrupt(q.shutdown)

loop(s, 0).ensuring(UIO(sub.cancel()).whenM(completion.isDone.map(!_)) *> q.shutdown).toManaged_
}
}

private def subscriber[A](
Expand Down
3 changes: 2 additions & 1 deletion project/ScalazBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ object Scalaz {
|import scalaz._
|import scalaz.zio._
|import scalaz.zio.console._
|import scalaz.zio.duration._
|object replRTS extends DefaultRuntime {}
|import replRTS._
|implicit class RunSyntax[E, A](io: IO[E, A]){ def unsafeRun: A = replRTS.unsafeRun(io) }
|implicit class RunSyntax[R >: replRTS.Environment, E, A](io: ZIO[R, E, A]){ def unsafeRun: A = replRTS.unsafeRun(io) }
""".stripMargin
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,7 @@ object ArbitraryStream {
Arbitrary.arbitrary[Iterable[T]].map(StreamPure.fromIterable)

def genSucceededStream[T: ClassTag: Arbitrary]: Gen[Stream[Nothing, T]] =
Arbitrary.arbitrary[List[T]].map { xs =>
ZStream.unfoldM[Any, List[T], Nothing, T](xs) {
case head :: tail => IO.succeed(Some(head -> tail))
case _ => IO.succeed(None)
}
}
Arbitrary.arbitrary[List[T]].map(ZStream.fromIterable)

def genFailingStream[T: ClassTag: Arbitrary]: Gen[Stream[String, T]] =
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class SinkSpec(implicit ee: org.specs2.concurrent.ExecutionEnv)
val sinkResult = unsafeRunSync(s.run(ZSink.foldM(z)(ff)))
val foldResult = unsafeRunSync {
s.foldLeft(List[Int]())((acc, el) => el :: acc)
.use(IO.succeed)
.map(_.reverse)
.flatMap(_.foldLeft(z)((acc, el) => acc.flatMap(f(_, el))))
}
Expand Down
18 changes: 11 additions & 7 deletions streams/jvm/src/test/scala/scalaz/zio/stream/StreamChunkSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class StreamChunkSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends T
StreamChunk.monadLaw3 $monadLaw3
StreamChunk.tap $tap
StreamChunk.foldLeft $foldLeft
StreamChunk.foldLazy $foldLazy
StreamChunk.fold $fold
StreamChunk.flattenChunks $flattenChunks
"""

Expand All @@ -48,7 +48,9 @@ class StreamChunkSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends T

private def slurpM[E, A](s: StreamChunk[E, A]): Exit[E, Seq[A]] =
unsafeRunSync {
s.foldLazyChunks(Chunk.empty: Chunk[A])(_ => true)((acc, el) => IO.succeed(acc ++ el)).map(_.toSeq)
s.foldChunks(Chunk.empty: Chunk[A])(_ => true)((acc, el) => IO.succeed(acc ++ el))
.use(IO.succeed)
.map(_.toSeq)
}

private def map =
Expand Down Expand Up @@ -168,13 +170,15 @@ class StreamChunkSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends T

private def foldLeft =
prop { (s: StreamChunk[String, String], zero: Int, f: (Int, String) => Int) =>
unsafeRunSync(s.foldLeft(zero)(f)) must_=== slurp(s).map(_.foldLeft(zero)(f))
unsafeRunSync(s.foldLeft(zero)(f).use(IO.succeed)) must_=== slurp(s).map(_.foldLeft(zero)(f))
}

private def foldLazy =
private def fold =
prop { (s: StreamChunk[Nothing, String], zero: Int, cont: Int => Boolean, f: (Int, String) => Int) =>
val streamResult = unsafeRunSync(s.foldLazy(zero)(cont)((acc, a) => IO.succeed(f(acc, a))))
val listResult = slurp(s).map(l => foldLazyList(l.toList, zero)(cont)(f))
val streamResult = unsafeRunSync(
s.fold[Any, Nothing, String, Int].flatMap(_(zero, cont, (acc, a) => IO.succeed(f(acc, a)))).use(IO.succeed)
)
val listResult = slurp(s).map(l => foldLazyList(l.toList, zero)(cont)(f))
streamResult must_=== listResult
}

Expand All @@ -190,7 +194,7 @@ class StreamChunkSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends T
private def flattenChunks =
prop { (s: StreamChunk[String, String]) =>
val result = unsafeRunSync {
s.flattenChunks.foldLeft[String, List[String]](Nil)((acc, a) => a :: acc).map(_.reverse)
s.flattenChunks.foldLeft[String, List[String]](Nil)((acc, a) => a :: acc).use(IO.succeed).map(_.reverse)
}
result must_== slurp(s)
}
Expand Down