In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.types as t
import pyspark.sql.functions as f
import seaborn as sns
import matplotlib.dates as mdates
import matplotlib.pyplot as plt

In [2]:
spark = (SparkSession.builder.config("spark.driver.memory","5g").config("spark.driver.maxResultSize", "5g").getOrCreate())
spark

In [3]:
%%time
from pyspark.sql.types import DoubleType


X_train = spark.read.csv("./Dataset/train.csv", header=True, inferSchema=True)
X_test = spark.read.csv("./Dataset/example_test.csv", header=True, inferSchema=True)

# Convert VWAP Feature to double
X_train = X_train.withColumn("VWAP", X_train.VWAP.cast(DoubleType()))

X_train.printSchema()

root
 |-- timestamp: integer (nullable = true)
 |-- Asset_ID: integer (nullable = true)
 |-- Count: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- VWAP: double (nullable = true)
 |-- Target: double (nullable = true)

CPU times: user 5.92 ms, sys: 5.85 ms, total: 11.8 ms
Wall time: 31.3 s


In [4]:
X_train.head(1)

[Row(timestamp=1514764860, Asset_ID=2, Count=40.0, Open=2376.58, High=2399.5, Low=2357.14, Close=2374.59, Volume=19.23300519, VWAP=2373.1163915061647, Target=-0.004218152387429286)]

In [18]:
%%time
# Check null values if any

X_train.select([f.count(f.when(f.isnull(c), c)).alias(c) for c in X_train.columns]).show()

+---------+--------+-----+----+----+---+-----+------+----+------+
|timestamp|Asset_ID|Count|Open|High|Low|Close|Volume|VWAP|Target|
+---------+--------+-----+----+----+---+-----+------+----+------+
|        0|       0|    0|   0|   0|  0|    0|     0|   0|     0|
+---------+--------+-----+----+----+---+-----+------+----+------+

CPU times: user 13.2 ms, sys: 6.29 ms, total: 19.5 ms
Wall time: 29 s


In [6]:
# Drop missing values
X_train = X_train.na.drop()

In [7]:
col_features = ['Count', "Open", "High", "Low", "Close", "Volume", "VWAP", "Target"]

In [8]:
from pyspark.ml.feature import VectorAssembler

vecAssembler = VectorAssembler(inputCols=col_features, outputCol="Independent Features")
output = vecAssembler.transform(X_train)

In [9]:
# Final data
final_data = output.select(["Independent Features", "Target"])
final_data.head(3)

[Row(Independent Features=DenseVector([40.0, 2376.58, 2399.5, 2357.14, 2374.59, 19.233, 2373.1164, -0.0042]), Target=-0.004218152387429286),
 Row(Independent Features=DenseVector([5.0, 8.53, 8.53, 8.53, 8.53, 78.38, 8.53, -0.0144]), Target=-0.014398966468964769),
 Row(Independent Features=DenseVector([229.0, 13835.194, 14013.8, 13666.11, 13850.176, 31.5501, 13827.0621, -0.0146]), Target=-0.014643224355736173)]

In [22]:
# Base Model (Without Time series considerations)
from pyspark.ml.regression import LinearRegression
train_data, test_data = final_data.randomSplit([0.75, 0.25]) # Randomly split the data into training and testing

lr = LinearRegression(featuresCol="Independent Features", labelCol = "Target")

lrModel = lr.fit(train_data) # Fit the model

In [23]:
lrModel.intercept

nan

In [17]:
%%time
# Predict/Evaluate on test data

pred_results = lrModel.evaluate(test_data)
pred_results.predictions.show()



+--------------------+--------------------+----------+
|Independent Features|              Target|prediction|
+--------------------+--------------------+----------+
|[1.0,0.029101,0.0...|0.010354317165462756|       NaN|
|[1.0,0.0294839999...|0.001288878336668...|       NaN|
|[1.0,0.0303899999...|-0.01523387910549...|       NaN|
|[1.0,0.03055,0.03...|0.007245901639344243|       NaN|
|[1.0,0.0309,0.030...|0.013009708737864223|       NaN|
|[1.0,0.0310300000...|-0.04964539007092...|       NaN|
|[1.0,0.03184,0.03...|0.026107694238734824|       NaN|
|[1.0,0.031863,0.0...|-0.03131850923896007|       NaN|
|[1.0,0.03193,0.03...|-0.03343749999999979|       NaN|
|[1.0,0.03199,0.03...|-0.04473272897780567|       NaN|
|[1.0,0.032,0.032,...|-0.03998124413879...|       NaN|
|[1.0,0.0320170000...| 0.00824561951463254|       NaN|
|[1.0,0.032297,0.0...|-0.00673202208847...|       NaN|
|[1.0,0.032474,0.0...|0.005145053310190...|       NaN|
|[1.0,0.03258,0.03...|-6.09942055504686...|       NaN|
|[1.0,0.03