Merge pull request #17 from johnynek/master

Bumps the version and fixes an issue with joinWithTiny followed by joinWithSmaller.
commit 4990bae787e015084fe3c78c2bc72545d0185c14 (2 parents: 3f4430e + 311e5a5), authored by azymnis
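For context, a minimal sketch of the pattern this release fixes, assuming the standard Scalding 0.3.x API used elsewhere in this diff (the source paths and field names here are hypothetical, not part of the commit): a joinWithTiny chained with a joinWithSmaller on the same pipe, which is exactly what the new TinyThenSmallJoin test below exercises.

```scala
import com.twitter.scalding._

// Hypothetical job illustrating the chained joins addressed by this fix.
class TinyThenSmallExample(args: Args) extends Job(args) {
  // joinWithTiny expects the right-hand pipe to be very small.
  val tiny  = Tsv("tiny",  ('k1, 'v1)).read
  // joinWithSmaller expects the right-hand pipe to be the smaller of the two.
  val small = Tsv("small", ('k2, 'v2)).read

  Tsv("main", ('k0, 'v0)).read
    .joinWithTiny('k0 -> 'k1, tiny)      // first, the tiny-side join
    .joinWithSmaller('k0 -> 'k2, small)  // then a smaller-side join on the result
    .write(Tsv("joined"))
}
```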
build.sbt (10 changed lines)
@@ -2,7 +2,7 @@ import AssemblyKeys._
name := "scalding"
-version := "0.3.2"
+version := "0.3.3"
organization := "com.twitter"
@@ -10,15 +10,15 @@ scalaVersion := "2.8.1"
resolvers += "Concurrent Maven Repo" at "http://conjars.org/repo"
-libraryDependencies += "cascading" % "cascading-core" % "2.0.0-wip-227"
+libraryDependencies += "cascading" % "cascading-core" % "2.0.0-wip-236"
-libraryDependencies += "cascading" % "cascading-local" % "2.0.0-wip-227"
+libraryDependencies += "cascading" % "cascading-local" % "2.0.0-wip-236"
-libraryDependencies += "cascading" % "cascading-hadoop" % "2.0.0-wip-227"
+libraryDependencies += "cascading" % "cascading-hadoop" % "2.0.0-wip-236"
libraryDependencies += "cascading.kryo" % "cascading.kryo" % "0.2.1"
-libraryDependencies += "com.twitter" % "meat-locker" % "0.1.5"
+libraryDependencies += "com.twitter" % "meat-locker" % "0.1.6"
libraryDependencies += "commons-lang" % "commons-lang" % "2.4"
scripts/scald.rb (2 changed lines)
@@ -2,7 +2,7 @@
require 'fileutils'
require 'thread'
-SCALDING_VERSION="0.3.2"
+SCALDING_VERSION="0.3.3"
#Usage : scald.rb [--hdfs|--local|--print] job <job args>
# --hdfs: if job ends in ".scala" or ".java" and the file exists, link it against JARFILE (below) and then run it on HOST.
src/main/scala/com/twitter/scalding/Source.scala (9 changed lines)
@@ -151,7 +151,7 @@ abstract class Source extends java.io.Serializable {
}
protected def createHadoopTestReadTap(buffer : Iterable[Tuple]) :
- Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], OutputCollector[_,_]] = {
+ Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], _] = {
new MemorySourceTap(buffer.toList.asJava, hdfsScheme.getSourceFields())
}
@@ -205,7 +205,7 @@ abstract class Source extends java.io.Serializable {
}
protected def createHdfsReadTap(hdfsMode : Hdfs) :
- Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], OutputCollector[_,_]] = {
+ Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], _] = {
val goodPaths = if (hdfsMode.sourceStrictness) {
//we check later that all the paths are good
hdfsPaths
@@ -223,12 +223,11 @@ abstract class Source extends java.io.Serializable {
new Hfs(hdfsScheme, hdfsPaths.head, SinkMode.KEEP)
}
case 1 => taps.head
- case _ => new MultiSourceTap[HadoopFlowProcess,
- JobConf, RecordReader[_,_], OutputCollector[_,_]](taps.toSeq : _*)
+ case _ => new MultiSourceTap[Hfs, HadoopFlowProcess, JobConf, RecordReader[_,_]]( taps.toSeq : _*)
}
}
protected def createHdfsWriteTap(hdfsMode : Hdfs) :
- Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], OutputCollector[_,_]] = {
+ Tap[HadoopFlowProcess, JobConf, _, OutputCollector[_,_]] = {
new Hfs(hdfsScheme, hdfsWritePath, SinkMode.REPLACE)
}
src/test/scala/com/twitter/scalding/CoreTest.scala (38 changed lines)
@@ -133,6 +133,7 @@ class TinyJoinJob(args: Args) extends Job(args) {
}
class TinyJoinTest extends Specification with TupleConversions {
+ noDetailedDiffs() //Fixes an issue with scala 2.9
"A JoinJob" should {
val input1 = List("a" -> 1, "b" -> 2, "c" -> 3)
val input2 = List("b" -> -1, "c" -> 5, "d" -> 4)
@@ -159,6 +160,43 @@ class TinyJoinTest extends Specification with TupleConversions {
}
}
+class TinyThenSmallJoin(args : Args) extends Job(args) {
+ val pipe0 = Tsv("in0",('x0,'y0)).read
+ val pipe1 = Tsv("in1",('x1,'y1)).read
+ val pipe2 = Tsv("in2",('x2,'y2)).read
+
+ pipe0.joinWithTiny('x0 -> 'x1, pipe1)
+ .joinWithSmaller('x0 -> 'x2, pipe2)
+ .write(Tsv("out"))
+}
+
+class TinyThenSmallJoinTest extends Specification with TupleConversions with FieldConversions {
+ noDetailedDiffs() //Fixes an issue with scala 2.9
+ "A TinyThenSmallJoin" should {
+ val input0 = List((1,2),(2,3),(3,4))
+ val input1 = List((1,20),(2,30),(3,40))
+ val input2 = List((1,200),(2,300),(3,400))
+ val correct = List((1,2,1,20,1,200),
+ (2,3,2,30,2,300),(3,4,3,40,3,400))
+
+ JobTest("com.twitter.scalding.TinyThenSmallJoin")
+ .source(Tsv("in0",('x0,'y0)), input0)
+ .source(Tsv("in1",('x1,'y1)), input1)
+ .source(Tsv("in2",('x2,'y2)), input2)
+ .sink[(Int,Int,Int,Int,Int,Int)](Tsv("out")) { outBuf =>
+ val actualOutput = outBuf.toList.sorted
+ println(actualOutput)
+ "join tuples with the same key" in {
+ correct must be_==(actualOutput)
+ }
+ }
+ .run
+ .runHadoop
+ .finish
+ }
+}
+
+
class MergeTestJob(args : Args) extends Job(args) {
val in = TextLine(args("in")).read.mapTo(1->('x,'y)) { line : String =>
val p = line.split(" ").map { _.toDouble }