Merge pull request #3 from pegli/feature/python_examples
Added Python stocks example
sryza committed Mar 11, 2016
2 parents 72afc70 + 2b56be1 commit df34b90
Showing 3 changed files with 110 additions and 3 deletions.
40 changes: 39 additions & 1 deletion README.md
@@ -1,2 +1,40 @@
# spark-ts-examples
-Spark TS Examples

Description
-----------

Examples showing how to use the `spark-ts` time series library for Apache Spark.

Minimum Requirements
--------------------

* Java 1.8
* Maven 3.0
* Apache Spark 1.6.0

Using this Repo
---------------

### Building

We use [Maven](https://maven.apache.org/) to build the Java and Scala examples. To compile
and package the example jar, navigate to the `jvm` directory and run:

mvn package

### Running

To submit one of the Java or Scala examples to a local Spark cluster, run the following command
from the `jvm` directory:

spark-submit --class com.cloudera.tsexamples.Stocks target/spark-ts-examples-0.0.1-SNAPSHOT-jar-with-dependencies.jar

You can substitute any of the Scala or Java example classes as the value for the `--class`
parameter.
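
For example, to run a different example class (the class name below is hypothetical; use the
fully qualified name of any class that actually exists in the jar):

    spark-submit --class com.cloudera.tsexamples.MyExample target/spark-ts-examples-0.0.1-SNAPSHOT-jar-with-dependencies.jar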

To submit a Python example, run the following command from the `python` directory:

spark-submit --driver-class-path PATH/TO/sparkts-0.3.0-jar-with-dependencies.jar Stocks.py

The `--driver-class-path` parameter value must point to the Spark-TS JAR file, which can be
downloaded from the spark-timeseries [GitHub repo](https://github.com/sryza/spark-timeseries).
4 changes: 2 additions & 2 deletions jvm/src/main/scala/com/cloudera/tsexamples/Stocks.scala
@@ -25,7 +25,7 @@ object Stocks {
      val dt = ZonedDateTime.of(tokens(0).toInt, tokens(1).toInt, tokens(2).toInt, 0, 0, 0, 0,
        ZoneId.systemDefault())
      val symbol = tokens(3)
-     val price = tokens(4).toDouble
+     val price = tokens(5).toDouble
      Row(Timestamp.from(dt.toInstant), symbol, price)
    }
    val fields = Seq(
@@ -67,7 +67,7 @@ object Stocks {

    // Compute return rates
    val returnRates = filled.returnRates()

    // Compute Durbin-Watson stats for each series
    val dwStats = returnRates.mapValues(TimeSeriesStatisticalTests.dwtest)

69 changes: 69 additions & 0 deletions python/Stocks.py
@@ -0,0 +1,69 @@
from datetime import datetime

from pyspark import SparkContext, SQLContext
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, TimestampType, DoubleType, StringType

from sparkts.datetimeindex import uniform, BusinessDayFrequency
from sparkts.timeseriesrdd import time_series_rdd_from_observations

def lineToRow(line):
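    # Each line has tab-separated fields: year, month, day, symbol, volume, price.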
    (year, month, day, symbol, volume, price) = line.split("\t")
    # Python 2.x compatible timestamp generation
    dt = datetime(int(year), int(month), int(day))
    return (dt, symbol, float(price))

def loadObservations(sparkContext, sqlContext, path):
    textFile = sparkContext.textFile(path)
    rowRdd = textFile.map(lineToRow)
    schema = StructType([
        StructField('timestamp', TimestampType(), nullable=True),
        StructField('symbol', StringType(), nullable=True),
        StructField('price', DoubleType(), nullable=True),
    ])
    return sqlContext.createDataFrame(rowRdd, schema)

if __name__ == "__main__":
    sc = SparkContext(appName="Stocks")
    sqlContext = SQLContext(sc)

    tickerObs = loadObservations(sc, sqlContext, "../data/ticker.tsv")

    # Create a business-day DateTimeIndex over August and September 2015
    freq = BusinessDayFrequency(1, 1, sc)
    dtIndex = uniform(start='2015-08-03T00:00-07:00', end='2015-09-22T00:00-07:00', freq=freq, sc=sc)

    # Align the ticker data on the DateTimeIndex to create a TimeSeriesRDD
    tickerTsrdd = time_series_rdd_from_observations(dtIndex, tickerObs, "timestamp", "symbol", "price")
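    # (Each record in the resulting TimeSeriesRDD pairs a ticker symbol with its
    # series of prices, aligned to dtIndex.)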

    # Cache it in memory
    tickerTsrdd.cache()

    # Count the number of series (number of symbols)
    print(tickerTsrdd.count())

    # Impute missing values using linear interpolation
    filled = tickerTsrdd.fill("linear")

    # Compute return rates
    returnRates = filled.return_rates()

    # Durbin-Watson test for serial correlation, ported from TimeSeriesStatisticalTests.scala
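    # The statistic is d = sum_{t=2..n} (e_t - e_{t-1})^2 / sum_{t=1..n} e_t^2;
    # values near 2 suggest little serial correlation in the residuals.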
    def dwtest(residuals):
        residsSum = residuals[0] * residuals[0]
        diffsSum = 0.0
        i = 1
        while i < len(residuals):
            residsSum += residuals[i] * residuals[i]
            diff = residuals[i] - residuals[i - 1]
            diffsSum += diff * diff
            i += 1
        return diffsSum / residsSum

    # Compute Durbin-Watson stats for each series
    # Swap ticker symbol and stats so min and max compare the statistic value, not the
    # ticker names.
    dwStats = returnRates.map_series(lambda row: (row[0], [dwtest(row[1])])).map(lambda x: (x[1], x[0]))

    print(dwStats.min())
    print(dwStats.max())
