Skip to content

Commit

Permalink
Kafka Connect spans
Browse files Browse the repository at this point in the history
  • Loading branch information
meiao committed Jun 21, 2023
1 parent 9ae8d39 commit f39fb49
Show file tree
Hide file tree
Showing 50 changed files with 2,453 additions and 10 deletions.
17 changes: 9 additions & 8 deletions build.gradle
Expand Up @@ -45,6 +45,15 @@ idea.module {
excludeDirs += file("automation")
}

ext {
// when upgrading ASM, check the javadoc for the following classes:
// SynchronizedAnnotationNode, SynchronizedClassNode, SynchronizedFieldNode, SynchronizedInnerClassNode, SynchronizedMethodNode,
// SynchronizedModuleExportNode, SynchronizedModuleNode, SynchronizedModuleOpenNode, SynchronizedModuleProvideNode, SynchronizedModuleRequireNode,
// SynchronizedTypeAnnotationNode
asmVersion="9.5"
mockitoVersion="3.12.4"
}

subprojects {
repositories {
mavenLocal()
Expand All @@ -69,11 +78,3 @@ subprojects {
}
}
}

ext {
// when upgrading ASM, check the javadoc for the following classes:
// SynchronizedAnnotationNode, SynchronizedClassNode, SynchronizedFieldNode, SynchronizedInnerClassNode, SynchronizedMethodNode,
// SynchronizedModuleExportNode, SynchronizedModuleNode, SynchronizedModuleOpenNode, SynchronizedModuleProvideNode, SynchronizedModuleRequireNode,
// SynchronizedTypeAnnotationNode
asmVersion=9.5
}
2 changes: 1 addition & 1 deletion gradle/script/java.gradle
Expand Up @@ -262,7 +262,7 @@ jacocoTestReport {

dependencies {
testImplementation("junit:junit:4.12")
testImplementation("org.mockito:mockito-inline:3.12.4")
testImplementation("org.mockito:mockito-core:$mockitoVersion")
testImplementation("org.hamcrest:hamcrest-library:1.3")
testImplementation(project(":test-annotations"))
}
2 changes: 1 addition & 1 deletion instrumentation-build/build.gradle.kts
Expand Up @@ -22,7 +22,7 @@ tasks.test {
dependencies {
implementation(project(":newrelic-weaver"))
testImplementation(project(":newrelic-weaver-api"))
testImplementation("org.mockito:mockito-core:3.4.6")
testImplementation("org.junit.jupiter:junit-jupiter-engine:5.6.2")
testImplementation("org.junit.jupiter:junit-jupiter-api:5.6.2")
testImplementation("org.mockito:mockito-inline:3.12.4")
}
2 changes: 2 additions & 0 deletions instrumentation/kafka-connect-metrics-1.0.0/build.gradle
Expand Up @@ -3,6 +3,8 @@ dependencies {
implementation(project(":newrelic-api"))
implementation(project(":newrelic-weaver-api"))
implementation("org.apache.kafka:connect-runtime:3.3.2")

testImplementation("org.mockito:mockito-inline:$mockitoVersion")
}

jar {
Expand Down
21 changes: 21 additions & 0 deletions instrumentation/kafka-connect-spans-2.0.0/build.gradle
@@ -0,0 +1,21 @@

dependencies {
implementation(project(":agent-bridge"))
implementation("org.apache.kafka:connect-runtime:3.2.3")

testImplementation("org.mockito:mockito-inline:$mockitoVersion")
}

jar {
manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.kafka-connect-spans-2.0.0',
'Implementation-Title-Alias': 'kafka-connect' }
}

verifyInstrumentation {
passesOnly 'org.apache.kafka:connect-runtime:[2.0.0,3.3.0)'
}

site {
title 'Kafka'
type 'Messaging'
}
@@ -0,0 +1,89 @@
/*
*
* * Copyright 2023 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package com.nr.instrumentation.kafka.connect;

import com.newrelic.api.agent.HeaderType;
import com.newrelic.api.agent.Headers;
import org.apache.kafka.common.header.Header;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;

public class HeaderWrapper implements Headers {

private final org.apache.kafka.common.header.Headers delegate;

public HeaderWrapper(org.apache.kafka.common.header.Headers headers) {
this.delegate = headers;
}

@Override
public HeaderType getHeaderType() {
return HeaderType.MESSAGE;
}

@Override
public String getHeader(String name) {
String value = null;
Iterator<Header> iterator = delegate.headers(name).iterator();
if (iterator.hasNext()) {
byte[] bytes = iterator.next().value();
if (bytes != null) {
value = new String(bytes);
}
}
return value;
}

@Override
public Collection<String> getHeaders(String name) {
Collection<String> headers = new ArrayList<>();
Iterator<Header> iterator = delegate.headers(name).iterator();
while (iterator.hasNext()) {
byte[] bytes = iterator.next().value();
if (bytes != null) {
headers.add(new String(bytes));
}
}
return headers;
}

@Override
public void setHeader(String name, String value) {
delegate.remove(name);
delegate.add(name, value.getBytes());
}

@Override
public void addHeader(String name, String value) {
delegate.add(name, value.getBytes());
}

@Override
public Collection<String> getHeaderNames() {
Collection<String> headerNames = new HashSet<>();
for(Header header : delegate) {
headerNames.add(header.key());
}
return headerNames;
}

@Override
public boolean containsHeader(String name) {
for(Header header : delegate) {
if (Objects.equals(name,header.key())) {
return true;
}
}
return false;
}
}
@@ -0,0 +1,13 @@
/*
*
* * Copyright 2023 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package com.nr.instrumentation.kafka.connect;

public class KafkaConnectConstants {
public static final String MESSAGE = "Message";
public static final String KAFKA_CONNECT = "Kafka/Connect";
}
@@ -0,0 +1,17 @@
/*
*
* * Copyright 2023 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.apache.kafka.connect.runtime;

import com.newrelic.api.agent.weaver.SkipIfPresent;

/**
* This class was introduced in Kafka 3 and is a base for 2 WorkerSourceTasks.
*/
@SkipIfPresent(originalName = "org.apache.kafka.connect.runtime.AbstractWorkerSourceTask")
public abstract class AbstractWorkerSourceTask_Skip {
}
@@ -0,0 +1,44 @@
/*
*
* * Copyright 2023 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.apache.kafka.connect.runtime;

import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.TransactionNamePriority;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.connect.util.ConnectorTaskId;

import static com.nr.instrumentation.kafka.connect.KafkaConnectConstants.MESSAGE;
import static com.nr.instrumentation.kafka.connect.KafkaConnectConstants.KAFKA_CONNECT;

@Weave(originalName = "org.apache.kafka.connect.runtime.WorkerSinkTask")
abstract class WorkerSinkTask_Instrumentation {

@Trace(dispatcher = true)
protected void poll(long timeoutMs) {
NewRelic.getAgent().getTransaction()
.setTransactionName(TransactionNamePriority.FRAMEWORK_HIGH, true, MESSAGE, KAFKA_CONNECT, id().connector());
Weaver.callOriginal();
}

private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {
if (msgs.isEmpty()) {
NewRelic.getAgent().getTransaction().ignore();
}
Weaver.callOriginal();
}

@Trace
private void deliverMessages() {
Weaver.callOriginal();
}

public abstract ConnectorTaskId id();
}
@@ -0,0 +1,54 @@
/*
*
* * Copyright 2023 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.apache.kafka.connect.runtime;

import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Token;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.TransactionNamePriority;
import com.newrelic.api.agent.weaver.NewField;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.util.ConnectorTaskId;

import java.util.List;

import static com.nr.instrumentation.kafka.connect.KafkaConnectConstants.MESSAGE;
import static com.nr.instrumentation.kafka.connect.KafkaConnectConstants.KAFKA_CONNECT;

@Weave(originalName = "org.apache.kafka.connect.runtime.WorkerSourceTask")
abstract class WorkerSourceTask_Instrumentation {

@NewField
private Token token;

@Trace(dispatcher = true)
protected List<SourceRecord> poll() throws InterruptedException {
NewRelic.getAgent().getTransaction()
.setTransactionName(TransactionNamePriority.FRAMEWORK_HIGH, true, MESSAGE, KAFKA_CONNECT, id().connector());
List<SourceRecord> returnValue = Weaver.callOriginal();
if (returnValue == null || returnValue.isEmpty()) {
NewRelic.getAgent().getTransaction().ignore();
} else {
token = NewRelic.getAgent().getTransaction().getToken();
}
return returnValue;
}

@Trace(async = true)
private boolean sendRecords() {
if (token != null) {
token.linkAndExpire();
token = null;
}
return Weaver.callOriginal();
}

public abstract ConnectorTaskId id();
}
@@ -0,0 +1,25 @@
/*
*
* * Copyright 2023 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.apache.kafka.connect.sink;

import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.weaver.MatchType;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;

import java.util.Collection;

import static com.nr.instrumentation.kafka.connect.KafkaConnectConstants.KAFKA_CONNECT;

@Weave(originalName = "org.apache.kafka.connect.sink.SinkTask", type = MatchType.BaseClass)
public abstract class SinkTask_Instrumentation<K, V> {

@Trace
public abstract void put(Collection<SinkRecord> records);
}
@@ -0,0 +1,25 @@
/*
*
* * Copyright 2023 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.apache.kafka.connect.source;

import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.weaver.MatchType;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;

import java.util.List;

import static com.nr.instrumentation.kafka.connect.KafkaConnectConstants.KAFKA_CONNECT;

@Weave(originalName = "org.apache.kafka.connect.source.SourceTask", type = MatchType.BaseClass)
public abstract class SourceTask_Instrumentation<K, V> {

@Trace
public abstract List<SourceRecord> poll() throws InterruptedException;
}
@@ -0,0 +1,30 @@
/*
*
* * Copyright 2023 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.apache.kafka.connect.storage;

import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.weaver.MatchType;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;

import java.util.logging.Level;

import static com.nr.instrumentation.kafka.connect.KafkaConnectConstants.KAFKA_CONNECT;

@Weave(originalName = "org.apache.kafka.connect.storage.Converter", type = MatchType.Interface)
public abstract class Converter_Instrumentation {

@Trace(excludeFromTransactionTrace = true)
public abstract byte[] fromConnectData(String topic, Schema schema, Object value);

@Trace(excludeFromTransactionTrace = true)
public abstract SchemaAndValue toConnectData(String topic, byte[] value);
}

0 comments on commit f39fb49

Please sign in to comment.