Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Adding a source for the most recent good date path. #20

Merged
merged 2 commits into from

2 participants

@azymnis
Collaborator

Tested this on the cluster. All scalding tests pass.

@johnynek

I think you can remove this line.

@johnynek

What about we name this getPathStatuses or something, because this is a list of paths and an accompanying status for that path.

Also, I think you just want an Iterable[(String,Boolean)] here.

@johnynek johnynek merged commit 6bccce3 into twitter:develop
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
Showing with 42 additions and 14 deletions.
  1. +42 −14 src/main/scala/com/twitter/scalding/Source.scala
View
56 src/main/scala/com/twitter/scalding/Source.scala
@@ -204,9 +204,11 @@ abstract class Source extends java.io.Serializable {
}
}
- protected def createHdfsReadTap(hdfsMode : Hdfs) :
- Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], _] = {
- val goodPaths = if (hdfsMode.sourceStrictness) {
+ /*
+ * Get all the set of valid paths based on source strictness.
+ */
+ protected def goodHdfsPaths(hdfsMode : Hdfs) = {
+ if (hdfsMode.sourceStrictness) {
//we check later that all the paths are good
hdfsPaths
}
@@ -214,7 +216,11 @@ abstract class Source extends java.io.Serializable {
// If there are no matching paths, this is still an error, we need at least something:
hdfsPaths.filter{ pathIsGood(_, hdfsMode.config) }
}
- val taps = goodPaths.map { new Hfs(hdfsScheme, _, SinkMode.KEEP) }
+ }
+
+ protected def createHdfsReadTap(hdfsMode : Hdfs) :
+ Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], _] = {
+ val taps = goodHdfsPaths(hdfsMode).map { new Hfs(hdfsScheme, _, SinkMode.KEEP) }
taps.size match {
case 0 => {
// This case is going to result in an error, but we don't want to throw until
@@ -381,8 +387,10 @@ abstract class TimePathedSource(pattern : String, dateRange : DateRange, tz : Ti
}
override def localPath = pattern
- // Override because we want to check UNGLOBIFIED paths that each are present.
- override def hdfsReadPathsAreGood(conf : Configuration) : Boolean = {
+ /*
+ * Get path statuses based on daterange.
+ */
+ protected def getPathStatuses(conf : Configuration) : Iterable[(String, Boolean)] = {
List("%1$tH" -> Hours(1), "%1$td" -> Days(1)(tz),
"%1$tm" -> Months(1)(tz), "%1$tY" -> Years(1)(tz))
.find { unitDur : (String,Duration) => pattern.contains(unitDur._1) }
@@ -392,17 +400,37 @@ abstract class TimePathedSource(pattern : String, dateRange : DateRange, tz : Ti
.map { dr : DateRange =>
val path = String.format(pattern, dr.start.toCalendar(tz))
val good = pathIsGood(path, conf)
- if (!good) {
- System.err.println("[ERROR] Path: " + path + " is missing in: " + toString)
- }
- //return
- good
+ (path, good)
}
- //All should be true
- .forall { x => x }
}
- .getOrElse(false)
+ .getOrElse(Nil : Iterable[(String, Boolean)])
}
+
+ // Override because we want to check UNGLOBIFIED paths that each are present.
+ override def hdfsReadPathsAreGood(conf : Configuration) : Boolean = {
+ getPathStatuses(conf).forall{ x =>
+ if (!x._2) {
+ System.err.println("[ERROR] Path: " + x._1 + " is missing in: " + toString)
+ }
+ x._2
+ }
+ }
+}
+
+/*
+ * A source that contains the most recent existing path in this date range.
+ */
+abstract class MostRecentGoodSource(p : String, dr : DateRange, t : TimeZone)
+ extends TimePathedSource(p, dr, t) {
+
+ override protected def goodHdfsPaths(hdfsMode : Hdfs) = getPathStatuses(hdfsMode.config)
+ .toList
+ .reverse
+ .find{ _._2 }
+ .map{ x => x._1 }
+
+ override def hdfsReadPathsAreGood(conf : Configuration) = getPathStatuses(conf)
+ .exists{ _._2 }
}
case class TextLine(p : String) extends FixedPathSource(p) with TextLineScheme
Something went wrong with that request. Please try again.