From 006ea6fb77d62dd5ce321eaf8d30f499e117ceb2 Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Sat, 15 Jun 2019 12:05:23 -0400 Subject: [PATCH] Changed Pull#{stream,streamNoScope} to require Unit result type --- core/jvm/src/main/scala/fs2/compress.scala | 2 +- core/shared/src/main/scala/fs2/Pull.scala | 27 ++++++++- core/shared/src/main/scala/fs2/Stream.scala | 58 ++++++++++--------- .../main/scala/fs2/concurrent/Signal.scala | 2 +- core/shared/src/main/scala/fs2/text.scala | 21 +++---- .../src/test/scala/fs2/CompilationTest.scala | 16 ++--- core/shared/src/test/scala/fs2/PullSpec.scala | 1 + 7 files changed, 75 insertions(+), 52 deletions(-) diff --git a/core/jvm/src/main/scala/fs2/compress.scala b/core/jvm/src/main/scala/fs2/compress.scala index 9975452e89..89337b044a 100644 --- a/core/jvm/src/main/scala/fs2/compress.scala +++ b/core/jvm/src/main/scala/fs2/compress.scala @@ -70,7 +70,7 @@ object compress { def inflate[F[_]](nowrap: Boolean = false, bufferSize: Int = 1024 * 32)( implicit ev: RaiseThrowable[F]): Pipe[F, Byte, Byte] = _.pull.uncons.flatMap { - case None => Pull.pure(None) + case None => Pull.done case Some((hd, tl)) => val inflater = new Inflater(nowrap) val buffer = new Array[Byte](bufferSize) diff --git a/core/shared/src/main/scala/fs2/Pull.scala b/core/shared/src/main/scala/fs2/Pull.scala index 89ecd8d576..18c86f1692 100644 --- a/core/shared/src/main/scala/fs2/Pull.scala +++ b/core/shared/src/main/scala/fs2/Pull.scala @@ -35,8 +35,19 @@ final class Pull[+F[_], +O, +R] private (private val free: FreeC[Algebra[Nothing def attempt: Pull[F, O, Either[Throwable, R]] = Pull.fromFreeC(get[F, O, R].map(r => Right(r)).handleErrorWith(t => FreeC.pure(Left(t)))) - /** Interpret this `Pull` to produce a `Stream`. The result type `R` is discarded. */ - def stream: Stream[F, O] = + /** + * Interpret this `Pull` to produce a `Stream`. + * + * May only be called on pulls which return a `Unit` result type. Use `p.void.stream` to explicitly + * ignore the result type of the pull. + */ + def stream(implicit ev: R <:< Unit): Stream[F, O] = { + val _ = ev + Stream.fromFreeC(this.scope.get[F, O, Unit]) + } + + // For binary compatibility with 1.0.x + private[Pull] def stream: Stream[F, O] = Stream.fromFreeC(this.scope.get[F, O, Unit]) /** @@ -54,7 +65,14 @@ final class Pull[+F[_], +O, +R] private (private val free: FreeC[Algebra[Nothing * closing but when using `streamNoScope`, they get promoted to the current stream scope, * which may be infinite in the worst case. */ - def streamNoScope: Stream[F, O] = Stream.fromFreeC(get[F, O, R].map(_ => ())) + def streamNoScope(implicit ev: R <:< Unit): Stream[F, O] = { + val _ = ev + Stream.fromFreeC(this.asInstanceOf[Pull[F, O, Unit]].get[F, O, Unit]) + } + + // For binary compatibility with 1.0.x + private[Pull] def streamNoScope: Stream[F, O] = + Stream.fromFreeC(this.asInstanceOf[Pull[F, O, Unit]].get[F, O, Unit]) /** Applies the resource of this pull to `f` and returns the result. */ def flatMap[F2[x] >: F[x], O2 >: O, R2](f: R => Pull[F2, O2, R2]): Pull[F2, O2, R2] = @@ -93,6 +111,9 @@ final class Pull[+F[_], +O, +R] private (private val free: FreeC[Algebra[Nothing /** Tracks any resources acquired during this pull and releases them when the pull completes. */ def scope: Pull[F, O, Unit] = Pull.fromFreeC(Algebra.scope(get[F, O, R].map(_ => ()))) + + /** Discards the result type of this pull. */ + def void: Pull[F, O, Unit] = as(()) } object Pull extends PullLowPriority { diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 44882a46f7..2adab2c42e 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -426,7 +426,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, } this.pull.uncons.flatMap { - case None => Pull.pure(None) + case None => Pull.done case Some((hd, tl)) => if (hd.size >= n) Pull.output1(hd) >> go(Chunk.Queue.empty, tl) @@ -476,8 +476,8 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, this.pull .find(pf.isDefinedAt) .flatMap { - case None => Pull.pure(None) - case Some((hd, tl)) => Pull.output1(pf(hd)).as(None) + case None => Pull.done + case Some((hd, tl)) => Pull.output1(pf(hd)) } .stream @@ -676,7 +676,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, this.pull .takeWhile(o => !p(o)) .flatMap { - case None => Pull.pure(None) + case None => Pull.done case Some(s) => s.drop(1).pull.echo } .stream @@ -839,9 +839,9 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, def dropRight(n: Int): Stream[F, O] = if (n <= 0) this else { - def go(acc: Chunk.Queue[O], s: Stream[F, O]): Pull[F, O, Option[Unit]] = + def go(acc: Chunk.Queue[O], s: Stream[F, O]): Pull[F, O, Unit] = s.pull.uncons.flatMap { - case None => Pull.pure(None) + case None => Pull.done case Some((hd, tl)) => val all = acc :+ hd all @@ -949,20 +949,20 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, * }}} */ def evalScan[F2[x] >: F[x], O2](z: O2)(f: (O2, O) => F2[O2]): Stream[F2, O2] = { - def go(z: O2, s: Stream[F2, O]): Pull[F2, O2, Option[Stream[F2, O2]]] = + def go(z: O2, s: Stream[F2, O]): Pull[F2, O2, Unit] = s.pull.uncons1.flatMap { case Some((hd, tl)) => Pull.eval(f(z, hd)).flatMap { o => Pull.output1(o) >> go(o, tl) } - case None => Pull.pure(None) + case None => Pull.done } this.pull.uncons1.flatMap { case Some((hd, tl)) => Pull.eval(f(z, hd)).flatMap { o => Pull.output(Chunk.seq(List(z, o))) >> go(o, tl) } - case None => Pull.output1(z) >> Pull.pure(None) + case None => Pull.output1(z) }.stream } @@ -1016,9 +1016,9 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, * }}} */ def filterWithPrevious(f: (O, O) => Boolean): Stream[F, O] = { - def go(last: O, s: Stream[F, O]): Pull[F, O, Option[Unit]] = + def go(last: O, s: Stream[F, O]): Pull[F, O, Unit] = s.pull.uncons.flatMap { - case None => Pull.pure(None) + case None => Pull.done case Some((hd, tl)) => // Check if we can emit this chunk unmodified val (allPass, newLast) = hd.foldLeft((true, last)) { @@ -1036,7 +1036,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, } } this.pull.uncons1.flatMap { - case None => Pull.pure(None) + case None => Pull.done case Some((hd, tl)) => Pull.output1(hd) >> go(hd, tl) }.stream } @@ -1518,7 +1518,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, */ def intersperse[O2 >: O](separator: O2): Stream[F, O2] = this.pull.echo1.flatMap { - case None => Pull.pure(None) + case None => Pull.done case Some(s) => s.repeatPull { _.uncons.flatMap { @@ -2303,7 +2303,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, */ def scanChunksOpt[S, O2 >: O, O3](init: S)( f: S => Option[Chunk[O2] => (S, Chunk[O3])]): Stream[F, O3] = - this.pull.scanChunksOpt(init)(f).stream + this.pull.scanChunksOpt(init)(f).void.stream /** * Alias for `map(f).scanMonoid`. @@ -2464,7 +2464,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, * res0: List[Int] = List(0, 1, 2, 3, 4) * }}} */ - def take(n: Long): Stream[F, O] = this.pull.take(n).stream + def take(n: Long): Stream[F, O] = this.pull.take(n).void.stream /** * Emits the last `n` elements of the input. @@ -2490,7 +2490,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, * }}} */ def takeThrough(p: O => Boolean): Stream[F, O] = - this.pull.takeThrough(p).stream + this.pull.takeThrough(p).void.stream /** * Emits the longest prefix of the input for which all elements test true according to `f`. @@ -2501,7 +2501,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, * }}} */ def takeWhile(p: O => Boolean, takeFailure: Boolean = false): Stream[F, O] = - this.pull.takeWhile(p, takeFailure).stream + this.pull.takeWhile(p, takeFailure).void.stream /** * Transforms this stream using the given `Pipe`. @@ -2618,16 +2618,19 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, } } - covaryAll[F2, O2].pull.stepLeg.flatMap { - case Some(leg1) => - that.pull.stepLeg - .flatMap { - case Some(leg2) => go(leg1, leg2) - case None => k1(Left((leg1.head, leg1.stream))) - } + covaryAll[F2, O2].pull.stepLeg + .flatMap { + case Some(leg1) => + that.pull.stepLeg + .flatMap { + case Some(leg2) => go(leg1, leg2) + case None => k1(Left((leg1.head, leg1.stream))) + } - case None => k2(Right(that)) - }.stream + case None => k2(Right(that)) + } + .void + .stream } /** @@ -3523,7 +3526,7 @@ object Stream extends StreamLowPriority { */ def repeatPull[O2]( using: Stream.ToPull[F, O] => Pull[F, O2, Option[Stream[F, O]]]): Stream[F, O2] = - Pull.loop(using.andThen(_.map(_.map(_.pull))))(pull).stream + Pull.loop(using.andThen(_.map(_.map(_.pull))))(pull).void.stream } @@ -4302,6 +4305,7 @@ object Stream extends StreamLowPriority { .loop[F, O, StepLeg[F, O]] { leg => Pull.output(leg.head).flatMap(_ => leg.stepLeg) }(self.setHead(Chunk.empty)) + .void .stream /** Replaces head of this leg. Useful when the head was not fully consumed. */ diff --git a/core/shared/src/main/scala/fs2/concurrent/Signal.scala b/core/shared/src/main/scala/fs2/concurrent/Signal.scala index 11c92e3d17..e03862c127 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Signal.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Signal.scala @@ -74,7 +74,7 @@ object Signal extends SignalLowPriorityImplicits { (y, restOfYs) = firstYAndRestOfYs _ <- OptionT.liftF(Pull.output1[F, PullOutput]((x, y, restOfXs, restOfYs))) } yield () - firstPull.value.stream + firstPull.value.void.stream .covaryOutput[PullOutput] .flatMap { case (x, y, restOfXs, restOfYs) => diff --git a/core/shared/src/main/scala/fs2/text.scala b/core/shared/src/main/scala/fs2/text.scala index 4f874b27c1..422c600b97 100644 --- a/core/shared/src/main/scala/fs2/text.scala +++ b/core/shared/src/main/scala/fs2/text.scala @@ -60,23 +60,20 @@ object text { Chunk.bytes(allBytes.drop(splitAt))) } - def doPull( - buf: Chunk[Byte], - s: Stream[Pure, Chunk[Byte]]): Pull[Pure, String, Option[Stream[Pure, Chunk[Byte]]]] = + def doPull(buf: Chunk[Byte], s: Stream[Pure, Chunk[Byte]]): Pull[Pure, String, Unit] = s.pull.uncons.flatMap { case Some((byteChunks, tail)) => val (output, nextBuffer) = byteChunks.toList.foldLeft((Nil: List[String], buf))(processSingleChunk) Pull.output(Chunk.seq(output.reverse)) >> doPull(nextBuffer, tail) case None if !buf.isEmpty => - Pull.output1(new String(buf.toArray, utf8Charset)) >> Pull.pure(None) + Pull.output1(new String(buf.toArray, utf8Charset)) case None => - Pull.pure(None) + Pull.done } - def processByteOrderMark( - buffer: Option[Chunk.Queue[Byte]], - s: Stream[Pure, Chunk[Byte]]): Pull[Pure, String, Option[Stream[Pure, Chunk[Byte]]]] = + def processByteOrderMark(buffer: Option[Chunk.Queue[Byte]], + s: Stream[Pure, Chunk[Byte]]): Pull[Pure, String, Unit] = s.pull.uncons1.flatMap { case Some((hd, tl)) => val newBuffer = buffer.getOrElse(Chunk.Queue.empty[Byte]) :+ hd @@ -93,7 +90,7 @@ object text { case Some(b) => doPull(Chunk.empty, Stream.emits(b.chunks)) case None => - Pull.pure(None) + Pull.done } } @@ -179,15 +176,15 @@ object text { def go(buffer: Vector[String], pendingLineFeed: Boolean, - s: Stream[F, String]): Pull[F, String, Option[Unit]] = + s: Stream[F, String]): Pull[F, String, Unit] = s.pull.uncons.flatMap { case Some((chunk, s)) => val (toOutput, newBuffer, newPendingLineFeed) = extractLines(buffer, chunk, pendingLineFeed) Pull.output(toOutput) >> go(newBuffer, newPendingLineFeed, s) case None if buffer.nonEmpty => - Pull.output1(buffer.mkString) >> Pull.pure(None) - case None => Pull.pure(None) + Pull.output1(buffer.mkString) + case None => Pull.done } s => diff --git a/core/shared/src/test/scala/fs2/CompilationTest.scala b/core/shared/src/test/scala/fs2/CompilationTest.scala index d2c92ad7ee..2bc91d4124 100644 --- a/core/shared/src/test/scala/fs2/CompilationTest.scala +++ b/core/shared/src/test/scala/fs2/CompilationTest.scala @@ -11,8 +11,8 @@ object ThisModuleShouldCompile { /* Some checks that `.pull` can be used without annotations */ Stream(1, 2, 3, 4).through(_.take(2)) Stream.eval(IO.pure(1)).through(_.take(2)) - Stream(1, 2, 3).covary[IO].pull.uncons1.stream - Stream.eval(IO.pure(1)).pull.uncons1.stream + Stream(1, 2, 3).covary[IO].pull.uncons1.void.stream + Stream.eval(IO.pure(1)).pull.uncons1.void.stream /* Also in a polymorphic context. */ def a[F[_], A](s: Stream[F, A]) = s.through(_.take(2)) @@ -36,20 +36,20 @@ object ThisModuleShouldCompile { .pull .uncons1 .flatMap { - case Some((hd, _)) => Pull.output1(hd).as(None) - case None => Pull.pure(None) + case Some((hd, _)) => Pull.output1(hd) + case None => Pull.done } .stream Stream(1, 2, 3).pull.uncons1 .flatMap { - case Some((hd, _)) => Pull.output1(hd).as(None) - case None => Pull.pure(None) + case Some((hd, _)) => Pull.output1(hd) + case None => Pull.done } .stream Stream(1, 2, 3).pull.uncons1 .flatMap { - case Some((hd, _)) => Pull.eval(IO.pure(1)) >> Pull.output1(hd).as(None) - case None => Pull.pure(None) + case Some((hd, _)) => Pull.eval(IO.pure(1)) >> Pull.output1(hd) + case None => Pull.done } .stream (Stream(1, 2, 3).evalMap(IO(_))): Stream[IO, Int] diff --git a/core/shared/src/test/scala/fs2/PullSpec.scala b/core/shared/src/test/scala/fs2/PullSpec.scala index 52347124d8..3a4f5ae514 100644 --- a/core/shared/src/test/scala/fs2/PullSpec.scala +++ b/core/shared/src/test/scala/fs2/PullSpec.scala @@ -11,6 +11,7 @@ class PullSpec extends Fs2Spec { .onFinalize(ref.set(1)) .pull .echoChunk + .void .stream .compile .toList