diff --git a/integration-tests/kafka/pom.xml b/integration-tests/kafka/pom.xml
index 2d1893a45064..c7fb29847e92 100644
--- a/integration-tests/kafka/pom.xml
+++ b/integration-tests/kafka/pom.xml
@@ -50,6 +50,10 @@
org.apache.camel.quarkus
camel-quarkus-direct
+
+ org.apache.camel.quarkus
+ camel-quarkus-seda
+
org.apache.camel.quarkus
camel-quarkus-integration-tests-support-kafka
@@ -74,6 +78,11 @@
kafka
test
+
+ org.awaitility
+ awaitility
+ test
+
@@ -128,6 +137,19 @@
+
+ org.apache.camel.quarkus
+ camel-quarkus-seda-deployment
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
diff --git a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaResource.java b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaResource.java
index ed46943441a1..d5d1108497be 100644
--- a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaResource.java
+++ b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaResource.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.quarkus.component.kafka;
+import java.math.BigInteger;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
@@ -39,16 +40,22 @@
import javax.ws.rs.core.Response;
import org.apache.camel.CamelContext;
+import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.kafka.KafkaClientFactory;
+import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.quarkus.component.kafka.model.KafkaMessage;
+import org.apache.camel.quarkus.component.kafka.model.Price;
+import org.apache.camel.spi.RouteController;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeader;
@Path("/kafka")
@ApplicationScoped
@@ -68,6 +75,9 @@ public class CamelKafkaResource {
@Inject
ProducerTemplate producerTemplate;
+ @Inject
+ ConsumerTemplate consumerTemplate;
+
@Path("/custom/client/factory/missing")
@GET
@Produces(MediaType.TEXT_PLAIN)
@@ -127,4 +137,77 @@ public List getIdempotentResults() {
.map(m -> m.getBody(String.class))
.collect(Collectors.toList());
}
+
+ @Path("/foo/{action}")
+ @POST
+ public Response modifyFooConsumerState(@PathParam("action") String action) throws Exception {
+ modifyConsumerState("foo", action);
+ return Response.ok().build();
+ }
+
+ @Path("/bar/{action}")
+ @POST
+ public Response modifyBarConsumerState(@PathParam("action") String action) throws Exception {
+ modifyConsumerState("bar", action);
+ return Response.ok().build();
+ }
+
+ private void modifyConsumerState(String routeId, String action) throws Exception {
+ RouteController controller = context.getRouteController();
+ if (action.equals("start")) {
+ controller.startRoute(routeId);
+ } else if (action.equals("stop")) {
+ controller.stopRoute(routeId);
+ } else {
+ throw new IllegalArgumentException("Unknown action: " + action);
+ }
+ }
+
+ @Path("/seda/{queue}")
+ @GET
+ public String getSedaMessage(@PathParam("queue") String queueName) {
+ return consumerTemplate.receiveBody(String.format("seda:%s", queueName), 10000, String.class);
+ }
+
+ @Path("price/{key}")
+ @POST
+ public Response postPrice(@PathParam("key") Integer key, Double price) {
+ String routeURI = "kafka:test-serializer?autoOffsetReset=earliest&keySerializer=org.apache.kafka.common.serialization.IntegerSerializer"
+ +
+ "&valueSerializer=org.apache.kafka.common.serialization.DoubleSerializer";
+ producerTemplate.sendBodyAndHeader(routeURI, price, KafkaConstants.KEY, key);
+ return Response.ok().build();
+ }
+
+ @Path("price")
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ public Price getPrice() {
+ Exchange exchange = consumerTemplate.receive("seda:serializer", 10000);
+ Integer key = exchange.getMessage().getHeader(KafkaConstants.KEY, Integer.class);
+ Double price = exchange.getMessage().getBody(Double.class);
+ return new Price(key, price);
+ }
+
+ @Path("propagate/{id}")
+ @POST
+ public Response postMessageWithHeader(@PathParam("id") Integer id, String message) {
+ try (Producer producer = new KafkaProducer<>(producerProperties)) {
+ ProducerRecord data = new ProducerRecord<>("test-propagation", id, message);
+ data.headers().add(new RecordHeader("id", BigInteger.valueOf(id).toByteArray()));
+ producer.send(data);
+ }
+ return Response.ok().build();
+ }
+
+ @Path("propagate")
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ public KafkaMessage getKafkaMessage() {
+ Exchange exchange = consumerTemplate.receive("seda:propagation", 10000);
+ String id = exchange.getMessage().getHeader("id", String.class);
+ String message = exchange.getMessage().getBody(String.class);
+ return new KafkaMessage(id, message);
+ }
+
}
diff --git a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaRoutes.java b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaRoutes.java
index 65e752e9b437..1dbfec9bd238 100644
--- a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaRoutes.java
+++ b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaRoutes.java
@@ -21,10 +21,22 @@
import javax.inject.Named;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.KafkaManualCommit;
import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
import org.eclipse.microprofile.config.inject.ConfigProperty;
public class CamelKafkaRoutes extends RouteBuilder {
+
+ private final static String KAFKA_CONSUMER_MANUAL_COMMIT = "kafka:manual-commit-topic"
+ + "?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false"
+ + "&allowManualCommit=true&autoOffsetReset=earliest";
+
+ private final static String SEDA_FOO = "seda:foo";
+ private final static String SEDA_BAR = "seda:bar";
+ private final static String SEDA_SERIALIZER = "seda:serializer";
+ private final static String SEDA_HEADER_PROPAGATION = "seda:propagation";
+
@ConfigProperty(name = "camel.component.kafka.brokers")
String brokers;
@@ -35,6 +47,13 @@ KafkaIdempotentRepository kafkaIdempotentRepository() {
return new KafkaIdempotentRepository("idempotent-topic", brokers);
}
+ @Produces
+ @ApplicationScoped
+ @Named("customHeaderDeserializer")
+ CustomHeaderDeserializer customHeaderDeserializer() {
+ return new CustomHeaderDeserializer();
+ }
+
@Override
public void configure() throws Exception {
from("kafka:inbound?autoOffsetReset=earliest")
@@ -46,5 +65,33 @@ public void configure() throws Exception {
.messageIdRepositoryRef("kafkaIdempotentRepository")
.to("mock:idempotent-results")
.end();
+
+ // Kafka consumer that uses manual commit and performs the manual commit
+ from(KAFKA_CONSUMER_MANUAL_COMMIT)
+ .routeId("foo")
+ .to(SEDA_FOO)
+ .process(e -> {
+ KafkaManualCommit manual = e.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+ manual.commitSync();
+ });
+
+ // Kafka consumer that uses manual commit and doesn't perform the manual commit
+ from(KAFKA_CONSUMER_MANUAL_COMMIT)
+ .routeId("bar")
+ .autoStartup(false)
+ .to(SEDA_BAR);
+
+ // By default, keyDeserializer & valueDeserializer == org.apache.kafka.common.serialization.StringDeserializer
+ // and valueSerializer & keySerializer == org.apache.kafka.common.serialization.StringSerializer
+ // The idea here is to test setting different kinds of deserializers
+ from("kafka:test-serializer?autoOffsetReset=earliest" +
+ "&keyDeserializer=org.apache.kafka.common.serialization.IntegerDeserializer" +
+ "&valueDeserializer=org.apache.kafka.common.serialization.DoubleDeserializer")
+ .to(SEDA_SERIALIZER);
+
+ // Header propagation using CustomHeaderDeserializer
+ from("kafka:test-propagation?headerDeserializer=#customHeaderDeserializer")
+ .to(SEDA_HEADER_PROPAGATION);
+
}
}
diff --git a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CustomHeaderDeserializer.java b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CustomHeaderDeserializer.java
new file mode 100644
index 000000000000..21993f49e9fb
--- /dev/null
+++ b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CustomHeaderDeserializer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.kafka;
+
+import java.math.BigInteger;
+
+import io.quarkus.runtime.annotations.RegisterForReflection;
+import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
+
+@RegisterForReflection
+public class CustomHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
+
+ public CustomHeaderDeserializer() {
+ }
+
+ @Override
+ public Object deserialize(String key, byte[] value) {
+ if (key.equals("id")) {
+ BigInteger bi = new BigInteger(value);
+ return String.valueOf(bi.longValue());
+ } else {
+ return super.deserialize(key, value);
+ }
+ }
+}
diff --git a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/model/KafkaMessage.java b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/model/KafkaMessage.java
new file mode 100644
index 000000000000..78faebe4c321
--- /dev/null
+++ b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/model/KafkaMessage.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.kafka.model;
+
+import io.quarkus.runtime.annotations.RegisterForReflection;
+
+@RegisterForReflection
+public class KafkaMessage {
+ String id;
+ String message;
+
+ public KafkaMessage() {
+ }
+
+ public KafkaMessage(String id, String message) {
+ this.id = id;
+ this.message = message;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+}
diff --git a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/model/Price.java b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/model/Price.java
new file mode 100644
index 000000000000..9e9a5bed2087
--- /dev/null
+++ b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/model/Price.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.kafka.model;
+
+import io.quarkus.runtime.annotations.RegisterForReflection;
+
+@RegisterForReflection
+public class Price {
+ Integer key;
+ Double price;
+
+ public Price(Integer key, Double price) {
+ this.key = key;
+ this.price = price;
+ }
+
+ public Integer getKey() {
+ return key;
+ }
+
+ public void setKey(Integer key) {
+ this.key = key;
+ }
+
+ public Double getPrice() {
+ return price;
+ }
+
+ public void setPrice(Double price) {
+ this.price = price;
+ }
+}
diff --git a/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/CamelKafkaTest.java b/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/CamelKafkaTest.java
index 6e0fdaa26b18..7c210080fc2b 100644
--- a/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/CamelKafkaTest.java
+++ b/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/CamelKafkaTest.java
@@ -18,6 +18,7 @@
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
@@ -25,10 +26,12 @@
import io.restassured.http.ContentType;
import io.restassured.path.json.JsonPath;
import org.apache.camel.quarkus.test.support.kafka.KafkaTestResource;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import static io.restassured.RestAssured.given;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -86,4 +89,127 @@ void testQuarkusKafkaClientFactoryNotConfigured() {
.statusCode(200)
.body(is("true"));
}
+
+ @Test
+ void testManualCommit() {
+ String body1 = UUID.randomUUID().toString();
+
+ // test consuming first message with manual commit
+ // send message that should be consumed by route with routeId = foo
+ given()
+ .contentType("text/plain")
+ .body(body1)
+ .post("/kafka/manual-commit-topic")
+ .then()
+ .statusCode(200);
+
+ // make sure the message has been consumed
+ given()
+ .contentType("text/plain")
+ .body(body1)
+ .get("/kafka/seda/foo")
+ .then()
+ .body(equalTo(body1));
+
+ // stop the foo route
+ given()
+ .contentType("text/plain")
+ .body(body1)
+ .post("/kafka/foo/stop")
+ .then()
+ .statusCode(200);
+
+ // start the bar route
+ given()
+ .contentType("text/plain")
+ .body(body1)
+ .post("/kafka/bar/start")
+ .then()
+ .statusCode(200);
+
+ String body2 = UUID.randomUUID().toString();
+
+ given()
+ .contentType("text/plain")
+ .body(body2)
+ .post("/kafka/manual-commit-topic")
+ .then()
+ .statusCode(200);
+
+ // wait for the bar route to start
+ Awaitility.await().pollInterval(50, TimeUnit.MILLISECONDS).atMost(10, TimeUnit.SECONDS).until(() -> {
+
+ // make sure the message has been consumed
+ String result = given()
+ .contentType("text/plain")
+ .get("/kafka/seda/bar")
+ .asString();
+ return body2.equals(result);
+ });
+
+ // stop bar route
+ given()
+ .contentType("text/plain")
+ .body(body1)
+ .post("/kafka/bar/stop")
+ .then()
+ .statusCode(200);
+
+ // start the foo route
+ given()
+ .contentType("text/plain")
+ .body(body1)
+ .post("/kafka/foo/start")
+ .then()
+ .statusCode(200);
+
+ // as the bar route didn't commit the message, the foo Route should consume the message
+ // wait for the foo route to start
+ Awaitility.await().pollInterval(50, TimeUnit.MILLISECONDS).atMost(30, TimeUnit.SECONDS).until(() -> {
+
+ // make sure the message has been consumed
+ String result = given()
+ .contentType("text/plain")
+ .get("/kafka/seda/foo")
+ .asString();
+ return body2.equals(result);
+ });
+ }
+
+ @Test
+ void testSerializers() {
+ given()
+ .contentType("text/json")
+ .body(95.59F)
+ .post("/kafka/price/1")
+ .then()
+ .statusCode(200);
+
+ // make sure the message has been consumed
+ given()
+ .contentType("text/json")
+ .get("/kafka/price")
+ .then()
+ .body("key", equalTo(1))
+ .body("price", equalTo(95.59F));
+ }
+
+ @Test
+ void testHeaderPropagation() throws InterruptedException {
+ given()
+ .contentType(ContentType.JSON)
+ .accept(ContentType.JSON)
+ .body("hello world")
+ .post("/kafka/propagate/5")
+ .then()
+ .statusCode(200);
+
+ // make sure the message has been consumed, and that the id put in the header has been propagated
+ given()
+ .contentType("text/json")
+ .get("/kafka/propagate")
+ .then()
+ .body("id", equalTo("5"))
+ .body("message", equalTo("hello world"));
+ }
}