Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
Browse files

Merge pull request #11 from azymnis/master

Add HadoopTest mode for local testing.
  • Loading branch information...
commit 6034d7b2d4ca90416fbf927ad14602bb66ada057 2 parents 9a8d775 + 5795875
@johnynek johnynek authored
View
2  build.sbt
@@ -2,7 +2,7 @@ import AssemblyKeys._
name := "scalding"
-version := "0.3.0"
+version := "0.3.1"
organization := "com.twitter"
View
15 src/main/scala/com/twitter/scalding/Mode.scala
@@ -57,9 +57,10 @@ abstract class Mode(val sourceStrictness : Boolean) {
}
}
-case class Hdfs(strict : Boolean, val config : Configuration) extends Mode(strict) {
+trait HadoopMode extends Mode {
+ def jobConf : Configuration
def newFlowConnector(iosersIn : List[String]) = {
- val props = config.foldLeft(Map[AnyRef, AnyRef]()) {
+ val props = jobConf.foldLeft(Map[AnyRef, AnyRef]()) {
(acc, kv) => acc + ((kv.getKey, kv.getValue))
}
val io = "io.serializations"
@@ -68,6 +69,15 @@ case class Hdfs(strict : Boolean, val config : Configuration) extends Mode(stric
}
}
+case class Hdfs(strict : Boolean, val config : Configuration) extends Mode(strict) with HadoopMode {
+ override def jobConf = config
+}
+
+case class HadoopTest(val config : Configuration, val buffers : Map[Source,Buffer[Tuple]])
+ extends Mode(false) with HadoopMode {
+ override def jobConf = config
+}
+
case class Local(strict : Boolean) extends Mode(strict) {
//No serialization is actually done in local mode, it's all memory
def newFlowConnector(iosers : List[String]) = new LocalFlowConnector
@@ -78,5 +88,4 @@ case class Local(strict : Boolean) extends Mode(strict) {
case class Test(val buffers : Map[Source,Buffer[Tuple]]) extends Mode(false) {
//No serialization is actually done in Test mode, it's all memory
def newFlowConnector(iosers : List[String]) = new LocalFlowConnector
-
}
View
50 src/main/scala/com/twitter/scalding/Source.scala
@@ -15,6 +15,9 @@ limitations under the License.
*/
package com.twitter.scalding
+import com.twitter.meatlocker.tap.MemorySourceTap
+
+import java.io.File
import java.util.TimeZone
import java.util.Calendar
import java.util.{Map => JMap}
@@ -31,16 +34,16 @@ import cascading.tap.MultiSourceTap
import cascading.tap.SinkMode
import cascading.tap.Tap
import cascading.tap.local.FileTap
-
+import cascading.tuple.{Tuple, TupleEntryIterator, Fields}
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.fs.Path
-import cascading.tuple.{Tuple, TupleEntryIterator, Fields}
import collection.mutable.{Buffer, MutableList}
+import collection.JavaConversions.asJavaList
/**
* thrown when validateTaps fails
@@ -136,10 +139,42 @@ abstract class Source extends java.io.Serializable {
new FileTap(localScheme, localPath, sinkmode)
}
case Test(buffers) => new MemoryTap(localScheme, testBuffer(buffers, readOrWrite))
+ case HadoopTest(conf, buffers) => readOrWrite match {
+ case Read() => createHadoopTestReadTap(buffers(this))
+ case Write() => createHadoopTestWriteTap
+ }
case hdfsMode @ Hdfs(_, _) => readOrWrite match {
- case Read() => createHdfsReadTap(hdfsMode)
- case Write() => createHdfsWriteTap(hdfsMode)
+ case Read() => createHdfsReadTap(hdfsMode)
+ case Write() => createHdfsWriteTap(hdfsMode)
+ }
+ }
+ }
+
+ protected def createHadoopTestReadTap(buffer : Iterable[Tuple]) :
+ Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], OutputCollector[_,_]] = {
+ new MemorySourceTap(buffer.toList, hdfsScheme.getSourceFields())
+ }
+
+ protected def hadoopTestPath = "/tmp/scalding/" + hdfsWritePath
+ protected def createHadoopTestWriteTap :
+ Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], OutputCollector[_,_]] = {
+ new Hfs(hdfsScheme, hadoopTestPath, SinkMode.REPLACE)
+ }
+
+ def finalizeHadoopTestOutput(mode : Mode) {
+ mode match {
+ case HadoopTest(conf, buffers) => {
+ val fp = new HadoopFlowProcess(new JobConf(conf))
+ // We read the write tap in order to add its contents in the test buffers
+ val it = createHadoopTestWriteTap.openForRead(fp)
+ val buf = buffers(this)
+ buf.clear()
+ while(it != null && it.hasNext) {
+ buf += new Tuple(it.next.getTuple)
}
+ new File(hadoopTestPath).delete()
+ }
+ case _ => throw new RuntimeException("Cannot read test data in a non-test mode")
}
}
@@ -214,6 +249,11 @@ abstract class Source extends java.io.Serializable {
val tap = new MemoryTap(localScheme, testBuffer(buffers, Read()))
tap.openForRead(fp)
}
+ case HadoopTest(conf, buffers) => {
+ val fp = new HadoopFlowProcess(new JobConf(conf))
+ val tap = createHadoopTestReadTap(buffers(this))
+ tap.openForRead(fp)
+ }
case hdfsMode @ Hdfs(_, conf) => {
val fp = new HadoopFlowProcess(new JobConf(conf))
val tap = createHdfsReadTap(hdfsMode)
Please sign in to comment.
Something went wrong with that request. Please try again.