Skip to content

Commit

Permalink
KAFKA-13270: Set JUTE_MAXBUFFER to 4 MB by default (apache#11295)
Browse files Browse the repository at this point in the history
We restore the 3.4.x/3.5.x behavior unless the caller has set the property (note that ZKConfig
auto configures itself if certain system properties have been set).

I added a unit test that fails without the change and passes with it.

I also refactored the code to streamline the way we handle parameters passed to
KafkaZkClient and ZooKeeperClient.
 
See apache/zookeeper#1129 for the details on why the behavior
changed in 3.6.0.

Credit to @rondagostino for finding and reporting this issue.

Reviewers: David Jacot <djacot@confluent.io>
  • Loading branch information
ijuma authored and Ralph Debusmann committed Dec 22, 2021
1 parent 845b50a commit 18e9489
Show file tree
Hide file tree
Showing 15 changed files with 169 additions and 144 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/admin/ConfigCommand.scala
Expand Up @@ -115,7 +115,7 @@ object ConfigCommand extends Config {
val zkClientConfig = ZkSecurityMigrator.createZkClientConfigFromOption(opts.options, opts.zkTlsConfigFile)
.getOrElse(new ZKClientConfig())
val zkClient = KafkaZkClient(zkConnectString, JaasUtils.isZkSaslEnabled || KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig), 30000, 30000,
Int.MaxValue, Time.SYSTEM, zkClientConfig = Some(zkClientConfig))
Int.MaxValue, Time.SYSTEM, zkClientConfig = zkClientConfig, name = "ConfigCommand")
val adminZkClient = new AdminZkClient(zkClient)
try {
if (opts.options.has(opts.alterOpt))
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
Expand Up @@ -105,7 +105,7 @@ object ZkSecurityMigrator extends Logging {
val zkSessionTimeout = opts.options.valueOf(opts.zkSessionTimeoutOpt).intValue
val zkConnectionTimeout = opts.options.valueOf(opts.zkConnectionTimeoutOpt).intValue
val zkClient = KafkaZkClient(zkUrl, zkAcl, zkSessionTimeout, zkConnectionTimeout,
Int.MaxValue, Time.SYSTEM, zkClientConfig = Some(zkClientConfig))
Int.MaxValue, Time.SYSTEM, zkClientConfig = zkClientConfig, name = "ZkSecurityMigrator")
val enablePathCheck = opts.options.has(opts.enablePathCheckOpt)
val migrator = new ZkSecurityMigrator(zkClient)
migrator.run(enablePathCheck)
Expand Down
22 changes: 11 additions & 11 deletions core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
Expand Up @@ -95,25 +95,25 @@ object AclAuthorizer {
}
}

