Skip to content

Commit

Permalink
[pinpoint-apm#7365] Determine whether to insert a header according …
Browse files Browse the repository at this point in the history
…to the broker in Kafka Plugin
  • Loading branch information
koo-taejin committed Oct 27, 2020
1 parent 72aee9b commit 9452d9c
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 90 deletions.
Expand Up @@ -35,12 +35,14 @@
import com.navercorp.pinpoint.plugin.kafka.field.accessor.EndPointFieldAccessor;
import com.navercorp.pinpoint.plugin.kafka.field.accessor.RemoteAddressFieldAccessor;
import com.navercorp.pinpoint.plugin.kafka.field.accessor.SocketChannelListFieldAccessor;
import com.navercorp.pinpoint.plugin.kafka.field.getter.ApiVersionsGetter;
import com.navercorp.pinpoint.plugin.kafka.field.getter.SelectorGetter;
import com.navercorp.pinpoint.plugin.kafka.interceptor.ConsumerConstructorInterceptor;
import com.navercorp.pinpoint.plugin.kafka.interceptor.ConsumerMultiRecordEntryPointInterceptor;
import com.navercorp.pinpoint.plugin.kafka.interceptor.ConsumerPollInterceptor;
import com.navercorp.pinpoint.plugin.kafka.interceptor.ConsumerRecordEntryPointInterceptor;
import com.navercorp.pinpoint.plugin.kafka.interceptor.ConsumerRecordsInterceptor;
import com.navercorp.pinpoint.plugin.kafka.interceptor.ProducerAddHeaderInterceptor;
import com.navercorp.pinpoint.plugin.kafka.interceptor.NetworkClientPollInterceptor;
import com.navercorp.pinpoint.plugin.kafka.interceptor.ProducerConstructorInterceptor;
import com.navercorp.pinpoint.plugin.kafka.interceptor.ProducerSendInterceptor;
Expand All @@ -53,6 +55,7 @@

/**
* @author Harris Gwag (gwagdalf)
* @author Taejin Koo
*/
public class KafkaPlugin implements ProfilerPlugin, TransformTemplateAware {

Expand Down Expand Up @@ -143,7 +146,17 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader classLoader,
sendMethod.addInterceptor(ProducerSendInterceptor.class);
}

// Version 0.11.0+ is supported.
InstrumentMethod setReadOnlyMethod = target.getDeclaredMethod("setReadOnly", "org.apache.kafka.common.header.Headers");
if (setReadOnlyMethod != null) {
setReadOnlyMethod.addInterceptor(ProducerAddHeaderInterceptor.class);
}
if (target.hasField("apiVersions")) {
target.addGetter(ApiVersionsGetter.class, "apiVersions");
}

target.addField(RemoteAddressFieldAccessor.class);

return target.toBytecode();
}

Expand Down
@@ -0,0 +1,26 @@
/*
* Copyright 2020 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.plugin.kafka.field.getter;

/**
 * Pinpoint-injected accessor for the private {@code apiVersions} field of Kafka's
 * {@code KafkaProducer}. The plugin adds this getter via bytecode instrumentation
 * (see {@code target.addGetter(ApiVersionsGetter.class, "apiVersions")}) so that
 * interceptors can read the broker-negotiated API versions at runtime.
 *
 * @author Taejin Koo
 */
public interface ApiVersionsGetter {

    // Returns the producer's ApiVersions instance; may be null if the field was never set.
    org.apache.kafka.clients.ApiVersions _$PINPOINT$_getApiVersions();

}
@@ -0,0 +1,123 @@
/*
* Copyright 2020 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.plugin.kafka.interceptor;

import com.navercorp.pinpoint.bootstrap.context.Header;
import com.navercorp.pinpoint.bootstrap.context.SpanEventRecorder;
import com.navercorp.pinpoint.bootstrap.context.Trace;
import com.navercorp.pinpoint.bootstrap.context.TraceContext;
import com.navercorp.pinpoint.bootstrap.context.TraceId;
import com.navercorp.pinpoint.bootstrap.interceptor.AroundInterceptor;
import com.navercorp.pinpoint.bootstrap.logging.PLogger;
import com.navercorp.pinpoint.bootstrap.logging.PLoggerFactory;
import com.navercorp.pinpoint.bootstrap.sampler.SamplingFlagUtils;
import com.navercorp.pinpoint.common.util.Assert;
import com.navercorp.pinpoint.plugin.kafka.KafkaConstants;
import com.navercorp.pinpoint.plugin.kafka.field.getter.ApiVersionsGetter;

/**
 * Intercepts {@code KafkaProducer#setReadOnly(Headers)} to inject Pinpoint trace
 * propagation headers into an outgoing record just before the producer seals its headers.
 * <p>
 * Headers are only written when the negotiated broker message format supports record
 * headers, i.e. when {@code ApiVersions.maxUsableProduceMagic() >= MAGIC_VALUE_V2}
 * (message format v2, broker 0.11.0+). On older brokers the interceptor is a no-op.
 *
 * @author Taejin Koo
 */
