Skip to content

Commit

Permalink
Backport improvement made in zio/zio#7996
Browse files Browse the repository at this point in the history
  • Loading branch information
guizmaii committed Apr 12, 2023
1 parent 005a5c5 commit ba05522
Showing 1 changed file with 4 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.RebalanceInProgressException
import zio._
import zio.kafka.consumer.Consumer.{ OffsetRetrieval, RunloopTimeout }
import zio.kafka.consumer.Consumer.OffsetRetrieval
import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer
import zio.kafka.consumer.internal.Runloop.Command.{ Commit, Request, StopAllStreams, StopRunloop }
Expand Down Expand Up @@ -472,8 +472,6 @@ private[consumer] final class Runloop private (

ZStream
.fromQueue(commandQueue)
.timeoutFail[Throwable](RunloopTimeout)(runloopTimeout)
.takeWhile(_ != StopRunloop)
.runFoldChunksDiscardZIO(State.initial) { (state, commands) =>
for {
_ <- ZIO.logTrace(s"Processing ${commands.size} commands: ${commands.mkString(",")}")
Expand Down Expand Up @@ -502,10 +500,10 @@ private[consumer] object Runloop {
*/
def runFoldChunksDiscardZIO[R1 <: R, E1 >: E, S](s: S)(f: (S, Chunk[A]) => ZIO[R1, E1, S]): ZIO[R1, E1, Unit] = {
def reader(s: S): ZChannel[R1, E1, Chunk[A], Any, E1, Nothing, Unit] =
ZChannel.readWith(
ZChannel.readWithCause(
(in: Chunk[A]) => ZChannel.fromZIO(f(s, in)).flatMap(reader),
(err: E1) => ZChannel.fail(err),
(_: Any) => ZChannel.unit
(err: Cause[E1]) => ZChannel.refailCause(err),
(_: Any) => ZChannel.succeedNow(s)
)

stream.run(ZSink.fromChannel(reader(s)))
Expand Down

0 comments on commit ba05522

Please sign in to comment.