private[authorizer] def zkClientConfigFromKafkaConfigAndMap(kafkaConfig: KafkaConfig, configMap: mutable.Map[String, _<:Any]): Option[ZKClientConfig] = {
private[authorizer] def zkClientConfigFromKafkaConfigAndMap(kafkaConfig: KafkaConfig, configMap: mutable.Map[String, _<:Any]): ZKClientConfig = {
val zkSslClientEnable = configMap.get(AclAuthorizer.configPrefix + KafkaConfig.ZkSslClientEnableProp).
map(_.toString).getOrElse(kafkaConfig.zkSslClientEnable.toString).toBoolean
if (!zkSslClientEnable)
None
new ZKClientConfig
else {
// start with the base config from the Kafka configuration
// be sure to force creation since the zkSslClientEnable property in the kafkaConfig could be false
val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(kafkaConfig, true)
// add in any prefixed overlays
KafkaConfig.ZkSslConfigToSystemPropertyMap.foreach{ case (kafkaProp, sysProp) => {
val prefixedValue = configMap.get(AclAuthorizer.configPrefix + kafkaProp)
if (prefixedValue.isDefined)
zkClientConfig.get.setProperty(sysProp,
KafkaConfig.ZkSslConfigToSystemPropertyMap.forKeyValue { (kafkaProp, sysProp) =>
configMap.get(AclAuthorizer.configPrefix + kafkaProp).foreach { prefixedValue =>
zkClientConfig.setProperty(sysProp,
if (kafkaProp == KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp)
(prefixedValue.get.toString.toUpperCase == "HTTPS").toString
(prefixedValue.toString.toUpperCase == "HTTPS").toString
else
prefixedValue.get.toString)
}}
prefixedValue.toString)
}
}
zkClientConfig
}
}
Expand Down Expand Up @@ -178,8 +178,8 @@ class AclAuthorizer extends Authorizer with Logging {
// createChrootIfNecessary=true is necessary in case we are running in a KRaft cluster
// because such a cluster will not create any chroot path in ZooKeeper (it doesn't connect to ZooKeeper)
zkClient = KafkaZkClient(zkUrl, kafkaConfig.zkEnableSecureAcls, zkSessionTimeOutMs, zkConnectionTimeoutMs,
zkMaxInFlightRequests, time, "kafka.security", "AclAuthorizer", name=Some("ACL authorizer"),
zkClientConfig = zkClientConfig, createChrootIfNecessary = true)
zkMaxInFlightRequests, time, name = "ACL authorizer", zkClientConfig = zkClientConfig,
metricGroup = "kafka.security", metricType = "AclAuthorizer", createChrootIfNecessary = true)
zkClient.createAclPaths()

extendedAclSupport = kafkaConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1
Expand Down
49 changes: 21 additions & 28 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Expand Up @@ -336,7 +336,7 @@ object KafkaConfig {
ZkSslCrlEnableProp -> "zookeeper.ssl.crl",
ZkSslOcspEnableProp -> "zookeeper.ssl.ocsp")

private[kafka] def getZooKeeperClientProperty(clientConfig: ZKClientConfig, kafkaPropName: String): Option[String] = {
private[kafka] def zooKeeperClientProperty(clientConfig: ZKClientConfig, kafkaPropName: String): Option[String] = {
Option(clientConfig.getProperty(ZkSslConfigToSystemPropertyMap(kafkaPropName)))
}

Expand All @@ -345,7 +345,7 @@ object KafkaConfig {
kafkaPropName match {
case ZkSslEndpointIdentificationAlgorithmProp => (kafkaPropValue.toString.toUpperCase == "HTTPS").toString
case ZkSslEnabledProtocolsProp | ZkSslCipherSuitesProp => kafkaPropValue match {
case list: java.util.List[_] => list.asInstanceOf[java.util.List[_]].asScala.mkString(",")
case list: java.util.List[_] => list.asScala.mkString(",")
case _ => kafkaPropValue.toString
}
case _ => kafkaPropValue.toString
Expand All @@ -354,10 +354,10 @@ object KafkaConfig {

// For ZooKeeper TLS client authentication to be enabled the client must (at a minimum) configure itself as using TLS
// with both a client connection socket and a key store location explicitly set.
private[kafka] def zkTlsClientAuthEnabled(zkClientConfig: ZKClientConfig) = {
getZooKeeperClientProperty(zkClientConfig, ZkSslClientEnableProp).getOrElse("false") == "true" &&
getZooKeeperClientProperty(zkClientConfig, ZkClientCnxnSocketProp).isDefined &&
getZooKeeperClientProperty(zkClientConfig, ZkSslKeyStoreLocationProp).isDefined
private[kafka] def zkTlsClientAuthEnabled(zkClientConfig: ZKClientConfig): Boolean = {
zooKeeperClientProperty(zkClientConfig, ZkSslClientEnableProp).contains("true") &&
zooKeeperClientProperty(zkClientConfig, ZkClientCnxnSocketProp).isDefined &&
zooKeeperClientProperty(zkClientConfig, ZkSslKeyStoreLocationProp).isDefined
}

/** ********* General Configuration ***********/
Expand Down Expand Up @@ -1443,7 +1443,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
// Need to translate any system property value from true/false (String) to true/false (Boolean)
val actuallyProvided = originals.containsKey(propKey)
if (actuallyProvided) getBoolean(propKey) else {
val sysPropValue = KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
val sysPropValue = KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
sysPropValue match {
case Some("true") => true
case Some(_) => false
Expand All @@ -1456,35 +1456,27 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
// Use the system property if it exists and the Kafka config value was defaulted rather than actually provided
val actuallyProvided = originals.containsKey(propKey)
if (actuallyProvided) getString(propKey) else {
val sysPropValue = KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
sysPropValue match {
case Some(_) => sysPropValue.get
KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey) match {
case Some(v) => v
case _ => getString(propKey) // not specified so use the default value
}
}
}

private def zkOptionalStringConfigOrSystemProperty(propKey: String): Option[String] = {
Option(getString(propKey)) match {
case config: Some[String] => config
case _ => KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
Option(getString(propKey)).orElse {
KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
}
}
private def zkPasswordConfigOrSystemProperty(propKey: String): Option[Password] = {
Option(getPassword(propKey)) match {
case config: Some[Password] => config
case _ => {
val sysProp = KafkaConfig.getZooKeeperClientProperty (zkClientConfigViaSystemProperties, propKey)
if (sysProp.isDefined) Some (new Password (sysProp.get) ) else None
}
Option(getPassword(propKey)).orElse {
KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey).map(new Password(_))
}
}
private def zkListConfigOrSystemProperty(propKey: String): Option[util.List[String]] = {
Option(getList(propKey)) match {
case config: Some[util.List[String]] => config
case _ => {
val sysProp = KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
if (sysProp.isDefined) Some(sysProp.get.split("\\s*,\\s*").toList.asJava) else None
Option(getList(propKey)).orElse {
KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey).map { sysProp =>
sysProp.split("\\s*,\\s*").toBuffer.asJava
}
}
}
Expand All @@ -1505,12 +1497,13 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
// Need to translate any system property value from true/false to HTTPS/<blank>
val kafkaProp = KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp
val actuallyProvided = originals.containsKey(kafkaProp)
if (actuallyProvided) getString(kafkaProp) else {
val sysPropValue = KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, kafkaProp)
sysPropValue match {
if (actuallyProvided)
getString(kafkaProp)
else {
KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, kafkaProp) match {
case Some("true") => "HTTPS"
case Some(_) => ""
case _ => getString(kafkaProp) // not specified so use the default value
case None => getString(kafkaProp) // not specified so use the default value
}
}
}
Expand Down
15 changes: 7 additions & 8 deletions core/src/main/scala/kafka/server/KafkaServer.scala
Expand Up @@ -56,11 +56,9 @@ import scala.jdk.CollectionConverters._

object KafkaServer {

def zkClientConfigFromKafkaConfig(config: KafkaConfig, forceZkSslClientEnable: Boolean = false) =
if (!config.zkSslClientEnable && !forceZkSslClientEnable)
None
else {
val clientConfig = new ZKClientConfig()
def zkClientConfigFromKafkaConfig(config: KafkaConfig, forceZkSslClientEnable: Boolean = false): ZKClientConfig = {
val clientConfig = new ZKClientConfig
if (config.zkSslClientEnable || forceZkSslClientEnable) {
KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslClientEnableProp, "true")
config.zkClientCnxnSocketClassName.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkClientCnxnSocketProp, _))
config.zkSslKeyStoreLocation.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslKeyStoreLocationProp, _))
Expand All @@ -75,8 +73,9 @@ object KafkaServer {
KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp, config.ZkSslEndpointIdentificationAlgorithm)
KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslCrlEnableProp, config.ZkSslCrlEnable.toString)
KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslOcspEnableProp, config.ZkSslOcspEnable.toString)
Some(clientConfig)
}
clientConfig
}

val MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS: Long = 120000
}
Expand Down Expand Up @@ -144,7 +143,7 @@ class KafkaServer(
var metadataCache: ZkMetadataCache = null
var quotaManagers: QuotaFactory.QuotaManagers = null

val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config).getOrElse(new ZKClientConfig())
val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config)
private var _zkClient: KafkaZkClient = null
private var configRepository: ZkConfigRepository = null

Expand Down Expand Up @@ -454,7 +453,7 @@ class KafkaServer(
s"verification of the JAAS login file failed ${JaasUtils.zkSecuritySysConfigString}")

_zkClient = KafkaZkClient(config.zkConnect, secureAclsEnabled, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
config.zkMaxInFlightRequests, time, name = Some("Kafka server"), zkClientConfig = Some(zkClientConfig),
config.zkMaxInFlightRequests, time, name = "Kafka server", zkClientConfig = zkClientConfig,
createChrootIfNecessary = true)
_zkClient.createTopLevelPaths()
}
Expand Down
27 changes: 21 additions & 6 deletions core/src/main/scala/kafka/zk/KafkaZkClient.scala
Expand Up @@ -17,7 +17,6 @@
package kafka.zk

