From f602b953c835aeae746e64d67a803bfd48c04a2e Mon Sep 17 00:00:00 2001 From: Henry Cai Date: Sun, 23 Jun 2024 13:55:37 -0700 Subject: [PATCH] Use literal config name for listeners and broker.id config Fix issue: https://github.com/linkedin/cruise-control/issues/2168 When running embedded MetricsReporter with the upcoming Kafka 3.8, hit the following exception: [2024-06-18 16:08:06,722] ERROR [KafkaServer id=1025] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) java.lang.NoSuchMethodError: 'java.lang.String kafka.server.KafkaConfig.ListenersProp()' at com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.getBootstrapServers(CruiseControlMetricsReporter.java:129) The root cause is Kafka 3.8 (maybe also 3.7) refactored the core module for many configuration parameters, see this PR: apache/kafka@1b301b3?diff=split&w=0#diff-8eb3e01716508551f203b0b37c2d5f951e93cce7ffed7c00c2b33633d7c8ed23 The easiest fix is just to use hard-coded literal string 'listeners' for this config parameter, the name of this parameter is unlikely to change (otherwise it will break million's customer's server.properties). Although you can try to change the variable name reference from KafkaConfig.ListenersProp() to SocketServerConfigs.LISTENERS_CONFIG, but SocketServerConfigs is also a new class introduced in Kafka 3.8, to do that you would have to create a new CC branch migrate_to_3_8 to make the compilation work, but maintaining different branches for different kafka version is cumbersome and hard to maintain in the long run. This also applies on another property broker.id --- .../metricsreporter/CruiseControlMetricsReporter.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporter.java b/cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporter.java index 18d49744a..58ef29328 100644 --- a/cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporter.java +++ b/cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporter.java @@ -25,7 +25,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import kafka.server.KafkaConfig; import org.apache.kafka.clients.ClientUtils; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.AdminClient; @@ -126,7 +125,7 @@ public void close() { static String getBootstrapServers(Map configs) { Object port = configs.get("port"); - String listeners = String.valueOf(configs.get(KafkaConfig.ListenersProp())); + String listeners = String.valueOf(configs.get("listeners")); if (!"null".equals(listeners) && listeners.length() != 0) { // See https://kafka.apache.org/documentation/#listeners for possible responses. If multiple listeners are configured, this function // picks the first listener in the list of listeners. Hence, users of this config must adjust their order accordingly. @@ -184,7 +183,7 @@ public void configure(Map configs) { this.close(); } - _brokerId = Integer.parseInt((String) configs.get(KafkaConfig.BrokerIdProp())); + _brokerId = Integer.parseInt((String) configs.get("broker.id")); _cruiseControlMetricsTopic = reporterConfig.getString(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG); _reportingIntervalMs = reporterConfig.getLong(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG);