From 74cc5af061325bf69203e3392596b1ee68d1e2f2 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Wed, 2 Mar 2016 19:12:05 +0800 Subject: [PATCH] Refactors such that Kafka bundles spans and shares more code This rewrites KafkaSpanCollector to bundle multiple spans into the same Kafka message. Basically, all spans that arrive within the same one-second flush interval are sent in a single message. This change resulted in several times higher throughput in Yelp's kafka+cassandra Zipkin architecture. Flushing can be controlled via KafkaSpanCollector.Config.flushInterval. Incidentally, this shares a lot of code with HttpSpanCollector, which should reduce the number of bugs and maintenance around queue-based collection. --- .../kristofa/brave/AbstractSpanCollector.java | 123 +++++++++++++ brave-spancollector-http/README.md | 8 +- brave-spancollector-http/pom.xml | 10 -- .../brave/http/HttpSpanCollector.java | 112 +----------- .../brave/http/HttpSpanCollectorTest.java | 6 + brave-spancollector-kafka/README.md | 13 +- brave-spancollector-kafka/pom.xml | 11 +- .../brave/kafka/KafkaSpanCollector.java | 170 +++++++++--------- .../brave/kafka/SpanProcessingTask.java | 58 ------ .../brave/kafka/ITKafkaSpanCollector.java | 114 ------------ .../brave/kafka/KafkaSpanCollectorTest.java | 154 ++++++++++++++++ 11 files changed, 396 insertions(+), 383 deletions(-) create mode 100755 brave-core/src/main/java/com/github/kristofa/brave/AbstractSpanCollector.java delete mode 100644 brave-spancollector-kafka/src/main/java/com/github/kristofa/brave/kafka/SpanProcessingTask.java delete mode 100644 brave-spancollector-kafka/src/test/java/com/github/kristofa/brave/kafka/ITKafkaSpanCollector.java create mode 100644 brave-spancollector-kafka/src/test/java/com/github/kristofa/brave/kafka/KafkaSpanCollectorTest.java diff --git a/brave-core/src/main/java/com/github/kristofa/brave/AbstractSpanCollector.java b/brave-core/src/main/java/com/github/kristofa/brave/AbstractSpanCollector.java new file mode 100755 index 
0000000000..6fab4db46a --- /dev/null +++ b/brave-core/src/main/java/com/github/kristofa/brave/AbstractSpanCollector.java @@ -0,0 +1,123 @@ +package com.github.kristofa.brave; + +import com.github.kristofa.brave.internal.Nullable; +import com.twitter.zipkin.gen.Span; +import com.twitter.zipkin.gen.SpanCodec; +import java.io.Closeable; +import java.io.Flushable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; + +import static java.util.concurrent.TimeUnit.SECONDS; + +/** + * Implement {@link #sendSpans} to transport an encoded list of spans to Zipkin. + */ +public abstract class AbstractSpanCollector implements SpanCollector, Flushable, Closeable { + + private final SpanCodec codec; + private final SpanCollectorMetricsHandler metrics; + private final BlockingQueue pending = new LinkedBlockingQueue(1000); + @Nullable // for testing + private final Flusher flusher; + + /** + * @param flushInterval in seconds. 0 implies spans are {@link #flush() flushed} externally. + */ + public AbstractSpanCollector(SpanCodec codec, SpanCollectorMetricsHandler metrics, + int flushInterval) { + this.codec = codec; + this.metrics = metrics; + this.flusher = flushInterval > 0 ? new Flusher(this, flushInterval) : null; + } + + /** + * Queues the span for collection, or drops it if the queue is full. + * + * @param span Span, should not be null. + */ + @Override + public void collect(Span span) { + metrics.incrementAcceptedSpans(1); + if (!pending.offer(span)) { + metrics.incrementDroppedSpans(1); + } + } + + /** + * Calling this will flush any pending spans to the transport on the current thread. 
+ */ + @Override + public void flush() { + if (pending.isEmpty()) return; + List drained = new ArrayList(pending.size()); + pending.drainTo(drained); + if (drained.isEmpty()) return; + + // encode the spans for transport + int spanCount = drained.size(); + byte[] encoded; + try { + encoded = codec.writeSpans(drained); + } catch (RuntimeException e) { + metrics.incrementDroppedSpans(spanCount); + return; + } + + // transport the spans + try { + sendSpans(encoded); + } catch (IOException e) { + metrics.incrementDroppedSpans(spanCount); + return; + } + } + + /** Calls flush on a fixed interval */ + static final class Flusher implements Runnable { + final Flushable flushable; + final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + + Flusher(Flushable flushable, int flushInterval) { + this.flushable = flushable; + this.scheduler.scheduleWithFixedDelay(this, 0, flushInterval, SECONDS); + } + + @Override + public void run() { + try { + flushable.flush(); + } catch (IOException ignored) { + } + } + } + + /** + * Sends a encoded list of spans over the current transport. + * + * @throws IOException when thrown, drop metrics will increment accordingly + */ + protected abstract void sendSpans(byte[] encoded) throws IOException; + + @Override + public void addDefaultAnnotation(String key, String value) { + throw new UnsupportedOperationException(); + } + + /** + * Requests a cease of delivery. There will be at most one in-flight send after this call. 
+ */ + @Override + public void close() { + if (flusher != null) flusher.scheduler.shutdown(); + // throw any outstanding spans on the floor + int dropped = pending.drainTo(new LinkedList()); + metrics.incrementDroppedSpans(dropped); + } +} diff --git a/brave-spancollector-http/README.md b/brave-spancollector-http/README.md index 190cea69f1..d18aad6a4d 100644 --- a/brave-spancollector-http/README.md +++ b/brave-spancollector-http/README.md @@ -1,4 +1,10 @@ # brave-spancollector-http # -SpanCollector that is used to submit spans to Zipkins Http endpoint `/spans`. +SpanCollector that encodes spans into a json list, POSTed to `/api/v1/spans`. +## Configuration ## + +By default... + +* Spans are flushed to a POST request every second. Configure with `HttpSpanCollector.Config.flushInterval`. +* The POST body is not compressed. Configure with `HttpSpanCollector.Config.compressionEnabled`. diff --git a/brave-spancollector-http/pom.xml b/brave-spancollector-http/pom.xml index 3e8cc3cf61..8a3a64fb42 100644 --- a/brave-spancollector-http/pom.xml +++ b/brave-spancollector-http/pom.xml @@ -36,16 +36,6 @@ auto-value provided - - org.apache.thrift - libthrift - - - - org.apache.logging.log4j - log4j-slf4j-impl - test - io.zipkin.java zipkin-junit diff --git a/brave-spancollector-http/src/main/java/com/github/kristofa/brave/http/HttpSpanCollector.java b/brave-spancollector-http/src/main/java/com/github/kristofa/brave/http/HttpSpanCollector.java index dc87eeb42a..e901b21d19 100755 --- a/brave-spancollector-http/src/main/java/com/github/kristofa/brave/http/HttpSpanCollector.java +++ b/brave-spancollector-http/src/main/java/com/github/kristofa/brave/http/HttpSpanCollector.java @@ -1,34 +1,21 @@ package com.github.kristofa.brave.http; +import com.github.kristofa.brave.AbstractSpanCollector; import com.github.kristofa.brave.EmptySpanCollectorMetricsHandler; -import com.github.kristofa.brave.SpanCollector; import com.github.kristofa.brave.SpanCollectorMetricsHandler; -import 
com.github.kristofa.brave.internal.Nullable; import com.google.auto.value.AutoValue; -import com.twitter.zipkin.gen.Span; import com.twitter.zipkin.gen.SpanCodec; import java.io.ByteArrayOutputStream; -import java.io.Closeable; -import java.io.Flushable; import java.io.IOException; import java.io.InputStream; import java.net.HttpURLConnection; import java.net.URL; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; import java.util.zip.GZIPOutputStream; -import static java.util.concurrent.TimeUnit.SECONDS; - /** * SpanCollector which submits spans to Zipkin, using its {@code POST /spans} endpoint. */ -public final class HttpSpanCollector implements SpanCollector, Flushable, Closeable { +public final class HttpSpanCollector extends AbstractSpanCollector { @AutoValue public static abstract class Config { @@ -72,17 +59,13 @@ public interface Builder { private final String url; private final Config config; - private final SpanCollectorMetricsHandler metrics; - private final BlockingQueue pending = new LinkedBlockingQueue<>(1000); - @Nullable // for testing - private final Flusher flusher; /** * Create a new instance with default configuration. * * @param baseUrl URL of the zipkin query server instance. Like: http://localhost:9411/ * @param metrics Gets notified when spans are accepted or dropped. 
If you are not interested in - * these events you can use {@linkplain EmptySpanCollectorMetricsHandler} + * these events you can use {@linkplain EmptySpanCollectorMetricsHandler} */ public static HttpSpanCollector create(String baseUrl, SpanCollectorMetricsHandler metrics) { return new HttpSpanCollector(baseUrl, Config.builder().build(), metrics); @@ -90,9 +73,9 @@ public static HttpSpanCollector create(String baseUrl, SpanCollectorMetricsHandl /** * @param baseUrl URL of the zipkin query server instance. Like: http://localhost:9411/ - * @param config controls flush interval and timeouts + * @param config includes flush interval and timeouts * @param metrics Gets notified when spans are accepted or dropped. If you are not interested in - * these events you can use {@linkplain EmptySpanCollectorMetricsHandler} + * these events you can use {@linkplain EmptySpanCollectorMetricsHandler} */ public static HttpSpanCollector create(String baseUrl, Config config, SpanCollectorMetricsHandler metrics) { @@ -100,76 +83,14 @@ public static HttpSpanCollector create(String baseUrl, Config config, } // Visible for testing. Ex when tests need to explicitly control flushing, set interval to 0. - HttpSpanCollector(String baseUrl, Config config, - SpanCollectorMetricsHandler metrics) { + HttpSpanCollector(String baseUrl, Config config, SpanCollectorMetricsHandler metrics) { + super(SpanCodec.JSON, metrics, config.flushInterval()); this.url = baseUrl + (baseUrl.endsWith("/") ? "" : "/") + "api/v1/spans"; - this.metrics = metrics; this.config = config; - this.flusher = config.flushInterval() > 0 ? new Flusher(this, config.flushInterval()) : null; - } - - /** - * Queues the span for collection, or drops it if the queue is full. - * - * @param span Span, should not be null. 
- */ - @Override - public void collect(Span span) { - metrics.incrementAcceptedSpans(1); - if (!pending.offer(span)) { - metrics.incrementDroppedSpans(1); - } } - /** - * Calling this will flush any pending spans to the http transport on the current thread. - */ @Override - public void flush() { - if (pending.isEmpty()) return; - List drained = new ArrayList<>(pending.size()); - pending.drainTo(drained); - if (drained.isEmpty()) return; - - // json-encode the spans for transport - int spanCount = drained.size(); - byte[] json; - try { - json = SpanCodec.JSON.writeSpans(drained); - } catch (RuntimeException e) { - metrics.incrementDroppedSpans(spanCount); - return; - } - - // Send the json to the zipkin endpoint - try { - postSpans(json); - } catch (IOException e) { - metrics.incrementDroppedSpans(spanCount); - return; - } - } - - /** Calls flush on a fixed interval */ - static final class Flusher implements Runnable { - final Flushable flushable; - final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - - Flusher(Flushable flushable, int flushInterval) { - this.flushable = flushable; - this.scheduler.scheduleWithFixedDelay(this, 0, flushInterval, SECONDS); - } - - @Override - public void run() { - try { - flushable.flush(); - } catch (IOException ignored) { - } - } - } - - void postSpans(byte[] json) throws IOException { + protected void sendSpans(byte[] json) throws IOException { // intentionally not closing the connection, so as to use keep-alives HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection(); connection.setConnectTimeout(config.connectTimeout()); @@ -199,21 +120,4 @@ void postSpans(byte[] json) throws IOException { throw e; } } - - @Override - public void addDefaultAnnotation(String key, String value) { - throw new UnsupportedOperationException(); - } - - /** - * Requests a cease of delivery. There will be at most one in-flight request processing after this - * call returns. 
- */ - @Override - public void close() { - if (flusher != null) flusher.scheduler.shutdown(); - // throw any outstanding spans on the floor - int dropped = pending.drainTo(new LinkedList<>()); - metrics.incrementDroppedSpans(dropped); - } } diff --git a/brave-spancollector-http/src/test/java/com/github/kristofa/brave/http/HttpSpanCollectorTest.java b/brave-spancollector-http/src/test/java/com/github/kristofa/brave/http/HttpSpanCollectorTest.java index b6176c6f2e..ab5823f2a4 100644 --- a/brave-spancollector-http/src/test/java/com/github/kristofa/brave/http/HttpSpanCollectorTest.java +++ b/brave-spancollector-http/src/test/java/com/github/kristofa/brave/http/HttpSpanCollectorTest.java @@ -5,6 +5,7 @@ import com.twitter.zipkin.gen.Span; import java.util.Arrays; import java.util.concurrent.atomic.AtomicInteger; +import org.junit.After; import org.junit.Rule; import org.junit.Test; import zipkin.junit.HttpFailure; @@ -23,6 +24,11 @@ public class HttpSpanCollectorTest { HttpSpanCollector.Config config = HttpSpanCollector.Config.builder().flushInterval(0).build(); HttpSpanCollector collector = new HttpSpanCollector(zipkin.httpUrl(), config, metrics); + @After + public void closeCollector(){ + collector.close(); + } + @Test public void collectDoesntDoIO() throws Exception { collector.collect(span(1L, "foo")); diff --git a/brave-spancollector-kafka/README.md b/brave-spancollector-kafka/README.md index 39e31ea6ec..53193499de 100644 --- a/brave-spancollector-kafka/README.md +++ b/brave-spancollector-kafka/README.md @@ -1,8 +1,17 @@ # brave-spancollector-kafka # -SpanCollector that is used to submit spans to Kafka. +SpanCollector that encodes spans into a thrift list, sent to the Kafka topic `zipkin`. -Spans are sent to a topic named `zipkin` and contain no key or partition only a value which is a TBinaryProtocol encoded Span. +Kafka messages contain no key or partition, only a value which is a TBinaryProtocol encoded list of spans. 
+ +*Important* +If using zipkin-collector-service (or zipkin-receiver-kafka), you must run v1.35+ + +## Configuration ## + +By default... + +* Spans are flushed to a Kafka message every second. Configure with `KafkaSpanCollector.Config.flushInterval`. ## Monitoring ## diff --git a/brave-spancollector-kafka/pom.xml b/brave-spancollector-kafka/pom.xml index c97d293436..98bfdec7fa 100644 --- a/brave-spancollector-kafka/pom.xml +++ b/brave-spancollector-kafka/pom.xml @@ -40,6 +40,11 @@ brave-core 3.4.1-SNAPSHOT + + com.google.auto.value + auto-value + provided + org.apache.kafka kafka-clients @@ -52,11 +57,5 @@ 1.7 test - - - org.apache.logging.log4j - log4j-slf4j-impl - test - diff --git a/brave-spancollector-kafka/src/main/java/com/github/kristofa/brave/kafka/KafkaSpanCollector.java b/brave-spancollector-kafka/src/main/java/com/github/kristofa/brave/kafka/KafkaSpanCollector.java index 776d751e51..5e0d48c34a 100644 --- a/brave-spancollector-kafka/src/main/java/com/github/kristofa/brave/kafka/KafkaSpanCollector.java +++ b/brave-spancollector-kafka/src/main/java/com/github/kristofa/brave/kafka/KafkaSpanCollector.java @@ -1,107 +1,101 @@ package com.github.kristofa.brave.kafka; -import com.github.kristofa.brave.SpanCollector; - -import java.io.Closeable; -import java.util.Properties; -import java.util.concurrent.*; -import java.util.logging.Level; -import java.util.logging.Logger; - +import com.github.kristofa.brave.AbstractSpanCollector; +import com.github.kristofa.brave.EmptySpanCollectorMetricsHandler; import com.github.kristofa.brave.SpanCollectorMetricsHandler; -import com.twitter.zipkin.gen.Span; +import com.google.auto.value.AutoValue; +import com.twitter.zipkin.gen.SpanCodec; +import java.io.IOException; +import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; -import com.github.kristofa.brave.EmptySpanCollectorMetricsHandler; +import org.apache.kafka.clients.producer.ProducerRecord; 
+import zipkin.internal.ThriftCodec; /** - * SpanCollector which submits spans to Kafka using Kafka Producer api. - *

- * Spans are sent to kafka as keyed messages: the key is the topic zipkin and the value is a TBinaryProtocol encoded Span. - *

+ * SpanCollector which sends a thrift-encoded list of spans to the Kafka topic "zipkin". + * + *

Important If using zipkin-collector-service (or zipkin-receiver-kafka), you must run v1.35+ */ -public class KafkaSpanCollector implements SpanCollector, Closeable { +public final class KafkaSpanCollector extends AbstractSpanCollector { - private static final Logger LOGGER = Logger.getLogger(KafkaSpanCollector.class.getName()); - private static final Properties DEFAULT_PROPERTIES = new Properties(); - - static { - DEFAULT_PROPERTIES.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); - DEFAULT_PROPERTIES.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + @AutoValue + public static abstract class Config { + public static Builder builder() { + return new AutoValue_KafkaSpanCollector_Config.Builder() + .flushInterval(1); } - private static Properties defaultPropertiesWith(String bootstrapServers) { - Properties props = new Properties(); - for (String name : DEFAULT_PROPERTIES.stringPropertyNames()) { - props.setProperty(name, DEFAULT_PROPERTIES.getProperty(name)); - } - props.setProperty("bootstrap.servers", bootstrapServers); - return props; + public static Builder builder(String bootstrapServers) { + Properties props = new Properties(); + props.put("bootstrap.servers", bootstrapServers); + props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + return builder().kafkaProperties(props); } - private final Producer producer; - private final ExecutorService executorService; - private final SpanProcessingTask spanProcessingTask; - private final Future future; - private final BlockingQueue queue; - private final SpanCollectorMetricsHandler metricsHandler; + abstract Properties kafkaProperties(); - /** - * Create a new instance with default configuration. - * - * @param bootstrapServers A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. 
- * Like: host1:port1,host2:port2,... Does not to be all the servers part of Kafka cluster. - * @param metricsHandler Gets notified when spans are accepted or dropped. If you are not interested in these events you - * can use {@linkplain EmptySpanCollectorMetricsHandler} - */ - public KafkaSpanCollector(String bootstrapServers, SpanCollectorMetricsHandler metricsHandler) { - this(KafkaSpanCollector.defaultPropertiesWith(bootstrapServers), metricsHandler); - } + abstract int flushInterval(); - /** - * KafkaSpanCollector. - * - * @param kafkaProperties Configuration for Kafka producer. Essential configuration properties are: - * bootstrap.servers, key.serializer, value.serializer. For a - * full list of config options, see http://kafka.apache.org/documentation.html#producerconfigs. - * @param metricsHandler Gets notified when spans are accepted or dropped. If you are not interested in these events you - * can use {@linkplain EmptySpanCollectorMetricsHandler} - */ - public KafkaSpanCollector(Properties kafkaProperties, SpanCollectorMetricsHandler metricsHandler) { - producer = new KafkaProducer<>(kafkaProperties); - this.metricsHandler = metricsHandler; - executorService = Executors.newSingleThreadExecutor(); - queue = new ArrayBlockingQueue(1000); - spanProcessingTask = new SpanProcessingTask(queue, producer, metricsHandler); - future = executorService.submit(spanProcessingTask); - } + @AutoValue.Builder + public interface Builder { + /** + * Configuration for Kafka producer. Essential configuration properties are: + * bootstrap.servers, key.serializer, value.serializer. For a full list of config options, see + * http://kafka.apache.org/documentation.html#producerconfigs. + * + *

Must include the following mappings: + */ + Builder kafkaProperties(Properties kafkaProperties); - @Override - public void collect(com.twitter.zipkin.gen.Span span) { - metricsHandler.incrementAcceptedSpans(1); - if (!queue.offer(span)) { - metricsHandler.incrementDroppedSpans(1); - LOGGER.log(Level.WARNING, "Queue rejected span!"); - } - } + /** Default 1 second. 0 implies spans are {@link #flush() flushed} externally. */ + Builder flushInterval(int flushInterval); - @Override - public void addDefaultAnnotation(String key, String value) { - throw new UnsupportedOperationException(); + Config build(); } + } - @Override - public void close() { - spanProcessingTask.stop(); - try { - Integer nrProcessedSpans = future.get(6000, TimeUnit.MILLISECONDS); - LOGGER.info("SpanProcessingTask processed " + nrProcessedSpans + " spans."); - } catch (Exception e) { - LOGGER.log(Level.WARNING, "Exception when waiting for SpanProcessTask to finish.", e); - } - executorService.shutdown(); - producer.close(); - metricsHandler.incrementDroppedSpans(queue.size()); - LOGGER.info("KafkaSpanCollector closed."); - } -} \ No newline at end of file + private final Config config; + private final Producer producer; + private final ThriftCodec thriftCodec = new ThriftCodec(); + + /** + * Create a new instance with default configuration. + * + * @param bootstrapServers A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. + * Like: host1:port1,host2:port2,... Does not need to be all the servers in the Kafka cluster. + * @param metrics Gets notified when spans are accepted or dropped. 
If you are not interested in + * these events you can use {@linkplain EmptySpanCollectorMetricsHandler} + */ + public static KafkaSpanCollector create(String bootstrapServers, SpanCollectorMetricsHandler metrics) { + return new KafkaSpanCollector(Config.builder(bootstrapServers).build(), metrics); + } + + /** + * @param config includes flush interval and kafka properties + * @param metrics Gets notified when spans are accepted or dropped. If you are not interested in + * these events you can use {@linkplain EmptySpanCollectorMetricsHandler} + */ + public static KafkaSpanCollector create(Config config, SpanCollectorMetricsHandler metrics) { + return new KafkaSpanCollector(config, metrics); + } + + // Visible for testing. Ex when tests need to explicitly control flushing, set interval to 0. + KafkaSpanCollector(Config config, SpanCollectorMetricsHandler metrics) { + super(SpanCodec.THRIFT, metrics, config.flushInterval()); + this.config = config; + this.producer = new KafkaProducer<>(config.kafkaProperties()); + } + + @Override + protected void sendSpans(byte[] thrift) throws IOException { + producer.send(new ProducerRecord("zipkin", thrift)); + } + + @Override + public void close() { + producer.close(); + super.close(); + } +} diff --git a/brave-spancollector-kafka/src/main/java/com/github/kristofa/brave/kafka/SpanProcessingTask.java b/brave-spancollector-kafka/src/main/java/com/github/kristofa/brave/kafka/SpanProcessingTask.java deleted file mode 100644 index 4dd8a9eae4..0000000000 --- a/brave-spancollector-kafka/src/main/java/com/github/kristofa/brave/kafka/SpanProcessingTask.java +++ /dev/null @@ -1,58 +0,0 @@ -package com.github.kristofa.brave.kafka; - -import com.github.kristofa.brave.SpanCollectorMetricsHandler; -import com.twitter.zipkin.gen.SpanCodec; -import com.twitter.zipkin.gen.Span; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; - -import java.util.concurrent.BlockingQueue; -import 
java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Processes spans by sending them, one at a time, to the topic `zipkin`, encoded in {@code TBinaryProtocol}. - *

- *

Note: this class was written to be used by a single-threaded executor, hence it is not thead-safe. - */ -class SpanProcessingTask implements Callable { - - private static final Logger LOGGER = Logger.getLogger(SpanProcessingTask.class.getName()); - private final BlockingQueue queue; - private final Producer producer; - private final SpanCollectorMetricsHandler metricsHandler; - private volatile boolean stop = false; - private int numProcessedSpans = 0; - - - SpanProcessingTask(BlockingQueue queue, Producer producer, SpanCollectorMetricsHandler metricsHandler) { - this.queue = queue; - this.producer = producer; - this.metricsHandler = metricsHandler; - } - - public void stop() { - stop = true; - } - - @Override - public Integer call() throws Exception { - do { - final Span span = queue.poll(5, TimeUnit.SECONDS); - if (span == null) { - continue; - } - try { - final ProducerRecord message = new ProducerRecord<>("zipkin", SpanCodec.THRIFT.writeSpan(span)); - producer.send(message); - numProcessedSpans++; - } catch (RuntimeException e) { - metricsHandler.incrementDroppedSpans(1); - LOGGER.log(Level.WARNING, "RuntimeException when writing span.", e); - } - } while (!stop); - return numProcessedSpans; - } -} diff --git a/brave-spancollector-kafka/src/test/java/com/github/kristofa/brave/kafka/ITKafkaSpanCollector.java b/brave-spancollector-kafka/src/test/java/com/github/kristofa/brave/kafka/ITKafkaSpanCollector.java deleted file mode 100644 index f56f4a46d7..0000000000 --- a/brave-spancollector-kafka/src/test/java/com/github/kristofa/brave/kafka/ITKafkaSpanCollector.java +++ /dev/null @@ -1,114 +0,0 @@ -package com.github.kristofa.brave.kafka; - -import com.github.charithe.kafka.KafkaJunitRule; -import com.github.kristofa.brave.SpanCollectorMetricsHandler; -import com.twitter.zipkin.gen.SpanCodec; -import com.twitter.zipkin.gen.Span; -import kafka.serializer.DefaultDecoder; -import org.junit.Rule; -import org.junit.Test; -import java.util.*; -import 
java.util.concurrent.*; - -import static org.junit.Assert.assertEquals; - -public class ITKafkaSpanCollector { - - private final EventsHandler metricsHandler = new EventsHandler(); - - private static class EventsHandler implements SpanCollectorMetricsHandler { - - public int acceptedSpans = 0; - public int droppedSpans = 0; - - @Override - public synchronized void incrementAcceptedSpans(int quantity) { - acceptedSpans += quantity; - } - - @Override - public synchronized void incrementDroppedSpans(int quantity) { - droppedSpans += quantity; - } - } - - - @Rule - public KafkaJunitRule kafkaRule = new KafkaJunitRule(); - - @Test - public void submitSingleSpan() throws TimeoutException { - - KafkaSpanCollector kafkaCollector = new KafkaSpanCollector("localhost:"+kafkaRule.kafkaBrokerPort(), metricsHandler); - Span span = span(1l, "test_kafka_span"); - kafkaCollector.collect(span); - kafkaCollector.close(); - - List spans = getCollectedSpans(kafkaRule.readMessages("zipkin", 1, new DefaultDecoder(kafkaRule.consumerConfig().props()))); - assertEquals(1, spans.size()); - assertEquals(span, spans.get(0)); - assertEquals(1, metricsHandler.acceptedSpans); - assertEquals(0, metricsHandler.droppedSpans); - - } - - @Test - public void submitMultipleSpansInParallel() throws InterruptedException, ExecutionException, TimeoutException { - KafkaSpanCollector kafkaCollector = new KafkaSpanCollector("localhost:"+kafkaRule.kafkaBrokerPort(), metricsHandler); - Callable spanProducer1 = new Callable() { - - @Override - public Void call() throws Exception { - for(int i=1; i<=200; i++) - { - kafkaCollector.collect(span(i, "producer1_" + i)); - } - return null; - } - }; - - Callable spanProducer2 = new Callable() { - - @Override - public Void call() throws Exception { - for(int i=1; i<=200; i++) - { - kafkaCollector.collect(span(i, "producer2_"+i)); - } - return null; - } - }; - - ExecutorService executorService = Executors.newFixedThreadPool(2); - Future future1 = 
executorService.submit(spanProducer1); - Future future2 = executorService.submit(spanProducer2); - - future1.get(2000, TimeUnit.MILLISECONDS); - future2.get(2000, TimeUnit.MILLISECONDS); - - List spans = getCollectedSpans(kafkaRule.readMessages("zipkin", 400, new DefaultDecoder(kafkaRule.consumerConfig().props()))); - assertEquals(400, spans.size()); - assertEquals(400, metricsHandler.acceptedSpans); - assertEquals(0, metricsHandler.droppedSpans); - kafkaCollector.close(); - } - - private List getCollectedSpans(List rawSpans) { - - List spans = new ArrayList<>(); - - for (byte[] rawSpan : rawSpans) { - Span span = SpanCodec.THRIFT.readSpan(rawSpan); - spans.add(span); - } - return spans; - } - - private Span span(long traceId, String spanName) { - final Span span = new Span(); - span.setId(traceId); - span.setTrace_id(traceId); - span.setName(spanName); - return span; - } -} diff --git a/brave-spancollector-kafka/src/test/java/com/github/kristofa/brave/kafka/KafkaSpanCollectorTest.java b/brave-spancollector-kafka/src/test/java/com/github/kristofa/brave/kafka/KafkaSpanCollectorTest.java new file mode 100644 index 0000000000..16b89dc90b --- /dev/null +++ b/brave-spancollector-kafka/src/test/java/com/github/kristofa/brave/kafka/KafkaSpanCollectorTest.java @@ -0,0 +1,154 @@ +package com.github.kristofa.brave.kafka; + +import com.github.charithe.kafka.KafkaJunitRule; +import com.github.kristofa.brave.SpanCollectorMetricsHandler; +import com.github.kristofa.brave.kafka.KafkaSpanCollector.Config; +import com.twitter.zipkin.gen.Span; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import kafka.serializer.DefaultDecoder; +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; +import 
org.junit.rules.ExpectedException; +import zipkin.Codec; + +import static org.assertj.core.api.Assertions.assertThat; + +public class KafkaSpanCollectorTest { + + @Rule + public KafkaJunitRule kafka = new KafkaJunitRule(); + @Rule + public ExpectedException thrown = ExpectedException.none(); + + TestMetricsHander metrics = new TestMetricsHander(); + // set flush interval to 0 so that tests can drive flushing explicitly + Config config = Config.builder("localhost:" + kafka.kafkaBrokerPort()).flushInterval(0).build(); + KafkaSpanCollector collector = new KafkaSpanCollector(config, metrics); + + @After + public void closeCollector(){ + collector.close(); + } + + @Test + public void collectDoesntDoIO() throws Exception { + thrown.expect(TimeoutException.class); + collector.collect(span(1L, "foo")); + + assertThat(readMessages()).isEmpty(); + } + + @Test + public void collectIncrementsAcceptedMetrics() throws Exception { + collector.collect(span(1L, "foo")); + + assertThat(metrics.acceptedSpans.get()).isEqualTo(1); + assertThat(metrics.droppedSpans.get()).isZero(); + } + + @Test + public void dropsWhenQueueIsFull() throws Exception { + for (int i = 0; i < 1001; i++) + collector.collect(span(1L, "foo")); + + collector.flush(); // manually flush the spans + + assertThat(Codec.THRIFT.readSpans(readMessages().get(0))).hasSize(1000); + assertThat(metrics.droppedSpans.get()).isEqualTo(1); + } + + @Test + public void sendsSpans() throws Exception { + collector.collect(span(1L, "foo")); + collector.collect(span(2L, "bar")); + + collector.flush(); // manually flush the spans + + // Ensure only one message was sent + List messages = readMessages(); + assertThat(messages).hasSize(1); + + // Now, let's read back the spans we sent! 
+ assertThat(Codec.THRIFT.readSpans(messages.get(0))).containsExactly( + zipkinSpan(1L, "foo"), + zipkinSpan(2L, "bar") + ); + } + + @Test + public void submitMultipleSpansInParallel() throws Exception { + Callable spanProducer1 = new Callable() { + + @Override + public Void call() throws Exception { + for (int i = 1; i <= 200; i++) { + collector.collect(span(i, "producer1_" + i)); + } + return null; + } + }; + + Callable spanProducer2 = new Callable() { + + @Override + public Void call() throws Exception { + for (int i = 1; i <= 200; i++) { + collector.collect(span(i, "producer2_" + i)); + } + return null; + } + }; + + ExecutorService executorService = Executors.newFixedThreadPool(2); + Future future1 = executorService.submit(spanProducer1); + Future future2 = executorService.submit(spanProducer2); + + future1.get(2000, TimeUnit.MILLISECONDS); + future2.get(2000, TimeUnit.MILLISECONDS); + + collector.flush(); // manually flush the spans + + // Ensure only one message was sent + List messages = readMessages(); + assertThat(messages).hasSize(1); + + // Now, let's make sure we read the correct count of spans. 
+ assertThat(Codec.THRIFT.readSpans(messages.get(0))).hasSize(400); + } + + class TestMetricsHander implements SpanCollectorMetricsHandler { + + final AtomicInteger acceptedSpans = new AtomicInteger(); + final AtomicInteger droppedSpans = new AtomicInteger(); + + @Override + public void incrementAcceptedSpans(int quantity) { + acceptedSpans.addAndGet(quantity); + } + + @Override + public void incrementDroppedSpans(int quantity) { + droppedSpans.addAndGet(quantity); + } + } + + static Span span(long traceId, String spanName) { + return new Span().setTrace_id(traceId).setId(traceId).setName(spanName); + } + + static zipkin.Span zipkinSpan(long traceId, String spanName) { + return new zipkin.Span.Builder().traceId(traceId).id(traceId).name(spanName).build(); + } + + private List readMessages() throws TimeoutException { + return kafka.readMessages("zipkin", 1, new DefaultDecoder(kafka.consumerConfig().props())); + } +}