$ /raid5/Spark/spark-2.0.0-ts1-bin-hadoop2.7/bin/pyspark --driver-class-path spark-timeseries/target/sparkts-0.4.0-jar-with-dependencies.jar --py-files spark-timeseries/python/dist/sparkts-0.4.0-py2.7.egg
Python 2.7.5 (default, Nov 20 2015, 02:00:19)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-4)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
16/10/12 09:40:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/10/12 09:40:03 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.0.0
      /_/

Using Python version 2.7.5 (default, Nov 20 2015 02:00:19)
SparkSession available as 'spark'.
>>>
>>>
>>> from datetime import datetime
>>>
>>> from pyspark import SparkContext, SQLContext
>>> from pyspark.sql import Row
>>> from pyspark.sql.types import StructType, StructField, TimestampType, DoubleType, StringType
>>>
>>> from sparkts.datetimeindex import uniform, BusinessDayFrequency
>>> from sparkts.timeseriesrdd import time_series_rdd_from_observations
>>>
>>> def lineToRow(line):
...     (year, month, day, symbol, volume, price) = line.split("\t")
...     # Python 2.x compatible timestamp generation
...     dt = datetime(int(year), int(month), int(day))
...     return (dt, symbol, float(price))
...
>>> def loadObservations(sparkContext, sqlContext, path):
...     textFile = sparkContext.textFile(path)
...     rowRdd = textFile.map(lineToRow)
...     schema = StructType([
...         StructField('timestamp', TimestampType(), nullable=True),
...         StructField('symbol', StringType(), nullable=True),
...         StructField('price', DoubleType(), nullable=True),
...     ])
...     return sqlContext.createDataFrame(rowRdd, schema)
...
>>>
>>> sqlContext = SQLContext(sc)
>>>
>>> tickerObs = loadObservations(sc, sqlContext, "ticker.tsv")
16/10/12 09:40:23 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
>>>
>>> # Create a daily DateTimeIndex over August and September 2015
... freq = BusinessDayFrequency(1, 1, sc)
>>> dtIndex = uniform(start='2015-08-03T00:00-07:00', end='2015-09-22T00:00-07:00', freq=freq, sc=sc)
>>>
>>> # Align the ticker data on the DateTimeIndex to create a TimeSeriesRDD
... tickerTsrdd = time_series_rdd_from_observations(dtIndex, tickerObs, "timestamp", "symbol", "price")
>>>
>>> # Cache it in memory
... tickerTsrdd.cache()
MapPartitionsRDD[13] at map at NativeMethodAccessorImpl.java:-2
>>>
>>> # Count the number of series (number of symbols)
... print(tickerTsrdd.count())
104
>>>
>>> # Impute missing values using linear interpolation
... filled = tickerTsrdd.fill("linear")
>>>
>>> # Compute return rates
... returnRates = filled.return_rates()
>>>
>>> # Durbin-Watson test for serial correlation, ported from TimeSeriesStatisticalTests.scala
... def dwtest(residuals):
...     residsSum = residuals[0] * residuals[0]
...     diffsSum = 0.0
...     i = 1
...     while i < len(residuals):
...         residsSum += residuals[i] * residuals[i]
...         diff = residuals[i] - residuals[i - 1]
...         diffsSum += diff * diff
...         i += 1
...     return diffsSum / residsSum
...
>>> # Compute Durbin-Watson stats for each series
... # Swap ticker symbol and stats so min and max compare the statistic value, not the
... # ticker names.
... dwStats = returnRates.map_series(lambda row: (row[0], [dwtest(row[1])])).map(lambda x: (x[1], x[0]))
>>>
>>> print(dwStats.min())
([nan], u'AAL')
>>> print(dwStats.max())
([nan], u'AAL')
>>>
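For reference, the dwtest function above is a direct port of the Durbin-Watson statistic: for residuals e_1 .. e_n it computes d = sum_{t=2..n} (e_t - e_{t-1})^2 / sum_{t=1..n} e_t^2. The statistic ranges from 0 to 4; values near 2 indicate no first-order serial correlation, values well below 2 suggest positive autocorrelation, and values well above 2 suggest negative autocorrelation.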
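Note that min() and max() both return ([nan], u'AAL'). NaN compares false against every other value, so a single NaN statistic can poison Python's min/max, which are built on the < operator. The NaNs themselves most likely come from series that still contain missing values after fill("linear"), since linear interpolation cannot fill gaps before a symbol's first observation or after its last. One way to guard against this is to drop NaN statistics before comparing; the snippet below is a minimal sketch against the dwStats RDD built above, not part of the original session:

import math

# Keep only series whose Durbin-Watson statistic is a real number;
# a single NaN entry makes min()/max() comparisons unreliable.
validStats = dwStats.filter(lambda x: not math.isnan(x[0][0]))
print(validStats.min())
print(validStats.max())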