Permalink
Browse files

Updates tests, bumps depended on versions

  • Loading branch information...
1 parent 1481992 commit 1d9a3782e0304f06dc9f9fc1e414bd242ef0b620 @johnynek johnynek committed Feb 23, 2012
View
@@ -10,15 +10,15 @@ scalaVersion := "2.8.1"
resolvers += "Concurrent Maven Repo" at "http://conjars.org/repo"
-libraryDependencies += "cascading" % "cascading-core" % "2.0.0-wip-215"
+libraryDependencies += "cascading" % "cascading-core" % "2.0.0-wip-227"
-libraryDependencies += "cascading" % "cascading-local" % "2.0.0-wip-215"
+libraryDependencies += "cascading" % "cascading-local" % "2.0.0-wip-227"
-libraryDependencies += "cascading" % "cascading-hadoop" % "2.0.0-wip-215"
+libraryDependencies += "cascading" % "cascading-hadoop" % "2.0.0-wip-227"
-libraryDependencies += "cascading.kryo" % "cascading.kryo" % "0.2.0"
+libraryDependencies += "cascading.kryo" % "cascading.kryo" % "0.2.1"
-libraryDependencies += "com.twitter" % "meat-locker" % "0.1.4"
+libraryDependencies += "com.twitter" % "meat-locker" % "0.1.5"
libraryDependencies += "commons-lang" % "commons-lang" % "2.4"
@@ -0,0 +1,73 @@
+/*
+Copyright 2012 Twitter, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package com.twitter.scalding;
+
+import cascading.tuple.Hasher;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+/**
+ * Comparator and Hasher that handles numerical keys properly.
+ *
+ * Every boxed integral type (java.lang.Long, Integer, Short, Byte) is compared
+ * and hashed through its long value, so that e.g. Integer 1 and Long 1L compare
+ * as equal and produce the same hash code. Any other value falls back to its
+ * own compareTo / hashCode.
+ *
+ * Null handling: null sorts before every non-null value, two nulls compare as
+ * equal, and null hashes to 0.
+ */
+class IntegralComparator extends Comparator[AnyRef] with Hasher[AnyRef] with Serializable {
+
+  /**
+   * @param boxed the value to inspect (may be null)
+   * @return true iff boxed is a non-null Long, Integer, Short or Byte
+   */
+  def isIntegral(boxed : AnyRef) : Boolean = boxed match {
+    // The boxed integral classes are final, so a type test is equivalent to
+    // comparing getClass, and a null simply falls through to the default case.
+    case _ : java.lang.Long | _ : java.lang.Integer |
+         _ : java.lang.Short | _ : java.lang.Byte => true
+    case _ => false
+  }
+
+  /**
+   * @return negative, zero or positive as a1 is less than, equal to or greater
+   *         than a2
+   * @throws ClassCastException if the values are non-null, not both integral,
+   *         and a1 is not Comparable
+   */
+  override def compare(a1: AnyRef, a2: AnyRef) : Int = {
+    if (a1 == null || a2 == null) {
+      // Without this branch a null key would throw a NullPointerException.
+      if (a1 eq a2) 0
+      else if (a1 == null) -1
+      else 1
+    }
+    else if (isIntegral(a1) && isIntegral(a2)) {
+      // Widen both sides to long so mixed Int/Long keys compare by value.
+      val long1 = a1.asInstanceOf[Number].longValue
+      val long2 = a2.asInstanceOf[Number].longValue
+      if (long1 < long2)
+        -1
+      else if (long1 > long2)
+        1
+      else
+        0
+    }
+    else
+      a1.asInstanceOf[Comparable[AnyRef]].compareTo(a2)
+  }
+
+  /**
+   * @return a hash that is identical for integral values with the same long
+   *         value regardless of their boxed type; 0 for null
+   */
+  override def hashCode(obj : AnyRef) : Int = {
+    if (obj == null) {
+      0
+    }
+    else if (isIntegral(obj)) {
+      // Same folding of the 64-bit value as java.lang.Long.hashCode.
+      val longv = obj.asInstanceOf[Number].longValue
+      (longv ^ (longv >>> 32)).toInt
+    }
+    else {
+      //Use the default:
+      obj.hashCode
+    }
+  }
+}
@@ -5,6 +5,34 @@ import cascading.tuple.TupleEntry
import org.specs._
+class NumberJoinerJob(args : Args) extends Job(args) { // Test job: joins an Int-typed pipe against a Long-typed pipe
+ val in0 = Tsv("input0").read.mapTo((0,1) -> ('x0, 'y0)) { input : (Int, Int) => input } // columns 0,1 converted to Ints as 'x0, 'y0
+ val in1 = Tsv("input1").read.mapTo((0,1) -> ('x1, 'y1)) { input : (Long, Long) => input } // columns 0,1 converted to Longs as 'x1, 'y1
+ in0.joinWithSmaller('x0 -> 'x1, in1) // the join key is an Int on the left side and a Long on the right side
+ .write(Tsv("output"))
+}
+
+class NumberJoinTest extends Specification with TupleConversions { // Spec that runs NumberJoinerJob through JobTest
+ "A NumberJoinerJob" should {
+ //Set up the job:
+ "not throw when joining longs with ints" in {
+ JobTest("com.twitter.scalding.NumberJoinerJob")
+ .source(Tsv("input0"), List(("0","1"), ("1","2"), ("2","4")))
+ .source(Tsv("input1"), List(("0","1"), ("1","3"), ("2","9")))
+ .sink[(Int,Int,Long,Long)](Tsv("output")) { outBuf =>
+ val unordered = outBuf.toSet // compared as a set: row order is not asserted
+ unordered.size must be_==(3) // exactly the three joined rows below are expected
+ unordered((0,1,0L,1L)) must be_==(true)
+ unordered((1,2,1L,3L)) must be_==(true)
+ unordered((2,4,2L,9L)) must be_==(true)
+ }
+ .run // JobTest.run sets Mode.mode to Test(...)
+ .runHadoop // JobTest.runHadoop sets Mode.mode to HadoopTest(...)
+ .finish // returns Unit, ending the chain
+ }
+ }
+}
+
class MapToGroupBySizeSumMaxJob(args: Args) extends Job(args) {
TextLine(args("input")).read.
//1 is the line
@@ -49,7 +77,8 @@ class MapToGroupBySizeSumMaxTest extends Specification with TupleConversions {
goldenOutput must be_==(actualOutput)
}
}.
- run
+ run.
+ finish
}
}
@@ -87,6 +116,7 @@ class JoinTest extends Specification with TupleConversions {
}
}
.run
+ .finish
}
}
@@ -124,6 +154,7 @@ class TinyJoinTest extends Specification with TupleConversions {
}
}
.run
+ .finish
}
}
@@ -162,7 +193,8 @@ class MergeTest extends Specification with TupleConversions {
golden must be_==(outBuf.toMap)
}
}.
- run
+ run.
+ finish
}
}
@@ -175,17 +207,22 @@ class SizeAveStdJob(args : Args) extends Job(args) {
_.sizeAveStdev('y->('size,'yave,'ystdev))
//Make sure this doesn't ruin the calculation
.sizeAveStdev('y->('size2,'yave2,'ystdev2))
+ .average('y)
}
- .project('x,'size,'yave,'ystdev)
+ .project('x,'size,'yave,'ystdev,'y)
.write(Tsv(args("output")))
}
class SizeAveStdSpec extends Specification with TupleConversions {
"A sizeAveStd job" should {
"correctly compute aves and standard deviations" in {
val r = new java.util.Random
+ def powerLawRand = {
+ // Generates a 1/x powerlaw with a max value of 1e40
+ scala.math.pow(1e40, r.nextDouble)
+ }
//Here is our input data:
- val input = (0 to 100).map { i => (i.toString, r.nextDouble.toString +" "+ r.nextDouble.toString) }
+ val input = (0 to 10000).map { i => (i.toString, r.nextDouble.toString +" "+ powerLawRand.toString) }
val output = input.map { numline => numline._2.split(" ").map { _.toDouble } }
.map { vec => ((vec(0)*4).toInt, vec(1)) }
.groupBy { tup => tup._1 }
@@ -202,20 +239,23 @@ class SizeAveStdSpec extends Specification with TupleConversions {
arg("input","fakeInput").
arg("output","fakeOutput").
source(TextLine("fakeInput"), input).
- sink[(Int,Long,Double,Double)](Tsv("fakeOutput")) { outBuf =>
+ sink[(Int,Long,Double,Double,Double)](Tsv("fakeOutput")) { outBuf =>
"correctly compute size, ave, stdev" in {
outBuf.foreach { computed =>
val correctTup = output(computed._1)
//Size
computed._2 must be_== (correctTup._1)
//Ave
- computed._3 must beCloseTo(correctTup._2, 1e-6)
+ computed._3/correctTup._2 must beCloseTo(1.0, 1e-6)
//Stdev
- computed._4 must beCloseTo(correctTup._3, 1e-6)
+ computed._4/correctTup._3 must beCloseTo(1.0, 1e-6)
+ //Explicitly calculated Average:
+ computed._5/computed._3 must beCloseTo(1.0, 1e-6)
}
}
}.
- run
+ run.
+ finish
}
}
}
@@ -252,7 +292,8 @@ class DoubleGroupSpec extends Specification with TupleConversions {
outM(3) must be_== (1)
}
}.
- run
+ run.
+ finish
}
}
}
@@ -286,7 +327,8 @@ class GroupUniqueSpec extends Specification with TupleConversions {
outSet.size must_== 3
}
}.
- run
+ run.
+ finish
}
}
@@ -314,6 +356,7 @@ class DiscardTest extends Specification with TupleConversions {
}
}
.run
+ .finish
}
}
@@ -337,7 +380,9 @@ class HistogramTest extends Specification with TupleConversions {
"must get the result right" in {
outBuf(0) must_== (2L,1L)
}
- }.run
+ }
+ .run
+ .finish
}
}
@@ -372,7 +417,9 @@ class ToListTest extends Specification with TupleConversions {
outBuf(0)._2.split(" ").toSet must_== Set("single", "test")
outBuf(1)._2.split(" ").toSet must_== Set("single", "result")
}
- }.run
+ }
+ .run
+ .finish
}
"A NullListJob" should {
@@ -388,7 +435,9 @@ class ToListTest extends Specification with TupleConversions {
val sSet = outBuf(0)._2.split(" ").toSet
sSet must_== Set("a", "b")
}
- }.run
+ }
+ .run
+ .finish
}
}
@@ -417,6 +466,7 @@ class CrossTest extends Specification with TupleConversions {
}
}
.run
+ .finish
}
}
@@ -441,6 +491,7 @@ class TopKTest extends Specification with TupleConversions {
}
}
.run
+ .finish
}
}
@@ -470,5 +521,6 @@ class TakeTest extends Specification with TupleConversions {
}
}
.run
+ .finish
}
}
@@ -6,6 +6,8 @@ import scala.annotation.tailrec
import cascading.tuple.Tuple
import cascading.tuple.TupleEntry
+import org.apache.hadoop.mapred.JobConf
+
object JobTest {
def apply(jobName : String) = new JobTest(jobName)
}
@@ -14,6 +16,7 @@ class JobTest(jobName : String) extends TupleConversions {
private var argsMap = Map[String, List[String]]()
private val callbacks = Buffer[() => Unit]()
private var sourceMap = Map[Source, Buffer[Tuple]]()
+ private var sinkSet = Set[Source]()
def arg(inArg : String, value : List[String]) = {
argsMap += inArg -> value
@@ -34,31 +37,46 @@ class JobTest(jobName : String) extends TupleConversions {
(implicit conv : TupleConverter[A]) = {
val buffer = new ListBuffer[Tuple]
sourceMap += s -> buffer
+ sinkSet += s
callbacks += (() => op(buffer.map{conv(_)}))
this
}
- def run {
+ def run = {
Mode.mode = Test(sourceMap)
runAll(Job(jobName, new Args(argsMap)))
+ this
}
+ def runHadoop = {
+ Mode.mode = HadoopTest(new JobConf(), sourceMap)
+ runAll(Job(jobName, new Args(argsMap)), true)
+ this
+ }
+
+ // This no-op (it just returns Unit) is unfortunately needed to get around Specs
+ def finish : Unit = { () }
+
@tailrec
- final def runAll(job : Job) : Unit = {
+ final def runAll(job : Job, useHadoop : Boolean = false) : Unit = {
job.buildFlow.complete
job.next match {
- case Some(nextjob) => runAll(nextjob)
+ case Some(nextjob) => runAll(nextjob, useHadoop)
case None => {
+ if(useHadoop) {
+ sinkSet.foreach{ _.finalizeHadoopTestOutput(Mode.mode) }
+ }
//Now it is time to check the test conditions:
callbacks.foreach { cb => cb() }
}
}
}
- def runWithoutNext {
+ def runWithoutNext = {
Mode.mode = Test(sourceMap)
Job(jobName, new Args(argsMap)).buildFlow.complete
callbacks.foreach { cb => cb() }
+ this
}
}
@@ -33,6 +33,7 @@ class PageRankTest extends Specification with TupleConversions {
pageRank(3L) must beCloseTo(otherPR, 0.1)
}
}.
- run
+ run.
+ finish
}
}
@@ -15,6 +15,7 @@ class WordCountTest extends Specification with TupleConversions {
outMap("and") must be_==(1)
}
}.
- run
+ run.
+ finish
}
}

0 comments on commit 1d9a378

Please sign in to comment.