diff --git a/camel-kafka/build.gradle b/camel-kafka/build.gradle index ee12488..f28b50e 100644 --- a/camel-kafka/build.gradle +++ b/camel-kafka/build.gradle @@ -24,5 +24,5 @@ jar { } verifyInstrumentation { - passes 'org.apache.camel:camel-kafka:[3.11.0,3.12.0)' + passes 'org.apache.camel:camel-kafka:(2.22.0,3.12.0)' } \ No newline at end of file diff --git a/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java new file mode 100644 index 0000000..e4198cf --- /dev/null +++ b/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -0,0 +1,22 @@ +package org.apache.camel.component.kafka; + +import org.apache.camel.Exchange; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.TransportType; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; +import com.newrelic.instrumentation.camel.kafka.ConsumerRecordHeaders; + +@Weave +public class KafkaConsumer { + + @SuppressWarnings("unused") + private void propagateHeaders(ConsumerRecord record, Exchange exchange, + KafkaConfiguration kafkaConfiguration) { + ConsumerRecordHeaders headers = new ConsumerRecordHeaders(record); + NewRelic.getAgent().getTransaction().acceptDistributedTraceHeaders(TransportType.Kafka, headers); + Weaver.callOriginal(); + } +} diff --git a/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java deleted file mode 100644 index c90d8dd..0000000 --- a/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java +++ /dev/null @@ -1,30 +0,0 @@ -package org.apache.camel.component.kafka.consumer.support; - -import org.apache.camel.Exchange; -import org.apache.camel.spi.ExceptionHandler; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.TopicPartition; - -import com.newrelic.api.agent.NewRelic; -import com.newrelic.api.agent.Trace; -import com.newrelic.api.agent.TransportType; -import com.newrelic.api.agent.weaver.Weave; -import com.newrelic.api.agent.weaver.Weaver; -import com.newrelic.instrumentation.camel.kafka.ConsumerRecordHeaders; - -@Weave -public abstract class KafkaRecordProcessor { - - @Trace(dispatcher = true) - public ProcessResult processExchange(Exchange exchange, TopicPartition partition, boolean partitionHasNext, boolean recordHasNext, ConsumerRecord record, ProcessResult lastResult, - ExceptionHandler exceptionHandler) { - ConsumerRecordHeaders headers = new ConsumerRecordHeaders(record); - NewRelic.getAgent().getTransaction().acceptDistributedTraceHeaders(TransportType.Kafka, headers); - return Weaver.callOriginal(); - } - - @Weave - public static final class ProcessResult { - - } -} diff --git a/settings.gradle b/settings.gradle index b094bb4..555da8c 100644 --- a/settings.gradle +++ b/settings.gradle @@ -11,7 +11,7 @@ include 'camel-core-3.9' include 'camel-core-3.18' include 'camel-jms-3.18' include 'camel-core-4.0.0' -include 'camel-core-4.1' +include 'camel-core-4.1.0' include 'camel-jms-4.0.0' include 'camel-netty-3.0' include 'camel-netty-3.18' @@ -20,3 +20,4 @@ include 'camel-kafka-3.14' include 'camel-kafka-3.15' include 'camel-kafka-3.18.3' include 'camel-processor' +include 'camel-kafka'