Skip to content

Commit

Permalink
Adds missing fields
Browse files Browse the repository at this point in the history
  • Loading branch information
Adrian Cole committed Oct 18, 2017
1 parent 249dd2b commit 9606b3b
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 40 deletions.
Expand Up @@ -60,17 +60,19 @@ final class CassandraSpanConsumer implements SpanConsumer {
.value("trace_id", QueryBuilder.bindMarker("trace_id"))
.value("trace_id_high", QueryBuilder.bindMarker("trace_id_high"))
.value("ts_uuid", QueryBuilder.bindMarker("ts_uuid"))
.value("parent_id", QueryBuilder.bindMarker("parent_id"))
.value("id", QueryBuilder.bindMarker("id"))
.value("ts", QueryBuilder.bindMarker("ts"))
.value("kind", QueryBuilder.bindMarker("kind"))
.value("span", QueryBuilder.bindMarker("span"))
.value("parent_id", QueryBuilder.bindMarker("parent_id"))
.value("ts", QueryBuilder.bindMarker("ts"))
.value("duration", QueryBuilder.bindMarker("duration"))
.value("l_ep", QueryBuilder.bindMarker("l_ep"))
.value("l_service", QueryBuilder.bindMarker("l_service"))
.value("r_ep", QueryBuilder.bindMarker("r_ep"))
.value("annotations", QueryBuilder.bindMarker("annotations"))
.value("tags", QueryBuilder.bindMarker("tags"))
.value("shared", QueryBuilder.bindMarker("shared"))
.value("debug", QueryBuilder.bindMarker("debug"))
.value("annotation_query", QueryBuilder.bindMarker("annotation_query")));

