Skip to content

Commit

Permalink
Merge pull request twitter#20 from azymnis/develop
Browse files Browse the repository at this point in the history
Adding a source for the most recent good date path.
  • Loading branch information
johnynek committed Mar 1, 2012
2 parents ee351eb + 6397b1e commit 6bccce3
Showing 1 changed file with 42 additions and 14 deletions.
56 changes: 42 additions & 14 deletions src/main/scala/com/twitter/scalding/Source.scala
Expand Up @@ -204,17 +204,23 @@ abstract class Source extends java.io.Serializable {
}
}

protected def createHdfsReadTap(hdfsMode : Hdfs) :
Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], _] = {
val goodPaths = if (hdfsMode.sourceStrictness) {
/*
* Get all the set of valid paths based on source strictness.
*/
protected def goodHdfsPaths(hdfsMode : Hdfs) = {
if (hdfsMode.sourceStrictness) {
//we check later that all the paths are good
hdfsPaths
}
else {
// If there are no matching paths, this is still an error, we need at least something:
hdfsPaths.filter{ pathIsGood(_, hdfsMode.config) }
}
val taps = goodPaths.map { new Hfs(hdfsScheme, _, SinkMode.KEEP) }
}

protected def createHdfsReadTap(hdfsMode : Hdfs) :
Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], _] = {
val taps = goodHdfsPaths(hdfsMode).map { new Hfs(hdfsScheme, _, SinkMode.KEEP) }
taps.size match {
case 0 => {
// This case is going to result in an error, but we don't want to throw until
Expand Down Expand Up @@ -381,8 +387,10 @@ abstract class TimePathedSource(pattern : String, dateRange : DateRange, tz : Ti
}
override def localPath = pattern

// Override because we want to check UNGLOBIFIED paths that each are present.
override def hdfsReadPathsAreGood(conf : Configuration) : Boolean = {
/*
* Get path statuses based on daterange.
*/
protected def getPathStatuses(conf : Configuration) : Iterable[(String, Boolean)] = {
List("%1$tH" -> Hours(1), "%1$td" -> Days(1)(tz),
"%1$tm" -> Months(1)(tz), "%1$tY" -> Years(1)(tz))
.find { unitDur : (String,Duration) => pattern.contains(unitDur._1) }
Expand All @@ -392,17 +400,37 @@ abstract class TimePathedSource(pattern : String, dateRange : DateRange, tz : Ti
.map { dr : DateRange =>
val path = String.format(pattern, dr.start.toCalendar(tz))
val good = pathIsGood(path, conf)
if (!good) {
System.err.println("[ERROR] Path: " + path + " is missing in: " + toString)
}
//return
good
(path, good)
}
//All should be true
.forall { x => x }
}
.getOrElse(false)
.getOrElse(Nil : Iterable[(String, Boolean)])
}

// Override because we want to check UNGLOBIFIED paths that each are present.
override def hdfsReadPathsAreGood(conf : Configuration) : Boolean = {
getPathStatuses(conf).forall{ x =>
if (!x._2) {
System.err.println("[ERROR] Path: " + x._1 + " is missing in: " + toString)
}
x._2
}
}
}

/*
* A source that contains the most recent existing path in this date range.
*/
abstract class MostRecentGoodSource(p : String, dr : DateRange, t : TimeZone)
extends TimePathedSource(p, dr, t) {

override protected def goodHdfsPaths(hdfsMode : Hdfs) = getPathStatuses(hdfsMode.config)
.toList
.reverse
.find{ _._2 }
.map{ x => x._1 }

override def hdfsReadPathsAreGood(conf : Configuration) = getPathStatuses(conf)
.exists{ _._2 }
}

case class TextLine(p : String) extends FixedPathSource(p) with TextLineScheme
Expand Down

0 comments on commit 6bccce3

Please sign in to comment.