Rename ZStream#chunkN to ZStream#rechunk (#5629)
* rename

* format
adamgfraser committed Sep 18, 2021
1 parent 9ad1a22 commit e5256f2
Showing 13 changed files with 170 additions and 145 deletions.
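The rename is mechanical: every call site changes `.chunkN(n)` to `.rechunk(n)`, and the operator's behavior (regrouping a stream's elements into chunks of size `n`) stays the same. Below is a minimal sketch of the renamed operator, assuming a ZIO 2.x build that includes this commit; `RechunkExample` and the printed chunk sizes are illustrative and not part of the diff.

```scala
import zio._
import zio.stream.ZStream

object RechunkExample extends ZIOAppDefault {
  // Regroup ten elements into chunks of three; the final chunk carries the
  // remainder. Before this commit the same operator was called `chunkN`.
  val chunkSizes: ZIO[Any, Nothing, Chunk[Int]] =
    ZStream
      .fromIterable(1 to 10)
      .rechunk(3)
      .mapChunks(chunk => Chunk.single(chunk.size)) // observe chunk boundaries
      .runCollect                                   // expected: Chunk(3, 3, 3, 1)

  def run =
    chunkSizes.flatMap(sizes => Console.printLine(s"chunk sizes: $sizes"))
}
```

The changed files below follow exactly this pattern, in the documentation, the compression specs, and `ZSinkSpec`.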
12 changes: 6 additions & 6 deletions docs/datatypes/stream/zstream.md
@@ -1016,7 +1016,7 @@ val s1 = ZStream(1, 2, 3)

val s2 = ZStream("a", "b", "c", "d")
.schedule(Schedule.spaced(500.milliseconds))
.chunkN(3)
.rechunk(3)

s1.zipWithLatest(s2)((a, b) => (a, b))

@@ -1296,8 +1296,8 @@ Sometimes we need to interleave the emission of two streams and create another s
The `ZStream#merge` operator picks elements randomly from the specified streams:

```scala mdoc:silent:nest
val s1 = ZStream(1, 2, 3).chunkN(1)
val s2 = ZStream(4, 5, 6).chunkN(1)
val s1 = ZStream(1, 2, 3).rechunk(1)
val s2 = ZStream(4, 5, 6).rechunk(1)

val merged = s1 merge s2
// As the merge operation is not deterministic, it may output the following stream of numbers:
@@ -1321,8 +1321,8 @@ Here is an example of specifying termination strategy when merging two streams:

```scala mdoc:silent:nest
import zio.stream.ZStream.TerminationStrategy
val s1 = ZStream.iterate(1)(_+1).take(5).chunkN(1)
val s2 = ZStream.repeat(0).chunkN(1)
val s1 = ZStream.iterate(1)(_+1).take(5).rechunk(1)
val s2 = ZStream.repeat(0).rechunk(1)

val merged = s1.merge(s2, TerminationStrategy.Left)
```
@@ -1514,7 +1514,7 @@ In the following example, we are going to buffer a stream. We print each element
```scala mdoc:silent:nest
ZStream
.fromIterable(1 to 10)
.chunkN(1)
.rechunk(1)
.tap(x => Console.printLine(s"before buffering: $x"))
.buffer(4)
.tap(x => Console.printLine(s"after buffering: $x"))
@@ -37,7 +37,7 @@ object CompressionSpec extends DefaultRunnableSpec {
),
test("stream of two deflated inputs as a single chunk")(
assertM(
(deflatedStream(shortText) ++ deflatedStream(otherShortText)).chunkN(500).transduce(inflate(64)).runCollect
(deflatedStream(shortText) ++ deflatedStream(otherShortText)).rechunk(500).transduce(inflate(64)).runCollect
)(equalTo(Chunk.fromArray(shortText) ++ Chunk.fromArray(otherShortText)))
),
test("long input")(
@@ -47,12 +47,12 @@ object CompressionSpec extends DefaultRunnableSpec {
),
test("long input, buffer smaller than chunks")(
assertM(
deflatedStream(longText).chunkN(500).transduce(inflate(1)).runCollect
deflatedStream(longText).rechunk(500).transduce(inflate(1)).runCollect
)(equalTo(Chunk.fromArray(longText)))
),
test("long input, chunks smaller then buffer")(
assertM(
deflatedStream(longText).chunkN(1).transduce(inflate(500)).runCollect
deflatedStream(longText).rechunk(1).transduce(inflate(500)).runCollect
)(equalTo(Chunk.fromArray(longText)))
),
test("long input, not wrapped in ZLIB header and trailer")(
@@ -70,7 +70,7 @@
case (chunk, n, bufferSize) =>
assertM(for {
deflated <- ZIO.succeed(deflatedStream(chunk.toArray))
out <- deflated.chunkN(n).transduce(inflate(bufferSize)).runCollect
out <- deflated.rechunk(n).transduce(inflate(bufferSize)).runCollect
} yield out.toList)(equalTo(chunk))
}
),
@@ -79,7 +79,7 @@
case (chunk, n, bufferSize) =>
assertM(for {
deflated <- ZIO.succeed(noWrapDeflatedStream(chunk.toArray))
out <- deflated.chunkN(n).transduce(inflate(bufferSize, true)).runCollect
out <- deflated.rechunk(n).transduce(inflate(bufferSize, true)).runCollect
} yield out.toList)(equalTo(chunk))
}
),
@@ -88,7 +88,7 @@
assertM(for {
input <- ZIO.succeed(inflateRandomExampleThatFailed)
deflated <- ZIO.succeed(noWrapDeflatedStream(input))
out <- deflated.chunkN(40).transduce(inflate(11, true)).runCollect
out <- deflated.rechunk(40).transduce(inflate(11, true)).runCollect
} yield out.toList)(equalTo(inflateRandomExampleThatFailed.toList))
),
test("fail if input stream finished unexpected")(
@@ -106,7 +106,7 @@
test("stream of two gzipped inputs as a single chunk")(
assertM(
(jdkGzippedStream(shortText) ++ jdkGzippedStream(otherShortText))
.chunkN(500)
.rechunk(500)
.transduce(gunzip(64))
.runCollect
)(equalTo(Chunk.fromArray(shortText) ++ Chunk.fromArray(otherShortText)))
@@ -123,12 +123,12 @@
),
test("long input, buffer smaller than chunks")(
assertM(
jdkGzippedStream(longText).chunkN(500).transduce(gunzip(1)).runCollect
jdkGzippedStream(longText).rechunk(500).transduce(gunzip(1)).runCollect
)(equalTo(Chunk.fromArray(longText)))
),
test("long input, chunks smaller then buffer")(
assertM(
jdkGzippedStream(longText).chunkN(1).transduce(gunzip(500)).runCollect
jdkGzippedStream(longText).rechunk(1).transduce(gunzip(500)).runCollect
)(equalTo(Chunk.fromArray(longText)))
),
test("fail early if header is corrupted")(
@@ -151,7 +151,7 @@
case (chunk, n, bufferSize) =>
assertM(for {
deflated <- ZIO.succeed(jdkGzippedStream(chunk.toArray))
out <- deflated.chunkN(n).transduce(gunzip(bufferSize)).runCollect
out <- deflated.rechunk(n).transduce(gunzip(bufferSize)).runCollect
} yield out.toList)(equalTo(chunk))
}
),
@@ -172,7 +172,7 @@
),
test("parses header with CRC16")(
assertM(
headerWithCrc.chunkN(1).transduce(gunzip(64)).runCollect
headerWithCrc.rechunk(1).transduce(gunzip(64)).runCollect
)(equalTo(Chunk.fromArray(shortText)))
),
test("parses header with CRC16, FNAME, FCOMMENT, FEXTRA")(
@@ -186,33 +186,33 @@
checkM(Gen.listOfBounded(0, `1K`)(Gen.byte).zip(Gen.int(1, `1K`)).zip(Gen.int(1, `1K`))) {
case (input, n, bufferSize) =>
assertM(for {
deflated <- Stream.fromIterable(input).chunkN(n).transduce(deflate(bufferSize, false)).runCollect
deflated <- Stream.fromIterable(input).rechunk(n).transduce(deflate(bufferSize, false)).runCollect
inflated <- jdkInflate(deflated, noWrap = false)
} yield inflated)(equalTo(input))
}
),
test("deflate empty bytes, small buffer")(
assertM(
Stream.fromIterable(List.empty).chunkN(1).transduce(deflate(100, false)).runCollect.map(_.toList)
Stream.fromIterable(List.empty).rechunk(1).transduce(deflate(100, false)).runCollect.map(_.toList)
)(equalTo(jdkDeflate(Array.empty, new Deflater(-1, false)).toList))
),
test("deflates same as JDK")(
assertM(Stream.fromIterable(longText).chunkN(128).transduce(deflate(256, false)).runCollect)(
assertM(Stream.fromIterable(longText).rechunk(128).transduce(deflate(256, false)).runCollect)(
equalTo(Chunk.fromArray(jdkDeflate(longText, new Deflater(-1, false))))
)
),
test("deflates same as JDK, nowrap")(
assertM(Stream.fromIterable(longText).chunkN(128).transduce(deflate(256, true)).runCollect)(
assertM(Stream.fromIterable(longText).rechunk(128).transduce(deflate(256, true)).runCollect)(
equalTo(Chunk.fromArray(jdkDeflate(longText, new Deflater(-1, true))))
)
),
test("deflates same as JDK, small buffer")(
assertM(Stream.fromIterable(longText).chunkN(64).transduce(deflate(1, false)).runCollect)(
assertM(Stream.fromIterable(longText).rechunk(64).transduce(deflate(1, false)).runCollect)(
equalTo(Chunk.fromArray(jdkDeflate(longText, new Deflater(-1, false))))
)
),
test("deflates same as JDK, nowrap, small buffer ")(
assertM(Stream.fromIterable(longText).chunkN(64).transduce(deflate(1, true)).runCollect)(
assertM(Stream.fromIterable(longText).rechunk(64).transduce(deflate(1, true)).runCollect)(
equalTo(Chunk.fromArray(jdkDeflate(longText, new Deflater(-1, true))))
)
)
@@ -222,7 +222,7 @@
checkM(Gen.listOfBounded(0, `1K`)(Gen.byte).zip(Gen.int(1, `1K`)).zip(Gen.int(1, `1K`))) {
case (input, n, bufferSize) =>
assertM(for {
gzipped <- Stream.fromIterable(input).chunkN(n).transduce(gzip(bufferSize)).runCollect
gzipped <- Stream.fromIterable(input).rechunk(n).transduce(gzip(bufferSize)).runCollect
inflated <- jdkGunzip(gzipped)
} yield inflated)(equalTo(input))
}
@@ -241,19 +241,19 @@
),
test("gzips, small chunks, small buffer")(
assertM(for {
gzipped <- Stream.fromIterable(longText).chunkN(1).transduce(gzip(1)).runCollect
gzipped <- Stream.fromIterable(longText).rechunk(1).transduce(gzip(1)).runCollect
jdkGunzipped <- jdkGunzip(gzipped)
} yield jdkGunzipped)(equalTo(longText.toList))
),
test("gzips, small chunks, 1k buffer")(
assertM(for {
gzipped <- Stream.fromIterable(longText).chunkN(1).transduce(gzip(`1K`)).runCollect
gzipped <- Stream.fromIterable(longText).rechunk(1).transduce(gzip(`1K`)).runCollect
jdkGunzipped <- jdkGunzip(gzipped)
} yield jdkGunzipped)(equalTo(longText.toList))
),
test("chunks bigger than buffer")(
assertM(for {
gzipped <- Stream.fromIterable(longText).chunkN(`1K`).transduce(gzip(64)).runCollect
gzipped <- Stream.fromIterable(longText).rechunk(`1K`).transduce(gzip(64)).runCollect
jdkGunzipped <- jdkGunzip(gzipped)
} yield jdkGunzipped)(equalTo(longText.toList))
),
@@ -15,45 +15,45 @@ object DeflateSpec extends DefaultRunnableSpec {
case (input, n, bufferSize) =>
assertM(for {
(deflated, _) <-
(ZStream.fromIterable(input).chunkN(n).channel >>> Deflate.makeDeflater(bufferSize)).runCollect
(ZStream.fromIterable(input).rechunk(n).channel >>> Deflate.makeDeflater(bufferSize)).runCollect
inflated <- jdkInflate(deflated.flatten, noWrap = false)
} yield inflated)(equalTo(input))
}
),
test("deflate empty bytes, small buffer")(
assertM(
(ZStream.fromIterable(List.empty).chunkN(1).channel >>> Deflate
(ZStream.fromIterable(List.empty).rechunk(1).channel >>> Deflate
.makeDeflater(100, false)).runCollect
.map(_._1.flatten.toList)
)(equalTo(jdkDeflate(Array.empty, new Deflater(-1, false)).toList))
),
test("deflates same as JDK")(
assertM(
(ZStream.fromIterable(longText).chunkN(128).channel >>> Deflate.makeDeflater(256, false)).runCollect
(ZStream.fromIterable(longText).rechunk(128).channel >>> Deflate.makeDeflater(256, false)).runCollect
.map(_._1.flatten)
)(
equalTo(Chunk.fromArray(jdkDeflate(longText, new Deflater(-1, false))))
)
),
test("deflates same as JDK, nowrap")(
assertM(
(ZStream.fromIterable(longText).chunkN(128).channel >>> Deflate.makeDeflater(256, true)).runCollect
(ZStream.fromIterable(longText).rechunk(128).channel >>> Deflate.makeDeflater(256, true)).runCollect
.map(_._1.flatten)
)(
equalTo(Chunk.fromArray(jdkDeflate(longText, new Deflater(-1, true))))
)
),
test("deflates same as JDK, small buffer")(
assertM(
(ZStream.fromIterable(longText).chunkN(64).channel >>> Deflate.makeDeflater(1, false)).runCollect
(ZStream.fromIterable(longText).rechunk(64).channel >>> Deflate.makeDeflater(1, false)).runCollect
.map(_._1.flatten)
)(
equalTo(Chunk.fromArray(jdkDeflate(longText, new Deflater(-1, false))))
)
),
test("deflates same as JDK, nowrap, small buffer ")(
assertM(
(ZStream.fromIterable(longText).chunkN(64).channel >>> Deflate.makeDeflater(1, true)).runCollect
(ZStream.fromIterable(longText).rechunk(64).channel >>> Deflate.makeDeflater(1, true)).runCollect
.map(_._1.flatten)
)(
equalTo(Chunk.fromArray(jdkDeflate(longText, new Deflater(-1, true))))
@@ -11,7 +11,7 @@ object GzipSpec extends DefaultRunnableSpec {
checkM(Gen.listOfBounded(0, `1K`)(Gen.byte).zip(Gen.int(1, `1K`)).zip(Gen.int(1, `1K`))) {
case (input, n, bufferSize) =>
assertM(for {
gzipped <- (ZStream.fromIterable(input).chunkN(n).channel >>> Gzip.makeGzipper(bufferSize)).runCollect
gzipped <- (ZStream.fromIterable(input).rechunk(n).channel >>> Gzip.makeGzipper(bufferSize)).runCollect
.map(_._1.flatten)
inflated <- jdkGunzip(gzipped)
} yield inflated)(equalTo(input))
@@ -32,20 +32,20 @@
test("gzips, small chunks, small buffer")(
assertM(for {
gzipped <-
(ZStream.fromIterable(longText).chunkN(1).channel >>> Gzip.makeGzipper(1)).runCollect.map(_._1.flatten)
(ZStream.fromIterable(longText).rechunk(1).channel >>> Gzip.makeGzipper(1)).runCollect.map(_._1.flatten)
jdkGunzipped <- jdkGunzip(gzipped)
} yield jdkGunzipped)(equalTo(longText.toList))
),
test("gzips, small chunks, 1k buffer")(
assertM(for {
gzipped <-
(ZStream.fromIterable(longText).chunkN(1).channel >>> Gzip.makeGzipper(`1K`)).runCollect.map(_._1.flatten)
(ZStream.fromIterable(longText).rechunk(1).channel >>> Gzip.makeGzipper(`1K`)).runCollect.map(_._1.flatten)
jdkGunzipped <- jdkGunzip(gzipped)
} yield jdkGunzipped)(equalTo(longText.toList))
),
test("chunks bigger than buffer")(
assertM(for {
gzipped <- (ZStream.fromIterable(longText).chunkN(`1K`).channel >>> Gzip.makeGzipper(64)).runCollect
gzipped <- (ZStream.fromIterable(longText).rechunk(`1K`).channel >>> Gzip.makeGzipper(64)).runCollect
.map(_._1.flatten)
jdkGunzipped <- jdkGunzip(gzipped)
} yield jdkGunzipped)(equalTo(longText.toList))
@@ -25,7 +25,7 @@ object InflateSpec extends DefaultRunnableSpec {
),
test("stream of two deflated inputs as a single chunk")(
assertM(
((deflatedStream(shortText) ++ deflatedStream(otherShortText)).chunkN(500).channel >>> makeInflater(
((deflatedStream(shortText) ++ deflatedStream(otherShortText)).rechunk(500).channel >>> makeInflater(
64
)).runCollect.map(_._1.flatten)
)(equalTo(Chunk.fromArray(shortText) ++ Chunk.fromArray(otherShortText)))
@@ -37,12 +37,12 @@
),
test("long input, buffer smaller than chunks")(
assertM(
(deflatedStream(longText).chunkN(500).channel >>> makeInflater(1)).runCollect.map(_._1.flatten)
(deflatedStream(longText).rechunk(500).channel >>> makeInflater(1)).runCollect.map(_._1.flatten)
)(equalTo(Chunk.fromArray(longText)))
),
test("long input, chunks smaller then buffer")(
assertM(
(deflatedStream(longText).chunkN(1).channel >>> makeInflater(500)).runCollect.map(_._1.flatten)
(deflatedStream(longText).rechunk(1).channel >>> makeInflater(500)).runCollect.map(_._1.flatten)
)(equalTo(Chunk.fromArray(longText)))
),
test("long input, not wrapped in ZLIB header and trailer")(
@@ -60,7 +60,7 @@
case (chunk, n, bufferSize) =>
assertM(for {
deflated <- ZIO.succeed(deflatedStream(chunk.toArray))
out <- (deflated.chunkN(n).channel >>> makeInflater(bufferSize)).runCollect.map(_._1.flatten)
out <- (deflated.rechunk(n).channel >>> makeInflater(bufferSize)).runCollect.map(_._1.flatten)
} yield out.toList)(equalTo(chunk))
}
),
@@ -69,7 +69,7 @@
case (chunk, n, bufferSize) =>
assertM(for {
deflated <- ZIO.succeed(noWrapDeflatedStream(chunk.toArray))
out <- (deflated.chunkN(n).channel >>> makeInflater(bufferSize, true)).runCollect.map(_._1.flatten)
out <- (deflated.rechunk(n).channel >>> makeInflater(bufferSize, true)).runCollect.map(_._1.flatten)
} yield out.toList)(equalTo(chunk))
}
),
@@ -78,7 +78,7 @@
assertM(for {
input <- ZIO.succeed(inflateRandomExampleThatFailed)
deflated <- ZIO.succeed(noWrapDeflatedStream(input))
out <- (deflated.chunkN(40).channel >>> makeInflater(11, true)).runCollect.map(_._1.flatten)
out <- (deflated.rechunk(40).channel >>> makeInflater(11, true)).runCollect.map(_._1.flatten)
} yield out.toList)(equalTo(inflateRandomExampleThatFailed.toList))
),
test("fail if input stream finished unexpected")(
@@ -40,7 +40,7 @@ object ZSinkSpec extends ZIOBaseSpec {
assertM(
Stream
.fromIterable(1 to 10)
.chunkN(chunkSize)
.rechunk(chunkSize)
.run(ZSink.sum[Int].collectAllWhileWith(-1)((s: Int) => s == s)(_ + _))
)(equalTo(54))
}
@@ -53,7 +53,7 @@
(a: List[Int], b: Option[Int]) => a ++ b.toList
)
val stream = Stream.fromIterable(1 to 100)
assertM((stream ++ stream).chunkN(3).run(sink))(equalTo(List(1, 2, 3, 4)))
assertM((stream ++ stream).rechunk(3).run(sink))(equalTo(List(1, 2, 3, 4)))
}
),
test("head")(
@@ -275,7 +275,7 @@ object ZSinkSpec extends ZIOBaseSpec {
val sink: ZSink[Any, Nothing, Int, Int, Option[Option[Int]]] =
ZSink.head[Int].untilOutputZIO(h => ZIO.succeed(h.fold(false)(_ >= 10)))
val assertions = ZIO.foreach(Chunk(1, 3, 7, 20)) { n =>
assertM(Stream.fromIterable(1 to 100).chunkN(n).run(sink))(equalTo(Some(Some(10))))
assertM(Stream.fromIterable(1 to 100).rechunk(n).run(sink))(equalTo(Some(Some(10))))
}
assertions.map(tst => tst.reduce(_ && _))
},