Skip to content

Commit

Permalink
Update KafkaConsumer.partitionedStream.enqueueStream using for-compre…
Browse files Browse the repository at this point in the history
…hension
  • Loading branch information
CremboC committed Apr 3, 2019
1 parent b51fde2 commit 60b3199
Showing 1 changed file with 58 additions and 55 deletions.
113 changes: 58 additions & 55 deletions src/main/scala/fs2/kafka/KafkaConsumer.scala
Expand Up @@ -371,63 +371,66 @@ private[kafka] object KafkaConsumer {
streamId: Int,
partition: TopicPartition,
partitions: Queue[F, Stream[F, CommittableMessage[F, K, V]]]
): F[Unit] =
chunkQueue.flatMap { chunks =>
Deferred[F, Unit].flatMap { dequeueDone =>
Deferred.tryable[F, Unit].flatMap { stopRequests =>
val shutdown = F.race(fiber.join.attempt, dequeueDone.get).void
partitions.enqueue1 {
Stream.eval {
F.guarantee {
Stream
.repeatEval {
stopRequests.tryGet.flatMap {
case None =>
Deferred[F, PartitionRequest].flatMap { deferred =>
val request = Request.Fetch(partition, streamId, deferred)
val fetch = requests.enqueue1(request) >> deferred.get
F.race(shutdown, fetch).flatMap {
case Left(()) =>
stopRequests.complete(())

case Right((chunk, reason)) =>
val enqueueChunk =
if (chunk.nonEmpty)
chunks.enqueue1(Some(chunk))
else F.unit

val completeRevoked =
if (reason.topicPartitionRevoked)
stopRequests.complete(())
else F.unit

enqueueChunk >> completeRevoked
}
}

case Some(()) =>
// Prevent issuing additional requests after partition is
// revoked or shutdown happens, in case the stream isn't
// interrupted fast enough
F.unit
}
}
.interruptWhen(F.race(shutdown, stopRequests.get).void.attempt)
.compile
.drain
}(F.race(dequeueDone.get, chunks.enqueue1(None)).void)
.start
.as {
chunks.dequeue.unNoneTerminate
.flatMap(Stream.chunk)
.covary[F]
.onFinalize(dequeueDone.complete(()))
}
}.flatten
): F[Unit] = {
for {
chunks <- chunkQueue
dequeueDone <- Deferred[F, Unit]
shutdown = F.race(fiber.join.attempt, dequeueDone.get).void
stopReqs <- Deferred.tryable[F, Unit]
_ <- partitions.enqueue1 {
Stream.eval {
def fetchPartition(deferred: Deferred[F, PartitionRequest]): F[Unit] = {
val request = Request.Fetch(partition, streamId, deferred)
val fetch = requests.enqueue1(request) >> deferred.get
F.race(shutdown, fetch).flatMap {
case Left(()) =>
stopReqs.complete(())

case Right((chunk, reason)) =>
val enqueueChunk =
if (chunk.nonEmpty)
chunks.enqueue1(Some(chunk))
else F.unit

val completeRevoked =
if (reason.topicPartitionRevoked)
stopReqs.complete(())
else F.unit

enqueueChunk >> completeRevoked
}
}
}


F.guarantee {
Stream
.repeatEval {
stopReqs.tryGet.flatMap {
case None =>
Deferred[F, PartitionRequest] >>= fetchPartition

case Some(()) =>
// Prevent issuing additional requests after partition is
// revoked or shutdown happens, in case the stream isn't
// interrupted fast enough
F.unit
}
}
.interruptWhen(F.race(shutdown, stopReqs.get).void.attempt)
.compile
.drain
}(F.race(dequeueDone.get, chunks.enqueue1(None)).void)
.start
.as {
chunks.dequeue.unNoneTerminate
.flatMap(Stream.chunk)
.covary[F]
.onFinalize(dequeueDone.complete(()))
}
}.flatten
}
}
} yield ()
}

def enqueueStreams(
streamId: Int,
Expand Down

0 comments on commit 60b3199

Please sign in to comment.