Skip to content

Commit

Permalink
Update tuning in controller, views, config, fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
patelh committed Jul 4, 2018
1 parent 5672010 commit 29ef9db
Show file tree
Hide file tree
Showing 12 changed files with 231 additions and 168 deletions.
6 changes: 6 additions & 0 deletions app/controllers/Cluster.scala
Expand Up @@ -108,6 +108,9 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
, "offsetCacheThreadPoolQueueSize" -> optional(number(10, 10000))
, "kafkaAdminClientThreadPoolSize" -> optional(number(2, 1000))
, "kafkaAdminClientThreadPoolQueueSize" -> optional(number(10, 10000))
, "kafkaManagedOffsetMetadataCheckMillis" -> optional(number(10000, 120000))
, "kafkaManagedOffsetGroupCacheSize" -> optional(number(10000, 100000000))
, "kafkaManagedOffsetGroupExpireDays" -> optional(number(1, 100))
)(ClusterTuning.apply)(ClusterTuning.unapply)
)
, "securityProtocol" -> nonEmptyText.verifying(validateSecurityProtocol)
Expand Down Expand Up @@ -147,6 +150,9 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
, "offsetCacheThreadPoolQueueSize" -> optional(number(10, 10000))
, "kafkaAdminClientThreadPoolSize" -> optional(number(2, 1000))
, "kafkaAdminClientThreadPoolQueueSize" -> optional(number(10, 10000))
, "kafkaManagedOffsetMetadataCheckMillis" -> optional(number(10000, 120000))
, "kafkaManagedOffsetGroupCacheSize" -> optional(number(10000, 100000000))
, "kafkaManagedOffsetGroupExpireDays" -> optional(number(1, 100))
)(ClusterTuning.apply)(ClusterTuning.unapply)
)
, "securityProtocol" -> nonEmptyText.verifying(validateSecurityProtocol)
Expand Down
22 changes: 16 additions & 6 deletions app/kafka/manager/KafkaManager.scala
Expand Up @@ -6,24 +6,25 @@
package kafka.manager

import java.util.Properties
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, ThreadPoolExecutor}
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

import akka.actor.{ActorPath, ActorSystem, Props}
import akka.util.Timeout
import com.typesafe.config.{ConfigFactory, Config}
import com.typesafe.config.{Config, ConfigFactory}
import grizzled.slf4j.Logging
import kafka.manager.actor.{KafkaManagerActorConfig, KafkaManagerActor}
import kafka.manager.actor.{KafkaManagerActor, KafkaManagerActorConfig}
import kafka.manager.base.LongRunningPoolConfig
import kafka.manager.model._
import ActorModel._
import kafka.manager.actor.cluster.KafkaManagedOffsetCacheConfig
import kafka.manager.utils.UtilException
import kafka.manager.utils.zero81.ReassignPartitionErrors.ReplicationOutOfSync
import kafka.manager.utils.zero81.{ReassignPartitionErrors, ForceReassignmentCommand, ForceOnReplicationOutOfSync}
import kafka.manager.utils.zero81.{ForceOnReplicationOutOfSync, ForceReassignmentCommand, ReassignPartitionErrors}

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.reflect.ClassTag
import scala.util.{Success, Failure, Try}
import scala.util.{Failure, Success, Try}

