Skip to content

Commit

Permalink
Merge ccf78a5 into 0313cb0
Browse files Browse the repository at this point in the history
  • Loading branch information
malafeev committed Oct 28, 2019
2 parents 0313cb0 + ccf78a5 commit 107d7ac
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 39 deletions.
Expand Up @@ -15,7 +15,6 @@


import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracer;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -49,13 +48,6 @@ public TracingKafkaConsumer(Consumer<K, V> consumer, Tracer tracer) {
this.consumerSpanNameProvider = ClientSpanNameProvider.CONSUMER_OPERATION_NAME;
}

/**
* GlobalTracer is used to get tracer
*/
public TracingKafkaConsumer(Consumer<K, V> consumer) {
this(consumer, GlobalTracer.get());
}

public TracingKafkaConsumer(Consumer<K, V> consumer, Tracer tracer,
BiFunction<String, ConsumerRecord, String> consumerSpanNameProvider) {
this.consumer = consumer;
Expand All @@ -65,14 +57,6 @@ public TracingKafkaConsumer(Consumer<K, V> consumer, Tracer tracer,
: consumerSpanNameProvider;
}

/**
* GlobalTracer is used to get tracer
*/
public TracingKafkaConsumer(Consumer<K, V> consumer,
BiFunction<String, ConsumerRecord, String> consumerSpanNameProvider) {
this(consumer, GlobalTracer.get(), consumerSpanNameProvider);
}

@Override
public Set<TopicPartition> assignment() {
return consumer.assignment();
Expand Down
Expand Up @@ -16,8 +16,8 @@

import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracer;
import java.time.Duration;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -47,13 +47,6 @@ public TracingKafkaProducer(Producer<K, V> producer, Tracer tracer) {
this.producerSpanNameProvider = ClientSpanNameProvider.PRODUCER_OPERATION_NAME;
}

/**
* GlobalTracer is used to get tracer
*/
public TracingKafkaProducer(Producer<K, V> producer) {
this(producer, GlobalTracer.get());
}

public TracingKafkaProducer(Producer<K, V> producer, Tracer tracer,
BiFunction<String, ProducerRecord, String> producerSpanNameProvider) {
this.producer = producer;
Expand All @@ -63,14 +56,6 @@ public TracingKafkaProducer(Producer<K, V> producer, Tracer tracer,
: producerSpanNameProvider;
}

/**
* GlobalTracer is used to get tracer
*/
public TracingKafkaProducer(Producer<K, V> producer,
BiFunction<String, ProducerRecord, String> producerSpanNameProvider) {
this(producer, GlobalTracer.get(), producerSpanNameProvider);
}

@Override
public void initTransactions() {
producer.initTransactions();
Expand All @@ -82,9 +67,10 @@ public void beginTransaction() throws ProducerFencedException {
}

@Override
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, String s)
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId)
throws ProducerFencedException {
producer.sendOffsetsToTransaction(map, s);
producer.sendOffsetsToTransaction(offsets, consumerGroupId);
}

@Override
Expand All @@ -99,11 +85,20 @@ public void abortTransaction() throws ProducerFencedException {

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return send(record, null);
return send(record, null, null);
}

public Future<RecordMetadata> send(ProducerRecord<K, V> record, SpanContext parent) {
return send(record, null, parent);
}

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
return send(record, callback, null);
}

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback,
SpanContext parent) {
/*
// Create wrappedRecord because headers can be read only in record (if record is sent second time)
ProducerRecord<K, V> wrappedRecord = new ProducerRecord<>(record.topic(),
Expand All @@ -114,7 +109,8 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callbac
record.headers());
*/

Span span = TracingKafkaUtils.buildAndInjectSpan(record, tracer, producerSpanNameProvider);
Span span = TracingKafkaUtils
.buildAndInjectSpan(record, tracer, producerSpanNameProvider, parent);
try (Scope ignored = tracer.activateSpan(span)) {
Callback wrappedCallback = new TracingCallback(callback, span, tracer);
return producer.send(record, wrappedCallback);
Expand Down
Expand Up @@ -56,11 +56,12 @@ static void inject(SpanContext spanContext, Headers headers,
}

public static <K, V> Span buildAndInjectSpan(ProducerRecord<K, V> record, Tracer tracer) {
return buildAndInjectSpan(record, tracer, ClientSpanNameProvider.PRODUCER_OPERATION_NAME);
return buildAndInjectSpan(record, tracer, ClientSpanNameProvider.PRODUCER_OPERATION_NAME, null);
}

public static <K, V> Span buildAndInjectSpan(ProducerRecord<K, V> record, Tracer tracer,
BiFunction<String, ProducerRecord, String> producerSpanNameProvider) {
BiFunction<String, ProducerRecord, String> producerSpanNameProvider,
SpanContext parent) {

String producerOper =
TO_PREFIX + record.topic(); // <======== It provides better readability in the UI
Expand All @@ -72,6 +73,8 @@ public static <K, V> Span buildAndInjectSpan(ProducerRecord<K, V> record, Tracer

if (spanContext != null) {
spanBuilder.asChildOf(spanContext);
} else if (parent != null) {
spanBuilder.asChildOf(parent);
}

Span span = spanBuilder.start();
Expand Down
Expand Up @@ -112,6 +112,43 @@ public void test() throws Exception {
assertNull(mockTracer.activeSpan());
}

@Test
public void testWithParentContext() throws Exception {
TracingKafkaProducer<Integer, String> producer = createTracingProducer();

final MockSpan parent = mockTracer.buildSpan("parent").start();

// Send 1
producer.send(new ProducerRecord<>("messages", 1, "test"), parent.context());

// Send 2
producer.send(new ProducerRecord<>("messages", 1, "test"),
(metadata, exception) -> assertEquals("messages", metadata.topic()), parent.context());

final CountDownLatch latch = new CountDownLatch(2);
createConsumer(latch, 1, false, null);

producer.close();

List<MockSpan> mockSpans = mockTracer.finishedSpans();
assertEquals(4, mockSpans.size());
checkSpans(mockSpans);
for (MockSpan span : mockSpans) {
assertEquals(parent.context().traceId(), span.context().traceId());
}

final List<MockSpan> sendSpans = getByOperationNameAll(mockSpans,
TracingKafkaUtils.TO_PREFIX + "messages");
assertEquals(2, sendSpans.size());
for (MockSpan sendSpan : sendSpans) {
assertEquals(parent.context().spanId(), sendSpan.parentId());
}

parent.finish();

assertNull(mockTracer.activeSpan());
}

@Test
public void testNotTracedProducer() throws Exception {
Producer<Integer, String> producer = createProducer();
Expand Down Expand Up @@ -266,7 +303,7 @@ public void testSeekInConsumerAndCloseInProducer() throws InterruptedException {
}


private Producer<Integer, String> createTracingProducer() {
private TracingKafkaProducer<Integer, String> createTracingProducer() {
return new TracingKafkaProducer<>(createProducer(), mockTracer);
}

Expand Down Expand Up @@ -363,4 +400,14 @@ private MockSpan getByOperationName(List<MockSpan> spans, String operationName)
return found.isEmpty() ? null : found.get(0);
}

private List<MockSpan> getByOperationNameAll(List<MockSpan> spans, String operationName) {
List<MockSpan> found = new ArrayList<>();
for (MockSpan span : spans) {
if (operationName.equals(span.operationName())) {
found.add(span);
}
}
return found;
}

}

0 comments on commit 107d7ac

Please sign in to comment.