public class ProducerAddHeaderInterceptor implements AroundInterceptor {

    private final PLogger logger = PLoggerFactory.getLogger(getClass());

    private final DefaultHeaderSetter headerSetter = new DefaultHeaderSetter();

    private final TraceContext traceContext;

    public ProducerAddHeaderInterceptor(TraceContext traceContext) {
        this.traceContext = traceContext;
    }

    @Override
    public void before(Object target, Object[] args) {
        if (logger.isDebugEnabled()) {
            logger.beforeInterceptor(target, args);
        }

        final Trace trace = traceContext.currentRawTraceObject();
        if (trace == null) {
            return;
        }

        // The getter is added by instrumentation; bail out if this target was not transformed.
        if (!(target instanceof ApiVersionsGetter)) {
            return;
        }

        final ApiVersionsGetter apiVersionsGetter = (ApiVersionsGetter) target;
        final org.apache.kafka.clients.ApiVersions apiVersions = apiVersionsGetter._$PINPOINT$_getApiVersions();
        // Record headers only exist in message format v2; skip injection for older brokers.
        if (apiVersions == null || apiVersions.maxUsableProduceMagic() < org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2) {
            return;
        }

        // Defensive: an agent interceptor must never throw on an unexpected call shape,
        // so guard against a null/empty args array before touching args[0].
        if (args == null || args.length == 0) {
            return;
        }
        if (!(args[0] instanceof org.apache.kafka.common.header.Headers)) {
            return;
        }

        final org.apache.kafka.common.header.Headers headers = (org.apache.kafka.common.header.Headers) args[0];

        final SpanEventRecorder spanEventRecorder = trace.currentSpanEventRecorder();
        headerSetter.setPinpointHeaders(spanEventRecorder, trace, headers, trace.canSampled(), traceContext.getApplicationName(), traceContext.getServerTypeCode());
    }

    @Override
    public void after(Object target, Object[] args, Object result, Throwable throwable) {
        if (logger.isDebugEnabled()) {
            logger.afterInterceptor(target, args, result, throwable);
        }
    }

    /**
     * Writes (or clears) the Pinpoint propagation headers on a Kafka {@code Headers} collection.
     * Any pre-existing Pinpoint headers are removed first so stale values are never propagated.
     */
    private static class DefaultHeaderSetter {

        public void setPinpointHeaders(SpanEventRecorder recorder, Trace trace, org.apache.kafka.common.header.Headers headers, boolean sample, String applicationName, short serverTypeCode) {
            if (headers == null) {
                return;
            }

            cleanPinpointHeader(headers);
            if (sample) {
                final TraceId nextId = trace.getTraceId().getNextTraceId();
                recorder.recordNextSpanId(nextId.getSpanId());

                headers.add(new org.apache.kafka.common.header.internals.RecordHeader(Header.HTTP_TRACE_ID.toString(), nextId.getTransactionId().getBytes(KafkaConstants.DEFAULT_PINPOINT_HEADER_CHARSET)));
                headers.add(new org.apache.kafka.common.header.internals.RecordHeader(Header.HTTP_SPAN_ID.toString(), String.valueOf(nextId.getSpanId()).getBytes(KafkaConstants.DEFAULT_PINPOINT_HEADER_CHARSET)));
                headers.add(new org.apache.kafka.common.header.internals.RecordHeader(Header.HTTP_PARENT_SPAN_ID.toString(), String.valueOf(nextId.getParentSpanId()).getBytes(KafkaConstants.DEFAULT_PINPOINT_HEADER_CHARSET)));
                headers.add(new org.apache.kafka.common.header.internals.RecordHeader(Header.HTTP_FLAGS.toString(), String.valueOf(nextId.getFlags()).getBytes(KafkaConstants.DEFAULT_PINPOINT_HEADER_CHARSET)));
                headers.add(new org.apache.kafka.common.header.internals.RecordHeader(Header.HTTP_PARENT_APPLICATION_NAME.toString(), String.valueOf(applicationName).getBytes(KafkaConstants.DEFAULT_PINPOINT_HEADER_CHARSET)));
                headers.add(new org.apache.kafka.common.header.internals.RecordHeader(Header.HTTP_PARENT_APPLICATION_TYPE.toString(), Short.toString(serverTypeCode).getBytes(KafkaConstants.DEFAULT_PINPOINT_HEADER_CHARSET)));
            } else {
                // Not sampled: mark the message explicitly so the consumer side does not trace it.
                headers.add(new org.apache.kafka.common.header.internals.RecordHeader(Header.HTTP_SAMPLED.toString(), SamplingFlagUtils.SAMPLING_RATE_FALSE.getBytes(KafkaConstants.DEFAULT_PINPOINT_HEADER_CHARSET)));
            }
        }

