Skip to content

Commit

Permalink
clean
Browse files Browse the repository at this point in the history
  • Loading branch information
guizmaii committed Mar 27, 2023
1 parent b277eb7 commit e84543b
Showing 1 changed file with 24 additions and 22 deletions.
46 changes: 24 additions & 22 deletions zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
Original file line number Diff line number Diff line change
Expand Up @@ -491,14 +491,11 @@ private[consumer] final class Runloop private (
s"Starting poll with ${state.pendingRequests.size} pending requests and ${state.pendingCommits.size} pending commits"
)

/**
* Inlined, simplified and specialized for our needs version of [[ZSink.foldChunksZIO]]
*
* Code initially inspired by the implementation of [[ZStream.runFoldZIO]] with everything we don't need removed and
* with chunking added
*/
val sink = {
def execute(state: State, commands: Chunk[Command]): Task[State] =
ZStream
.fromQueue(commandQueue)
.timeoutFail[Throwable](RunloopTimeout)(runloopTimeout)
.takeWhile(_ != StopRunloop)
.runFoldChunksDiscardZIO(State.initial) { (state, commands) =>
for {
_ <- ZIO.logTrace(s"Processing ${commands.size} commands: ${commands.mkString(",")}")
isShutdown <- isShutdown
Expand All @@ -511,28 +508,33 @@ private[consumer] final class Runloop private (
// Immediately poll again, after processing all new queued commands
_ <- commandQueue.offer(Command.Poll).when(updatedStateAfterPoll.shouldPoll)
} yield updatedStateAfterPoll
}
.tapErrorCause(cause => ZIO.logErrorCause("Error in Runloop", cause))
.onError(cause => partitions.offer(Take.failCause(cause)))
}
}

private[consumer] object Runloop {
private implicit final class StreamOps[R, E, A](private val stream: ZStream[R, E, A]) extends AnyVal {

def reader(s: State): ZChannel[Any, Throwable, Chunk[Command], Any, Throwable, Nothing, Unit] =
/**
* Inlined, simplified and specialized for our needs version of [[ZSink.foldChunksZIO]]
*
* Code initially inspired by the implementation of [[ZStream.runFoldZIO]] with everything we don't need removed and
* with chunking added
*/
def runFoldChunksDiscardZIO[R1 <: R, E1 >: E, S](s: S)(f: (S, Chunk[A]) => ZIO[R1, E1, S]): ZIO[R1, E1, Unit] = {
def reader(s: S): ZChannel[R1, E1, Chunk[A], Any, E1, Nothing, Unit] =
ZChannel.readWith(
(in: Chunk[Command]) => ZChannel.fromZIO(execute(s, in)).flatMap(reader),
(err: Throwable) => ZChannel.fail(err),
(in: Chunk[A]) => ZChannel.fromZIO(f(s, in)).flatMap(reader),
(err: E1) => ZChannel.fail(err),
(_: Any) => ZChannel.unit
)

ZSink.fromChannel(reader(State.initial))
stream.run(ZSink.fromChannel(reader(s)))
}

ZStream
.fromQueue(commandQueue)
.timeoutFail[Throwable](RunloopTimeout)(runloopTimeout)
.takeWhile(_ != StopRunloop)
.run(sink)
.tapErrorCause(cause => ZIO.logErrorCause("Error in Runloop", cause))
.onError(cause => partitions.offer(Take.failCause(cause)))
}
}

private[consumer] object Runloop {
type ByteArrayCommittableRecord = CommittableRecord[Array[Byte], Array[Byte]]

// Internal parameters, should not be necessary to tune
Expand Down

0 comments on commit e84543b

Please sign in to comment.