In [71]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [72]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pandas as pd
from IPython.display import display

In [73]:
# Application name passed to the SparkSession builder in the next cell.
spark_application_name = "Spark_Application_Name"

In [74]:
# Reuse an existing session if one is already running, otherwise start one.
spark = (
    SparkSession.builder
    .appName(spark_application_name)
    .getOrCreate()
)

## Reading the dataset

Google is the stock of choice because its data is complete and its price does not fluctuate as sharply as that of other stocks such as Amazon or Zoom.

* This helps the estimator have an easier time predicting the stock


In [75]:
google_path = "stocks_data/GOOGLE.csv"

# The first row holds the column names; column types are inferred from the data.
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("multiLine", "true")
    .option("escape", '"')
    .csv(google_path)
)

In [76]:
from functools import reduce
from pyspark.sql import DataFrame

In [77]:
# For a Spark DataFrame this shows only the column names and types, not the rows.
display(df)

DataFrame[Date: timestamp, High: double, Low: double, Open: double, Close: double, Volume: int, Adj Close: double, company_name: string]

In [78]:
# Inspect the inferred schema; note that `Date` was read as a timestamp.
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- company_name: string (nullable = true)



## Data cleansing

Cast the `Date` column from a timestamp to the `date` type

In [79]:
# Import only the functions that are needed: a star import from
# pyspark.sql.functions shadows Python built-ins such as sum, min, max and round.
# `log` is included because the log-scale section further down relies on it.
from pyspark.sql.functions import col, log, to_date

# `Date` was inferred as a timestamp; keep only the calendar date.
df = df.withColumn("Date", to_date(col("Date"), "yyyy-MM-dd"))
df.show()

+----------+-----------------+-----------------+-----------------+-----------------+-------+-----------------+------------+
|      Date|             High|              Low|             Open|            Close| Volume|        Adj Close|company_name|
+----------+-----------------+-----------------+-----------------+-----------------+-------+-----------------+------------+
|2017-01-03|789.6300048828125|775.7999877929688|778.8099975585938|786.1400146484375|1657300|786.1400146484375|      GOOGLE|
|2017-01-04|791.3400268554688|783.1599731445312|788.3599853515625|786.9000244140625|1073000|786.9000244140625|      GOOGLE|
|2017-01-05|  794.47998046875|  785.02001953125|786.0800170898438|  794.02001953125|1335200|  794.02001953125|      GOOGLE|
|2017-01-06|807.9000244140625|792.2039794921875| 795.260009765625|806.1500244140625|1640200|806.1500244140625|      GOOGLE|
|2017-01-09|809.9660034179688|802.8300170898438|806.4000244140625|806.6500244140625|1274600|806.6500244140625|      GOOGLE|
|2017-01

In [80]:
# Confirm that `Date` is now of type date.
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- company_name: string (nullable = true)



Fill any missing values in the `Volume` column with 0 (as a precaution)

In [81]:
# Replace missing trade volumes with 0; the other columns are left untouched.
cleanDF = df.fillna(0, subset=["Volume"])
cleanDF.show()

+----------+-----------------+-----------------+-----------------+-----------------+-------+-----------------+------------+
|      Date|             High|              Low|             Open|            Close| Volume|        Adj Close|company_name|
+----------+-----------------+-----------------+-----------------+-----------------+-------+-----------------+------------+
|2017-01-03|789.6300048828125|775.7999877929688|778.8099975585938|786.1400146484375|1657300|786.1400146484375|      GOOGLE|
|2017-01-04|791.3400268554688|783.1599731445312|788.3599853515625|786.9000244140625|1073000|786.9000244140625|      GOOGLE|
|2017-01-05|  794.47998046875|  785.02001953125|786.0800170898438|  794.02001953125|1335200|  794.02001953125|      GOOGLE|
|2017-01-06|807.9000244140625|792.2039794921875| 795.260009765625|806.1500244140625|1640200|806.1500244140625|      GOOGLE|
|2017-01-09|809.9660034179688|802.8300170898438|806.4000244140625|806.6500244140625|1274600|806.6500244140625|      GOOGLE|
|2017-01

