Skip to content

Commit

Permalink
Merge pull request #1990 from dmitriibundin/main
Browse files Browse the repository at this point in the history
Fixes #1987
  • Loading branch information
mpilquist committed Aug 15, 2020
2 parents ed52222 + d32ac28 commit 9142e0f
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 5 deletions.
18 changes: 13 additions & 5 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1918,17 +1918,25 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit])
Deferred[F2, Either[Throwable, Unit]].flatMap { resultR =>
Ref.of[F2, Boolean](false).flatMap { otherSideDone =>
Queue.unbounded[F2, Option[Stream[F2, O2]]].map { resultQ =>
def go(
s: Stream[F2, O2],
guard: Semaphore[F2],
queue: Queue[F2, Option[Stream[F2, O2]]]
): Pull[F2, O2, Unit] =
Pull.eval(guard.acquire) >> s.pull.uncons.flatMap {
case Some((hd, tl)) =>
Pull
.eval(resultQ.enqueue1(Some(Stream.chunk(hd).onFinalize(guard.release)))) >>
go(tl, guard, queue)
case None => Pull.done
}
def runStream(
s: Stream[F2, O2],
whenDone: Deferred[F2, Either[Throwable, Unit]]
): F2[Unit] =
Semaphore(1).flatMap {
guard => // guarantee we process only single chunk at any given time from any given side.
s.chunks
.evalMap { chunk =>
guard.acquire >>
resultQ.enqueue1(Some(Stream.chunk(chunk).onFinalize(guard.release)))
}
go(s, guard, resultQ).stream
.interruptWhen(interrupt.get.attempt)
.compile
.drain
Expand Down
19 changes: 19 additions & 0 deletions core/shared/src/test/scala/fs2/StreamMergeSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -186,4 +186,23 @@ class StreamMergeSuite extends Fs2Suite {
}
}
}

test("merge not emit ahead") {
forAllF { (v: Int) =>
val expected = List(v, v + 1)
Ref
.of[IO, Int](v)
.map { ref =>
Stream
.repeatEval(ref.get)
.merge(Stream.never[IO])
.evalMap { value =>
IO.sleep(100.milliseconds) >> ref.set(value + 1) >> IO(value)
}
.take(2)
}
.flatMap(_.compile.toList)
.map(result => assert(result == expected))
}
}
}

0 comments on commit 9142e0f

Please sign in to comment.