Skip to content

Commit

Permalink
TypedText - Added versions of dailyPrefixSuffix for TSV and CSV sourc…
Browse files Browse the repository at this point in the history
…es. (#1627)

* TypedText - Added versions of dailyPrefixSuffix for TSV and CSV sources.

* TypedText - Refactored functions for loading files with different types of delimiters.
  • Loading branch information
cavorite authored and johnynek committed Dec 5, 2016
1 parent 1d42ada commit 845c33f
Showing 1 changed file with 40 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,35 +28,53 @@ object TypedText {
/**
* Prefix might be "/logs/awesome"
*/
def hourlyTsv[T](prefix: String)(implicit dr: DateRange, td: TypeDescriptor[T]): TypedTextDelimited[T] = {
private def hourly[T](sep: TypedSep, prefix: String)(implicit dr: DateRange, td: TypeDescriptor[T]): TypedTextDelimited[T] = {
require(prefix.last != '/', "prefix should not include trailing /")
new TimePathTypedText[T](TAB, prefix + TimePathedSource.YEAR_MONTH_DAY_HOUR + "/*")
new TimePathTypedText[T](sep, prefix + TimePathedSource.YEAR_MONTH_DAY_HOUR + "/*")
}
def hourlyOsv[T](prefix: String)(implicit dr: DateRange, td: TypeDescriptor[T]): TypedTextDelimited[T] = {
require(prefix.last != '/', "prefix should not include trailing /")
new TimePathTypedText[T](ONE, prefix + TimePathedSource.YEAR_MONTH_DAY_HOUR + "/*")
}
def hourlyCsv[T](prefix: String)(implicit dr: DateRange, td: TypeDescriptor[T]): TypedTextDelimited[T] = {
require(prefix.last != '/', "prefix should not include trailing /")
new TimePathTypedText[T](COMMA, prefix + TimePathedSource.YEAR_MONTH_DAY_HOUR + "/*")
}
def dailyTsv[T](prefix: String)(implicit dr: DateRange, td: TypeDescriptor[T]): TypedTextDelimited[T] = {
require(prefix.last != '/', "prefix should not include trailing /")
new TimePathTypedText[T](TAB, prefix + TimePathedSource.YEAR_MONTH_DAY + "/*")
}
def dailyOsv[T](prefix: String)(implicit dr: DateRange, td: TypeDescriptor[T]): TypedTextDelimited[T] = {
require(prefix.last != '/', "prefix should not include trailing /")
new TimePathTypedText[T](ONE, prefix + TimePathedSource.YEAR_MONTH_DAY + "/*")
}
def dailyCsv[T](prefix: String)(implicit dr: DateRange, td: TypeDescriptor[T]): TypedTextDelimited[T] = {

def hourlyTsv[T](prefix: String)(implicit dr: DateRange, td: TypeDescriptor[T]): TypedTextDelimited[T] =
hourly(TAB, prefix)

def hourlyOsv[T](prefix: String)(implicit dr: DateRange, td: TypeDescriptor[T]): TypedTextDelimited[T] =
hourly(ONE, prefix)

def hourlyCsv[T](prefix: String)(implicit dr: DateRange, td: TypeDescriptor[T]): TypedTextDelimited[T] =
hourly(COMMA, prefix)

private def daily[T](
sep: TypedSep, prefix: String)(implicit dr: DateRange, td: TypeDescriptor[T]): TypedTextDelimited[T] = {
require(prefix.last != '/', "prefix should not include trailing /")
new TimePathTypedText[T](COMMA, prefix + TimePathedSource.YEAR_MONTH_DAY + "/*")
new TimePathTypedText[T](sep, prefix + TimePathedSource.YEAR_MONTH_DAY + "/*")
}
def dailyPrefixSuffixOsv[T](prefix: String, suffix: String)(implicit dr: DateRange, td: TypeDescriptor[T]): TypedTextDelimited[T] = {

def dailyTsv[T](prefix: String)(implicit dr: DateRange, td: TypeDescriptor[T]): TypedTextDelimited[T] =
daily(TAB, prefix)

def dailyOsv[T](prefix: String)(implicit dr: DateRange, td: TypeDescriptor[T]): TypedTextDelimited[T] =
daily(ONE, prefix)

def dailyCsv[T](prefix: String)(implicit dr: DateRange, td: TypeDescriptor[T]): TypedTextDelimited[T] =
daily(COMMA, prefix)

private def dailyPrefixSuffix[T](
sep: TypedSep,
prefix: String,
suffix: String)(implicit dr: DateRange, td: TypeDescriptor[T]): TypedTextDelimited[T] = {
require(prefix.last != '/', "prefix should not include trailing /")
require(suffix.head == '/', "suffix should include a preceding /")
new TimePathTypedText[T](ONE, prefix + TimePathedSource.YEAR_MONTH_DAY + suffix + "/*")
new TimePathTypedText[T](sep, prefix + TimePathedSource.YEAR_MONTH_DAY + suffix + "/*")
}

def dailyPrefixSuffixTsv[T](prefix: String, suffix: String)(implicit dr: DateRange, td: TypeDescriptor[T]): TypedTextDelimited[T] =
dailyPrefixSuffix(TAB, prefix, suffix)

def dailyPrefixSuffixOsv[T](prefix: String, suffix: String)(implicit dr: DateRange, td: TypeDescriptor[T]): TypedTextDelimited[T] =
dailyPrefixSuffix(ONE, prefix, suffix)

def dailyPrefixSuffixCsv[T](prefix: String, suffix: String)(implicit dr: DateRange, td: TypeDescriptor[T]): TypedTextDelimited[T] =
dailyPrefixSuffix(COMMA, prefix, suffix)

}

trait TypedTextDelimited[T] extends SchemedSource with Mappable[T] with TypedSink[T] {
Expand Down

0 comments on commit 845c33f

Please sign in to comment.