Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds CollectedSpanHandler #2807

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import zipkin2.collector.CollectorComponent;
import zipkin2.collector.CollectorMetrics;
import zipkin2.collector.CollectorSampler;
import zipkin2.collector.handler.CollectedSpanHandler;
import zipkin2.storage.StorageComponent;

/** This collector consumes encoded binary messages from a ActiveMQ queue. */
Expand All @@ -38,13 +39,18 @@ public static final class Builder extends CollectorComponent.Builder {
String queue = "zipkin";
int concurrency = 1;

@Override public Builder storage(StorageComponent storage) {
this.delegate.storage(storage);
@Override public Builder sampler(CollectorSampler sampler) {
this.delegate.sampler(sampler);
return this;
}

@Override public Builder sampler(CollectorSampler sampler) {
this.delegate.sampler(sampler);
@Override public Builder addCollectedSpanHandler(CollectedSpanHandler collectedSpanHandler) {
this.delegate.addCollectedSpanHandler(collectedSpanHandler);
return this;
}

@Override public Builder storage(StorageComponent storage) {
this.delegate.storage(storage);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,18 @@
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.transport.TransportListener;
import zipkin2.Callback;
import zipkin2.CheckResult;
import zipkin2.collector.Collector;
import zipkin2.collector.CollectorMetrics;

import static java.nio.charset.StandardCharsets.UTF_8;
import static zipkin2.Callback.NOOP_VOID;

/**
* Consumes spans from messages on a ActiveMQ queue. Malformed messages will be discarded. Errors in
* the storage component will similarly be ignored, with no retry of the message.
*/
final class ActiveMQSpanConsumer implements TransportListener, MessageListener, Closeable {
static final Callback<Void> NOOP = new Callback<Void>() {
@Override public void onSuccess(Void value) {
}

@Override public void onError(Throwable t) {
}
};

static final CheckResult
CLOSED = CheckResult.failed(new IllegalStateException("Collector intentionally closed")),
INTERRUPTION = CheckResult.failed(new IOException("Recoverable error on ActiveMQ connection"));
Expand Down Expand Up @@ -115,7 +107,7 @@ void registerInNewSession(ActiveMQConnection connection, String queue) throws JM

metrics.incrementBytes(serialized.length);
if (serialized.length == 0) return; // lenient on empty messages
collector.acceptSpans(serialized, NOOP);
collector.acceptSpans(serialized, NOOP_VOID);
}

@Override public void close() {
Expand Down
116 changes: 75 additions & 41 deletions zipkin-collector/core/src/main/java/zipkin2/collector/Collector.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,19 @@

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
import java.util.logging.Logger;
import zipkin2.Call;
import zipkin2.Callback;
import zipkin2.Span;
import zipkin2.SpanBytesDecoderDetector;
import zipkin2.codec.BytesDecoder;
import zipkin2.codec.SpanBytesDecoder;
import zipkin2.collector.handler.CollectedSpanHandler;
import zipkin2.storage.StorageComponent;

import static java.lang.String.format;
Expand All @@ -33,21 +36,13 @@

/**
* This component takes action on spans received from a transport. This includes deserializing,
* sampling and scheduling for storage.
* sampling, invoking any handlers, and scheduling for storage.
*
* <p>Callbacks passed do not propagate to the storage layer. They only return success or failures
* before storage is attempted. This ensures that calling threads are disconnected from storage
* threads.
*/
public class Collector { // not final for mock
static final Callback<Void> NOOP_CALLBACK = new Callback<Void>() {
@Override public void onSuccess(Void value) {
}

@Override public void onError(Throwable t) {
}
};

/** Needed to scope this to the correct logging category */
public static Builder newBuilder(Class<?> loggingClass) {
if (loggingClass == null) throw new NullPointerException("loggingClass == null");
Expand All @@ -56,14 +51,35 @@ public static Builder newBuilder(Class<?> loggingClass) {

public static final class Builder {
final Logger logger;
final ArrayList<CollectedSpanHandler> collectedSpanHandlers = new ArrayList<>();
StorageComponent storage;
CollectorSampler sampler;
CollectorMetrics metrics;

Builder(Logger logger) {
this.logger = logger;
}

/** @see {@link CollectorComponent.Builder#sampler(CollectorSampler)} */
public Builder sampler(CollectorSampler sampler) {
if (sampler == null) throw new NullPointerException("sampler == null");
this.collectedSpanHandlers.add(0, sampler); // sample first
return this;
}

/**
* @see {@link CollectorComponent.Builder#addCollectedSpanHandler(CollectedSpanHandler)}
* @since 2.17
*/
public Builder addCollectedSpanHandler(CollectedSpanHandler collectedSpanHandler) {
if (collectedSpanHandler == null) {
throw new NullPointerException("collectedSpanHandler == null");
}
if (collectedSpanHandler != CollectedSpanHandler.NOOP) { // lenient on config bug
this.collectedSpanHandlers.add(collectedSpanHandler);
codefromthecrypt marked this conversation as resolved.
Show resolved Hide resolved
}
return this;
}

/** @see {@link CollectorComponent.Builder#storage(StorageComponent)} */
public Builder storage(StorageComponent storage) {
if (storage == null) throw new NullPointerException("storage == null");
Expand All @@ -78,21 +94,14 @@ public Builder metrics(CollectorMetrics metrics) {
return this;
}

/** @see {@link CollectorComponent.Builder#sampler(CollectorSampler)} */
public Builder sampler(CollectorSampler sampler) {
if (sampler == null) throw new NullPointerException("sampler == null");
this.sampler = sampler;
return this;
}

public Collector build() {
return new Collector(this);
}
}

final Logger logger;
final CollectorMetrics metrics;
final CollectorSampler sampler;
final CollectedSpanHandler handler;
final StorageComponent storage;

Collector(Builder builder) {
Expand All @@ -101,7 +110,7 @@ public Collector build() {
this.metrics = builder.metrics == null ? CollectorMetrics.NOOP_METRICS : builder.metrics;
if (builder.storage == null) throw new NullPointerException("storage == null");
this.storage = builder.storage;
this.sampler = builder.sampler == null ? CollectorSampler.ALWAYS_SAMPLE : builder.sampler;
this.handler = consolidate(builder.collectedSpanHandlers);
}

public void accept(List<Span> spans, Callback<Void> callback) {
Expand All @@ -119,24 +128,25 @@ public void accept(List<Span> spans, Callback<Void> callback, Executor executor)
callback.onSuccess(null);
return;
}
metrics.incrementSpans(spans.size());

List<Span> sampledSpans = sample(spans);
if (sampledSpans.isEmpty()) {
callback.onSuccess(null);
return;
}

// In order to ensure callers are not blocked, we swap callbacks when we get to the storage
// phase of this process. Here, we create a callback whose sole purpose is classifying later
// errors on this bundle of spans in the same log category. This allows people to only turn on
// debug logging in one place.
try {
executor.execute(new StoreSpans(sampledSpans));
metrics.incrementSpans(spans.size());

List<Span> handledSpans = handle(spans);
if (handledSpans.isEmpty()) {
callback.onSuccess(null);
return;
}

// In order to ensure callers are not blocked, we swap callbacks when we get to the storage
// phase of this process. Here, we create a callback whose sole purpose is classifying later
// errors on this bundle of spans in the same log category. This allows people to only turn on
// debug logging in one place.
executor.execute(new StoreSpans(handledSpans));
callback.onSuccess(null);
} catch (Throwable unexpected) { // ensure if a future is supplied we always set value or error
Call.propagateIfFatal(unexpected);
callback.onError(unexpected);
throw unexpected;
}
}

Expand Down Expand Up @@ -204,17 +214,15 @@ String idString(Span span) {
return span.traceId() + "/" + span.id();
}

List<Span> sample(List<Span> input) {
List<Span> sampled = new ArrayList<>(input.size());
List<Span> handle(List<Span> input) {
List<Span> handled = new ArrayList<>(input.size());
for (int i = 0, length = input.size(); i < length; i++) {
Span s = input.get(i);
if (sampler.isSampled(s.traceId(), Boolean.TRUE.equals(s.debug()))) {
sampled.add(s);
}
Span s = handler.handle(input.get(i));
if (s != null) handled.add(s);
}
int dropped = input.size() - sampled.size();
int dropped = input.size() - handled.size();
if (dropped > 0) metrics.incrementSpansDropped(dropped);
return sampled;
return handled;
}

class StoreSpans implements Callback<Void>, Runnable {
Expand All @@ -239,7 +247,7 @@ class StoreSpans implements Callback<Void>, Runnable {
}

@Override public void onError(Throwable t) {
handleStorageError(spans, t, NOOP_CALLBACK);
handleStorageError(spans, t, NOOP_VOID);
}

@Override public String toString() {
Expand Down Expand Up @@ -293,4 +301,30 @@ String appendSpanIds(List<Span> spans, StringBuilder message) {

return message.append("]").toString();
}

static CollectedSpanHandler consolidate(List<CollectedSpanHandler> handlers) {
if (handlers.isEmpty()) return CollectedSpanHandler.NOOP;
if (handlers.size() == 1) return handlers.get(0);
return new MultipleHandler(handlers);
}

static final class MultipleHandler implements CollectedSpanHandler {
final CollectedSpanHandler[] handlers; // Array ensures no iterators are created at runtime

MultipleHandler(List<CollectedSpanHandler> handlers) {
this.handlers = handlers.toArray(new CollectedSpanHandler[0]);
}

@Override public Span handle(Span span) {
for (CollectedSpanHandler handler : handlers) {
span = handler.handle(span);
if (span == null) return null;
}
return span;
}

@Override public String toString() {
return Arrays.toString(handlers);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.List;
import zipkin2.Component;
import zipkin2.collector.handler.CollectedSpanHandler;
import zipkin2.storage.SpanConsumer;
import zipkin2.storage.StorageComponent;

Expand All @@ -35,7 +36,31 @@ public abstract class CollectorComponent extends Component {

public abstract static class Builder {
/**
* Once spans are sampled, they are {@link SpanConsumer#accept(List)} queued for storage} using
* {@link CollectorSampler#isSampled(String, boolean) samples spans} to reduce load on the
* storage system. Defaults to always sample.
*
* <p>Sampling happens before {@link #addCollectedSpanHandler(CollectedSpanHandler) handlers}.
*
* @deprecated since 2.17, use {@link #addCollectedSpanHandler(CollectedSpanHandler)}
*/
@Deprecated public abstract Builder sampler(CollectorSampler sampler);

/**
* Triggered on each collected span, before storage. This allows the ability to mutate or drop
* spans for reasons including remapping tags.
*
* <p>Handlers execute after {@link #sampler(CollectorSampler) sampling} and before {@link
* #storage(StorageComponent) storage}.
*
* @since 2.17
*/
// empty implementation as this method was added late
public Builder addCollectedSpanHandler(CollectedSpanHandler collectedSpanHandler) {
return this;
}

/**
* Once spans are handled, they are {@link SpanConsumer#accept(List)} queued for storage} using
* this component.
*/
public abstract Builder storage(StorageComponent storage);
Expand All @@ -46,12 +71,6 @@ public abstract static class Builder {
*/
public abstract Builder metrics(CollectorMetrics metrics);

/**
* {@link CollectorSampler#isSampled(String, boolean) samples spans} to reduce load on the
* storage system. Defaults to always sample.
*/
public abstract Builder sampler(CollectorSampler sampler);

public abstract CollectorComponent build();
}
}
Loading