insertTraceServiceSpanName = session.prepare(
Expand Down Expand Up @@ -126,29 +128,35 @@ public Call<Void> accept(List<Span> spans) {
*/
void storeSpan(Span span, long timestamp) {
try {
List<AnnotationUDT> annotations = span.annotations().stream()
.map(a -> new AnnotationUDT(a))
.collect(Collectors.toList());
boolean traceIdHigh = !strictTraceId && span.traceId().length() == 32;

// start with the partition key
BoundStatement bound = bindWithName(insertSpan, "insert-span")
.setUUID("ts_uuid", new UUID(
UUIDs.startOf(timestamp / 1000).getMostSignificantBits(),
UUIDs.random().getLeastSignificantBits()))
.setString("id", span.id())
.setString("span", span.name())
.setList("annotations", annotations)
.setMap("tags", span.tags())
.setString("annotation_query", Joiner.on(',').join(CassandraUtil.annotationKeys(span)));

.setUUID("ts_uuid", new UUID(
UUIDs.startOf(timestamp / 1000).getMostSignificantBits(),
UUIDs.random().getLeastSignificantBits()))
.setString("trace_id", traceIdHigh ? span.traceId().substring(16) : span.traceId())
.setString("id", span.id());

// now set the data fields
if (traceIdHigh) {
bound = bound.setString("trace_id_high", span.traceId().substring(0, 16));
}
if (null != span.parentId()) {
bound = bound.setString("parent_id", span.parentId());
}
if (null != span.kind()) {
bound = bound.setString("kind", span.kind().name());
}
if (null != span.name()) {
bound = bound.setString("span", span.name());
}
if (null != span.timestamp()) {
bound = bound.setLong("ts", span.timestamp());
}
if (null != span.duration()) {
bound = bound.setLong("duration", span.duration());
}
if (null != span.parentId()) {
bound = bound.setString("parent_id", span.parentId());
}
if (null != span.localEndpoint()) {
bound = bound
.set("l_ep", new EndpointUDT(span.localEndpoint()), EndpointUDT.class)
Expand All @@ -157,16 +165,22 @@ void storeSpan(Span span, long timestamp) {
if (null != span.remoteEndpoint()) {
bound = bound.set("r_ep", new EndpointUDT(span.remoteEndpoint()), EndpointUDT.class);
}
if (!span.annotations().isEmpty()) {
List<AnnotationUDT> annotations = span.annotations().stream()
.map(a -> new AnnotationUDT(a))
.collect(Collectors.toList());
bound = bound
.setList("annotations", annotations)
.setString("annotation_query", Joiner.on(',').join(CassandraUtil.annotationKeys(span)));
}
if (!span.tags().isEmpty()) {
bound = bound.setMap("tags", span.tags());
}
if (null != span.shared()) {
bound = bound.setBool("shared", span.shared());
}

if (!strictTraceId && span.traceId().length() == 32) {
bound = bound
.setString("trace_id", span.traceId().substring(16))
.setString("trace_id_high", span.traceId().substring(0, 16));
} else {
bound = bound.setString("trace_id", span.traceId());
if (null != span.debug()) {
bound = bound.setBool("debug", span.debug());
}
session.executeAsync(bound);
} catch (RuntimeException ignore) {
Expand Down
Expand Up @@ -101,8 +101,8 @@ final class CassandraSpanStore implements SpanStore {

selectTraces = session.prepare(
QueryBuilder.select(
"trace_id", "trace_id_high", "id", "ts", "span", "parent_id",
"duration", "l_ep", "r_ep", "annotations", "tags", "shared")
"trace_id_high", "trace_id", "parent_id", "id", "kind", "span", "ts",
"duration", "l_ep", "r_ep", "annotations", "tags", "shared", "debug")
.from(TABLE_SPAN)
.where(QueryBuilder.in("trace_id", QueryBuilder.bindMarker("trace_id")))
.limit(QueryBuilder.bindMarker("limit_")));
Expand Down Expand Up @@ -177,18 +177,19 @@ final class CassandraSpanStore implements SpanStore {
}
Span.Builder builder = Span.newBuilder()
.traceId(traceId)
.parentId(row.getString("parent_id"))
.id(row.getString("id"))
.name(row.getString("span"))
.duration(row.getLong("duration"));
.timestamp(row.getLong("ts"));

if (!row.isNull("ts")) {
builder = builder.timestamp(row.getLong("ts"));
}
if (!row.isNull("duration")) {
builder = builder.duration(row.getLong("duration"));
builder.duration(row.getLong("duration"));
}
if (!row.isNull("parent_id")) {
builder = builder.parentId(row.getString("parent_id"));
if (!row.isNull("kind")) {
try {
builder.kind(Span.Kind.valueOf(row.getString("kind")));
} catch (IllegalArgumentException ignored) {
}
}
if (!row.isNull("l_ep")) {
builder = builder.localEndpoint(row.get("l_ep", Schema.EndpointUDT.class).toEndpoint());
Expand All @@ -199,6 +200,9 @@ final class CassandraSpanStore implements SpanStore {
if (!row.isNull("shared")) {
builder = builder.shared(row.getBool("shared"));
}
if (!row.isNull("debug")) {
builder = builder.shared(row.getBool("debug"));
}
for (AnnotationUDT udt : row.getList("annotations", AnnotationUDT.class)) {
builder = builder.addAnnotation(udt.toAnnotation().timestamp(), udt.toAnnotation().value());
}
Expand Down
Expand Up @@ -17,18 +17,20 @@ CREATE TYPE IF NOT EXISTS zipkin2_cassandra.annotation (
CREATE TABLE IF NOT EXISTS zipkin2_cassandra.span (
trace_id text, // when strictTraceId=false, only contains right-most 16 chars
ts_uuid timeuuid,
trace_id_high text, // when strictTraceId=false, contains right-most 16 chars if present
id text,
ts bigint,
span text,
trace_id_high text, // when strictTraceId=false, contains right-most 16 chars if present
parent_id text,
kind text,
span text, // span.name
ts bigint,
duration bigint,
l_ep Endpoint,
l_service text,
r_ep Endpoint,
annotations list<frozen<annotation>>,
tags map<text,text>,
shared boolean,
debug boolean,
annotation_query text, //-- can't do SASI on set<text>: comma-joined until CASSANDRA-11182
PRIMARY KEY (trace_id, ts_uuid, id)
)
Expand Down
Expand Up @@ -56,8 +56,4 @@ public static KeyspaceMetadata ensureExists(String keyspace, Session session) {
private static long rowCount(CassandraStorage storage, String table) {
return storage.session().execute("SELECT COUNT(*) from " + table).one().getLong(0);
}

public static Session session(CassandraStorage storage) {
return storage.session();
}
}

0 comments on commit 9606b3b

Please sign in to comment.