Skip to content
This repository
Browse code

Bump Scalding to 0.7.3, add TypedPipe span preprocessor

  • Loading branch information...
commit aca831f78a80a557c491085134b6485b0d8f67d5 1 parent 8edf7ad
Franklin Hu authored September 27, 2012
2  project/Project.scala
@@ -43,7 +43,7 @@ object Zipkin extends Build {
43 43
       version := "0.3.0-SNAPSHOT",
44 44
       parallelExecution in Test := false,
45 45
       libraryDependencies ++= Seq(
46  
-        "com.twitter" % "scalding_2.9.1"       % "0.5.3",
  46
+        "com.twitter" % "scalding_2.9.1"       % "0.7.3",
47 47
         /*
48 48
           FIXME ElephantBird 3.0.0 picks up libthrift 0.7.0, which is currently
49 49
           incompatible with sbt-thrift so made these intransitive
30  zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/SpanPreprocessor.scala
... ...
@@ -0,0 +1,30 @@
  1
+package com.twitter.zipkin.hadoop.sources
  2
+
  3
+import com.twitter.scalding.TDsl._
  4
+import com.twitter.scalding.{Args, TypedPipe, UtcDateRangeJob, Job}
  5
+import com.twitter.zipkin.gen.{Annotation, BinaryAnnotation, Span}
  6
+import com.twitter.zipkin.gen
  7
+import scala.collection.JavaConverters._
  8
+
  9
+/**
  10
+ * Job that merges all spans that have the same trace ID, span ID, and parent span ID
  11
+ */
  12
+class SpanPreprocessor(args: Args) extends Job(args) with UtcDateRangeJob {
  13
+  val timeGranularity: TimeGranularity = TimeGranularity.Hour
  14
+
  15
+  SpanSource(timeGranularity).read.typed('in, 'out) { spans: TypedPipe[Span] =>
  16
+    spans
  17
+      .groupBy { span => (span.trace_id, span.id, span.parent_id) }
  18
+      .mapValues { span => (span.annotations, span.binary_annotations) }
  19
+      .reduce { case (left: (List[Annotation], List[BinaryAnnotation]), right: (List[Annotation], List[BinaryAnnotation])) =>
  20
+        ((left._1 ++ right._1).asJava, (left._2 ++ right._2).asJava)
  21
+      }
  22
+      .map { case ((traceId, spanId, parentId), (annotations, binaryAnnotations)) =>
  23
+        val span = new gen.Span(traceId, "", spanId, annotations, binaryAnnotations)
  24
+        if (parentId != 0) {
  25
+          span.setParent_id(parentId)
  26
+        }
  27
+        span
  28
+      }
  29
+  }.write(PrepNoNamesSpanSource(timeGranularity))
  30
+}

0 notes on commit aca831f

Please sign in to comment.
Something went wrong with that request. Please try again.