Skip to content

Commit

Permalink
Merge pull request #97 from openzipkin/get-trace
Browse files Browse the repository at this point in the history
Changes SpanStore.getTracesByIds to getTrace
  • Loading branch information
adriancole committed Mar 16, 2016
2 parents c80c19e + c7e5520 commit c65bcc0
Show file tree
Hide file tree
Showing 12 changed files with 120 additions and 93 deletions.
23 changes: 18 additions & 5 deletions interop/src/main/java/zipkin/interop/ScalaSpanStoreAdapter.java
Expand Up @@ -18,6 +18,8 @@
import com.twitter.zipkin.conversions.thrift$;
import com.twitter.zipkin.storage.QueryRequest;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TList;
import org.apache.thrift.protocol.TType;
Expand All @@ -32,8 +34,6 @@
import zipkin.SpanStore;
import zipkin.internal.Nullable;

import static java.util.stream.Collectors.toList;

/**
* Adapts {@link SpanStore} to a scala {@link com.twitter.zipkin.storage.SpanStore} in order to test
* against its {@link com.twitter.zipkin.storage.SpanStoreSpec} for interoperability reasons.
Expand Down Expand Up @@ -70,11 +70,24 @@ public Future<Seq<List<Span>>> getTraces(QueryRequest input) {

@Override
public Future<Seq<List<Span>>> getTracesByIds(Seq<Object> input) {
java.util.List<Long> traceIds = JavaConversions.asJavaCollection(input).stream()
.map(o -> Long.valueOf(o.toString())).collect(toList());
return toSeqFuture(spanStore.getTracesByIds(traceIds));
java.util.List<java.util.List<zipkin.Span>> result = new ArrayList<>(input.size());
for (Iterator<Object> traceIds = input.iterator(); traceIds.hasNext();) {
java.util.List<zipkin.Span> span =
spanStore.getTrace(Long.valueOf(traceIds.next().toString()));
if (span != null) result.add(span);
}
Collections.sort(result, TRACE_DESCENDING);
return toSeqFuture(result);
}

static final Comparator<java.util.List<zipkin.Span>> TRACE_DESCENDING =
new Comparator<java.util.List<zipkin.Span>>() {
@Override
public int compare(java.util.List<zipkin.Span> left, java.util.List<zipkin.Span> right) {
return right.get(0).compareTo(left.get(0));
}
};

private static Future<Seq<List<Span>>> toSeqFuture(java.util.List<java.util.List<zipkin.Span>> traces) {
ArrayList<List<Span>> result = new ArrayList<>(traces.size());
for (java.util.List<zipkin.Span> trace : traces) {
Expand Down
5 changes: 2 additions & 3 deletions zipkin-junit/src/main/java/zipkin/junit/ZipkinDispatcher.java
Expand Up @@ -14,7 +14,6 @@
package zipkin.junit;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import okhttp3.HttpUrl;
import okhttp3.mockwebserver.Dispatcher;
Expand Down Expand Up @@ -63,8 +62,8 @@ public MockResponse dispatch(RecordedRequest request) {
} else if (url.encodedPath().startsWith("/api/v1/trace/")) {
String traceId = url.encodedPath().replace("/api/v1/trace/", "");
long id = new Buffer().writeUtf8(traceId).readHexadecimalUnsignedLong();
List<List<Span>> traces = store.getTracesByIds(Collections.singletonList(id));
if (!traces.isEmpty()) return jsonResponse(JSON_CODEC.writeSpans(traces.get(0)));
List<Span> trace = store.getTrace(id);
if (trace != null) return jsonResponse(JSON_CODEC.writeSpans(trace));
}
} else if (request.getMethod().equals("POST")) {
if (url.encodedPath().equals("/api/v1/spans")) {
Expand Down
8 changes: 7 additions & 1 deletion zipkin-junit/src/main/java/zipkin/junit/ZipkinRule.java
Expand Up @@ -14,6 +14,7 @@
package zipkin.junit;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -140,7 +141,12 @@ public ZipkinRule enqueueFailure(HttpFailure failure) {

/** Retrieves all traces this zipkin server has received. */
public List<List<Span>> getTraces() {
return store.getTracesByIds(store.traceIds());
List<Long> traceIds = store.traceIds();
List<List<Span>> result = new ArrayList<>(traceIds.size());
for (long traceId : traceIds) {
result.add(store.getTrace(traceId));
}
return result;
}