        // Removes every header whose key starts with the Pinpoint prefix.
        private void cleanPinpointHeader(org.apache.kafka.common.header.Headers kafkaHeaders) {
            Assert.requireNonNull(kafkaHeaders, "kafkaHeaders");

            for (org.apache.kafka.common.header.Header kafkaHeader : kafkaHeaders.toArray()) {
                String kafkaHeaderKey = kafkaHeader.key();
                if (Header.startWithPinpointHeader(kafkaHeaderKey)) {
                    kafkaHeaders.remove(kafkaHeaderKey);
                }
            }
        }

    }

}


Expand Up @@ -16,37 +16,31 @@

package com.navercorp.pinpoint.plugin.kafka.interceptor;

import com.navercorp.pinpoint.bootstrap.context.Header;
import com.navercorp.pinpoint.bootstrap.context.MethodDescriptor;
import com.navercorp.pinpoint.bootstrap.context.SpanEventRecorder;
import com.navercorp.pinpoint.bootstrap.context.Trace;
import com.navercorp.pinpoint.bootstrap.context.TraceContext;
import com.navercorp.pinpoint.bootstrap.context.TraceId;
import com.navercorp.pinpoint.bootstrap.interceptor.AroundInterceptor;
import com.navercorp.pinpoint.bootstrap.logging.PLogger;
import com.navercorp.pinpoint.bootstrap.logging.PLoggerFactory;
import com.navercorp.pinpoint.bootstrap.sampler.SamplingFlagUtils;
import com.navercorp.pinpoint.common.util.ArrayUtils;
import com.navercorp.pinpoint.common.util.Assert;
import com.navercorp.pinpoint.common.util.StringUtils;
import com.navercorp.pinpoint.plugin.kafka.KafkaConstants;
import com.navercorp.pinpoint.plugin.kafka.field.accessor.RemoteAddressFieldAccessor;

import org.apache.kafka.clients.producer.ProducerRecord;

import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicReference;


