Skip to content

Commit

Permalink
Refactors such that Kafka bundles spans and shares more code
Browse files Browse the repository at this point in the history
This rewrites KafkaSpanCollector to bundle multiple spans into the same
Kafka message. Basically, as many messages that come in one second will
send in the same message. This change resulted in several times higher
throughput in Yelp's kafka+cassandra Zipkin architecture.

Flushing can be controlled via KafkaSpanCollector.Config.flushInterval

Incidentally, this shares a lot of code with HttpSpanCollector, which
should reduce the amount of bugs and maintenance around queue-based
collection.
  • Loading branch information
Adrian Cole committed Mar 4, 2016
1 parent 003cd69 commit 74cc5af
Show file tree
Hide file tree
Showing 11 changed files with 396 additions and 383 deletions.
@@ -0,0 +1,123 @@
package com.github.kristofa.brave;

import com.github.kristofa.brave.internal.Nullable;
import com.twitter.zipkin.gen.Span;
import com.twitter.zipkin.gen.SpanCodec;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;

import static java.util.concurrent.TimeUnit.SECONDS;

/**
* Implemented {@link #sendSpans} to transport a encoded list of spans to Zipkin.
*/
public abstract class AbstractSpanCollector implements SpanCollector, Flushable, Closeable {

private final SpanCodec codec;
private final SpanCollectorMetricsHandler metrics;
private final BlockingQueue<Span> pending = new LinkedBlockingQueue<Span>(1000);
@Nullable // for testing
private final Flusher flusher;

/**
* @param flushInterval in seconds. 0 implies spans are {@link #flush() flushed externally.
*/
public AbstractSpanCollector(SpanCodec codec, SpanCollectorMetricsHandler metrics,
int flushInterval) {
this.codec = codec;
this.metrics = metrics;
this.flusher = flushInterval > 0 ? new Flusher(this, flushInterval) : null;
}

/**
* Queues the span for collection, or drops it if the queue is full.
*
* @param span Span, should not be <code>null</code>.
*/
@Override
public void collect(Span span) {
metrics.incrementAcceptedSpans(1);
if (!pending.offer(span)) {
metrics.incrementDroppedSpans(1);
}
}

/**
* Calling this will flush any pending spans to the transport on the current thread.
*/
@Override
public void flush() {
if (pending.isEmpty()) return;
List<Span> drained = new ArrayList<Span>(pending.size());
pending.drainTo(drained);
if (drained.isEmpty()) return;

// encode the spans for transport
int spanCount = drained.size();
byte[] encoded;
try {
encoded = codec.writeSpans(drained);
} catch (RuntimeException e) {
metrics.incrementDroppedSpans(spanCount);
return;
}

// transport the spans
try {
sendSpans(encoded);
} catch (IOException e) {
metrics.incrementDroppedSpans(spanCount);
return;
}
}

/** Calls flush on a fixed interval */
static final class Flusher implements Runnable {
final Flushable flushable;
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

Flusher(Flushable flushable, int flushInterval) {
this.flushable = flushable;
this.scheduler.scheduleWithFixedDelay(this, 0, flushInterval, SECONDS);
}

@Override
public void run() {
try {
flushable.flush();
} catch (IOException ignored) {
}
}
}

/**
* Sends a encoded list of spans over the current transport.
*
* @throws IOException when thrown, drop metrics will increment accordingly
*/
protected abstract void sendSpans(byte[] encoded) throws IOException;

@Override
public void addDefaultAnnotation(String key, String value) {
throw new UnsupportedOperationException();
}

/**
* Requests a cease of delivery. There will be at most one in-flight send after this call.
*/
@Override
public void close() {
if (flusher != null) flusher.scheduler.shutdown();
// throw any outstanding spans on the floor
int dropped = pending.drainTo(new LinkedList<Span>());
metrics.incrementDroppedSpans(dropped);
}
}
8 changes: 7 additions & 1 deletion brave-spancollector-http/README.md
@@ -1,4 +1,10 @@
# brave-spancollector-http # # brave-spancollector-http #


SpanCollector that is used to submit spans to Zipkins Http endpoint `/spans`. SpanCollector that encodes spans into a json list, POSTed to `/api/v1/spans`.


## Configuration ##

By default...

* Spans are flushed to a POST request every second. Configure with `HttpSpanCollector.Config.flushInterval`.
* The POST body is not compressed. Configure with `HttpSpanCollector.Config.compressionEnabled`.
10 changes: 0 additions & 10 deletions brave-spancollector-http/pom.xml
Expand Up @@ -36,16 +36,6 @@
<artifactId>auto-value</artifactId> <artifactId>auto-value</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
</dependency>
<!-- org.apache.thrift.ProcessFunction v0.9 uses SLF4J at runtime -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>io.zipkin.java</groupId> <groupId>io.zipkin.java</groupId>
<artifactId>zipkin-junit</artifactId> <artifactId>zipkin-junit</artifactId>
Expand Down
@@ -1,34 +1,21 @@
package com.github.kristofa.brave.http; package com.github.kristofa.brave.http;


