Skip to content

Commit

Permalink
Merge pull request #16435 from cescoffier/attempt-to-fix-kafka-ci-issues
Browse files Browse the repository at this point in the history
Attempt to fix the Kafka flaky tests
  • Loading branch information
gsmet committed Apr 14, 2021
2 parents 2dc8bc8 + 70d9d46 commit 890149e
Show file tree
Hide file tree
Showing 60 changed files with 1,812 additions and 701 deletions.
2 changes: 0 additions & 2 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ updates:
# Kafka
- dependency-name: org.apache.kafka:*
- dependency-name: org.apache.zookeeper:zookeeper
# Debezium
- dependency-name: io.debezium:debezium-core
# Scala
- dependency-name: org.scala-lang:*
- dependency-name: net.alchim31.maven:scala-maven-plugin
Expand Down
14 changes: 10 additions & 4 deletions .github/native-tests.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
"Security2",
"Security3",
"Amazon",
"Messaging",
"Messaging1",
"Messaging2",
"Cache",
"HTTP",
"Misc1",
Expand Down Expand Up @@ -63,9 +64,14 @@
"test-modules": "amazon-services amazon-lambda amazon-lambda-http"
},
{
"category": "Messaging",
"timeout": 120,
"test-modules": "artemis-core artemis-jms kafka kafka-avro kafka-snappy kafka-streams reactive-messaging-amqp reactive-messaging-kafka reactive-messaging-http"
"category": "Messaging1",
"timeout": 100,
"test-modules": "kafka kafka-ssl kafka-sasl kafka-avro kafka-snappy kafka-streams reactive-messaging-kafka"
},
{
"category": "Messaging2",
"timeout": 70,
"test-modules": "artemis-core artemis-jms reactive-messaging-amqp reactive-messaging-http"
},
{
"category": "Security1",
Expand Down
15 changes: 5 additions & 10 deletions bom/test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
<description>Dependency management for integration tests. Importable by third party extension developers.</description>

<properties>
<debezium.version>1.4.2.Final</debezium.version>
<testcontainers.version>1.15.2</testcontainers.version>
<strimzi-test-container.version>0.22.1</strimzi-test-container.version>
</properties>

<dependencyManagement>
Expand All @@ -31,15 +31,10 @@
</dependency>

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${debezium.version}</version>
<type>test-jar</type>
<groupId>io.strimzi</groupId>
<artifactId>strimzi-test-container</artifactId>
<version>${strimzi-test-container.version}</version>
<scope>test</scope>
</dependency>

</dependencies>
Expand Down
21 changes: 8 additions & 13 deletions integration-tests/kafka-avro/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -123,20 +123,15 @@
</exclusions>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<groupId>io.strimzi</groupId>
<artifactId>strimzi-test-container</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.time.Duration;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
Expand All @@ -20,29 +21,32 @@
@Path("/avro")
public class AvroEndpoint {

@Inject
AvroKafkaCreator creator;

@GET
@Path("/confluent")
public JsonObject getConfluent() {
return get(AvroKafkaCreator.createConfluentConsumer("test-avro-confluent-consumer", "test-avro-confluent-consumer"));
return get(creator.createConfluentConsumer("test-avro-confluent-consumer", "test-avro-confluent-consumer"));
}

@POST
@Path("/confluent")
public void sendConfluent(Pet pet) {
KafkaProducer<Integer, Pet> p = AvroKafkaCreator.createConfluentProducer("test-avro-confluent");
KafkaProducer<Integer, Pet> p = creator.createConfluentProducer("test-avro-confluent");
send(p, pet, "test-avro-confluent-producer");
}

@GET
@Path("/apicurio")
public JsonObject getApicurio() {
return get(AvroKafkaCreator.createApicurioConsumer("test-avro-apicurio-consumer", "test-avro-apicurio-consumer"));
return get(creator.createApicurioConsumer("test-avro-apicurio-consumer", "test-avro-apicurio-consumer"));
}

@POST
@Path("/apicurio")
public void sendApicurio(Pet pet) {
KafkaProducer<Integer, Pet> p = AvroKafkaCreator.createApicurioProducer("test-avro-apicurio");
KafkaProducer<Integer, Pet> p = creator.createApicurioProducer("test-avro-apicurio");
send(p, pet, "test-avro-apicurio-producer");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@

import java.util.Collections;
import java.util.Properties;
import java.util.UUID;

import javax.enterprise.context.ApplicationScoped;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import io.apicurio.registry.utils.serde.AbstractKafkaSerDe;
import io.apicurio.registry.utils.serde.AbstractKafkaSerializer;
Expand All @@ -26,57 +30,93 @@
/**
* Create Avro Kafka Consumers and Producers
*/
@ApplicationScoped
public class AvroKafkaCreator {

public static KafkaConsumer<Integer, Pet> createConfluentConsumer(String groupdIdConfig, String subscribtionName) {
Properties p = getConfluentConsumerProperties(groupdIdConfig);
@ConfigProperty(name = "kafka.bootstrap.servers")
String bootstrap;
@ConfigProperty(name = "schema.url.confluent")
String confluent;
@ConfigProperty(name = "schema.url.apicurio")
String apicurio;

public KafkaConsumer<Integer, Pet> createConfluentConsumer(String groupdIdConfig, String subscribtionName) {
return createConfluentConsumer(bootstrap, confluent, groupdIdConfig, subscribtionName);
}

public KafkaProducer<Integer, Pet> createConfluentProducer(String clientId) {
return createConfluentProducer(bootstrap, confluent, clientId);
}

public KafkaConsumer<Integer, Pet> createApicurioConsumer(String groupdIdConfig, String subscribtionName) {
return createApicurioConsumer(bootstrap, apicurio, groupdIdConfig, subscribtionName);
}

public KafkaProducer<Integer, Pet> createApicurioProducer(String clientId) {
return createApicurioProducer(bootstrap, apicurio, clientId);
}

public static KafkaConsumer<Integer, Pet> createConfluentConsumer(String bootstrap, String confluent,
String groupdIdConfig, String subscribtionName) {
Properties p = getConfluentConsumerProperties(bootstrap, confluent, groupdIdConfig);
return createConsumer(p, subscribtionName);
}

public static KafkaConsumer<Integer, Pet> createApicurioConsumer(String groupdIdConfig, String subscribtionName) {
Properties p = getApicurioConsumerProperties(groupdIdConfig);
public static KafkaConsumer<Integer, Pet> createApicurioConsumer(String bootstrap, String apicurio,
String groupdIdConfig, String subscribtionName) {
Properties p = getApicurioConsumerProperties(bootstrap, apicurio, groupdIdConfig);
return createConsumer(p, subscribtionName);
}

public static KafkaProducer<Integer, Pet> createConfluentProducer(String clientId) {
Properties p = getConfluentProducerProperties(clientId);
public static KafkaProducer<Integer, Pet> createConfluentProducer(String bootstrap, String confluent,
String clientId) {
Properties p = getConfluentProducerProperties(bootstrap, confluent, clientId);
return createProducer(p);
}

public static KafkaProducer<Integer, Pet> createApicurioProducer(String clientId) {
Properties p = getApicurioProducerProperties(clientId);
public static KafkaProducer<Integer, Pet> createApicurioProducer(String bootstrap, String apicurio,
String clientId) {
Properties p = getApicurioProducerProperties(bootstrap, apicurio, clientId);
return createProducer(p);
}

private static KafkaConsumer<Integer, Pet> createConsumer(Properties props, String subscribtionName) {
if (!props.containsKey(ConsumerConfig.CLIENT_ID_CONFIG)) {
props.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
}
KafkaConsumer<Integer, Pet> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(subscribtionName));
return consumer;
}

private static KafkaProducer<Integer, Pet> createProducer(Properties props) {
if (!props.containsKey(ProducerConfig.CLIENT_ID_CONFIG)) {
props.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
}
return new KafkaProducer<>(props);
}

private static Properties getConfluentConsumerProperties(String groupdIdConfig) {
Properties props = getGenericConsumerProperties(groupdIdConfig);
private static Properties getConfluentConsumerProperties(String bootstrap, String confluent,
String groupdIdConfig) {
Properties props = getGenericConsumerProperties(bootstrap, groupdIdConfig);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, System.getProperty("schema.url.confluent"));
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, confluent);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
return props;
}

public static Properties getApicurioConsumerProperties(String groupdIdConfig) {
Properties props = getGenericConsumerProperties(groupdIdConfig);
public static Properties getApicurioConsumerProperties(String bootstrap, String apicurio, String groupdIdConfig) {
Properties props = getGenericConsumerProperties(bootstrap, groupdIdConfig);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());
props.put(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, System.getProperty("schema.url.apicurio"));
props.put(AvroDatumProvider.REGISTRY_AVRO_DATUM_PROVIDER_CONFIG_PARAM, ReflectAvroDatumProvider.class.getName());
props.put(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, apicurio);
props.put(AvroDatumProvider.REGISTRY_AVRO_DATUM_PROVIDER_CONFIG_PARAM,
ReflectAvroDatumProvider.class.getName());
return props;
}

private static Properties getGenericConsumerProperties(String groupdIdConfig) {
private static Properties getGenericConsumerProperties(String bootstrap, String groupdIdConfig) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupdIdConfig);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
Expand All @@ -85,27 +125,30 @@ private static Properties getGenericConsumerProperties(String groupdIdConfig) {
return props;
}

private static Properties getConfluentProducerProperties(String clientId) {
Properties props = getGenericProducerProperties(clientId);
private static Properties getConfluentProducerProperties(String bootstrap, String confluent, String clientId) {
Properties props = getGenericProducerProperties(bootstrap, clientId);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, System.getProperty("schema.url.confluent"));
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, confluent);
return props;
}

private static Properties getApicurioProducerProperties(String clientId) {
Properties props = getGenericProducerProperties(clientId);
private static Properties getApicurioProducerProperties(String bootstrap, String apicurio, String clientId) {
Properties props = getGenericProducerProperties(bootstrap, clientId);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName());
props.put(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, System.getProperty("schema.url.apicurio"));
props.put(AbstractKafkaSerializer.REGISTRY_ARTIFACT_ID_STRATEGY_CONFIG_PARAM, SimpleTopicIdStrategy.class.getName());
props.put(AbstractKafkaSerializer.REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM, GetOrCreateIdStrategy.class.getName());
props.put(AvroDatumProvider.REGISTRY_AVRO_DATUM_PROVIDER_CONFIG_PARAM, ReflectAvroDatumProvider.class.getName());
props.put(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, apicurio);
props.put(AbstractKafkaSerializer.REGISTRY_ARTIFACT_ID_STRATEGY_CONFIG_PARAM,
SimpleTopicIdStrategy.class.getName());
props.put(AbstractKafkaSerializer.REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM,
GetOrCreateIdStrategy.class.getName());
props.put(AvroDatumProvider.REGISTRY_AVRO_DATUM_PROVIDER_CONFIG_PARAM,
ReflectAvroDatumProvider.class.getName());
return props;
}

private static Properties getGenericProducerProperties(String clientId) {
private static Properties getGenericProducerProperties(String bootstrap, String clientId) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
return props;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,4 @@ quarkus.log.category.\"org.apache.zookeeper\".level=WARN

# enable health check
quarkus.kafka.health.enabled=true
kafka.bootstrap.servers=localhost:19092

Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.quarkus.it.kafka;

import java.util.HashMap;
import java.util.Map;

import org.testcontainers.containers.GenericContainer;

import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import io.strimzi.StrimziKafkaContainer;

/**
 * Quarkus test resource that starts a Kafka broker (Strimzi test container) and an
 * in-memory Apicurio registry container, then exposes their endpoints to the test
 * application via config properties ({@code kafka.bootstrap.servers},
 * {@code schema.url.confluent}, {@code schema.url.apicurio}).
 *
 * <p>The static accessors are only valid after {@link #start()} has completed.
 */
public class KafkaAndSchemaRegistryTestResource implements QuarkusTestResourceLifecycleManager {

    private static final StrimziKafkaContainer kafka = new StrimziKafkaContainer();

    private static GenericContainer<?> registry;

    public static String getBootstrapServers() {
        return kafka.getBootstrapServers();
    }

    public static String getConfluentSchemaRegistryUrl() {
        // Confluent-compatible endpoint exposed by the Apicurio registry.
        return registryUrl("/api/ccompat");
    }

    public static String getApicurioSchemaRegistryUrl() {
        return registryUrl("/api");
    }

    /**
     * Builds the registry base URL for the given API path. Single point of truth so
     * the accessors and {@link #start()} cannot drift apart.
     */
    private static String registryUrl(String path) {
        return "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + path;
    }

    @Override
    public Map<String, String> start() {
        kafka.start();
        registry = new GenericContainer<>("apicurio/apicurio-registry-mem:1.2.2.Final")
                .withExposedPorts(8080)
                .withEnv("QUARKUS_PROFILE", "prod")
                .withEnv("KAFKA_BOOTSTRAP_SERVERS", kafka.getBootstrapServers())
                .withEnv("APPLICATION_ID", "registry_id")
                .withEnv("APPLICATION_SERVER", "localhost:9000");
        registry.start();
        Map<String, String> properties = new HashMap<>();
        // Reuse the accessors so the URL construction lives in exactly one place.
        properties.put("schema.url.confluent", getConfluentSchemaRegistryUrl());
        properties.put("schema.url.apicurio", getApicurioSchemaRegistryUrl());
        properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers());
        return properties;
    }

    @Override
    public void stop() {
        // start() may have failed before the registry was created; guard against NPE.
        if (registry != null) {
            registry.stop();
        }
        kafka.close();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
import io.quarkus.test.junit.NativeImageTest;

// Native-image variant of KafkaAvroTest: inherits all of its test methods and
// re-runs them against the native executable.
// NOTE(review): the annotation lines below are diff residue from the rendered
// commit page — the old KafkaTestResource/SchemaRegistryTestResource annotations
// (removed) and the new KafkaAndSchemaRegistryTestResource annotation (added)
// appear together; in the actual file only the latter remains. Confirm against
// the repository before reusing this text as source.
@NativeImageTest
@QuarkusTestResource(KafkaTestResource.class)
@QuarkusTestResource(SchemaRegistryTestResource.class)
@QuarkusTestResource(KafkaAndSchemaRegistryTestResource.class)
public class KafkaAvroIT extends KafkaAvroTest {

}
Loading

0 comments on commit 890149e

Please sign in to comment.