Reader Builder API
Provide a common reader API similar to the `sqlContext.read` API
available in Spark SQL. Example usage:

```scala
val timeSeriesRDD = new ReadBuilder()
  .range("20170101", "20170201")
  .option("timeColumn", "myTimeColumn")
  .parquet("myParquetFile")
```
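For comparison, the analogous call against the stock `sqlContext.read` API that this builder mirrors (`mergeSchema` is a standard Spark Parquet option, used here purely for illustration):

```scala
// Spark SQL's reader pattern: chain options, then terminate with a
// format method that produces the DataFrame.
val df = sqlContext.read
  .option("mergeSchema", "true")
  .parquet("myParquetFile")
```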
Steven She authored and icexelloss committed Jul 2, 2018
1 parent 78ac102 commit ea12afe
Showing 10 changed files with 621 additions and 39 deletions.
20 changes: 11 additions & 9 deletions python/ts/flint/readwriter.py
```diff
@@ -199,15 +199,18 @@ def pandas(self, df, schema=None, *,
             is_sorted=is_sorted,
             unit=self._parameters.timeUnitString())
 
-    def _df_between(self, df, begin, end, time_column, junit):
+    def _df_between(self, df, begin_nanos, end_nanos, time_column):
         """Filter a Python dataframe to contain data between begin (inclusive) and end (exclusive)
 
         :return: :class:`pyspark.sql.DataFrame`
         """
-        jdf = df._jdf
-        new_jdf = self._jpkg.TimeSeriesRDD.DFBetween(jdf, begin, end, junit, time_column)
+        if begin_nanos:
+            df = df.filter(df[time_column] >= begin_nanos)
 
-        return DataFrame(new_jdf, self._sqlContext)
+        if end_nanos:
+            df = df.filter(df[time_column] < end_nanos)
+
+        return df
 
     def dataframe(self, df, begin=None, end=None, *,
                   timezone='UTC',
@@ -266,13 +269,12 @@ def dataframe(self, df, begin=None, end=None, *,
             timeUnit=unit
         )
 
-        begin = self._parameters.range().beginFlintString()
-        end = self._parameters.range().endFlintString()
         time_column = self._parameters.timeColumn()
-        jtimeunit = self._parameters.timeUnit()
+        begin_nanos = self._parameters.range().beginNanosOrNull()
+        end_nanos = self._parameters.range().endNanosOrNull()
 
-        if begin or end:
-            df = self._df_between(df, begin, end, time_column, jtimeunit)
+        if begin_nanos or end_nanos:
+            df = self._df_between(df, begin_nanos, end_nanos, time_column)
 
         return TimeSeriesDataFrame._from_df(
             df,
```
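The Python bindings now apply the range locally with plain DataFrame filters instead of round-tripping through the JVM `DFBetween`. For reference, the same bound semantics expressed in Scala (a sketch; the column name and nanosecond values are illustrative):

```scala
import org.apache.spark.sql.functions.col

// Matches _df_between above: begin is inclusive, end is exclusive.
val bounded = df
  .filter(col("time") >= 1483228800000000000L) // 2017-01-01T00:00:00Z
  .filter(col("time") < 1485907200000000000L)  // 2017-02-01T00:00:00Z
```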
6 changes: 3 additions & 3 deletions scalastyle-config.xml
```diff
@@ -210,11 +210,11 @@ This file is divided into 3 sections:
 <!-- Import ordering. Currently warning only since there are lots of violations. -->
 <check level="warning" class="org.scalastyle.scalariform.ImportOrderChecker" enabled="true">
  <parameters>
-  <parameter name="groups">java,scala,3rdParty,spark</parameter>
+  <parameter name="groups">java,scala,3rdParty,flint</parameter>
   <parameter name="group.java">javax?\..*</parameter>
   <parameter name="group.scala">scala\..*</parameter>
-  <parameter name="group.3rdParty">(?!org\.apache\.spark\.).*</parameter>
-  <parameter name="group.spark">org\.apache\.spark\..*</parameter>
+  <parameter name="group.3rdParty">(?!com\.twosigma\.flint\.).*</parameter>
+  <parameter name="group.flint">com\.twosigma\.flint\..*</parameter>
  </parameters>
 </check>
```
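For reference, an import block that satisfies the new grouping (the specific imports are illustrative):

```scala
// Order enforced by the new ImportOrderChecker groups:
// java/javax first, then scala, then third party (which now
// includes Spark), with com.twosigma.flint imports last.
import java.util.concurrent.TimeUnit

import scala.collection.mutable

import org.apache.spark.sql.DataFrame

import com.twosigma.flint.annotation.PythonApi
```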
97 changes: 81 additions & 16 deletions src/main/scala/com/twosigma/flint/timeseries/TimeSeriesRDD.scala
```diff
@@ -17,6 +17,7 @@
 package com.twosigma.flint.timeseries
 
 import java.util.concurrent.TimeUnit
+import javax.annotation.Nullable
 
 import com.twosigma.flint.FlintConf
 import com.twosigma.flint.annotation.PythonApi
@@ -149,28 +150,52 @@ object TimeSeriesRDD {
   /**
    * Filter a [[org.apache.spark.sql.DataFrame]] to contain data within a time range
    *
    * @param dataFrame A [[org.apache.spark.sql.DataFrame]].
-   * @param begin Begin time of the returned [[DataFrame]], inclusive
-   * @param end End time of the returned [[DataFrame]], exclusive
+   * @param begin Optional begin time of the returned [[DataFrame]], inclusive
+   * @param end Optional end time of the returned [[DataFrame]], exclusive
    * @param timeUnit Optional. The time unit of the time column, e.g.
    *                 [[scala.concurrent.duration.NANOSECONDS]], [[scala.concurrent.duration.MILLISECONDS]], etc.
    * @param timeColumn Optional. The name of the column in `df` that holds the time. Default: "time"
    * @return a [[org.apache.spark.sql.DataFrame]].
    */
+  @deprecated("0.3.4", "No longer used by Python bindings")
   @PythonApi
   private[flint] def DFBetween(
     dataFrame: DataFrame,
-    begin: String,
-    end: String,
+    @Nullable begin: String,
+    @Nullable end: String,
     timeUnit: TimeUnit = NANOSECONDS,
     timeColumn: String = timeColumnName
   ): DataFrame = {
+    val beginNanos = Option(begin).map(TimeFormat.parse(_, timeUnit = timeUnit))
+    val endNanos = Option(end).map(TimeFormat.parse(_, timeUnit = timeUnit))
+
+    DFBetween(dataFrame, beginNanos, endNanos, timeColumn = timeColumn)
+  }
+
+  /**
+   * @param dataFrame A [[org.apache.spark.sql.DataFrame]].
+   * @param beginNanosOpt Optional begin time of the returned [[DataFrame]] in nanoseconds, inclusive
+   * @param endNanosOpt Optional end time of the returned [[DataFrame]] in nanoseconds, exclusive
+   * @param timeColumn Optional. The name of the column in `dataFrame` that holds the time.
+   * @return a [[org.apache.spark.sql.DataFrame]].
+   */
+  private[flint] def DFBetween(
+    dataFrame: DataFrame,
+    beginNanosOpt: Option[Long],
+    endNanosOpt: Option[Long],
+    timeColumn: String
+  ): DataFrame = {
     var df = dataFrame
 
-    if (begin != null) {
-      df = df.filter(df(timeColumn) >= timeUnit.convert(TimeFormat.parseNano(begin), NANOSECONDS))
+    df = beginNanosOpt match {
+      case Some(nanos) => df.filter(df(timeColumn) >= nanos)
+      case None => df
     }
-    if (end != null) {
-      df = df.filter(df(timeColumn) < timeUnit.convert(TimeFormat.parseNano(end), NANOSECONDS))
+
+    df = endNanosOpt match {
+      case Some(nanos) => df.filter(df(timeColumn) < nanos)
+      case None => df
     }
 
     df
@@ -355,17 +380,57 @@ object TimeSeriesRDD {
   )(
     isSorted: Boolean,
     timeUnit: TimeUnit,
-    begin: String = null,
-    end: String = null,
-    columns: Seq[String] = null,
+    @Nullable begin: String = null,
+    @Nullable end: String = null,
+    @Nullable columns: Seq[String] = null,
     timeColumn: String = timeColumnName
   ): TimeSeriesRDD = {
+    fromParquet(
+      sc,
+      paths,
+      isSorted = isSorted,
+      beginNanos = Option(begin).map(TimeFormat.parse(_, timeUnit = timeUnit)),
+      endNanos = Option(end).map(TimeFormat.parse(_, timeUnit = timeUnit)),
+      columns = Option(columns),
+      timeUnit = timeUnit,
+      timeColumn = timeColumn
+    )
+  }
+
+  /**
+   * Read a Parquet file into a [[TimeSeriesRDD]] using optional nanosecond bounds for begin and end.
+   *
+   * Used by [[com.twosigma.flint.timeseries.io.read.ReadBuilder]].
+   *
+   * @param sc The [[org.apache.spark.SparkContext]].
+   * @param paths The paths of the parquet file.
+   * @param isSorted Whether the rows in the file are already sorted by their timestamps.
+   * @param beginNanos Optional. Inclusive begin time in nanoseconds.
+   * @param endNanos Optional. Exclusive end time in nanoseconds.
+   * @param columns Optional. Columns of the parquet file to read into the [[TimeSeriesRDD]]. IMPORTANT: This is
+   *                critical for performance. Reading a small subset of columns can easily improve performance
+   *                by 10x compared with reading all columns in the file.
+   * @param timeColumn Optional. Column in the parquet file that specifies time.
+   * @return a [[TimeSeriesRDD]]
+   */
+  private[timeseries] def fromParquet(
+    sc: SparkContext,
+    paths: Seq[String],
+    isSorted: Boolean,
+    beginNanos: Option[Long],
+    endNanos: Option[Long],
+    columns: Option[Seq[String]],
+    timeUnit: TimeUnit,
+    timeColumn: String
+  ): TimeSeriesRDD = {
     val sqlContext = SQLContext.getOrCreate(sc)
-    var df = sqlContext.read.parquet(paths: _*)
-    if (columns != null) {
-      df = df.select(columns.head, columns.tail: _*)
-    }
-    fromDF(DFBetween(df, begin, end, timeUnit, timeColumn))(
+    val df = sqlContext.read.parquet(paths: _*)
+
+    val prunedDf = columns.map { columnNames =>
+      df.select(columnNames.map(col): _*)
+    }.getOrElse(df)
+
+    fromDF(DFBetween(prunedDf, beginNanos, endNanos, timeColumn))(
       isSorted = isSorted,
       timeUnit = timeUnit,
       timeColumn = timeColumn
```
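A hypothetical call to the new `Option`-based `fromParquet` (it is `private[timeseries]` and normally reached through `ReadBuilder`, so this is a sketch of the semantics only; the path and column names are made up):

```scala
import java.util.concurrent.TimeUnit.NANOSECONDS

val tsRdd = TimeSeriesRDD.fromParquet(
  sc,
  paths = Seq("myParquetFile"),
  isSorted = true,
  beginNanos = Some(1483228800000000000L), // inclusive, applied by DFBetween
  endNanos = None,                         // None leaves the range open-ended
  columns = Some(Seq("time", "price")),    // prune columns early for performance
  timeUnit = NANOSECONDS,
  timeColumn = "time"
)
```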
100 changes: 100 additions & 0 deletions src/main/scala/com/twosigma/flint/timeseries/io/read/Parameters.scala
@@ -0,0 +1,100 @@ (new file)
```scala
/*
 *  Copyright 2017 TWO SIGMA OPEN SOURCE, LLC
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package com.twosigma.flint.timeseries.io.read

import java.time.{ Instant, ZonedDateTime, ZoneOffset }
import javax.annotation.Nullable

import scala.collection.mutable

import com.twosigma.flint.annotation.PythonApi

private[read] class Parameters private (
  val extraOptions: mutable.Map[String, String],
  var range: BeginEndRange = BeginEndRange(None, None)
) extends Serializable {

  def this(defaultOptions: Map[String, String]) =
    this(mutable.HashMap[String, String](defaultOptions.toSeq: _*))

  def option(key: String, valueOpt: Option[String]): Unit = valueOpt match {
    case Some(v) => extraOptions += key -> v
    case None => extraOptions -= key
  }

  /**
   * Convenience method for the Python API that provides a Java Map compatible with py4j.
   * Exposed in the Python API as `_extra_options` to return a dict of key-value options.
   */
  @PythonApi
  private[read] def extraOptionsAsJavaMap: java.util.Map[String, String] = {
    import scala.collection.JavaConverters._
    extraOptions.asJava
  }

}

private[read] case class BeginEndRange(beginNanosOpt: Option[Long], endNanosOpt: Option[Long]) {

  def beginNanos: Long = beginNanosOpt.getOrElse(
    throw new IllegalArgumentException("'begin' range must be set")
  )

  def endNanos: Long = endNanosOpt.getOrElse(
    throw new IllegalArgumentException("'end' range must be set")
  )

  @PythonApi
  private[read] def beginNanosOrNull: java.lang.Long = beginNanosOpt.map(Long.box).orNull

  @PythonApi
  private[read] def endNanosOrNull: java.lang.Long = endNanosOpt.map(Long.box).orNull

  /**
   * Converts the begin date to an ISO string, or null.
   *
   * TODO(sshe): 9/29/2017. Deprecated. Keeping for compatibility with old Python packages.
   * Newer Python bindings use [[beginNanosOrNull]].
   *
   * @return A string representation of the begin time or null
   */
  @PythonApi
  @Nullable
  private[read] def beginFlintString: String = beginNanosOpt.map(toISOString).orNull

  /**
   * Converts the end date to an ISO string, or null.
   *
   * TODO(sshe): 9/29/2017. Deprecated. Keeping for compatibility with old Python packages.
   * Newer Python bindings use [[endNanosOrNull]].
   *
   * @return A string representation of the end time or null
   */
  @PythonApi
  @Nullable
  private[read] def endFlintString: String = endNanosOpt.map(toISOString).orNull

  /**
   * TODO(sshe): 9/29/2017. Deprecated. Keeping for compatibility with old Python packages.
   */
  @PythonApi
  private def toISOString(nanos: Long): String =
    ZonedDateTime.ofInstant(Instant.ofEpochSecond(0, nanos), ZoneOffset.UTC)
      .toOffsetDateTime // Remove timezone ID
      .toString // ISO8601-compatible string

}
```
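A short sketch of `BeginEndRange` at the boundaries (the values are illustrative):

```scala
val range = BeginEndRange(beginNanosOpt = Some(1483228800000000000L), endNanosOpt = None)

range.beginNanos        // 1483228800000000000L
range.endNanosOrNull    // null (boxed java.lang.Long, for py4j)
range.beginFlintString  // "2017-01-01T00:00Z" (legacy ISO form)
// range.endNanos would throw IllegalArgumentException: 'end' range must be set
```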
