Skip to content

Commit

Permalink
Merge pull request #91 from openzipkin/receivedSpan
Browse files Browse the repository at this point in the history
Adds ZipkinRule.receivedSpanCount and ZipkinRule.receivedSpanBytes
  • Loading branch information
adriancole committed Mar 2, 2016
2 parents 08c889f + 89763a1 commit 715106a
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 9 deletions.
15 changes: 14 additions & 1 deletion zipkin-junit/README.md
Expand Up @@ -2,6 +2,10 @@

This contains `ZipkinRule`, a JUnit rule to spin-up a Zipkin server during tests.

ZipkinRule aims to emulate a full-featured server. For example, it presents the
entire [Zipkin Api](http://openzipkin.github.io/zipkin-api/#/), and supports
features like gzip compression.

Usage
------

Expand Down Expand Up @@ -43,4 +47,13 @@ public void doesntAttemptToRetryOn400() throws IOException {
// check that we didn't retry on 400
assertThat(zipkin.httpRequestCount()).isEqualTo(1);
}
```
```

Besides `httpRequestCount()`, there are two other counters that can
help you assert instrumentation is doing what you think:

* `receivedSpanCount()` - How many spans the server received.
* `receivedSpanBytes()` - The cumulative bytes the server received.

These counters can validate aspects such as compression or that you are
grouping spans by id before reporting them to the server.
21 changes: 21 additions & 0 deletions zipkin-junit/src/main/java/zipkin/junit/ZipkinRule.java
Expand Up @@ -17,6 +17,7 @@
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import okhttp3.mockwebserver.Dispatcher;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
Expand Down Expand Up @@ -44,6 +45,7 @@ public final class ZipkinRule implements TestRule {
private final InMemorySpanStore store = new InMemorySpanStore();
private final MockWebServer server = new MockWebServer();
private final BlockingQueue<MockResponse> failureQueue = new LinkedBlockingQueue<>();
private final AtomicInteger receivedSpanBytes = new AtomicInteger();

public ZipkinRule() {
Dispatcher dispatcher = new Dispatcher() {
Expand All @@ -54,6 +56,9 @@ public MockResponse dispatch(RecordedRequest request) throws InterruptedExceptio
MockResponse maybeFailure = failureQueue.poll();
if (maybeFailure != null) return maybeFailure;
MockResponse result = successDispatch.dispatch(request);
if (request.getMethod().equals("POST")) {
receivedSpanBytes.addAndGet((int) request.getBodySize());
}
String encoding = request.getHeaders().get("Accept-Encoding");
if (result.getBody() != null && encoding != null && encoding.contains("gzip")) {
try {
Expand Down Expand Up @@ -90,6 +95,22 @@ public int httpRequestCount() {
return server.getRequestCount();
}

/**
* Returns the count of spans decoded by the server. This number may be higher than unique span id
* count: it corresponds directly to what's reported by instrumentation.
*/
public int receivedSpanCount() {
return store.acceptedSpanCount();
}

/**
* Returns the amount of bytes received by the server. This number is affected by compression,
* thrift vs json, and if spans are sent in multiple waves, repeating data.
*/
public int receivedSpanBytes() {
return receivedSpanBytes.get();
}

/**
* Stores the given spans directly, to setup preconditions for a test.
*
Expand Down
30 changes: 22 additions & 8 deletions zipkin-junit/src/test/java/zipkin/junit/ZipkinRuleTest.java
Expand Up @@ -90,6 +90,19 @@ public void httpRequestCountIncrements() throws IOException {
assertThat(zipkin.httpRequestCount()).isEqualTo(2);
}

/**
* Normally, a span can be reported twice: for client and server. However, there are bugs that
* happened where several updates went to the same span id. {@link ZipkinRule#receivedSpanCount}
* can be used to help ensure a span isn't reported more times than expected.
*/
@Test
public void receivedSpanCountIncrements() throws IOException {
postSpans(span);
postSpans(span);

assertThat(zipkin.receivedSpanCount()).isEqualTo(2);
}

@Test
public void postSpans_disconnectDuringBody() throws IOException {
zipkin.enqueueFailure(HttpFailure.disconnectDuringBody());
Expand Down Expand Up @@ -122,14 +135,6 @@ public void postSpans_sendErrorResponse400() throws IOException {
assertThat(postSpans(span).code()).isEqualTo(202);
}

private Response postSpans(Span ... spans) throws IOException {
byte[] spansInJson = Codec.JSON.writeSpans(asList(spans));
return client.newCall(new Request.Builder()
.url(zipkin.httpUrl() + "/api/v1/spans")
.post(RequestBody.create(MediaType.parse("application/json"), spansInJson)).build()
).execute();
}

@Test
public void gzippedSpans() throws IOException {
byte[] spansInJson = Codec.JSON.writeSpans(asList(span));
Expand All @@ -142,6 +147,7 @@ public void gzippedSpans() throws IOException {
).execute();

assertThat(zipkin.getTraces()).containsOnly(asList(span));
assertThat(zipkin.receivedSpanBytes()).isEqualTo(gzippedJson.length);
}

@Test
Expand Down Expand Up @@ -177,4 +183,12 @@ public void readSpans_gzippedResponse() throws Exception {

assertThat(Codec.JSON.readSpans(unzipped)).isEqualTo(trace);
}

private Response postSpans(Span ... spans) throws IOException {
byte[] spansInJson = Codec.JSON.writeSpans(asList(spans));
return client.newCall(new Request.Builder()
.url(zipkin.httpUrl() + "/api/v1/spans")
.post(RequestBody.create(MediaType.parse("application/json"), spansInJson)).build()
).execute();
}
}
6 changes: 6 additions & 0 deletions zipkin/src/main/java/zipkin/InMemorySpanStore.java
Expand Up @@ -41,6 +41,7 @@ public final class InMemorySpanStore implements SpanStore {
private final Multimap<Long, Span> traceIdToSpans = new LinkedListMultimap<>();
private final Multimap<String, Long> serviceToTraceIds = new LinkedHashSetMultimap<>();
private final Multimap<String, String> serviceToSpanNames = new LinkedHashSetMultimap<>();
private int acceptedSpanCount;

@Override
public synchronized void accept(Iterator<Span> spans) {
Expand All @@ -49,6 +50,7 @@ public synchronized void accept(Iterator<Span> spans) {
long traceId = span.traceId;
String spanName = span.name;
traceIdToSpans.put(span.traceId, span);
acceptedSpanCount++;

for (String serviceName : serviceNames(span)) {
serviceToTraceIds.put(serviceName, traceId);
Expand All @@ -57,6 +59,10 @@ public synchronized void accept(Iterator<Span> spans) {
}
}

public synchronized int acceptedSpanCount() {
return acceptedSpanCount;
}

public synchronized List<Long> traceIds() {
return Util.sortedList(traceIdToSpans.keySet());
}
Expand Down

0 comments on commit 715106a

Please sign in to comment.