import com.github.kristofa.brave.AbstractSpanCollector;
import com.github.kristofa.brave.EmptySpanCollectorMetricsHandler; import com.github.kristofa.brave.EmptySpanCollectorMetricsHandler;
import com.github.kristofa.brave.SpanCollector;
import com.github.kristofa.brave.SpanCollectorMetricsHandler; import com.github.kristofa.brave.SpanCollectorMetricsHandler;
import com.github.kristofa.brave.internal.Nullable;
import com.google.auto.value.AutoValue; import com.google.auto.value.AutoValue;
import com.twitter.zipkin.gen.Span;
import com.twitter.zipkin.gen.SpanCodec; import com.twitter.zipkin.gen.SpanCodec;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.URL; import java.net.URL;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.zip.GZIPOutputStream; import java.util.zip.GZIPOutputStream;


import static java.util.concurrent.TimeUnit.SECONDS;

/** /**
* SpanCollector which submits spans to Zipkin, using its {@code POST /spans} endpoint. * SpanCollector which submits spans to Zipkin, using its {@code POST /spans} endpoint.
*/ */
public final class HttpSpanCollector implements SpanCollector, Flushable, Closeable { public final class HttpSpanCollector extends AbstractSpanCollector {


@AutoValue @AutoValue
public static abstract class Config { public static abstract class Config {
Expand Down Expand Up @@ -72,104 +59,38 @@ public interface Builder {


private final String url; private final String url;
private final Config config; private final Config config;
private final SpanCollectorMetricsHandler metrics;
private final BlockingQueue<Span> pending = new LinkedBlockingQueue<>(1000);
@Nullable // for testing
private final Flusher flusher;


/** /**
* Create a new instance with default configuration. * Create a new instance with default configuration.
* *
* @param baseUrl URL of the zipkin query server instance. Like: http://localhost:9411/ * @param baseUrl URL of the zipkin query server instance. Like: http://localhost:9411/
* @param metrics Gets notified when spans are accepted or dropped. If you are not interested in * @param metrics Gets notified when spans are accepted or dropped. If you are not interested in
* these events you can use {@linkplain EmptySpanCollectorMetricsHandler} * these events you can use {@linkplain EmptySpanCollectorMetricsHandler}
*/ */
public static HttpSpanCollector create(String baseUrl, SpanCollectorMetricsHandler metrics) { public static HttpSpanCollector create(String baseUrl, SpanCollectorMetricsHandler metrics) {
return new HttpSpanCollector(baseUrl, Config.builder().build(), metrics); return new HttpSpanCollector(baseUrl, Config.builder().build(), metrics);
} }


/** /**
* @param baseUrl URL of the zipkin query server instance. Like: http://localhost:9411/ * @param baseUrl URL of the zipkin query server instance. Like: http://localhost:9411/
* @param config controls flush interval and timeouts * @param config includes flush interval and timeouts
* @param metrics Gets notified when spans are accepted or dropped. If you are not interested in * @param metrics Gets notified when spans are accepted or dropped. If you are not interested in
* these events you can use {@linkplain EmptySpanCollectorMetricsHandler} * these events you can use {@linkplain EmptySpanCollectorMetricsHandler}
*/ */
public static HttpSpanCollector create(String baseUrl, Config config, public static HttpSpanCollector create(String baseUrl, Config config,
SpanCollectorMetricsHandler metrics) { SpanCollectorMetricsHandler metrics) {
return new HttpSpanCollector(baseUrl, config, metrics); return new HttpSpanCollector(baseUrl, config, metrics);
} }


// Visible for testing. Ex when tests need to explicitly control flushing, set interval to 0. // Visible for testing. Ex when tests need to explicitly control flushing, set interval to 0.
HttpSpanCollector(String baseUrl, Config config, HttpSpanCollector(String baseUrl, Config config, SpanCollectorMetricsHandler metrics) {
SpanCollectorMetricsHandler metrics) { super(SpanCodec.JSON, metrics, config.flushInterval());
this.url = baseUrl + (baseUrl.endsWith("/") ? "" : "/") + "api/v1/spans"; this.url = baseUrl + (baseUrl.endsWith("/") ? "" : "/") + "api/v1/spans";
this.metrics = metrics;
this.config = config; this.config = config;
this.flusher = config.flushInterval() > 0 ? new Flusher(this, config.flushInterval()) : null;
}

/**
* Queues the span for collection, or drops it if the queue is full.
*
* @param span Span, should not be <code>null</code>.
*/
@Override
public void collect(Span span) {
metrics.incrementAcceptedSpans(1);
if (!pending.offer(span)) {
metrics.incrementDroppedSpans(1);
}
} }


/**
* Calling this will flush any pending spans to the http transport on the current thread.
*/
@Override @Override
public void flush() { protected void sendSpans(byte[] json) throws IOException {
if (pending.isEmpty()) return;
List<Span> drained = new ArrayList<>(pending.size());
pending.drainTo(drained);
if (drained.isEmpty()) return;

// json-encode the spans for transport
int spanCount = drained.size();
byte[] json;
try {
json = SpanCodec.JSON.writeSpans(drained);
} catch (RuntimeException e) {
metrics.incrementDroppedSpans(spanCount);
return;
}

// Send the json to the zipkin endpoint
try {
postSpans(json);
} catch (IOException e) {
metrics.incrementDroppedSpans(spanCount);
return;
}
}

/** Calls flush on a fixed interval */
static final class Flusher implements Runnable {
final Flushable flushable;
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

Flusher(Flushable flushable, int flushInterval) {
this.flushable = flushable;
this.scheduler.scheduleWithFixedDelay(this, 0, flushInterval, SECONDS);
}

@Override
public void run() {
try {
flushable.flush();
} catch (IOException ignored) {
}
}
}

void postSpans(byte[] json) throws IOException {
// intentionally not closing the connection, so as to use keep-alives // intentionally not closing the connection, so as to use keep-alives
HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection(); HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
connection.setConnectTimeout(config.connectTimeout()); connection.setConnectTimeout(config.connectTimeout());
Expand Down Expand Up @@ -199,21 +120,4 @@ void postSpans(byte[] json) throws IOException {
throw e; throw e;
} }
} }

@Override
public void addDefaultAnnotation(String key, String value) {
throw new UnsupportedOperationException();
}

/**
* Requests a cease of delivery. There will be at most one in-flight request processing after this
* call returns.
*/
@Override
public void close() {
if (flusher != null) flusher.scheduler.shutdown();
// throw any outstanding spans on the floor
int dropped = pending.drainTo(new LinkedList<>());
metrics.incrementDroppedSpans(dropped);
}
} }
Expand Up @@ -5,6 +5,7 @@
import com.twitter.zipkin.gen.Span; import com.twitter.zipkin.gen.Span;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import zipkin.junit.HttpFailure; import zipkin.junit.HttpFailure;
Expand All @@ -23,6 +24,11 @@ public class HttpSpanCollectorTest {
HttpSpanCollector.Config config = HttpSpanCollector.Config.builder().flushInterval(0).build(); HttpSpanCollector.Config config = HttpSpanCollector.Config.builder().flushInterval(0).build();
HttpSpanCollector collector = new HttpSpanCollector(zipkin.httpUrl(), config, metrics); HttpSpanCollector collector = new HttpSpanCollector(zipkin.httpUrl(), config, metrics);


@After
public void closeCollector(){
collector.close();
}

@Test @Test
public void collectDoesntDoIO() throws Exception { public void collectDoesntDoIO() throws Exception {
collector.collect(span(1L, "foo")); collector.collect(span(1L, "foo"));
Expand Down
13 changes: 11 additions & 2 deletions brave-spancollector-kafka/README.md
@@ -1,8 +1,17 @@
# brave-spancollector-kafka # # brave-spancollector-kafka #


SpanCollector that is used to submit spans to Kafka. SpanCollector that encodes spans into a thrift list, sent to the Kafka topic `zipkin`.


Spans are sent to a topic named `zipkin` and contain no key or partition only a value which is a TBinaryProtocol encoded Span. Kafka messages contain no key or partition, only a value which is a TBinaryProtocol encoded list of spans.

*Important*
If using zipkin-collector-service (or zipkin-receiver-kafka), you must run v1.35+

## Configuration ##

By default...

* Spans are flushed to a Kafka message every second. Configure with `KafkaSpanCollector.Config.flushInterval`.


## Monitoring ## ## Monitoring ##


Expand Down
11 changes: 5 additions & 6 deletions brave-spancollector-kafka/pom.xml
Expand Up @@ -40,6 +40,11 @@
<artifactId>brave-core</artifactId> <artifactId>brave-core</artifactId>
<version>3.4.1-SNAPSHOT</version> <version>3.4.1-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value</artifactId>
<scope>provided</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.kafka</groupId> <groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId> <artifactId>kafka-clients</artifactId>
Expand All @@ -52,11 +57,5 @@
<version>1.7</version> <version>1.7</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<!-- org.apache.thrift.ProcessFunction v0.9 uses SLF4J at runtime -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

0 comments on commit 74cc5af

Please sign in to comment.