Skip to content

Commit

Permalink
Fixes #1987
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitriibundin committed Aug 14, 2020
1 parent ed52222 commit 8ad16ae
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
3 changes: 1 addition & 2 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1924,9 +1924,8 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit])
): F2[Unit] =
Semaphore(1).flatMap {
guard => // guarantee we process only single chunk at any given time from any given side.
s.chunks
Stream.repeatEval(guard.acquire).zipRight(s.chunks)
.evalMap { chunk =>
guard.acquire >>
resultQ.enqueue1(Some(Stream.chunk(chunk).onFinalize(guard.release)))
}
.interruptWhen(interrupt.get.attempt)
Expand Down
11 changes: 11 additions & 0 deletions core/shared/src/test/scala/fs2/StreamMergeSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -186,4 +186,15 @@ class StreamMergeSuite extends Fs2Suite {
}
}
}

test("merge not emit ahead") {
forAllF { (v: Int) =>
val expected = List(v, v + 1)
Ref.of[IO, Int](v).map(ref => {
Stream.repeatEval(ref.get).merge(Stream.never[IO]).evalMap(value => {
IO.sleep(100.milliseconds) >> ref.set(value + 1) >> IO(value)
}).take(2)
}).flatMap(_.compile.toList).map(result => assert(result == expected))
}
}
}

0 comments on commit 8ad16ae

Please sign in to comment.