From 85153c24fbe6804902e620b95cff86e70cb85521 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Thu, 12 Dec 2024 14:41:14 -0500 Subject: [PATCH 1/4] GH-3662: Support Kafka 3.9.0+ in EmbeddedKafkaKraftBroker Fixes: #3662 Issue: https://github.com/spring-projects/spring-kafka/issues/3662 Add compatibility for both Kafka 3.8.0 and 3.9.0+ by handling different method signatures for setConfigProp: - 3.9.0+: setConfigProp(String, Object) - 3.8.0: setConfigProp(String, String) The change uses reflection to detect Kafka version and call appropriate method. --- .../kafka/test/EmbeddedKafkaKraftBroker.java | 37 ++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java index 35f80813c6..9d975b31d8 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java @@ -18,6 +18,8 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.nio.file.Files; import java.time.Duration; import java.util.AbstractMap.SimpleEntry; @@ -77,6 +79,8 @@ */ public class EmbeddedKafkaKraftBroker implements EmbeddedKafkaBroker { + private static final String CLASS_EXISTS_ONLY_IN_390 = "org.apache.kafka.server.config.AbstractKafkaConfig"; + private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(EmbeddedKafkaKraftBroker.class)); /** @@ -217,7 +221,7 @@ private void start() { .setNumBrokerNodes(this.count) .setNumControllerNodes(this.count) .build()); - this.brokerProperties.forEach((k, v) -> clusterBuilder.setConfigProp((String) k, (String) v)); + this.brokerProperties.forEach((k, v) -> setConfigProperty(clusterBuilder, k, v)); this.cluster = clusterBuilder.build(); } catch (Exception ex) { @@ -243,6 +247,37 @@ private void start() { System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, getBrokersAsString()); } + private static void setConfigProperty(Object clusterBuilder, Object key, Object value) { + try { + boolean isKafka39OrLater = isClassicConsumerPresent(); + Class builderClass = clusterBuilder.getClass(); + + if (isKafka39OrLater) { + // For Kafka 3.9.0+: setConfigProp(String, Object) + Method setConfigMethod = builderClass.getMethod("setConfigProp", String.class, Object.class); + setConfigMethod.invoke(clusterBuilder, (String) key, value); + } + else { + // For Kafka 3.8.0: setConfigProp(String, String) + Method setConfigMethod = builderClass.getMethod("setConfigProp", String.class, String.class); + setConfigMethod.invoke(clusterBuilder, (String) key, (String) value); + } + } + catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException("Failed to set config property", e); + } + } + + private static boolean isClassicConsumerPresent() { + try { + Class.forName(CLASS_EXISTS_ONLY_IN_390); + return true; // Class exists - Kafka 3.9.0+ + } + catch (ClassNotFoundException e) { + return false; // Class doesn't exist - Kafka 3.8.0 + } + } + @Override public void destroy() { AtomicReference shutdownFailure = new AtomicReference<>(); From ec113377915b2fde35ca2484e7131f146996ccc5 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Thu, 12 Dec 2024 14:54:58 -0500 Subject: [PATCH 2/4] Direct call to setConfigProp when using 3.8.0 --- .../kafka/test/EmbeddedKafkaKraftBroker.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java index 9d975b31d8..a361af65ea 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java @@ -247,7 +247,7 @@ private void start() { System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, getBrokersAsString()); } - private static void setConfigProperty(Object clusterBuilder, Object key, Object value) { + private static void setConfigProperty(KafkaClusterTestKit.Builder clusterBuilder, Object key, Object value) { try { boolean isKafka39OrLater = isClassicConsumerPresent(); Class builderClass = clusterBuilder.getClass(); @@ -258,9 +258,8 @@ private static void setConfigProperty(Object clusterBuilder, Object key, Object setConfigMethod.invoke(clusterBuilder, (String) key, value); } else { - // For Kafka 3.8.0: setConfigProp(String, String) - Method setConfigMethod = builderClass.getMethod("setConfigProp", String.class, String.class); - setConfigMethod.invoke(clusterBuilder, (String) key, (String) value); + // For Kafka 3.8.0: direct call to setConfigProp(String, String) + clusterBuilder.setConfigProp((String) key, (String) value); } } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { From bd2d4786bfcb3bbd39cb17d4003fb1692c9d14f1 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Thu, 12 Dec 2024 15:14:58 -0500 Subject: [PATCH 3/4] Addressing PR review --- .../kafka/test/EmbeddedKafkaKraftBroker.java | 52 +++++++++---------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java index a361af65ea..11e08010a7 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java @@ -18,7 +18,6 @@ import java.io.IOException; import java.io.UncheckedIOException; -import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.nio.file.Files; import java.time.Duration; @@ -58,6 +57,8 @@ import org.springframework.core.log.LogAccessor; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; +import org.springframework.util.ReflectionUtils; /** * An embedded Kafka Broker(s) using KRaft. @@ -91,6 +92,24 @@ public class EmbeddedKafkaKraftBroker implements EmbeddedKafkaBroker { public static final int DEFAULT_ADMIN_TIMEOUT = 10; + private static final boolean IS_KAFKA_39_OR_LATER; + + private static final Method SET_CONFIG_METHOD; + + static { + IS_KAFKA_39_OR_LATER = ClassUtils.isPresent(CLASS_EXISTS_ONLY_IN_390, EmbeddedKafkaKraftBroker.class.getClassLoader()); + + if (IS_KAFKA_39_OR_LATER) { + SET_CONFIG_METHOD = ReflectionUtils.findMethod( + KafkaClusterTestKit.Builder.class, + "setConfigProp", + String.class, Object.class); + } + else { + SET_CONFIG_METHOD = null; + } + } + private final int count; private final Set topics; @@ -248,32 +267,13 @@ private void start() { } private static void setConfigProperty(KafkaClusterTestKit.Builder clusterBuilder, Object key, Object value) { - try { - boolean isKafka39OrLater = isClassicConsumerPresent(); - Class builderClass = clusterBuilder.getClass(); - - if (isKafka39OrLater) { - // For Kafka 3.9.0+: setConfigProp(String, Object) - Method setConfigMethod = builderClass.getMethod("setConfigProp", String.class, Object.class); - setConfigMethod.invoke(clusterBuilder, (String) key, value); - } - else { - // For Kafka 3.8.0: direct call to setConfigProp(String, String) - clusterBuilder.setConfigProp((String) key, (String) value); - } + if (IS_KAFKA_39_OR_LATER) { + // For Kafka 3.9.0+: use reflection + ReflectionUtils.invokeMethod(SET_CONFIG_METHOD, clusterBuilder, (String) key, value); } - catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException("Failed to set config property", e); - } - } - - private static boolean isClassicConsumerPresent() { - try { - Class.forName(CLASS_EXISTS_ONLY_IN_390); - return true; // Class exists - Kafka 3.9.0+ - } - catch (ClassNotFoundException e) { - return false; // Class doesn't exist - Kafka 3.8.0 + else { + // For Kafka 3.8.0: direct call + clusterBuilder.setConfigProp((String) key, (String) value); } } From 87805c8904e799c8635adbf0350de52e60092f59 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Fri, 13 Dec 2024 12:01:33 -0500 Subject: [PATCH 4/4] Addressing PR review --- .../kafka/test/EmbeddedKafkaKraftBroker.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java index 11e08010a7..970a87d1c0 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java @@ -80,8 +80,6 @@ */ public class EmbeddedKafkaKraftBroker implements EmbeddedKafkaBroker { - private static final String CLASS_EXISTS_ONLY_IN_390 = "org.apache.kafka.server.config.AbstractKafkaConfig"; - private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(EmbeddedKafkaKraftBroker.class)); /** @@ -92,13 +90,12 @@ public class EmbeddedKafkaKraftBroker implements EmbeddedKafkaBroker { public static final int DEFAULT_ADMIN_TIMEOUT = 10; - private static final boolean IS_KAFKA_39_OR_LATER; + private static final boolean IS_KAFKA_39_OR_LATER = ClassUtils.isPresent( + "org.apache.kafka.server.config.AbstractKafkaConfig", EmbeddedKafkaKraftBroker.class.getClassLoader()); private static final Method SET_CONFIG_METHOD; static { - IS_KAFKA_39_OR_LATER = ClassUtils.isPresent(CLASS_EXISTS_ONLY_IN_390, EmbeddedKafkaKraftBroker.class.getClassLoader()); - if (IS_KAFKA_39_OR_LATER) { SET_CONFIG_METHOD = ReflectionUtils.findMethod( KafkaClusterTestKit.Builder.class, @@ -240,7 +237,7 @@ private void start() { .setNumBrokerNodes(this.count) .setNumControllerNodes(this.count) .build()); - this.brokerProperties.forEach((k, v) -> setConfigProperty(clusterBuilder, k, v)); + this.brokerProperties.forEach((k, v) -> setConfigProperty(clusterBuilder, (String) k, v)); this.cluster = clusterBuilder.build(); } catch (Exception ex) { @@ -266,14 +263,14 @@ private void start() { System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, getBrokersAsString()); } - private static void setConfigProperty(KafkaClusterTestKit.Builder clusterBuilder, Object key, Object value) { + private static void setConfigProperty(KafkaClusterTestKit.Builder clusterBuilder, String key, Object value) { if (IS_KAFKA_39_OR_LATER) { // For Kafka 3.9.0+: use reflection - ReflectionUtils.invokeMethod(SET_CONFIG_METHOD, clusterBuilder, (String) key, value); + ReflectionUtils.invokeMethod(SET_CONFIG_METHOD, clusterBuilder, key, value); } else { // For Kafka 3.8.0: direct call - clusterBuilder.setConfigProp((String) key, (String) value); + clusterBuilder.setConfigProp(key, (String) value); } }