Skip to content

Commit

Permalink
Improve performances: Remove the aggregateAsync call (#751)
Browse files Browse the repository at this point in the history
  • Loading branch information
guizmaii committed Mar 27, 2023
1 parent e359e61 commit 04381bf
Showing 1 changed file with 21 additions and 2 deletions.
23 changes: 21 additions & 2 deletions zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
Original file line number Diff line number Diff line change
Expand Up @@ -495,8 +495,7 @@ private[consumer] final class Runloop private (
.fromQueue(commandQueue)
.timeoutFail[Throwable](RunloopTimeout)(runloopTimeout)
.takeWhile(_ != StopRunloop)
.aggregateAsync(ZSink.collectAllN[Command](commandQueueSize))
.runFoldZIO(State.initial) { case (state, commands) =>
.runFoldChunksDiscardZIO(State.initial) { (state, commands) =>
for {
_ <- ZIO.logTrace(s"Processing ${commands.size} commands: ${commands.mkString(",")}")
isShutdown <- isShutdown
Expand All @@ -516,6 +515,26 @@ private[consumer] final class Runloop private (
}

private[consumer] object Runloop {
private implicit final class StreamOps[R, E, A](private val stream: ZStream[R, E, A]) extends AnyVal {

/**
* Inlined, simplified and specialized for our needs version of [[ZSink.foldChunksZIO]]
*
* Code initially inspired by the implementation of [[ZStream.runFoldZIO]] with everything we don't need removed and
* with chunking added
*/
def runFoldChunksDiscardZIO[R1 <: R, E1 >: E, S](s: S)(f: (S, Chunk[A]) => ZIO[R1, E1, S]): ZIO[R1, E1, Unit] = {
def reader(s: S): ZChannel[R1, E1, Chunk[A], Any, E1, Nothing, Unit] =
ZChannel.readWith(
(in: Chunk[A]) => ZChannel.fromZIO(f(s, in)).flatMap(reader),
(err: E1) => ZChannel.fail(err),
(_: Any) => ZChannel.unit
)

stream.run(ZSink.fromChannel(reader(s)))
}
}

type ByteArrayCommittableRecord = CommittableRecord[Array[Byte], Array[Byte]]

// Internal parameters, should not be necessary to tune
Expand Down

0 comments on commit 04381bf

Please sign in to comment.