Skip to content

Commit

Permalink
Run the Runloop on a dedicated single-threaded thread pool (#802)
Browse files Browse the repository at this point in the history
  • Loading branch information
guizmaii authored Apr 12, 2023
1 parent 5a5d023 commit 005a5c5
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer

import scala.jdk.CollectionConverters._

private[consumer] class ConsumerAccess(
private[consumer] final class ConsumerAccess(
private[consumer] val consumer: ByteArrayKafkaConsumer,
access: Semaphore
) {
Expand All @@ -29,6 +29,12 @@ private[consumer] class ConsumerAccess(
}
.fork
.flatMap(fib => fib.join.onInterrupt(ZIO.succeed(consumer.wakeup()) *> fib.interrupt))

/**
* Do not use this method outside of the Runloop
*/
def runloopAccess[R, E, A](f: ByteArrayKafkaConsumer => ZIO[R, E, A]): ZIO[R, E, A] =
access.withPermit(f(consumer))
}

private[consumer] object ConsumerAccess {
Expand Down
22 changes: 12 additions & 10 deletions zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,13 @@ private[consumer] final class Runloop private (
}
}

consumer.withConsumerZIO { c =>
// We don't wait for the completion of the commit here, because it
// will only complete once we poll again.
ZIO.attempt(c.commitAsync(offsets.asJava, callback))
// We don't wait for the completion of the commit here, because it
// will only complete once we poll again.
consumer.runloopAccess { c =>
ZIO
.attempt(c.commitAsync(offsets.asJava, callback))
.catchAll(onFailure)
}
.catchAll(onFailure)
}

/**
Expand Down Expand Up @@ -224,7 +225,7 @@ private[consumer] final class Runloop private (
}

private def getConsumerGroupMetadataIfAny: UIO[Option[ConsumerGroupMetadata]] =
if (hasGroupId) consumer.withConsumer(_.groupMetadata()).fold(_ => None, Some(_))
if (hasGroupId) consumer.runloopAccess(c => ZIO.attempt(c.groupMetadata())).fold(_ => None, Some(_))
else ZIO.none

private def doSeekForNewPartitions(c: ByteArrayKafkaConsumer, tps: Set[TopicPartition]): Task[Set[TopicPartition]] =
Expand Down Expand Up @@ -262,7 +263,7 @@ private[consumer] final class Runloop private (
for {
_ <- currentState.set(state)
pollResult <-
consumer.withConsumerZIO { c =>
consumer.runloopAccess { c =>
ZIO.suspend {

val prevAssigned = c.assignment().asScala.toSet
Expand Down Expand Up @@ -414,7 +415,7 @@ private[consumer] final class Runloop private (
private def handleChangeSubscription(
command: Command.ChangeSubscription
): Task[Chunk[PartitionStreamControl]] =
consumer.withConsumerZIO { c =>
consumer.runloopAccess { c =>
command.subscription match {
case None =>
ZIO
Expand Down Expand Up @@ -604,8 +605,9 @@ private[consumer] object Runloop {
)
_ <- ZIO.logDebug("Starting Runloop")

// Run the entire loop on the blocking thread pool to avoid executor shifts
fib <- ZIO.blocking(runloop.run).forkScoped
// Run the entire loop on the a dedicated thread to avoid executor shifts
executor <- RunloopExecutor.newInstance
fib <- ZIO.onExecutor(executor)(runloop.run).forkScoped

_ <- ZIO.addFinalizer(
ZIO.logTrace("Shutting down Runloop") *>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package zio.kafka.consumer.internal

import zio.{ Executor, Scope, ZIO }

import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicLong

private[consumer] object RunloopExecutor {

private val counter: AtomicLong = new AtomicLong(0)

private def newSingleThreadedExecutor(i: Long): ZIO[Scope, Throwable, Executor] =
ZIO.acquireRelease {
ZIO.attempt {
val javaExecutor =
Executors.newSingleThreadExecutor(runnable => new Thread(runnable, s"zio-kafka-runloop-thread-$i"))

Executor.fromJavaExecutor(javaExecutor) -> javaExecutor
}
} { case (_, executor) => ZIO.attempt(executor.shutdown()).orDie }.map(_._1)

val newInstance: ZIO[Scope, Throwable, Executor] = newSingleThreadedExecutor(counter.getAndIncrement())

}

0 comments on commit 005a5c5

Please sign in to comment.