In [82]:
outputPath = "stocks-clean.parquet"

# Rename "Adj Close" to "AdjClose" (no space) before writing to Parquet.
noNullsDF = cleanDF.withColumnRenamed("Adj Close", "AdjClose")
(
    noNullsDF.write
    .mode("overwrite")
    .parquet(outputPath)
)

## Add Prediction Column

* The goal of this prediction model is to predict the next day's value of any given column
* The next day is chosen because the change is not as drastic, and is therefore easier to predict, than the change over several days, weeks, or months
* The example below uses `High`, but we did a test run with all the columns

In [83]:
filePath = "stocks-clean.parquet"
# Load the cleaned data written in the previous section.
stocksDF = spark.read.format("parquet").load(filePath)

In [84]:
from pyspark.sql.functions import lead, col
from pyspark.sql import Window

# Order all rows chronologically so that "the next row" means "the next trading day".
w = Window.orderBy(col("Date"))

# `Next` holds the following day's High; the last row has no successor and gets 0.
stocksDF = stocksDF.withColumn(
    "Next",
    lead("High", offset=1, default=0).over(w),
)

In [85]:
# Drop the row(s) whose `Next` is the 0 placeholder, i.e. rows with no following day.
stocksDF = stocksDF.where(col("Next") != 0.0)
display(stocksDF)

DataFrame[Date: date, High: double, Low: double, Open: double, Close: double, Volume: int, AdjClose: double, company_name: string, Next: double]

In [86]:
outputPath = "stocks-final.parquet"
# Persist the labelled data set, replacing any previous output at this path.
(
    stocksDF.write
    .mode("overwrite")
    .parquet(outputPath)
)

## Split into test and train

* Split while preserving chronological order to help the model be better at predicting the future
* Standard 80/20 split

In [87]:
from pyspark.sql.functions import percent_rank
from pyspark.sql import Window

# `rank` is each row's relative position in date order;
# `company_name` is dropped at the same time.
stocksDF = (
    stocksDF
    .withColumn("rank", percent_rank().over(Window.partitionBy().orderBy("Date")))
    .drop("company_name")
)

In [88]:
# Training set: rows whose chronological rank is at most 0.8.
trainDF = stocksDF.filter(col("rank") <= 0.8).drop("rank")
trainDF.show()

+----------+-----------------+-----------------+-----------------+-----------------+-------+-----------------+-----------------+
|      Date|             High|              Low|             Open|            Close| Volume|         AdjClose|             Next|
+----------+-----------------+-----------------+-----------------+-----------------+-------+-----------------+-----------------+
|2017-01-03|789.6300048828125|775.7999877929688|778.8099975585938|786.1400146484375|1657300|786.1400146484375|791.3400268554688|
|2017-01-04|791.3400268554688|783.1599731445312|788.3599853515625|786.9000244140625|1073000|786.9000244140625|  794.47998046875|
|2017-01-05|  794.47998046875|  785.02001953125|786.0800170898438|  794.02001953125|1335200|  794.02001953125|807.9000244140625|
|2017-01-06|807.9000244140625|792.2039794921875| 795.260009765625|806.1500244140625|1640200|806.1500244140625|809.9660034179688|
|2017-01-09|809.9660034179688|802.8300170898438|806.4000244140625|806.6500244140625|1274600|806.6

In [89]:
# Test set: rows whose chronological rank is above 0.8.
testDF = stocksDF.filter(col("rank") > 0.8).drop("rank")
testDF.show()