import java.util.Properties

import com.yammer.metrics.core.MetricName
import kafka.api.LeaderAndIsr
import kafka.cluster.Broker
Expand All @@ -38,6 +37,7 @@ import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
import org.apache.zookeeper.OpResult.{CreateResult, ErrorResult, SetDataResult}
import org.apache.zookeeper.client.ZKClientConfig
import org.apache.zookeeper.common.ZKConfig
import org.apache.zookeeper.data.{ACL, Stat}
import org.apache.zookeeper.{CreateMode, KeeperException, ZooKeeper}

Expand Down Expand Up @@ -1940,18 +1940,33 @@ object KafkaZkClient {
connectionTimeoutMs: Int,
maxInFlightRequests: Int,
time: Time,
name: String,
zkClientConfig: ZKClientConfig,
metricGroup: String = "kafka.server",
metricType: String = "SessionExpireListener",
name: Option[String] = None,
zkClientConfig: Option[ZKClientConfig] = None,
createChrootIfNecessary: Boolean = false
): KafkaZkClient = {

/* ZooKeeper 3.6.0 changed the default configuration for JUTE_MAXBUFFER from 4 MB to 1 MB.
* This causes a regression if Kafka tries to retrieve a large amount of data across many
* znodes – in such a case the ZooKeeper client will repeatedly emit a message of the form
* "java.io.IOException: Packet len <####> is out of range".
*
* We restore the 3.4.x/3.5.x behavior unless the caller has set the property (note that ZKConfig
* auto configures itself if certain system properties have been set).
*
* See https://github.com/apache/zookeeper/pull/1129 for the details on why the behavior
* changed in 3.6.0.
*/
if (zkClientConfig.getProperty(ZKConfig.JUTE_MAXBUFFER) == null)
zkClientConfig.setProperty(ZKConfig.JUTE_MAXBUFFER, ((4096 * 1024).toString))

if (createChrootIfNecessary) {
val chrootIndex = connectString.indexOf("/")
if (chrootIndex > 0) {
val zkConnWithoutChrootForChrootCreation = connectString.substring(0, chrootIndex)
val zkClientForChrootCreation = KafkaZkClient(zkConnWithoutChrootForChrootCreation, isSecure, sessionTimeoutMs,
connectionTimeoutMs, maxInFlightRequests, time, metricGroup, metricType, name, zkClientConfig)
val zkClientForChrootCreation = apply(zkConnWithoutChrootForChrootCreation, isSecure, sessionTimeoutMs,
connectionTimeoutMs, maxInFlightRequests, time, name, zkClientConfig, metricGroup, metricType)
try {
val chroot = connectString.substring(chrootIndex)
if (!zkClientForChrootCreation.pathExists(chroot)) {
Expand All @@ -1963,7 +1978,7 @@ object KafkaZkClient {
}
}
val zooKeeperClient = new ZooKeeperClient(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests,
time, metricGroup, metricType, name, zkClientConfig)
time, metricGroup, metricType, zkClientConfig, name)
new KafkaZkClient(zooKeeperClient, isSecure, time)
}

Expand Down
25 changes: 4 additions & 21 deletions core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
Expand Up @@ -61,24 +61,10 @@ class ZooKeeperClient(connectString: String,
time: Time,
metricGroup: String,
metricType: String,
name: Option[String],
zkClientConfig: Option[ZKClientConfig]) extends Logging with KafkaMetricsGroup {

def this(connectString: String,
sessionTimeoutMs: Int,
connectionTimeoutMs: Int,
maxInFlightRequests: Int,
time: Time,
metricGroup: String,
metricType: String) = {
this(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests, time, metricGroup, metricType, None,
None)
}
private[zookeeper] val clientConfig: ZKClientConfig,
name: String) extends Logging with KafkaMetricsGroup {

this.logIdent = name match {
case Some(n) => s"[ZooKeeperClient $n] "
case _ => "[ZooKeeperClient] "
}
this.logIdent = s"[ZooKeeperClient $name] "
private val initializationLock = new ReentrantReadWriteLock()
private val isConnectedOrExpiredLock = new ReentrantLock()
private val isConnectedOrExpiredCondition = isConnectedOrExpiredLock.newCondition()
Expand Down Expand Up @@ -109,13 +95,10 @@ class ZooKeeperClient(connectString: String,
}
}

private val clientConfig = zkClientConfig getOrElse new ZKClientConfig()

info(s"Initializing a new session to $connectString.")
// Fail-fast if there's an error during construction (so don't call initialize, which retries forever)
@volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher,
clientConfig)
private[zookeeper] def getClientConfig = clientConfig

newGauge("SessionState", () => connectionState.toString)

Expand Down Expand Up @@ -436,7 +419,7 @@ class ZooKeeperClient(connectString: String,
}, delayMs, period = -1L, unit = TimeUnit.MILLISECONDS)
}

private def threadPrefix: String = name.map(n => n.replaceAll("\\s", "") + "-").getOrElse("")
private def threadPrefix: String = name.replaceAll("\\s", "") + "-"

// package level visibility for testing only
private[zookeeper] object ZooKeeperClientWatcher extends Watcher {
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/integration/kafka/api/SaslSetup.scala
Expand Up @@ -195,7 +195,7 @@ trait SaslSetup {
val zkClientConfig = new ZKClientConfig()
val zkClient = KafkaZkClient(
zkConnect, JaasUtils.isZkSaslEnabled || KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig), 30000, 30000,
Int.MaxValue, Time.SYSTEM, zkClientConfig = Some(zkClientConfig))
Int.MaxValue, Time.SYSTEM, name = "SaslSetup", zkClientConfig = zkClientConfig)
val adminZkClient = new AdminZkClient(zkClient)

val entityType = ConfigType.User
Expand Down

0 comments on commit 18e9489

Please sign in to comment.