diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/DependencyTree.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/DependencyTree.scala index 04370752f5d..19dc9222ad4 100644 --- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/DependencyTree.scala +++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/DependencyTree.scala @@ -13,12 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package com.twitter.zipkin.hadoop -import com.twitter.scalding._ import cascading.pipe.joiner._ -import com.twitter.zipkin.gen.{SpanServiceName, BinaryAnnotation, Span, Annotation} +import com.twitter.scalding._ +import com.twitter.zipkin.gen.SpanServiceName import com.twitter.zipkin.hadoop.sources._ /** @@ -26,14 +25,17 @@ import com.twitter.zipkin.hadoop.sources._ */ class DependencyTree(args: Args) extends Job(args) with DefaultDateRangeJob { - val spanInfo = DailyPreprocessedSpanSource() + + val timeGranularity = TimeGranularity.Day + + val spanInfo = PreprocessedSpanSource(timeGranularity) .read .filter(0) { s : SpanServiceName => s.isSetParent_id() } .mapTo(0 -> ('id, 'parent_id, 'service)) { s: SpanServiceName => (s.id, s.parent_id, s.service_name ) } // TODO: account for possible differences between sent and received service names - val idName = DailyPrepTsvSource() + val idName = PrepTsvSource(timeGranularity) .read /* Join with the original on parent ID to get the parent's service name */ val spanInfoWithParent = spanInfo diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/ExpensiveEndpoints.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/ExpensiveEndpoints.scala index 1be667067bc..62b651a23a6 100644 --- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/ExpensiveEndpoints.scala +++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/ExpensiveEndpoints.scala @@ -17,7 +17,6 @@ package com.twitter.zipkin.hadoop import com.twitter.zipkin.gen.{Constants, SpanServiceName, Annotation} -import cascading.pipe.joiner.LeftJoin import com.twitter.scalding.{Tsv, DefaultDateRangeJob, Job, Args} import com.twitter.zipkin.hadoop.sources._ @@ -26,7 +25,9 @@ import com.twitter.zipkin.hadoop.sources._ */ class ExpensiveEndpoints(args : Args) extends Job(args) with DefaultDateRangeJob { - val spanInfo = DailyPreprocessedSpanSource() + val timeGranularity = TimeGranularity.Day + + val spanInfo = PreprocessedSpanSource(timeGranularity) .read .filter(0) { s : SpanServiceName => s.isSetParent_id() } .mapTo(0 -> ('id, 'parent_id, 'service, 'annotations)) @@ -52,7 +53,7 @@ class ExpensiveEndpoints(args : Args) extends Job(args) with DefaultDateRangeJob } } - val idName = DailyPrepTsvSource() + val idName = PrepTsvSource(timeGranularity) .read /* Join with the original on parent ID to get the parent's service name */ val spanInfoWithParent = spanInfo diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/FindDuplicateTraces.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/FindDuplicateTraces.scala index 52b3c04dd91..df25d01977e 100644 --- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/FindDuplicateTraces.scala +++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/FindDuplicateTraces.scala @@ -13,14 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package com.twitter.zipkin.hadoop -import com.twitter.zipkin.gen.{BinaryAnnotation, Constants, SpanServiceName, Annotation} import com.twitter.scalding.{Tsv, DefaultDateRangeJob, Job, Args} -import com.twitter.zipkin.hadoop.sources.{Util, PreprocessedSpanSource} -import java.nio.ByteBuffer - +import com.twitter.zipkin.gen.{SpanServiceName, Annotation} +import com.twitter.zipkin.hadoop.sources.{TimeGranularity, PreprocessedSpanSource} /** * Finds traces with duplicate trace IDs @@ -30,7 +27,7 @@ class FindDuplicateTraces(args: Args) extends Job(args) with DefaultDateRangeJob val maxDuration = augmentString(args.required("maximum_duration")).toInt - val result = PreprocessedSpanSource() + val result = PreprocessedSpanSource(TimeGranularity.Hour) .read .mapTo(0 ->('trace_id, 'annotations)) { s: SpanServiceName => (s.trace_id, s.annotations.toList) diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/GrepByAnnotation.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/GrepByAnnotation.scala index f335a8fd218..94bf3f4367e 100644 --- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/GrepByAnnotation.scala +++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/GrepByAnnotation.scala @@ -17,14 +17,14 @@ package com.twitter.zipkin.hadoop import com.twitter.zipkin.gen.{Annotation, Span} import com.twitter.scalding.{Tsv, DefaultDateRangeJob, Job, Args} -import sources.SpanSource +import com.twitter.zipkin.hadoop.sources.{TimeGranularity, SpanSource} class GrepByAnnotation(args: Args) extends Job(args) with DefaultDateRangeJob { val grepByWord = args.required("word") val preprocessed = - SpanSource() + SpanSource(TimeGranularity.Hour) .read .mapTo(0 -> ('traceid, 'annotations)) { s: Span => (s.trace_id, s.annotations.toList) } .filter('annotations) { annotations: List[Annotation] => diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/MemcacheRequest.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/MemcacheRequest.scala index 38cab260a78..7ed6517c736 100644 --- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/MemcacheRequest.scala +++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/MemcacheRequest.scala @@ -13,21 +13,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package com.twitter.zipkin.hadoop import com.twitter.scalding._ -import java.nio.ByteBuffer -import java.util.Arrays -import sources._ import com.twitter.zipkin.gen._ +import com.twitter.zipkin.hadoop.sources.{PreprocessedSpanSource, TimeGranularity, PrepNoNamesSpanSource} /** * Find out how often each service does memcache accesses */ class MemcacheRequest(args : Args) extends Job(args) with DefaultDateRangeJob { - val preprocessed = DailyPrepNoNamesSpanSource() + val preprocessed = PrepNoNamesSpanSource(TimeGranularity.Day) .read .mapTo(0 -> ('parent_id, 'binary_annotations)) { s: Span => (s.parent_id, s.binary_annotations.toList) } @@ -40,7 +37,7 @@ class MemcacheRequest(args : Args) extends Job(args) with DefaultDateRangeJob { } .project('parent_id, 'memcacheNames) - val memcacheRequesters = DailyPreprocessedSpanSource() + val memcacheRequesters = PreprocessedSpanSource(TimeGranularity.Day) .read .mapTo(0 -> ('trace_id, 'id, 'service)) { s: SpanServiceName => (s.trace_id, s.id, s.service_name)} diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/PopularAnnotations.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/PopularAnnotations.scala index 188b005ad3a..896a974a20c 100644 --- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/PopularAnnotations.scala +++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/PopularAnnotations.scala @@ -18,7 +18,7 @@ package com.twitter.zipkin.hadoop import com.twitter.scalding._ -import com.twitter.zipkin.hadoop.sources.DailyPreprocessedSpanSource +import com.twitter.zipkin.hadoop.sources.{PreprocessedSpanSource, TimeGranularity} import com.twitter.zipkin.gen.{SpanServiceName, Annotation} /** @@ -26,7 +26,7 @@ import com.twitter.zipkin.gen.{SpanServiceName, Annotation} */ class PopularAnnotations(args : Args) extends Job(args) with DefaultDateRangeJob { - val preprocessed = DailyPreprocessedSpanSource() + val preprocessed = PreprocessedSpanSource(TimeGranularity.Day) .read .mapTo(0 -> ('service, 'annotations)) { s: SpanServiceName => (s.service_name, s.annotations.toList) } diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/PopularKeys.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/PopularKeys.scala index c45b8183c7f..10aaa48e82d 100644 --- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/PopularKeys.scala +++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/PopularKeys.scala @@ -13,12 +13,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package com.twitter.zipkin.hadoop - import com.twitter.scalding._ -import com.twitter.zipkin.hadoop.sources.DailyPreprocessedSpanSource +import com.twitter.zipkin.hadoop.sources.{PreprocessedSpanSource, TimeGranularity} import com.twitter.zipkin.gen.{SpanServiceName, BinaryAnnotation} /** @@ -26,7 +24,7 @@ import com.twitter.zipkin.gen.{SpanServiceName, BinaryAnnotation} */ class PopularKeys(args : Args) extends Job(args) with DefaultDateRangeJob { - val preprocessed = DailyPreprocessedSpanSource() + val preprocessed = PreprocessedSpanSource(TimeGranularity.Day) .read .mapTo(0 -> ('service, 'binary_annotations)) { s: SpanServiceName => (s.service_name, s.binary_annotations.toList) } diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/ServerResponsetime.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/ServerResponsetime.scala index 176b8b45cc2..8e0466789b1 100644 --- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/ServerResponsetime.scala +++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/ServerResponsetime.scala @@ -15,9 +15,8 @@ */ package com.twitter.zipkin.hadoop - import com.twitter.scalding._ -import com.twitter.zipkin.hadoop.sources.SpanSource +import com.twitter.zipkin.hadoop.sources.{TimeGranularity, SpanSource} import com.twitter.zipkin.gen.{Span, Constants, Annotation} import scala.collection.JavaConverters._ import java.nio.ByteBuffer @@ -31,7 +30,7 @@ class ServerResponsetime(args: Args) extends Job(args) with DefaultDateRangeJob val serverAnnotations = Seq(Constants.SERVER_RECV, Constants.SERVER_SEND) - SpanSource() + SpanSource(TimeGranularity.Hour) .read // only need id and annotations for this .mapTo(0 -> ('id, 'annotations)) { s: Span => (s.id, s.annotations.toList) } diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/Timeouts.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/Timeouts.scala index 538fc2a285c..0c884506390 100644 --- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/Timeouts.scala +++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/Timeouts.scala @@ -27,6 +27,7 @@ import com.twitter.zipkin.hadoop.sources._ class Timeouts(args: Args) extends Job(args) with DefaultDateRangeJob { + val timeGranularity = TimeGranularity.Day val ERROR_TYPE = List("finagle.timeout", "finagle.retry") val input = args.required("error_type") @@ -35,14 +36,14 @@ class Timeouts(args: Args) extends Job(args) with DefaultDateRangeJob { } // Preprocess the data into (trace_id, id, parent_id, annotations, client service name, service name) - val spanInfo = DailyPreprocessedSpanSource() + val spanInfo = PreprocessedSpanSource(timeGranularity) .read .mapTo(0 -> ('id, 'parent_id, 'annotations, 'service) ) { s: SpanServiceName => (s.id, s.parent_id, s.annotations.toList, s.service_name) } // Project to (id, service name) - val idName = DailyPrepTsvSource() + val idName = PrepTsvSource(timeGranularity) .read // Left join with idName to find the parent's service name, if applicable diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/WhaleReport.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/WhaleReport.scala index f7287202bd9..ca80b7524de 100644 --- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/WhaleReport.scala +++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/WhaleReport.scala @@ -16,9 +16,9 @@ package com.twitter.zipkin.hadoop -import com.twitter.zipkin.gen.{BinaryAnnotation, Constants, SpanServiceName, Annotation} import com.twitter.scalding.{Tsv, DefaultDateRangeJob, Job, Args} -import com.twitter.zipkin.hadoop.sources.{Util, DailyPreprocessedSpanSource} +import com.twitter.zipkin.gen.{BinaryAnnotation, SpanServiceName, Annotation} +import com.twitter.zipkin.hadoop.sources.{TimeGranularity, PreprocessedSpanSource, Util} import java.nio.ByteBuffer @@ -30,7 +30,7 @@ class WhaleReport(args: Args) extends Job(args) with DefaultDateRangeJob { val ERRORS = List("finagle.timeout", "finagle.retry") - val spanInfo = DailyPreprocessedSpanSource() + val spanInfo = PreprocessedSpanSource(TimeGranularity.Day) .read .mapTo(0 ->('trace_id, 'id, 'service, 'annotations, 'binary_annotations)) { s: SpanServiceName => (s.trace_id, s.id, s.service_name, s.annotations.toList, s.binary_annotations.toList) diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/WorstRuntimes.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/WorstRuntimes.scala index d8755003af0..41458373d63 100644 --- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/WorstRuntimes.scala +++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/WorstRuntimes.scala @@ -13,22 +13,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package com.twitter.zipkin.hadoop import com.twitter.scalding._ -import sources.{PreprocessedSpanSource, PrepNoNamesSpanSource} -import com.twitter.zipkin.gen.{SpanServiceName, Span, Constants, Annotation} +import com.twitter.zipkin.gen.{SpanServiceName, Constants, Annotation} +import com.twitter.zipkin.hadoop.sources.{TimeGranularity, PreprocessedSpanSource} /** * Obtain the IDs and the durations of the one hundred service calls which take the longest per service */ - class WorstRuntimes(args: Args) extends Job(args) with DefaultDateRangeJob { val clientAnnotations = Seq(Constants.CLIENT_RECV, Constants.CLIENT_SEND) - val preprocessed = PreprocessedSpanSource() + val preprocessed = PreprocessedSpanSource(TimeGranularity.Hour) .read .mapTo(0 -> ('service, 'id, 'annotations)) { s : SpanServiceName => (s.service_name, s.id, s.annotations.toList) diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/WorstRuntimesPerTrace.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/WorstRuntimesPerTrace.scala index e87063f19e7..4ed3e081293 100644 --- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/WorstRuntimesPerTrace.scala +++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/WorstRuntimesPerTrace.scala @@ -13,22 +13,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package com.twitter.zipkin.hadoop import com.twitter.scalding._ -import sources.{DailyPreprocessedSpanSource} +import com.twitter.zipkin.hadoop.sources.{PreprocessedSpanSource, TimeGranularity} import com.twitter.zipkin.gen.{SpanServiceName, Constants, Annotation} /** * Obtain the IDs and the durations of the one hundred service calls which take the longest per service */ - class WorstRuntimesPerTrace(args: Args) extends Job(args) with DefaultDateRangeJob { val clientAnnotations = Seq(Constants.CLIENT_RECV, Constants.CLIENT_SEND) - val preprocessed = DailyPreprocessedSpanSource() + val preprocessed = PreprocessedSpanSource(TimeGranularity.Day) .read .mapTo(0 -> ('service, 'trace_id, 'annotations)) { s : SpanServiceName => (s.service_name, s.trace_id, s.annotations.toList) diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/DailyFindIDtoName.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/DailyFindIDtoName.scala index 414ef3a1744..abe33026967 100644 --- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/DailyFindIDtoName.scala +++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/DailyFindIDtoName.scala @@ -13,23 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package com.twitter.zipkin.hadoop.sources -import com.twitter.scalding.{DefaultDateRangeJob, Job, Args} -import com.twitter.zipkin.gen.SpanServiceName +import com.twitter.scalding.Args /** * Finds the mapping from span ID to service name */ - -class DailyFindIDtoName(args : Args) extends Job(args) with DefaultDateRangeJob { - - val spanInfo = DailyPreprocessedSpanSource() - .read - .mapTo(0 -> ('id_1, 'name_1)) - { s: SpanServiceName => (s.id, s.service_name ) } - .filter('name_1) {n : String => n != null } - .unique('id_1, 'name_1) - .write(DailyPrepTsvSource()) +class DailyFindIDtoName(args: Args) extends FindIDtoName(args) { + override val timeGranularity = TimeGranularity.Day } diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/DailyFindNames.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/DailyFindNames.scala index ee9f9524e5b..91ed6b0a0f1 100644 --- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/DailyFindNames.scala +++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/DailyFindNames.scala @@ -13,38 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package com.twitter.zipkin.hadoop.sources -import com.twitter.scalding._ -import com.twitter.zipkin.gen._ -import scala.collection.JavaConverters._ +import com.twitter.scalding.Args /** * Finds the best client side and service names for each span, if any exist */ - -class DailyFindNames(args : Args) extends Job(args) with DefaultDateRangeJob { - - val preprocessed = DailyPrepNoNamesSpanSource() - .read - .mapTo(0 ->('trace_id, 'name, 'id, 'parent_id, 'annotations, 'binary_annotations)) { - s: Span => (s.trace_id, s.name, s.id, s.parent_id, s.annotations.toList, s.binary_annotations.toList) - } - - val findNames = preprocessed - .flatMap('annotations -> 'service) { Util.getServiceName } - .mapTo(('trace_id, 'name, 'id, 'parent_id, 'annotations, 'binary_annotations, 'service) -> 'spanWithServiceNames) { - a : (Long, String, Long, Long, List[Annotation], List[BinaryAnnotation], String) => - a match { - case (tid, name, id, pid, annotations, binary_annotations, service) => - val spanSN = new SpanServiceName(tid, name, id, annotations.asJava, binary_annotations.asJava, service) - if (pid != 0) { - spanSN.setParent_id(pid) - } - spanSN - } - }.write(DailyPreprocessedSpanSource()) - - +class DailyFindNames(args: Args) extends FindNames(args) { + override val timeGranularity = TimeGranularity.Day } diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/DailyPreprocessed.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/DailyPreprocessed.scala index e601fd4cefe..641dabc620c 100644 --- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/DailyPreprocessed.scala +++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/DailyPreprocessed.scala @@ -13,41 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - - package com.twitter.zipkin.hadoop.sources -import com.twitter.zipkin.gen.{BinaryAnnotation, Span, Annotation} -import com.twitter.scalding._ -import com.twitter.zipkin.gen -import scala.collection.JavaConverters._ +import com.twitter.scalding.Args /** * Preprocesses the data by merging different pieces of the same span */ -class DailyPreprocessed(args : Args) extends Job(args) with DefaultDateRangeJob { - val preprocessed = DailySpanSource() - .read - .mapTo(0 ->('trace_id, 'name, 'id, 'parent_id, 'annotations, 'binary_annotations)) { - s: Span => (s.trace_id, s.name, s.id, s.parent_id, s.annotations.toList, s.binary_annotations.toList) - } - .groupBy('trace_id, 'id, 'parent_id) { - _.reduce('annotations, 'binary_annotations) { - (left: (List[Annotation], List[BinaryAnnotation]), right: (List[Annotation], List[BinaryAnnotation])) => - (left._1 ++ right._1, left._2 ++ right._2) - } - } - - val onlyMerge = preprocessed - .mapTo(('trace_id, 'id, 'parent_id, 'annotations, 'binary_annotations) -> 'span) { - a : (Long, Long, Long, List[Annotation], List[BinaryAnnotation]) => - a match { - case (tid, id, pid, annotations, binary_annotations) => - val span = new gen.Span(tid, "", id, annotations.asJava, binary_annotations.asJava) - if (pid != 0) { - span.setParent_id(pid) - } - span - } - }.write(DailyPrepNoNamesSpanSource()) +class DailyPreprocessed(args: Args) extends Preprocessed(args) { + override val timeGranularity = TimeGranularity.Day } diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/FindIDtoName.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/FindIDtoName.scala index 49868b84b60..c997e52f819 100644 --- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/FindIDtoName.scala +++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/FindIDtoName.scala @@ -13,7 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package com.twitter.zipkin.hadoop.sources import com.twitter.scalding.{DefaultDateRangeJob, Job, Args} @@ -22,14 +21,15 @@ import com.twitter.zipkin.gen.SpanServiceName /** * Finds the mapping from span ID to service name */ +class FindIDtoName(args: Args) extends Job(args) with DefaultDateRangeJob { -class FindIDtoName(args : Args) extends Job(args) with DefaultDateRangeJob { + val timeGranularity: TimeGranularity = TimeGranularity.Hour - val spanInfo = PreprocessedSpanSource() + val spanInfo = PreprocessedSpanSource(timeGranularity) .read .mapTo(0 -> ('id_1, 'name_1)) { s: SpanServiceName => (s.id, s.service_name ) } .filter('name_1) {n : String => n != null } .unique('id_1, 'name_1) - .write(PrepTsvSource()) + .write(PrepTsvSource(timeGranularity)) } diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/FindNames.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/FindNames.scala index 21b9b8ac0bb..ac0127fedcb 100644 --- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/FindNames.scala +++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/FindNames.scala @@ -13,7 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package com.twitter.zipkin.hadoop.sources import com.twitter.scalding._ @@ -23,10 +22,10 @@ import scala.collection.JavaConverters._ /** * Finds the best client side and service names for each span, if any exist */ +class FindNames(args: Args) extends Job(args) with DefaultDateRangeJob { + val timeGranularity: TimeGranularity = TimeGranularity.Hour -class FindNames(args : Args) extends Job(args) with DefaultDateRangeJob { - - val preprocessed = PrepNoNamesSpanSource() + val preprocessed = PrepNoNamesSpanSource(timeGranularity) .read .mapTo(0 ->('trace_id, 'name, 'id, 'parent_id, 'annotations, 'binary_annotations)) { s: Span => (s.trace_id, s.name, s.id, s.parent_id, s.annotations.toList, s.binary_annotations.toList) @@ -44,7 +43,7 @@ class FindNames(args : Args) extends Job(args) with DefaultDateRangeJob { } spanSN } - }.write(PreprocessedSpanSource()) + }.write(PreprocessedSpanSource(timeGranularity)) } diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/Preprocessed.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/Preprocessed.scala index 4a8e477bc6d..de86887d1ab 100644 --- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/Preprocessed.scala +++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/Preprocessed.scala @@ -13,8 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - - package com.twitter.zipkin.hadoop.sources import com.twitter.zipkin.gen.{BinaryAnnotation, Span, Annotation} @@ -26,7 +24,9 @@ import scala.collection.JavaConverters._ * Preprocesses the data by merging different pieces of the same span */ class Preprocessed(args : Args) extends Job(args) with DefaultDateRangeJob { - val preprocessed = SpanSource() + val timeGranularity: TimeGranularity = TimeGranularity.Hour + + val preprocessed = SpanSource(timeGranularity) .read .mapTo(0 ->('trace_id, 'id, 'parent_id, 'annotations, 'binary_annotations)) { s: Span => (s.trace_id, s.id, s.parent_id, s.annotations.toList, s.binary_annotations.toList) @@ -49,5 +49,5 @@ class Preprocessed(args : Args) extends Job(args) with DefaultDateRangeJob { } span } - }.write(PrepNoNamesSpanSource()) + }.write(PrepNoNamesSpanSource(timeGranularity)) } diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/SpanSource.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/SpanSource.scala index d964a049ad9..55fe0aaed58 100644 --- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/SpanSource.scala +++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/SpanSource.scala @@ -15,7 +15,6 @@ */ package com.twitter.zipkin.hadoop.sources -import com.twitter.zipkin.gen.{BinaryAnnotation, Annotation} import org.apache.thrift.TBase import cascading.scheme.Scheme import com.twitter.zipkin.gen.{Span, SpanServiceName} @@ -35,23 +34,6 @@ object HadoopSchemeInstance { scheme.asInstanceOf[Scheme[JobConf,RecordReader[_,_],OutputCollector[_,_],_,_]] } -abstract class HourlySuffixLzoThrift[T <: TBase[_,_] : Manifest](prefix : String, dateRange : DateRange) extends - HourlySuffixSource(prefix, dateRange) with LzoThrift[T] { - def column = manifest[T].erasure -} - -abstract class DailySuffixLzoThrift[T <: TBase[_,_] : Manifest](prefix : String, dateRange : DateRange) extends - DailySuffixSource(prefix, dateRange) with LzoThrift[T] { - def column = manifest[T].erasure -} - - -abstract class HourlySuffixSource(prefixTemplate : String, dateRange : DateRange) extends - TimePathedSource(prefixTemplate + TimePathedSource.YEAR_MONTH_DAY_HOUR + "/*", dateRange, DateOps.UTC) - -abstract class DailySuffixSource(prefixTemplate : String, dateRange : DateRange) extends -TimePathedSource(prefixTemplate + TimePathedSource.YEAR_MONTH_DAY + "/*", dateRange, DateOps.UTC) - trait LzoThrift[T <: TBase[_, _]] extends Mappable[T] { def column: Class[_] @@ -61,6 +43,15 @@ trait LzoThrift[T <: TBase[_, _]] extends Mappable[T] { override def hdfsScheme = HadoopSchemeInstance(new LzoThriftScheme[T](column)) } +abstract class TimeSuffixSource(prefix: String, granularity: TimeGranularity, dateRange: DateRange) + extends TimePathedSource(prefix + granularity.timePath + "/*", dateRange, DateOps.UTC) + + +abstract class LzoThriftTimeSuffixSource[T <: TBase[_,_] : Manifest](prefix: String, granularity: TimeGranularity, dateRange: DateRange) + extends TimeSuffixSource(prefix, granularity, dateRange) with LzoThrift[T] { + def column = manifest[T].erasure +} + /** * Ensures that a _SUCCESS file is present in the Source path. */ @@ -87,9 +78,8 @@ trait LzoTsv extends DelimitedScheme { /** * This is the source for trace data. Directories are like so: /logs/zipkin/yyyy/mm/dd/hh */ -case class SpanSource(implicit dateRange: DateRange) extends HourlySuffixLzoThrift[Span]("/logs/zipkin/", dateRange) - -case class DailySpanSource(implicit dateRange: DateRange) extends DailySuffixLzoThrift[Span]("/logs/zipkin/", dateRange) +case class SpanSource(granularity: TimeGranularity)(implicit dateRange: DateRange) + extends LzoThriftTimeSuffixSource[Span]("/logs/zipkin/", granularity, dateRange) case class FixedSpanSource(p : String) extends FixedPathSource(p) with LzoThrift[Span] { def column = classOf[Span] @@ -98,34 +88,21 @@ case class FixedSpanSource(p : String) extends FixedPathSource(p) with LzoThrift /** * This is the source for trace data that has been merged. Directories are like in SpanSource */ -case class PrepNoNamesSpanSource(implicit dateRange: DateRange) extends HourlySuffixLzoThrift[Span]("Preprocessed", dateRange) - -case class DailyPrepNoNamesSpanSource(implicit dateRange: DateRange) extends DailySuffixLzoThrift[Span]("DailyPreprocessed", dateRange) +case class PrepNoNamesSpanSource(granularity: TimeGranularity)(implicit dateRange: DateRange) + extends LzoThriftTimeSuffixSource[Span]("Preprocessed_%s".format(granularity.name), granularity, dateRange) /** * This is the source for trace data that has been merged and for which we've found * the best possible client side and service names. Directories are like in SpanSource */ -case class PreprocessedSpanSource(implicit dateRange: DateRange) extends HourlySuffixLzoThrift[SpanServiceName]("FindNames", dateRange) - -case class DailyPreprocessedSpanSource(implicit dateRange: DateRange) extends DailySuffixLzoThrift[SpanServiceName]("DailyFindNames", dateRange) +case class PreprocessedSpanSource(granularity: TimeGranularity)(implicit dateRange: DateRange) + extends LzoThriftTimeSuffixSource[SpanServiceName]("FindNames_%s".format(granularity.name), granularity, dateRange) /** * This is the source for data of the form (id, service name) */ - -case class PrepTsvSource()(implicit dateRange : DateRange) - extends HourlySuffixSource("FindIDtoName", dateRange) - with LzoTsv - with Mappable[(Long, String)] - with SuccessFileSource { - override val fields = new Fields("id_1", "name_1") - override val types : Array[Class[_]] = Array(classOf[Long], classOf[String]) - override val columnNums = (0 until types.size) -} - -case class DailyPrepTsvSource()(implicit dateRange : DateRange) - extends DailySuffixSource("DailyFindIDtoName", dateRange) +case class PrepTsvSource(granularity: TimeGranularity)(implicit dateRange : DateRange) + extends TimeSuffixSource("FindIDtoName_%s".format(granularity.name), granularity, dateRange) with LzoTsv with Mappable[(Long, String)] with SuccessFileSource { diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/TimeGranularity.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/TimeGranularity.scala new file mode 100644 index 00000000000..af227e1d24e --- /dev/null +++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/TimeGranularity.scala @@ -0,0 +1,25 @@ +/* + * Copyright 2012 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.zipkin.hadoop.sources + +import com.twitter.scalding.TimePathedSource + +object TimeGranularity { + case object Hour extends TimeGranularity("Hour", TimePathedSource.YEAR_MONTH_DAY_HOUR) + case object Day extends TimeGranularity("Day", TimePathedSource.YEAR_MONTH_DAY) +} + +abstract class TimeGranularity(val name: String, val timePath: String) diff --git a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/DependencyTreeSpec.scala b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/DependencyTreeSpec.scala index 096de2df23d..d7bba1c1694 100644 --- a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/DependencyTreeSpec.scala +++ b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/DependencyTreeSpec.scala @@ -13,7 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package com.twitter.zipkin.hadoop import org.specs.Specification @@ -32,6 +31,7 @@ import com.twitter.zipkin.hadoop.sources._ class DependencyTreeSpec extends Specification with TupleConversions { noDetailedDiffs() + val timeGranularity = TimeGranularity.Day implicit val dateRange = DateRange(RichDate(123), RichDate(321)) val endpoint = new gen.Endpoint(123, 666, "service") @@ -55,8 +55,8 @@ class DependencyTreeSpec extends Specification with TupleConversions { .arg("input", "inputFile") .arg("output", "outputFile") .arg("date", "2012-01-01T01:00") - .source(DailyPreprocessedSpanSource(), spans) - .source(DailyPrepTsvSource(), Util.getSpanIDtoNames(spans)) + .source(PreprocessedSpanSource(timeGranularity), spans) + .source(PrepTsvSource(timeGranularity), Util.getSpanIDtoNames(spans)) .sink[(String, String, Long)](Tsv("outputFile")) { val map = new HashMap[String, Long]() map("service, " + Util.UNKNOWN_SERVICE_NAME) = 0 diff --git a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/ExpensiveEndpointsSpec.scala b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/ExpensiveEndpointsSpec.scala index 2147044bf03..0a4cbdd2381 100644 --- a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/ExpensiveEndpointsSpec.scala +++ b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/ExpensiveEndpointsSpec.scala @@ -13,14 +13,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package com.twitter.zipkin.hadoop import org.specs.Specification import com.twitter.zipkin.gen -import com.twitter.scalding._ -import com.twitter.zipkin.gen -import gen.AnnotationType import scala.collection.JavaConverters._ import collection.mutable.HashMap import com.twitter.scalding.TupleConversions @@ -38,6 +34,7 @@ import com.twitter.zipkin.hadoop.sources._ class ExpensiveEndpointsSpec extends Specification with TupleConversions { noDetailedDiffs() + val timeGranularity = TimeGranularity.Day implicit val dateRange = DateRange(RichDate(123), RichDate(321)) val endpoint = new gen.Endpoint(123, 666, "service") @@ -58,8 +55,8 @@ class ExpensiveEndpointsSpec extends Specification with TupleConversions { arg("input", "inputFile"). arg("output", "outputFile"). arg("date", "2012-01-01T01:00"). - source(DailyPreprocessedSpanSource(), spans). - source(DailyPrepTsvSource(), Util.getSpanIDtoNames(spans)). + source(PreprocessedSpanSource(timeGranularity), spans). + source(PrepTsvSource(timeGranularity), Util.getSpanIDtoNames(spans)). sink[(String, String, Long)](Tsv("outputFile")) { val result = new HashMap[String, Long]() result("service, service2") = 0 diff --git a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/FindDuplicateTracesSpec.scala b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/FindDuplicateTracesSpec.scala index e26beb96f49..cb4f529af97 100644 --- a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/FindDuplicateTracesSpec.scala +++ b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/FindDuplicateTracesSpec.scala @@ -21,8 +21,7 @@ import com.twitter.zipkin.gen import com.twitter.scalding._ import gen.AnnotationType import scala.collection.JavaConverters._ -import collection.mutable.HashMap -import com.twitter.zipkin.hadoop.sources.{PrepTsvSource, PreprocessedSpanSource, Util} +import com.twitter.zipkin.hadoop.sources.{PreprocessedSpanSource, TimeGranularity, Util} /** * Tests that DependencyTree finds all service calls and how often per pair @@ -56,7 +55,7 @@ class FindDuplicateTracesSpec extends Specification with TupleConversions { .arg("output", "outputFile") .arg("date", "2012-01-01T01:00") .arg("maximum_duration", "600") - .source(PreprocessedSpanSource(), spans) + .source(PreprocessedSpanSource(TimeGranularity.Hour), spans) .sink[Long](Tsv("outputFile")) { outputBuffer => outputBuffer foreach { e => e mustEqual 12345 diff --git a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/MemcacheRequestSpec.scala b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/MemcacheRequestSpec.scala index c4690dd6f08..ef1d665ee9b 100644 --- a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/MemcacheRequestSpec.scala +++ b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/MemcacheRequestSpec.scala @@ -63,8 +63,8 @@ class MemcacheRequestSpec extends Specification with TupleConversions { arg("input", "inputFile"). arg("output", "outputFile"). arg("date", "2012-01-01T01:00"). - source(DailyPrepNoNamesSpanSource(), Util.repeatSpan(span1, 10, 100, 0) ++ Util.repeatSpan(span3, 2, 200, 300) ++ Util.repeatSpan(span3, 0, 1000, 500) ++ Util.repeatSpan(span4, 2, 1000, 11) ). - source(DailyPreprocessedSpanSource(), Util.repeatSpan(span, 12, 0, 20) ++ Util.repeatSpan(span2, 2, 300, 400) ++ Util.repeatSpan(span2, 0, 500, 100000)). + source(PrepNoNamesSpanSource(TimeGranularity.Day), Util.repeatSpan(span1, 10, 100, 0) ++ Util.repeatSpan(span3, 2, 200, 300) ++ Util.repeatSpan(span3, 0, 1000, 500) ++ Util.repeatSpan(span4, 2, 1000, 11) ). + source(PreprocessedSpanSource(TimeGranularity.Day), Util.repeatSpan(span, 12, 0, 20) ++ Util.repeatSpan(span2, 2, 300, 400) ++ Util.repeatSpan(span2, 0, 500, 100000)). sink[(String, Long)](Tsv("outputFile")) { val counts = new HashMap[String, Long]() counts("service") = 0 diff --git a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/PopularAnnotationsSpec.scala b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/PopularAnnotationsSpec.scala index 0265cf6490b..4a180f820d4 100644 --- a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/PopularAnnotationsSpec.scala +++ b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/PopularAnnotationsSpec.scala @@ -20,7 +20,7 @@ import org.specs.Specification import com.twitter.zipkin.gen import com.twitter.scalding._ import gen.AnnotationType -import com.twitter.zipkin.hadoop.sources.{DailyPreprocessedSpanSource, Util} +import com.twitter.zipkin.hadoop.sources.{PreprocessedSpanSource, TimeGranularity, Util} import scala.collection.JavaConverters._ import scala.collection.mutable._ @@ -53,7 +53,7 @@ class PopularAnnotationsSpec extends Specification with TupleConversions { arg("input", "inputFile"). arg("output", "outputFile"). arg("date", "2012-01-01T01:00"). - source(DailyPreprocessedSpanSource(), Util.repeatSpan(span, 101, 0, 0) ::: Util.repeatSpan(span1, 50, 200, 0) ::: Util.repeatSpan(span2, 10, 500, 0)). + source(PreprocessedSpanSource(TimeGranularity.Day), Util.repeatSpan(span, 101, 0, 0) ::: Util.repeatSpan(span1, 50, 200, 0) ::: Util.repeatSpan(span2, 10, 500, 0)). sink[(String, String)](Tsv("outputFile")) { val map = new HashMap[String, List[String]]() map("service") = Nil diff --git a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/PopularKeysSpec.scala b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/PopularKeysSpec.scala index f4d9011b8d2..08727492a27 100644 --- a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/PopularKeysSpec.scala +++ b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/PopularKeysSpec.scala @@ -20,7 +20,7 @@ import org.specs.Specification import com.twitter.zipkin.gen import com.twitter.scalding._ import gen.AnnotationType -import com.twitter.zipkin.hadoop.sources.{DailyPreprocessedSpanSource, Util} +import com.twitter.zipkin.hadoop.sources.{PreprocessedSpanSource, TimeGranularity, Util} import scala.collection.JavaConverters._ import scala.collection.mutable._ @@ -49,7 +49,7 @@ class PopularKeysSpec extends Specification with TupleConversions { arg("input", "inputFile"). arg("output", "outputFile"). arg("date", "2012-01-01T01:00"). - source(DailyPreprocessedSpanSource(), Util.repeatSpan(span, 101, 0, 0) ::: Util.repeatSpan(span1, 50, 200, 0)). + source(PreprocessedSpanSource(TimeGranularity.Day), Util.repeatSpan(span, 101, 0, 0) ::: Util.repeatSpan(span1, 50, 200, 0)). sink[(String, String)](Tsv("outputFile")) { val map = new HashMap[String, List[String]]() map("service") = Nil diff --git a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/ServerResponsetimeSpec.scala b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/ServerResponsetimeSpec.scala index 106793d5aba..90040d12058 100644 --- a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/ServerResponsetimeSpec.scala +++ b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/ServerResponsetimeSpec.scala @@ -4,7 +4,7 @@ package com.twitter.zipkin.hadoop import org.specs.Specification import com.twitter.zipkin.gen import com.twitter.scalding._ -import com.twitter.zipkin.hadoop.sources.{SpanSource, Util} +import com.twitter.zipkin.hadoop.sources.{SpanSource, TimeGranularity, Util} import scala.collection.JavaConverters._ class ServerResponsetimeSpec extends Specification with TupleConversions { @@ -23,7 +23,7 @@ class ServerResponsetimeSpec extends Specification with TupleConversions { arg("input", "inputFile"). arg("output", "outputFile"). arg("date", "2012-01-01T01:00"). - source(SpanSource(), List(span -> 0)). + source(SpanSource(TimeGranularity.Hour), List(span -> 0)). sink[(String, Int)](Tsv("outputFile")) { outputBuffer => outputBuffer.toMap mustEqual Map() }.run.finish @@ -33,7 +33,7 @@ class ServerResponsetimeSpec extends Specification with TupleConversions { arg("input", "inputFile"). arg("output", "outputFile"). arg("date", "2012-01-01T01:00"). - source(SpanSource(), Util.repeatSpan(span, 101, 0, 0)). + source(SpanSource(TimeGranularity.Hour), Util.repeatSpan(span, 101, 0, 0)). sink[(String, String, Double, Double, Double)](Tsv("outputFile")) { outputBuffer => outputBuffer foreach { e => e mustEqual ("0.0.0.123", "service", 102d, 1d, 0d) diff --git a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/TimeoutsSpec.scala b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/TimeoutsSpec.scala index 7098da98c98..96d041beae2 100644 --- a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/TimeoutsSpec.scala +++ b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/TimeoutsSpec.scala @@ -33,6 +33,7 @@ import com.twitter.zipkin.hadoop.sources._ class TimeoutsSpec extends Specification with TupleConversions { noDetailedDiffs() + val timeGranularity = TimeGranularity.Day implicit val dateRange = DateRange(RichDate(123), RichDate(321)) val endpoint = new gen.Endpoint(123, 666, "service") @@ -62,8 +63,8 @@ class TimeoutsSpec extends Specification with TupleConversions { arg("output", "outputFile"). arg("date", "2012-01-01T01:00"). arg("error_type", "finagle.timeout"). - source(DailyPreprocessedSpanSource(), spans). - source(DailyPrepTsvSource(), Util.getSpanIDtoNames(spans)). + source(PreprocessedSpanSource(timeGranularity), spans). + source(PrepTsvSource(timeGranularity), Util.getSpanIDtoNames(spans)). sink[(String, String, Long)](Tsv("outputFile")) { val map = new HashMap[String, Long]() map("service, " + Util.UNKNOWN_SERVICE_NAME) = 0 diff --git a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/WhaleReportSpec.scala b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/WhaleReportSpec.scala index 2f177337052..bb5d47a4937 100644 --- a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/WhaleReportSpec.scala +++ b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/WhaleReportSpec.scala @@ -13,7 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package com.twitter.zipkin.hadoop import org.specs.Specification @@ -64,7 +63,7 @@ class WhaleReportSpec extends Specification with TupleConversions { arg("input", "inputFile"). arg("output", "outputFile"). arg("date", "2012-01-01T01:00"). - source(DailyPreprocessedSpanSource(), spans). + source(PreprocessedSpanSource(TimeGranularity.Day), spans). sink[(Long, List[String])](Tsv("outputFile")) { var result = new HashSet[String]() var actual = new HashSet[String]() diff --git a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/WorstRuntimesPerTraceSpec.scala b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/WorstRuntimesPerTraceSpec.scala index e226f89d510..0f800db5e00 100644 --- a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/WorstRuntimesPerTraceSpec.scala +++ b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/WorstRuntimesPerTraceSpec.scala @@ -22,7 +22,7 @@ import com.twitter.scalding._ import gen.AnnotationType import scala.collection.JavaConverters._ import collection.mutable.HashMap -import com.twitter.zipkin.hadoop.sources.{DailyPreprocessedSpanSource, Util} +import com.twitter.zipkin.hadoop.sources.{PreprocessedSpanSource, TimeGranularity, Util} /** * Tests that DependencyTree finds all service calls and how often per pair @@ -55,7 +55,7 @@ class WorstRuntimesPerTraceSpec extends Specification with TupleConversions { .arg("input", "inputFile") .arg("output", "outputFile") .arg("date", "2012-01-01T01:00") - .source(DailyPreprocessedSpanSource(), spans) + .source(PreprocessedSpanSource(TimeGranularity.Day), spans) .sink[(String, Long, Int)](Tsv("outputFile")) { val map = new HashMap[String, Int]() outputBuffer => outputBuffer foreach { e => diff --git a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/WorstRuntimesSpec.scala b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/WorstRuntimesSpec.scala index fa949006a8a..78c2e0da1e0 100644 --- a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/WorstRuntimesSpec.scala +++ b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/WorstRuntimesSpec.scala @@ -22,7 +22,7 @@ import com.twitter.scalding._ import scala.collection.JavaConverters._ import collection.mutable.HashMap import com.twitter.zipkin.gen.AnnotationType -import sources.{PreprocessedSpanSource, Util, PrepNoNamesSpanSource} +import com.twitter.zipkin.hadoop.sources.{PreprocessedSpanSource, TimeGranularity, Util} /** * Tests that WorstRuntimes finds the spans which take the longest to run @@ -54,7 +54,7 @@ class WorstRuntimesSpec extends Specification with TupleConversions { .arg("input", "inputFile") .arg("output", "outputFile") .arg("date", "2012-01-01T01:00") - .source(PreprocessedSpanSource(), (Util.repeatSpan(span, 20, 0, 0) ++ Util.repeatSpan(span1, 20, 100, 0))) + .source(PreprocessedSpanSource(TimeGranularity.Hour), (Util.repeatSpan(span, 20, 0, 0) ++ Util.repeatSpan(span1, 20, 100, 0))) .sink[(String, Long, Long)](Tsv("outputFile")) { var counts = new HashMap[String, Long]() counts += ("service" -> 0)