In [57]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Spark session & context
spark = SparkSession.builder.master('local[2]').getOrCreate()
sc = spark.sparkContext

Import libraries for training the ML models

In [58]:
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression


When we read in Time Series data, we need to "bin" it appropriately.  

We need to decide what to do if we have several values in a bin.  Take the average?  Just pick one?

In [79]:
# units are seconds
ts_bin_size = 60 * 60 * 24  # Round to nearest day

Read the OHLC data

In [80]:
bitcoin_price_ohlc_DF = spark.read.csv("data/btc/price-ohlc.csv", inferSchema=True, header=True)
bitcoin_price_ohlc_DF = bitcoin_price_ohlc_DF.withColumn(
  'ts_bin',
  F.round(F.col('time') / ts_bin_size)
)
bitcoin_price_ohlc_DF.printSchema()


root
 |-- time: integer (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- ts_bin: double (nullable = true)



Next, read in price average data

In [81]:
bitcoin_price_DF = spark.read.csv("data/btc/price.csv", inferSchema=True, header=True)
bitcoin_price_DF = bitcoin_price_DF.withColumn(
  'ts_bin',
  F.round(F.col('time') / ts_bin_size)
)
bitcoin_price_DF.show()


+----------+--------------------+-------+
|      time|               price| ts_bin|
+----------+--------------------+-------+
|1279324800|0.049510000000000005|14807.0|
|1279411200|             0.08584|14808.0|
|1279497600|              0.0808|14809.0|
|1279584000| 0.07473333333333332|14810.0|
|1279670400|             0.07921|14811.0|
|1279756800|            0.055945|14812.0|
|1279843200|0.062283333333333336|14813.0|
|1279929600|             0.05454|14814.0|
|1280016000|              0.0505|14815.0|
|1280102400|               0.056|14816.0|
|1280188800|0.059844444444444436|14817.0|
|1280275200|              0.0589|14818.0|
|1280361600| 0.06920000000000001|14819.0|
|1280448000| 0.06428333333333333|14820.0|
|1280534400|             0.06785|14821.0|
|1280620800|              0.0611|14822.0|
|1280707200|                0.06|14823.0|
|1280793600|                0.06|14824.0|
|1280880000|             0.05795|14825.0|
|1280966400|               0.061|14826.0|
+----------+--------------------+-

We can do a filtered binned join to join these together, which is necessary because our timestamps don't match perfectly

In [86]:
combined = bitcoin_price_ohlc_DF.join(bitcoin_price_DF, bitcoin_price_DF.ts_bin == bitcoin_price_ohlc_DF.ts_bin, 'outer') \
  .select(
    bitcoin_price_ohlc_DF.ts_bin, 
    bitcoin_price_ohlc_DF.time, 
    bitcoin_price_ohlc_DF.open, 
    bitcoin_price_ohlc_DF.high, 
    bitcoin_price_ohlc_DF.low, 
    bitcoin_price_ohlc_DF.close, 
    bitcoin_price_DF.price
    ) \
  .sort(F.desc("time"))
combined.show()




+-------+----------+------------------+------------------+------------------+------------------+-----------------+
| ts_bin|      time|              open|              high|               low|             close|            price|
+-------+----------+------------------+------------------+------------------+------------------+-----------------+
|18854.0|1628974800| 46668.83774568702| 46965.60043761734| 46668.83774568702| 46964.97673474165|47064.63037062539|
|18854.0|1628971200| 46960.97859652031| 46960.97859652031| 46470.37578089779|46643.653869476926|47064.63037062539|
|18854.0|1628967600| 46813.46205343097| 47197.36698996014| 46743.07533589857| 47100.35193816682|47064.63037062539|
|18854.0|1628964000|46766.090294525704| 46888.86115153998| 46743.92350332929|46788.203789445164|47064.63037062539|
|18854.0|1628960400| 46867.31940322104| 46901.71011400429| 46685.60557153721|46816.264954585626|47064.63037062539|
|18854.0|1628956800|46949.914611995395|46949.914611995395|   46760.258125276| 46



In [92]:
feature_assembler = VectorAssembler(inputCols=["time", "open", "high", "low", "close"], outputCol='VFeatures', handleInvalid='skip')
output = feature_assembler.transform(combined)
output.limit(2).show()

+-------+----------+-----------------+-----------------+-----------------+------------------+-----------------+--------------------+
| ts_bin|      time|             open|             high|              low|             close|            price|           VFeatures|
+-------+----------+-----------------+-----------------+-----------------+------------------+-----------------+--------------------+
|18854.0|1628974800|46668.83774568702|46965.60043761734|46668.83774568702| 46964.97673474165|47064.63037062539|[1.6289748E9,4666...|
|18854.0|1628971200|46960.97859652031|46960.97859652031|46470.37578089779|46643.653869476926|47064.63037062539|[1.6289712E9,4696...|
+-------+----------+-----------------+-----------------+-----------------+------------------+-----------------+--------------------+



In [103]:
traindata, testdata = output.randomSplit([0.75, 0.25])
regressor = LinearRegression(featuresCol='VFeatures', labelCol='price')
regressor = regressor.fit(traindata)

21/08/20 18:03:47 WARN Instrumentation: [8041a5b7] regParam is zero, which might cause numerical instability and overfitting.


In [104]:
regressor.coefficients

DenseVector([0.0, -0.2296, 0.5643, -0.0469, 0.7099])

In [105]:
pred = regressor.evaluate(testdata)



Py4JJavaError: An error occurred while calling o932.pValues.
: java.lang.UnsupportedOperationException: No p-value available for this LinearRegressionModel
	at org.apache.spark.ml.regression.LinearRegressionSummary.pValues$lzycompute(LinearRegression.scala:1067)
	at org.apache.spark.ml.regression.LinearRegressionSummary.pValues(LinearRegression.scala:1064)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
