Skip to content

Commit

Permalink
Explicitly wait for the broker to be running before executing the test.
Browse files Browse the repository at this point in the history
Also, increase the range between used ports, as a broker may register multiple ports.

Unfortunately, this commit does not fix the conflicting broker ids because Debezium overrides the broker id and set 1 to all of them.

(cherry picked from commit 7289e85)
  • Loading branch information
cescoffier authored and gsmet committed Apr 7, 2021
1 parent e60d2de commit 77476a0
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 4 deletions.
5 changes: 5 additions & 0 deletions integration-tests/kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@
<artifactId>kafka_2.12</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>

<!-- Minimal test dependencies to *-deployment artifacts for consistent build order -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ private static void addSSL(Properties props) {

public static KafkaConsumer<Integer, String> createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19093");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19099");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package io.quarkus.it.kafka;

import static io.quarkus.it.kafka.KafkaTestResource.extract;
import static org.awaitility.Awaitility.await;

import java.io.File;
import java.util.Collections;
import java.util.Map;
Expand All @@ -10,6 +13,8 @@
import io.debezium.kafka.KafkaCluster;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import kafka.server.KafkaServer;
import kafka.server.RunningAsBroker;

public class KafkaSASLTestResource implements QuarkusTestResourceLifecycleManager {

Expand Down Expand Up @@ -46,6 +51,10 @@ public Map<String, String> start() {
throw new RuntimeException(e);
}

KafkaServer server = extract(kafka);
await().until(() -> server.brokerState().currentState() == RunningAsBroker.state());
server.logger().underlying().info("Broker 'kafka-sasl' started");

return Collections.emptyMap();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package io.quarkus.it.kafka;

import static io.quarkus.it.kafka.KafkaTestResource.extract;
import static org.awaitility.Awaitility.await;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -15,6 +18,8 @@
import io.debezium.kafka.KafkaCluster;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import kafka.server.KafkaServer;
import kafka.server.RunningAsBroker;

public class KafkaSSLTestResource implements QuarkusTestResourceLifecycleManager {

Expand Down Expand Up @@ -48,7 +53,7 @@ public Map<String, String> start() {
props.setProperty("zookeeper.connection.timeout.ms", "45000");
//See http://kafka.apache.org/documentation.html#security_ssl for detail
props.setProperty("listener.security.protocol.map", "CLIENT:SSL");
props.setProperty("listeners", "CLIENT://:19093");
props.setProperty("listeners", "CLIENT://:19099");
props.setProperty("inter.broker.listener.name", "CLIENT");
props.setProperty(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ksPath.toString());
props.setProperty(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password);
Expand All @@ -60,7 +65,7 @@ public Map<String, String> start() {
props.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

kafka = new KafkaCluster()
.withPorts(2183, 19093)
.withPorts(2189, 19099)
.addBrokers(1)
.usingDirectory(directory)
.deleteDataUponShutdown(true)
Expand All @@ -70,6 +75,11 @@ public Map<String, String> start() {
} catch (Exception e) {
throw new RuntimeException(e);
}

KafkaServer server = extract(kafka);
await().until(() -> server.brokerState().currentState() == RunningAsBroker.state());
server.logger().underlying().info("Broker 'kafka-ssl' started");

return Collections.emptyMap();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package io.quarkus.it.kafka;

import static org.awaitility.Awaitility.await;

import java.io.File;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import io.debezium.kafka.KafkaCluster;
import io.debezium.kafka.KafkaServer;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import kafka.server.RunningAsBroker;

public class KafkaTestResource implements QuarkusTestResourceLifecycleManager {

Expand All @@ -29,6 +34,11 @@ public Map<String, String> start() {
} catch (Exception e) {
throw new RuntimeException(e);
}

kafka.server.KafkaServer server = extract(kafka);
await().until(() -> server.brokerState().currentState() == RunningAsBroker.state());
server.logger().underlying().info("Broker 'kafka' started");

return Collections.emptyMap();
}

Expand All @@ -38,4 +48,22 @@ public void stop() {
kafka.shutdown();
}
}

@SuppressWarnings("unchecked")
static kafka.server.KafkaServer extract(KafkaCluster cluster) {
Field kafkaServersField;
Field serverField;
try {
kafkaServersField = cluster.getClass().getDeclaredField("kafkaServers");
kafkaServersField.setAccessible(true);
Map<Integer, KafkaServer> map = (Map<Integer, KafkaServer>) kafkaServersField.get(cluster);
KafkaServer server = map.get(1);
serverField = KafkaServer.class.getDeclaredField("server");
serverField.setAccessible(true);
return (kafka.server.KafkaServer) serverField.get(server);
} catch (Exception e) {
throw new RuntimeException(e);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private static void addSsl(Properties props) {

public static Producer<Integer, String> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19093");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19099");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "test-ssl-producer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Expand Down

0 comments on commit 77476a0

Please sign in to comment.