Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Implemented Mode.fileExists. #31

Merged
merged 2 commits into from

3 participants

@mjahr
Collaborator

Implemented Mode.fileExists to facilitate checking for files across
filesystems in different modes. Added support to test modes, and
JobTest, for registering mock filenames. Minor refactoring of JobTest
to improve code reuse across run(), runWithoutNext(), and runHadoop().

@mjahr mjahr Implemented Mode.fileExists to facilitate checking for files across
filesystems in different modes.  Added support to test modes, and
JobTest, for registering mock filenames.  Minor refactoring of JobTest
to improve code reuse across run(), runWithoutNext(), and runHadoop().
a6645cc
src/main/scala/com/twitter/scalding/Mode.scala
((12 lines not shown))
case class Hdfs(strict : Boolean, val config : Configuration) extends Mode(strict) with HadoopMode {
override def jobConf = config
+ override def fileExists(filename : String) : Boolean = {
+ val filePath = new Path(filename)
+ filePath.getFileSystem(config).exists(filePath)
@azymnis Collaborator
azymnis added a note

I think it would be cleaner to do:

FileSystem.get(config).exists(filePath)

@mjahr Collaborator
mjahr added a note

good idea...done.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@johnynek johnynek merged commit 11f89f2 into twitter:develop
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Mar 14, 2012
  1. @mjahr

    Implemented Mode.fileExists to facilitate checking for files across

    mjahr authored
    filesystems in different modes.  Added support to test modes, and
    JobTest, for registering mock filenames.  Minor refactoring of JobTest
    to improve code reuse across run(), runWithoutNext(), and runHadoop().
  2. @mjahr
This page is out of date. Refresh to see the latest.
View
62 src/main/scala/com/twitter/scalding/JobTest.scala
@@ -24,6 +24,7 @@ class JobTest(jobName : String) extends TupleConversions {
private val callbacks = Buffer[() => Unit]()
private var sourceMap = Map[Source, Buffer[Tuple]]()
private var sinkSet = Set[Source]()
+ private var fileSet = Set[String]()
def arg(inArg : String, value : List[String]) = {
argsMap += inArg -> value
@@ -49,44 +50,63 @@ class JobTest(jobName : String) extends TupleConversions {
this
}
+ // Simulates the existance of a file so that mode.fileExists returns true. We
+ // do not simulate the file contents; that should be done through mock
+ // sources.
+ def registerFile(filename : String) = {
+ fileSet += filename
+ this
+ }
+
+
def run = {
- Mode.mode = Test(sourceMap)
- runAll(Job(jobName, new Args(argsMap)))
+ runJob(initJob(false), true)
+ this
+ }
+
+ def runWithoutNext(useHadoop : Boolean = false) = {
+ runJob(initJob(useHadoop), false)
this
}
def runHadoop = {
- Mode.mode = HadoopTest(new JobConf(), sourceMap)
- runAll(Job(jobName, new Args(argsMap)), true)
+ runJob(initJob(true), true)
this
}
// This SITS is unfortunately needed to get around Specs
def finish : Unit = { () }
+ // Registers test files, initializes the global mode, and creates a job.
+ private def initJob(useHadoop : Boolean) : Job = {
+ // Create a global mode to use for testing.
+ val testMode : TestMode =
+ if (useHadoop) {
+ HadoopTest(new JobConf(), sourceMap)
+ } else {
+ Test(sourceMap)
+ }
+ testMode.registerTestFiles(fileSet)
+ Mode.mode = testMode
+
+ // Construct a job.
+ Job(jobName, new Args(argsMap))
+ }
+
@tailrec
- final def runAll(job : Job, useHadoop : Boolean = false) : Unit = {
+ private final def runJob(job : Job, runNext : Boolean) : Unit = {
job.buildFlow.complete
- job.next match {
- case Some(nextjob) => runAll(nextjob, useHadoop)
+ val next : Option[Job] = if (runNext) { job.next } else { None }
+ next match {
+ case Some(nextjob) => runJob(nextjob, runNext)
case None => {
- if(useHadoop) {
- sinkSet.foreach{ _.finalizeHadoopTestOutput(Mode.mode) }
+ Mode.mode match {
+ case HadoopTest(_,_) => sinkSet.foreach{ _.finalizeHadoopTestOutput(Mode.mode) }
+ case _ => ()
}
- //Now it is time to check the test conditions:
+ // Now it is time to check the test conditions:
callbacks.foreach { cb => cb() }
}
}
}
-
- def runWithoutNext(useHadoop: Boolean = false) = {
- Mode.mode =
- if (useHadoop) HadoopTest(new JobConf(), sourceMap) else Test(sourceMap)
- Job(jobName, new Args(argsMap)).buildFlow.complete
- if(useHadoop) {
- sinkSet.foreach{ _.finalizeHadoopTestOutput(Mode.mode) }
- }
- callbacks.foreach { cb => cb() }
- this
- }
}
View
21 src/main/scala/com/twitter/scalding/Mode.scala
@@ -15,7 +15,9 @@ limitations under the License.
*/
package com.twitter.scalding
+import java.io.File
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
import cascading.flow.FlowConnector
import cascading.flow.hadoop.HadoopFlowConnector
@@ -55,6 +57,9 @@ abstract class Mode(val sourceStrictness : Boolean) {
def getSourceNamed(name : String) : Option[Source] = {
sourceMap.find { _._1.toString == name }.map { _._1 }
}
+
+ // Returns true if the file exists on the current filesystem.
+ def fileExists(filename : String) : Boolean
}
trait HadoopMode extends Mode {
@@ -85,21 +90,33 @@ trait HadoopMode extends Mode {
}
}
+// Mix-in trait for test modes; overrides fileExists to allow the registration
+// of mock filenames for testing.
+trait TestMode extends Mode {
+ private var fileSet = Set[String]()
+ def registerTestFiles(files : Set[String]) = fileSet = files
+ override def fileExists(filename : String) : Boolean = fileSet.contains(filename)
+}
+
case class Hdfs(strict : Boolean, val config : Configuration) extends Mode(strict) with HadoopMode {
override def jobConf = config
+ override def fileExists(filename : String) : Boolean =
+ FileSystem.get(config).exists(new Path(filename))
}
case class HadoopTest(val config : Configuration, val buffers : Map[Source,Buffer[Tuple]])
- extends Mode(false) with HadoopMode {
+ extends Mode(false) with HadoopMode with TestMode {
override def jobConf = config
}
case class Local(strict : Boolean) extends Mode(strict) {
def newFlowConnector(props : Map[AnyRef,AnyRef]) = new LocalFlowConnector(props)
+ override def fileExists(filename : String) : Boolean = new File(filename).exists
}
+
/**
* Memory only testing for unit tests
*/
-case class Test(val buffers : Map[Source,Buffer[Tuple]]) extends Mode(false) {
+case class Test(val buffers : Map[Source,Buffer[Tuple]]) extends Mode(false) with TestMode {
def newFlowConnector(props : Map[AnyRef,AnyRef]) = new LocalFlowConnector(props)
}
Something went wrong with that request. Please try again.