diff --git a/app/controllers/Cluster.scala b/app/controllers/Cluster.scala
index 679202c17..7a3427c8a 100644
--- a/app/controllers/Cluster.scala
+++ b/app/controllers/Cluster.scala
@@ -108,6 +108,9 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
           , "offsetCacheThreadPoolQueueSize" -> optional(number(10, 10000))
           , "kafkaAdminClientThreadPoolSize" -> optional(number(2, 1000))
           , "kafkaAdminClientThreadPoolQueueSize" -> optional(number(10, 10000))
+          , "kafkaManagedOffsetMetadataCheckMillis" -> optional(number(10000, 120000))
+          , "kafkaManagedOffsetGroupCacheSize" -> optional(number(10000, 100000000))
+          , "kafkaManagedOffsetGroupExpireDays" -> optional(number(1, 100))
         )(ClusterTuning.apply)(ClusterTuning.unapply)
       )
     , "securityProtocol" -> nonEmptyText.verifying(validateSecurityProtocol)
@@ -147,6 +150,9 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
           , "offsetCacheThreadPoolQueueSize" -> optional(number(10, 10000))
           , "kafkaAdminClientThreadPoolSize" -> optional(number(2, 1000))
           , "kafkaAdminClientThreadPoolQueueSize" -> optional(number(10, 10000))
+          , "kafkaManagedOffsetMetadataCheckMillis" -> optional(number(10000, 120000))
+          , "kafkaManagedOffsetGroupCacheSize" -> optional(number(10000, 100000000))
+          , "kafkaManagedOffsetGroupExpireDays" -> optional(number(1, 100))
         )(ClusterTuning.apply)(ClusterTuning.unapply)
       )
     , "securityProtocol" -> nonEmptyText.verifying(validateSecurityProtocol)
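The three new form fields reuse Play's optional(number(min, max)) mapping, so out-of-range values fail at form-binding time instead of reaching the cluster config. A minimal, illustrative sketch of that behavior (standalone harness assumed; field name and bounds are copied from the mapping above):

import play.api.data.Form
import play.api.data.Forms.{number, optional, single}

object TuningFormSketch extends App {
  val form = Form(single("kafkaManagedOffsetMetadataCheckMillis" -> optional(number(10000, 120000))))
  // In range: binds to Some(30000).
  assert(form.bind(Map("kafkaManagedOffsetMetadataCheckMillis" -> "30000")).errors.isEmpty)
  // Below the minimum of 10000: rejected with a validation error.
  assert(form.bind(Map("kafkaManagedOffsetMetadataCheckMillis" -> "500")).hasErrors)
  // Absent entirely: fine, because the mapping is optional.
  assert(form.bind(Map.empty[String, String]).errors.isEmpty)
}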
"kafka-manager.kafka-managed-offset-group-expire-days" val DefaultConfig: Config = { val defaults: Map[String, _ <: AnyRef] = Map( @@ -102,7 +106,10 @@ object KafkaManager { OffsetCacheThreadPoolSize -> Runtime.getRuntime.availableProcessors().toString, OffsetCacheMaxQueueSize -> "1000", KafkaAdminClientThreadPoolSize -> Runtime.getRuntime.availableProcessors().toString, - KafkaAdminClientMaxQueueSize -> "1000" + KafkaAdminClientMaxQueueSize -> "1000", + KafkaManagedOffsetMetadataCheckMillis -> KafkaManagedOffsetCacheConfig.defaultGroupMemberMetadataCheckMillis.toString, + KafkaManagedOffsetGroupCacheSize -> KafkaManagedOffsetCacheConfig.defaultGroupTopicPartitionOffsetMaxSize.toString, + KafkaManagedOffsetGroupExpireDays -> KafkaManagedOffsetCacheConfig.defaultGroupTopicPartitionOffsetExpireDays.toString ) import scala.collection.JavaConverters._ ConfigFactory.parseMap(defaults.asJava) @@ -132,6 +139,9 @@ class KafkaManager(akkaConfig: Config) extends Logging { , offsetCacheThreadPoolQueueSize = Option(configWithDefaults.getInt(OffsetCacheMaxQueueSize)) , kafkaAdminClientThreadPoolSize = Option(configWithDefaults.getInt(KafkaAdminClientThreadPoolSize)) , kafkaAdminClientThreadPoolQueueSize = Option(configWithDefaults.getInt(KafkaAdminClientMaxQueueSize)) + , kafkaManagedOffsetMetadataCheckMillis = Option(configWithDefaults.getInt(KafkaManagedOffsetMetadataCheckMillis)) + , kafkaManagedOffsetGroupCacheSize = Option(configWithDefaults.getInt(KafkaManagedOffsetGroupCacheSize)) + , kafkaManagedOffsetGroupExpireDays = Option(configWithDefaults.getInt(KafkaManagedOffsetGroupExpireDays)) ) private[this] val kafkaManagerConfig = { val curatorConfig = CuratorConfig(configWithDefaults.getString(ZkHosts)) diff --git a/app/kafka/manager/actor/KafkaManagerActor.scala b/app/kafka/manager/actor/KafkaManagerActor.scala index f6f10e6bd..9751f11b3 100644 --- a/app/kafka/manager/actor/KafkaManagerActor.scala +++ b/app/kafka/manager/actor/KafkaManagerActor.scala @@ -417,24 +417,30 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig) val offsetCacheThreadPoolQueueSize = config.tuning.flatMap(_.offsetCacheThreadPoolQueueSize) orElse kmConfig.defaultTuning.offsetCacheThreadPoolQueueSize val kafkaAdminClientThreadPoolSize = config.tuning.flatMap(_.kafkaAdminClientThreadPoolSize) orElse kmConfig.defaultTuning.kafkaAdminClientThreadPoolSize val kafkaAdminClientThreadPoolQueueSize = config.tuning.flatMap(_.kafkaAdminClientThreadPoolQueueSize) orElse kmConfig.defaultTuning.kafkaAdminClientThreadPoolQueueSize + val kafkaManagedOffsetMetadataCheckMillis = config.tuning.flatMap(_.kafkaManagedOffsetMetadataCheckMillis) orElse kmConfig.defaultTuning.kafkaManagedOffsetMetadataCheckMillis + val kafkaManagedOffsetGroupCacheSize = config.tuning.flatMap(_.kafkaManagedOffsetGroupCacheSize) orElse kmConfig.defaultTuning.kafkaManagedOffsetGroupCacheSize + val kafkaManagedOffsetGroupExpireDays = config.tuning.flatMap(_.kafkaManagedOffsetGroupExpireDays) orElse kmConfig.defaultTuning.kafkaManagedOffsetGroupExpireDays val tuning = Option( ClusterTuning( - brokerViewUpdatePeriodSeconds = brokerViewUpdatePeriodSeconds - , clusterManagerThreadPoolSize = clusterManagerThreadPoolSize - , clusterManagerThreadPoolQueueSize = clusterManagerThreadPoolQueueSize - , kafkaCommandThreadPoolSize = kafkaCommandThreadPoolSize - , kafkaCommandThreadPoolQueueSize = kafkaCommandThreadPoolQueueSize - , logkafkaCommandThreadPoolSize = logkafkaCommandThreadPoolSize - , logkafkaCommandThreadPoolQueueSize = 
diff --git a/app/kafka/manager/actor/KafkaManagerActor.scala b/app/kafka/manager/actor/KafkaManagerActor.scala
index f6f10e6bd..9751f11b3 100644
--- a/app/kafka/manager/actor/KafkaManagerActor.scala
+++ b/app/kafka/manager/actor/KafkaManagerActor.scala
@@ -417,24 +417,30 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig)
       val offsetCacheThreadPoolQueueSize = config.tuning.flatMap(_.offsetCacheThreadPoolQueueSize) orElse kmConfig.defaultTuning.offsetCacheThreadPoolQueueSize
       val kafkaAdminClientThreadPoolSize = config.tuning.flatMap(_.kafkaAdminClientThreadPoolSize) orElse kmConfig.defaultTuning.kafkaAdminClientThreadPoolSize
       val kafkaAdminClientThreadPoolQueueSize = config.tuning.flatMap(_.kafkaAdminClientThreadPoolQueueSize) orElse kmConfig.defaultTuning.kafkaAdminClientThreadPoolQueueSize
+      val kafkaManagedOffsetMetadataCheckMillis = config.tuning.flatMap(_.kafkaManagedOffsetMetadataCheckMillis) orElse kmConfig.defaultTuning.kafkaManagedOffsetMetadataCheckMillis
+      val kafkaManagedOffsetGroupCacheSize = config.tuning.flatMap(_.kafkaManagedOffsetGroupCacheSize) orElse kmConfig.defaultTuning.kafkaManagedOffsetGroupCacheSize
+      val kafkaManagedOffsetGroupExpireDays = config.tuning.flatMap(_.kafkaManagedOffsetGroupExpireDays) orElse kmConfig.defaultTuning.kafkaManagedOffsetGroupExpireDays
       val tuning = Option(
         ClusterTuning(
-          brokerViewUpdatePeriodSeconds = brokerViewUpdatePeriodSeconds
-          , clusterManagerThreadPoolSize = clusterManagerThreadPoolSize
-          , clusterManagerThreadPoolQueueSize = clusterManagerThreadPoolQueueSize
-          , kafkaCommandThreadPoolSize = kafkaCommandThreadPoolSize
-          , kafkaCommandThreadPoolQueueSize = kafkaCommandThreadPoolQueueSize
-          , logkafkaCommandThreadPoolSize = logkafkaCommandThreadPoolSize
-          , logkafkaCommandThreadPoolQueueSize = logkafkaCommandThreadPoolQueueSize
-          , logkafkaUpdatePeriodSeconds = logkafkaUpdatePeriodSeconds
-          , partitionOffsetCacheTimeoutSecs = partitionOffsetCacheTimeoutSecs
-          , brokerViewThreadPoolSize = brokerViewThreadPoolSize
-          , brokerViewThreadPoolQueueSize = brokerViewThreadPoolQueueSize
-          , offsetCacheThreadPoolSize = offsetCacheThreadPoolSize
-          , offsetCacheThreadPoolQueueSize = offsetCacheThreadPoolQueueSize
-          , kafkaAdminClientThreadPoolSize = kafkaAdminClientThreadPoolSize
-          , kafkaAdminClientThreadPoolQueueSize = kafkaAdminClientThreadPoolQueueSize
+            brokerViewUpdatePeriodSeconds = brokerViewUpdatePeriodSeconds
+          , clusterManagerThreadPoolSize = clusterManagerThreadPoolSize
+          , clusterManagerThreadPoolQueueSize = clusterManagerThreadPoolQueueSize
+          , kafkaCommandThreadPoolSize = kafkaCommandThreadPoolSize
+          , kafkaCommandThreadPoolQueueSize = kafkaCommandThreadPoolQueueSize
+          , logkafkaCommandThreadPoolSize = logkafkaCommandThreadPoolSize
+          , logkafkaCommandThreadPoolQueueSize = logkafkaCommandThreadPoolQueueSize
+          , logkafkaUpdatePeriodSeconds = logkafkaUpdatePeriodSeconds
+          , partitionOffsetCacheTimeoutSecs = partitionOffsetCacheTimeoutSecs
+          , brokerViewThreadPoolSize = brokerViewThreadPoolSize
+          , brokerViewThreadPoolQueueSize = brokerViewThreadPoolQueueSize
+          , offsetCacheThreadPoolSize = offsetCacheThreadPoolSize
+          , offsetCacheThreadPoolQueueSize = offsetCacheThreadPoolQueueSize
+          , kafkaAdminClientThreadPoolSize = kafkaAdminClientThreadPoolSize
+          , kafkaAdminClientThreadPoolQueueSize = kafkaAdminClientThreadPoolQueueSize
+          , kafkaManagedOffsetMetadataCheckMillis = kafkaManagedOffsetMetadataCheckMillis
+          , kafkaManagedOffsetGroupCacheSize = kafkaManagedOffsetGroupCacheSize
+          , kafkaManagedOffsetGroupExpireDays = kafkaManagedOffsetGroupExpireDays
         )
       )
       config.copy(
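Each tuning value resolves per cluster first and falls back to the application-wide default via Option.orElse, as the new kafkaManagedOffset* lines above do. A tiny self-contained sketch of the same resolution, with a simplified stand-in for ClusterTuning:

object TuningFallbackSketch extends App {
  // Simplified stand-in for ClusterTuning; only one field matters here.
  case class Tuning(kafkaManagedOffsetGroupExpireDays: Option[Int])

  val clusterTuning: Option[Tuning] = Some(Tuning(None)) // cluster tuning exists but leaves the value unset
  val defaultTuning = Tuning(kafkaManagedOffsetGroupExpireDays = Some(7))

  val resolved = clusterTuning.flatMap(_.kafkaManagedOffsetGroupExpireDays) orElse
    defaultTuning.kafkaManagedOffsetGroupExpireDays
  assert(resolved.contains(7)) // unset on the cluster, so the default wins
}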
diff --git a/app/kafka/manager/actor/cluster/KafkaStateActor.scala b/app/kafka/manager/actor/cluster/KafkaStateActor.scala
index 1c0d65625..274e172f4 100644
--- a/app/kafka/manager/actor/cluster/KafkaStateActor.scala
+++ b/app/kafka/manager/actor/cluster/KafkaStateActor.scala
@@ -1,7 +1,7 @@
 /**
- * Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
- * See accompanying LICENSE file.
- */
+  * Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
+  * See accompanying LICENSE file.
+  */
 
 package kafka.manager.actor.cluster
 
@@ -19,7 +19,6 @@ import grizzled.slf4j.Logging
 import kafka.admin.AdminClient
 import kafka.api.PartitionOffsetRequestInfo
 import kafka.common.{OffsetAndMetadata, TopicAndPartition}
-import kafka.coordinator.group.{GroupMetadataKey, OffsetKey}
 import kafka.manager._
 import kafka.manager.base.cluster.{BaseClusterQueryActor, BaseClusterQueryCommandActor}
 import kafka.manager.base.{LongRunningPoolActor, LongRunningPoolConfig}
@@ -28,8 +27,7 @@ import kafka.manager.model.ActorModel._
 import kafka.manager.model._
 import kafka.manager.utils.ZkUtils
 import kafka.manager.utils.zero81.{PreferredReplicaLeaderElectionCommand, ReassignPartitionCommand}
-//import kafka.manager.utils.zero90.{GroupMetadata, MemberMetadata}
-import kafka.manager.utils.one10.{GroupMetadata, MemberMetadata}
+import kafka.manager.utils.one10.{GroupMetadata, MemberMetadata, OffsetKey, GroupMetadataKey}
 import org.apache.curator.framework.CuratorFramework
 import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode
 import org.apache.curator.framework.recipes.cache._
@@ -48,8 +46,8 @@ import scala.util.{Failure, Success, Try}
 import org.apache.kafka.clients.consumer.ConsumerConfig._
 
 /**
- * @author hiral
- */
+  * @author hiral
+  */
 import kafka.manager.utils._
 
 import scala.collection.JavaConverters._
@@ -179,16 +177,16 @@ object KafkaManagedOffsetCacheConfig {
   val defaultGroupTopicPartitionOffsetMaxSize: Int = 1000000
   val defaultGroupTopicPartitionOffsetExpireDays: Int = 7
 }
-import KafkaManagedOffsetCacheConfig._
-case class KafkaManagedOffsetCacheConfig(groupMemberMetadataCheckMillis: Int = defaultGroupMemberMetadataCheckMillis
-                                         , groupTopicPartitionOffsetMaxSize: Int = defaultGroupTopicPartitionOffsetMaxSize
-                                         , groupTopicPartitionOffsetExpireDays: Int = defaultGroupTopicPartitionOffsetExpireDays)
+
+case class KafkaManagedOffsetCacheConfig(groupMemberMetadataCheckMillis: Int = KafkaManagedOffsetCacheConfig.defaultGroupMemberMetadataCheckMillis
+                                         , groupTopicPartitionOffsetMaxSize: Int = KafkaManagedOffsetCacheConfig.defaultGroupTopicPartitionOffsetMaxSize
+                                         , groupTopicPartitionOffsetExpireDays: Int = KafkaManagedOffsetCacheConfig.defaultGroupTopicPartitionOffsetExpireDays)
 case class KafkaManagedOffsetCache(clusterContext: ClusterContext
                                    , adminClient: KafkaAdminClient
                                    , consumerProperties: Option[Properties]
                                    , bootstrapBrokerList: BrokerList
                                    , config: KafkaManagedOffsetCacheConfig
-                                  ) extends Runnable with Closeable with Logging {
+                                   ) extends Runnable with Closeable with Logging {
   val groupTopicPartitionOffsetSet: mutable.Set[(String, String, Int)] = KafkaManagedOffsetCache.createSet()
   val groupTopicPartitionOffsetMap:Cache[(String, String, Int), OffsetAndMetadata] = Caffeine
     .newBuilder()
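groupTopicPartitionOffsetMaxSize and groupTopicPartitionOffsetExpireDays bound the Caffeine cache holding one offset per (group, topic, partition). The builder chain is cut off by the hunk above, so the exact expiry policy is not visible here; the sketch below assumes expireAfterAccess and shows a cache bounded the same way:

import java.util.concurrent.TimeUnit
import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
import kafka.manager.actor.cluster.KafkaManagedOffsetCacheConfig

object OffsetCacheSketch extends App {
  val config = KafkaManagedOffsetCacheConfig() // defaults: 1000000 entries max, 7-day expiry
  val cache: Cache[(String, String, Int), java.lang.Long] = Caffeine
    .newBuilder()
    .maximumSize(config.groupTopicPartitionOffsetMaxSize)                         // size bound
    .expireAfterAccess(config.groupTopicPartitionOffsetExpireDays, TimeUnit.DAYS) // age bound (assumed policy)
    .build[(String, String, Int), java.lang.Long]()

  cache.put(("my-group", "my-topic", 0), 42L)
  assert(cache.getIfPresent(("my-group", "my-topic", 0)) == 42L)
}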
@@ -405,7 +403,7 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext
 case class ConsumerInstanceSubscriptions private(id: String, subs: Map[String, Int])
 
 object ConsumerInstanceSubscriptions extends Logging {
-
+  
   //{"version":1,"subscription":{"DXSPreAgg":1},"pattern":"static","timestamp":"1443578242654"}
   def apply(consumer: String, id: String, jsonString: String) : ConsumerInstanceSubscriptions = {
     import org.json4s.jackson.JsonMethods.parse
@@ -426,7 +424,7 @@ trait OffsetCache extends Logging {
   def clusterContext: ClusterContext
 
   def getKafkaVersion: KafkaVersion
-
+  
   def getCacheTimeoutSecs: Int
 
   def getSimpleConsumerSocketTimeoutMillis: Int
@@ -434,21 +432,21 @@ trait OffsetCache extends Logging {
   def kafkaManagedOffsetCacheConfig: KafkaManagedOffsetCacheConfig
 
   protected[this] implicit def ec: ExecutionContext
-
+  
   protected[this] implicit def cf: ClusterFeatures
-
+  
   protected[this] val loadOffsets: Boolean
 
   // Caches a map of partitions to offsets at a key that is the topic's name.
   private[this] lazy val partitionOffsetsCache: LoadingCache[String, Future[PartitionOffsetsCapture]] = CacheBuilder.newBuilder()
     .expireAfterWrite(getCacheTimeoutSecs,TimeUnit.SECONDS) // TODO - update more or less often maybe, or make it configurable
     .build(
-      new CacheLoader[String,Future[PartitionOffsetsCapture]] {
-        def load(topic: String): Future[PartitionOffsetsCapture] = {
-          loadPartitionOffsets(topic)
-        }
+    new CacheLoader[String,Future[PartitionOffsetsCapture]] {
+      def load(topic: String): Future[PartitionOffsetsCapture] = {
+        loadPartitionOffsets(topic)
       }
-    )
+    }
+  )
 
   // Get the latest offsets for the partitions of the topic,
   // Code based off of the GetOffsetShell tool in kafka.tools, kafka 0.8.2.1
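partitionOffsetsCache above is a Guava LoadingCache keyed by topic name: an entry expires getCacheTimeoutSecs after being written, and the CacheLoader recomputes it on the next access. A self-contained sketch of the same pattern, with a trivial Int-valued loader standing in for loadPartitionOffsets:

import java.util.concurrent.TimeUnit
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object LoadingCacheSketch extends App {
  val cache: LoadingCache[String, java.lang.Integer] = CacheBuilder.newBuilder()
    .expireAfterWrite(5, TimeUnit.SECONDS)
    .build(
      new CacheLoader[String, java.lang.Integer] {
        // Stand-in for loadPartitionOffsets(topic): computed on miss or after expiry.
        def load(topic: String): java.lang.Integer = topic.length
      }
    )
  // First access runs the loader; repeat access within 5s is served from the cache.
  assert(cache.get("my-topic") == 8)
}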
@@ -513,7 +511,7 @@ trait OffsetCache extends Logging {
   }
 
   private[this] def emptyPartitionOffsetsCapture: Future[PartitionOffsetsCapture] = Future.successful(PartitionOffsetsCapture(System.currentTimeMillis(), Map()))
-
+  
   protected def getTopicPartitionLeaders(topic: String) : Option[List[(Int, Option[BrokerIdentity])]]
 
   protected def getTopicDescription(topic: String, interactive: Boolean) : Option[TopicDescription]
@@ -521,7 +519,7 @@ trait OffsetCache extends Logging {
   protected def getBrokerList : () => BrokerList
 
   protected def readConsumerOffsetByTopicPartition(consumer: String, topic: String, tpi: Map[Int, TopicPartitionIdentity]) : Map[Int, Long]
-
+  
   protected def readConsumerOwnerByTopicPartition(consumer: String, topic: String, tpi: Map[Int, TopicPartitionIdentity]) : Map[Int, String]
 
   protected def getConsumerTopicsFromIds(consumer: String) : Set[String]
@@ -559,7 +557,7 @@ trait OffsetCache extends Logging {
       throw new IllegalArgumentException(s"Unsupported Kafka Version: ${clusterContext.config.version}")
     }
   }
-
+  
   def stop() : Unit = {
     kafkaManagedOffsetCache.foreach { of =>
       info("Stopping kafka managed offset cache ...")
@@ -630,12 +628,12 @@ trait OffsetCache extends Logging {
     }
 
     val topicDescriptions: Map[String, ConsumedTopicDescription] = consumerTopics.map { topic =>
-      val topicDesc = getConsumedTopicDescription(consumer, topic, false, consumerType)
-      (topic, topicDesc)
-    }.toMap
+        val topicDesc = getConsumedTopicDescription(consumer, topic, false, consumerType)
+        (topic, topicDesc)
+      }.toMap
     ConsumerDescription(consumer, topicDescriptions, consumerType)
   }
-
+  
   final def getConsumedTopicDescription(consumer:String
                                         , topic:String
                                         , interactive: Boolean
@@ -677,7 +675,7 @@ trait OffsetCache extends Logging {
       partitionOffsets.map(_.size).getOrElse(0))
     ConsumedTopicDescription(consumer, topic, numPartitions, optTopic, partitionOwners, partitionOffsets)
   }
-
+  
   final def getConsumerList: ConsumerList = {
     ConsumerList(getKafkaManagedConsumerList ++ getZKManagedConsumerList, clusterContext)
   }
@@ -694,7 +692,7 @@ case class OffsetCacheActive(curator: CuratorFramework
                              , consumerProperties: Option[Properties]
                              , kafkaManagedOffsetCacheConfig: KafkaManagedOffsetCacheConfig
                              , getBrokerList : () => BrokerList
-                             )
+                            )
                             (implicit protected[this] val ec: ExecutionContext, val cf: ClusterFeatures) extends OffsetCache {
 
   def getKafkaVersion: KafkaVersion = kafkaVersion
@@ -716,9 +714,9 @@ case class OffsetCacheActive(curator: CuratorFramework
       }
     }
   }
-
+  
   private[this] val consumersTreeCache = new TreeCache(curator, ZkUtils.ConsumersPath)
-
+  
   @volatile
   private[this] var consumersTreeCacheLastUpdateMillis : Long = System.currentTimeMillis()
@@ -729,7 +727,7 @@ case class OffsetCacheActive(curator: CuratorFramework
   protected def getTopicPartitionLeaders(topic: String) : Option[List[(Int, Option[BrokerIdentity])]] = partitionLeaders(topic)
 
   protected def getTopicDescription(topic: String, interactive: Boolean) : Option[TopicDescription] = topicDescriptions(topic, interactive)
-
+  
   override def start(): Unit = {
     super.start()
     info("Starting consumers tree cache...")
@@ -738,12 +736,12 @@ case class OffsetCacheActive(curator: CuratorFramework
     info("Adding consumers tree cache listener...")
     consumersTreeCache.getListenable.addListener(consumersTreeCacheListener)
   }
-
+  
   override def stop(): Unit = {
     super.stop()
     info("Removing consumers tree cache listener...")
     Try(consumersTreeCache.getListenable.removeListener(consumersTreeCacheListener))
-
+    
     info("Shutting down consumers tree cache...")
     Try(consumersTreeCache.close())
   }
@@ -756,9 +754,9 @@ case class OffsetCacheActive(curator: CuratorFramework
       val offsetPath = "%s/%s/%s/%s/%s".format(ZkUtils.ConsumersPath, consumer, "offsets", topic, p)
       (p, Option(consumersTreeCache.getCurrentData(offsetPath)).flatMap(cd => Option(cd.getData)).map(asString).getOrElse("-1").toLong)
     }
-
+    
   }
-
+  
   protected def readConsumerOwnerByTopicPartition(consumer: String, topic: String, tpi: Map[Int, TopicPartitionIdentity]) : Map[Int, String] = {
     tpi.map {
       case (p, _) =>
@@ -812,7 +810,7 @@ case class OffsetCachePassive(curator: CuratorFramework
                               , consumerProperties: Option[Properties]
                               , kafkaManagedOffsetCacheConfig: KafkaManagedOffsetCacheConfig
                               , getBrokerList : () => BrokerList
-                              )
+                             )
                              (implicit protected[this] val ec: ExecutionContext, val cf: ClusterFeatures) extends OffsetCache {
 
   def getKafkaVersion: KafkaVersion = kafkaVersion
@@ -928,7 +926,7 @@ case class KafkaStateActorConfig(curator: CuratorFramework
                                  , simpleConsumerSocketTimeoutMillis: Int
                                  , consumerProperties: Option[Properties]
                                  , kafkaManagedOffsetCacheConfig: KafkaManagedOffsetCacheConfig
-                                 )
+                                )
 class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCommandActor with LongRunningPoolActor {
 
   protected implicit val clusterContext: ClusterContext = config.clusterContext
@@ -1035,7 +1033,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
       }
     }
   }
-
+  
   private[this] lazy val offsetCache: OffsetCache = {
     if(config.clusterContext.config.activeOffsetCacheEnabled)
       new OffsetCacheActive(config.curator
@@ -1134,15 +1132,15 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
   }
 
   def getTopicPartitionOffsetsNotFuture(topic: String, interactive: Boolean): PartitionOffsetsCapture = {
-    var partitionOffsets = PartitionOffsetsCapture(System.currentTimeMillis(), Map())
+      var partitionOffsets = PartitionOffsetsCapture(System.currentTimeMillis(), Map())
 
-    val loadOffsets = featureGateFold(KMPollConsumersFeature)(false, true)
-    if ((interactive || loadOffsets) &&
-      kafkaTopicOffsetCaptureMap.contains(topic)) {
-      partitionOffsets = kafkaTopicOffsetCaptureMap(topic)
-    }
+      val loadOffsets = featureGateFold(KMPollConsumersFeature)(false, true)
+      if ((interactive || loadOffsets) &&
+        kafkaTopicOffsetCaptureMap.contains(topic)) {
+        partitionOffsets = kafkaTopicOffsetCaptureMap(topic)
+      }
 
-    partitionOffsets
+      partitionOffsets
   }
 
   def getTopicDescription(topic: String, interactive: Boolean) : Option[TopicDescription] = {
@@ -1223,15 +1221,15 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
   override def processQueryRequest(request: QueryRequest): Unit = {
     request match {
       case KSGetTopics =>
-        val deleteSet: Set[String] =
+        val deleteSet: Set[String] = 
           featureGateFold(KMDeleteTopicFeature)(
-            Set.empty,
-            {
-              val deleteTopicsData: mutable.Buffer[ChildData] = deleteTopicsPathCache.getCurrentData.asScala
-              deleteTopicsData.map { cd =>
-                nodeFromPath(cd.getPath)
-              }.toSet
-            })
+          Set.empty,
+          {
+            val deleteTopicsData: mutable.Buffer[ChildData] = deleteTopicsPathCache.getCurrentData.asScala
+            deleteTopicsData.map { cd =>
+              nodeFromPath(cd.getPath)
+            }.toSet
+          })
         withTopicsTreeCache { cache =>
           cache.getCurrentChildren(ZkUtils.BrokerTopicsPath)
         }.fold {
@@ -1377,7 +1375,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
   }
 
   //---------------------------------------------------
-  private[this] var kafkaTopicOffsetGetter : Option[KafkaTopicOffsetGetter] = None
+  private[this] var kafkaTopicOffsetGetter : Option[KafkaTopicOffsetGetter] = None 
   private[this] var kafkaTopicOffsetMap = new TrieMap[String, Map[Int, Long]]
   private[this] var kafkaTopicOffsetCaptureMap = new TrieMap[String, PartitionOffsetsCapture]
   def startTopicOffsetGetter() : Unit = {
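kafkaTopicOffsetMap and kafkaTopicOffsetCaptureMap above are TrieMaps, so the KafkaTopicOffsetGetter thread can publish fresh per-topic offset maps while the actor reads them without explicit locking. A small sketch of that writer/reader pattern:

import scala.collection.concurrent.TrieMap

object TrieMapSketch extends App {
  val kafkaTopicOffsetMap = new TrieMap[String, Map[Int, Long]]

  // Writer side (offset-getter thread): merge a partition offset into the topic's immutable map.
  val existing = kafkaTopicOffsetMap.getOrElse("my-topic", Map.empty[Int, Long])
  kafkaTopicOffsetMap += ("my-topic" -> (existing + (0 -> 100L)))

  // Reader side (actor): a plain lookup, safe without extra synchronization.
  assert(kafkaTopicOffsetMap("my-topic")(0) == 100L)
}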
@@ -1408,97 +1406,102 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
       try {
         withTopicsTreeCache {  cache: TreeCache =>
           cache.getCurrentChildren(ZkUtils.BrokerTopicsPath)
-        }.fold {
-        } { data: java.util.Map[String, ChildData] =>
-          var broker2TopicPartitionMap: Map[BrokerIdentity, List[(TopicAndPartition, PartitionOffsetRequestInfo)]] = Map()
-
-          breakable {
-            data.asScala.keys.toIndexedSeq.foreach(topic => {
-              if (shutdown) {
-                return
-              }
-              var optPartitionsWithLeaders : Option[List[(Int, Option[BrokerIdentity])]] = getPartitionLeaders(topic)
-              optPartitionsWithLeaders match {
-                case Some(leaders) =>
-                  leaders.foreach(leader => {
-                    leader._2 match {
-                      case Some(brokerIden) =>
-                        var tlList : List[(TopicAndPartition, PartitionOffsetRequestInfo)] = null
-                        if (broker2TopicPartitionMap.contains(brokerIden)) {
-                          tlList = broker2TopicPartitionMap(brokerIden)
-                        } else {
-                          tlList = List()
-                        }
-                        tlList = (TopicAndPartition(topic, leader._1), PartitionOffsetRequestInfo(-1, 1)) +: tlList
-                        broker2TopicPartitionMap += (brokerIden -> tlList)
-                      case None =>
-                    }
-                  })
-                case None =>
-              }
-            }
-            )
-          }
+          }.fold {
+          } { data: java.util.Map[String, ChildData] =>
+            var broker2TopicPartitionMap: Map[BrokerIdentity, List[(TopicAndPartition, PartitionOffsetRequestInfo)]] = Map()
+
+            breakable {
+              data.asScala.keys.toIndexedSeq.foreach(topic => {
+                if (shutdown) {
+                  return
+                }
+                var optPartitionsWithLeaders : Option[List[(Int, Option[BrokerIdentity])]] = getPartitionLeaders(topic)
+                optPartitionsWithLeaders match {
+                  case Some(leaders) =>
+                    leaders.foreach(leader => {
+                      leader._2 match {
+                        case Some(brokerIden) =>
+                          var tlList : List[(TopicAndPartition, PartitionOffsetRequestInfo)] = null
+                          if (broker2TopicPartitionMap.contains(brokerIden)) {
+                            tlList = broker2TopicPartitionMap(brokerIden)
+                          } else {
+                            tlList = List()
+                          }
+                          tlList = (TopicAndPartition(topic, leader._1), PartitionOffsetRequestInfo(-1, 1)) +: tlList
+                          broker2TopicPartitionMap += (brokerIden -> tlList)
+                        case None =>
+                      }
+                    })
+                  case None =>
+                }
+              }
+              )
+            }
 
-          breakable {
-            broker2TopicPartitionMap.keys.foreach(broker => {
-              if (shutdown) {
-                return
-              }
-
-              val tpList = broker2TopicPartitionMap(broker)
-              val port: Int = broker.endpoints(PLAINTEXT)
-              val consumerProperties = kaConfig.consumerProperties.getOrElse(getDefaultConsumerProperties(s"${broker.host}:$port"))
-              val kafkaConsumer = new KafkaConsumer(consumerProperties)
-              try {
-                val request = tpList.toList.map( f => new TopicPartition(f._1.topic, f._1.partition)).toList
-                var tpOffsetMap = kafkaConsumer.endOffsets(request)
-
-                var topicOffsetMap : Map[Int, Long] = null
-                tpOffsetMap.keys.foreach(tp => {
-                  if (kafkaTopicOffsetMap.contains(tp.topic)) {
-                    topicOffsetMap = kafkaTopicOffsetMap(tp.topic)
-                  } else {
-                    topicOffsetMap = Map()
-                  }
-
-                  topicOffsetMap += (tp.partition -> tpOffsetMap(tp))
-                  kafkaTopicOffsetMap += (tp.topic -> topicOffsetMap)
-                })
-              } finally {
-                kafkaConsumer.close()
+            breakable {
+              broker2TopicPartitionMap.keys.foreach(broker => {
+                if (shutdown) {
+                  return
+                }
+
+                val tpList = broker2TopicPartitionMap(broker)
+                val port: Int = broker.endpoints(PLAINTEXT)
+                val consumerProperties = kaConfig.consumerProperties.getOrElse(getDefaultConsumerProperties(s"${broker.host}:$port"))
+                var kafkaConsumer: Option[KafkaConsumer[Any, Any]] = None
+                try {
+                  kafkaConsumer = Option(new KafkaConsumer(consumerProperties))
+                  val request = tpList.map(f => new TopicPartition(f._1.topic, f._1.partition))
+                  var tpOffsetMapOption = kafkaConsumer.map(_.endOffsets(request))
+
+                  var topicOffsetMap: Map[Int, Long] = null
+                  tpOffsetMapOption.foreach(tpOffsetMap => tpOffsetMap.keys.foreach(tp => {
+                    if (kafkaTopicOffsetMap.contains(tp.topic)) {
+                      topicOffsetMap = kafkaTopicOffsetMap(tp.topic)
+                    } else {
+                      topicOffsetMap = Map()
+                    }
+
+                    topicOffsetMap += (tp.partition -> tpOffsetMap(tp))
+                    kafkaTopicOffsetMap += (tp.topic -> topicOffsetMap)
+                  }))
+                } catch {
+                  case e: Exception =>
+                    log.error(e, s"consumerProperties:$consumerProperties")
+                    throw e
+                } finally {
+                  kafkaConsumer.foreach(_.close())
                 }
               })
             }
 
-          kafkaTopicOffsetCaptureMap = kafkaTopicOffsetMap.map(kv =>
-            (kv._1, PartitionOffsetsCapture(System.currentTimeMillis(), kv._2)))
-        }
-      } catch {
-        case e: Exception =>
-          log.error(e, s"KafkaTopicOffsetGetter exception ")
-      }
+            kafkaTopicOffsetCaptureMap = kafkaTopicOffsetMap.map(kv =>
+              (kv._1, PartitionOffsetsCapture(System.currentTimeMillis(), kv._2)))
+          }
+        } catch {
+          case e: Exception =>
+            log.error(e, s"KafkaTopicOffsetGetter exception ")
+        }
 
-      if (!shutdown) {
-        Thread.sleep(config.partitionOffsetCacheTimeoutSecs * 1000)
+        if (!shutdown) {
+          Thread.sleep(config.partitionOffsetCacheTimeoutSecs * 1000)
+        }
       }
-    }
-    log.info(s"KafkaTopicOffsetGetter exit")
-  }
+      log.info(s"KafkaTopicOffsetGetter exit")
+    }
 
-    def close(): Unit = {
-      this.shutdown = true
-    }
+    def close(): Unit = {
+      this.shutdown = true
+    }
 
-    def getDefaultConsumerProperties(bootstrapServers: String): Properties = {
-      val properties = new Properties()
-      properties.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
-      properties.put(GROUP_ID_CONFIG, getClass.getCanonicalName)
-      properties.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
-      properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
-      properties
+    def getDefaultConsumerProperties(bootstrapServers: String): Properties = {
+      val properties = new Properties()
+      properties.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
+      properties.put(GROUP_ID_CONFIG, getClass.getCanonicalName)
+      properties.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+      properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+      properties
+    }
   }
 }
-}
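The rewritten loop above builds the KafkaConsumer inside an Option so that the finally block only closes a consumer that was actually constructed, then asks it for endOffsets, the latest offset per requested partition. A sketch of the same pattern in isolation (broker address and topic are hypothetical; running it needs a reachable broker):

import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

object EndOffsetsSketch extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092") // hypothetical broker
  props.put("group.id", "end-offsets-sketch")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

  var consumer: Option[KafkaConsumer[Array[Byte], Array[Byte]]] = None
  try {
    consumer = Option(new KafkaConsumer(props))
    val partitions = List(new TopicPartition("my-topic", 0)).asJava
    // endOffsets returns a java.util.Map[TopicPartition, java.lang.Long] of the latest offsets.
    consumer.foreach(c => println(c.endOffsets(partitions).asScala))
  } finally {
    consumer.foreach(_.close()) // skipped safely if the constructor threw
  }
}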
"org.apache.kafka.common.serialization.ByteArrayDeserializer") + properties + } } } -} diff --git a/app/kafka/manager/model/model.scala b/app/kafka/manager/model/model.scala index bd5967bf2..1d24d5c6f 100644 --- a/app/kafka/manager/model/model.scala +++ b/app/kafka/manager/model/model.scala @@ -367,9 +367,9 @@ object ClusterTuning { offsetCacheThreadPoolQueueSize <- fieldExtended[Option[Int]]("offsetCacheThreadPoolQueueSize")(json) kafkaAdminClientThreadPoolSize <- fieldExtended[Option[Int]]("kafkaAdminClientThreadPoolSize")(json) kafkaAdminClientThreadPoolQueueSize <- fieldExtended[Option[Int]]("kafkaAdminClientThreadPoolQueueSize")(json) - kafkaManagedOffsetMetadataCheckMillis <- fieldExtended[Option[Int]]("kafkaManagedOffsetMetadataCheckMillis") - kafkaManagedOffsetGroupCacheSize <- fieldExtended[Option[Int]]("kafkaManagedOffsetGroupCacheSize") - kafkaManagedOffsetGroupExpireDays <- fieldExtended[Option[Int]]("kafkaManagedOffsetGroupExpireDays") + kafkaManagedOffsetMetadataCheckMillis <- fieldExtended[Option[Int]]("kafkaManagedOffsetMetadataCheckMillis")(json) + kafkaManagedOffsetGroupCacheSize <- fieldExtended[Option[Int]]("kafkaManagedOffsetGroupCacheSize")(json) + kafkaManagedOffsetGroupExpireDays <- fieldExtended[Option[Int]]("kafkaManagedOffsetGroupExpireDays")(json) } yield { ClusterTuning( brokerViewUpdatePeriodSeconds = brokerViewUpdatePeriodSeconds diff --git a/app/views/cluster/addCluster.scala.html b/app/views/cluster/addCluster.scala.html index 86b0241c7..56ab2a44d 100644 --- a/app/views/cluster/addCluster.scala.html +++ b/app/views/cluster/addCluster.scala.html @@ -52,6 +52,9 @@ @b3.number(form("tuning.offsetCacheThreadPoolQueueSize"), '_label -> "offsetCacheThreadPoolQueueSize") @b3.number(form("tuning.kafkaAdminClientThreadPoolSize"), '_label -> "kafkaAdminClientThreadPoolSize") @b3.number(form("tuning.kafkaAdminClientThreadPoolQueueSize"), '_label -> "kafkaAdminClientThreadPoolQueueSize") + @b3.number(form("tuning.kafkaManagedOffsetMetadataCheckMillis"), '_label -> "kafkaManagedOffsetMetadataCheckMillis") + @b3.number(form("tuning.kafkaManagedOffsetGroupCacheSize"), '_label -> "kafkaManagedOffsetGroupCacheSize") + @b3.number(form("tuning.kafkaManagedOffsetGroupExpireDays"), '_label -> "kafkaManagedOffsetGroupExpireDays") @b3.select( form("securityProtocol"), options = kafka.manager.model.SecurityProtocol.formSelectList, '_label -> "Security Protocol" ) @b3.submit('class -> "submit-button btn btn-primary"){ Save } Cancel diff --git a/app/views/cluster/updateCluster.scala.html b/app/views/cluster/updateCluster.scala.html index 3548b2d1e..87a1ad288 100644 --- a/app/views/cluster/updateCluster.scala.html +++ b/app/views/cluster/updateCluster.scala.html @@ -55,6 +55,9 @@ @b3.number(form("tuning.offsetCacheThreadPoolQueueSize"), '_label -> "offsetCacheThreadPoolQueueSize") @b3.number(form("tuning.kafkaAdminClientThreadPoolSize"), '_label -> "kafkaAdminClientThreadPoolSize") @b3.number(form("tuning.kafkaAdminClientThreadPoolQueueSize"), '_label -> "kafkaAdminClientThreadPoolQueueSize") + @b3.number(form("tuning.kafkaManagedOffsetMetadataCheckMillis"), '_label -> "kafkaManagedOffsetMetadataCheckMillis") + @b3.number(form("tuning.kafkaManagedOffsetGroupCacheSize"), '_label -> "kafkaManagedOffsetGroupCacheSize") + @b3.number(form("tuning.kafkaManagedOffsetGroupExpireDays"), '_label -> "kafkaManagedOffsetGroupExpireDays") @b3.select( form("securityProtocol"), options = kafka.manager.model.SecurityProtocol.formSelectList, '_label -> "Security Protocol" ) @b3.submit('class -> 
"submit-button btn btn-primary btn"){ Save } Cancel diff --git a/test/kafka/manager/BaseTest.scala b/test/kafka/manager/BaseTest.scala index 7b24f25ca..8b975933a 100644 --- a/test/kafka/manager/BaseTest.scala +++ b/test/kafka/manager/BaseTest.scala @@ -1,5 +1,6 @@ package kafka.manager +import kafka.manager.actor.cluster.KafkaManagedOffsetCacheConfig import kafka.manager.model.ClusterTuning /** @@ -25,9 +26,17 @@ trait BaseTest { ,Option(defaultPoolQueueSize) ,Option(defaultPoolSize) ,Option(defaultPoolQueueSize) + ,Option(KafkaManagedOffsetCacheConfig.defaultGroupMemberMetadataCheckMillis) + ,Option(KafkaManagedOffsetCacheConfig.defaultGroupTopicPartitionOffsetMaxSize) + ,Option(KafkaManagedOffsetCacheConfig.defaultGroupTopicPartitionOffsetExpireDays) ) - def getClusterTuning(defaultPoolSize: Int, defaultPoolQueueSize: Int, defaultPollingSeconds: Int) : ClusterTuning = { + def getClusterTuning(defaultPoolSize: Int + , defaultPoolQueueSize: Int + , defaultPollingSeconds: Int + , defaultGroupMemberMetadataCheckMillis: Int + , defaultGroupTopicPartitionOffsetMaxSize: Int + , defaultGroupTopicPartitionOffsetExpireDays: Int) : ClusterTuning = { ClusterTuning( Option(defaultPollingSeconds) ,Option(defaultPoolSize) @@ -44,6 +53,9 @@ trait BaseTest { ,Option(defaultPoolQueueSize) ,Option(defaultPoolSize) ,Option(defaultPoolQueueSize) + ,Option(defaultGroupMemberMetadataCheckMillis) + ,Option(defaultGroupTopicPartitionOffsetMaxSize) + ,Option(defaultGroupTopicPartitionOffsetExpireDays) ) } } diff --git a/test/kafka/manager/TestBrokerViewCacheActor.scala b/test/kafka/manager/TestBrokerViewCacheActor.scala index bceb0bc33..144da4a3d 100644 --- a/test/kafka/manager/TestBrokerViewCacheActor.scala +++ b/test/kafka/manager/TestBrokerViewCacheActor.scala @@ -10,10 +10,10 @@ import akka.actor.{ActorRef, ActorSystem, Kill, Props} import akka.pattern._ import akka.util.Timeout import com.typesafe.config.{Config, ConfigFactory} -import kafka.manager.actor.cluster.{KafkaStateActorConfig, KafkaStateActor, BrokerViewCacheActorConfig, BrokerViewCacheActor} +import kafka.manager.actor.cluster.{BrokerViewCacheActor, BrokerViewCacheActorConfig, KafkaManagedOffsetCacheConfig, KafkaStateActor, KafkaStateActorConfig} import kafka.manager.base.LongRunningPoolConfig import kafka.manager.features.ClusterFeatures -import kafka.manager.model.{ClusterConfig, ClusterContext, ActorModel} +import kafka.manager.model.{ActorModel, ClusterConfig, ClusterContext} import kafka.manager.utils.KafkaServerInTest import ActorModel._ import kafka.test.SeededBroker @@ -45,7 +45,7 @@ class TestBrokerViewCacheActor extends KafkaServerInTest with BaseTest { super.beforeAll() val clusterConfig = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT") val clusterContext = ClusterContext(ClusterFeatures.from(clusterConfig), clusterConfig) - val ksConfig = KafkaStateActorConfig(sharedCurator, "pinned-dispatcher", clusterContext, LongRunningPoolConfig(2,100), LongRunningPoolConfig(2,100), 5, 10000, None) + val ksConfig = KafkaStateActorConfig(sharedCurator, "pinned-dispatcher", clusterContext, LongRunningPoolConfig(2,100), LongRunningPoolConfig(2,100), 5, 10000, None, KafkaManagedOffsetCacheConfig()) val props = Props(classOf[KafkaStateActor],ksConfig) kafkaStateActor = Some(system.actorOf(props.withDispatcher("pinned-dispatcher"),"ksa")) diff --git 
diff --git a/test/kafka/manager/TestKafkaManagerActor.scala b/test/kafka/manager/TestKafkaManagerActor.scala
index 4534fb925..4416ec84c 100644
--- a/test/kafka/manager/TestKafkaManagerActor.scala
+++ b/test/kafka/manager/TestKafkaManagerActor.scala
@@ -166,7 +166,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest {
   }
 
   test("update cluster tuning") {
-    val newTuning = getClusterTuning(3, 101, 11)
+    val newTuning = getClusterTuning(3, 101, 11, 10000, 10000, 1)
     val cc2 = ClusterConfig("dev","1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true,
       jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(newTuning), securityProtocol="PLAINTEXT"
     )
diff --git a/test/kafka/manager/TestKafkaStateActor.scala b/test/kafka/manager/TestKafkaStateActor.scala
index 3242774d1..7134c9280 100644
--- a/test/kafka/manager/TestKafkaStateActor.scala
+++ b/test/kafka/manager/TestKafkaStateActor.scala
@@ -11,10 +11,10 @@ import akka.pattern._
 import akka.util.Timeout
 import akka.util.Timeout._
 import com.typesafe.config.{Config, ConfigFactory}
-import kafka.manager.actor.cluster.{KafkaStateActorConfig, KafkaStateActor}
+import kafka.manager.actor.cluster.{KafkaManagedOffsetCacheConfig, KafkaStateActor, KafkaStateActorConfig}
 import kafka.manager.base.LongRunningPoolConfig
 import kafka.manager.features.ClusterFeatures
-import kafka.manager.model.{ClusterContext, ClusterConfig, ActorModel}
+import kafka.manager.model.{ActorModel, ClusterConfig, ClusterContext}
 import kafka.manager.utils.KafkaServerInTest
 import ActorModel._
 import kafka.test.SeededBroker
@@ -52,6 +52,7 @@ class TestKafkaStateActor extends KafkaServerInTest with BaseTest {
     , 5
     , 10000
     , None
+    , KafkaManagedOffsetCacheConfig()
   )
   val props = Props(classOf[KafkaStateActor],ksConfig)
diff --git a/test/kafka/manager/utils/TestClusterConfig.scala b/test/kafka/manager/utils/TestClusterConfig.scala
index 118792e66..6b86a4044 100644
--- a/test/kafka/manager/utils/TestClusterConfig.scala
+++ b/test/kafka/manager/utils/TestClusterConfig.scala
@@ -87,7 +87,26 @@ class TestClusterConfig extends FunSuite with Matchers {
   test("deserialize from 0.9.0.1") {
     val cc = ClusterConfig("qa","0.9.0.1","localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false,
-      tuning = Option(ClusterTuning(Option(1),Option(2),Option(3), Option(4), Option(5), Option(6), Option(7), Option(8), Option(9), Option(10), Option(11), Option(12), Option(13), Option(14), Option(15)))
+      tuning = Option(ClusterTuning(
+        Option(1)
+        ,Option(2)
+        ,Option(3)
+        , Option(4)
+        , Option(5)
+        , Option(6)
+        , Option(7)
+        , Option(8)
+        , Option(9)
+        , Option(10)
+        , Option(11)
+        , Option(12)
+        , Option(13)
+        , Option(14)
+        , Option(15)
+        , Option(16)
+        , Option(17)
+        , Option(18)
+      ))
       , securityProtocol = "PLAINTEXT"
     )
     val serialize: String = ClusterConfig.serialize(cc)