Skip to content

Commit

Permalink
Improve Kafka test coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
zbendhiba committed Sep 22, 2021
1 parent 57edc73 commit 20987cb
Show file tree
Hide file tree
Showing 7 changed files with 416 additions and 0 deletions.
22 changes: 22 additions & 0 deletions integration-tests/kafka/pom.xml
Expand Up @@ -50,6 +50,10 @@
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-direct</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-seda</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-integration-tests-support-kafka</artifactId>
Expand All @@ -74,6 +78,11 @@
<artifactId>kafka</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>

<!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory -->
<dependency>
Expand Down Expand Up @@ -128,6 +137,19 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-seda-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<profiles>
Expand Down
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.camel.quarkus.component.kafka;

import java.math.BigInteger;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
Expand All @@ -39,16 +40,22 @@
import javax.ws.rs.core.Response;

import org.apache.camel.CamelContext;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.kafka.KafkaClientFactory;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.quarkus.component.kafka.model.KafkaMessage;
import org.apache.camel.quarkus.component.kafka.model.Price;
import org.apache.camel.spi.RouteController;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeader;

@Path("/kafka")
@ApplicationScoped
Expand All @@ -68,6 +75,9 @@ public class CamelKafkaResource {
@Inject
ProducerTemplate producerTemplate;

@Inject
ConsumerTemplate consumerTemplate;

@Path("/custom/client/factory/missing")
@GET
@Produces(MediaType.TEXT_PLAIN)
Expand Down Expand Up @@ -127,4 +137,77 @@ public List<String> getIdempotentResults() {
.map(m -> m.getBody(String.class))
.collect(Collectors.toList());
}

@Path("/foo/{action}")
@POST
public Response modifyFooConsumerState(@PathParam("action") String action) throws Exception {
modifyConsumerState("foo", action);
return Response.ok().build();
}

@Path("/bar/{action}")
@POST
public Response modifyBarConsumerState(@PathParam("action") String action) throws Exception {
modifyConsumerState("bar", action);
return Response.ok().build();
}

private void modifyConsumerState(String routeId, String action) throws Exception {
RouteController controller = context.getRouteController();
if (action.equals("start")) {
controller.startRoute(routeId);
} else if (action.equals("stop")) {
controller.stopRoute(routeId);
} else {
throw new IllegalArgumentException("Unknown action: " + action);
}
}

@Path("/seda/{queue}")
@GET
public String getSedaMessage(@PathParam("queue") String queueName) {
return consumerTemplate.receiveBody(String.format("seda:%s", queueName), 10000, String.class);
}

@Path("price/{key}")
@POST
public Response postMessage(@PathParam("key") Integer key, Double price) {
String routeURI = "kafka:test-serializer?autoOffsetReset=earliest&keySerializer=org.apache.kafka.common.serialization.IntegerSerializer"
+
"&valueSerializer=org.apache.kafka.common.serialization.DoubleSerializer";
producerTemplate.sendBodyAndHeader(routeURI, price, KafkaConstants.KEY, key);
return Response.ok().build();
}

@Path("price")
@GET
@Produces(MediaType.APPLICATION_JSON)
public Price getPrice() {
Exchange exchange = consumerTemplate.receive("seda:serializer", 10000);
Integer key = exchange.getMessage().getHeader(KafkaConstants.KEY, Integer.class);
Double price = exchange.getMessage().getBody(Double.class);
return new Price(key, price);
}

@Path("propagate/{id}")
@POST
public Response postMessageWithHeader(@PathParam("id") Integer id, String message) {
try (Producer<Integer, String> producer = new KafkaProducer<>(producerProperties)) {
ProducerRecord data = new ProducerRecord<>("test-propagation", id, message);
data.headers().add(new RecordHeader("id", BigInteger.valueOf(id).toByteArray()));
producer.send(data);
}
return Response.ok().build();
}

@Path("propagate")
@GET
@Produces(MediaType.APPLICATION_JSON)
public KafkaMessage getKafkaMessage() {
Exchange exchange = consumerTemplate.receive("seda:propagation", 10000);
String id = exchange.getMessage().getHeader("id", String.class);
String message = exchange.getMessage().getBody(String.class);
return new KafkaMessage(id, message);
}

}
Expand Up @@ -21,10 +21,22 @@
import javax.inject.Named;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.component.kafka.KafkaManualCommit;
import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
import org.eclipse.microprofile.config.inject.ConfigProperty;

