Skip to content

Commit

Permalink
fixed problems with camel kafka instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
dhilpipre committed Jan 25, 2024
1 parent 6fc2a1a commit f262079
Show file tree
Hide file tree
Showing 10 changed files with 295 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.kafka.producer.support.KafkaProducerCallBack;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;

import com.newrelic.api.agent.DestinationType;
import com.newrelic.api.agent.MessageProduceParameters;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.weaver.Weave;
Expand All @@ -15,22 +19,69 @@

@Weave
public abstract class KafkaProducer {


private final KafkaConfiguration configuration = Weaver.callOriginal();
private final String endpointTopic = Weaver.callOriginal();

@Trace(dispatcher = true)
public void process(Exchange exchange) {
Weaver.callOriginal();
}

@Trace(dispatcher = true)
public boolean process(Exchange exchange, AsyncCallback callback) {
return Weaver.callOriginal();
}

@SuppressWarnings("unused")
private List<Header> getPropagatedHeaders(Exchange exchange, Message message) {
List<Header> headerList = Weaver.callOriginal();
HeaderListWrapper wrapper = new HeaderListWrapper(headerList);
NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(wrapper);
return headerList;
}


@Trace
private void processIterableSync(Exchange exchange, Message message) {
Weaver.callOriginal();
}

@Trace(leaf = true)
private void processSingleMessageSync(Exchange exchange, Message message) {
String topicName = getTopicName(message);
MessageProduceParameters params = MessageProduceParameters.library("Kafka").destinationType(DestinationType.NAMED_TOPIC).destinationName(topicName).outboundHeaders(null).build();
NewRelic.getAgent().getTracedMethod().reportAsExternal(params);
Weaver.callOriginal();
}

@Trace
private void processIterableAsync(Exchange exchange, KafkaProducerCallBack producerCallBack, Message message) {
Weaver.callOriginal();
}

@Trace(leaf = true)
private void doSend(Object key, ProducerRecord<Object, Object> record, KafkaProducerCallBack cb) {
String topic = record.topic();
if(topic == null || topic.isEmpty()) {
topic = "UnknownTopic";
}
MessageProduceParameters params = MessageProduceParameters.library("Kafka").destinationType(DestinationType.NAMED_TOPIC).destinationName(topic).outboundHeaders(null).build();
NewRelic.getAgent().getTracedMethod().reportAsExternal(params);
Weaver.callOriginal();
}

private String getTopicName(Message msg) {
Object overrideTopic = msg.getHeader(KafkaConstants.OVERRIDE_TOPIC);
if(overrideTopic != null) {
return overrideTopic.toString();
}

String topic = configuration.getTopic();
if(topic != null) {
return topic;
}

return endpointTopic;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

import com.newrelic.api.agent.DestinationType;
import com.newrelic.api.agent.MessageConsumeParameters;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.TransportType;
Expand All @@ -20,6 +22,8 @@ public ProcessResult processExchange(Exchange exchange, TopicPartition partition
ExceptionHandler exceptionHandler) {
ConsumerRecordHeaders headers = new ConsumerRecordHeaders(record);
NewRelic.getAgent().getTransaction().acceptDistributedTraceHeaders(TransportType.Kafka, headers);
MessageConsumeParameters params = MessageConsumeParameters.library("Kafka").destinationType(DestinationType.NAMED_TOPIC).destinationName(partition.topic()).inboundHeaders(headers).build();
NewRelic.getAgent().getTracedMethod().reportAsExternal(params);
return Weaver.callOriginal();
}

Expand Down
2 changes: 1 addition & 1 deletion camel-kafka-3.15/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ apply plugin: 'java'
targetCompatibility = JavaVersion.VERSION_11

dependencies {
implementation 'org.apache.camel:camel-kafka:3.18.0'
implementation 'org.apache.camel:camel-kafka:3.15.0'

// New Relic Labs Java Agent dependencies
implementation 'com.newrelic.agent.java:newrelic-agent:6.4.1'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.kafka.producer.support.KafkaProducerCallBack;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;

import com.newrelic.api.agent.DestinationType;
import com.newrelic.api.agent.MessageProduceParameters;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.weaver.Weave;
Expand All @@ -15,23 +19,69 @@

@Weave
public abstract class KafkaProducer {


private final KafkaConfiguration configuration = Weaver.callOriginal();
private final String endpointTopic = Weaver.callOriginal();

@Trace(dispatcher = true)
public void process(Exchange exchange) {
Weaver.callOriginal();
}

@Trace(dispatcher = true)
public boolean process(Exchange exchange, AsyncCallback callback) {
return Weaver.callOriginal();
}

@SuppressWarnings("unused")
@SuppressWarnings("unused")
private List<Header> getPropagatedHeaders(Exchange exchange, Message message) {
List<Header> headerList = Weaver.callOriginal();
HeaderListWrapper wrapper = new HeaderListWrapper(headerList);
NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(wrapper);

return headerList;
}
List<Header> headerList = Weaver.callOriginal();
HeaderListWrapper wrapper = new HeaderListWrapper(headerList);
NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(wrapper);
return headerList;
}


@Trace
private void processIterableSync(Exchange exchange, Message message) {
Weaver.callOriginal();
}

@Trace(leaf = true)
private void processSingleMessageSync(Exchange exchange, Message message) {
String topicName = getTopicName(message);
MessageProduceParameters params = MessageProduceParameters.library("Kafka").destinationType(DestinationType.NAMED_TOPIC).destinationName(topicName).outboundHeaders(null).build();
NewRelic.getAgent().getTracedMethod().reportAsExternal(params);
Weaver.callOriginal();
}

@Trace
private void processIterableAsync(Exchange exchange, KafkaProducerCallBack producerCallBack, Message message) {
Weaver.callOriginal();
}

@Trace(leaf = true)
private void doSend(Object key, ProducerRecord<Object, Object> record, KafkaProducerCallBack cb) {
String topic = record.topic();
if(topic == null || topic.isEmpty()) {
topic = "UnknownTopic";
}
MessageProduceParameters params = MessageProduceParameters.library("Kafka").destinationType(DestinationType.NAMED_TOPIC).destinationName(topic).outboundHeaders(null).build();
NewRelic.getAgent().getTracedMethod().reportAsExternal(params);
Weaver.callOriginal();
}

private String getTopicName(Message msg) {
Object overrideTopic = msg.getHeader(KafkaConstants.OVERRIDE_TOPIC);
if(overrideTopic != null) {
return overrideTopic.toString();
}

String topic = configuration.getTopic();
if(topic != null) {
return topic;
}

return endpointTopic;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.apache.camel.component.kafka.consumer.support;

import org.apache.camel.Exchange;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

import com.newrelic.api.agent.DestinationType;
import com.newrelic.api.agent.MessageConsumeParameters;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.TransportType;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import com.newrelic.instrumentation.camel.kafka.ConsumerRecordHeaders;

@Weave
public abstract class KafkaRecordProcessor {

@Trace(dispatcher = true)
public ProcessingResult processExchange(Exchange exchange, TopicPartition partition, boolean partitionHasNext,
boolean recordHasNext, ConsumerRecord<Object, Object> record, ProcessingResult lastResult, ExceptionHandler exceptionHandler) {
ConsumerRecordHeaders headers = new ConsumerRecordHeaders(record);
NewRelic.getAgent().getTransaction().acceptDistributedTraceHeaders(TransportType.Kafka, headers);
MessageConsumeParameters params = MessageConsumeParameters.library("Kafka").destinationType(DestinationType.NAMED_TOPIC).destinationName(partition.topic()).inboundHeaders(headers).build();
NewRelic.getAgent().getTracedMethod().reportAsExternal(params);
return Weaver.callOriginal();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,84 @@
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.kafka.producer.support.KafkaProducerCallBack;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;

import com.newrelic.api.agent.DestinationType;
import com.newrelic.api.agent.Headers;
import com.newrelic.api.agent.MessageProduceParameters;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import com.newrelic.instrumentation.camel.kafka.HeaderListWrapper;
import com.newrelic.instrumentation.camel.kafka.ProducerRecordHeaders;

@Weave
public abstract class KafkaProducer {

@Trace(dispatcher = true)
public void process(Exchange exchange) {
Weaver.callOriginal();
}

@Trace(dispatcher = true)
public boolean process(Exchange exchange, AsyncCallback callback) {
return Weaver.callOriginal();
}

@Trace
private void processIterableSync(Exchange exchange, Message message) {
Weaver.callOriginal();
}

@Trace
private void processIterableAsync(Exchange exchange, KafkaProducerCallBack producerCallBack, Message message) {
Weaver.callOriginal();
}

@Trace
private void processSingleMessageSync(Exchange exchange, Message message) {
NRKafkaWrapper.isSingleMessage.set(true);
Weaver.callOriginal();
}

protected ProducerRecord<Object, Object> createRecord(Exchange exchange, Message message) {
ProducerRecord<Object, Object> record = Weaver.callOriginal();
NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(new ProducerRecordHeaders(record));

return record;
}

public List<Header> getPropagatedHeaders(Exchange exchange, Message message) {
List<Header> headerList = Weaver.callOriginal();
HeaderListWrapper wrapper = new HeaderListWrapper(headerList);
NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(wrapper);

return headerList;
}
List<Header> headerList = Weaver.callOriginal();
HeaderListWrapper wrapper = new HeaderListWrapper(headerList);
NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(wrapper);

return headerList;
}

@Trace(leaf = true)
private void doSend(Object key, ProducerRecord<Object, Object> record, KafkaProducerCallBack cb) {
Headers headers = new ProducerRecordHeaders(record);
String topic = record.topic();
if(topic == null || topic.isEmpty()) {
topic = "UnknownTopic";
}
MessageProduceParameters params = MessageProduceParameters.library("Kafka").destinationType(DestinationType.NAMED_TOPIC).destinationName(topic).outboundHeaders(headers).build();
NewRelic.getAgent().getTracedMethod().reportAsExternal(params);
Weaver.callOriginal();
}

@SuppressWarnings("unused")
private String evaluateTopic(Message message) {
String topicName = Weaver.callOriginal();
Boolean isSingle = NRKafkaWrapper.isSingleMessage.get();
if(isSingle != null && isSingle) {
NRKafkaWrapper.currentTopic.set(topicName);
}

return topicName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.apache.camel.component.kafka;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.api.agent.DestinationType;
import com.newrelic.api.agent.MessageProduceParameters;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Trace;

public class NRKafkaWrapper {

private static boolean isTransformed = false;

public static ThreadLocal<String> currentTopic = new ThreadLocal<>();
public static ThreadLocal<Boolean> isSingleMessage = new ThreadLocal<>();

public NRKafkaWrapper() {
if(!isTransformed) {
AgentBridge.instrumentation.retransformUninstrumentedClass(getClass());
isTransformed = true;
}
}

@Trace
public void sendMessage() {
String topic = currentTopic.get();
if(topic == null) topic = "UnknownTopic";
currentTopic.remove();
isSingleMessage.remove();
MessageProduceParameters params = MessageProduceParameters.library("Kafka").destinationType(DestinationType.NAMED_TOPIC).destinationName(topic).outboundHeaders(null).build();
NewRelic.getAgent().getTracedMethod().reportAsExternal(params);
}

}
Loading

0 comments on commit f262079

Please sign in to comment.