Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge pull request #20 from azymnis/develop

Adding a source for the most recent good date path.
  • Loading branch information...
commit 6bccce31d0885b019652fd1d503099606c2eef7d 2 parents ee351eb + 6397b1e
@johnynek johnynek authored
Showing with 42 additions and 14 deletions.
  1. +42 −14 src/main/scala/com/twitter/scalding/Source.scala
View
56 src/main/scala/com/twitter/scalding/Source.scala
@@ -204,9 +204,11 @@ abstract class Source extends java.io.Serializable {
}
}
- protected def createHdfsReadTap(hdfsMode : Hdfs) :
- Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], _] = {
- val goodPaths = if (hdfsMode.sourceStrictness) {
+ /*
+ * Get all the set of valid paths based on source strictness.
+ */
+ protected def goodHdfsPaths(hdfsMode : Hdfs) = {
+ if (hdfsMode.sourceStrictness) {
//we check later that all the paths are good
hdfsPaths
}
@@ -214,7 +216,11 @@ abstract class Source extends java.io.Serializable {
// If there are no matching paths, this is still an error, we need at least something:
hdfsPaths.filter{ pathIsGood(_, hdfsMode.config) }
}
- val taps = goodPaths.map { new Hfs(hdfsScheme, _, SinkMode.KEEP) }
+ }
+
+ protected def createHdfsReadTap(hdfsMode : Hdfs) :
+ Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], _] = {
+ val taps = goodHdfsPaths(hdfsMode).map { new Hfs(hdfsScheme, _, SinkMode.KEEP) }
taps.size match {
case 0 => {
// This case is going to result in an error, but we don't want to throw until
@@ -381,8 +387,10 @@ abstract class TimePathedSource(pattern : String, dateRange : DateRange, tz : Ti
}
override def localPath = pattern
- // Override because we want to check UNGLOBIFIED paths that each are present.
- override def hdfsReadPathsAreGood(conf : Configuration) : Boolean = {
+ /*
+ * Get path statuses based on daterange.
+ */
+ protected def getPathStatuses(conf : Configuration) : Iterable[(String, Boolean)] = {
List("%1$tH" -> Hours(1), "%1$td" -> Days(1)(tz),
"%1$tm" -> Months(1)(tz), "%1$tY" -> Years(1)(tz))
.find { unitDur : (String,Duration) => pattern.contains(unitDur._1) }
@@ -392,17 +400,37 @@ abstract class TimePathedSource(pattern : String, dateRange : DateRange, tz : Ti
.map { dr : DateRange =>
val path = String.format(pattern, dr.start.toCalendar(tz))
val good = pathIsGood(path, conf)
- if (!good) {
- System.err.println("[ERROR] Path: " + path + " is missing in: " + toString)
- }
- //return
- good
+ (path, good)
}
- //All should be true
- .forall { x => x }
}
- .getOrElse(false)
+ .getOrElse(Nil : Iterable[(String, Boolean)])
}
+
+ // Override because we want to check UNGLOBIFIED paths that each are present.
+ override def hdfsReadPathsAreGood(conf : Configuration) : Boolean = {
+ getPathStatuses(conf).forall{ x =>
+ if (!x._2) {
+ System.err.println("[ERROR] Path: " + x._1 + " is missing in: " + toString)
+ }
+ x._2
+ }
+ }
+}
+
+/*
+ * A source that contains the most recent existing path in this date range.
+ */
+abstract class MostRecentGoodSource(p : String, dr : DateRange, t : TimeZone)
+ extends TimePathedSource(p, dr, t) {
+
+ override protected def goodHdfsPaths(hdfsMode : Hdfs) = getPathStatuses(hdfsMode.config)
+ .toList
+ .reverse
+ .find{ _._2 }
+ .map{ x => x._1 }
+
+ override def hdfsReadPathsAreGood(conf : Configuration) = getPathStatuses(conf)
+ .exists{ _._2 }
}
case class TextLine(p : String) extends FixedPathSource(p) with TextLineScheme
Please sign in to comment.
Something went wrong with that request. Please try again.