public class CamelKafkaRoutes extends RouteBuilder {

private final static String KAFKA_CONSUMER_MANUAL_COMMIT = "kafka:manual-commit-topic"
+ "?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false"
+ "&allowManualCommit=true&autoOffsetReset=earliest";

private final static String SEDA_FOO = "seda:foo";
private final static String SEDA_BAR = "seda:bar";
private final static String SEDA_SERIALIZER = "seda:serializer";
private final static String SEDA_HEADER_PROPAGATION = "seda:propagation";

@ConfigProperty(name = "camel.component.kafka.brokers")
String brokers;

Expand All @@ -35,6 +47,13 @@ KafkaIdempotentRepository kafkaIdempotentRepository() {
return new KafkaIdempotentRepository("idempotent-topic", brokers);
}

@Produces
@ApplicationScoped
@Named("customHeaderDeserializer")
CustomHeaderDeserializer customHeaderDeserializer() {
return new CustomHeaderDeserializer();
}

@Override
public void configure() throws Exception {
from("kafka:inbound?autoOffsetReset=earliest")
Expand All @@ -46,5 +65,37 @@ public void configure() throws Exception {
.messageIdRepositoryRef("kafkaIdempotentRepository")
.to("mock:idempotent-results")
.end();

// Kafka consumer that use Manual commit and performs the manual commit
from(KAFKA_CONSUMER_MANUAL_COMMIT)
.routeId("foo")
.to(SEDA_FOO)
.process(e -> {
KafkaManualCommit manual = e.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
manual.commitSync();
});

// Kafka consumer that use Maunal commit and doesn't perform the manual commit
from(KAFKA_CONSUMER_MANUAL_COMMIT)
.routeId("bar")
.autoStartup(false)
.to(SEDA_BAR);

// By default, keyDeserializer & valueDeserializer == org.apache.kafka.common.serialization.StringDeserializer
// and valueSerializer & keySerializer == org.apache.kafka.common.serialization.StringSerializer
// the idea here is to test setting different kinds of Deserializers
from("kafka:test-serializer?autoOffsetReset=earliest" +
"&keyDeserializer=org.apache.kafka.common.serialization.IntegerDeserializer" +
"&valueDeserializer=org.apache.kafka.common.serialization.DoubleDeserializer")
.to(SEDA_SERIALIZER);

// Header Propagation using CustomHeaderDeserialize
from("kafka:test-propagation?headerDeserializer=#customHeaderDeserializer")
.process(exchange -> {
String id = exchange.getMessage().getHeader("id", String.class);
System.out.println("id :: " + id);
})
.to(SEDA_HEADER_PROPAGATION);

}
}
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.kafka;

import java.math.BigInteger;

import io.quarkus.runtime.annotations.RegisterForReflection;
import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;

@RegisterForReflection
public class CustomHeaderDeserializer extends DefaultKafkaHeaderDeserializer {

public CustomHeaderDeserializer() {
}

@Override
public Object deserialize(String key, byte[] value) {
if (key.equals("id")) {
BigInteger bi = new BigInteger(value);
return String.valueOf(bi.longValue());
} else {
return super.deserialize(key, value);
}
}
}
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.kafka.model;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class KafkaMessage {
String id;
String message;

public KafkaMessage() {
}

public KafkaMessage(String id, String message) {
this.id = id;
this.message = message;
}

public String getId() {
return id;
}

public void setId(String id) {
this.id = id;
}

public String getMessage() {
return message;
}

public void setMessage(String message) {
this.message = message;
}
}
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.kafka.model;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class Price {
Integer key;
Double price;

public Price(Integer key, Double price) {
this.key = key;
this.price = price;
}

public Integer getKey() {
return key;
}

public void setKey(Integer key) {
this.key = key;
}

public Double getPrice() {
return price;
}

public void setPrice(Double price) {
this.price = price;
}
}

0 comments on commit 20987cb

Please sign in to comment.