Hadoop cleanup (part 1)
* Add `TimeGranularity.Day` and `TimeGranularity.Hour` types
* Move time granularity to be specified at the job level as an argument
  rather than in the type, e.g. `Source(TimeGranularity.Day)` rather than
  `DailySource` (see the sketch below)
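
A minimal sketch of the new job-level wiring, assuming only what the hunks below show (`PreprocessedSpanSource`, `PrepTsvSource`, and the `TimeGranularity.Day` / `TimeGranularity.Hour` values). `ExampleSpanJob` and its field projection are hypothetical, and the `TimeGranularity` definition itself is not part of this diff:

```scala
import com.twitter.scalding.{Args, DefaultDateRangeJob, Job}
import com.twitter.zipkin.gen.SpanServiceName
import com.twitter.zipkin.hadoop.sources.{PreprocessedSpanSource, PrepTsvSource, TimeGranularity}

// Hypothetical job illustrating the change: granularity is a plain value
// chosen once at the job level and passed to the sources, instead of being
// baked into Daily*/Hourly* source type names.
class ExampleSpanJob(args: Args) extends Job(args) with DefaultDateRangeJob {
  val timeGranularity = TimeGranularity.Day // was: DailyPreprocessedSpanSource()

  val spanInfo = PreprocessedSpanSource(timeGranularity)
    .read
    .mapTo(0 -> ('id, 'service)) { s: SpanServiceName => (s.id, s.service_name) }

  val idName = PrepTsvSource(timeGranularity) // was: DailyPrepTsvSource()
    .read
}
```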

Author: @franklinhu
Fixes #144
URL: #144
Franklin Hu committed Sep 7, 2012
1 parent 318a97a commit db753f7
Showing 31 changed files with 124 additions and 199 deletions.
@@ -13,27 +13,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop

import com.twitter.scalding._
import cascading.pipe.joiner._
import com.twitter.zipkin.gen.{SpanServiceName, BinaryAnnotation, Span, Annotation}
import com.twitter.scalding._
import com.twitter.zipkin.gen.SpanServiceName
import com.twitter.zipkin.hadoop.sources._

/**
* Find out how often services call each other throughout the entire system
*/

class DependencyTree(args: Args) extends Job(args) with DefaultDateRangeJob {
val spanInfo = DailyPreprocessedSpanSource()

val timeGranularity = TimeGranularity.Day

val spanInfo = PreprocessedSpanSource(timeGranularity)
.read
.filter(0) { s : SpanServiceName => s.isSetParent_id() }
.mapTo(0 -> ('id, 'parent_id, 'service))
{ s: SpanServiceName => (s.id, s.parent_id, s.service_name ) }

// TODO: account for possible differences between sent and received service names
val idName = DailyPrepTsvSource()
val idName = PrepTsvSource(timeGranularity)
.read
/* Join with the original on parent ID to get the parent's service name */
val spanInfoWithParent = spanInfo
@@ -17,7 +17,6 @@
package com.twitter.zipkin.hadoop

import com.twitter.zipkin.gen.{Constants, SpanServiceName, Annotation}
import cascading.pipe.joiner.LeftJoin
import com.twitter.scalding.{Tsv, DefaultDateRangeJob, Job, Args}
import com.twitter.zipkin.hadoop.sources._

@@ -26,7 +25,9 @@ import com.twitter.zipkin.hadoop.sources._
*/
class ExpensiveEndpoints(args : Args) extends Job(args) with DefaultDateRangeJob {

val spanInfo = DailyPreprocessedSpanSource()
val timeGranularity = TimeGranularity.Day

val spanInfo = PreprocessedSpanSource(timeGranularity)
.read
.filter(0) { s : SpanServiceName => s.isSetParent_id() }
.mapTo(0 -> ('id, 'parent_id, 'service, 'annotations))
@@ -52,7 +53,7 @@ class ExpensiveEndpoints(args : Args) extends Job(args) with DefaultDateRangeJob
}
}

val idName = DailyPrepTsvSource()
val idName = PrepTsvSource(timeGranularity)
.read
/* Join with the original on parent ID to get the parent's service name */
val spanInfoWithParent = spanInfo
@@ -13,14 +13,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop

import com.twitter.zipkin.gen.{BinaryAnnotation, Constants, SpanServiceName, Annotation}
import com.twitter.scalding.{Tsv, DefaultDateRangeJob, Job, Args}
import com.twitter.zipkin.hadoop.sources.{Util, PreprocessedSpanSource}
import java.nio.ByteBuffer

import com.twitter.zipkin.gen.{SpanServiceName, Annotation}
import com.twitter.zipkin.hadoop.sources.{TimeGranularity, PreprocessedSpanSource}

/**
* Finds traces with duplicate trace IDs
@@ -30,7 +27,7 @@ class FindDuplicateTraces(args: Args) extends Job(args) with DefaultDateRangeJob

val maxDuration = augmentString(args.required("maximum_duration")).toInt

val result = PreprocessedSpanSource()
val result = PreprocessedSpanSource(TimeGranularity.Hour)
.read
.mapTo(0 ->('trace_id, 'annotations)) { s: SpanServiceName =>
(s.trace_id, s.annotations.toList)
@@ -17,14 +17,14 @@ package com.twitter.zipkin.hadoop

import com.twitter.zipkin.gen.{Annotation, Span}
import com.twitter.scalding.{Tsv, DefaultDateRangeJob, Job, Args}
import sources.SpanSource
import com.twitter.zipkin.hadoop.sources.{TimeGranularity, SpanSource}

class GrepByAnnotation(args: Args) extends Job(args) with DefaultDateRangeJob {

val grepByWord = args.required("word")

val preprocessed =
SpanSource()
SpanSource(TimeGranularity.Hour)
.read
.mapTo(0 -> ('traceid, 'annotations)) { s: Span => (s.trace_id, s.annotations.toList) }
.filter('annotations) { annotations: List[Annotation] =>
@@ -13,21 +13,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop

import com.twitter.scalding._
import java.nio.ByteBuffer
import java.util.Arrays
import sources._
import com.twitter.zipkin.gen._
import com.twitter.zipkin.hadoop.sources.{PreprocessedSpanSource, TimeGranularity, PrepNoNamesSpanSource}

/**
* Find out how often each service does memcache accesses
*/
class MemcacheRequest(args : Args) extends Job(args) with DefaultDateRangeJob {

val preprocessed = DailyPrepNoNamesSpanSource()
val preprocessed = PrepNoNamesSpanSource(TimeGranularity.Day)
.read
.mapTo(0 -> ('parent_id, 'binary_annotations))
{ s: Span => (s.parent_id, s.binary_annotations.toList) }
@@ -40,7 +37,7 @@ class MemcacheRequest(args : Args) extends Job(args) with DefaultDateRangeJob {
}
.project('parent_id, 'memcacheNames)

val memcacheRequesters = DailyPreprocessedSpanSource()
val memcacheRequesters = PreprocessedSpanSource(TimeGranularity.Day)
.read
.mapTo(0 -> ('trace_id, 'id, 'service))
{ s: SpanServiceName => (s.trace_id, s.id, s.service_name)}
@@ -18,15 +18,15 @@ package com.twitter.zipkin.hadoop


import com.twitter.scalding._
import com.twitter.zipkin.hadoop.sources.DailyPreprocessedSpanSource
import com.twitter.zipkin.hadoop.sources.{PreprocessedSpanSource, TimeGranularity}
import com.twitter.zipkin.gen.{SpanServiceName, Annotation}

/**
* Per service, find the 100 most common annotations used to annotate spans involving that service
*/
class PopularAnnotations(args : Args) extends Job(args) with DefaultDateRangeJob {

val preprocessed = DailyPreprocessedSpanSource()
val preprocessed = PreprocessedSpanSource(TimeGranularity.Day)
.read
.mapTo(0 -> ('service, 'annotations))
{ s: SpanServiceName => (s.service_name, s.annotations.toList) }
@@ -13,20 +13,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop


import com.twitter.scalding._
import com.twitter.zipkin.hadoop.sources.DailyPreprocessedSpanSource
import com.twitter.zipkin.hadoop.sources.{PreprocessedSpanSource, TimeGranularity}
import com.twitter.zipkin.gen.{SpanServiceName, BinaryAnnotation}

/**
* Per service, find the 100 most common keys used to annotate spans involving that service
*/
class PopularKeys(args : Args) extends Job(args) with DefaultDateRangeJob {

val preprocessed = DailyPreprocessedSpanSource()
val preprocessed = PreprocessedSpanSource(TimeGranularity.Day)
.read
.mapTo(0 -> ('service, 'binary_annotations))
{ s: SpanServiceName => (s.service_name, s.binary_annotations.toList) }
@@ -15,9 +15,8 @@
*/
package com.twitter.zipkin.hadoop


import com.twitter.scalding._
import com.twitter.zipkin.hadoop.sources.SpanSource
import com.twitter.zipkin.hadoop.sources.{TimeGranularity, SpanSource}
import com.twitter.zipkin.gen.{Span, Constants, Annotation}
import scala.collection.JavaConverters._
import java.nio.ByteBuffer
@@ -31,7 +30,7 @@ class ServerResponsetime(args: Args) extends Job(args) with DefaultDateRangeJob

val serverAnnotations = Seq(Constants.SERVER_RECV, Constants.SERVER_SEND)

SpanSource()
SpanSource(TimeGranularity.Hour)
.read
// only need id and annotations for this
.mapTo(0 -> ('id, 'annotations)) { s: Span => (s.id, s.annotations.toList) }
@@ -27,6 +27,7 @@ import com.twitter.zipkin.hadoop.sources._

class Timeouts(args: Args) extends Job(args) with DefaultDateRangeJob {

val timeGranularity = TimeGranularity.Day
val ERROR_TYPE = List("finagle.timeout", "finagle.retry")

val input = args.required("error_type")
@@ -35,14 +36,14 @@ class Timeouts(args: Args) extends Job(args) with DefaultDateRangeJob {
}

// Preprocess the data into (trace_id, id, parent_id, annotations, client service name, service name)
val spanInfo = DailyPreprocessedSpanSource()
val spanInfo = PreprocessedSpanSource(timeGranularity)
.read
.mapTo(0 -> ('id, 'parent_id, 'annotations, 'service) )
{ s: SpanServiceName => (s.id, s.parent_id, s.annotations.toList, s.service_name) }


// Project to (id, service name)
val idName = DailyPrepTsvSource()
val idName = PrepTsvSource(timeGranularity)
.read

// Left join with idName to find the parent's service name, if applicable
@@ -16,9 +16,9 @@

package com.twitter.zipkin.hadoop

import com.twitter.zipkin.gen.{BinaryAnnotation, Constants, SpanServiceName, Annotation}
import com.twitter.scalding.{Tsv, DefaultDateRangeJob, Job, Args}
import com.twitter.zipkin.hadoop.sources.{Util, DailyPreprocessedSpanSource}
import com.twitter.zipkin.gen.{BinaryAnnotation, SpanServiceName, Annotation}
import com.twitter.zipkin.hadoop.sources.{TimeGranularity, PreprocessedSpanSource, Util}
import java.nio.ByteBuffer


@@ -30,7 +30,7 @@ class WhaleReport(args: Args) extends Job(args) with DefaultDateRangeJob {

val ERRORS = List("finagle.timeout", "finagle.retry")

val spanInfo = DailyPreprocessedSpanSource()
val spanInfo = PreprocessedSpanSource(TimeGranularity.Day)
.read
.mapTo(0 ->('trace_id, 'id, 'service, 'annotations, 'binary_annotations))
{ s: SpanServiceName => (s.trace_id, s.id, s.service_name, s.annotations.toList, s.binary_annotations.toList)
@@ -13,22 +13,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop

import com.twitter.scalding._
import sources.{PreprocessedSpanSource, PrepNoNamesSpanSource}
import com.twitter.zipkin.gen.{SpanServiceName, Span, Constants, Annotation}
import com.twitter.zipkin.gen.{SpanServiceName, Constants, Annotation}
import com.twitter.zipkin.hadoop.sources.{TimeGranularity, PreprocessedSpanSource}

/**
* Obtain the IDs and the durations of the one hundred service calls which take the longest per service
*/

class WorstRuntimes(args: Args) extends Job(args) with DefaultDateRangeJob {

val clientAnnotations = Seq(Constants.CLIENT_RECV, Constants.CLIENT_SEND)

val preprocessed = PreprocessedSpanSource()
val preprocessed = PreprocessedSpanSource(TimeGranularity.Hour)
.read
.mapTo(0 -> ('service, 'id, 'annotations)) {
s : SpanServiceName => (s.service_name, s.id, s.annotations.toList)
@@ -13,22 +13,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop

import com.twitter.scalding._
import sources.{DailyPreprocessedSpanSource}
import com.twitter.zipkin.hadoop.sources.{PreprocessedSpanSource, TimeGranularity}
import com.twitter.zipkin.gen.{SpanServiceName, Constants, Annotation}

/**
* Obtain the IDs and the durations of the one hundred service calls which take the longest per service
*/

class WorstRuntimesPerTrace(args: Args) extends Job(args) with DefaultDateRangeJob {

val clientAnnotations = Seq(Constants.CLIENT_RECV, Constants.CLIENT_SEND)

val preprocessed = DailyPreprocessedSpanSource()
val preprocessed = PreprocessedSpanSource(TimeGranularity.Day)
.read
.mapTo(0 -> ('service, 'trace_id, 'annotations)) {
s : SpanServiceName => (s.service_name, s.trace_id, s.annotations.toList)
@@ -13,23 +13,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop.sources

import com.twitter.scalding.{DefaultDateRangeJob, Job, Args}
import com.twitter.zipkin.gen.SpanServiceName
import com.twitter.scalding.Args

/**
* Finds the mapping from span ID to service name
*/

class DailyFindIDtoName(args : Args) extends Job(args) with DefaultDateRangeJob {

val spanInfo = DailyPreprocessedSpanSource()
.read
.mapTo(0 -> ('id_1, 'name_1))
{ s: SpanServiceName => (s.id, s.service_name ) }
.filter('name_1) {n : String => n != null }
.unique('id_1, 'name_1)
.write(DailyPrepTsvSource())
class DailyFindIDtoName(args: Args) extends FindIDtoName(args) {
override val timeGranularity = TimeGranularity.Day
}
@@ -13,38 +13,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop.sources

import com.twitter.scalding._
import com.twitter.zipkin.gen._
import scala.collection.JavaConverters._
import com.twitter.scalding.Args

/**
* Finds the best client side and service names for each span, if any exist
*/

class DailyFindNames(args : Args) extends Job(args) with DefaultDateRangeJob {

val preprocessed = DailyPrepNoNamesSpanSource()
.read
.mapTo(0 ->('trace_id, 'name, 'id, 'parent_id, 'annotations, 'binary_annotations)) {
s: Span => (s.trace_id, s.name, s.id, s.parent_id, s.annotations.toList, s.binary_annotations.toList)
}

val findNames = preprocessed
.flatMap('annotations -> 'service) { Util.getServiceName }
.mapTo(('trace_id, 'name, 'id, 'parent_id, 'annotations, 'binary_annotations, 'service) -> 'spanWithServiceNames) {
a : (Long, String, Long, Long, List[Annotation], List[BinaryAnnotation], String) =>
a match {
case (tid, name, id, pid, annotations, binary_annotations, service) =>
val spanSN = new SpanServiceName(tid, name, id, annotations.asJava, binary_annotations.asJava, service)
if (pid != 0) {
spanSN.setParent_id(pid)
}
spanSN
}
}.write(DailyPreprocessedSpanSource())


class DailyFindNames(args: Args) extends FindNames(args) {
override val timeGranularity = TimeGranularity.Day
}
