diff --git a/integration-tests/kafka/pom.xml b/integration-tests/kafka/pom.xml index e045d16835463..284140bcbb64a 100644 --- a/integration-tests/kafka/pom.xml +++ b/integration-tests/kafka/pom.xml @@ -88,6 +88,11 @@ kafka_2.12 test + + org.awaitility + awaitility + test + diff --git a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/ssl/SslKafkaEndpoint.java b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/ssl/SslKafkaEndpoint.java index ba05f3d5a5dcf..9a0ac69fc8832 100644 --- a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/ssl/SslKafkaEndpoint.java +++ b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/ssl/SslKafkaEndpoint.java @@ -52,7 +52,7 @@ private static void addSSL(Properties props) { public static KafkaConsumer createConsumer() { Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19093"); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19099"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); diff --git a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaSASLTestResource.java b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaSASLTestResource.java index 79ff20527d595..a0c73ab7a689e 100644 --- a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaSASLTestResource.java +++ b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaSASLTestResource.java @@ -1,5 +1,8 @@ package io.quarkus.it.kafka; +import static io.quarkus.it.kafka.KafkaTestResource.extract; +import static org.awaitility.Awaitility.await; + import java.io.File; import java.util.Collections; import java.util.Map; @@ -10,6 +13,8 @@ import io.debezium.kafka.KafkaCluster; import io.debezium.util.Testing; import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; +import kafka.server.KafkaServer; +import kafka.server.RunningAsBroker; public class KafkaSASLTestResource implements QuarkusTestResourceLifecycleManager { @@ -46,6 +51,10 @@ public Map start() { throw new RuntimeException(e); } + KafkaServer server = extract(kafka); + await().until(() -> server.brokerState().currentState() == RunningAsBroker.state()); + server.logger().underlying().info("Broker 'kafka-sasl' started"); + return Collections.emptyMap(); } diff --git a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaSSLTestResource.java b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaSSLTestResource.java index d24c2e89c4068..5cc87de15f2f0 100644 --- a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaSSLTestResource.java +++ b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaSSLTestResource.java @@ -1,5 +1,8 @@ package io.quarkus.it.kafka; +import static io.quarkus.it.kafka.KafkaTestResource.extract; +import static org.awaitility.Awaitility.await; + import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -15,6 +18,8 @@ import io.debezium.kafka.KafkaCluster; import io.debezium.util.Testing; import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; +import kafka.server.KafkaServer; +import kafka.server.RunningAsBroker; public class KafkaSSLTestResource implements QuarkusTestResourceLifecycleManager { @@ -48,7 +53,7 @@ public Map start() { props.setProperty("zookeeper.connection.timeout.ms", "45000"); //See http://kafka.apache.org/documentation.html#security_ssl for detail props.setProperty("listener.security.protocol.map", "CLIENT:SSL"); - props.setProperty("listeners", "CLIENT://:19093"); + props.setProperty("listeners", "CLIENT://:19099"); props.setProperty("inter.broker.listener.name", "CLIENT"); props.setProperty(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ksPath.toString()); props.setProperty(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password); @@ -60,7 +65,7 @@ public Map start() { props.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); kafka = new KafkaCluster() - .withPorts(2183, 19093) + .withPorts(2189, 19099) .addBrokers(1) .usingDirectory(directory) .deleteDataUponShutdown(true) @@ -70,6 +75,11 @@ public Map start() { } catch (Exception e) { throw new RuntimeException(e); } + + KafkaServer server = extract(kafka); + await().until(() -> server.brokerState().currentState() == RunningAsBroker.state()); + server.logger().underlying().info("Broker 'kafka-ssl' started"); + return Collections.emptyMap(); } diff --git a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaTestResource.java b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaTestResource.java index b4e819b120574..18a2440d4646b 100644 --- a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaTestResource.java +++ b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaTestResource.java @@ -1,13 +1,18 @@ package io.quarkus.it.kafka; +import static org.awaitility.Awaitility.await; + import java.io.File; +import java.lang.reflect.Field; import java.util.Collections; import java.util.Map; import java.util.Properties; import io.debezium.kafka.KafkaCluster; +import io.debezium.kafka.KafkaServer; import io.debezium.util.Testing; import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; +import kafka.server.RunningAsBroker; public class KafkaTestResource implements QuarkusTestResourceLifecycleManager { @@ -29,6 +34,11 @@ public Map start() { } catch (Exception e) { throw new RuntimeException(e); } + + kafka.server.KafkaServer server = extract(kafka); + await().until(() -> server.brokerState().currentState() == RunningAsBroker.state()); + server.logger().underlying().info("Broker 'kafka' started"); + return Collections.emptyMap(); } @@ -38,4 +48,22 @@ public void stop() { kafka.shutdown(); } } + + @SuppressWarnings("unchecked") + static kafka.server.KafkaServer extract(KafkaCluster cluster) { + Field kafkaServersField; + Field serverField; + try { + kafkaServersField = cluster.getClass().getDeclaredField("kafkaServers"); + kafkaServersField.setAccessible(true); + Map map = (Map) kafkaServersField.get(cluster); + KafkaServer server = map.get(1); + serverField = KafkaServer.class.getDeclaredField("server"); + serverField.setAccessible(true); + return (kafka.server.KafkaServer) serverField.get(server); + } catch (Exception e) { + throw new RuntimeException(e); + } + + } } diff --git a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/SslKafkaConsumerTest.java b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/SslKafkaConsumerTest.java index 753ce93953335..db500ffed64b4 100644 --- a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/SslKafkaConsumerTest.java +++ b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/SslKafkaConsumerTest.java @@ -40,7 +40,7 @@ private static void addSsl(Properties props) { public static Producer createProducer() { Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19093"); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19099"); props.put(ProducerConfig.CLIENT_ID_CONFIG, "test-ssl-producer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());