From 18e948967e33fa2851071c5fc4627605fd0cf6ae Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Mon, 6 Sep 2021 09:18:47 -0700
Subject: [PATCH] KAFKA-13270: Set JUTE_MAXBUFFER to 4 MB by default (#11295)

We restore the 3.4.x/3.5.x behavior unless the caller has set the property
(note that ZKConfig auto configures itself if certain system properties have
been set). I added a unit test that fails without the change and passes with
it. I also refactored the code to streamline the way we handle parameters
passed to KafkaZkClient and ZooKeeperClient.

See https://github.com/apache/zookeeper/pull/1129 for the details on why the
behavior changed in 3.6.0.

Credit to @rondagostino for finding and reporting this issue.

Reviewers: David Jacot
---
 .../scala/kafka/admin/ConfigCommand.scala     |  2 +-
 .../kafka/admin/ZkSecurityMigrator.scala      |  2 +-
 .../security/authorizer/AclAuthorizer.scala   | 22 ++++-----
 .../main/scala/kafka/server/KafkaConfig.scala | 49 ++++++++-----------
 .../main/scala/kafka/server/KafkaServer.scala | 15 +++---
 .../main/scala/kafka/zk/KafkaZkClient.scala   | 27 +++++++---
 .../kafka/zookeeper/ZooKeeperClient.scala     | 25 ++--------
 .../integration/kafka/api/SaslSetup.scala     |  2 +-
 .../security/auth/ZkAuthorizationTest.scala   | 16 +++---
 .../authorizer/AclAuthorizerTest.scala        | 36 +++++++-------
 .../AuthorizerInterfaceDefaultTest.scala      |  5 +-
 .../unit/kafka/server/KafkaServerTest.scala   | 16 +++---
 .../unit/kafka/zk/KafkaZkClientTest.scala     | 46 ++++++++++++++---
 .../unit/kafka/zk/ZooKeeperTestHarness.scala  |  5 +-
 .../kafka/zookeeper/ZooKeeperClientTest.scala | 45 ++++++++---------
 15 files changed, 169 insertions(+), 144 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 39a3698358d4..5e5ccefa4540 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -115,7 +115,7 @@ object ConfigCommand extends Config {
     val zkClientConfig = ZkSecurityMigrator.createZkClientConfigFromOption(opts.options, opts.zkTlsConfigFile)
       .getOrElse(new ZKClientConfig())
     val zkClient = KafkaZkClient(zkConnectString, JaasUtils.isZkSaslEnabled || KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig), 30000, 30000,
-      Int.MaxValue, Time.SYSTEM, zkClientConfig = Some(zkClientConfig))
+      Int.MaxValue, Time.SYSTEM, zkClientConfig = zkClientConfig, name = "ConfigCommand")
     val adminZkClient = new AdminZkClient(zkClient)
     try {
       if (opts.options.has(opts.alterOpt))
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index a8b799a6b45a..126319503b57 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -105,7 +105,7 @@ object ZkSecurityMigrator extends Logging {
     val zkSessionTimeout = opts.options.valueOf(opts.zkSessionTimeoutOpt).intValue
     val zkConnectionTimeout = opts.options.valueOf(opts.zkConnectionTimeoutOpt).intValue
     val zkClient = KafkaZkClient(zkUrl, zkAcl, zkSessionTimeout, zkConnectionTimeout,
-      Int.MaxValue, Time.SYSTEM, zkClientConfig = Some(zkClientConfig))
+      Int.MaxValue, Time.SYSTEM, zkClientConfig = zkClientConfig, name = "ZkSecurityMigrator")
    val enablePathCheck = opts.options.has(opts.enablePathCheckOpt)
    val migrator = new ZkSecurityMigrator(zkClient)
    migrator.run(enablePathCheck)
diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala index 5f701d8d4098..88648fd3178c 100644 --- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala @@ -95,25 +95,25 @@ object AclAuthorizer { } } - private[authorizer] def zkClientConfigFromKafkaConfigAndMap(kafkaConfig: KafkaConfig, configMap: mutable.Map[String, _<:Any]): Option[ZKClientConfig] = { + private[authorizer] def zkClientConfigFromKafkaConfigAndMap(kafkaConfig: KafkaConfig, configMap: mutable.Map[String, _<:Any]): ZKClientConfig = { val zkSslClientEnable = configMap.get(AclAuthorizer.configPrefix + KafkaConfig.ZkSslClientEnableProp). map(_.toString).getOrElse(kafkaConfig.zkSslClientEnable.toString).toBoolean if (!zkSslClientEnable) - None + new ZKClientConfig else { // start with the base config from the Kafka configuration // be sure to force creation since the zkSslClientEnable property in the kafkaConfig could be false val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(kafkaConfig, true) // add in any prefixed overlays - KafkaConfig.ZkSslConfigToSystemPropertyMap.foreach{ case (kafkaProp, sysProp) => { - val prefixedValue = configMap.get(AclAuthorizer.configPrefix + kafkaProp) - if (prefixedValue.isDefined) - zkClientConfig.get.setProperty(sysProp, + KafkaConfig.ZkSslConfigToSystemPropertyMap.forKeyValue { (kafkaProp, sysProp) => + configMap.get(AclAuthorizer.configPrefix + kafkaProp).foreach { prefixedValue => + zkClientConfig.setProperty(sysProp, if (kafkaProp == KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp) - (prefixedValue.get.toString.toUpperCase == "HTTPS").toString + (prefixedValue.toString.toUpperCase == "HTTPS").toString else - prefixedValue.get.toString) - }} + prefixedValue.toString) + } + } zkClientConfig } } @@ -178,8 +178,8 @@ class AclAuthorizer extends Authorizer with Logging { // createChrootIfNecessary=true is necessary in case we are running in a KRaft cluster // because such a cluster will not create any chroot path in ZooKeeper (it doesn't connect to ZooKeeper) zkClient = KafkaZkClient(zkUrl, kafkaConfig.zkEnableSecureAcls, zkSessionTimeOutMs, zkConnectionTimeoutMs, - zkMaxInFlightRequests, time, "kafka.security", "AclAuthorizer", name=Some("ACL authorizer"), - zkClientConfig = zkClientConfig, createChrootIfNecessary = true) + zkMaxInFlightRequests, time, name = "ACL authorizer", zkClientConfig = zkClientConfig, + metricGroup = "kafka.security", metricType = "AclAuthorizer", createChrootIfNecessary = true) zkClient.createAclPaths() extendedAclSupport = kafkaConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1 diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 794f73e82e9f..5dc071d51d2e 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -336,7 +336,7 @@ object KafkaConfig { ZkSslCrlEnableProp -> "zookeeper.ssl.crl", ZkSslOcspEnableProp -> "zookeeper.ssl.ocsp") - private[kafka] def getZooKeeperClientProperty(clientConfig: ZKClientConfig, kafkaPropName: String): Option[String] = { + private[kafka] def zooKeeperClientProperty(clientConfig: ZKClientConfig, kafkaPropName: String): Option[String] = { Option(clientConfig.getProperty(ZkSslConfigToSystemPropertyMap(kafkaPropName))) } @@ -345,7 +345,7 @@ object KafkaConfig { kafkaPropName match { case ZkSslEndpointIdentificationAlgorithmProp => (kafkaPropValue.toString.toUpperCase == "HTTPS").toString 
case ZkSslEnabledProtocolsProp | ZkSslCipherSuitesProp => kafkaPropValue match { - case list: java.util.List[_] => list.asInstanceOf[java.util.List[_]].asScala.mkString(",") + case list: java.util.List[_] => list.asScala.mkString(",") case _ => kafkaPropValue.toString } case _ => kafkaPropValue.toString @@ -354,10 +354,10 @@ object KafkaConfig { // For ZooKeeper TLS client authentication to be enabled the client must (at a minimum) configure itself as using TLS // with both a client connection socket and a key store location explicitly set. - private[kafka] def zkTlsClientAuthEnabled(zkClientConfig: ZKClientConfig) = { - getZooKeeperClientProperty(zkClientConfig, ZkSslClientEnableProp).getOrElse("false") == "true" && - getZooKeeperClientProperty(zkClientConfig, ZkClientCnxnSocketProp).isDefined && - getZooKeeperClientProperty(zkClientConfig, ZkSslKeyStoreLocationProp).isDefined + private[kafka] def zkTlsClientAuthEnabled(zkClientConfig: ZKClientConfig): Boolean = { + zooKeeperClientProperty(zkClientConfig, ZkSslClientEnableProp).contains("true") && + zooKeeperClientProperty(zkClientConfig, ZkClientCnxnSocketProp).isDefined && + zooKeeperClientProperty(zkClientConfig, ZkSslKeyStoreLocationProp).isDefined } /** ********* General Configuration ***********/ @@ -1443,7 +1443,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO // Need to translate any system property value from true/false (String) to true/false (Boolean) val actuallyProvided = originals.containsKey(propKey) if (actuallyProvided) getBoolean(propKey) else { - val sysPropValue = KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey) + val sysPropValue = KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey) sysPropValue match { case Some("true") => true case Some(_) => false @@ -1456,35 +1456,27 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO // Use the system property if it exists and the Kafka config value was defaulted rather than actually provided val actuallyProvided = originals.containsKey(propKey) if (actuallyProvided) getString(propKey) else { - val sysPropValue = KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey) - sysPropValue match { - case Some(_) => sysPropValue.get + KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey) match { + case Some(v) => v case _ => getString(propKey) // not specified so use the default value } } } private def zkOptionalStringConfigOrSystemProperty(propKey: String): Option[String] = { - Option(getString(propKey)) match { - case config: Some[String] => config - case _ => KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey) + Option(getString(propKey)).orElse { + KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey) } } private def zkPasswordConfigOrSystemProperty(propKey: String): Option[Password] = { - Option(getPassword(propKey)) match { - case config: Some[Password] => config - case _ => { - val sysProp = KafkaConfig.getZooKeeperClientProperty (zkClientConfigViaSystemProperties, propKey) - if (sysProp.isDefined) Some (new Password (sysProp.get) ) else None - } + Option(getPassword(propKey)).orElse { + KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey).map(new Password(_)) } } private def zkListConfigOrSystemProperty(propKey: String): Option[util.List[String]] = { - Option(getList(propKey)) match { - case config: 
Some[util.List[String]] => config - case _ => { - val sysProp = KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey) - if (sysProp.isDefined) Some(sysProp.get.split("\\s*,\\s*").toList.asJava) else None + Option(getList(propKey)).orElse { + KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey).map { sysProp => + sysProp.split("\\s*,\\s*").toBuffer.asJava } } } @@ -1505,12 +1497,13 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO // Need to translate any system property value from true/false to HTTPS/ val kafkaProp = KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp val actuallyProvided = originals.containsKey(kafkaProp) - if (actuallyProvided) getString(kafkaProp) else { - val sysPropValue = KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, kafkaProp) - sysPropValue match { + if (actuallyProvided) + getString(kafkaProp) + else { + KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, kafkaProp) match { case Some("true") => "HTTPS" case Some(_) => "" - case _ => getString(kafkaProp) // not specified so use the default value + case None => getString(kafkaProp) // not specified so use the default value } } } diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 4cf80a080523..2e2d0188285c 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -56,11 +56,9 @@ import scala.jdk.CollectionConverters._ object KafkaServer { - def zkClientConfigFromKafkaConfig(config: KafkaConfig, forceZkSslClientEnable: Boolean = false) = - if (!config.zkSslClientEnable && !forceZkSslClientEnable) - None - else { - val clientConfig = new ZKClientConfig() + def zkClientConfigFromKafkaConfig(config: KafkaConfig, forceZkSslClientEnable: Boolean = false): ZKClientConfig = { + val clientConfig = new ZKClientConfig + if (config.zkSslClientEnable || forceZkSslClientEnable) { KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslClientEnableProp, "true") config.zkClientCnxnSocketClassName.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkClientCnxnSocketProp, _)) config.zkSslKeyStoreLocation.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslKeyStoreLocationProp, _)) @@ -75,8 +73,9 @@ object KafkaServer { KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp, config.ZkSslEndpointIdentificationAlgorithm) KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslCrlEnableProp, config.ZkSslCrlEnable.toString) KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslOcspEnableProp, config.ZkSslOcspEnable.toString) - Some(clientConfig) } + clientConfig + } val MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS: Long = 120000 } @@ -144,7 +143,7 @@ class KafkaServer( var metadataCache: ZkMetadataCache = null var quotaManagers: QuotaFactory.QuotaManagers = null - val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config).getOrElse(new ZKClientConfig()) + val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config) private var _zkClient: KafkaZkClient = null private var configRepository: ZkConfigRepository = null @@ -454,7 +453,7 @@ class KafkaServer( s"verification of the JAAS login file failed ${JaasUtils.zkSecuritySysConfigString}") _zkClient = KafkaZkClient(config.zkConnect, 
secureAclsEnabled, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, - config.zkMaxInFlightRequests, time, name = Some("Kafka server"), zkClientConfig = Some(zkClientConfig), + config.zkMaxInFlightRequests, time, name = "Kafka server", zkClientConfig = zkClientConfig, createChrootIfNecessary = true) _zkClient.createTopLevelPaths() } diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index bcc89de9b7f7..823d6e8af600 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -17,7 +17,6 @@ package kafka.zk import java.util.Properties - import com.yammer.metrics.core.MetricName import kafka.api.LeaderAndIsr import kafka.cluster.Broker @@ -38,6 +37,7 @@ import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid} import org.apache.zookeeper.KeeperException.{Code, NodeExistsException} import org.apache.zookeeper.OpResult.{CreateResult, ErrorResult, SetDataResult} import org.apache.zookeeper.client.ZKClientConfig +import org.apache.zookeeper.common.ZKConfig import org.apache.zookeeper.data.{ACL, Stat} import org.apache.zookeeper.{CreateMode, KeeperException, ZooKeeper} @@ -1940,18 +1940,33 @@ object KafkaZkClient { connectionTimeoutMs: Int, maxInFlightRequests: Int, time: Time, + name: String, + zkClientConfig: ZKClientConfig, metricGroup: String = "kafka.server", metricType: String = "SessionExpireListener", - name: Option[String] = None, - zkClientConfig: Option[ZKClientConfig] = None, createChrootIfNecessary: Boolean = false ): KafkaZkClient = { + + /* ZooKeeper 3.6.0 changed the default configuration for JUTE_MAXBUFFER from 4 MB to 1 MB. + * This causes a regression if Kafka tries to retrieve a large amount of data across many + * znodes – in such a case the ZooKeeper client will repeatedly emit a message of the form + * "java.io.IOException: Packet len <####> is out of range". + * + * We restore the 3.4.x/3.5.x behavior unless the caller has set the property (note that ZKConfig + * auto configures itself if certain system properties have been set). + * + * See https://github.com/apache/zookeeper/pull/1129 for the details on why the behavior + * changed in 3.6.0. 
+ */ + if (zkClientConfig.getProperty(ZKConfig.JUTE_MAXBUFFER) == null) + zkClientConfig.setProperty(ZKConfig.JUTE_MAXBUFFER, ((4096 * 1024).toString)) + if (createChrootIfNecessary) { val chrootIndex = connectString.indexOf("/") if (chrootIndex > 0) { val zkConnWithoutChrootForChrootCreation = connectString.substring(0, chrootIndex) - val zkClientForChrootCreation = KafkaZkClient(zkConnWithoutChrootForChrootCreation, isSecure, sessionTimeoutMs, - connectionTimeoutMs, maxInFlightRequests, time, metricGroup, metricType, name, zkClientConfig) + val zkClientForChrootCreation = apply(zkConnWithoutChrootForChrootCreation, isSecure, sessionTimeoutMs, + connectionTimeoutMs, maxInFlightRequests, time, name, zkClientConfig, metricGroup, metricType) try { val chroot = connectString.substring(chrootIndex) if (!zkClientForChrootCreation.pathExists(chroot)) { @@ -1963,7 +1978,7 @@ object KafkaZkClient { } } val zooKeeperClient = new ZooKeeperClient(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests, - time, metricGroup, metricType, name, zkClientConfig) + time, metricGroup, metricType, zkClientConfig, name) new KafkaZkClient(zooKeeperClient, isSecure, time) } diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala index 96ef6d517936..091b40182eae 100755 --- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala +++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala @@ -61,24 +61,10 @@ class ZooKeeperClient(connectString: String, time: Time, metricGroup: String, metricType: String, - name: Option[String], - zkClientConfig: Option[ZKClientConfig]) extends Logging with KafkaMetricsGroup { - - def this(connectString: String, - sessionTimeoutMs: Int, - connectionTimeoutMs: Int, - maxInFlightRequests: Int, - time: Time, - metricGroup: String, - metricType: String) = { - this(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests, time, metricGroup, metricType, None, - None) - } + private[zookeeper] val clientConfig: ZKClientConfig, + name: String) extends Logging with KafkaMetricsGroup { - this.logIdent = name match { - case Some(n) => s"[ZooKeeperClient $n] " - case _ => "[ZooKeeperClient] " - } + this.logIdent = s"[ZooKeeperClient $name] " private val initializationLock = new ReentrantReadWriteLock() private val isConnectedOrExpiredLock = new ReentrantLock() private val isConnectedOrExpiredCondition = isConnectedOrExpiredLock.newCondition() @@ -109,13 +95,10 @@ class ZooKeeperClient(connectString: String, } } - private val clientConfig = zkClientConfig getOrElse new ZKClientConfig() - info(s"Initializing a new session to $connectString.") // Fail-fast if there's an error during construction (so don't call initialize, which retries forever) @volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher, clientConfig) - private[zookeeper] def getClientConfig = clientConfig newGauge("SessionState", () => connectionState.toString) @@ -436,7 +419,7 @@ class ZooKeeperClient(connectString: String, }, delayMs, period = -1L, unit = TimeUnit.MILLISECONDS) } - private def threadPrefix: String = name.map(n => n.replaceAll("\\s", "") + "-").getOrElse("") + private def threadPrefix: String = name.replaceAll("\\s", "") + "-" // package level visibility for testing only private[zookeeper] object ZooKeeperClientWatcher extends Watcher { diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala 
b/core/src/test/scala/integration/kafka/api/SaslSetup.scala index 92370113d131..d613b7242caa 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala @@ -195,7 +195,7 @@ trait SaslSetup { val zkClientConfig = new ZKClientConfig() val zkClient = KafkaZkClient( zkConnect, JaasUtils.isZkSaslEnabled || KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig), 30000, 30000, - Int.MaxValue, Time.SYSTEM, zkClientConfig = Some(zkClientConfig)) + Int.MaxValue, Time.SYSTEM, name = "SaslSetup", zkClientConfig = zkClientConfig) val adminZkClient = new AdminZkClient(zkClient) val entityType = ConfigType.User diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala index 69105c3fcd31..74803ce0f9b9 100644 --- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala @@ -18,7 +18,6 @@ package kafka.security.auth import java.nio.charset.StandardCharsets - import kafka.admin.ZkSecurityMigrator import kafka.utils.{Logging, TestUtils} import kafka.zk._ @@ -36,6 +35,7 @@ import kafka.controller.ReplicaAssignment import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.Time +import org.apache.zookeeper.client.ZKClientConfig import scala.jdk.CollectionConverters._ import scala.collection.Seq @@ -137,13 +137,17 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging { BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol (securityProtocol), securityProtocol)), rack = rack), ApiVersion.latestVersion, jmxPort = port + 10) + private def newKafkaZkClient(connectionString: String, isSecure: Boolean) = + KafkaZkClient(connectionString, isSecure, 6000, 6000, Int.MaxValue, Time.SYSTEM, "ZkAuthorizationTest", + new ZKClientConfig) + /** * Tests the migration tool when making an unsecure * cluster secure. 
*/ @Test def testZkMigration(): Unit = { - val unsecureZkClient = KafkaZkClient(zkConnect, false, 6000, 6000, Int.MaxValue, Time.SYSTEM) + val unsecureZkClient = newKafkaZkClient(zkConnect, isSecure = false) try { testMigration(zkConnect, unsecureZkClient, zkClient) } finally { @@ -157,7 +161,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging { */ @Test def testZkAntiMigration(): Unit = { - val unsecureZkClient = KafkaZkClient(zkConnect, false, 6000, 6000, Int.MaxValue, Time.SYSTEM) + val unsecureZkClient = newKafkaZkClient(zkConnect, isSecure = false) try { testMigration(zkConnect, zkClient, unsecureZkClient) } finally { @@ -198,8 +202,8 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging { def testChroot(): Unit = { val zkUrl = zkConnect + "/kafka" zkClient.createRecursive("/kafka") - val unsecureZkClient = KafkaZkClient(zkUrl, false, 6000, 6000, Int.MaxValue, Time.SYSTEM) - val secureZkClient = KafkaZkClient(zkUrl, true, 6000, 6000, Int.MaxValue, Time.SYSTEM) + val unsecureZkClient = newKafkaZkClient(zkUrl, isSecure = false) + val secureZkClient = newKafkaZkClient(zkUrl, isSecure = true) try { testMigration(zkUrl, unsecureZkClient, secureZkClient) } finally { @@ -284,7 +288,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging { */ private def deleteAllUnsecure(): Unit = { System.setProperty(JaasUtils.ZK_SASL_CLIENT, "false") - val unsecureZkClient = KafkaZkClient(zkConnect, false, 6000, 6000, Int.MaxValue, Time.SYSTEM) + val unsecureZkClient = newKafkaZkClient(zkConnect, isSecure = false) val result: Try[Boolean] = { deleteRecursive(unsecureZkClient, "/") } diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala index fa201db461d6..a49b0d398f65 100644 --- a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala +++ b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala @@ -22,7 +22,6 @@ import java.nio.charset.StandardCharsets.UTF_8 import java.nio.file.Files import java.util.{Collections, UUID} import java.util.concurrent.{Executors, Semaphore, TimeUnit} - import kafka.Kafka import kafka.api.{ApiVersion, KAFKA_2_0_IV0, KAFKA_2_0_IV1} import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString} @@ -43,6 +42,7 @@ import org.apache.kafka.common.resource.PatternType.{LITERAL, MATCH, PREFIXED} import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.server.authorizer._ import org.apache.kafka.common.utils.{Time, SecurityUtils => JSecurityUtils} +import org.apache.zookeeper.client.ZKClientConfig import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} @@ -86,7 +86,7 @@ class AclAuthorizerTest extends ZooKeeperTestHarness with BaseAuthorizerTest { resource = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), LITERAL) zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests, - Time.SYSTEM, "kafka.test", "AclAuthorizerTest") + Time.SYSTEM, "kafka.test", "AclAuthorizerTest", new ZKClientConfig, "AclAuthorizerTest") } @AfterEach @@ -779,9 +779,12 @@ class AclAuthorizerTest extends ZooKeeperTestHarness with BaseAuthorizerTest { @Test def testAuthorizerNoZkConfig(): Unit = { val noTlsProps = Kafka.getPropsFromArgs(Array(prepareDefaultConfig)) - assertEquals(None, AclAuthorizer.zkClientConfigFromKafkaConfigAndMap( + val zkClientConfig = 
AclAuthorizer.zkClientConfigFromKafkaConfigAndMap( KafkaConfig.fromProps(noTlsProps), - mutable.Map(noTlsProps.asInstanceOf[java.util.Map[String, Any]].asScala.toSeq: _*))) + noTlsProps.asInstanceOf[java.util.Map[String, Any]].asScala) + KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach { propName => + assertNull(zkClientConfig.getProperty(propName)) + } } @Test @@ -799,20 +802,19 @@ class AclAuthorizerTest extends ZooKeeperTestHarness with BaseAuthorizerTest { KafkaConfig.ZkSslTrustStoreTypeProp -> kafkaValue, KafkaConfig.ZkSslEnabledProtocolsProp -> kafkaValue, KafkaConfig.ZkSslCipherSuitesProp -> kafkaValue) - configs.foreach{case (key, value) => props.put(key, value.toString) } + configs.foreach { case (key, value) => props.put(key, value) } val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap( KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*)) - assertTrue(zkClientConfig.isDefined) // confirm we get all the values we expect KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(prop => prop match { case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => - assertEquals("true", KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("")) + assertEquals("true", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("")) case KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp => - assertEquals("false", KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("")) + assertEquals("false", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("")) case KafkaConfig.ZkSslProtocolProp => - assertEquals("TLSv1.2", KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("")) - case _ => assertEquals(kafkaValue, KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("")) + assertEquals("TLSv1.2", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("")) + case _ => assertEquals(kafkaValue, KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("")) }) } @@ -839,14 +841,13 @@ class AclAuthorizerTest extends ZooKeeperTestHarness with BaseAuthorizerTest { val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap( KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*)) - assertTrue(zkClientConfig.isDefined) // confirm we get all the values we expect KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(prop => prop match { case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => - assertEquals("true", KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("")) + assertEquals("true", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("")) case KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp => - assertEquals("false", KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("")) - case _ => assertEquals(kafkaValue, KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("")) + assertEquals("false", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("")) + case _ => assertEquals(kafkaValue, KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("")) }) } @@ -889,14 +890,13 @@ class AclAuthorizerTest extends ZooKeeperTestHarness with BaseAuthorizerTest { val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap( KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*)) - 
assertTrue(zkClientConfig.isDefined) // confirm we get all the values we expect KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(prop => prop match { case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp => - assertEquals("true", KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("")) + assertEquals("true", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("")) case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => - assertEquals("false", KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("")) - case _ => assertEquals(prefixedValue, KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("")) + assertEquals("false", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("")) + case _ => assertEquals(prefixedValue, KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("")) }) } diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala index 86c7a147bc67..bccd58afa0b7 100644 --- a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala +++ b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala @@ -18,7 +18,6 @@ package kafka.security.authorizer import java.util.concurrent.CompletionStage import java.{lang, util} - import kafka.server.KafkaConfig import kafka.utils.TestUtils import kafka.zk.ZooKeeperTestHarness @@ -27,6 +26,7 @@ import org.apache.kafka.common.Endpoint import org.apache.kafka.common.acl._ import org.apache.kafka.common.utils.Time import org.apache.kafka.server.authorizer._ +import org.apache.zookeeper.client.ZKClientConfig import org.junit.jupiter.api.{AfterEach, BeforeEach} class AuthorizerInterfaceDefaultTest extends ZooKeeperTestHarness with BaseAuthorizerTest { @@ -49,7 +49,8 @@ class AuthorizerInterfaceDefaultTest extends ZooKeeperTestHarness with BaseAutho interfaceDefaultAuthorizer.authorizer.configure(config.originals) zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests, - Time.SYSTEM, "kafka.test", "AuthorizerInterfaceDefaultTest") + Time.SYSTEM, "kafka.test", "AuthorizerInterfaceDefaultTest", new ZKClientConfig, + "AuthorizerInterfaceDefaultTest") } @AfterEach diff --git a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala index 35f327336719..8056e23a909d 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala @@ -20,8 +20,7 @@ package kafka.server import kafka.api.ApiVersion import kafka.utils.TestUtils import kafka.zk.ZooKeeperTestHarness -import org.apache.zookeeper.client.ZKClientConfig -import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, fail} +import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, fail} import org.junit.jupiter.api.Test import java.util.Properties @@ -47,7 +46,10 @@ class KafkaServerTest extends ZooKeeperTestHarness { val props = new Properties props.put(KafkaConfig.ZkConnectProp, zkConnect) // required, otherwise we would leave it out props.put(KafkaConfig.ZkSslClientEnableProp, "false") - assertEquals(None, KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props))) + val zkClientConfig = 
KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props)) + KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach { propName => + assertNull(zkClientConfig.getProperty(propName)) + } } @Test @@ -62,7 +64,7 @@ class KafkaServerTest extends ZooKeeperTestHarness { case _ => someValue } KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp => props.put(kafkaProp, kafkaConfigValueToSet(kafkaProp))) - val zkClientConfig: Option[ZKClientConfig] = KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props)) + val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props)) // now check to make sure the values were set correctly def zkClientValueToExpect(kafkaProp: String) : String = kafkaProp match { case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp => "true" @@ -70,7 +72,7 @@ class KafkaServerTest extends ZooKeeperTestHarness { case _ => someValue } KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp => - assertEquals(zkClientValueToExpect(kafkaProp), zkClientConfig.get.getProperty(KafkaConfig.ZkSslConfigToSystemPropertyMap(kafkaProp)))) + assertEquals(zkClientValueToExpect(kafkaProp), zkClientConfig.getProperty(KafkaConfig.ZkSslConfigToSystemPropertyMap(kafkaProp)))) } @Test @@ -87,7 +89,7 @@ class KafkaServerTest extends ZooKeeperTestHarness { case _ => someValue } KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp => props.put(kafkaProp, kafkaConfigValueToSet(kafkaProp))) - val zkClientConfig: Option[ZKClientConfig] = KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props)) + val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props)) // now check to make sure the values were set correctly def zkClientValueToExpect(kafkaProp: String) : String = kafkaProp match { case KafkaConfig.ZkSslClientEnableProp => "true" @@ -97,7 +99,7 @@ class KafkaServerTest extends ZooKeeperTestHarness { case _ => someValue } KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp => - assertEquals(zkClientValueToExpect(kafkaProp), zkClientConfig.get.getProperty(KafkaConfig.ZkSslConfigToSystemPropertyMap(kafkaProp)))) + assertEquals(zkClientValueToExpect(kafkaProp), zkClientConfig.getProperty(KafkaConfig.ZkSslConfigToSystemPropertyMap(kafkaProp)))) } @Test diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index 0c1d860ed8dc..2088e5f3c097 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -51,6 +51,7 @@ import org.apache.kafka.common.resource.ResourceType.{GROUP, TOPIC} import org.apache.kafka.common.security.JaasUtils import org.apache.zookeeper.ZooDefs import org.apache.zookeeper.client.ZKClientConfig +import org.apache.zookeeper.common.ZKConfig import org.apache.zookeeper.data.Stat import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource @@ -76,7 +77,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { super.setUp() zkClient.createControllerEpochRaw(1) otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout, - zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM) + zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "KafkaZkClient", + zkClientConfig = new ZKClientConfig) expiredSessionZkClient = 
ExpiredKafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM) } @@ -103,15 +105,15 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { val propVal = "org.apache.zookeeper.ClientCnxnSocketNetty" KafkaConfig.setZooKeeperClientProperty(clientConfig, propKey, propVal) val client = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout, - zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, zkClientConfig = Some(clientConfig)) + zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "KafkaZkClient", zkClientConfig = clientConfig) try { - assertEquals(Some(propVal), KafkaConfig.getZooKeeperClientProperty(client.currentZooKeeper.getClientConfig, propKey)) + assertEquals(Some(propVal), KafkaConfig.zooKeeperClientProperty(client.currentZooKeeper.getClientConfig, propKey)) // For a sanity check, make sure a bad client connection socket class name generates an exception val badClientConfig = new ZKClientConfig() KafkaConfig.setZooKeeperClientProperty(badClientConfig, propKey, propVal + "BadClassName") assertThrows(classOf[Exception], () => KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout, - zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, zkClientConfig = Some(badClientConfig))) + zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "KafkaZkClientTest", zkClientConfig = badClientConfig)) } finally { client.close() } @@ -121,9 +123,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { @ValueSource(booleans = Array(true, false)) def testChroot(createChrootIfNecessary: Boolean): Unit = { val chroot = "/chroot" - val clientConfig = new ZKClientConfig() val client = KafkaZkClient(zkConnect + chroot, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout, - zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, zkClientConfig = Some(clientConfig), createChrootIfNecessary = createChrootIfNecessary) + zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "KafkaZkClientTest", + zkClientConfig = new ZKClientConfig, createChrootIfNecessary = createChrootIfNecessary) try { client.createTopLevelPaths() if (!createChrootIfNecessary) { @@ -158,7 +160,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { // this client doesn't have create permission to the root and chroot, but the chroot already exists // Expect that no exception thrown val chrootClient = KafkaZkClient(zkConnect + chroot, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout, - zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, createChrootIfNecessary = true) + zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "KafkaZkClientTest", + zkClientConfig = new ZKClientConfig, createChrootIfNecessary = true) chrootClient.close() } @@ -1340,6 +1343,33 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { assertEquals(ZooDefs.Ids.READ_ACL_UNSAFE.asScala, zkClient.getAcl(mockPath)) } + @Test + def testJuteMaxBufffer(): Unit = { + + def assertJuteMaxBufferConfig(clientConfig: ZKClientConfig, expectedValue: String): Unit = { + val client = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout, + zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "KafkaZkClient", + zkClientConfig = clientConfig) + try assertEquals(expectedValue, client.currentZooKeeper.getClientConfig.getProperty(ZKConfig.JUTE_MAXBUFFER)) + finally 
client.close() + } + + // default case + assertEquals("4194304", zkClient.currentZooKeeper.getClientConfig.getProperty(ZKConfig.JUTE_MAXBUFFER)) + + // Value set directly on ZKClientConfig takes precedence over system property + System.setProperty(ZKConfig.JUTE_MAXBUFFER, (3000 * 1024).toString) + try { + val clientConfig1 = new ZKClientConfig + clientConfig1.setProperty(ZKConfig.JUTE_MAXBUFFER, (2000 * 1024).toString) + assertJuteMaxBufferConfig(clientConfig1, expectedValue = "2048000") + + // System property value is used if value is not set in ZKClientConfig + assertJuteMaxBufferConfig(new ZKClientConfig, expectedValue = "3072000") + + } finally System.clearProperty(ZKConfig.JUTE_MAXBUFFER) + } + class ExpiredKafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: Time) extends KafkaZkClient(zooKeeperClient, isSecure, time) { // Overwriting this method from the parent class to force the client to re-register the Broker. @@ -1365,7 +1395,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { metricGroup: String = "kafka.server", metricType: String = "SessionExpireListener") = { val zooKeeperClient = new ZooKeeperClient(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests, - time, metricGroup, metricType) + time, metricGroup, metricType, new ZKClientConfig, "ExpiredKafkaZkClient") new ExpiredKafkaZkClient(zooKeeperClient, isSecure, time) } } diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala index 6bc9d3a54f06..8b61c0e7a1db 100755 --- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala +++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala @@ -19,7 +19,7 @@ package kafka.zk import javax.security.auth.login.Configuration import kafka.utils.{CoreUtils, Logging, TestUtils} -import org.junit.jupiter.api.{AfterEach, AfterAll, BeforeEach, BeforeAll, Tag} +import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag} import org.junit.jupiter.api.Assertions._ import org.apache.kafka.common.security.JaasUtils @@ -30,6 +30,7 @@ import org.apache.kafka.clients.consumer.internals.AbstractCoordinator import kafka.controller.ControllerEventManager import org.apache.kafka.clients.admin.AdminClientUnitTestEnv import org.apache.kafka.common.utils.Time +import org.apache.zookeeper.client.ZKClientConfig import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper} @Tag("integration") @@ -53,7 +54,7 @@ abstract class ZooKeeperTestHarness extends Logging { def setUp(): Unit = { zookeeper = new EmbeddedZookeeper() zkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout, - zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM) + zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "ZooKeeperTestHarness", new ZKClientConfig) adminZkClient = new AdminZkClient(zkClient) } diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala index 0392386a18e6..a0eb1eab66d7 100644 --- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala +++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala @@ -49,8 +49,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { ZooKeeperTestHarness.verifyNoUnexpectedThreads("@BeforeEach") cleanMetricsRegistry() super.setUp() - zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, 
zkMaxInFlightRequests, - Time.SYSTEM, "testMetricGroup", "testMetricType") + zooKeeperClient = newZooKeeperClient() } @AfterEach @@ -65,8 +64,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @Test def testUnresolvableConnectString(): Unit = { try { - new ZooKeeperClient("some.invalid.hostname.foo.bar.local", zkSessionTimeout, connectionTimeoutMs = 10, - Int.MaxValue, time, "testMetricGroup", "testMetricType") + newZooKeeperClient("some.invalid.hostname.foo.bar.local", connectionTimeoutMs = 10) } catch { case e: ZooKeeperClientTimeoutException => assertEquals(Set.empty, runningZkSendThreads, "ZooKeeper client threads still running") @@ -81,14 +79,13 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @Test def testConnectionTimeout(): Unit = { zookeeper.shutdown() - assertThrows(classOf[ZooKeeperClientTimeoutException], () => new ZooKeeperClient(zkConnect, zkSessionTimeout, - connectionTimeoutMs = 10, Int.MaxValue, time, "testMetricGroup", "testMetricType").close()) + assertThrows(classOf[ZooKeeperClientTimeoutException], () => newZooKeeperClient( + connectionTimeoutMs = 10).close()) } @Test def testConnection(): Unit = { - val client = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time, "testMetricGroup", - "testMetricType") + val client = newZooKeeperClient() try { // Verify ZooKeeper event thread name. This is used in ZooKeeperTestHarness to verify that tests have closed ZK clients val threads = Thread.getAllStackTraces.keySet.asScala.map(_.getName) @@ -108,15 +105,13 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { val propKey = KafkaConfig.ZkClientCnxnSocketProp val propVal = "org.apache.zookeeper.ClientCnxnSocketNetty" KafkaConfig.setZooKeeperClientProperty(clientConfig, propKey, propVal) - val client = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time, "testMetricGroup", - "testMetricType", None, Some(clientConfig)) + val client = newZooKeeperClient(clientConfig = clientConfig) try { - assertEquals(Some(propVal), KafkaConfig.getZooKeeperClientProperty(client.getClientConfig, propKey)) + assertEquals(Some(propVal), KafkaConfig.zooKeeperClientProperty(client.clientConfig, propKey)) // For a sanity check, make sure a bad client connection socket class name generates an exception val badClientConfig = new ZKClientConfig() KafkaConfig.setZooKeeperClientProperty(badClientConfig, propKey, propVal + "BadClassName") - assertThrows(classOf[Exception], () => new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, - Int.MaxValue, time, "testMetricGroup", "testMetricType", None, Some(badClientConfig))) + assertThrows(classOf[Exception], () => newZooKeeperClient(clientConfig = badClientConfig)) } finally { client.close() } @@ -350,8 +345,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { } zooKeeperClient.close() - zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time, - "testMetricGroup", "testMetricType") + zooKeeperClient = newZooKeeperClient() zooKeeperClient.registerStateChangeHandler(stateChangeHandler) val requestThread = new Thread() { @@ -399,8 +393,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { } zooKeeperClient.close() - zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time, - "testMetricGroup", "testMetricType") + zooKeeperClient = newZooKeeperClient() zooKeeperClient.registerStateChangeHandler(faultyHandler) 
zooKeeperClient.registerStateChangeHandler(goodHandler) @@ -476,8 +469,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { } } - val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time, - "testMetricGroup", "testMetricType") + val zooKeeperClient = newZooKeeperClient() try { zooKeeperClient.registerStateChangeHandler(stateChangeHandler) zooKeeperClient.forceReinitialize() @@ -489,8 +481,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @Test def testConnectionLossRequestTermination(): Unit = { val batchSize = 10 - val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, 2, time, - "testGroupType", "testGroupName") + val zooKeeperClient = newZooKeeperClient(maxInFlight = 2) zookeeper.shutdown() try { val requests = (1 to batchSize).map(i => GetDataRequest(s"/$i")) @@ -553,7 +544,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @volatile var resultCodes: Seq[Code] = null val stateChanges = new ConcurrentLinkedQueue[String]() val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, maxInflightRequests, - time, "testGroupType", "testGroupName") { + time, "testGroupType", "testGroupName", new ZKClientConfig, "ZooKeeperClientTest") { override def send[Req <: AsyncRequest](request: Req)(processResponse: Req#Response => Unit): Unit = { super.send(request)( response => { responseExecutor.submit(new Runnable { @@ -657,8 +648,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { } zooKeeperClient.close() - zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time, - "testMetricGroup", "testMetricType") + zooKeeperClient = newZooKeeperClient() zooKeeperClient.registerStateChangeHandler(changeHandler) zooKeeperClient.ZooKeeperClientWatcher.process(new WatchedEvent(EventType.None, KeeperState.AuthFailed, null)) @@ -708,6 +698,13 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { assertEquals(States.CLOSED, zooKeeperClient.connectionState) } + private def newZooKeeperClient(connectionString: String = zkConnect, + connectionTimeoutMs: Int = zkConnectionTimeout, + maxInFlight: Int = zkMaxInFlightRequests, + clientConfig: ZKClientConfig = new ZKClientConfig) = + new ZooKeeperClient(connectionString, zkSessionTimeout, connectionTimeoutMs, maxInFlight, time, + "testMetricGroup", "testMetricType", clientConfig, "ZooKeeperClientTest") + private def cleanMetricsRegistry(): Unit = { val metrics = KafkaYammerMetrics.defaultRegistry metrics.allMetrics.keySet.forEach(metrics.removeMetric)
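
For reference, a minimal sketch (not part of the patch) of the jute.maxbuffer precedence this change restores: a value set explicitly on the ZKClientConfig wins, then a -Djute.maxbuffer system property (picked up when ZKClientConfig is constructed, as the commit message notes), and only otherwise the 4 MB default applied in KafkaZkClient.apply. The helper name below is illustrative only.

    import org.apache.zookeeper.client.ZKClientConfig
    import org.apache.zookeeper.common.ZKConfig

    // Sketch: the defaulting step performed before the ZooKeeper client is created.
    def effectiveJuteMaxBuffer(zkClientConfig: ZKClientConfig): String = {
      // ZKClientConfig copies -Djute.maxbuffer (if set) into itself on construction,
      // so getProperty is non-null when either the caller or a system property set it.
      if (zkClientConfig.getProperty(ZKConfig.JUTE_MAXBUFFER) == null)
        zkClientConfig.setProperty(ZKConfig.JUTE_MAXBUFFER, (4096 * 1024).toString) // restore 4 MB
      zkClientConfig.getProperty(ZKConfig.JUTE_MAXBUFFER)
    }

    // With no overrides: effectiveJuteMaxBuffer(new ZKClientConfig) == "4194304"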