Permalink
Browse files

Adding a source for the most recent good date path.

  • Loading branch information...
1 parent 34a58ed commit bcb4fdb40ce77acf3a09dd39ebd1d0451367d171 @azymnis azymnis committed Mar 1, 2012
Showing with 48 additions and 13 deletions.
  1. +48 −13 src/main/scala/com/twitter/scalding/Source.scala
@@ -204,17 +204,23 @@ abstract class Source extends java.io.Serializable {
}
}
- protected def createHdfsReadTap(hdfsMode : Hdfs) :
- Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], _] = {
- val goodPaths = if (hdfsMode.sourceStrictness) {
+ /*
+ * Returns the HDFS paths to read, based on source strictness:
+ * when hdfsMode.sourceStrictness is set, hdfsPaths is returned unfiltered;
+ * otherwise only the paths for which pathIsGood holds are returned.
+ */
+ protected def goodHdfsPaths(hdfsMode : Hdfs) = {
+ if (hdfsMode.sourceStrictness) {
//we check later that all the paths are good
hdfsPaths
}
else {
// If there are no matching paths, this is still an error, we need at least something:
hdfsPaths.filter{ pathIsGood(_, hdfsMode.config) }
}
- val taps = goodPaths.map { new Hfs(hdfsScheme, _, SinkMode.KEEP) }
+ }
+
+ protected def createHdfsReadTap(hdfsMode : Hdfs) :
+ Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], _] = {
+ val taps = goodHdfsPaths(hdfsMode).map { new Hfs(hdfsScheme, _, SinkMode.KEEP) }
taps.size match {
case 0 => {
// This case is going to result in an error, but we don't want to throw until
@@ -381,8 +387,10 @@ abstract class TimePathedSource(pattern : String, dateRange : DateRange, tz : Ti
}
override def localPath = pattern
- // Override because we want to check UNGLOBIFIED paths that each are present.
- override def hdfsReadPathsAreGood(conf : Configuration) : Boolean = {
+ /*
+ * Every path covered by the date range, paired with whether pathIsGood holds for it.
+ */
+ protected def goodTimePaths(conf : Configuration) : Option[Iterable[(String, Boolean)]] = {
List("%1$tH" -> Hours(1), "%1$td" -> Days(1)(tz),
"%1$tm" -> Months(1)(tz), "%1$tY" -> Years(1)(tz))
.find { unitDur : (String,Duration) => pattern.contains(unitDur._1) }
@@ -392,19 +400,46 @@ abstract class TimePathedSource(pattern : String, dateRange : DateRange, tz : Ti
.map { dr : DateRange =>
val path = String.format(pattern, dr.start.toCalendar(tz))
val good = pathIsGood(path, conf)
- if (!good) {
- System.err.println("[ERROR] Path: " + path + " is missing in: " + toString)
- }
- //return
- good
+ (path, good)
}
- //All should be true
- .forall { x => x }
+ }
+ }
+
+ // Override because we want to check that each UNGLOBIFIED path is present.
+ // Returns true only when goodTimePaths yields a set of paths and every one of
+ // them is good; returns false when goodTimePaths yields None.
+ override def hdfsReadPathsAreGood(conf : Configuration) : Boolean = {
+ goodTimePaths(conf)
+ .map { checked =>
+ // Gather every bad path before deciding: logging from inside forall would
+ // stop at the first missing path, because forall short-circuits.
+ val missing = checked.toList.filter { entry => !entry._2 }
+ missing.foreach { entry =>
+ System.err.println("[ERROR] Path: " + entry._1 + " is missing in: " + toString)
+ }
+ missing.isEmpty
}
.getOrElse(false)
}
}
+/*
+ * A time-pathed source that reads a single path: the last good path that
+ * goodTimePaths reports for the date range (presumably the most recent one,
+ * assuming the paths come back in chronological order).
+ */
+abstract class MostRecentGoodSource(p : String, dr : DateRange, t : TimeZone)
+ extends TimePathedSource(p, dr, t) {
+
+ // Keep only the last path flagged as good; empty when no path is good.
+ override protected def goodHdfsPaths(hdfsMode : Hdfs) = {
+ val existing = goodTimePaths(hdfsMode.config).getOrElse(Nil).toList.filter { entry => entry._2 }
+ existing.lastOption.map { entry => entry._1 }.toList.toIterable
+ }
+
+ // Good as soon as at least one path in the range is good.
+ override def hdfsReadPathsAreGood(conf : Configuration) = {
+ goodTimePaths(conf)
+ .map { checked => checked.exists { entry => entry._2 } }
+ .getOrElse(false)
+ }
+}
+
// A FixedPathSource at path p that mixes in TextLineScheme.
case class TextLine(p : String) extends FixedPathSource(p) with TextLineScheme
// A FixedPathSource at path p that mixes in SequenceFileScheme; f defaults to Fields.ALL.
case class SequenceFile(p : String, f : Fields = Fields.ALL) extends FixedPathSource(p) with SequenceFileScheme

0 comments on commit bcb4fdb

Please sign in to comment.