Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add ZStream#{groupBy, split, fanOut} #1387

Merged
merged 28 commits into from
Aug 20, 2019
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/shared/src/main/scala/zio/ZManaged.scala
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,7 @@ object ZManaged {
* specified text message. This method can be used for terminating a fiber
* because a defect has been detected in the code.
*/
final def dieMessage(message: String): ZManaged[Any, Throwable, Nothing] = die(new RuntimeException(message))
final def dieMessage(message: String): ZManaged[Any, Nothing, Nothing] = die(new RuntimeException(message))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! 👍


/**
* Returns an effect from a [[zio.Exit]] value.
Expand Down
8 changes: 4 additions & 4 deletions docs/datatypes/sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ Sink.fail[Exception](new NotImplementedException)
Basic fold accumulation of received elements:

```scala mdoc:silent
Sink.foldLeft[Nothing, Int, Int](0)(_ + _)
Sink.foldLeft[Int, Int](0)(_ + _)
```

Fold where each fold step has to be described in sink `Step` API.
A `foldLeft` uses `Step.more` in its implementation:
A `foldLeft` uses `Step.more` in its implementation:

```scala mdoc:silent
Sink.fold[Nothing, Int, Int](0)((acc, e) => ZSink.Step.more(acc + e))
Expand All @@ -87,7 +87,7 @@ Sink
.fold(init)((acc, e) => ZSink.Step.more(acc + e)))
```

`read1` tries to read head element from stream,
`read1` tries to read head element from stream,
fails if isn't present or doesn't satisfy given condition:

```scala mdoc:silent
Expand All @@ -99,7 +99,7 @@ Sink.read1[String, Int] {

## Transforming sinks

Having created the sink, we can transform it with provided operations.
Having created the sink, we can transform it with provided operations.
One of them already appeared in previous section - `collectAll` in `read1`.

Sink that after collecting input - filters it:
Expand Down
26 changes: 26 additions & 0 deletions streams-tests/jvm/src/test/scala/zio/stream/SinkSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ class SinkSpec(implicit ee: org.specs2.concurrent.ExecutionEnv)
step error $filterMStepError
extractError $filterMExtractError

keyed
happy path $keyedHappyPath
init error $keyedInitError
step error $keyedStepError
extract error $keyedExtractError

map
happy path $mapHappyPath
init error $mapInitError
Expand Down Expand Up @@ -561,6 +567,26 @@ class SinkSpec(implicit ee: org.specs2.concurrent.ExecutionEnv)
unsafeRun(sinkIteration(sink, 1).either.map(_ must_=== Left("Ouch")))
}

private def keyedHappyPath = {
val sink = ZSink.identity[Int].keyed((_: Int) + 1)
unsafeRun(sinkIteration(sink, 1).map(_ must_=== Map(2 -> 1)))
}

private def keyedInitError = {
val sink = initErrorSink.keyed((_: Int) + 1)
unsafeRun(sinkIteration(sink, 1).either.map(_ must_=== Left("Ouch")))
}

private def keyedStepError = {
val sink = stepErrorSink.keyed((_: Int) + 1)
unsafeRun(sinkIteration(sink, 1).either.map(_ must_=== Left("Ouch")))
}

private def keyedExtractError = {
val sink = extractErrorSink.keyed((_: Int) + 1)
unsafeRun(sinkIteration(sink, 1).either.map(_ must_=== Left("Ouch")))
}

private def mapHappyPath = {
val sink = ZSink.identity[Int].map(_.toString)
unsafeRun(sinkIteration(sink, 1).map(_ must_=== "1"))
Expand Down
189 changes: 189 additions & 0 deletions streams-tests/jvm/src/test/scala/zio/stream/StreamSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ class ZStreamSpec(implicit ee: org.specs2.concurrent.ExecutionEnv)

Stream.ensuringFirst $ensuringFirst

Stream.fanOut
Values $fanOutValues
Errors $fanOutErrors
BackPressure $fanOutBackPressure
Unsubscribe $fanOutUnsubscribe

Stream.finalizer $finalizer

Stream.filter
Expand Down Expand Up @@ -138,6 +144,11 @@ class ZStreamSpec(implicit ee: org.specs2.concurrent.ExecutionEnv)
Stream.fromIterable $fromIterable
Stream.fromQueue $fromQueue

Stream.groupBy
values $groupByValues
ending outer stream does not interrupt inner streams $groupByValuesInner
outer errors $groupByErrorsOuter

Stream interleaving
interleave $interleave
interleaveWith $interleaveWith
Expand Down Expand Up @@ -176,6 +187,11 @@ class ZStreamSpec(implicit ee: org.specs2.concurrent.ExecutionEnv)
short circuits in schedule $spacedShortCircuitsWhileInSchedule
short circuits after schedule $spacedShortCircuitsAfterScheduleFinished

Stream.split
values $splitValues
errors $splitErrors
backpressure $splitBackPressure

Stream.take
take $take
take short circuits $takeShortCircuits
Expand All @@ -186,6 +202,9 @@ class ZStreamSpec(implicit ee: org.specs2.concurrent.ExecutionEnv)

Stream.tap $tap

Stream.timeout
should interrupt stream $timeout

Stream.throttleEnforce
free elements $throttleEnforceFreeElements
no bandwidth $throttleEnforceNoBandwidth
Expand Down Expand Up @@ -656,6 +675,66 @@ class ZStreamSpec(implicit ee: org.specs2.concurrent.ExecutionEnv)
} yield execution must_=== List("Release", "Ensuring", "Use", "Acquire")
}

private def fanOutValues =
unsafeRun {
Stream.range(0, 5).fanOut(2, 12).use {
case s1 :: s2 :: Nil =>
for {
out1 <- s1.runCollect
out2 <- s2.runCollect
expected = List(0, 1, 2, 3, 4, 5)
} yield (out1 must_=== expected) && (out2 must_=== expected)
case _ =>
ZIO.fail("Wrong number of streams produced")
}
}

private def fanOutErrors =
unsafeRun {
(Stream.range(0, 1) ++ Stream.fail("Boom")).fanOut(2, 12).use {
case s1 :: s2 :: Nil =>
for {
out1 <- s1.runCollect.either
out2 <- s2.runCollect.either
expected = Left("Boom")
} yield (out1 must_=== expected) && (out2 must_=== expected)
case _ =>
ZIO.fail("Wrong number of streams produced")
}
}

private def fanOutBackPressure =
flaky(
Stream
.range(0, 5)
.fanOut(2, 2)
.use {
case s1 :: s2 :: Nil =>
for {
ref <- Ref.make[List[Int]](Nil)
_ <- s1.timeout(100.milliseconds).foreach(i => ref.update(i :: _)).ignore
result <- ref.get
_ <- s2.runDrain
expected = List(2, 1, 0)
} yield result must_=== expected
case _ =>
ZIO.fail("Wrong number of streams produced")
}
)

private def fanOutUnsubscribe =
unsafeRun {
Stream.range(0, 5).fanOut(2, 2).use {
case s1 :: s2 :: Nil =>
for {
_ <- s1.timeout(Duration.Zero).runDrain.ignore
out2 <- s2.runCollect
} yield out2 must_=== List(0, 1, 2, 3, 4, 5)
case _ =>
ZIO.fail("Wrong number of streams produced")
}
}

private def finalizer =
unsafeRun {
for {
Expand Down Expand Up @@ -1069,6 +1148,53 @@ class ZStreamSpec(implicit ee: org.specs2.concurrent.ExecutionEnv)
result must_=== Success(c.toSeq.toList)
}

private def groupByValues =
unsafeRun {
val words = List.fill(1000)(0 to 100).flatten.map(_.toString())
Stream
.fromIterable(words)
.groupByKey(identity, 8192)
.flatMapParSema(10, 16) {
case (sema, (k, s)) =>
s.withPermit(sema)
.transduce(Sink.foldLeft[String, Int](0) { case (acc: Int, _: String) => acc + 1 })
.take(1)
.map((k -> _))
}
.runCollect
.map(_.toMap must_=== (0 to 100).map((_.toString -> 1000)).toMap)
}

private def groupByValuesInner =
unsafeRun {
val words = List.fill(1000)(0 to 100).flatten.map(_.toString())
Stream
.fromIterable(words)
.groupByKey(identity, 1050)
.take(2)
.flatMapPar(110, 100) {
case (k, s) =>
s.transduce(Sink.foldLeft[String, Int](0) { case (acc: Int, _: String) => acc + 1 })
.take(1)
.map((k -> _))
}
.runCollect
.map(_.toMap must_=== (0 to 1).map((_.toString -> 1000)).toMap)
}

private def groupByErrorsOuter =
unsafeRun {
val words = List("abc", "test", "test", "foo")
(Stream.fromIterable(words) ++ Stream.fail("Boom"))
.groupByKey(identity)
.mapM {
case (_, s) => s.runDiscard.ignore
}
.runCollect
.either
.map(_ must_=== Left("Boom"))
}

private def map =
prop { (s: Stream[String, Byte], f: Byte => Int) =>
slurp(s.map(f)) must_=== slurp(s).map(_.map(f))
Expand Down Expand Up @@ -1291,6 +1417,56 @@ class ZStreamSpec(implicit ee: org.specs2.concurrent.ExecutionEnv)
.map(_ must_=== List("A", "A", "!", "B"))
)

private def splitValues =
unsafeRun {
Stream
.range(0, 5)
.split { i =>
if (i % 2 == 0) ZIO.succeed(Left(i))
else ZIO.succeed(Right(i))
}
.use {
case (s1, s2) =>
for {
out1 <- s1.runCollect
out2 <- s2.runCollect
} yield (out1 must_=== List(0, 2, 4)) && (out2 must_=== List(1, 3, 5))
}
}

private def splitErrors =
unsafeRun {
(Stream.range(0, 1) ++ Stream.fail("Boom")).split { i =>
if (i % 2 == 0) ZIO.succeed(Left(i))
else ZIO.succeed(Right(i))
}.use {
case (s1, s2) =>
for {
out1 <- s1.runCollect.either
out2 <- s2.runCollect.either
} yield (out1 must_=== Left("Boom")) && (out2 must_=== Left("Boom"))
}
}

private def splitBackPressure =
flaky {
Stream
.range(0, 2)
.split { i =>
if (i % 2 == 0) ZIO.succeed(Left(i))
else ZIO.succeed(Right(i))
}
.use {
case (s1, s2) =>
for {
ref <- Ref.make[List[Int]](Nil)
_ <- s1.timeout(100.milliseconds).foreach(i => ref.update(i :: _)).ignore
result <- ref.get
_ <- s2.runDrain
} yield result must_=== List(2, 0)
}
}

private def take =
prop { (s: Stream[String, Byte], n: Int) =>
val takeStreamesult = slurp(s.take(n))
Expand Down Expand Up @@ -1383,6 +1559,19 @@ class ZStreamSpec(implicit ee: org.specs2.concurrent.ExecutionEnv)
.runCollect must_=== List(1, 2)
}

private def timeout =
unsafeRun {
Promise.make[Nothing, Unit].flatMap { prom =>
Stream
.range(0, 5)
.tap(_ => ZIO.sleep(Duration.Infinity).ensuring(prom.succeed(())))
.timeout(Duration.Zero)
.runDrain
.ignore *>
prom.isDone.map(_ must_=== true)
}
}

private def toQueue = prop { c: Chunk[Int] =>
val s = Stream.fromChunk(c)
val result = unsafeRunSync {
Expand Down
2 changes: 1 addition & 1 deletion streams/shared/src/main/scala/zio/stream/Sink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ object Sink {
/**
* see [[ZSink.foldLeft]]
*/
final def foldLeft[A0, A, S](z: S)(f: (S, A) => S): Sink[Nothing, A0, A, S] =
final def foldLeft[A, S](z: S)(f: (S, A) => S): Sink[Nothing, Nothing, A, S] =
ZSink.foldLeft(z)(f)

/**
Expand Down