Skip to content

Commit

Permalink
Trace: merge spans and sort by timestamp by default
Browse files Browse the repository at this point in the history
Author: @franklinhu
Fixes #91
URL: #91
  • Loading branch information
Franklin Hu committed Aug 1, 2012
1 parent aa67fff commit c872e58
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 41 deletions.
33 changes: 8 additions & 25 deletions zipkin-common/src/main/scala/com/twitter/zipkin/query/Trace.scala
Expand Up @@ -33,10 +33,17 @@ object Trace {
def apply(spanTree: SpanTreeEntry): Trace = Trace(spanTree.toList)
}

case class Trace(spans: Seq[Span]) {
case class Trace(private val s: Seq[Span]) {

val log = Logger.get(getClass.getName)

lazy val spans = mergeBySpanId(s).toSeq.sortWith {
(a, b) =>
val aTimestamp = a.firstAnnotation.map(_.timestamp).getOrElse(Long.MaxValue)
val bTimestamp = b.firstAnnotation.map(_.timestamp).getOrElse(Long.MaxValue)
aTimestamp < bTimestamp
}

/**
* Find the trace id for this trace.
* Returns none if we have no spans to look up id by
Expand Down Expand Up @@ -157,16 +164,6 @@ case class Trace(spans: Seq[Span]) {
}.flatten
}

/**
* Incoming data can have multiple entries for the same Span, for example
* data sent from client as one span and data from the server as one span.
*
* This method merges them by span id into one object per id.
*/
def mergeSpans: Trace = {
new Trace(mergeBySpanId(spans).toList)
}

/**
* Merge all the spans objects with the same span ids into one per id.
* We store parts of spans in different columns in order to make writes
Expand Down Expand Up @@ -224,20 +221,6 @@ case class Trace(spans: Seq[Span]) {
}
}

/**
* Return a Trace sorted by the first annotation in each span.
*/
def sortedByTimestamp: Trace = {
Trace {
spans.sortWith {
(a, b) =>
val aTimestamp = a.firstAnnotation.map(_.timestamp).getOrElse(Long.MaxValue)
val bTimestamp = b.firstAnnotation.map(_.timestamp).getOrElse(Long.MaxValue)
aTimestamp < bTimestamp
}
}
}

/**
* Print the trace tree to give the user an overview.
*/
Expand Down
Expand Up @@ -80,7 +80,7 @@ object JsonQueryAdapter extends QueryAdapter {
val startAndEnd = t.getStartAndEndTimestamp.get
JsonTrace(
t.id.map(_.toString).getOrElse(""),
t.mergeSpans.spans.map(JsonAdapter(_)),
t.spans.map(JsonAdapter(_)),
startAndEnd.start,
startAndEnd.end,
startAndEnd.end - startAndEnd.start,
Expand Down
Expand Up @@ -22,7 +22,7 @@ import com.twitter.finagle.tracing.{Trace => FTrace}
import com.twitter.logging.Logger
import com.twitter.ostrich.admin.Service
import com.twitter.util.Future
import com.twitter.zipkin.adapter.{ThriftQueryAdapter, ThriftAdapter}
import com.twitter.zipkin.adapter.ThriftQueryAdapter
import com.twitter.zipkin.gen
import com.twitter.zipkin.query.adjusters.Adjuster
import com.twitter.zipkin.storage.{Aggregates, TraceIdDuration, Index, Storage}
Expand Down
Expand Up @@ -37,7 +37,7 @@ class TimeSkewAdjuster extends Adjuster {
case None => return trace // no root span found, returning as is
case Some(s) => {
val spans = adjust(trace.getSpanTree(s, trace.getIdToChildrenMap), None)
Trace(spans).sortedByTimestamp
Trace(spans)
}
}
}
Expand Down
Expand Up @@ -117,7 +117,7 @@ trait CassandraStorage extends Storage with Cassandra {
CASSANDRA_GET_TRACE_TOO_BIG.incr()
None
} else {
Some(Trace(spans.toSeq).mergeSpans.sortedByTimestamp)
Some(Trace(spans.toSeq))
}
}
}
Expand Down
Expand Up @@ -153,9 +153,9 @@ class TraceSpec extends Specification {
"sort spans by first annotation timestamp" in {
val inputSpans = List[Span](span4, span3, span5, span1, span2)
val expectedTrace = Trace(List[Span](span1, span2, span3, span4, span5))
val actualTrace = Trace(inputSpans).sortedByTimestamp
val actualTrace = Trace(inputSpans)

expectedTrace mustEqual actualTrace
expectedTrace.spans mustEqual actualTrace.spans
}

"merge spans" in {
Expand All @@ -175,7 +175,7 @@ class TraceSpec extends Specification {
val spanToMerge2 = Span(12345, "methodcall2", span2Id, Some(span1Id), ann2, Nil)
val spanMerged = Span(12345, "methodcall2", span2Id, Some(span1Id), annMerged, Nil)

Trace(List(spanMerged)) mustEqual Trace(List(spanToMerge1, spanToMerge2)).mergeSpans
Trace(List(spanMerged)).spans mustEqual Trace(List(spanToMerge1, spanToMerge2)).spans
}

"get rootmost span from full trace" in {
Expand Down
Expand Up @@ -248,7 +248,7 @@ class QueryServiceSpec extends Specification with JMocker with ClassMocker {
qs.start()

expect {
1.of(storage).getTracesByIds(List(1L)) willReturn Future(List(trace4.mergeSpans))
1.of(storage).getTracesByIds(List(1L)) willReturn Future(List(trace4))
}

val ann1 = gen.TimelineAnnotation(100, gen.Constants.CLIENT_SEND,
Expand Down Expand Up @@ -295,7 +295,7 @@ class QueryServiceSpec extends Specification with JMocker with ClassMocker {
val realTrace = Trace(List(rs1, rs2))

expect {
1.of(storage).getTracesByIds(List(4488677265848750007L)) willReturn Future(List(realTrace.mergeSpans))
1.of(storage).getTracesByIds(List(4488677265848750007L)) willReturn Future(List(realTrace))
}

val actual = qs.getTraceTimelinesByIds(List(4488677265848750007L), List(gen.Adjust.TimeSkew))()
Expand Down Expand Up @@ -357,7 +357,7 @@ class QueryServiceSpec extends Specification with JMocker with ClassMocker {
val realTrace = Trace(List(rs1, rs2))

expect {
1.of(storage).getTracesByIds(List(-6120267009876080004L)) willReturn Future(List(realTrace.mergeSpans))
1.of(storage).getTracesByIds(List(-6120267009876080004L)) willReturn Future(List(realTrace))
}

val actual = qs.getTraceTimelinesByIds(List(-6120267009876080004L), List(gen.Adjust.TimeSkew))()
Expand Down
Expand Up @@ -136,8 +136,8 @@ class TimeSkewAdjusterSpec extends Specification with JMocker with ClassMocker {
val span2 = Span(1, "multiget_slice", -855543208864892776L, Some(2209720933601260005L),
List(ann2, ann5), Nil)

val realTrace = new Trace(List(span1a, span1b, span2)).mergeSpans
val expectedRealTrace = new Trace(List(span1aFixed, span1b, span2)).mergeSpans
val realTrace = new Trace(List(span1a, span1b, span2))
val expectedRealTrace = new Trace(List(span1aFixed, span1b, span2))

val adjuster = new TimeSkewAdjuster

Expand Down Expand Up @@ -167,7 +167,7 @@ class TimeSkewAdjusterSpec extends Specification with JMocker with ClassMocker {
val monorailSs = Annotation(3L, gen.Constants.SERVER_SEND, epMonorail)
val unicornCr = Annotation(4L, gen.Constants.CLIENT_RECV, epTfe)
val goodSpan = Span(1, "friendships/create", 12345L, None, List(unicornCs, monorailSr, monorailSs, unicornCr), Nil)
val goodTrace = new Trace(Seq(goodSpan)).mergeSpans
val goodTrace = new Trace(Seq(goodSpan))

val actualTrace = adjuster.adjust(goodTrace)
goodTrace mustEqual actualTrace
Expand All @@ -191,8 +191,8 @@ class TimeSkewAdjusterSpec extends Specification with JMocker with ClassMocker {
val adjustedMonorailSs = Annotation(1330539327145012L, gen.Constants.SERVER_SEND, epMonorail)
val spanAdjustedMonorail = Span(1, "friendships/create", 6379677665629798877L, Some(7264365917420400007L), List(unicornCs, adjustedMonorailSr, adjustedMonorailSs, unicornCr), Nil)

val realTrace = new Trace(Seq(spanTfe, spanMonorailUnicorn)).mergeSpans
val expectedAdjustedTrace = new Trace(Seq(spanTfe, spanAdjustedMonorail)).mergeSpans
val realTrace = new Trace(Seq(spanTfe, spanMonorailUnicorn))
val expectedAdjustedTrace = new Trace(Seq(spanTfe, spanAdjustedMonorail))

val adjusted = adjuster.adjust(realTrace)

Expand Down Expand Up @@ -236,7 +236,7 @@ class TimeSkewAdjusterSpec extends Specification with JMocker with ClassMocker {
val spanAdjustedGizmoduck = Span(1, "get_by_auth_token", 119310086840195752L, Some(7625434200987291951L), List(passbirdCs, passbirdCr, createdGizmoduckSr, createdGizmoduckSs), Nil)
val spanAdjustedMemcache = Span(1, "Get", 3983355768376203472L, Some(119310086840195752L), List(adjustedGizmoduckCs, adjustedGizmoduckCr), Nil)

val realTrace = new Trace(Seq(spanTfe, spanPassbird, spanGizmoduck, spanMemcache)).mergeSpans
val realTrace = new Trace(Seq(spanTfe, spanPassbird, spanGizmoduck, spanMemcache))
val adjustedTrace = new Trace(Seq(spanTfe, spanPassbird, spanAdjustedGizmoduck, spanAdjustedMemcache))

adjuster.adjust(realTrace) mustEqual adjustedTrace
Expand Down

0 comments on commit c872e58

Please sign in to comment.