Skip to content

Commit

Permalink
Reduces overhead by using constant clock time in messaging (#765)
Browse files Browse the repository at this point in the history
  • Loading branch information
adriancole committed Aug 22, 2018
1 parent 3e138ba commit b4076e2
Show file tree
Hide file tree
Showing 9 changed files with 51 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,11 @@ public ConsumerRecords<K, V> poll(Duration timeout) {
return poll(timeout.toMillis());
}

/** This */
/** This uses a single timestamp for all records polled, to reduce overhead. */
@Override public ConsumerRecords<K, V> poll(long timeout) {
ConsumerRecords<K, V> records = delegate.poll(timeout);
if (records.isEmpty() || tracing.isNoop()) return records;
long timestamp = 0L;
Map<String, Span> consumerSpansForTopic = new LinkedHashMap<>();
for (TopicPartition partition : records.partitions()) {
String topic = partition.topic();
Expand All @@ -68,31 +69,38 @@ public ConsumerRecords<K, V> poll(Duration timeout) {
// If we extracted neither a trace context, nor request-scoped data (extra),
// make or reuse a span for this topic
if (extracted.samplingFlags() != null && extracted.extra().isEmpty()) {
Span consumerSpanForTopic = consumerSpansForTopic.get(topic);
if (consumerSpanForTopic == null) {
consumerSpansForTopic.put(topic,
consumerSpanForTopic = tracing.tracer().nextSpan(extracted).name("poll")
.kind(Span.Kind.CONSUMER)
.tag(KafkaTags.KAFKA_TOPIC_TAG, topic)
.start());
Span span = consumerSpansForTopic.get(topic);
if (span == null) {
span = tracing.tracer().nextSpan(extracted);
if (!span.isNoop()) {
setConsumerSpan(topic, span, remoteServiceName);
// incur timestamp overhead only once
if (timestamp == 0L) {
timestamp = tracing.clock(span.context()).currentTimeMicroseconds();
}
span.start(timestamp);
}
consumerSpansForTopic.put(topic, span);
}
// no need to remove propagation headers as we failed to extract anything
injector.inject(consumerSpanForTopic.context(), record.headers());
injector.inject(span.context(), record.headers());
} else { // we extracted request-scoped data, so cannot share a consumer span.
Span span = tracing.tracer().nextSpan(extracted);
if (!span.isNoop()) {
span.name("poll").kind(Span.Kind.CONSUMER).tag(KafkaTags.KAFKA_TOPIC_TAG, topic);
if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
span.start().finish(); // span won't be shared by other records
setConsumerSpan(topic, span, remoteServiceName);
// incur timestamp overhead only once
if (timestamp == 0L) {
timestamp = tracing.clock(span.context()).currentTimeMicroseconds();
}
span.start(timestamp).finish(timestamp); // span won't be shared by other records
}
// remove prior propagation headers from the record
tracing.propagation().keys().forEach(key -> record.headers().remove(key));
injector.inject(span.context(), record.headers());
}
}
}
consumerSpansForTopic.values()
.forEach(span -> span.remoteServiceName(remoteServiceName).finish());
for (Span span : consumerSpansForTopic.values()) span.finish(timestamp);
return records;
}

Expand Down Expand Up @@ -270,4 +278,9 @@ public void close(Duration timeout) {
@Override public void wakeup() {
delegate.wakeup();
}

static void setConsumerSpan(String topic, Span span, String remoteServiceName) {
span.name("poll").kind(Span.Kind.CONSUMER).tag(KafkaTags.KAFKA_TOPIC_TAG, topic);
if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,4 @@ static Set<Map.Entry<String, String>> lastHeaders(Headers headers) {
headers.forEach(h -> result.put(h.key(), new String(h.value(), Charsets.UTF_8)));
return result.entrySet();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import brave.propagation.TraceIdContext;
import com.github.charithe.kafka.EphemeralKafkaBroker;
import com.github.charithe.kafka.KafkaJunitRule;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -297,6 +296,8 @@ Span takeProducerSpan() throws InterruptedException {
assertThat(result)
.withFailMessage("Producer span was not reported")
.isNotNull();
// ensure the span finished
assertThat(result.durationAsLong()).isPositive();
return result;
}

Expand All @@ -306,6 +307,8 @@ Span takeConsumerSpan() throws InterruptedException {
assertThat(result)
.withFailMessage("Consumer span was not reported")
.isNotNull();
// ensure the span finished
assertThat(result.durationAsLong()).isPositive();
return result;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,4 @@ RecordMetadata createRecordMetadata() {

return new RecordMetadata(tp, -1L, -1L, timestamp, checksum, keySize, valueSize);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,4 @@ public void send_shouldnt_tag_binary_key() {
.flatExtracting(s -> s.tags().entrySet())
.containsOnly(entry("kafka.topic", TEST_TOPIC));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package brave.spring.rabbit;

import brave.Span;
import brave.Tracer;
import brave.Tracing;
import brave.internal.Nullable;
import brave.propagation.Propagation.Setter;
Expand Down Expand Up @@ -29,20 +28,22 @@ final class TracingMessagePostProcessor implements MessagePostProcessor {
};

final Injector<MessageProperties> injector;
final Tracer tracer;
final Tracing tracing;
@Nullable final String remoteServiceName;

TracingMessagePostProcessor(Tracing tracing, @Nullable String remoteServiceName) {
this.injector = tracing.propagation().injector(SETTER);
this.tracer = tracing.tracer();
this.tracing = tracing;
this.remoteServiceName = remoteServiceName;
}

@Override public Message postProcessMessage(Message message) {
Span span = tracer.nextSpan().kind(Span.Kind.PRODUCER).name("publish");
Span span = tracing.tracer().nextSpan().kind(Span.Kind.PRODUCER).name("publish");
if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
injector.inject(span.context(), message.getMessageProperties());
span.start().finish();
// incur timestamp overhead only once
long timestamp = tracing.clock(span.context()).currentTimeMicroseconds();
span.start(timestamp).finish(timestamp);
return message;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import zipkin2.Endpoint;

import static brave.Span.Kind.CONSUMER;
import static brave.spring.rabbit.SpringRabbitTracing.RABBIT_EXCHANGE;
Expand Down Expand Up @@ -71,11 +70,13 @@ final class TracingRabbitListenerAdvice implements MethodInterceptor {
Span listenerSpan = tracer.newChild(consumerSpan.context()).name("on-message");

if (!consumerSpan.isNoop()) {
consumerSpan.start();
long timestamp = tracing.clock(consumerSpan.context()).currentTimeMicroseconds();
consumerSpan.start(timestamp);
if (remoteServiceName != null) consumerSpan.remoteServiceName(remoteServiceName);
tagReceivedMessageProperties(consumerSpan, message.getMessageProperties());
consumerSpan.finish();
listenerSpan.start(); // not using scoped span as we want to start late
long consumerFinish = timestamp + 1L; // save a clock reading
consumerSpan.finish(consumerFinish);
listenerSpan.start(consumerFinish); // not using scoped span as we want to start late
}

try (SpanInScope ws = tracer.withSpanInScope(listenerSpan)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,8 @@ Span takeProducerSpan() throws InterruptedException {
assertThat(result)
.withFailMessage("Producer span was not reported")
.isNotNull();
// ensure the span finished
assertThat(result.durationAsLong()).isPositive();
return result;
}

Expand All @@ -431,6 +433,8 @@ Span takeConsumerSpan() throws InterruptedException {
assertThat(result)
.withFailMessage("Consumer span was not reported")
.isNotNull();
// ensure the span finished
assertThat(result.durationAsLong()).isPositive();
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,4 @@ void onMessageConsumeFailed(Message message, Throwable throwable) throws Throwab
} catch (RuntimeException ex) {
}
}
}
}

0 comments on commit b4076e2

Please sign in to comment.