Skip to content

Commit

Permalink
Ensures current span is always closed prior to finishing (#393)
Browse files Browse the repository at this point in the history
This changes logic in http handlers such that a span is always closed
prior to being finished. This has two advantages: one is that we can
test that instrumentation do not leak span contexts. Another is that
log context synchronization will not add trace IDs of a closed span.
  • Loading branch information
adriancole committed May 7, 2017
1 parent 7703057 commit 9060ecb
Show file tree
Hide file tree
Showing 18 changed files with 297 additions and 290 deletions.
61 changes: 38 additions & 23 deletions brave/src/main/java/brave/http/HttpClientHandler.java
@@ -1,46 +1,61 @@
package brave.http;

import brave.Span;
import zipkin.Constants;
import brave.internal.Nullable;
import brave.propagation.TraceContext;

/**
* This standardizes a way to instrument http clients, particularly in a way that encourages use of
* portable customizations via {@link HttpClientParser}.
*
* @param <Req> the native http request type of the client.
* @param <Resp> the native http response type of the client.
*/
public final class HttpClientHandler<Req, Resp> {
public static <Req, Resp> HttpClientHandler create(HttpAdapter<Req, Resp> adapter,
HttpClientParser parser) {
return new HttpClientHandler<>(adapter, parser);
}

public class HttpClientHandler<Req, Resp> {
final HttpAdapter<Req, Resp> adapter;
final HttpClientParser parser;

public HttpClientHandler(HttpAdapter<Req, Resp> adapter, HttpClientParser parser) {
HttpClientHandler(HttpAdapter<Req, Resp> adapter, HttpClientParser parser) {
this.adapter = adapter;
this.parser = parser;
}

public Req handleSend(Req request, Span span) {
if (span.isNoop()) return request;
/**
* Starts the client span after assigning it a name and tags.
*
* <p>This is typically called after a span is {@link TraceContext.Injector#inject(TraceContext,
* Object) injected} onto the request, and before request is processed by the actual library.
*/
public void handleSend(Req request, Span span) {
if (span.isNoop()) return;

// all of the parsing here occur before a timestamp is recorded on the span
span.kind(Span.Kind.CLIENT).name(parser.spanName(adapter, request));
parser.requestTags(adapter, request, span);
span.start();
return request;
}

public Resp handleReceive(Resp response, Span span) {
if (span.isNoop()) return response;

try {
parser.responseTags(adapter, response, span);
} finally {
span.finish();
}
return response;
}

public <T extends Throwable> T handleError(T throwable, Span span) {
if (span.isNoop()) return throwable;
/**
* Finishes the client span after assigning it tags according to the response or error.
*
* <p>This is typically called once the response headers are received, and after the span is
* {@link brave.Tracer.SpanInScope#close() no longer in scope}.
*/
public void handleReceive(@Nullable Resp response, @Nullable Throwable error, Span span) {
if (span.isNoop()) return;

try {
String message = throwable.getMessage();
if (message == null) message = throwable.getClass().getSimpleName();
span.tag(Constants.ERROR, message);
return throwable;
if (error != null) {
String message = error.getMessage();
if (message == null) message = error.getClass().getSimpleName();
span.tag(zipkin.Constants.ERROR, message);
}
if (response != null) parser.responseTags(adapter, response, span);
} finally {
span.finish();
}
Expand Down
61 changes: 38 additions & 23 deletions brave/src/main/java/brave/http/HttpServerHandler.java
@@ -1,46 +1,61 @@
package brave.http;

import brave.Span;
import zipkin.Constants;
import brave.internal.Nullable;
import brave.propagation.TraceContext;

/**
* This standardizes a way to instrument http servers, particularly in a way that encourages use of
* portable customizations via {@link HttpServerParser}.
*
* @param <Req> the native http request type of the server.
* @param <Resp> the native http response type of the server.
*/
public final class HttpServerHandler<Req, Resp> {
public static <Req, Resp> HttpServerHandler create(HttpAdapter<Req, Resp> adapter,
HttpServerParser parser) {
return new HttpServerHandler<>(adapter, parser);
}

public class HttpServerHandler<Req, Resp> {
final HttpAdapter<Req, Resp> adapter;
final HttpServerParser parser;

public HttpServerHandler(HttpAdapter<Req, Resp> adapter, HttpServerParser parser) {
HttpServerHandler(HttpAdapter<Req, Resp> adapter, HttpServerParser parser) {
this.adapter = adapter;
this.parser = parser;
}

public Req handleReceive(Req request, Span span) {
if (span.isNoop()) return request;
/**
* Starts the server span after assigning it a name and tags.
*
* <p>This is typically called after a span is {@link brave.Tracer#nextSpan(TraceContext.Extractor,
* Object) extracted} from the request and before the request is processed by the actual library.
*/
public void handleReceive(Req request, Span span) {
if (span.isNoop()) return;

// all of the parsing here occur before a timestamp is recorded on the span
span.kind(Span.Kind.SERVER).name(parser.spanName(adapter, request));
parser.requestTags(adapter, request, span);
span.start();
return request;
}

public Resp handleSend(Resp response, Span span) {
if (span.isNoop()) return response;

try {
parser.responseTags(adapter, response, span);
} finally {
span.finish();
}
return response;
}

public <T extends Throwable> T handleError(T throwable, Span span) {
if (span.isNoop()) return throwable;
/**
* Finishes the server span after assigning it tags according to the response or error.
*
* <p>This is typically called once the response headers are sent, and after the span is
* {@link brave.Tracer.SpanInScope#close() no longer in scope}.
*/
public void handleSend(@Nullable Resp response, @Nullable Throwable error, Span span) {
if (span.isNoop()) return;

try {
String message = throwable.getMessage();
if (message == null) message = throwable.getClass().getSimpleName();
span.tag(Constants.ERROR, message);
return throwable;
if (error != null) {
String message = error.getMessage();
if (message == null) message = error.getClass().getSimpleName();
span.tag(zipkin.Constants.ERROR, message);
}
if (response != null) parser.responseTags(adapter, response, span);
} finally {
span.finish();
}
Expand Down
Expand Up @@ -47,8 +47,8 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
span.kind(Span.Kind.CLIENT).name(method.getFullMethodName()).start();
injector.inject(span.context(), headers);
span.kind(Span.Kind.CLIENT).name(method.getFullMethodName()).start();
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override public void onClose(Status status, Metadata trailers) {
Expand Down
61 changes: 61 additions & 0 deletions instrumentation/http-tests/src/main/java/brave/http/ITHttp.java
@@ -0,0 +1,61 @@
package brave.http;

import brave.Tracing;
import brave.internal.StrictCurrentTraceContext;
import brave.propagation.CurrentTraceContext;
import brave.propagation.TraceContext;
import brave.sampler.Sampler;
import java.util.Collections;
import java.util.List;
import okhttp3.mockwebserver.MockWebServer;
import org.junit.Rule;
import org.junit.rules.ExpectedException;
import zipkin.Endpoint;
import zipkin.Span;
import zipkin.internal.Util;
import zipkin.storage.InMemoryStorage;

import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;

public abstract class ITHttp {
@Rule public ExpectedException thrown = ExpectedException.none();
@Rule public MockWebServer server = new MockWebServer();

Endpoint local = Endpoint.builder().serviceName("local").ipv4(127 << 24 | 1).port(100).build();
InMemoryStorage storage = new InMemoryStorage();

protected CurrentTraceContext currentTraceContext = new StrictCurrentTraceContext();
protected HttpTracing httpTracing;

Tracing.Builder tracingBuilder(Sampler sampler) {
return Tracing.newBuilder()
.reporter(s -> {
// make sure the context was cleared prior to finish.. no leaks!
TraceContext current = httpTracing.tracing().currentTraceContext().get();
if (current != null) {
assertThat(current.spanId())
.isNotEqualTo(s.id);
}
storage.spanConsumer().accept(asList(s));
})
.currentTraceContext(currentTraceContext)
.localEndpoint(local)
.sampler(sampler);
}

List<Span> collectedSpans() {
List<List<Span>> result = storage.spanStore().getRawTraces();
if (result.isEmpty()) return Collections.emptyList();
assertThat(result).hasSize(1);
return result.get(0);
}

void assertReportedTagsInclude(String key, String... values) {
assertThat(collectedSpans())
.flatExtracting(s -> s.binaryAnnotations)
.filteredOn(b -> b.key.equals(key))
.extracting(b -> new String(b.value, Util.UTF_8))
.containsExactly(values);
}
}
Expand Up @@ -2,43 +2,26 @@

import brave.Tracer;
import brave.Tracer.SpanInScope;
import brave.Tracing;
import brave.internal.HexCodec;
import brave.internal.StrictCurrentTraceContext;
import brave.propagation.CurrentTraceContext;
import brave.sampler.Sampler;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import okhttp3.mockwebserver.SocketPolicy;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import zipkin.BinaryAnnotation;
import zipkin.Constants;
import zipkin.Endpoint;
import zipkin.Span;
import zipkin.TraceKeys;
import zipkin.internal.Util;
import zipkin.storage.InMemoryStorage;

import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;

public abstract class ITHttpClient<C> {
@Rule public ExpectedException thrown = ExpectedException.none();
@Rule public MockWebServer server = new MockWebServer();

Endpoint local = Endpoint.builder().serviceName("local").ipv4(127 << 24 | 1).port(100).build();
InMemoryStorage storage = new InMemoryStorage();

protected CurrentTraceContext currentTraceContext = new StrictCurrentTraceContext();
protected HttpTracing httpTracing;
public abstract class ITHttpClient<C> extends ITHttp {
protected C client;

@Before public void setup() {
Expand Down Expand Up @@ -175,19 +158,19 @@ public void reportsServerAddress() throws Exception {
String uri = "/foo?z=2&yAA=1";

close();
httpTracing = httpTracing.toBuilder()
.clientParser(new HttpClientParser() {
@Override public <Req> String spanName(HttpAdapter<Req, ?> adapter, Req req) {
return adapter.method(req).toLowerCase() + " " + adapter.path(req);
}

@Override
public <Req> void requestTags(HttpAdapter<Req, ?> adapter, Req req, brave.Span span) {
span.tag(TraceKeys.HTTP_URL, adapter.url(req)); // just the path is logged by default
}
})
.serverName("remote-service")
.build();
httpTracing = httpTracing.toBuilder()
.clientParser(new HttpClientParser() {
@Override public <Req> String spanName(HttpAdapter<Req, ?> adapter, Req req) {
return adapter.method(req).toLowerCase() + " " + adapter.path(req);
}

@Override
public <Req> void requestTags(HttpAdapter<Req, ?> adapter, Req req, brave.Span span) {
span.tag(TraceKeys.HTTP_URL, adapter.url(req)); // just the path is logged by default
}
})
.serverName("remote-service")
.build();

client = newClient(server.getPort());
server.enqueue(new MockResponse());
Expand All @@ -204,11 +187,7 @@ public <Req> void requestTags(HttpAdapter<Req, ?> adapter, Req req, brave.Span s
.extracting(b -> b.endpoint.serviceName)
.containsExactly("remote-service");

assertThat(collectedSpans)
.flatExtracting(s -> s.binaryAnnotations)
.filteredOn(b -> b.key.equals(TraceKeys.HTTP_URL))
.extracting(b -> new String(b.value, Util.UTF_8))
.containsExactly(url(uri));
assertReportedTagsInclude(TraceKeys.HTTP_URL, url(uri));
}

@Test public void addsStatusCodeWhenNotOk() throws Exception {
Expand All @@ -220,9 +199,7 @@ public <Req> void requestTags(HttpAdapter<Req, ?> adapter, Req req, brave.Span s
// some clients think 404 is an error
}

assertThat(collectedSpans())
.flatExtracting(s -> s.binaryAnnotations)
.contains(BinaryAnnotation.create(TraceKeys.HTTP_STATUS_CODE, "404", local));
assertReportedTagsInclude(TraceKeys.HTTP_STATUS_CODE, "404");
}

@Test public void redirect() throws Exception {
Expand All @@ -240,12 +217,7 @@ public <Req> void requestTags(HttpAdapter<Req, ?> adapter, Req req, brave.Span s
parent.finish();
}

assertThat(collectedSpans())
.hasSize(3) // parent and two HTTP spans
.flatExtracting(s -> s.binaryAnnotations)
.filteredOn(b -> b.key.equals(TraceKeys.HTTP_PATH))
.extracting(b -> new String(b.value, Util.UTF_8))
.containsExactly("/foo", "/bar");
assertReportedTagsInclude(TraceKeys.HTTP_PATH, "/foo", "/bar");
}

@Test public void post() throws Exception {
Expand Down Expand Up @@ -290,28 +262,10 @@ public <Req> void requestTags(HttpAdapter<Req, ?> adapter, Req req, brave.Span s
server.enqueue(new MockResponse());
get(client, path);

assertThat(collectedSpans())
.flatExtracting(s -> s.binaryAnnotations)
.filteredOn(b -> b.key.equals(TraceKeys.HTTP_PATH))
.extracting(b -> new String(b.value, Util.UTF_8))
.containsExactly("/foo");
assertReportedTagsInclude(TraceKeys.HTTP_PATH, "/foo");
}

protected String url(String pathIncludingQuery) {
return "http://127.0.0.1:" + server.getPort() + pathIncludingQuery;
}

Tracing.Builder tracingBuilder(Sampler sampler) {
return Tracing.newBuilder()
.reporter(s -> storage.spanConsumer().accept(asList(s)))
.currentTraceContext(currentTraceContext)
.localEndpoint(local)
.sampler(sampler);
}

List<Span> collectedSpans() {
List<List<Span>> result = storage.spanStore().getRawTraces();
assertThat(result).hasSize(1);
return result.get(0);
}
}

0 comments on commit 9060ecb

Please sign in to comment.