/**
* @author hiral
Expand Down Expand Up @@ -80,6 +81,9 @@ object KafkaManager {
val OffsetCacheMaxQueueSize = "kafka-manager.offset-cache-max-queue-size"
val KafkaAdminClientThreadPoolSize = "kafka-manager.kafka-admin-client-thread-pool-size"
val KafkaAdminClientMaxQueueSize = "kafka-manager.kafka-admin-client-max-queue-size"
val KafkaManagedOffsetMetadataCheckMillis = "kafka-manager.kafka-managed-offset-metadata-check-millis"
val KafkaManagedOffsetGroupCacheSize = "kafka-manager.kafka-managed-offset-group-cache-size"
val KafkaManagedOffsetGroupExpireDays = "kafka-manager.kafka-managed-offset-group-expire-days"

val DefaultConfig: Config = {
val defaults: Map[String, _ <: AnyRef] = Map(
Expand All @@ -102,7 +106,10 @@ object KafkaManager {
OffsetCacheThreadPoolSize -> Runtime.getRuntime.availableProcessors().toString,
OffsetCacheMaxQueueSize -> "1000",
KafkaAdminClientThreadPoolSize -> Runtime.getRuntime.availableProcessors().toString,
KafkaAdminClientMaxQueueSize -> "1000"
KafkaAdminClientMaxQueueSize -> "1000",
KafkaManagedOffsetMetadataCheckMillis -> KafkaManagedOffsetCacheConfig.defaultGroupMemberMetadataCheckMillis.toString,
KafkaManagedOffsetGroupCacheSize -> KafkaManagedOffsetCacheConfig.defaultGroupTopicPartitionOffsetMaxSize.toString,
KafkaManagedOffsetGroupExpireDays -> KafkaManagedOffsetCacheConfig.defaultGroupTopicPartitionOffsetExpireDays.toString
)
import scala.collection.JavaConverters._
ConfigFactory.parseMap(defaults.asJava)
Expand Down Expand Up @@ -132,6 +139,9 @@ class KafkaManager(akkaConfig: Config) extends Logging {
, offsetCacheThreadPoolQueueSize = Option(configWithDefaults.getInt(OffsetCacheMaxQueueSize))
, kafkaAdminClientThreadPoolSize = Option(configWithDefaults.getInt(KafkaAdminClientThreadPoolSize))
, kafkaAdminClientThreadPoolQueueSize = Option(configWithDefaults.getInt(KafkaAdminClientMaxQueueSize))
, kafkaManagedOffsetMetadataCheckMillis = Option(configWithDefaults.getInt(KafkaManagedOffsetMetadataCheckMillis))
, kafkaManagedOffsetGroupCacheSize = Option(configWithDefaults.getInt(KafkaManagedOffsetGroupCacheSize))
, kafkaManagedOffsetGroupExpireDays = Option(configWithDefaults.getInt(KafkaManagedOffsetGroupExpireDays))
)
private[this] val kafkaManagerConfig = {
val curatorConfig = CuratorConfig(configWithDefaults.getString(ZkHosts))
Expand Down
36 changes: 21 additions & 15 deletions app/kafka/manager/actor/KafkaManagerActor.scala
Expand Up @@ -417,24 +417,30 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig)
val offsetCacheThreadPoolQueueSize = config.tuning.flatMap(_.offsetCacheThreadPoolQueueSize) orElse kmConfig.defaultTuning.offsetCacheThreadPoolQueueSize
val kafkaAdminClientThreadPoolSize = config.tuning.flatMap(_.kafkaAdminClientThreadPoolSize) orElse kmConfig.defaultTuning.kafkaAdminClientThreadPoolSize
val kafkaAdminClientThreadPoolQueueSize = config.tuning.flatMap(_.kafkaAdminClientThreadPoolQueueSize) orElse kmConfig.defaultTuning.kafkaAdminClientThreadPoolQueueSize
val kafkaManagedOffsetMetadataCheckMillis = config.tuning.flatMap(_.kafkaManagedOffsetMetadataCheckMillis) orElse kmConfig.defaultTuning.kafkaManagedOffsetMetadataCheckMillis
val kafkaManagedOffsetGroupCacheSize = config.tuning.flatMap(_.kafkaManagedOffsetGroupCacheSize) orElse kmConfig.defaultTuning.kafkaManagedOffsetGroupCacheSize
val kafkaManagedOffsetGroupExpireDays = config.tuning.flatMap(_.kafkaManagedOffsetGroupExpireDays) orElse kmConfig.defaultTuning.kafkaManagedOffsetGroupExpireDays

val tuning = Option(
ClusterTuning(
brokerViewUpdatePeriodSeconds = brokerViewUpdatePeriodSeconds
, clusterManagerThreadPoolSize = clusterManagerThreadPoolSize
, clusterManagerThreadPoolQueueSize = clusterManagerThreadPoolQueueSize
, kafkaCommandThreadPoolSize = kafkaCommandThreadPoolSize
, kafkaCommandThreadPoolQueueSize = kafkaCommandThreadPoolQueueSize
, logkafkaCommandThreadPoolSize = logkafkaCommandThreadPoolSize
, logkafkaCommandThreadPoolQueueSize = logkafkaCommandThreadPoolQueueSize
, logkafkaUpdatePeriodSeconds = logkafkaUpdatePeriodSeconds
, partitionOffsetCacheTimeoutSecs = partitionOffsetCacheTimeoutSecs
, brokerViewThreadPoolSize = brokerViewThreadPoolSize
, brokerViewThreadPoolQueueSize = brokerViewThreadPoolQueueSize
, offsetCacheThreadPoolSize = offsetCacheThreadPoolSize
, offsetCacheThreadPoolQueueSize = offsetCacheThreadPoolQueueSize
, kafkaAdminClientThreadPoolSize = kafkaAdminClientThreadPoolSize
, kafkaAdminClientThreadPoolQueueSize = kafkaAdminClientThreadPoolQueueSize
brokerViewUpdatePeriodSeconds = brokerViewUpdatePeriodSeconds
, clusterManagerThreadPoolSize = clusterManagerThreadPoolSize
, clusterManagerThreadPoolQueueSize = clusterManagerThreadPoolQueueSize
, kafkaCommandThreadPoolSize = kafkaCommandThreadPoolSize
, kafkaCommandThreadPoolQueueSize = kafkaCommandThreadPoolQueueSize
, logkafkaCommandThreadPoolSize = logkafkaCommandThreadPoolSize
, logkafkaCommandThreadPoolQueueSize = logkafkaCommandThreadPoolQueueSize
, logkafkaUpdatePeriodSeconds = logkafkaUpdatePeriodSeconds
, partitionOffsetCacheTimeoutSecs = partitionOffsetCacheTimeoutSecs
, brokerViewThreadPoolSize = brokerViewThreadPoolSize
, brokerViewThreadPoolQueueSize = brokerViewThreadPoolQueueSize
, offsetCacheThreadPoolSize = offsetCacheThreadPoolSize
, offsetCacheThreadPoolQueueSize = offsetCacheThreadPoolQueueSize
, kafkaAdminClientThreadPoolSize = kafkaAdminClientThreadPoolSize
, kafkaAdminClientThreadPoolQueueSize = kafkaAdminClientThreadPoolQueueSize
, kafkaManagedOffsetMetadataCheckMillis = kafkaManagedOffsetMetadataCheckMillis
, kafkaManagedOffsetGroupCacheSize = kafkaManagedOffsetGroupCacheSize
, kafkaManagedOffsetGroupExpireDays = kafkaManagedOffsetGroupExpireDays
)
)
config.copy(
Expand Down

0 comments on commit 29ef9db

Please sign in to comment.