/**
* @author Taejin Koo
*/
public class ProducerSendInterceptor implements AroundInterceptor {

private final PLogger logger = PLoggerFactory.getLogger(getClass());

private final TraceContext traceContext;
private final MethodDescriptor descriptor;

private final AtomicReference<HeaderSetter> headerSetterReference = new AtomicReference<HeaderSetter>();

public ProducerSendInterceptor(TraceContext traceContext, MethodDescriptor descriptor) {
this.traceContext = traceContext;
this.descriptor = descriptor;
Expand All @@ -69,11 +63,8 @@ public void before(Object target, Object[] args) {
}

if (trace.canSampled()) {
SpanEventRecorder recorder = trace.traceBlockBegin();
recorder.recordServiceType(KafkaConstants.KAFKA_CLIENT);
setPinpointHeaders(recorder, trace, record, true);
} else {
setPinpointHeaders(null, trace, record, false);
SpanEventRecorder spanEventRecorder = trace.traceBlockBegin();
spanEventRecorder.recordServiceType(KafkaConstants.KAFKA_CLIENT);
}
}

Expand All @@ -89,15 +80,6 @@ private ProducerRecord getProducerRecord(Object args[]) {
return null;
}

private void setPinpointHeaders(SpanEventRecorder recorder, Trace trace, ProducerRecord record, boolean sample) {
HeaderSetter headerSetter = headerSetterReference.get();
if (headerSetter == null) {
headerSetter = HeaderSetterProvider.get(record);
headerSetterReference.compareAndSet(null, headerSetter);
}
headerSetter.setPinpointHeaders(recorder, trace, record, sample, traceContext.getApplicationName(), traceContext.getServerTypeCode());
}

@Override
public void after(Object target, Object[] args, Object result, Throwable throwable) {
if (logger.isDebugEnabled()) {
Expand Down Expand Up @@ -150,72 +132,5 @@ private String getRemoteAddress(Object remoteAddressFieldAccessor) {
}
}

private static class HeaderSetterProvider {

static HeaderSetter get(Object object) {
try {
final Class<?> aClass = object.getClass();
final Method method = aClass.getMethod("headers");
if (method != null) {
return new DefaultHeaderSetter();
}
} catch (NoSuchMethodException e) {
// ignore
}
return new DisabledHeaderSetter();
}

}

private interface HeaderSetter {

void setPinpointHeaders(SpanEventRecorder recorder, Trace trace, ProducerRecord record, boolean sample, String applicationName, short serverTypeCode);

}

private static class DisabledHeaderSetter implements HeaderSetter {

@Override
public void setPinpointHeaders(SpanEventRecorder recorder, Trace trace, ProducerRecord record, boolean sample, String applicationName, short serverTypeCode) {
}

}

private static class DefaultHeaderSetter implements HeaderSetter {

public void setPinpointHeaders(SpanEventRecorder recorder, Trace trace, ProducerRecord record, boolean sample, String applicationName, short serverTypeCode) {
org.apache.kafka.common.header.Headers kafkaHeaders = record.headers();
if (kafkaHeaders == null) {
return;
}

cleanPinpointHeader(kafkaHeaders);
if (sample) {
final TraceId nextId = trace.getTraceId().getNextTraceId();
recorder.recordNextSpanId(nextId.getSpanId());

kafkaHeaders.add(new org.apache.kafka.common.header.internals.RecordHeader(Header.HTTP_TRACE_ID.toString(), nextId.getTransactionId().getBytes(KafkaConstants.DEFAULT_PINPOINT_HEADER_CHARSET)));
kafkaHeaders.add(new org.apache.kafka.common.header.internals.RecordHeader(Header.HTTP_SPAN_ID.toString(), String.valueOf(nextId.getSpanId()).getBytes(KafkaConstants.DEFAULT_PINPOINT_HEADER_CHARSET)));
kafkaHeaders.add(new org.apache.kafka.common.header.internals.RecordHeader(Header.HTTP_PARENT_SPAN_ID.toString(), String.valueOf(nextId.getParentSpanId()).getBytes(KafkaConstants.DEFAULT_PINPOINT_HEADER_CHARSET)));
kafkaHeaders.add(new org.apache.kafka.common.header.internals.RecordHeader(Header.HTTP_FLAGS.toString(), String.valueOf(nextId.getFlags()).getBytes(KafkaConstants.DEFAULT_PINPOINT_HEADER_CHARSET)));
kafkaHeaders.add(new org.apache.kafka.common.header.internals.RecordHeader(Header.HTTP_PARENT_APPLICATION_NAME.toString(), String.valueOf(applicationName).getBytes(KafkaConstants.DEFAULT_PINPOINT_HEADER_CHARSET)));
kafkaHeaders.add(new org.apache.kafka.common.header.internals.RecordHeader(Header.HTTP_PARENT_APPLICATION_TYPE.toString(), Short.toString(serverTypeCode).getBytes(KafkaConstants.DEFAULT_PINPOINT_HEADER_CHARSET)));
} else {
kafkaHeaders.add(new org.apache.kafka.common.header.internals.RecordHeader(Header.HTTP_SAMPLED.toString(), SamplingFlagUtils.SAMPLING_RATE_FALSE.getBytes(KafkaConstants.DEFAULT_PINPOINT_HEADER_CHARSET)));
}
}

private void cleanPinpointHeader(org.apache.kafka.common.header.Headers kafkaHeaders) {
Assert.requireNonNull(kafkaHeaders, "kafkaHeaders");

for (org.apache.kafka.common.header.Header kafkaHeader : kafkaHeaders.toArray()) {
String kafkaHeaderKey = kafkaHeader.key();
if (Header.startWithPinpointHeader(kafkaHeaderKey)) {
kafkaHeaders.remove(kafkaHeaderKey);
}
}
}

}

}

0 comments on commit 9452d9c

Please sign in to comment.