Skip to content

Commit

Permalink
Resolves state conflict between client and local spans
Browse files Browse the repository at this point in the history
Before, local spans used the same thread local as client ones. This
meant a client span in the same trace would overwrite a local span.

Ex. [server -> local -> client] would truncate to [server -> client]

This change adds state to ensure local spans aren't overwritten by
client spans.

This change does not introduce a state stack, even though local spans
should be nestable. The reason is that Brave's model isn't currently
designed for that, and a nested model would significantly impact the API.
  • Loading branch information
Adrian Cole committed Dec 9, 2015
1 parent 8e8830c commit e2d6f02
Show file tree
Hide file tree
Showing 14 changed files with 371 additions and 280 deletions.
17 changes: 11 additions & 6 deletions brave-core/src/main/java/com/github/kristofa/brave/Brave.java
Expand Up @@ -27,14 +27,14 @@ public class Brave {
* Builds Brave api objects with following defaults if not overridden: * Builds Brave api objects with following defaults if not overridden:
* <p> * <p>
* <ul> * <ul>
* <li>ThreadLocalServerAndClientSpanState which binds trace/span state to current thread.</li> * <li>ThreadLocalServerClientAndLocalSpanState which binds trace/span state to current thread.</li>
* <li>FixedSampleRateTraceFilter which traces every request.</li> * <li>FixedSampleRateTraceFilter which traces every request.</li>
* <li>LoggingSpanCollector</li> * <li>LoggingSpanCollector</li>
* </ul> * </ul>
*/ */
public static class Builder { public static class Builder {


private final ServerAndClientSpanState state; private final ServerClientAndLocalSpanState state;
private final List<TraceFilter> traceFilters = new ArrayList<>(); private final List<TraceFilter> traceFilters = new ArrayList<>();
private SpanCollector spanCollector = new LoggingSpanCollector(); private SpanCollector spanCollector = new LoggingSpanCollector();
private Random random = new Random(); private Random random = new Random();
Expand Down Expand Up @@ -64,7 +64,7 @@ public Builder() {
public Builder(String serviceName) { public Builder(String serviceName) {
try { try {
int ip = toInt(getLocalHostLANAddress()); int ip = toInt(getLocalHostLANAddress());
state = new ThreadLocalServerAndClientSpanState(ip, 0, serviceName); state = new ThreadLocalServerClientAndLocalSpanState(ip, 0, serviceName);
} catch (UnknownHostException e) { } catch (UnknownHostException e) {
throw new IllegalStateException("Unable to get Inet address", e); throw new IllegalStateException("Unable to get Inet address", e);
} }
Expand All @@ -79,14 +79,14 @@ public Builder(String serviceName) {
* @param serviceName Name of service. Is only relevant when we do server side tracing. * @param serviceName Name of service. Is only relevant when we do server side tracing.
*/ */
public Builder(int ip, int port, String serviceName) { public Builder(int ip, int port, String serviceName) {
state = new ThreadLocalServerAndClientSpanState(ip, port, serviceName); state = new ThreadLocalServerClientAndLocalSpanState(ip, port, serviceName);
traceFilters.add(new FixedSampleRateTraceFilter(1)); traceFilters.add(new FixedSampleRateTraceFilter(1));
} }


/** /**
* Use for control of how tracing state propagates across threads. * Use for control of how tracing state propagates across threads.
*/ */
public Builder(ServerAndClientSpanState state) { public Builder(ServerClientAndLocalSpanState state) {
this.state = Util.checkNotNull(state, "state must be specified."); this.state = Util.checkNotNull(state, "state must be specified.");
traceFilters.add(new FixedSampleRateTraceFilter(1)); traceFilters.add(new FixedSampleRateTraceFilter(1));
} }
Expand Down Expand Up @@ -209,7 +209,12 @@ private Brave(Builder builder) {
.state(builder.state) .state(builder.state)
.traceFilters(builder.traceFilters).build(); .traceFilters(builder.traceFilters).build();


localTracer = LocalTracer.create(clientTracer, builder.spanCollector, builder.state); localTracer = LocalTracer.builder()
.randomGenerator(builder.random)
.spanCollector(builder.spanCollector)
.spanAndEndpoint(SpanAndEndpoint.LocalSpanAndEndpoint.create(builder.state))
.traceFilters(builder.traceFilters).build();

serverRequestInterceptor = new ServerRequestInterceptor(serverTracer); serverRequestInterceptor = new ServerRequestInterceptor(serverTracer);
serverResponseInterceptor = new ServerResponseInterceptor(serverTracer); serverResponseInterceptor = new ServerResponseInterceptor(serverTracer);
clientRequestInterceptor = new ClientRequestInterceptor(clientTracer); clientRequestInterceptor = new ClientRequestInterceptor(clientTracer);
Expand Down
Expand Up @@ -5,8 +5,15 @@
import com.twitter.zipkin.gen.Span; import com.twitter.zipkin.gen.Span;


/** /**
* Maintains client span state. * Maintains state for a single client span.
* *
* <p>Client spans can be at the following locations in the span tree.
* <ul>
* <li>The root-span of a trace originated by Brave</li>
* <li>A child of a server span originated by Brave</li>
* <li>A child of a local span originated by Brave</li>
* </ul>
*
* @author kristof * @author kristof
*/ */
public interface ClientSpanState extends CommonSpanState { public interface ClientSpanState extends CommonSpanState {
Expand Down
Expand Up @@ -42,7 +42,7 @@ public static Builder builder() {
@AutoValue.Builder @AutoValue.Builder
public abstract static class Builder { public abstract static class Builder {


public Builder state(ServerAndClientSpanState state) { public Builder state(ServerClientAndLocalSpanState state) {
return spanAndEndpoint(ClientSpanAndEndpoint.create(state)); return spanAndEndpoint(ClientSpanAndEndpoint.create(state));
} }


Expand Down Expand Up @@ -148,13 +148,19 @@ public void setCurrentClientServiceName(String serviceName) {


private SpanId getNewSpanId() { private SpanId getNewSpanId() {


Span currentServerSpan = spanAndEndpoint().state().getCurrentServerSpan().getSpan(); Span parentSpan = spanAndEndpoint().state().getCurrentLocalSpan();
if (parentSpan == null) {
ServerSpan serverSpan = spanAndEndpoint().state().getCurrentServerSpan();
if (serverSpan != null) {
parentSpan = serverSpan.getSpan();
}
}
long newSpanId = randomGenerator().nextLong(); long newSpanId = randomGenerator().nextLong();
if (currentServerSpan == null) { if (parentSpan == null) {
return SpanId.create(newSpanId, newSpanId, null); return SpanId.create(newSpanId, newSpanId, null);
} }


return SpanId.create(currentServerSpan.getTrace_id(), newSpanId, currentServerSpan.getId()); return SpanId.create(parentSpan.getTrace_id(), newSpanId, parentSpan.getId());
} }


ClientTracer() { ClientTracer() {
Expand Down
@@ -0,0 +1,35 @@
package com.github.kristofa.brave;

import com.github.kristofa.brave.internal.Nullable;
import com.twitter.zipkin.gen.Span;

/**
* Maintains state for a single local span. Only one local span is tracked at a time, so
* nesting of local spans is not supported.
*
* <p>Local spans can be at the following locations in the span tree.
* <ul>
* <li>The root-span of a trace originated by Brave</li>
* <li>A child of a server span originated by Brave</li>
* </ul>
*/
public interface LocalSpanState extends CommonSpanState {

/**
* Gets the Span for the local request that was started as part of current request.
*
* <p>Should be thread-aware to support multiple parallel requests.
*
* @return Local request span for current thread, or {@code null} when no local span is
* currently set.
*/
@Nullable
Span getCurrentLocalSpan();

/**
* Sets current local span.
*
* <p>Should be thread-aware to support multiple parallel requests.
*
* @param span Local span. {@code LocalTracer} passes {@code null} here to clear the current
* local span, both when a span is not sampled and when a span is finished, so
* implementations are expected to accept {@code null}.
*/
void setCurrentLocalSpan(Span span);
}
136 changes: 86 additions & 50 deletions brave-core/src/main/java/com/github/kristofa/brave/LocalTracer.java
@@ -1,9 +1,17 @@
package com.github.kristofa.brave; package com.github.kristofa.brave;


import com.github.kristofa.brave.SpanAndEndpoint.LocalSpanAndEndpoint;
import com.github.kristofa.brave.internal.Util;
import com.google.auto.value.AutoValue; import com.google.auto.value.AutoValue;
import com.twitter.zipkin.gen.AnnotationType;
import com.twitter.zipkin.gen.BinaryAnnotation;
import com.twitter.zipkin.gen.Span; import com.twitter.zipkin.gen.Span;
import com.twitter.zipkin.gen.zipkinCoreConstants; import com.twitter.zipkin.gen.zipkinCoreConstants;


import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;

import static com.twitter.zipkin.gen.zipkinCoreConstants.LOCAL_COMPONENT; import static com.twitter.zipkin.gen.zipkinCoreConstants.LOCAL_COMPONENT;


/** /**
Expand All @@ -18,7 +26,7 @@
* *
* <p/>Here's an example of allocating precise duration for a local span: * <p/>Here's an example of allocating precise duration for a local span:
* <pre> * <pre>
* tracer.startSpan("codec", "encode"); * tracer.startNewSpan("codec", "encode");
* try { * try {
* return codec.encode(input); * return codec.encode(input);
* } finally { * } finally {
Expand All @@ -29,14 +37,38 @@
* @see zipkinCoreConstants#LOCAL_COMPONENT * @see zipkinCoreConstants#LOCAL_COMPONENT
*/ */
@AutoValue @AutoValue
public abstract class LocalTracer { public abstract class LocalTracer extends AnnotationSubmitter {

static Builder builder() {
return new AutoValue_LocalTracer.Builder();
}

// visible for testing
static Builder builder(LocalTracer source) {
return new AutoValue_LocalTracer.Builder(source);
}

@Override
abstract LocalSpanAndEndpoint spanAndEndpoint();

abstract Random randomGenerator();


abstract ClientTracer clientTracer();
abstract SpanCollector spanCollector(); abstract SpanCollector spanCollector();
abstract ServerAndClientSpanState state();


static LocalTracer create(ClientTracer clientTracer, SpanCollector spanCollector, ServerAndClientSpanState state) { abstract List<TraceFilter> traceFilters();
return new AutoValue_LocalTracer(clientTracer, spanCollector, state);
@AutoValue.Builder
abstract static class Builder {

abstract Builder spanAndEndpoint(LocalSpanAndEndpoint spanAndEndpoint);

abstract Builder randomGenerator(Random randomGenerator);

abstract Builder spanCollector(SpanCollector spanCollector);

abstract Builder traceFilters(List<TraceFilter> traceFilters);

abstract LocalTracer build();
} }


/** /**
Expand All @@ -47,16 +79,23 @@ static LocalTracer create(ClientTracer clientTracer, SpanCollector spanCollector
* @return metadata about the new span or null if one wasn't started due to sampling policy. * @return metadata about the new span or null if one wasn't started due to sampling policy.
* @see zipkinCoreConstants#LOCAL_COMPONENT * @see zipkinCoreConstants#LOCAL_COMPONENT
*/ */
public SpanId startSpan(String component, String operation) { public SpanId startNewSpan(String component, String operation) {
SpanId spanId = clientTracer().startNewSpan(operation); SpanId spanId = startNewSpan(component, operation, currentTimeMicroseconds());
if (spanId == null) return null; if (spanId == null) return null;
clientTracer().submitBinaryAnnotation(LOCAL_COMPONENT, component); spanAndEndpoint().span().startTick = System.nanoTime(); // embezzle start tick into an internal field.
state().getCurrentClientSpan()
.setTimestamp(clientTracer().currentTimeMicroseconds())
.startTick = System.nanoTime(); // embezzle start tick into an internal field.
return spanId; return spanId;
} }


/**
 * Creates the identifiers for a new local span.
 *
 * <p>If the state holds a current server span, the new span joins that trace as its child.
 * Otherwise the new span becomes the root of a new trace, using the same random value for
 * both the trace id and the span id.
 *
 * @return ids for the new span; the parent id is {@code null} for a root span.
 */
private SpanId getNewSpanId() {
    // Guard the server span holder before dereferencing it, mirroring
    // ClientTracer#getNewSpanId, so custom state implementations that return
    // null do not trigger a NullPointerException here.
    ServerSpan serverSpan = spanAndEndpoint().state().getCurrentServerSpan();
    Span parentSpan = serverSpan != null ? serverSpan.getSpan() : null;

    long newSpanId = randomGenerator().nextLong();
    if (parentSpan == null) {
        return SpanId.create(newSpanId, newSpanId, null);
    }

    return SpanId.create(parentSpan.getTrace_id(), newSpanId, parentSpan.getId());
}

/** /**
* Request a new local span, which started at the given timestamp. * Request a new local span, which started at the given timestamp.
* *
Expand All @@ -66,42 +105,40 @@ public SpanId startSpan(String component, String operation) {
* @return metadata about the new span or null if one wasn't started due to sampling policy. * @return metadata about the new span or null if one wasn't started due to sampling policy.
* @see zipkinCoreConstants#LOCAL_COMPONENT * @see zipkinCoreConstants#LOCAL_COMPONENT
*/ */
public SpanId startSpan(String component, String operation, long timestamp) { public SpanId startNewSpan(String component, String operation, long timestamp) {
SpanId spanId = clientTracer().startNewSpan(operation);
if (spanId == null) return null;
clientTracer().submitBinaryAnnotation(LOCAL_COMPONENT, component);
state().getCurrentClientSpan().setTimestamp(timestamp);
return spanId;
}


/** Boolean sample = spanAndEndpoint().state().sample();
* Associates an event that explains latency with the current system time. if (Boolean.FALSE.equals(sample)) {
* spanAndEndpoint().state().setCurrentLocalSpan(null);
* @param value A short tag indicating the event, like "ApplicationReady" return null;
*/ }
public void submitAnnotation(String value) {
clientTracer().submitAnnotation(value);
}


/** SpanId newSpanId = getNewSpanId();
* Associates an event that explains latency with a timestamp. if (sample == null) {
* // No sample indication is present.
* @param value A short tag indicating the event, like "ApplicationReady" for (TraceFilter traceFilter : traceFilters()) {
* @param timestamp microseconds from epoch if (!traceFilter.trace(newSpanId.getSpanId(), operation)) {
*/ spanAndEndpoint().state().setCurrentLocalSpan(null);
public void submitAnnotation(String value, long timestamp) { return null;
clientTracer().submitAnnotation(value, timestamp); }
} }
}


/** Span newSpan = new Span();
* Binary annotations are tags applied to a Span to give it context. For newSpan.setId(newSpanId.getSpanId());
* example, a key "your_app.version" would let you lookup spans by version. newSpan.setTrace_id(newSpanId.getTraceId());
* if (newSpanId.getParentSpanId() != null) {
* @param key Name used to lookup spans, such as "your_app.version" newSpan.setParent_id(newSpanId.getParentSpanId());
* @param value String value, should not be <code>null</code>. }
*/ newSpan.setName(operation);
public void submitBinaryAnnotation(String key, String value) { newSpan.setTimestamp(timestamp);
clientTracer().submitBinaryAnnotation(key, value); newSpan.addToBinary_annotations(new BinaryAnnotation()
.setKey(LOCAL_COMPONENT)
.setValue(ByteBuffer.wrap(component.getBytes(Util.UTF_8)))
.setAnnotation_type(AnnotationType.STRING)
.setHost(spanAndEndpoint().endpoint()));
spanAndEndpoint().state().setCurrentLocalSpan(newSpan);
return newSpanId;
} }


/** /**
Expand All @@ -110,15 +147,15 @@ public void submitBinaryAnnotation(String key, String value) {
public void finishSpan() { public void finishSpan() {
long endTick = System.nanoTime(); long endTick = System.nanoTime();


Span span = state().getCurrentClientSpan(); Span span = spanAndEndpoint().span();
if (span == null) return; if (span == null) return;


Long startTick = span.startTick; Long startTick = span.startTick;
final long duration; final long duration;
if (startTick != null) { if (startTick != null) {
duration = (endTick - startTick) / 1000; duration = (endTick - startTick) / 1000;
} else { } else {
duration = clientTracer().currentTimeMicroseconds() - span.getTimestamp(); duration = currentTimeMicroseconds() - span.getTimestamp();
} }
finishSpan(duration); finishSpan(duration);
} }
Expand All @@ -127,16 +164,15 @@ public void finishSpan() {
* Completes the span, which took {@code duration} microseconds. * Completes the span, which took {@code duration} microseconds.
*/ */
public void finishSpan(long duration) { public void finishSpan(long duration) {
Span span = state().getCurrentClientSpan(); Span span = spanAndEndpoint().span();
if (span == null) return; if (span == null) return;


synchronized (span) { synchronized (span) {
span.setDuration(duration); span.setDuration(duration);
spanCollector().collect(span); spanCollector().collect(span);
} }


state().setCurrentClientSpan(null); spanAndEndpoint().state().setCurrentLocalSpan(null);
state().setCurrentClientServiceName(null);
} }


LocalTracer() { LocalTracer() {
Expand Down
Expand Up @@ -5,6 +5,6 @@
* *
* @author kristof * @author kristof
*/ */
public interface ServerAndClientSpanState extends ServerSpanState, ClientSpanState { public interface ServerClientAndLocalSpanState extends ServerSpanState, ClientSpanState, LocalSpanState {


} }
Expand Up @@ -5,8 +5,14 @@
import com.twitter.zipkin.gen.Endpoint; import com.twitter.zipkin.gen.Endpoint;


/** /**
* Maintains server span state. * Maintains state for a single server span.
* *
* <p>Server spans can be at the following locations in the span tree.
* <ul>
* <li>The root-span of a trace originated by Brave</li>
* <li>A child of a span propagated to Brave</li>
* </ul>
*
* @author kristof * @author kristof
*/ */
public interface ServerSpanState extends CommonSpanState { public interface ServerSpanState extends CommonSpanState {
Expand Down

0 comments on commit e2d6f02

Please sign in to comment.