+----------+------------------+------------------+------------------+------------------+-------+------------------+------------------+
|      Date|              High|               Low|              Open|             Close| Volume|          AdjClose|              Next|
+----------+------------------+------------------+------------------+------------------+-------+------------------+------------------+
|2020-02-24| 1436.969970703125|1411.3900146484375|1426.1099853515625|1421.5899658203125|2867100|1421.5899658203125|1438.1400146484375|
|2020-02-25|1438.1400146484375|1382.4000244140625|            1433.0| 1388.449951171875|2478300| 1388.449951171875| 1415.699951171875|
|2020-02-26| 1415.699951171875|            1379.0|1396.1400146484375|1393.1800537109375|2202400|1393.1800537109375|1371.7039794921875|
|2020-02-27|1371.7039794921875|1317.1700439453125|  1362.06005859375|1318.0899658203125|2978300|1318.0899658203125|1341.1400146484375|
|2020-02-28|1341.1400146484375|            1271.0|     

Any Spark ML model takes its input in the form of a vector containing `n` features.
The following cell creates the assembler that builds this feature vector for the model.

In [90]:
from pyspark.ml.feature import VectorAssembler

# Every double-typed column except the label ("Next") becomes a feature.
numericCols = [
    name
    for name, kind in trainDF.dtypes
    if kind == "double" and name != "Next"
]

vecAssembler = (
    VectorAssembler()
    .setInputCols(numericCols)
    .setOutputCol("features")
)

# Linear Regression
* The first and simplest regressor, which is a good reference point for the other estimators

In [91]:
# Add the assembled `features` vector column to the training data.
vecTrainDF = vecAssembler.transform(trainDF)

In [92]:
from pyspark.ml.regression import LinearRegression

# Linear regression from the feature vector to the next-day value.
lr = (
    LinearRegression()
    .setFeaturesCol("features")
    .setLabelCol("Next")
    .setRegParam(0.01)
)
lrModel = lr.fit(vecTrainDF)

## Pipeline

A pipeline is created to streamline the data transformation and prediction process

In [93]:
from pyspark.ml import Pipeline

# Chain feature assembly and the regressor so both are applied in one step.
pipeline = Pipeline().setStages([vecAssembler, lr])
pipelineModel = pipeline.fit(trainDF)

## Apply to test DF

In [94]:
# Run the fitted pipeline on the held-out rows and compare label with prediction.
predictionDF = pipelineModel.transform(testDF)

shownCols = ["features", "Next", "prediction"]
predictionDF.select(*shownCols).show(10)

