
Merge branch 'release/0.3.5'

2 parents ee351eb + 815f25e commit 841003d4ce9b3199f3d65ab7bc16dff7ad7fcf0d @azymnis committed Mar 2, 2012
@@ -0,0 +1,6 @@
+# Scalding #
+
+### Version 0.3.5 ###
+
+ISSUE 21: move JobTest into main
+ISSUE 20: Adding a source for the most recent good date path.
@@ -1,4 +1,8 @@
# Scalding
+
+Current version: 0.3.5
+
+## Summary
Scalding is a library that has two components:
* a scala DSL to make map-reduce computations look very similar to scala's collection API
* a wrapper for Cascading to make it simpler to define the usual use cases of jobs, tests and describing new data on HDFS
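The DSL half is easiest to see in a small example. The following word-count job is written in the style of the scalding tutorial and is purely illustrative; the class and field names are not part of this commit:

```scala
import com.twitter.scalding._

// A minimal sketch of the DSL: each input line is split into words,
// then grouped and counted, much like Scala's collection API.
class WordCountJob(args : Args) extends Job(args) {
  TextLine(args("input"))
    .flatMap('line -> 'word) { line : String => line.split("\\s+") }
    .groupBy('word) { _.size }
    .write(Tsv(args("output")))
}
```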
@@ -41,7 +45,7 @@ recommendations:
We use Travis-ci.org to verify the build:
[![Build Status](https://secure.travis-ci.org/twitter/scalding.png)](http://travis-ci.org/twitter/scalding)
-The current version is 0.3.4 and available from maven central: org="com.twitter", artifact="scalding_2.8.1".
+The current version is 0.3.5 and available from maven central: org="com.twitter", artifact="scalding_2.8.1".
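For a project consuming this release, the coordinates quoted above translate into a one-line sbt dependency; this is standard sbt syntax, not something added by this commit:

```scala
// build.sbt of a downstream project (illustrative): group, artifact
// and version are the maven central coordinates quoted above.
libraryDependencies += "com.twitter" % "scalding_2.8.1" % "0.3.5"
```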
## Comparison to Scrunch/Scoobi
Scalding comes with an executable tutorial set that does not require a Hadoop
@@ -2,7 +2,7 @@ import AssemblyKeys._
name := "scalding"
-version := "0.3.4"
+version := "0.3.5"
organization := "com.twitter"
@@ -2,7 +2,7 @@
require 'fileutils'
require 'thread'
-SCALDING_VERSION="0.3.4"
+SCALDING_VERSION="0.3.5"
#Usage : scald.rb [--hdfs|--local|--print] job <job args>
# --hdfs: if job ends in ".scala" or ".java" and the file exists, link it against JARFILE (below) and then run it on HOST.
@@ -4,14 +4,20 @@ import scala.collection.mutable.{Buffer, ListBuffer}
import scala.annotation.tailrec
import cascading.tuple.Tuple
-import cascading.tuple.TupleEntry
import org.apache.hadoop.mapred.JobConf
object JobTest {
def apply(jobName : String) = new JobTest(jobName)
}
+/**
+ * This class is used to construct unit tests for scalding jobs.
+ * You should not use it unless you are writing tests.
+ * For examples of how to do that, see the tests included in the
+ * main scalding repository:
+ * https://github.com/twitter/scalding/tree/master/src/test/scala/com/twitter/scalding
+ */
class JobTest(jobName : String) extends TupleConversions {
private var argsMap = Map[String, List[String]]()
private val callbacks = Buffer[() => Unit]()
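The new doc comment points at the repository's own tests for real examples; a minimal sketch of the fluent API, run against the hypothetical WordCountJob shown earlier (the fully-qualified class name here is made up), would look roughly like this:

```scala
import com.twitter.scalding._

// source() supplies fake input tuples for TextLine("inputFile");
// sink() captures what the job writes to Tsv("outputFile") so the
// test can assert on it. run executes the job, finish tears down.
JobTest("com.twitter.example.WordCountJob")
  .arg("input", "inputFile")
  .arg("output", "outputFile")
  .source(TextLine("inputFile"), List((0, "hello hello world")))
  .sink[(String, Int)](Tsv("outputFile")) { buf =>
    assert(buf.toMap == Map("hello" -> 2, "world" -> 1))
  }
  .run
  .finish
```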
@@ -204,17 +204,23 @@ abstract class Source extends java.io.Serializable {
}
}
- protected def createHdfsReadTap(hdfsMode : Hdfs) :
- Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], _] = {
- val goodPaths = if (hdfsMode.sourceStrictness) {
+ /*
+   * Get the set of valid paths based on source strictness.
+ */
+ protected def goodHdfsPaths(hdfsMode : Hdfs) = {
+ if (hdfsMode.sourceStrictness) {
//we check later that all the paths are good
hdfsPaths
}
else {
// If there are no matching paths, this is still an error, we need at least something:
hdfsPaths.filter{ pathIsGood(_, hdfsMode.config) }
}
- val taps = goodPaths.map { new Hfs(hdfsScheme, _, SinkMode.KEEP) }
+ }
+
+ protected def createHdfsReadTap(hdfsMode : Hdfs) :
+ Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], _] = {
+ val taps = goodHdfsPaths(hdfsMode).map { new Hfs(hdfsScheme, _, SinkMode.KEEP) }
taps.size match {
case 0 => {
// This case is going to result in an error, but we don't want to throw until
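The refactor above pulls the path-selection policy out of createHdfsReadTap so subclasses can override it. The rule itself reduces to this standalone sketch (hypothetical names, Cascading types omitted):

```scala
// Toy model of goodHdfsPaths above: a strict source keeps every
// candidate path and validates them all later; a non-strict source
// keeps only the paths that currently exist.
def goodPaths(strict : Boolean,
              candidates : List[String],
              pathIsGood : String => Boolean) : List[String] =
  if (strict) candidates
  else candidates.filter(pathIsGood)
```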
@@ -381,8 +387,10 @@ abstract class TimePathedSource(pattern : String, dateRange : DateRange, tz : Ti
}
override def localPath = pattern
- // Override because we want to check UNGLOBIFIED paths that each are present.
- override def hdfsReadPathsAreGood(conf : Configuration) : Boolean = {
+ /*
+   * Get path statuses based on the date range.
+ */
+ protected def getPathStatuses(conf : Configuration) : Iterable[(String, Boolean)] = {
List("%1$tH" -> Hours(1), "%1$td" -> Days(1)(tz),
"%1$tm" -> Months(1)(tz), "%1$tY" -> Years(1)(tz))
.find { unitDur : (String,Duration) => pattern.contains(unitDur._1) }
@@ -392,17 +400,37 @@ abstract class TimePathedSource(pattern : String, dateRange : DateRange, tz : Ti
.map { dr : DateRange =>
val path = String.format(pattern, dr.start.toCalendar(tz))
val good = pathIsGood(path, conf)
- if (!good) {
- System.err.println("[ERROR] Path: " + path + " is missing in: " + toString)
- }
- //return
- good
+ (path, good)
}
- //All should be true
- .forall { x => x }
}
- .getOrElse(false)
+ .getOrElse(Nil : Iterable[(String, Boolean)])
}
+
+ // Override because we want to check UNGLOBIFIED paths that each are present.
+ override def hdfsReadPathsAreGood(conf : Configuration) : Boolean = {
+ getPathStatuses(conf).forall{ x =>
+ if (!x._2) {
+ System.err.println("[ERROR] Path: " + x._1 + " is missing in: " + toString)
+ }
+ x._2
+ }
+ }
+}
+
+/*
+ * A source that contains the most recent existing path in this date range.
+ */
+abstract class MostRecentGoodSource(p : String, dr : DateRange, t : TimeZone)
+ extends TimePathedSource(p, dr, t) {
+
+ override protected def goodHdfsPaths(hdfsMode : Hdfs) = getPathStatuses(hdfsMode.config)
+ .toList
+ .reverse
+ .find{ _._2 }
+ .map{ x => x._1 }
+
+ override def hdfsReadPathsAreGood(conf : Configuration) = getPathStatuses(conf)
+ .exists{ _._2 }
}
case class TextLine(p : String) extends FixedPathSource(p) with TextLineScheme
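To show how the new source is meant to be used: a concrete subclass supplies a date pattern, and the %1$tY/%1$tm/%1$td conversions are expanded from a Calendar by String.format, one candidate path per period, as in getPathStatuses above. A minimal sketch, assuming scalding's DateRange and the TextLineScheme trait seen in TextLine; the subclass name and path are hypothetical:

```scala
import java.util.TimeZone
import com.twitter.scalding._

// Hypothetical daily snapshot source: the pattern yields one
// candidate path per day (e.g. "/snapshots/2012/03/02/"), and
// MostRecentGoodSource keeps only the newest path that exists.
class LatestDailySnapshot(dateRange : DateRange)
  extends MostRecentGoodSource("/snapshots/%1$tY/%1$tm/%1$td/",
                               dateRange,
                               TimeZone.getTimeZone("UTC"))
  with TextLineScheme
```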
