diff --git a/README.md b/README.md
index 0368c04..3119920 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,40 @@
 # spark-ts-examples
-Spark TS Examples
+
+Description
+-----------
+
+Examples showing how to use the `spark-ts` time series library for Apache Spark.
+
+Minimum Requirements
+--------------------
+
+* Java 1.8
+* Maven 3.0
+* Apache Spark 1.6.0
+
+Using this Repo
+---------------
+
+### Building
+
+We use [Maven](https://maven.apache.org/) for building Java / Scala. To compile and build
+the example jar, navigate to the `jvm` directory and run:
+
+    mvn package
+
+### Running
+
+To submit one of the Java or Scala examples to a local Spark cluster, run the following command
+from the `jvm` directory:
+
+    spark-submit --class com.cloudera.tsexamples.Stocks target/spark-ts-examples-0.0.1-SNAPSHOT-jar-with-dependencies.jar
+
+You can substitute any of the Scala or Java example classes as the value for the `--class`
+parameter.
+
+To submit a Python example, run the following command from the `python` directory:
+
+    spark-submit --driver-class-path PATH/TO/sparkts-0.3.0-jar-with-dependencies.jar Stocks.py
+
+The `--driver-class-path` parameter value must point to the Spark-TS JAR file, which can be
+downloaded from the spark-timeseries [GitHub repo](https://github.com/sryza/spark-timeseries).
diff --git a/jvm/src/main/scala/com/cloudera/tsexamples/Stocks.scala b/jvm/src/main/scala/com/cloudera/tsexamples/Stocks.scala
index 885fd92..d6ce748 100644
--- a/jvm/src/main/scala/com/cloudera/tsexamples/Stocks.scala
+++ b/jvm/src/main/scala/com/cloudera/tsexamples/Stocks.scala
@@ -25,7 +25,7 @@ object Stocks {
       val dt = ZonedDateTime.of(tokens(0).toInt, tokens(1).toInt, tokens(2).toInt, 0, 0, 0, 0,
         ZoneId.systemDefault())
       val symbol = tokens(3)
-      val price = tokens(4).toDouble
+      val price = tokens(5).toDouble
       Row(Timestamp.from(dt.toInstant), symbol, price)
     }
     val fields = Seq(
@@ -67,7 +67,7 @@ object Stocks {
 
     // Compute return rates
     val returnRates = filled.returnRates()
-    
+
     // Compute Durbin-Watson stats for each series
     val dwStats = returnRates.mapValues(TimeSeriesStatisticalTests.dwtest)
 
diff --git a/python/Stocks.py b/python/Stocks.py
new file mode 100644
index 0000000..9299e27
--- /dev/null
+++ b/python/Stocks.py
@@ -0,0 +1,69 @@
+from datetime import datetime
+
+from pyspark import SparkContext, SQLContext
+from pyspark.sql import Row
+from pyspark.sql.types import StructType, StructField, TimestampType, DoubleType, StringType
+
+from sparkts.datetimeindex import uniform, BusinessDayFrequency
+from sparkts.timeseriesrdd import time_series_rdd_from_observations
+
+def lineToRow(line):
+    (year, month, day, symbol, volume, price) = line.split("\t")
+    # Python 2.x compatible timestamp generation
+    dt = datetime(int(year), int(month), int(day))
+    return (dt, symbol, float(price))
+
+def loadObservations(sparkContext, sqlContext, path):
+    textFile = sparkContext.textFile(path)
+    rowRdd = textFile.map(lineToRow)
+    schema = StructType([
+        StructField('timestamp', TimestampType(), nullable=True),
+        StructField('symbol', StringType(), nullable=True),
+        StructField('price', DoubleType(), nullable=True),
+    ])
+    return sqlContext.createDataFrame(rowRdd, schema)
+
+if __name__ == "__main__":
+    sc = SparkContext(appName="Stocks")
+    sqlContext = SQLContext(sc)
+
+    tickerObs = loadObservations(sc, sqlContext, "../data/ticker.tsv")
+
+    # Create a business-day DateTimeIndex over August and September 2015
+    freq = BusinessDayFrequency(1, 1, sc)
+    dtIndex = uniform(start='2015-08-03T00:00-07:00', end='2015-09-22T00:00-07:00', freq=freq, sc=sc)
+
+    # Align the ticker data on the DateTimeIndex to create a TimeSeriesRDD
+    tickerTsrdd = time_series_rdd_from_observations(dtIndex, tickerObs, "timestamp", "symbol", "price")
+
+    # Cache it in memory
+    tickerTsrdd.cache()
+
+    # Count the number of series (number of symbols)
+    print(tickerTsrdd.count())
+
+    # Impute missing values using linear interpolation
+    filled = tickerTsrdd.fill("linear")
+
+    # Compute return rates
+    returnRates = filled.return_rates()
+
+    # Durbin-Watson test for serial correlation, ported from TimeSeriesStatisticalTests.scala
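+    # For a residual series e_1..e_T, the statistic computed below is
+    #   DW = sum_{t=2..T} (e_t - e_{t-1})^2 / sum_{t=1..T} e_t^2
+    # Values near 2 indicate little first-order serial correlation; values toward 0
+    # indicate positive correlation, and values toward 4 negative correlation.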
+    def dwtest(residuals):
+        residsSum = residuals[0] * residuals[0]
+        diffsSum = 0.0
+        i = 1
+        while i < len(residuals):
+            residsSum += residuals[i] * residuals[i]
+            diff = residuals[i] - residuals[i - 1]
+            diffsSum += diff * diff
+            i += 1
+        return diffsSum / residsSum
+
+    # Compute Durbin-Watson stats for each series
+    # Swap ticker symbol and stats so min and max compare the statistic value, not the
+    # ticker names.
+    dwStats = returnRates.map_series(lambda row: (row[0], [dwtest(row[1])])).map(lambda x: (x[1], x[0]))
+
+    print(dwStats.min())
+    print(dwStats.max())
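+
+    # Each element of dwStats is ([DW statistic], symbol), so min() reports the series
+    # whose return rates show the most positive serial correlation and max() the most
+    # negative.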