Skip to content

Commit

Permalink
KAFKA-16126: Kcontroller dynamic configurations may fail to apply at startup

Browse files Browse the repository at this point in the history

Some kcontroller dynamic configurations may fail to apply at startup. This happens because there is
a race between registering the reconfigurables to the DynamicBrokerConfig class, and receiving the
first update from the metadata publisher. We can fix this by registering the reconfigurables first.
This seems to have been introduced by the "MINOR: Install ControllerServer metadata publishers
sooner" change.

Reviewers: Ron Dagostino  <rdagostino@confluent.io>
  • Loading branch information
cmccabe authored and showuon committed Jan 22, 2024
1 parent d4f5f4a commit ee356b6
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 17 deletions.
11 changes: 7 additions & 4 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Expand Up @@ -368,6 +368,13 @@ class ControllerServer(
),
"controller"))

// Register this instance for dynamic config changes to the KafkaConfig. This must be called
// after the authorizer and quotaManagers are initialized, since it references those objects.
// It must be called before DynamicClientQuotaPublisher is installed, since otherwise we may
// miss the initial update which establishes the dynamic configurations that are in effect on
// startup.
config.dynamicConfig.addReconfigurables(this)

// Set up the client quotas publisher. This will enable controller mutation quotas and any
// other quotas which are applicable.
metadataPublishers.add(new DynamicClientQuotaPublisher(
Expand All @@ -384,7 +391,6 @@ class ControllerServer(
credentialProvider
))


// Set up the DelegationToken publisher.
// We need a tokenManager for the Publisher
// The tokenCache in the tokenManager is the same used in DelegationTokenControlManager
Expand Down Expand Up @@ -450,9 +456,6 @@ class ControllerServer(
FutureUtils.waitWithLogging(logger.underlying, logIdent,
"all of the SocketServer Acceptors to be started",
socketServerFuture, startupDeadline, time)

// register this instance for dynamic config changes to the KafkaConfig
config.dynamicConfig.addReconfigurables(this)
} catch {
case e: Throwable =>
maybeChangeStatus(STARTING, STARTED)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaRequestHandler.scala
Expand Up @@ -206,7 +206,7 @@ class KafkaRequestHandlerPool(
) extends Logging {
private val metricsGroup = new KafkaMetricsGroup(this.getClass)

private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
/* a meter to track the average free capacity of the request handlers */
private val aggregateIdleMeter = metricsGroup.newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS)

Expand Down
25 changes: 14 additions & 11 deletions core/src/test/java/kafka/testkit/TestKitNodes.java
Expand Up @@ -33,7 +33,8 @@ public class TestKitNodes {
public static class Builder {
private boolean combined = false;
private Uuid clusterId = null;
private MetadataVersion bootstrapMetadataVersion = null;
private BootstrapMetadata bootstrapMetadata = BootstrapMetadata.
fromVersion(MetadataVersion.latest(), "testkit");
private final NavigableMap<Integer, ControllerNode.Builder> controllerNodeBuilders = new TreeMap<>();
private final NavigableMap<Integer, BrokerNode.Builder> brokerNodeBuilders = new TreeMap<>();

Expand All @@ -43,7 +44,12 @@ public Builder setClusterId(Uuid clusterId) {
}

public Builder setBootstrapMetadataVersion(MetadataVersion metadataVersion) {
this.bootstrapMetadataVersion = metadataVersion;
this.bootstrapMetadata = BootstrapMetadata.fromVersion(metadataVersion, "testkit");
return this;
}

/**
 * Replaces the entire bootstrap metadata used to format the test cluster.
 * Unlike {@link #setBootstrapMetadataVersion}, this allows arbitrary bootstrap
 * records (for example, ConfigRecords) to be present at first startup.
 *
 * @param bootstrapMetadata the bootstrap metadata to use; not copied.
 * @return this builder, for chaining.
 */
public Builder setBootstrapMetadata(BootstrapMetadata bootstrapMetadata) {
this.bootstrapMetadata = bootstrapMetadata;
return this;
}

Expand Down Expand Up @@ -97,9 +103,6 @@ public TestKitNodes build() {
if (clusterId == null) {
clusterId = Uuid.randomUuid();
}
if (bootstrapMetadataVersion == null) {
bootstrapMetadataVersion = MetadataVersion.latest();
}
TreeMap<Integer, ControllerNode> controllerNodes = new TreeMap<>();
for (ControllerNode.Builder builder : controllerNodeBuilders.values()) {
ControllerNode node = builder.
Expand All @@ -118,7 +121,7 @@ public TestKitNodes build() {
}
return new TestKitNodes(baseDirectory,
clusterId,
bootstrapMetadataVersion,
bootstrapMetadata,
controllerNodes,
brokerNodes);
} catch (Exception e) {
Expand All @@ -145,20 +148,20 @@ private int startControllerId() {

private final String baseDirectory;
private final Uuid clusterId;
private final MetadataVersion bootstrapMetadataVersion;
private final BootstrapMetadata bootstrapMetadata;
private final NavigableMap<Integer, ControllerNode> controllerNodes;
private final NavigableMap<Integer, BrokerNode> brokerNodes;

private TestKitNodes(
String baseDirectory,
Uuid clusterId,
MetadataVersion bootstrapMetadataVersion,
BootstrapMetadata bootstrapMetadata,
NavigableMap<Integer, ControllerNode> controllerNodes,
NavigableMap<Integer, BrokerNode> brokerNodes
) {
this.baseDirectory = baseDirectory;
this.clusterId = clusterId;
this.bootstrapMetadataVersion = bootstrapMetadataVersion;
this.bootstrapMetadata = bootstrapMetadata;
this.controllerNodes = controllerNodes;
this.brokerNodes = brokerNodes;
}
Expand All @@ -176,15 +179,15 @@ public Uuid clusterId() {
}

public MetadataVersion bootstrapMetadataVersion() {
return bootstrapMetadataVersion;
return bootstrapMetadata.metadataVersion();
}

/**
 * @return the controller nodes of this test cluster, keyed by node id.
 *         The returned map is the internal one — callers should not mutate it.
 */
public Map<Integer, ControllerNode> controllerNodes() {
return controllerNodes;
}

public BootstrapMetadata bootstrapMetadata() {
return BootstrapMetadata.fromVersion(bootstrapMetadataVersion(), "testkit");
return bootstrapMetadata;
}

public NavigableMap<Integer, BrokerNode> brokerNodes() {
Expand Down
Expand Up @@ -28,6 +28,7 @@ import org.apache.kafka.common.config.{ConfigException, ConfigResource}
import org.apache.kafka.common.config.ConfigResource.Type
import org.apache.kafka.common.errors.{PolicyViolationException, UnsupportedVersionException}
import org.apache.kafka.common.message.DescribeClusterRequestData
import org.apache.kafka.common.metadata.{ConfigRecord, FeatureLevelRecord}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors._
Expand All @@ -38,8 +39,9 @@ import org.apache.kafka.common.{Cluster, Endpoint, Reconfigurable, TopicPartitio
import org.apache.kafka.controller.{QuorumController, QuorumControllerIntegrationTestUtils}
import org.apache.kafka.image.ClusterImage
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.quota
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaType}
Expand Down Expand Up @@ -1219,6 +1221,36 @@ class KRaftClusterTest {
cluster.close()
}
}

@Test
def testStartupWithNonDefaultKControllerDynamicConfiguration(): Unit = {
  // Bootstrap the cluster with a non-default num.io.threads, so that a dynamic
  // configuration is already in effect at the moment the controller starts up.
  // This exercises the ordering between addReconfigurables() and the first
  // metadata publish (see KAFKA-16126).
  val featureRecord = new ApiMessageAndVersion(
    new FeatureLevelRecord().
      setName(MetadataVersion.FEATURE_NAME).
      setFeatureLevel(MetadataVersion.IBP_3_7_IV0.featureLevel),
    0.toShort)
  val ioThreadsRecord = new ApiMessageAndVersion(
    new ConfigRecord().
      setResourceType(ConfigResource.Type.BROKER.id).
      setResourceName("").
      setName("num.io.threads").
      setValue("9"),
    0.toShort)
  val nodes = new TestKitNodes.Builder().
    setBootstrapMetadata(BootstrapMetadata.fromRecords(
      util.Arrays.asList(featureRecord, ioThreadsRecord), "testRecords")).
    setNumBrokerNodes(1).
    setNumControllerNodes(1).
    build()
  val cluster = new KafkaClusterTestKit.Builder(nodes).build()
  try {
    cluster.format()
    cluster.startup()
    val controller = cluster.controllers().values().iterator().next()
    // The handler pool is created asynchronously during startup, so retry
    // until it exists and reflects the bootstrapped thread count.
    TestUtils.retry(60000) {
      assertNotNull(controller.controllerApisHandlerPool)
      assertEquals(9, controller.controllerApisHandlerPool.threadPoolSize.get())
    }
  } finally {
    cluster.close()
  }
}
}

class BadAuthorizer() extends Authorizer {
Expand Down

0 comments on commit ee356b6

Please sign in to comment.