Add support for getting committed offsets of consumer groups for Kafka #183

Open · wants to merge 1 commit into base: master
247 changes: 230 additions & 17 deletions app/kafka/manager/actor/cluster/KafkaStateActor.scala
@@ -6,13 +6,22 @@
package kafka.manager.actor.cluster

import java.util.concurrent.TimeUnit
import java.util.Properties

import akka.pattern._
import akka.actor.{ActorRef, Cancellable, ActorPath}
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import grizzled.slf4j.Logging
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import kafka.admin._
import kafka.coordinator.{GroupOverview, GroupSummary, MemberSummary}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.utils.Utils
import kafka.manager._
import kafka.manager.base.cluster.BaseClusterQueryCommandActor
import kafka.manager.base.{LongRunningPoolActor, LongRunningPoolConfig}
@@ -27,7 +36,8 @@ import org.apache.curator.framework.recipes.cache._
import org.joda.time.{DateTime, DateTimeZone}

import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future, Await}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

/**
@@ -140,7 +150,7 @@ trait OffsetCache extends Logging {

protected def getTopicPartitionLeaders(topic: String) : Option[List[(Int, Option[BrokerIdentity])]]

protected def getTopicDescription(topic: String, interactive: Boolean) : Option[TopicDescription]
def getTopicDescription(topic: String, interactive: Boolean) : Option[TopicDescription]

protected def readConsumerOffsetByTopicPartition(consumer: String, topic: String, tpi: Map[Int, TopicPartitionIdentity]) : Map[Int, Long]

@@ -239,7 +249,22 @@
ConsumedTopicDescription(consumer, topic, numPartitions, optTopic, partitionOwners, partitionOffsets)
}

def getConsumerList: ConsumerList
def getConsumerList(adminClient: AdminClient, isCloseClient: Boolean): ConsumerList

def getConsumerListByKafka(adminClient: AdminClient): IndexedSeq[String] = {
// One GroupOverview per consumer group known to the brokers.
adminClient.listAllConsumerGroupsFlattened().map(_.groupId).toIndexedSeq
}
}
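For context, kafka.admin.AdminClient here is Kafka's internal Scala admin client (0.9/0.10 era), whose listAllConsumerGroupsFlattened() returns one GroupOverview per consumer group across all brokers. A minimal standalone sketch of the call, with a placeholder broker address:

import kafka.admin.AdminClient

object ListGroupsSketch extends App {
  // "localhost:9092" is a placeholder; the PR builds this string from getBrokers.
  val admin = AdminClient.createSimplePlaintext("localhost:9092")
  try admin.listAllConsumerGroupsFlattened().foreach(g => println(g.groupId))
  finally admin.close()
}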

case class OffsetCacheActive(curator: CuratorFramework,
@@ -282,7 +307,7 @@ case class OffsetCacheActive(curator: CuratorFramework,

protected def getTopicPartitionLeaders(topic: String) : Option[List[(Int, Option[BrokerIdentity])]] = partitionLeaders(topic)

protected def getTopicDescription(topic: String, interactive: Boolean) : Option[TopicDescription] = topicDescriptions(topic, interactive)
def getTopicDescription(topic: String, interactive: Boolean) : Option[TopicDescription] = topicDescriptions(topic, interactive)

def start(): Unit = {
info("Starting consumers tree cache...")
@@ -352,20 +377,27 @@ case class OffsetCacheActive(curator: CuratorFramework,
topicDescriptions.map(ConsumerDescription(consumer, _))
}*/

def getConsumerList: ConsumerList = {
def getConsumerList(adminClient: AdminClient, isCloseClient: Boolean): ConsumerList = {
val sumConsumerList = getConsumerListByKafka(adminClient)
withConsumersTreeCache { cache =>
cache.getCurrentChildren(ZkUtils.ConsumersPath)
}.fold {
ConsumerList(IndexedSeq.empty, clusterContext)
if (isCloseClient) {
adminClient.close()
}
ConsumerList(sumConsumerList, clusterContext)
} { data: java.util.Map[String, ChildData] =>
if (isCloseClient) {
adminClient.close()
}
val filteredList: IndexedSeq[String] = data.asScala.filter{
case (consumer, childData) =>
if (clusterContext.config.filterConsumers)
// Defining "inactive consumer" as a consumer that is missing one of three children ids/ offsets/ or owners/
childData.getStat.getNumChildren > 2
else true
}.keySet.toIndexedSeq
ConsumerList(filteredList, clusterContext)
ConsumerList((sumConsumerList ++ filteredList).distinct, clusterContext)
}
}
}
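The getConsumerList above returns the union of ZK-registered consumers and Kafka-coordinated groups, de-duplicated. A tiny sketch of that merge step with hypothetical group names:

// Hypothetical inputs illustrating the union/de-duplication.
val zkConsumers    = IndexedSeq("groupA", "groupB")
val kafkaConsumers = IndexedSeq("groupB", "groupC")
val merged = (kafkaConsumers ++ zkConsumers).distinct // groupB, groupC, groupA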
@@ -410,7 +442,7 @@ case class OffsetCachePassive(curator: CuratorFramework,

protected def getTopicPartitionLeaders(topic: String) : Option[List[(Int, Option[BrokerIdentity])]] = partitionLeaders(topic)

protected def getTopicDescription(topic: String, interactive: Boolean) : Option[TopicDescription] = topicDescriptions(topic, interactive)
def getTopicDescription(topic: String, interactive: Boolean) : Option[TopicDescription] = topicDescriptions(topic, interactive)

def start(): Unit = {
info("Starting consumers path children cache...")
@@ -469,14 +501,21 @@ case class OffsetCachePassive(curator: CuratorFramework,
Try(Option(curator.getChildren.forPath(zkPath)).map(_.asScala.toSet)).toOption.flatten.getOrElse(Set.empty)
}

def getConsumerList: ConsumerList = {
def getConsumerList(adminClient: AdminClient, isCloseClient: Boolean): ConsumerList = {
val sumConsumerList = getConsumerListByKafka(adminClient)
withConsumersPathChildrenCache { cache =>
val currentData = cache.getCurrentData
currentData
}.fold {
ConsumerList(IndexedSeq.empty, clusterContext)
if (isCloseClient) {
adminClient.close()
}
ConsumerList(sumConsumerList, clusterContext)
} { data: java.util.List[ChildData] =>
ConsumerList(data.asScala.map(cd => cd.getPath.split("/").last).toIndexedSeq, clusterContext)
if (isCloseClient) {
adminClient.close()
}
ConsumerList((sumConsumerList ++ data.asScala.map(_.getPath.split("/").last)).distinct, clusterContext)
}
}
}
@@ -487,6 +526,8 @@ case class KafkaStateActorConfig(curator: CuratorFramework,
partitionOffsetCacheTimeoutSecs: Int, simpleConsumerSocketTimeoutMillis: Int)
class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCommandActor with LongRunningPoolActor {

// Upper bound on waiting for the background Kafka-side consumer refresh.
private val consumerInfoByKafkaTimeout: Duration = 10 seconds

protected implicit val clusterContext: ClusterContext = config.clusterContext

protected implicit val cf: ClusterFeatures = clusterContext.clusterFeatures
@@ -581,7 +622,19 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCommandActor with LongRunningPoolActor {
}
}
}


private def createAdminClient(): AdminClient = {
val targetBrokers: IndexedSeq[BrokerIdentity] = getBrokers
// Comma-separated host:port list, without a trailing comma.
val brokerListStr = targetBrokers.map(b => s"${b.host}:${b.port}").mkString(",")
AdminClient.createSimplePlaintext(brokerListStr)
}
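Further down, the PR calls client.describeConsumerGroup(groupId), which on this internal Scala AdminClient returns one summary per group member, each carrying clientId, clientHost, and the member's TopicPartition assignment (an empty result indicating an inactive or ZK-based group). A hedged sketch of that call in isolation, with placeholder broker and group names:

// Sketch against the Kafka 0.9/0.10 internal Scala AdminClient;
// "localhost:9092" and "my-group" are placeholders.
val admin = AdminClient.createSimplePlaintext("localhost:9092")
try {
  admin.describeConsumerGroup("my-group").foreach { member =>
    println(s"${member.clientId}@${member.clientHost}: ${member.assignment.mkString(", ")}")
  }
} finally admin.close()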

private[this] val offsetCache: OffsetCache = {
if(config.clusterContext.config.activeOffsetCacheEnabled)
new OffsetCacheActive(
@@ -603,6 +656,9 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCommandActor with LongRunningPoolActor {
config.clusterContext.config.version)(longRunningExecutionContext, cf)
}

private[this] var cancellable : Option[Cancellable] = None
// Initialized non-null so reads before the first scheduled refresh don't NPE.
private[this] var consumerByKafkaFuture: Future[Option[List[ConsumerDescription]]] = Future.successful(None)

@scala.throws[Exception](classOf[Exception])
override def preStart() = {
log.info(config.toString)
@@ -626,6 +682,13 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCommandActor with LongRunningPoolActor {
//the offset cache does not poll on its own so it can be started safely
log.info("Starting offset cache...")
offsetCache.start()

cancellable = Some(
context.system.scheduler.schedule(0 seconds,
10 seconds,
self,
KSForceUpdateConsumerByKafka)(context.system.dispatcher,self)
)
}
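preStart now schedules a KSForceUpdateConsumerByKafka message to self every 10 seconds. The conventional counterpart, cancelling the timer when the actor stops, is not visible in this diff; a sketch of what it would look like:

// Sketch only: the diff does not show postStop, so this assumes the base
// class behaves like a plain Actor here.
override def postStop(): Unit = {
  cancellable.foreach(_.cancel())
  super.postStop()
}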

@scala.throws[Exception](classOf[Exception])
@@ -764,7 +827,11 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCommandActor with LongRunningPoolActor {

case KSGetConsumers =>
asyncPipeToSender {
offsetCache.getConsumerList
val client = createAdminClient()
try offsetCache.getConsumerList(client, false)
finally client.close()
}

case KSGetTopicConfig(topic) =>
@@ -778,12 +845,45 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCommandActor with LongRunningPoolActor {

case KSGetConsumerDescription(consumer) =>
asyncPipeToSender {
offsetCache.getConsumerDescription(consumer)
// Start from the ZK-derived description, then merge in topics discovered
// via the Kafka coordinator; ZK entries win on key conflicts (the
// original ++: semantics).
var topicConsumerMap = offsetCache.getConsumerDescription(consumer).topics

Await.ready(consumerByKafkaFuture, consumerInfoByKafkaTimeout).value.get match {
case Success(consumerDescriptionList) =>
consumerDescriptionList.foreach {
_.filter(_.consumer == consumer).foreach { byKafka =>
topicConsumerMap = byKafka.topics ++ topicConsumerMap
}
}
case Failure(e) => // fall back to the ZK-derived topics only
}

ConsumerDescription(consumer, topicConsumerMap)
}
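The Await.ready(f, timeout).value.get pattern above blocks until the future completes (throwing only on timeout) and then hands back the Try, so a failed refresh surfaces as a Failure value rather than a thrown exception, unlike Await.result. A self-contained illustration of the pattern:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object AwaitReadySketch extends App {
  val f: Future[Int] = Future(41 + 1)
  Await.ready(f, 10.seconds).value.get match {
    case Success(v) => println(s"completed: $v") // value on success
    case Failure(e) => println(s"failed: $e")    // failure captured, not rethrown
  }
}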

case KSGetConsumedTopicDescription(consumer, topic) =>
asyncPipeToSender {
offsetCache.getConsumedTopicDescription(consumer, topic, true)
// Prefer the description derived from Kafka-committed offsets when present.
var consumedTD = offsetCache.getConsumedTopicDescription(consumer, topic, true)

Await.ready(consumerByKafkaFuture, consumerInfoByKafkaTimeout).value.get match {
case Success(consumerDescriptionList) =>
consumerDescriptionList.foreach {
_.filter(_.consumer == consumer).foreach { byKafka =>
consumedTD = byKafka.topics.getOrElse(topic, consumedTD)
}
}
case Failure(e) => // keep the ZK-derived description
}

consumedTD
}

case KSGetAllTopicDescriptions(lastUpdateMillisOption) =>
@@ -804,10 +904,31 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCommandActor with LongRunningPoolActor {
val lastUpdateMillis = lastUpdateMillisOption.getOrElse(0L)
if (offsetCache.lastUpdateMillis > lastUpdateMillis) {
asyncPipeToSender {
val client = createAdminClient()
ConsumerDescriptions(offsetCache
.getConsumerList
.getConsumerList(client, true)
.list
.map(c => offsetCache.getConsumerDescription(c)), offsetCache.lastUpdateMillis)
.map { c =>
// Merge ZK-derived topics with those discovered via the Kafka
// coordinator; ZK entries win on key conflicts (the original ++: semantics).
var topicConsumerMap = offsetCache.getConsumerDescription(c).topics

Await.ready(consumerByKafkaFuture, consumerInfoByKafkaTimeout).value.get match {
case Success(consumerDescriptionList) =>
consumerDescriptionList.foreach {
_.filter(_.consumer == c).foreach { byKafka =>
topicConsumerMap = byKafka.topics ++ topicConsumerMap
}
}
case Failure(e) => // fall back to the ZK-derived topics only
}

ConsumerDescription(c, topicConsumerMap)
},
offsetCache.lastUpdateMillis)
}
}

@@ -823,6 +944,10 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCommandActor with LongRunningPoolActor {
case KSGetReassignPartition =>
sender ! reassignPartitions

case KSForceUpdateConsumerByKafka =>
consumerByKafkaFuture = getConsumerDescriptionByKafka()
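KSForceUpdateConsumerByKafka is a new message type whose definition is not part of this file. A sketch of the shape it presumably has, following the existing KS* query messages (hedged — the actual definition lives elsewhere in the PR):

// Presumed shape, matching the existing KS* query messages.
case object KSForceUpdateConsumerByKafka extends KSRequest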

case any: Any => log.warning("ksa : processQueryRequest : Received unknown message: {}", any.toString)
}
}
@@ -894,5 +1019,93 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCommandActor with LongRunningPoolActor {
Option(fn(topicsTreeCache))
}

private[this] def getConsumerDescriptionByKafka(): Future[Option[List[ConsumerDescription]]] = {
def createNewConsumer(group: String): KafkaConsumer[String, String] = {
val properties = new Properties()
val deserializer = (new StringDeserializer).getClass.getName
val targetBrokers : IndexedSeq[BrokerIdentity] = getBrokers
var brokerListStr = ""
targetBrokers.foreach {
b => {
brokerListStr += "%s:%d,".format(b.host, b.port)
}
}
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerListStr)
properties.put(ConsumerConfig.GROUP_ID_CONFIG, group)
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer)
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)

new KafkaConsumer(properties)
}

implicit val ec = longRunningExecutionContext
// Local future, renamed so it does not shadow the actor field of the same name.
val futureByKafka: Future[Option[List[ConsumerDescription]]] = Future {
val client = createAdminClient()
offsetCache.getConsumerListByKafka(client).toList match {
case Nil =>
client.close()
None
case l: List[String] => {
var descriptions: List[ConsumerDescription] = List()

l.foreach {
groupId => {

val consumerSummaries = client.describeConsumerGroup(groupId)
if (!consumerSummaries.isEmpty) {
val consumer = createNewConsumer(groupId)

consumerSummaries.foreach { consumerSummary =>
val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition))
val partitionOffsets = topicPartitions.flatMap { topicPartition =>
Option(consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata =>
topicPartition -> offsetAndMetadata.offset
}
}.toMap

descriptions = descriptions.:+(ConsumerDescription(groupId,
topicPartitions.map {
tp => (tp.topic, tp.partition)
}.groupBy(_._1).map {
case (topic, tpl) => {
(topic,
ConsumedTopicDescription(groupId,
topic,
tpl.size,
offsetCache.getTopicDescription(topic, false),
Option(tpl.map {
p => (p._2, s"${consumerSummary.clientId}_${consumerSummary.clientHost}")
}.toMap),
Option(tpl.map {
// -1 when the group has no committed offset for this partition.
p => (p._2, partitionOffsets.getOrElse(TopicAndPartition(topic, p._2), -1L))
}.toMap)))
}
}))
}//foreach
consumer.close()
}//isEmpty
}//groupId
}//foreach
client.close()
if (descriptions.nonEmpty) Some(descriptions) else None
}//List
}//match
}//Future
// recover produces a new Future; it must be returned, or failures leak through.
futureByKafka.recover { case t => None }
}
}
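The per-partition offsets above come from KafkaConsumer.committed(TopicPartition), which returns null when the group has never committed an offset for that partition — hence the Option(...) wrapper in the diff. A standalone sketch with placeholder broker, group, and topic names:

import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object CommittedOffsetSketch extends App {
  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group")                // placeholder
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)

  val consumer = new KafkaConsumer[String, String](props)
  try {
    val committed = Option(consumer.committed(new TopicPartition("my-topic", 0)))
    println(committed.map(_.offset).getOrElse(-1L)) // -1L when nothing committed
  } finally consumer.close()
}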