+--------------------+------------------+------------------+
|            features|              Next|        prediction|
+--------------------+------------------+------------------+
|[1436.96997070312...|1438.1400146484375|1433.3858691797398|
|[1438.14001464843...| 1415.699951171875|1408.5239465590419|
|[1415.69995117187...|1371.7039794921875|  1408.25904452282|
|[1371.70397949218...|1341.1400146484375|1339.6268740697183|
|[1341.14001464843...|1390.8699951171875|1356.9669645018753|
|[1390.86999511718...|1410.1500244140625|1403.0954723388015|
|[1410.15002441406...|1388.0899658203125|1367.4898097392918|
|[1388.08996582031...|1358.9100341796875|1398.3019850759447|
|[1358.91003417968...| 1306.219970703125| 1337.199784601689|
|[1306.21997070312...| 1254.760009765625|1311.6375892531723|
+--------------------+------------------+------------------+
only showing top 10 rows



Linear regression is the model that gives the best result on the given dataset

In [95]:
from pyspark.ml.evaluation import RegressionEvaluator

regEvaluator = RegressionEvaluator(labelCol="Next", predictionCol="prediction")

# Score the same predictions twice, switching the metric on the shared evaluator.
regEvaluator.setMetricName("rmse")
rmse = regEvaluator.evaluate(predictionDF)
regEvaluator.setMetricName("r2")
r2 = regEvaluator.evaluate(predictionDF)

print(f"RMSE is {rmse}")
print(f"R2 is {r2}")

RMSE is 26.953545670982795
R2 is 0.9754423733828439


# LogScale

In [96]:
# Add the natural log of the label as `log_next` to both splits.
logNext = log(col("Next"))
logTrainDF = trainDF.withColumn("log_next", logNext)
logTestDF = testDF.withColumn("log_next", logNext)

In [97]:
# Same pipeline as before, but the regressor is trained on the log of the label
# and writes its output to `log_pred`.
lr = (
    LinearRegression()
    .setLabelCol("log_next")
    .setPredictionCol("log_pred")
)
pipeline = Pipeline().setStages([vecAssembler, lr])
pipelineModel = pipeline.fit(logTrainDF)
predictionDF = pipelineModel.transform(logTestDF)

## Apply to test DF

* The log-scale model seems to have a hard time predicting because the price fluctuation between days is so small that a slight change in the log value can throw off the prediction. Note that only the label is log-transformed here; the features stay on their original scale.

In [98]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, exp

# Undo the log transform so the prediction is on the same scale as `Next`.
expDF = predictionDF.withColumn("prediction", exp(col("log_pred")))

regEvaluator = RegressionEvaluator(labelCol="Next", predictionCol="prediction")

regEvaluator.setMetricName("rmse")
rmse = regEvaluator.evaluate(expDF)
regEvaluator.setMetricName("r2")
r2 = regEvaluator.evaluate(expDF)

print(f"RMSE is {rmse}")
print(f"R2 is {r2}")

RMSE is 107.45571599482726
R2 is 0.6096864492272256


In [99]:
# Compare the actual next-day value with the back-transformed prediction.
expDF.select("features", "Next","prediction").show(10)

+--------------------+------------------+------------------+
|            features|              Next|        prediction|
+--------------------+------------------+------------------+
|[1436.96997070312...|1438.1400146484375|1477.1218308939633|
|[1438.14001464843...| 1415.699951171875|1460.0566315762965|
|[1415.69995117187...|1371.7039794921875|1450.0198583333124|
|[1371.70397949218...|1341.1400146484375|1371.7448274763638|
|[1341.14001464843...|1390.8699951171875|1399.9846185911151|
|[1390.86999511718...|1410.1500244140625|1455.2577405579018|
|[1410.15002441406...|1388.0899658203125|1418.3932186939137|
|[1388.08996582031...|1358.9100341796875|1439.6753633039857|
|[1358.91003417968...| 1306.219970703125| 1367.325193292934|
|[1306.21997070312...| 1254.760009765625|  1331.44913530293|
+--------------------+------------------+------------------+
only showing top 10 rows



# Decision Tree

In [100]:
from pyspark.ml.regression import DecisionTreeRegressor

# Decision tree regressor with default settings, trained against the `Next` label.
decisionTree = DecisionTreeRegressor().setLabelCol("Next")

## Pipeline

In [101]:
from pyspark.ml import Pipeline

# Assemble the feature vector first, then train the decision tree on it.
# The pipeline is only defined here; it is fitted once, in the next cell,
# rather than being trained a second time on the same data.
stages = [vecAssembler, decisionTree]
pipeline = Pipeline(stages=stages)

In [102]:
# Fit the assembler + decision tree pipeline on the training split.
pipelineModel = pipeline.fit(trainDF)

## Visualize the decision tree

In [103]:
# The last pipeline stage is the fitted decision tree; print its if/else rules.
# Feature indices in the output refer to positions in the assembled feature vector.
dtModel = pipelineModel.stages[-1]
print(dtModel.toDebugString)

DecisionTreeRegressionModel: uid=DecisionTreeRegressor_e2bca7f4a85c, depth=5, numNodes=61, numFeatures=5
  If (feature 3 <= 1080.9400024414062)
   If (feature 3 <= 958.4499816894531)
    If (feature 1 <= 845.6400146484375)
     If (feature 3 <= 818.2799987792969)
      If (feature 0 <= 821.2944946289062)
       Predict: 807.487132196841
      Else (feature 0 > 821.2944946289062)
       Predict: 821.7799987792969
     Else (feature 3 > 818.2799987792969)
      If (feature 3 <= 830.5450134277344)
       Predict: 829.1996663411459
      Else (feature 3 > 830.5450134277344)
       Predict: 841.1006005859375
    Else (feature 1 > 845.6400146484375)
     If (feature 3 <= 920.6299743652344)
      If (feature 0 <= 852.125)
       Predict: 853.4000244140625
      Else (feature 0 > 852.125)
       Predict: 912.344248453776
     Else (feature 3 > 920.6299743652344)
      If (feature 3 <= 941.010009765625)
       Predict: 934.9957138372928
      Else (feature 3 > 941.010009765625)
       Predict: 

## Feature & Importance

In [104]:
# Take the fitted tree (last pipeline stage) and show its per-feature importances.
decisionTreeModel = pipelineModel.stages[-1]
decisionTreeModel.featureImportances

SparseVector(5, {0: 0.0265, 1: 0.0354, 2: 0.0, 3: 0.9381})

In [105]:
import pandas as pd

# Pair each assembler input column with its importance. The importances come
# back as a sparse vector, so convert to a dense array to get one value per
# feature, in the same order as the assembler's input columns.
featureImp = pd.DataFrame({
    "feature": vecAssembler.getInputCols(),
    "importance": decisionTreeModel.featureImportances.toArray(),
})
featureImp.sort_values(by="importance", ascending=False)

Unnamed: 0,feature,importance
3,Close,0.938105
1,Low,0.035369
0,High,0.026484
2,Open,4.2e-05
4,AdjClose,0.0


## Apply to test DF

In [106]:
# Apply the fitted decision-tree pipeline to the held-out rows.
predictionDF = pipelineModel.transform(testDF)

shownCols = ["features", "Next", "prediction"]
predictionDF.select(*shownCols).show()

+--------------------+------------------+------------------+
|            features|              Next|        prediction|
+--------------------+------------------+------------------+
|[1436.96997070312...|1438.1400146484375|1379.6908624822443|
|[1438.14001464843...| 1415.699951171875|1379.6908624822443|
|[1415.69995117187...|1371.7039794921875|1379.6908624822443|
|[1371.70397949218...|1341.1400146484375|1322.2032114664714|
|[1341.14001464843...|1390.8699951171875|1298.9300537109375|
|[1390.86999511718...|1410.1500244140625|1322.2032114664714|
|[1410.15002441406...|1388.0899658203125|1322.2032114664714|
|[1388.08996582031...|1358.9100341796875|1379.6908624822443|
|[1358.91003417968...| 1306.219970703125|1322.2032114664714|
|[1306.21997070312...| 1254.760009765625|1298.9300537109375|
|[1254.76000976562...|1281.1500244140625|1237.4631596779336|
|[1281.15002441406...|   1260.9599609375|1237.4631596779336|
|[1260.9599609375,...|1193.8699951171875|1237.4631596779336|
|[1193.86999511718...| 1

In [107]:
from pyspark.ml.evaluation import RegressionEvaluator

regEvaluator = (
    RegressionEvaluator()
    .setPredictionCol("prediction")
    .setLabelCol("Next")
    .setMetricName("rmse")
)

# RMSE first, then switch the same evaluator to R2.
rmse = regEvaluator.evaluate(predictionDF)
regEvaluator.setMetricName("r2")
r2 = regEvaluator.evaluate(predictionDF)

print(f"RMSE is {rmse}")
print(f"R2 is {r2}")

RMSE is 112.42707182860805
R2 is 0.5727359194946977


# HyperparameterTuning
* This is the last step for any given ML model: optimizing the model's performance.
* However, this works best when the dataset is small, and `GOOGLE.csv` is small enough
* Essentially, the process creates numerous models from the combinations in the input parameter grid, scores each of them, and returns the set of parameters that gives the best result.
## RandomForest

In [108]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline

# Random forest with a fixed seed, placed behind the feature assembler.
rf = RandomForestRegressor().setLabelCol("Next").setSeed(42)
pipeline = Pipeline().setStages([vecAssembler, rf])

## Grid Search

In [109]:
from pyspark.ml.tuning import ParamGridBuilder

# Candidate values to try; the grid is every combination of the two lists.
maxDepthValues = [2, 4, 6]
numTreesValues = [10, 100]

paramGrid = (
    ParamGridBuilder()
    .addGrid(rf.maxDepth, maxDepthValues)
    .addGrid(rf.numTrees, numTreesValues)
    .build()
)

## Cross Validation

In [110]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator

# Candidate models are scored by RMSE against the `Next` label.
regEvaluator = (
    RegressionEvaluator()
    .setLabelCol("Next")
    .setPredictionCol("prediction")
    .setMetricName("rmse")
)

# 3-fold cross-validation of the whole pipeline over the parameter grid.
cv = (
    CrossValidator()
    .setEstimator(pipeline)
    .setEvaluator(regEvaluator)
    .setEstimatorParamMaps(paramGrid)
    .setNumFolds(3)
    .setSeed(42)
)

In [111]:
# Run the grid search once, evaluating up to 4 candidate models in parallel.
# `cv` wraps the whole pipeline, so the fitted CrossValidatorModel carries both
# the average metric of every parameter combination and the best pipeline,
# refit on the full training set.
cvModel = cv.setParallelism(4).fit(trainDF)

# Reuse the winning pipeline for prediction instead of running a second,
# equally expensive cross-validation. This also keeps the metrics listed from
# `cvModel` and the predictions made by `pipelineModel` tied to the same fit.
pipelineModel = cvModel.bestModel

In [112]:
# Pair every parameter combination from the grid with its average
# cross-validation metric (RMSE, as configured on the evaluator).
list(zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics))

[({Param(parent='RandomForestRegressor_0dea8746b805', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 2,
   Param(parent='RandomForestRegressor_0dea8746b805', name='numTrees', doc='Number of trees to train (>= 1).'): 10},
  47.33141238232539),
 ({Param(parent='RandomForestRegressor_0dea8746b805', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 2,
   Param(parent='RandomForestRegressor_0dea8746b805', name='numTrees', doc='Number of trees to train (>= 1).'): 100},
  43.28900249925332),
 ({Param(parent='RandomForestRegressor_0dea8746b805', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 4,
   Param(parent='RandomForestRegressor_0dea8746b805', name

## Apply to the test DF

In [113]:
# Score the tuned model on the held-out rows.
predictionDF = pipelineModel.transform(testDF)

regEvaluator = (
    RegressionEvaluator()
    .setPredictionCol("prediction")
    .setLabelCol("Next")
    .setMetricName("rmse")
)

rmse = regEvaluator.evaluate(predictionDF)
regEvaluator.setMetricName("r2")
r2 = regEvaluator.evaluate(predictionDF)

print(f"RMSE is {rmse}")
print(f"R2 is {r2}")

RMSE is 112.47297881106735
R2 is 0.5723869215376514


In [114]:
# Show the first predictions of the tuned model next to the actual values.
predictionDF = pipelineModel.transform(testDF)

shownCols = ["features", "Next", "prediction"]
predictionDF.select(*shownCols).show(10)

+--------------------+------------------+------------------+
|            features|              Next|        prediction|
+--------------------+------------------+------------------+
|[1436.96997070312...|1438.1400146484375|1381.1177120022178|
|[1438.14001464843...| 1415.699951171875|1381.1177120022178|
|[1415.69995117187...|1371.7039794921875|1381.1177120022178|
|[1371.70397949218...|1341.1400146484375| 1343.178724487436|
|[1341.14001464843...|1390.8699951171875|1297.3409612065316|
|[1390.86999511718...|1410.1500244140625|1373.8490518417661|
|[1410.15002441406...|1388.0899658203125| 1343.178724487436|
|[1388.08996582031...|1358.9100341796875|1381.1177120022178|
|[1358.91003417968...| 1306.219970703125| 1343.178724487436|
|[1306.21997070312...| 1254.760009765625|1297.3409612065316|
+--------------------+------------------+------------------+
only showing top 10 rows