/**
Expand Down
29 changes: 7 additions & 22 deletions zipkin-junit/src/test/java/zipkin/junit/HttpSpanStore.java
Expand Up @@ -14,10 +14,6 @@
package zipkin.junit;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
Expand Down Expand Up @@ -74,27 +70,16 @@ public List<List<Span>> getTraces(QueryRequest request) {
}

@Override
public List<List<Span>> getTracesByIds(Collection<Long> traceIds) {
List<List<Span>> result = new ArrayList<>(traceIds.size());
for (Long id : traceIds) {
Response response = call(new Request.Builder()
.url(baseUrl.resolve(String.format("/api/v1/trace/%016x", id)))
.build());
if (response.code() != 404) {
result.add(JSON_CODEC.readSpans(responseBytes(response)));
}
public List<Span> getTrace(long traceId) {
Response response = call(new Request.Builder()
.url(baseUrl.resolve(String.format("/api/v1/trace/%016x", traceId)))
.build());
if (response.code() == 404) {
return null;
}
Collections.sort(result, TRACE_DESCENDING);
return result;
return JSON_CODEC.readSpans(responseBytes(response));
}

static final Comparator<List<Span>> TRACE_DESCENDING = new Comparator<List<Span>>() {
@Override
public int compare(List<Span> left, List<Span> right) {
return right.get(0).compareTo(left.get(0));
}
};

@Override
public List<String> getServiceNames() {
Response response = call(new Request.Builder()
Expand Down
Expand Up @@ -14,7 +14,6 @@
package zipkin.server;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -152,12 +151,12 @@ public byte[] getTraces(
@RequestMapping(value = "/trace/{traceId}", method = RequestMethod.GET, produces = APPLICATION_JSON_VALUE)
public byte[] getTrace(@PathVariable String traceId) {
long id = lowerHexToUnsignedLong(traceId);
List<List<Span>> traces = spanStore.getTracesByIds(Collections.singletonList(id));
List<Span> trace = spanStore.getTrace(id);

if (traces.isEmpty()) {
if (trace == null) {
throw new TraceNotFoundException(traceId, id);
}
return jsonCodec.writeSpans(traces.get(0));
return jsonCodec.writeSpans(trace);
}

@ExceptionHandler(TraceNotFoundException.class)
Expand Down
Expand Up @@ -15,7 +15,6 @@

import com.github.kristofa.brave.Brave;
import com.github.kristofa.brave.LocalTracer;
import java.util.Collection;
import java.util.List;
import zipkin.DependencyLink;
import zipkin.QueryRequest;
Expand Down Expand Up @@ -52,10 +51,10 @@ public List<List<Span>> getTraces(QueryRequest request) {
}

@Override
public List<List<Span>> getTracesByIds(Collection<Long> traceIds) {
tracer.startNewSpan(component, "get-traces-by-ids");
public List<Span> getTrace(long traceId) {
tracer.startNewSpan(component, "get-trace");
try {
return delegate.getTracesByIds(traceIds);
return delegate.getTrace(traceId);
} finally {
tracer.finishSpan();
}
Expand Down
Expand Up @@ -21,8 +21,7 @@
import zipkin.Span;
import zipkin.InMemorySpanStore;

import static java.util.Collections.singletonList;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
import static org.assertj.core.api.Assertions.assertThat;

public class SpanStoreSpanCollectorTest {

Expand Down Expand Up @@ -51,7 +50,7 @@ public void addMany() {
collector.collect(builder.id(1234L + i).build());
}
collector.flush();
List<List<Span>> result = spanStore.getTracesByIds(singletonList(1234L));
assertThat(result.get(0)).hasSize(500);
List<Span> result = spanStore.getTrace(1234L);
assertThat(result).hasSize(500);
}
}
Expand Up @@ -24,7 +24,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -127,8 +126,7 @@ public List<List<Span>> getTraces(QueryRequest request) {
return getTracesByIds(traceIds);
}

@Override
public List<List<Span>> getTracesByIds(Collection<Long> traceIds) {
List<List<Span>> getTracesByIds(Collection<Long> traceIds) {
// Synchronously get the encoded data, as there are no other requests to the backend needed.
Collection<List<ByteBuffer>> encodedTraces = getUnchecked(
repository.getSpansByTraceIds(traceIds.toArray(new Long[traceIds.size()]), maxTraceCols)
Expand All @@ -146,6 +144,12 @@ public List<List<Span>> getTracesByIds(Collection<Long> traceIds) {
return result;
}

@Override
public List<Span> getTrace(long traceId) {
List<List<Span>> result = getTracesByIds(Collections.singleton(traceId));
return result.isEmpty() ? null : result.get(0);
}

@Override
public List<String> getServiceNames() {
return sortedList(getUnchecked(repository.getServiceNames()));
Expand Down
Expand Up @@ -16,14 +16,14 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.jooq.Condition;
import org.jooq.Cursor;
import org.jooq.DSLContext;
import org.jooq.ExecuteListenerProvider;
Expand Down Expand Up @@ -166,15 +166,19 @@ public void accept(List<Span> spans) {
}
}

List<List<Span>> getTraces(@Nullable QueryRequest request, @Nullable Collection<Long> traceIds) {
List<List<Span>> getTraces(@Nullable QueryRequest request, @Nullable Long traceId) {
final Map<Long, List<Span>> spansWithoutAnnotations;
final Map<Pair<?>, List<Record>> dbAnnotations;
try (Connection conn = datasource.getConnection()) {
Condition traceIdCondition;
if (request != null) {
traceIds = toTraceIdQuery(context(conn), request).fetch(ZIPKIN_SPANS.TRACE_ID);
List<Long> traceIds = toTraceIdQuery(context(conn), request).fetch(ZIPKIN_SPANS.TRACE_ID);
traceIdCondition = ZIPKIN_SPANS.TRACE_ID.in(traceIds);
} else {
traceIdCondition = ZIPKIN_SPANS.TRACE_ID.eq(traceId);
}
spansWithoutAnnotations = context(conn)
.selectFrom(ZIPKIN_SPANS).where(ZIPKIN_SPANS.TRACE_ID.in(traceIds))
.selectFrom(ZIPKIN_SPANS).where(traceIdCondition)
.stream()
.map(r -> new Span.Builder()
.traceId(r.getValue(ZIPKIN_SPANS.TRACE_ID))
Expand Down Expand Up @@ -248,8 +252,9 @@ DSLContext context(Connection conn) {
}

@Override
public List<List<Span>> getTracesByIds(Collection<Long> traceIds) {
return traceIds.isEmpty() ? emptyList() : getTraces(null, traceIds);
public List<Span> getTrace(long traceId) {
List<List<Span>> result = getTraces(null, traceId);
return result.isEmpty() ? null : result.get(0);
}

@Override
Expand Down

0 comments on commit c65bcc0

Please sign in to comment.