CascadeJob #75

Closed
wants to merge 3 commits into
from
Jump to file or symbol
Failed to load files and symbols.
+79 −2
Diff settings

Always

Just for now

@@ -17,6 +17,8 @@ package com.twitter.scalding
import cascading.flow.{Flow, FlowDef, FlowProps}
import cascading.pipe.Pipe
+import cascading.cascade.{Cascade, CascadeConnector}
+import cascading.stats.FlowStats
//For java -> scala implicits on collections
@@ -200,3 +202,40 @@ class ScriptJob(cmds: Iterable[String]) extends Job(Args("")) {
}
}
}
+
+object CascadeJob {
+ def apply( jobList : Job* ) = new CascadeJob { override val jobs = jobList }
+}
+
+/*
+ * Composes the provided Jobs into a Cascade
+ * (http://docs.cascading.org/cascading/2.0/javadoc/cascading/cascade/Cascade.html)
+ */
+class CascadeJob(args : Args = Args("")) extends Job(args) {
+
+ /*This job type can be used to aggregate jobs
+ specified using the --jobs command line parameter,
+ for example:
+
+ scald.rb CascadeJob --hdfs --jobs JobOne JobTwo --outdir some/path
+
+ To specify a job-specific argument, prefix the argument
+ name with the job name:
+
+ scald.rb CascadeJob --hdfs --jobs JobOne JobTwo --JobOne:input j1i.csv --JobTwo:input j2i.csv
+ */
+ val jobs : Iterable[Job] = args.list("jobs").map { jobname => Job(jobname, argsFor(jobname, args)) }
+
+ override def run(implicit mode : Mode) = {
+ val flows = jobs.map( _.buildFlow(mode) )
+ val cascade = new CascadeConnector().connect ( flows.toSeq:_* )
+ cascade.complete()
+ cascade.getCascadeStats.getChildren.toSeq.forall {
+ _.asInstanceOf[FlowStats].isSuccessful
+ }
+ }
+
+ def argsFor(jobName : String, parentArgs : Args) = {
+ parentArgs
+ }
+}
@@ -100,7 +100,8 @@ class JobTest(jobName : String) extends TupleConversions {
@tailrec
private final def runJob(job : Job, runNext : Boolean) : Unit = {
- job.buildFlow.complete
+ //job.buildFlow.complete
+ job.run
val next : Option[Job] = if (runNext) { job.next } else { None }
next match {
case Some(nextjob) => runJob(nextjob, runNext)
@@ -30,7 +30,7 @@ class MemoryTap[In,Out](val scheme : Scheme[Properties,In,Out,_,_], val tupleBuf
override def deleteResource(conf : Properties) = true
override def resourceExists(conf : Properties) = true
override def getModifiedTime(conf : Properties) = 1L
- override def getIdentifier() : String = scala.math.random.toString
+ override def getIdentifier() : String = { val id = scala.math.random.toString; println("I am " + id); id}
override def openForRead(flowProcess : FlowProcess[Properties], input : In) = {
new TupleEntryChainIterator(scheme.getSourceFields, tupleBuffer.toIterator)
@@ -0,0 +1,37 @@
+package com.twitter.scalding
+
+import org.specs._
+
+class Job1(args : Args) extends Job(args) {
+ Tsv("in1").read.write(Tsv("output1"))
+}
+
+class Job2(args : Args) extends Job(args) {
+ Tsv("in2").read.write(Tsv("output2"))
+}
+
+class CascadeJobTest extends Specification with TupleConversions {
+ "Instantiating a CascadeJob" should {
+ val newJobs = List(new Job1(Args("")), new Job2(Args("")))
+
+ "add jobs via arguments" in {
+ val cascadeJob = new CascadeJob(Args("--jobs com.twitter.scalding.Job1 com.twitter.scalding.Job2"))
+ cascadeJob.jobs.map(_.name) must be_==(List("com.twitter.scalding.Job1","com.twitter.scalding.Job2"))
+ }
+
+ "add jobs via factory method" in {
+ val cascadeJob = CascadeJob(newJobs:_*)
+ cascadeJob.jobs.map(_.name) must be_==(List("com.twitter.scalding.Job1","com.twitter.scalding.Job2"))
+ }
+ }
+
+ // "Running a CascadeJob" should {
+
+ // JobTest("com.twitter.scalding.CascadeJob")
+ // .arg("jobs", List("com.twitter.scalding.Job1", "com.twitter.scalding.Job2"))
+ // .run
+ // .finish
+
+
+ // }
+}