Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#665 Adds 2.2.1 and 2.3.0 support #669

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions app/controllers/Cluster.scala
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ package controllers
import features.{ApplicationFeatures, KMClusterManagerFeature}
import kafka.manager.ApiError
import kafka.manager.model._
import kafka.manager.KafkaManager._
import models.FollowLink
import models.form._
import models.navigation.Menus
@@ -101,19 +102,19 @@ class Cluster (val cc: ControllerComponents, val kafkaManagerContext: KafkaManag
, "tuning" -> optional(
mapping(
"brokerViewUpdatePeriodSeconds" -> optional(number(10, 1000))
, "clusterManagerThreadPoolSize" -> optional(number(2, 1000))
, "clusterManagerThreadPoolSize" -> optional(number(DefaultMinThreadPoolSize, 1000))
, "clusterManagerThreadPoolQueueSize" -> optional(number(10, 10000))
, "kafkaCommandThreadPoolSize" -> optional(number(2, 1000))
, "kafkaCommandThreadPoolSize" -> optional(number(DefaultMinThreadPoolSize, 1000))
, "kafkaCommandThreadPoolQueueSize" -> optional(number(10, 10000))
, "logkafkaCommandThreadPoolSize" -> optional(number(2, 1000))
, "logkafkaCommandThreadPoolSize" -> optional(number(DefaultMinThreadPoolSize, 1000))
, "logkafkaCommandThreadPoolQueueSize" -> optional(number(10, 10000))
, "logkafkaUpdatePeriodSeconds" -> optional(number(10, 1000))
, "partitionOffsetCacheTimeoutSecs" -> optional(number(5, 100))
, "brokerViewThreadPoolSize" -> optional(number(2, 1000))
, "brokerViewThreadPoolSize" -> optional(number(DefaultMinThreadPoolSize, 1000))
, "brokerViewThreadPoolQueueSize" -> optional(number(10, 10000))
, "offsetCacheThreadPoolSize" -> optional(number(2, 1000))
, "offsetCacheThreadPoolSize" -> optional(number(DefaultMinThreadPoolSize, 1000))
, "offsetCacheThreadPoolQueueSize" -> optional(number(10, 10000))
, "kafkaAdminClientThreadPoolSize" -> optional(number(2, 1000))
, "kafkaAdminClientThreadPoolSize" -> optional(number(DefaultMinThreadPoolSize, 1000))
, "kafkaAdminClientThreadPoolQueueSize" -> optional(number(10, 10000))
, "kafkaManagedOffsetMetadataCheckMillis" -> optional(number(10000, 120000))
, "kafkaManagedOffsetGroupCacheSize" -> optional(number(10000, 100000000))
@@ -145,19 +146,19 @@ class Cluster (val cc: ControllerComponents, val kafkaManagerContext: KafkaManag
"tuning" -> optional(
mapping(
"brokerViewUpdatePeriodSeconds" -> optional(number(10, 1000))
, "clusterManagerThreadPoolSize" -> optional(number(2, 1000))
, "clusterManagerThreadPoolSize" -> optional(number(DefaultMinThreadPoolSize, 1000))
, "clusterManagerThreadPoolQueueSize" -> optional(number(10, 10000))
, "kafkaCommandThreadPoolSize" -> optional(number(2, 1000))
, "kafkaCommandThreadPoolSize" -> optional(number(DefaultMinThreadPoolSize, 1000))
, "kafkaCommandThreadPoolQueueSize" -> optional(number(10, 10000))
, "logkafkaCommandThreadPoolSize" -> optional(number(2, 1000))
, "logkafkaCommandThreadPoolSize" -> optional(number(DefaultMinThreadPoolSize, 1000))
, "logkafkaCommandThreadPoolQueueSize" -> optional(number(10, 10000))
, "logkafkaUpdatePeriodSeconds" -> optional(number(10, 1000))
, "partitionOffsetCacheTimeoutSecs" -> optional(number(5, 100))
, "brokerViewThreadPoolSize" -> optional(number(2, 1000))
, "brokerViewThreadPoolSize" -> optional(number(DefaultMinThreadPoolSize, 1000))
, "brokerViewThreadPoolQueueSize" -> optional(number(10, 10000))
, "offsetCacheThreadPoolSize" -> optional(number(2, 1000))
, "offsetCacheThreadPoolSize" -> optional(number(DefaultMinThreadPoolSize, 1000))
, "offsetCacheThreadPoolQueueSize" -> optional(number(10, 10000))
, "kafkaAdminClientThreadPoolSize" -> optional(number(2, 1000))
, "kafkaAdminClientThreadPoolSize" -> optional(number(DefaultMinThreadPoolSize, 1000))
, "kafkaAdminClientThreadPoolQueueSize" -> optional(number(10, 10000))
, "kafkaManagedOffsetMetadataCheckMillis" -> optional(number(10000, 120000))
, "kafkaManagedOffsetGroupCacheSize" -> optional(number(10000, 100000000))
8 changes: 8 additions & 0 deletions app/controllers/Logkafka.scala
Original file line number Diff line number Diff line change
@@ -97,6 +97,10 @@ class Logkafka (val cc: ControllerComponents, val kafkaManagerContext: KafkaMana
LogkafkaNewConfigs.configMaps(Kafka_2_1_1).map{case(k,v) => LKConfig(k,Some(v))}.toList)
val kafka_2_2_0_Default = CreateLogkafka("","",
LogkafkaNewConfigs.configMaps(Kafka_2_2_0).map{case(k,v) => LKConfig(k,Some(v))}.toList)
val kafka_2_2_1_Default = CreateLogkafka("","",
LogkafkaNewConfigs.configMaps(Kafka_2_2_1).map{case(k,v) => LKConfig(k,Some(v))}.toList)
val kafka_2_3_0_Default = CreateLogkafka("","",
LogkafkaNewConfigs.configMaps(Kafka_2_3_0).map{case(k,v) => LKConfig(k,Some(v))}.toList)

val defaultCreateForm = Form(
mapping(
@@ -157,6 +161,8 @@ class Logkafka (val cc: ControllerComponents, val kafkaManagerContext: KafkaMana
case Kafka_2_1_0 => (defaultCreateForm.fill(kafka_2_1_0_Default), clusterContext)
case Kafka_2_1_1 => (defaultCreateForm.fill(kafka_2_1_1_Default), clusterContext)
case Kafka_2_2_0 => (defaultCreateForm.fill(kafka_2_2_0_Default), clusterContext)
case Kafka_2_2_1 => (defaultCreateForm.fill(kafka_2_2_1_Default), clusterContext)
case Kafka_2_3_0 => (defaultCreateForm.fill(kafka_2_3_0_Default), clusterContext)
}
}
}
@@ -261,6 +267,8 @@ class Logkafka (val cc: ControllerComponents, val kafkaManagerContext: KafkaMana
case Kafka_2_1_0 => LogkafkaNewConfigs.configNames(Kafka_2_1_0).map(n => (n,LKConfig(n,None))).toMap
case Kafka_2_1_1 => LogkafkaNewConfigs.configNames(Kafka_2_1_1).map(n => (n,LKConfig(n,None))).toMap
case Kafka_2_2_0 => LogkafkaNewConfigs.configNames(Kafka_2_2_0).map(n => (n,LKConfig(n,None))).toMap
case Kafka_2_2_1 => LogkafkaNewConfigs.configNames(Kafka_2_2_1).map(n => (n,LKConfig(n,None))).toMap
case Kafka_2_3_0 => LogkafkaNewConfigs.configNames(Kafka_2_3_0).map(n => (n,LKConfig(n,None))).toMap
}
val identityOption = li.identityMap.get(log_path)
if (identityOption.isDefined) {
8 changes: 7 additions & 1 deletion app/controllers/Topic.scala
Original file line number Diff line number Diff line change
@@ -67,6 +67,8 @@ class Topic (val cc: ControllerComponents, val kafkaManagerContext: KafkaManager
val kafka_2_1_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_1_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList)
val kafka_2_1_1_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_1_1).map{ case (n, h) => TConfig(n,None,Option(h))}.toList)
val kafka_2_2_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_2_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList)
val kafka_2_2_1_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_2_1).map{ case (n, h) => TConfig(n,None,Option(h))}.toList)
val kafka_2_3_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_3_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList)

val defaultCreateForm = Form(
mapping(
@@ -168,7 +170,9 @@ class Topic (val cc: ControllerComponents, val kafkaManagerContext: KafkaManager
case Kafka_2_0_0 => (defaultCreateForm.fill(kafka_2_0_0_Default), clusterContext)
case Kafka_2_1_0 => (defaultCreateForm.fill(kafka_2_1_0_Default), clusterContext)
case Kafka_2_1_1 => (defaultCreateForm.fill(kafka_2_1_1_Default), clusterContext)
case Kafka_2_2_0 => (defaultCreateForm.fill(kafka_2_1_1_Default), clusterContext)
case Kafka_2_2_0 => (defaultCreateForm.fill(kafka_2_2_0_Default), clusterContext)
case Kafka_2_2_1 => (defaultCreateForm.fill(kafka_2_2_1_Default), clusterContext)
case Kafka_2_3_0 => (defaultCreateForm.fill(kafka_2_3_0_Default), clusterContext)
}
}
}
@@ -419,6 +423,8 @@ class Topic (val cc: ControllerComponents, val kafkaManagerContext: KafkaManager
case Kafka_2_1_0 => TopicConfigs.configNamesAndDoc(Kafka_2_1_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) }
case Kafka_2_1_1 => TopicConfigs.configNamesAndDoc(Kafka_2_1_1).map { case (n, h) => (n,TConfig(n,None, Option(h))) }
case Kafka_2_2_0 => TopicConfigs.configNamesAndDoc(Kafka_2_2_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) }
case Kafka_2_2_1 => TopicConfigs.configNamesAndDoc(Kafka_2_2_1).map { case (n, h) => (n,TConfig(n,None, Option(h))) }
case Kafka_2_3_0 => TopicConfigs.configNamesAndDoc(Kafka_2_3_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) }
}
val updatedConfigMap = ti.config.toMap
val updatedConfigList = defaultConfigs.map {
17 changes: 12 additions & 5 deletions app/kafka/manager/KafkaManager.scala
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ package kafka.manager

import java.util.Properties
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import java.lang.Runtime.{getRuntime, _}

import akka.actor.{ActorPath, ActorSystem, Props}
import akka.util.Timeout
@@ -15,7 +16,7 @@ import grizzled.slf4j.Logging
import kafka.manager.actor.{KafkaManagerActor, KafkaManagerActorConfig}
import kafka.manager.base.LongRunningPoolConfig
import kafka.manager.model._
import ActorModel._
import kafka.manager.model.ActorModel._
import kafka.manager.actor.cluster.KafkaManagedOffsetCacheConfig
import kafka.manager.utils.UtilException
import kafka.manager.utils.zero81.ReassignPartitionErrors.ReplicationOutOfSync
@@ -59,6 +60,12 @@ object ApiError extends Logging {

object KafkaManager {

val AvailableProcessors: Int = getRuntime().availableProcessors();
val DefaultMinThreadPoolSize: Int = 2;
val DefaultMinThreadPoolSizeAsString: String = DefaultMinThreadPoolSize.toString;
val DefaultThreadPoolSize: Int = if (AvailableProcessors < DefaultMinThreadPoolSize) DefaultMinThreadPoolSize else AvailableProcessors;
val DefaultThreadPoolSizeAsString: String = DefaultThreadPoolSize.toString;

val ConsumerPropertiesFile = "kafka-manager.consumer.properties.file"
val BaseZkPath = "kafka-manager.base-zk-path"
val PinnedDispatchName = "kafka-manager.pinned-dispatcher-name"
@@ -94,18 +101,18 @@ object KafkaManager {
DeleteClusterUpdateSeconds -> "10",
DeletionBatchSize -> "2",
MaxQueueSize -> "100",
ThreadPoolSize -> "2",
ThreadPoolSize -> DefaultMinThreadPoolSizeAsString,
MutexTimeoutMillis -> "4000",
StartDelayMillis -> "1000",
ApiTimeoutMillis -> "5000",
ClusterActorsAskTimeoutMillis -> "2000",
PartitionOffsetCacheTimeoutSecs -> "5",
SimpleConsumerSocketTimeoutMillis -> "10000",
BrokerViewThreadPoolSize -> Runtime.getRuntime.availableProcessors().toString,
BrokerViewThreadPoolSize -> DefaultThreadPoolSizeAsString,
BrokerViewMaxQueueSize -> "1000",
OffsetCacheThreadPoolSize -> Runtime.getRuntime.availableProcessors().toString,
OffsetCacheThreadPoolSize -> DefaultThreadPoolSizeAsString,
OffsetCacheMaxQueueSize -> "1000",
KafkaAdminClientThreadPoolSize -> Runtime.getRuntime.availableProcessors().toString,
KafkaAdminClientThreadPoolSize -> DefaultThreadPoolSizeAsString,
KafkaAdminClientMaxQueueSize -> "1000",
KafkaManagedOffsetMetadataCheckMillis -> KafkaManagedOffsetCacheConfig.defaultGroupMemberMetadataCheckMillis.toString,
KafkaManagedOffsetGroupCacheSize -> KafkaManagedOffsetCacheConfig.defaultGroupTopicPartitionOffsetMaxSize.toString,
3 changes: 2 additions & 1 deletion app/kafka/manager/actor/cluster/ClusterManagerActor.scala
Original file line number Diff line number Diff line change
@@ -49,6 +49,7 @@ object ClusterManagerActor {
}

import kafka.manager.model.ActorModel._
import kafka.manager.KafkaManager._

case class ClusterManagerActorConfig(pinnedDispatcherName: String
, baseZkPath : String
@@ -168,7 +169,7 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
LogkafkaViewCacheActorConfig(
logkafkaStateActor.get,
clusterContext,
LongRunningPoolConfig(Runtime.getRuntime.availableProcessors(), 1000),
LongRunningPoolConfig(AvailableProcessors, 1000),
FiniteDuration(clusterConfig.tuning.get.logkafkaUpdatePeriodSeconds.get, TimeUnit.SECONDS)
)
)
2 changes: 1 addition & 1 deletion app/kafka/manager/actor/cluster/KafkaStateActor.scala
Original file line number Diff line number Diff line change
@@ -176,7 +176,7 @@ class KafkaAdminClient(context: => ActorContext, adminClientActorPath: ActorPath


object KafkaManagedOffsetCache {
val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1, Kafka_0_10_0_0, Kafka_0_10_0_1, Kafka_0_10_1_0, Kafka_0_10_1_1, Kafka_0_10_2_0, Kafka_0_10_2_1, Kafka_0_11_0_0, Kafka_0_11_0_2, Kafka_1_0_0, Kafka_1_0_1, Kafka_1_1_0, Kafka_1_1_1, Kafka_2_0_0, Kafka_2_1_0, Kafka_2_1_1, Kafka_2_2_0)
val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1, Kafka_0_10_0_0, Kafka_0_10_0_1, Kafka_0_10_1_0, Kafka_0_10_1_1, Kafka_0_10_2_0, Kafka_0_10_2_1, Kafka_0_11_0_0, Kafka_0_11_0_2, Kafka_1_0_0, Kafka_1_0_1, Kafka_1_1_0, Kafka_1_1_1, Kafka_2_0_0, Kafka_2_1_0, Kafka_2_1_1, Kafka_2_2_0, Kafka_2_2_1, Kafka_2_3_0)
val ConsumerOffsetTopic = "__consumer_offsets"

def isSupported(version: KafkaVersion) : Boolean = {
12 changes: 11 additions & 1 deletion app/kafka/manager/model/model.scala
Original file line number Diff line number Diff line change
@@ -100,6 +100,14 @@ case object Kafka_2_2_0 extends KafkaVersion {
override def toString = "2.2.0"
}

case object Kafka_2_2_1 extends KafkaVersion {
override def toString = "2.2.1"
}

case object Kafka_2_3_0 extends KafkaVersion {
override def toString = "2.3.0"
}

object KafkaVersion {
val supportedVersions: Map[String,KafkaVersion] = Map(
"0.8.1.1" -> Kafka_0_8_1_1,
@@ -124,7 +132,9 @@ object KafkaVersion {
"2.0.0" -> Kafka_2_0_0,
"2.1.0" -> Kafka_2_1_0,
"2.1.1" -> Kafka_2_1_1,
"2.2.0" -> Kafka_2_2_0
"2.2.0" -> Kafka_2_2_0,
"2.2.1" -> Kafka_2_2_1,
"2.3.0" -> Kafka_2_3_0
)

val formSelectList : IndexedSeq[(String,String)] = supportedVersions.toIndexedSeq.filterNot(_._1.contains("beta")).map(t => (t._1,t._2.toString)).sortWith((a, b) => sortVersion(a._1, b._1))
4 changes: 3 additions & 1 deletion app/kafka/manager/utils/LogkafkaNewConfigs.scala
Original file line number Diff line number Diff line change
@@ -39,7 +39,9 @@ object LogkafkaNewConfigs {
Kafka_2_0_0 -> logkafka82.LogConfig,
Kafka_2_1_0 -> logkafka82.LogConfig,
Kafka_2_1_1 -> logkafka82.LogConfig,
Kafka_2_2_0 -> logkafka82.LogConfig
Kafka_2_2_0 -> logkafka82.LogConfig,
Kafka_2_2_1 -> logkafka82.LogConfig,
Kafka_2_3_0 -> logkafka82.LogConfig,
)

def configNames(version: KafkaVersion) : Set[String] = {
4 changes: 3 additions & 1 deletion app/kafka/manager/utils/TopicConfigs.scala
Original file line number Diff line number Diff line change
@@ -42,7 +42,9 @@ object TopicConfigs {
Kafka_2_0_0 -> two00.LogConfig,
Kafka_2_1_0 -> two00.LogConfig,
Kafka_2_1_1 -> two00.LogConfig,
Kafka_2_2_0 -> two00.LogConfig
Kafka_2_2_0 -> two00.LogConfig,
Kafka_2_2_1 -> two00.LogConfig,
Kafka_2_3_0 -> two00.LogConfig
)

def configNames(version: KafkaVersion): Seq[String] = {
6 changes: 4 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -9,6 +9,8 @@ version := "2.0.0.2"

scalaVersion := "2.12.8"

val kafkaVersion = "2.2.1"

scalacOptions ++= Seq("-Xlint:-missing-interpolator","-Xfatal-warnings","-deprecation","-feature","-language:implicitConversions","-language:postfixOps","-Xmax-classfile-name","240")

// From https://www.playframework.com/documentation/2.3.x/ProductionDist
@@ -37,8 +39,8 @@ libraryDependencies ++= Seq(
"org.slf4j" % "log4j-over-slf4j" % "1.7.25",
"com.adrianhurt" %% "play-bootstrap" % "1.4-P26-B4" exclude("com.typesafe.play", "*"),
"org.clapper" %% "grizzled-slf4j" % "1.3.3",
"org.apache.kafka" %% "kafka" % "2.2.0" exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(),
"org.apache.kafka" % "kafka-streams" % "2.2.0",
"org.apache.kafka" %% "kafka" % kafkaVersion exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(),
"org.apache.kafka" % "kafka-streams" % kafkaVersion,
"com.beachape" %% "enumeratum" % "1.5.13",
"com.github.ben-manes.caffeine" % "caffeine" % "2.6.2",
"com.typesafe.play" %% "play-logback" % "2.6.21",
5 changes: 4 additions & 1 deletion test/controller/api/TestKafkaStateCheck.scala
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@ import scala.concurrent.{Await, ExecutionContext}
import scala.util.Try

class TestKafkaStateCheck extends CuratorAwareTest with KafkaServerInTest with MockitoSugar {
val KafkaVersion = "2.3.0"
private[this] val broker = new SeededBroker("controller-api-test", 4)
override val kafkaServerZkPath = broker.getZookeeperConnectionString
private[this] val duration = FiniteDuration(10, SECONDS)
@@ -76,9 +77,11 @@ class TestKafkaStateCheck extends CuratorAwareTest with KafkaServerInTest with M
super.afterAll()
}



private[this] def createCluster() = {
val future = kafkaManagerContext.get.getKafkaManager.addCluster(
testClusterName, "2.2.0", kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManagerContext.get.getKafkaManager.defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None
testClusterName, KafkaVersion, kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManagerContext.get.getKafkaManager.defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None
)
val result = Await.result(future, duration)
result.toEither.left.foreach(apiError => sys.error(apiError.msg))
6 changes: 3 additions & 3 deletions test/kafka/manager/TestKafkaManager.scala
Original file line number Diff line number Diff line change
@@ -125,7 +125,7 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest {
}

test("add cluster") {
val future = kafkaManager.addCluster("dev","2.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManager.defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None)
val future = kafkaManager.addCluster("dev","2.3.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManager.defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None)
val result = Await.result(future,duration)
assert(result.isRight === true)
Thread.sleep(2000)
@@ -392,7 +392,7 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest {
}

test("update cluster zkhost") {
val future = kafkaManager.updateCluster("dev","2.2.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxSsl = false, jmxPass = None, tuning = Option(defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
val future = kafkaManager.updateCluster("dev","2.3.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxSsl = false, jmxPass = None, tuning = Option(defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
val result = Await.result(future,duration)
assert(result.isRight === true)

@@ -449,7 +449,7 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest {
}

test("update cluster logkafka enabled and activeOffsetCache enabled") {
val future = kafkaManager.updateCluster("dev","2.2.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
val future = kafkaManager.updateCluster("dev","2.3.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None)
val result = Await.result(future,duration)
assert(result.isRight === true)

Loading
Oops, something went wrong.