Skip to content

Commit

Permalink
fixed problems with camel netty instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
dhilpipre committed Jan 25, 2024
1 parent 23b1149 commit 6fc2a1a
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public abstract class NettyProducer {
@Trace(dispatcher = true)
public boolean process(Exchange exchange, AsyncCallback callback) {
NewRelic.getAgent().getTransaction().acceptDistributedTraceHeaders(TransportType.Other, new ExchangeHeaders(exchange));

return Weaver.callOriginal();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
package org.apache.camel.component.netty.handlers;

import org.apache.camel.Exchange;
import org.apache.camel.component.netty.NettyCamelState;

import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.TransactionNamePriority;
import com.newrelic.api.agent.TransportType;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import com.newrelic.instrumentation.labs.camel.netty.ExchangeHeaders;

import io.netty.channel.ChannelHandlerContext;

Expand All @@ -11,6 +18,19 @@ public abstract class ClientChannelHandler {

@Trace(dispatcher = true)
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
NewRelic.getAgent().getTransaction().setTransactionName(TransactionNamePriority.FRAMEWORK_LOW, false, "ClientChannelHandler", "Custom","ClientChannelHandler",msg.getClass().getSimpleName());
Weaver.callOriginal();
}

@SuppressWarnings("unused")
private NettyCamelState getState(ChannelHandlerContext ctx, Object msg) {
NettyCamelState state = Weaver.callOriginal();
Exchange exchange = state.getExchange();
if(exchange != null) {
ExchangeHeaders headers = new ExchangeHeaders(exchange);
NewRelic.getAgent().getTransaction().acceptDistributedTraceHeaders(TransportType.Other, headers);
NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(headers);
}
return state;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
import java.util.HashMap;

import org.apache.camel.Exchange;
import org.apache.camel.Route;
import org.apache.camel.component.netty.NettyConsumer;
import org.apache.camel.component.netty.NettyEndpoint;

import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.TransactionNamePriority;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import com.newrelic.instrumentation.labs.camel.netty.Util;
Expand All @@ -15,8 +19,39 @@
@Weave
public abstract class ServerChannelHandler {

private final NettyConsumer consumer = Weaver.callOriginal();

@Trace(dispatcher = true)
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
NewRelic.getAgent().getTracedMethod().addCustomAttribute("MessageType", msg.getClass().getName());
if(consumer != null) {
boolean txNameSet = false;
NettyEndpoint endpoint = consumer.getEndpoint();
if(endpoint != null) {
String uri = endpoint.getEndpointUri();
if(uri != null && !uri.isEmpty()) {
NewRelic.getAgent().getTransaction().setTransactionName(TransactionNamePriority.FRAMEWORK_HIGH, false, "Camel-Netty", "Camel-Netty",uri);
txNameSet = true;
}
}
if(!txNameSet) {
String routeId = consumer.getRouteId();
if(routeId != null && !routeId.isEmpty()) {
NewRelic.getAgent().getTransaction().setTransactionName(TransactionNamePriority.FRAMEWORK_HIGH, false, "Camel-Netty", "Camel-Netty",routeId);
txNameSet = true;
}
}
if(!txNameSet) {
Route route = consumer.getRoute();
if(route != null) {
String id = route.getId();
if(id != null && !id.isEmpty()) {
NewRelic.getAgent().getTransaction().setTransactionName(TransactionNamePriority.FRAMEWORK_HIGH, false, "Camel-Netty", "Camel-Netty",id);
txNameSet = true;
}
}
}
}
Weaver.callOriginal();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
import com.newrelic.api.agent.Segment;
import com.newrelic.api.agent.Token;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.TransportType;
import com.newrelic.api.agent.weaver.NewField;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import com.newrelic.instrumentation.labs.camel.netty.AsyncCallbackWrapper;
import com.newrelic.instrumentation.labs.camel.netty.ExchangeHeaders;

import io.netty.channel.ChannelFuture;
Expand All @@ -25,33 +25,34 @@ public abstract class NettyProducer {

@Trace(dispatcher = true)
public boolean process(Exchange exchange, AsyncCallback callback) {
NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(new ExchangeHeaders(exchange));
// NewRelic.getAgent().getTransaction().acceptDistributedTraceHeaders(TransportType.Other, new ExchangeHeaders(exchange));
if(configuration != null) {
AsyncCallbackWrapper wrapper = new AsyncCallbackWrapper(callback);
String host = configuration.getHost();
int port = configuration.getPort();
URI uri = URI.create("tcp://"+host+":"+port);
GenericParameters params = GenericParameters.library("Camel-Netty").uri(uri).procedure("process").build();
Segment segment = NewRelic.getAgent().getTransaction().startSegment("Camel/Netty/Send");
segment.reportAsExternal(params);
wrapper.setSegment(segment);
callback = wrapper;
exchange.setProperty("NRSEGMENT", segment);
}

return Weaver.callOriginal();
}

@Trace(dispatcher = true)
public void processWithConnectedChannel(Exchange exchange, BodyReleaseCallback callback, ChannelFuture channelFuture, Object body) {
NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(new ExchangeHeaders(exchange));
Object obj = exchange.getProperty("NRSEGMENT");
if(obj != null && obj instanceof Segment) {
((Segment)obj).end();
}
Weaver.callOriginal();
}


@SuppressWarnings("unused")
private boolean processWithBody(final Exchange exchange, Object body, BodyReleaseCallback callback) {
ExchangeHeaders headers = new ExchangeHeaders(exchange);
NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(headers);
// ExchangeHeaders headers = new ExchangeHeaders(exchange);
// NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(headers);
return Weaver.callOriginal();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.netty.NettyCamelState;
import org.apache.camel.component.netty.NettyConstants;
import org.apache.camel.component.netty.NettyProducer;

import com.newrelic.api.agent.NewRelic;
Expand All @@ -15,7 +14,9 @@
import com.newrelic.api.agent.weaver.Weaver;
import com.newrelic.instrumentation.labs.camel.netty.ExchangeHeaders;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;

@Weave
public abstract class ClientChannelHandler {
Expand All @@ -29,37 +30,50 @@ public ClientChannelHandler(NettyProducer producer) {

@Trace(dispatcher = true)
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
String channelName = "UnknownChannel";
Channel channel = ctx.channel();
if(channel != null) {
ChannelId channelId = channel.id();
if(channelId != null) {
channelName = channelId.asShortText();
}
}

NewRelic.getAgent().getTracedMethod().setMetricName("Custom","Camel","Netty","ClientChannelHandler","channelRead");
NewRelic.getAgent().getTracedMethod().addCustomAttribute("MessageType", msg.getClass().getName());
NewRelic.getAgent().getTracedMethod().addCustomAttribute("Channel",channelName);
Weaver.callOriginal();
}

@SuppressWarnings("unused")
private NettyCamelState getState(ChannelHandlerContext ctx, Object msg) {
NettyCamelState returnState = Weaver.callOriginal();

if(returnState != null) {
Exchange exchange = returnState.getExchange();
ExchangeHeaders headers = new ExchangeHeaders(exchange);
NewRelic.getAgent().getTransaction().acceptDistributedTraceHeaders(TransportType.Other, headers);
NettyCamelState state = Weaver.callOriginal();
if (state != null) {
Exchange exchange = state.getExchange();
if (exchange != null) {
ExchangeHeaders headers = new ExchangeHeaders(exchange);
NewRelic.getAgent().getTransaction().acceptDistributedTraceHeaders(TransportType.Other, headers);
} else {
NewRelic.getAgent().getTransaction().ignore();
}
} else {
NewRelic.getAgent().getTransaction().ignore();
}
return returnState;
}

protected Message getResponseMessage(Exchange exchange, ChannelHandlerContext ctx, Object message) {
Message result = Weaver.callOriginal();
Boolean waiting = exchange.getProperty(NettyConstants.NETTY_CLIENT_CONTINUE, Boolean.class);
if(result == null || (waiting != null && waiting)) {
NewRelic.getAgent().getTransaction().ignore();
}
return result;
return state;
}



public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
NewRelic.noticeError(cause);
Weaver.callOriginal();
}

@Trace
protected Message getResponseMessage(Exchange exchange, ChannelHandlerContext ctx, Object message) throws Exception {
try {
return Weaver.callOriginal();
} catch (Exception e) {
NewRelic.getAgent().getTransaction().ignore();
throw e;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
import java.util.HashMap;

import org.apache.camel.Exchange;
import org.apache.camel.Route;
import org.apache.camel.component.netty.NettyConsumer;
import org.apache.camel.component.netty.NettyEndpoint;

import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.TransactionNamePriority;
import com.newrelic.api.agent.TransportType;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
Expand All @@ -16,9 +20,40 @@

@Weave
public abstract class ServerChannelHandler {

private final NettyConsumer consumer = Weaver.callOriginal();

@Trace(dispatcher = true)
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
NewRelic.getAgent().getTracedMethod().addCustomAttribute("MessageType", msg.getClass().getName());
if(consumer != null) {
boolean txNameSet = false;
NettyEndpoint endpoint = consumer.getEndpoint();
if(endpoint != null) {
String uri = endpoint.getEndpointUri();
if(uri != null && !uri.isEmpty()) {
NewRelic.getAgent().getTransaction().setTransactionName(TransactionNamePriority.FRAMEWORK_HIGH, false, "Camel-Netty", "Camel-Netty",uri);
txNameSet = true;
}
}
if(!txNameSet) {
String routeId = consumer.getRouteId();
if(routeId != null && !routeId.isEmpty()) {
NewRelic.getAgent().getTransaction().setTransactionName(TransactionNamePriority.FRAMEWORK_HIGH, false, "Camel-Netty", "Camel-Netty",routeId);
txNameSet = true;
}
}
if(!txNameSet) {
Route route = consumer.getRoute();
if(route != null) {
String id = route.getId();
if(id != null && !id.isEmpty()) {
NewRelic.getAgent().getTransaction().setTransactionName(TransactionNamePriority.FRAMEWORK_HIGH, false, "Camel-Netty", "Camel-Netty",id);
txNameSet = true;
}
}
}
}
Weaver.callOriginal();
}

Expand All @@ -27,8 +62,8 @@ private void processAsynchronously(Exchange exchange, ChannelHandlerContext ctx)
HashMap<String, Object> attributes = new HashMap<>();
Util.recordExchange(attributes, exchange);
NewRelic.getAgent().getTracedMethod().addCustomAttributes(attributes);
ExchangeHeaders headers = new ExchangeHeaders(exchange);
NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(headers);
// ExchangeHeaders headers = new ExchangeHeaders(exchange);
// NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(headers);
Weaver.callOriginal();
}

Expand Down

0 comments on commit 6fc2a1a

Please sign in to comment.