Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Two simple tests for the ServerResponsetime Hadoop jobs #28

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -20,9 +20,9 @@ import cascading.scheme.local.{TextLine => CLTextLine}
import com.twitter.elephantbird.cascading2.scheme.LzoThriftScheme
import cascading.scheme.Scheme
import cascading.flow.FlowProcess
import com.twitter.scalding.{DateOps, TimePathedSource, DateRange, Mappable}
import com.twitter.zipkin.gen.Span
import org.apache.hadoop.mapred.{JobConf, RecordReader, OutputCollector}
import com.twitter.scalding._

// Scala is pickier than Java about type parameters, and Cascading's Scheme
// declaration leaves some type parameters underspecified. Fill in the type
Expand All @@ -43,9 +43,8 @@ abstract class HourlySuffixSource(prefixTemplate : String, dateRange : DateRange
// Mixin for scalding Mappable sources whose HDFS data is stored as
// LZO-compressed Thrift records of type T.
// NOTE(review): this span is a GitHub diff scrape — it contains BOTH the
// removed version of localScheme (the println block) and the added one-line
// replacement; only the second override exists in the actual file.
trait LzoThrift[T <: TBase[_, _]] extends Mappable[T] {
// The Thrift-generated class used to deserialize each record.
def column: Class[_]

override def localScheme = {
println("This does not work yet"); new CLTextLine
}
// TODO this won't actually work locally, but we need something here for the tests
override def localScheme = new CLTextLine()

// On HDFS, read/write LZO-compressed Thrift records via elephant-bird's scheme.
override def hdfsScheme = HadoopSchemeInstance(new LzoThriftScheme[T](column))
}
Expand Down
@@ -0,0 +1,49 @@
package com.twitter.zipkin.hadoop

import org.specs.Specification
import com.twitter.zipkin.gen
import com.twitter.scalding._
import sources.SpanSource
import scala.collection.JavaConverters._

/**
 * Specs for the ServerResponsetime Hadoop job: verifies that services with
 * fewer than 100 spans produce no output, and that a service with enough
 * spans yields one row with the expected count/avg/stddev.
 */
class ServerResponsetimeSpec extends Specification with TupleConversions {
  noDetailedDiffs()

  implicit val dateRange = DateRange(RichDate(123), RichDate(321))

  // Fixture: one span whose "sr"/"ss" annotations are 1000 microseconds apart,
  // both annotated with the same host endpoint.
  val endpoint = new gen.Endpoint(123, 666, "service")
  val span = new gen.Span(12345, "methodcall", 666,
    List(new gen.Annotation(1000, "sr").setHost(endpoint), new gen.Annotation(2000, "ss").setHost(endpoint)).asJava,
    List[gen.BinaryAnnotation]().asJava)

  // Shared JobTest skeleton; each example attaches its own source and sink.
  private def responsetimeJob =
    JobTest("com.twitter.zipkin.hadoop.ServerResponsetime").
      arg("input", "inputFile").
      arg("output", "outputFile").
      arg("date", "2012-01-01T01:00")

  "ServerResponsetime" should {
    "have no output if input is < 100 entries" in {
      responsetimeJob.
        source(SpanSource(), List(span -> 0)).
        sink[(String, Int)](Tsv("outputFile")) { buf =>
          buf.toMap mustEqual Map()
        }.run.finish
    }
    "return one entry with avg 1 ms" in {
      responsetimeJob.
        source(SpanSource(), repeatSpan(span, 101)).
        sink[(String, String, Double, Double, Double)](Tsv("outputFile")) { buf =>
          buf foreach { row =>
            row mustEqual ("0.0.0.123", "service", 102d, 1d, 0d)
          }
        }.run.finish
    }

  }

  // Builds count + 1 deep copies of the given span (ids 0..count, inclusive),
  // each paired with its offset for the test source.
  def repeatSpan(span: gen.Span, count: Int): List[(gen.Span, Int)] =
    List.tabulate(count + 1) { i => span.deepCopy().setId(i) -> i }
}