Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support kafka 0.10.x.x #405

Merged
merged 4 commits into from
Jul 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions app/controllers/Logkafka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ class Logkafka (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaMana
LogkafkaNewConfigs.configMaps(Kafka_0_10_0_1).map{case(k,v) => LKConfig(k,Some(v))}.toList)
val kafka_0_10_1_0_Default = CreateLogkafka("","",
LogkafkaNewConfigs.configMaps(Kafka_0_10_1_0).map{case(k,v) => LKConfig(k,Some(v))}.toList)
val kafka_0_10_1_1_Default = CreateLogkafka("","",
LogkafkaNewConfigs.configMaps(Kafka_0_10_1_1).map{case(k,v) => LKConfig(k,Some(v))}.toList)
val kafka_0_10_2_0_Default = CreateLogkafka("","",
LogkafkaNewConfigs.configMaps(Kafka_0_10_2_0).map{case(k,v) => LKConfig(k,Some(v))}.toList)
val kafka_0_10_2_1_Default = CreateLogkafka("","",
LogkafkaNewConfigs.configMaps(Kafka_0_10_2_1).map{case(k,v) => LKConfig(k,Some(v))}.toList)

val defaultCreateForm = Form(
mapping(
Expand Down Expand Up @@ -119,6 +125,9 @@ class Logkafka (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaMana
case Kafka_0_10_0_0 => (defaultCreateForm.fill(kafka_0_10_0_0_Default), clusterContext)
case Kafka_0_10_0_1 => (defaultCreateForm.fill(kafka_0_10_0_1_Default), clusterContext)
case Kafka_0_10_1_0 => (defaultCreateForm.fill(kafka_0_10_1_0_Default), clusterContext)
case Kafka_0_10_1_1 => (defaultCreateForm.fill(kafka_0_10_1_1_Default), clusterContext)
case Kafka_0_10_2_0 => (defaultCreateForm.fill(kafka_0_10_2_0_Default), clusterContext)
case Kafka_0_10_2_1 => (defaultCreateForm.fill(kafka_0_10_2_1_Default), clusterContext)
}
}
}
Expand Down Expand Up @@ -210,6 +219,9 @@ class Logkafka (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaMana
case Kafka_0_10_0_0 => LogkafkaNewConfigs.configNames(Kafka_0_10_0_0).map(n => (n,LKConfig(n,None))).toMap
case Kafka_0_10_0_1 => LogkafkaNewConfigs.configNames(Kafka_0_10_0_1).map(n => (n,LKConfig(n,None))).toMap
case Kafka_0_10_1_0 => LogkafkaNewConfigs.configNames(Kafka_0_10_1_0).map(n => (n,LKConfig(n,None))).toMap
case Kafka_0_10_1_1 => LogkafkaNewConfigs.configNames(Kafka_0_10_1_1).map(n => (n,LKConfig(n,None))).toMap
case Kafka_0_10_2_0 => LogkafkaNewConfigs.configNames(Kafka_0_10_2_0).map(n => (n,LKConfig(n,None))).toMap
case Kafka_0_10_2_1 => LogkafkaNewConfigs.configNames(Kafka_0_10_2_1).map(n => (n,LKConfig(n,None))).toMap
}
val identityOption = li.identityMap.get(log_path)
if (identityOption.isDefined) {
Expand Down
26 changes: 18 additions & 8 deletions app/controllers/Topic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,26 @@ package controllers

import java.util.Properties

import features.{KMTopicManagerFeature, ApplicationFeatures}
import kafka.manager.model._
import ActorModel.TopicIdentity
import features.{ApplicationFeatures, KMTopicManagerFeature}
import kafka.manager.ApiError
import kafka.manager.features.ClusterFeatures
import kafka.manager.model.ActorModel.TopicIdentity
import kafka.manager.model._
import kafka.manager.utils.TopicConfigs
import kafka.manager.ApiError
import models.FollowLink
import models.form.ReassignPartitionOperation.{RunAssignment, ForceRunAssignment}
import models.form.ReassignPartitionOperation.{ForceRunAssignment, RunAssignment}
import models.form._
import models.navigation.Menus
import play.api.data.Form
import play.api.data.Forms._
import play.api.data.validation.{Valid, Invalid, Constraint}
import play.api.data.validation.Constraints._
import play.api.data.validation.{Constraint, Invalid, Valid}
import play.api.i18n.{I18nSupport, MessagesApi}
import play.api.mvc._

import scala.concurrent.Future
import scala.util.{Success, Failure, Try}
import scalaz.{\/-, -\/}
import scala.util.{Failure, Success, Try}
import scalaz.-\/

/**
* @author hiral
Expand Down Expand Up @@ -55,6 +55,9 @@ class Topic (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManager
val kafka_0_10_0_0_Default = CreateTopic("",1,1,TopicConfigs.configNames(Kafka_0_10_0_0).map(n => TConfig(n,None)).toList)
val kafka_0_10_0_1_Default = CreateTopic("",1,1,TopicConfigs.configNames(Kafka_0_10_0_1).map(n => TConfig(n,None)).toList)
val kafka_0_10_1_0_Default = CreateTopic("",1,1,TopicConfigs.configNames(Kafka_0_10_1_0).map(n => TConfig(n,None)).toList)
val kafka_0_10_1_1_Default = CreateTopic("",1,1,TopicConfigs.configNames(Kafka_0_10_1_1).map(n => TConfig(n,None)).toList)
val kafka_0_10_2_0_Default = CreateTopic("",1,1,TopicConfigs.configNames(Kafka_0_10_2_0).map(n => TConfig(n,None)).toList)
val kafka_0_10_2_1_Default = CreateTopic("",1,1,TopicConfigs.configNames(Kafka_0_10_2_1).map(n => TConfig(n,None)).toList)

val defaultCreateForm = Form(
mapping(
Expand Down Expand Up @@ -142,6 +145,10 @@ class Topic (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManager
case Kafka_0_10_0_0 => (defaultCreateForm.fill(kafka_0_10_0_0_Default), clusterContext)
case Kafka_0_10_0_1 => (defaultCreateForm.fill(kafka_0_10_0_1_Default), clusterContext)
case Kafka_0_10_1_0 => (defaultCreateForm.fill(kafka_0_10_1_0_Default), clusterContext)
case Kafka_0_10_1_1 => (defaultCreateForm.fill(kafka_0_10_1_1_Default), clusterContext)
case Kafka_0_10_2_0 => (defaultCreateForm.fill(kafka_0_10_2_0_Default), clusterContext)
case Kafka_0_10_2_1 => (defaultCreateForm.fill(kafka_0_10_2_1_Default), clusterContext)

}
}
}
Expand Down Expand Up @@ -379,6 +386,9 @@ class Topic (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManager
case Kafka_0_10_0_0 => TopicConfigs.configNames(Kafka_0_10_0_0).map(n => (n,TConfig(n,None))).toMap
case Kafka_0_10_0_1 => TopicConfigs.configNames(Kafka_0_10_0_1).map(n => (n,TConfig(n,None))).toMap
case Kafka_0_10_1_0 => TopicConfigs.configNames(Kafka_0_10_1_0).map(n => (n,TConfig(n,None))).toMap
case Kafka_0_10_1_1 => TopicConfigs.configNames(Kafka_0_10_1_1).map(n => (n,TConfig(n,None))).toMap
case Kafka_0_10_2_0 => TopicConfigs.configNames(Kafka_0_10_2_0).map(n => (n,TConfig(n,None))).toMap
case Kafka_0_10_2_1 => TopicConfigs.configNames(Kafka_0_10_2_1).map(n => (n,TConfig(n,None))).toMap
}
val combinedMap = defaultConfigMap ++ ti.config.toMap.map(tpl => tpl._1 -> TConfig(tpl._1,Option(tpl._2)))
(defaultUpdateConfigForm.fill(UpdateTopicConfig(ti.topic,combinedMap.toList.map(_._2),ti.configReadVersion)),
Expand Down
4 changes: 2 additions & 2 deletions app/kafka/manager/actor/cluster/KafkaStateActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecords, KafkaConsum
import org.joda.time.{DateTime, DateTimeZone}

import scala.collection.concurrent.TrieMap
import scala.collection.{JavaConverters, mutable}
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

Expand Down Expand Up @@ -144,7 +144,7 @@ class KafkaAdminClient(context: => ActorContext, adminClientActorPath: ActorPath


object KafkaManagedOffsetCache {
val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1, Kafka_0_10_0_0, Kafka_0_10_0_1, Kafka_0_10_1_0)
val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1, Kafka_0_10_0_0, Kafka_0_10_0_1, Kafka_0_10_1_0, Kafka_0_10_1_1, Kafka_0_10_2_0, Kafka_0_10_2_1)
val ConsumerOffsetTopic = "__consumer_offsets"

def isSupported(version: KafkaVersion) : Boolean = {
Expand Down
24 changes: 19 additions & 5 deletions app/kafka/manager/model/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,18 @@ case object Kafka_0_10_1_0 extends KafkaVersion {
override def toString = "0.10.1.0"
}

case object Kafka_0_10_1_1 extends KafkaVersion {
override def toString = "0.10.1.1"
}

case object Kafka_0_10_2_0 extends KafkaVersion {
override def toString = "0.10.2.0"
}

case object Kafka_0_10_2_1 extends KafkaVersion {
override def toString = "0.10.2.1"
}

object KafkaVersion {
val supportedVersions: Map[String,KafkaVersion] = Map(
"0.8.1.1" -> Kafka_0_8_1_1,
Expand All @@ -59,7 +71,10 @@ object KafkaVersion {
"0.9.0.1" -> Kafka_0_9_0_1,
"0.10.0.0" -> Kafka_0_10_0_0,
"0.10.0.1" -> Kafka_0_10_0_1,
"0.10.1.0" -> Kafka_0_10_1_0
"0.10.1.0" -> Kafka_0_10_1_0,
"0.10.1.1" -> Kafka_0_10_1_1,
"0.10.2.0" -> Kafka_0_10_2_0,
"0.10.2.1" -> Kafka_0_10_2_1
)

val formSelectList : IndexedSeq[(String,String)] = supportedVersions.toIndexedSeq.filterNot(_._1.contains("beta")).map(t => (t._1,t._2.toString))
Expand Down Expand Up @@ -147,12 +162,13 @@ object ClusterConfig {
)
}

import scalaz.{Failure,Success}
import scalaz.syntax.applicative._
import scalaz.{Failure, Success}
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization
import org.json4s.scalaz.JsonScalaz._

import scala.language.reflectiveCalls

implicit val formats = Serialization.formats(FullTypeHints(List(classOf[ClusterConfig])))
Expand Down Expand Up @@ -254,12 +270,10 @@ case class ClusterTuning(brokerViewUpdatePeriodSeconds: Option[Int]
, kafkaAdminClientThreadPoolQueueSize: Option[Int]
)
object ClusterTuning {
import scalaz.{Failure,Success}
import scalaz.syntax.applicative._
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization
import org.json4s.scalaz.JsonScalaz._

import scala.language.reflectiveCalls

implicit val formats = Serialization.formats(FullTypeHints(List(classOf[ClusterTuning])))
Expand Down
5 changes: 4 additions & 1 deletion app/kafka/manager/utils/LogkafkaNewConfigs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ object LogkafkaNewConfigs {
Kafka_0_9_0_1 -> logkafka82.LogConfig,
Kafka_0_10_0_0 -> logkafka82.LogConfig,
Kafka_0_10_0_1 -> logkafka82.LogConfig,
Kafka_0_10_1_0 -> logkafka82.LogConfig
Kafka_0_10_1_0 -> logkafka82.LogConfig,
Kafka_0_10_1_1 -> logkafka82.LogConfig,
Kafka_0_10_2_0 -> logkafka82.LogConfig,
Kafka_0_10_2_1 -> logkafka82.LogConfig
)

def configNames(version: KafkaVersion) : Set[String] = {
Expand Down
5 changes: 4 additions & 1 deletion app/kafka/manager/utils/TopicConfigs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ object TopicConfigs {
Kafka_0_9_0_1 -> zero90.LogConfig,
Kafka_0_10_0_0 -> zero90.LogConfig,
Kafka_0_10_0_1 -> zero90.LogConfig,
Kafka_0_10_1_0 -> zero90.LogConfig
Kafka_0_10_1_0 -> zero90.LogConfig,
Kafka_0_10_1_1 -> zero90.LogConfig,
Kafka_0_10_2_0 -> zero90.LogConfig,
Kafka_0_10_2_1 -> zero90.LogConfig
)

def configNames(version: KafkaVersion) : Set[String] = {
Expand Down
24 changes: 24 additions & 0 deletions test/kafka/manager/utils/TestClusterConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,28 @@ class TestClusterConfig extends FunSuite with Matchers {
assert(deserialize.isSuccess === true)
assert(cc == deserialize.get)
}

test("serialize and deserialize 0.10.1.1") {
val cc = ClusterConfig("qa", "0.10.1.1", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None)
val serialize: String = ClusterConfig.serialize(cc)
val deserialize = ClusterConfig.deserialize(serialize)
assert(deserialize.isSuccess === true)
assert(cc == deserialize.get)
}

test("serialize and deserialize 0.10.2.0") {
val cc = ClusterConfig("qa", "0.10.2.0", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None)
val serialize: String = ClusterConfig.serialize(cc)
val deserialize = ClusterConfig.deserialize(serialize)
assert(deserialize.isSuccess === true)
assert(cc == deserialize.get)
}

test("serialize and deserialize 0.10.2.1") {
val cc = ClusterConfig("qa", "0.10.2.1", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None)
val serialize: String = ClusterConfig.serialize(cc)
val deserialize = ClusterConfig.deserialize(serialize)
assert(deserialize.isSuccess === true)
assert(cc == deserialize.get)
}
}