Skip to content
Browse files

Fixing up source code.

  • Loading branch information...
1 parent bcb4fdb commit 6397b1e201332154bf67b2f861739515632b51b7 @azymnis azymnis committed
Showing with 11 additions and 18 deletions.
  1. +11 −18 src/main/scala/com/twitter/scalding/Source.scala
View
29 src/main/scala/com/twitter/scalding/Source.scala
@@ -388,9 +388,9 @@ abstract class TimePathedSource(pattern : String, dateRange : DateRange, tz : Ti
override def localPath = pattern
/*
- * Valid paths based on daterange.
+ * Get path statuses based on daterange.
*/
- protected def goodTimePaths(conf : Configuration) : Option[Iterable[(String, Boolean)]] = {
+ protected def getPathStatuses(conf : Configuration) : Iterable[(String, Boolean)] = {
List("%1$tH" -> Hours(1), "%1$td" -> Days(1)(tz),
"%1$tm" -> Months(1)(tz), "%1$tY" -> Years(1)(tz))
.find { unitDur : (String,Duration) => pattern.contains(unitDur._1) }
@@ -403,20 +403,17 @@ abstract class TimePathedSource(pattern : String, dateRange : DateRange, tz : Ti
(path, good)
}
}
+ .getOrElse(Nil : Iterable[(String, Boolean)])
}
// Override because we want to check that each UNGLOBIFIED path is present.
override def hdfsReadPathsAreGood(conf : Configuration) : Boolean = {
- goodTimePaths(conf)
- .map {
- _.forall{ x =>
- if (!x._2) {
- System.err.println("[ERROR] Path: " + x._1 + " is missing in: " + toString)
- }
- x._2
- }
+ getPathStatuses(conf).forall{ x =>
+ if (!x._2) {
+ System.err.println("[ERROR] Path: " + x._1 + " is missing in: " + toString)
}
- .getOrElse(false)
+ x._2
+ }
}
}
@@ -426,17 +423,13 @@ abstract class TimePathedSource(pattern : String, dateRange : DateRange, tz : Ti
abstract class MostRecentGoodSource(p : String, dr : DateRange, t : TimeZone)
extends TimePathedSource(p, dr, t) {
- override protected def goodHdfsPaths(hdfsMode : Hdfs) = goodTimePaths(hdfsMode.config)
- .getOrElse(Nil)
+ override protected def goodHdfsPaths(hdfsMode : Hdfs) = getPathStatuses(hdfsMode.config)
.toList
.reverse
.find{ _._2 }
- .map{ x => List(x._1) }
- .getOrElse(Nil)
- .toIterable
+ .map{ x => x._1 }
- override def hdfsReadPathsAreGood(conf : Configuration) = goodTimePaths(conf)
- .getOrElse(Nil)
+ override def hdfsReadPathsAreGood(conf : Configuration) = getPathStatuses(conf)
.exists{ _._2 }
}

0 comments on commit 6397b1e

Please sign in to comment.
Something went wrong with that request. Please try again.