# SENG 550 Project
Authors:
- Sydney Kwok (30073206) 
- Liam Conway (30046856)
- Isabella Guimet (30040654)
- Christina Truong (30064426)


Spark is currently set up to run on a local machine (`local[8]`, i.e. eight worker threads). Change the configuration as necessary if you wish to run in a different environment (Colab, Databricks, etc.).

In [None]:
%pip install pyspark
import pyspark
from pyspark.sql import SparkSession

#Create SparkSession
spark = SparkSession.builder.master("local[8]").appName("Stock_Data").getOrCreate()

In [None]:
sc = spark.sparkContext

# Creating Our Dataset 
Use the yfinance library to fetch daily historical stock data from Yahoo Finance for every company currently listed in the S&P 500 (per Wikipedia), from January 1, 2010 to November 22, 2022.

In [None]:
!pip install yfinance

In [None]:
import pandas as pd

def get_sp500():
    # Get S&P 500 data from Wikipedia
    data = pd.read_html('https://en.wikipedia.org/wiki/List_of_S%26P_500_companies')[0]
    # Take only the selected columns
    data = data[['Symbol', 'Security', 'GICS Sector']]
    # Rename the selected columns
    data = data.rename(columns={'Symbol': 'Ticker', 'Security': 'Company', 'GICS Sector': 'Sector'})
    # Replace '.' with '-' in tickers (required by Yahoo Finance, e.g. BRK.B -> BRK-B).
    # regex=False makes this a literal replacement; as a regex, '.' would match every character.
    data['Ticker'] = data['Ticker'].str.replace('.', '-', regex=False)
    # Remove commas from company names to prevent CSV misinterpretation
    data['Company'] = data['Company'].str.replace(',', '', regex=False)
    return data

companies = get_sp500()
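Wikipedia writes share-class tickers with a dot (for example `BRK.B`), while Yahoo Finance expects a hyphen (`BRK-B`). The replacement must be literal. As a regular expression, `.` matches any character, and pandas 2.x no longer treats single-character patterns as literals when `regex=True`.

The standard-library snippet below shows the difference on one ticker. `normalize_ticker` is a hypothetical helper used only for illustration and is not part of the notebook.

```python
import re

ticker = "BRK.B"

# As a regular expression, "." matches every character, so the whole ticker is replaced
as_regex = re.sub(".", "-", ticker)

# A literal replacement only touches the dot, which is what Yahoo Finance expects
as_literal = ticker.replace(".", "-")

def normalize_ticker(symbol: str) -> str:
    """Hypothetical helper: convert a Wikipedia-style ticker to Yahoo Finance style."""
    return symbol.replace(".", "-")

print(as_regex)    # -----
print(as_literal)  # BRK-B
print([normalize_ticker(s) for s in ["BRK.B", "BF.B", "AAPL"]])  # ['BRK-B', 'BF-B', 'AAPL']
```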

Download the daily price data for all S&P 500 tickers. Note that yfinance treats `end` as exclusive, so the data runs through November 21, 2022.

In [None]:
import yfinance as yf

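# auto_adjust=False keeps both the raw Close and the 'Adj Close' column
# (newer yfinance releases adjust prices by default and omit 'Adj Close')
finance_data = yf.download(tickers=companies['Ticker'].tolist(), start='2010-01-01', end='2022-11-22',
                           group_by='ticker', auto_adjust=False)

Merge the company data and the financial data from the last two steps. Selecting the price columns explicitly keeps the CSV layout fixed at Date, Open, High, Low, Close, Adj Close, Volume, Ticker, Company, Sector, which the field indices used later rely on.

In [None]:
# Price columns in the order the later field indices assume
price_columns = ['Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']

dataframes = []
for _, company_data in companies.iterrows():
    # Take this ticker's prices and append its Ticker, Company, and Sector as constant columns
    merged_data = finance_data[company_data['Ticker']][price_columns].assign(**company_data)
    dataframes.append(merged_data)

final_df = pd.concat(dataframes)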

In [None]:
final_df.shape

In [None]:
final_df.head()

In [None]:
final_df.tail()

In [None]:
final_df.info()

In [None]:
final_df.isnull().sum()

The null values belong to stocks that were not yet publicly traded on our start date (January 1, 2010), so Yahoo Finance has no prices for them before their listing date, as seen below. For instance, ZTS (Zoetis) was listed on February 1, 2013.

In [None]:
print(final_df[final_df.isnull().any(axis=1)])

As these stocks had no *public* value on those dates, we'll fill all NaN values with 0.

In [None]:
final_df = final_df.fillna(0)
final_df.isnull().sum()

In [None]:
final_df.to_csv('raw-data.csv', header=False)

# Load CSV Into PySpark

Read the CSV into an RDD of strings (one element per line), requesting a minimum of 7 partitions.

In [None]:
rdd = sc.textFile('raw-data.csv', 7)

In [None]:
rdd.take(3)
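Each element of `rdd` is one line of `raw-data.csv`. The DataFrame was written with its Date index and without a header, so the fields appear in this order: Date, Open, High, Low, Close, Adj Close, Volume, Ticker, Company, Sector. This assumes `yf.download` returned the six price columns in that order.

The sketch below names those positions through a hypothetical `FIELDS` mapping and parses a made-up sample line. The numeric indices used in the analyses below refer to these positions.

```python
# Hypothetical name -> position mapping for one line of raw-data.csv
FIELDS = {
    "Date": 0, "Open": 1, "High": 2, "Low": 3, "Close": 4,
    "Adj Close": 5, "Volume": 6, "Ticker": 7, "Company": 8, "Sector": 9,
}

def parse_record(line):
    """Split a CSV line and return a dict keyed by field name."""
    values = line.split(",")
    return {name: values[index] for name, index in FIELDS.items()}

# Made-up sample line (illustrative values only, not real market data)
sample = "2010-01-04,10.0,11.0,9.5,10.5,10.2,1000.0,XYZ,Example Corp,Industrials"
record = parse_record(sample)
print(record["Ticker"], record["Close"], record["Volume"])  # XYZ 10.5 1000.0
```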

# Exploratory Data Analysis
Compute some statistics on the historical stock data we've collected for the S&P 500 companies over the period January 1, 2010 to November 22, 2022. To keep the output readable, most results below are shown for only ten tickers.

**1). Calculate the average close price for each ticker over the whole period (2010 to 2022)**

1a). Extract only the fields relevant to this analysis: field 0 (Date), field 4 (Close), and field 7 (Ticker).

In [None]:
def extractFieldsForQ1(stockRDDRecord):
  fieldsList = stockRDDRecord.split(",")
  return (fieldsList[0], fieldsList[4], fieldsList[7])

print(extractFieldsForQ1(rdd.take(1)[0]))

Now, we apply `extractFieldsForQ1()` to all rows in our RDD using a `map`.

In [None]:
closeTickerRDD = rdd.map(extractFieldsForQ1)
print(closeTickerRDD.take(5))

In [None]:
closeTickerRDDFiltered = closeTickerRDD.filter(lambda x: float(x[1]) > 0)

1b). Calculate the number of close prices for each ticker. Each record in `closeTickerRDDFiltered` is a tuple `('date', 'close', 'ticker')`. This RDD is `closeTickerRDD` with the zero-filled, pre-listing rows removed. We key on index 2 (the ticker) and count the rows for each ticker, then save these counts for the average calculation later.

In [None]:
stockCountPerTickerRDD = (closeTickerRDDFiltered.map(lambda x: (x[2], 1))
                          .reduceByKey(lambda x, y: x + y))
print(stockCountPerTickerRDD.take(10))

# Collect the counts for every ticker so each sum in 1d) has a matching count
stockCountPerTickerDict = stockCountPerTickerRDD.collectAsMap()

1c). Calculate the sum of all close prices for each ticker over the whole period. Recall that each record in `closeTickerRDDFiltered` is `('date', 'close', 'ticker')`: the ticker is `x[2]` and the close is `x[1]`.

In [None]:
sumClosePricePerTickerRDD = (closeTickerRDDFiltered.map(lambda x : (x[2], float(x[1])))
                                .reduceByKey(lambda x,y: x+y)
                                .take(10))
print(sumClosePricePerTickerRDD)

1d). Calculate the average close price for each ticker by dividing each sum in `sumClosePricePerTickerRDD` by the matching count in `stockCountPerTickerDict`.

Note:

- `item[1]` is the sum of close prices for the ticker `item[0]`.
- `stockCountPerTickerDict.get(item[0])` is the number of close prices recorded for that ticker.

In [None]:
# This will hold the average stock close prices for each ticker
avgStockClosePrices = list()

for item in sumClosePricePerTickerRDD:
  avg = item[1]/stockCountPerTickerDict.get(item[0])
  avgStockClosePrices.append((item[0], avg))

sortedAvgStock = sorted(avgStockClosePrices, key=lambda x: x[1], reverse=True)
print(sortedAvgStock)
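Steps 1b to 1d compute the counts and sums with two separate reductions and then join them on the driver. A common alternative is a single `reduceByKey` over `(close, 1)` pairs, which is the same `(sum, count)` pattern used in questions 5 and 6. In PySpark this would read roughly:

`closeTickerRDDFiltered.map(lambda x: (x[2], (float(x[1]), 1))).reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])).mapValues(lambda s: s[0] / s[1])`

The standard-library sketch below emulates that reduction with a dictionary on hypothetical records, so it runs without Spark. `reduce_by_key` is an illustrative stand-in, not a Spark function.

```python
# Hypothetical (date, close, ticker) records, shaped like closeTickerRDDFiltered
records = [
    ("2010-01-04", "10.0", "AAA"),
    ("2010-01-05", "12.0", "AAA"),
    ("2010-01-04", "5.0", "BBB"),
]

def reduce_by_key(pairs, func):
    """Plain-Python stand-in for RDD.reduceByKey: fold values that share a key."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result

# map: (ticker, (close, 1)); reduce: add sums and counts component-wise
pairs = [(ticker, (float(close), 1)) for _, close, ticker in records]
sums_and_counts = reduce_by_key(pairs, lambda a, b: (a[0] + b[0], a[1] + b[1]))

# mapValues: divide each sum by its count
averages = {ticker: s / c for ticker, (s, c) in sums_and_counts.items()}
print(sorted(averages.items(), key=lambda kv: kv[1], reverse=True))  # [('AAA', 11.0), ('BBB', 5.0)]
```

Because every ticker's sum and count travel together, no ticker can end up without a matching count.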

**2). Calculate the min "Low" value in this dataset for each ticker**

In [None]:
# Just extract Ticker(7) & Low(3)
def extractFieldsForQ2(stockRDDRecord):
  fieldsList = stockRDDRecord.split(",")
  return (fieldsList[7], float(fieldsList[3]))

print(extractFieldsForQ2(rdd.take(1)[0]))

In [None]:
# Apply extractFieldsForQ2 on all rows in the RDD
low_rdd = rdd.map(extractFieldsForQ2)
print(low_rdd.take(3))

In [None]:
# Calculate the min Low value for each ticker
min_low_by_ticker = low_rdd.filter(lambda x: x[1] > 0).reduceByKey(lambda x,y: min(x,y)).take(10)
sorted(min_low_by_ticker, key=lambda x: x[1])

**3). Calculate the max "High" value in this dataset for each ticker**

In [None]:
# Just extract Ticker(7) & High(2)
def extractFieldsForQ3(stockRDDRecord):
  fieldsList = stockRDDRecord.split(",")
  return (fieldsList[7], float(fieldsList[2]))

print(extractFieldsForQ3(rdd.take(1)[0]))

In [None]:
# Apply extractFieldsForQ3 on all rows in the RDD
high_rdd = rdd.map(extractFieldsForQ3)
print(high_rdd.take(3))

In [None]:
# Calculate the max High value for each ticker
max_high_by_ticker = high_rdd.reduceByKey(lambda x,y: max(x,y)).take(10)
sorted(max_high_by_ticker, key=lambda x: x[1], reverse=True)
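Questions 2 and 3 each make a separate pass over the data. As an alternative sketch (not what the notebook runs), both extremes can be tracked in one reduction over `(low, high)` pairs. The sketch skips the zero-filled rows, just as the min-Low query does.

The plain-Python version below uses hypothetical `(ticker, low, high)` tuples. In PySpark, `combine` could be passed to `reduceByKey` unchanged.

```python
# Hypothetical (ticker, low, high) tuples; a zero low marks a pre-listing, zero-filled row
rows = [
    ("AAA", 9.0, 11.0),
    ("AAA", 8.5, 12.5),
    ("AAA", 0.0, 0.0),
    ("BBB", 40.0, 45.0),
]

def combine(a, b):
    """Keep the smaller low and the larger high of two (low, high) pairs."""
    return (min(a[0], b[0]), max(a[1], b[1]))

extremes = {}
for ticker, low, high in rows:
    if low <= 0:  # same filter as the min-Low query
        continue
    pair = (low, high)
    extremes[ticker] = combine(extremes[ticker], pair) if ticker in extremes else pair

print(extremes)  # {'AAA': (8.5, 12.5), 'BBB': (40.0, 45.0)}
```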

**4. Graph the max "High" value in this dataset for each ticker per year**

4a). Extract only the fields relevant to this analysis: field 0 (Date), field 2 (High), and field 7 (Ticker).

In [None]:
def extractFieldsForQ4(stockRDDRecord):
  fieldsList = stockRDDRecord.split(",")
  return (fieldsList[0], fieldsList[2], fieldsList[7]) # Extract Date (0), High (2), Ticker (7)

print(extractFieldsForQ4(rdd.take(1)[0]))

Now, we apply `extractFieldsForQ4()` to all rows in our RDD using a `map`.

In [None]:
graphHighRDD = rdd.map(extractFieldsForQ4)
print(graphHighRDD.take(3))

4b). Calculate the highest price of each year for each ticker. From above, each record in `graphHighRDD` is a tuple `('date', 'high', 'ticker')`. To get yearly highs, we extract the year from the date and group the records by ticker. Within each ticker, we keep the maximum high for each year. An element of the resulting RDD has the following shape (illustrative values):
```python
('AAAA', {2000: 1, 2001: 2})
```

We will save this value for later when drawing the graph.

In [None]:
from collections import defaultdict

# Function to group the highs by year, and place the maximum high into a dict
def merge(x):
    # x is (ticker, iterable of (ticker, year, high)); keep only the (year, high) pairs
    data = list(map(lambda y: (y[1], y[2]), x[1]))
    result = defaultdict(int)
    for d in data:
        year, value = int(d[0]), float(d[1])
        if value > result[year]:
            result[year] = value

    return x[0], dict(result)

# Map each record to (ticker, year, high), group by ticker, then build each ticker's {year: max high} dict
graphHighPerYearRDD = (graphHighRDD.map(lambda x: (x[2], x[0].split('-')[0], x[1]))
                            .groupBy(lambda x: x[0])
                            .map(merge))

print(graphHighPerYearRDD.take(10))
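`groupBy` sends every record of a ticker to a single place before `merge` runs. An alternative sketch keys on a composite `(ticker, year)` and reduces with `max`, which lets Spark combine values within each partition first. In PySpark this would be roughly `graphHighRDD.map(lambda x: ((x[2], int(x[0][:4])), float(x[1]))).reduceByKey(max)`, followed by regrouping per ticker.

The standard-library version below runs on hypothetical `(date, high, ticker)` records and produces the same `{year: max high}` shape.

```python
from collections import defaultdict

# Hypothetical (date, high, ticker) records, shaped like graphHighRDD
records = [
    ("2010-01-04", "11.0", "AAA"),
    ("2010-06-01", "14.0", "AAA"),
    ("2011-03-02", "13.0", "AAA"),
    ("2011-05-05", "50.0", "BBB"),
]

# Reduce on the composite key (ticker, year), keeping the larger high
yearly_max = {}
for date, high, ticker in records:
    key = (ticker, int(date[:4]))
    yearly_max[key] = max(yearly_max.get(key, float("-inf")), float(high))

# Regroup into (ticker, {year: max high}), the same shape merge() produces
by_ticker = defaultdict(dict)
for (ticker, year), high in yearly_max.items():
    by_ticker[ticker][year] = high

print(sorted(by_ticker.items()))
# [('AAA', {2010: 14.0, 2011: 13.0}), ('BBB', {2011: 50.0})]
```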

4c). Draw the graph from the `graphHighPerYearRDD` dataset above using matplotlib.

As some stocks may not have been around for the entire duration, we'll need to determine every unique year contained within all the stocks. If a stock does not contain data for a year, we'll assume the price is 0.

In [None]:
from matplotlib import pyplot as plt

graphHighValues = graphHighPerYearRDD.take(10)

# Determine every unique year contained within the graphHighPerYearRDD
x_values = graphHighPerYearRDD.flatMap(lambda x: list(x[1].keys())).distinct().collect()
x_values.sort()

# Display the output for each unique ticker
for row in graphHighValues:
    y_values = []
    for key in x_values:
        y_values.append(row[1][key] if key in row[1] else 0)
    
    plt.plot(x_values, y_values, label=row[0])

# Stylize and display the graph
plt.title("Yearly Highs by Ticker")
plt.xlabel("Year")
plt.ylabel("High")
plt.legend()
plt.show()

**5. What is the most active month for trading volumes?**

5a). Extract only the fields relevant to this analysis: field 0 (Date) and field 6 (Volume).

In [None]:
def extractFieldsForQ5(stockRDDRecord):
  fieldsList = stockRDDRecord.split(",")
  return (fieldsList[0], float(fieldsList[6]))

print(extractFieldsForQ5(rdd.take(1)[0]))

Now, we apply `extractFieldsForQ5()` to all rows in our RDD using a `map`.

In [None]:
volumeRDD = rdd.map(extractFieldsForQ5)
print(volumeRDD.take(5))

5b). Calculate the total volume traded on each date. This sums the volumes of all individual stocks, giving one total per unique date.

In [None]:
totalVolumePerDateRDD = volumeRDD.reduceByKey(lambda a, b: a + b)
print(totalVolumePerDateRDD.take(5))

5c). Calculate the total volume for each month (by month number, pooling all years) and the number of trading days observed in that month. This lets us average the results in the next step.

In [None]:
totalVolumeByMonthRDD = totalVolumePerDateRDD.map(lambda x: (int(x[0].split('-')[1]), (x[1], 1))).reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1]))
print(totalVolumeByMonthRDD.collect())

5d). Given the total volume per month and the total number of days, find the average volume by month.

In [None]:
avgVolumeByMonthRDD = totalVolumeByMonthRDD.map(lambda x: (x[0], (x[1][0] / x[1][1])))
avgVolumeByMonth = avgVolumeByMonthRDD.collect()
print(avgVolumeByMonth)

5e). Print out the average volume per month, with the largest volume printed first.

In [None]:
import calendar

sortedAvgVolumeByMonth = sorted(avgVolumeByMonth, key=lambda x: x[1], reverse=True)
print("--- Average Volume per Month ----")
for month_num, volume in sortedAvgVolumeByMonth:
    print(f"{calendar.month_name[month_num]}: {volume}")
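The three reductions above (per date, then per month, then the average) can be traced by hand. The sketch below repeats them in plain Python on hypothetical `(date, volume)` records so each intermediate result is easy to inspect. As in the cell above, `calendar.month_name` maps the month number back to a name.

Note that the monthly figure is the average *total market* volume per trading day in that month, pooled across all years.

```python
import calendar
from collections import defaultdict

# Hypothetical (date, volume) records: two stocks on three trading days
records = [
    ("2010-01-04", 100.0), ("2010-01-04", 300.0),
    ("2010-01-05", 200.0), ("2010-01-05", 200.0),
    ("2011-03-01", 900.0), ("2011-03-01", 100.0),
]

# 5b: total volume per date
per_date = defaultdict(float)
for date, volume in records:
    per_date[date] += volume

# 5c: (total volume, number of trading days) per month number
per_month = defaultdict(lambda: [0.0, 0])
for date, total in per_date.items():
    month = int(date.split("-")[1])
    per_month[month][0] += total
    per_month[month][1] += 1

# 5d/5e: average daily volume per month, largest first
averages = sorted(((m, s / n) for m, (s, n) in per_month.items()),
                  key=lambda x: x[1], reverse=True)
for month_num, volume in averages:
    print(f"{calendar.month_name[month_num]}: {volume}")
# March: 1000.0
# January: 400.0
```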

**6. What is the day of the week with the highest returns?**

6a). Extract only the fields relevant to this analysis: field 0 (Date), field 1 (Open), and field 4 (Close).

In [None]:
def extractFieldsForQ6(stockRDDRecord):
  fieldsList = stockRDDRecord.split(",")
  return (fieldsList[0], float(fieldsList[1]), float(fieldsList[4])) # Date (0), Open (1), Close (4)

print(extractFieldsForQ6(rdd.take(1)[0]))

Now, we apply `extractFieldsForQ6()` to all rows in our RDD using a `map`. We also filter out any rows with a non-positive open or close. This excludes stocks that were not yet listed on a given date, whose open and close were filled with 0.

In [None]:
dailyReturnRDD = rdd.map(extractFieldsForQ6).filter(lambda x: x[1] > 0 and x[2] > 0)
print(dailyReturnRDD.take(5))

6b). Convert each record's open and close into a percentage return, `(close - open) / open * 100`, which we call earnings below. We then sum the returns and count the records for each date so that we can average them later.

In [None]:
def convertToEarnings(x):
    return (x[2] - x[1]) / x[1] * 100


earningsRDD = dailyReturnRDD.map(lambda x: (x[0], (convertToEarnings(x), 1))).reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1]))
print(earningsRDD.take(5))

6c). Get the total earnings and the collective count for each weekday. `weekday()` numbers the days from 0 (Monday) to 6 (Sunday). Markets are closed on weekends, so only 0 to 4 (Monday to Friday) appear.

In [None]:
import datetime
dayEarningsRDD = earningsRDD.map(lambda x: (datetime.datetime.strptime(x[0], '%Y-%m-%d').weekday(), x[1])).reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1]))
print(dayEarningsRDD.take(5))

6d). Determine the average earnings per weekday, using the total earnings and the collective count.

In [None]:
finalEarningsRDD = dayEarningsRDD.map(lambda x: (x[0], x[1][0] / x[1][1]))
finalEarnings = finalEarningsRDD.collect()
print(finalEarnings)

6e). Print out the average earnings per weekday, with the largest earnings printed first.

In [None]:
sortedFinalEarnings = sorted(finalEarnings, key=lambda x: x[1], reverse=True)

print("--- Average Earnings per Weekday ---")
for weekday_num, earnings in sortedFinalEarnings:
    print(f"{calendar.day_name[weekday_num]}: {earnings}%")
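Counts are summed along with the returns, so each weekday's figure is the mean over all (stock, date) records that fall on that weekday. It is not a mean of daily means.

The sketch below reproduces that arithmetic in plain Python on hypothetical `(date, open, close)` records. `datetime.weekday()` returns 0 for Monday, and January 4, 2010 was a Monday.

```python
import calendar
import datetime
from collections import defaultdict

# Hypothetical (date, open, close) records
records = [
    ("2010-01-04", 10.0, 11.0),  # Monday, +10%
    ("2010-01-04", 20.0, 19.0),  # Monday, -5%
    ("2010-01-05", 50.0, 51.0),  # Tuesday, +2%
]

def percent_return(open_price, close_price):
    """Intraday return in percent, as in convertToEarnings()."""
    return (close_price - open_price) / open_price * 100

# Sum returns and counts per weekday (0 = Monday, 4 = Friday)
totals = defaultdict(lambda: [0.0, 0])
for date, open_price, close_price in records:
    weekday = datetime.datetime.strptime(date, "%Y-%m-%d").weekday()
    totals[weekday][0] += percent_return(open_price, close_price)
    totals[weekday][1] += 1

averages = {day: s / n for day, (s, n) in totals.items()}
for day, avg in sorted(averages.items(), key=lambda x: x[1], reverse=True):
    print(f"{calendar.day_name[day]}: {avg}%")
```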

# Developing & Testing ML Models
Following "Spark Tutorial 2 - ML", we complete a supervised learning pipeline, using the Stock Data dataset we looked at in our EDA phase of the project. Our goal is to train a linear regression model to predict Close values given the Date, Ticker, Open, High, Low, & Volume.

## Read & Parse The Initial Dataset
We have already read the stock data into an RDD. Each element is a comma-separated string containing the Date, Open, High, Low, Close, Adj Close, Volume, Ticker, Company, and Sector for a given stock on a given date.

Let's now use the `count` method to see how many data points are in the RDD & the `take` method to preview the first 3 data points in its current raw, comma-separated string format.

In [None]:
num_points = rdd.count()
print(f"There are {num_points} data points in the RDD.")

first_3_points = rdd.take(3)
print(first_3_points)

## RDD Pre-Processing
Prior to training our regression model, we must first complete some pre-processing on the RDD. 

As mentioned previously, the RDD currently contains Date (field 0), Open (1), High (2), Low (3), Close (4), Adj Close (5), Volume (6), Ticker (7), Company (8), and Sector (9) data. Our goal is to train a linear regression model to predict Close values given the Date, Ticker, Open, High, Low, & Volume. Hence, the Adj Close, Company, and Sector fields (fields 5, 8, and 9) will not be used in our model and can be dropped from our records.

While we're here, we will also split each Date from "YYYY-MM-DD" into separate "YYYY", "MM", and "DD" fields.

Also, we will convert our categorical Ticker data into a numerical format by replacing each ticker with its index in the S&P 500 list.

In [None]:
tickers = companies['Ticker'].values.tolist()
ticker_classes = {k: v for v, k in enumerate(tickers)}

def category_to_num(category):
  """
  Given a Ticker name/category, return the corresponding numerical value.
  """
  return str(ticker_classes[category])

def process_record(rdd_record):
  """
  Takes a record of the form 'YYYY-MM-DD, Open, High, Low, Close, Adj Close, Volume, Ticker, Company, Sector'
  and returns a record of the form 'Ticker, YYYY, MM, DD, Open, High, Low, Volume, Close'.
  """
  # Adj Close, Company, and Sector are not used by the model
  date, open_, high, low, close, _, volume, ticker, _, _ = rdd_record.split(',')

  year, month, day = date.split('-')

  fields = [category_to_num(ticker), year, month, day, open_, high, low, volume, close]
  return ','.join(fields)

processed_rdd = rdd.map(process_record)
print(processed_rdd.take(3))
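To make the transformation concrete, the standalone sketch below applies the same field reordering to a made-up 10-field line. The names `demo_classes` and `demo_process_record` are hypothetical and chosen so they do not overwrite the notebook's `ticker_classes` or `process_record`. A two-ticker mapping stands in for the S&P 500 list.

Note that the integer code is just the ticker's position in the list, so a linear model treats it as an ordered numeric quantity.

```python
# Hypothetical stand-in for the S&P 500 ticker list
demo_classes = {ticker: index for index, ticker in enumerate(["AAA", "XYZ"])}

def demo_process_record(line):
    """Date, Open, High, Low, Close, Adj Close, Volume, Ticker, Company, Sector
    -> Ticker code, YYYY, MM, DD, Open, High, Low, Volume, Close"""
    date, open_, high, low, close, _adj, volume, ticker, _company, _sector = line.split(",")
    year, month, day = date.split("-")
    fields = [str(demo_classes[ticker]), year, month, day, open_, high, low, volume, close]
    return ",".join(fields)

sample = "2010-01-04,10.0,11.0,9.5,10.5,10.2,1000.0,XYZ,Example Corp,Industrials"
print(demo_process_record(sample))  # 1,2010,01,04,10.0,11.0,9.5,1000.0,10.5
```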

Now, we can use MLlib `LabeledPoint` objects to label our records.

In [None]:
from pyspark.mllib.regression import LabeledPoint
import numpy as np

# sample data point: '0,2010,01,04,6.8125,6.83050012588501,6.6570000648498535,151998000,6.695000171661377'
# we want the Close field to be the label (the last field), and the rest to be features

def label_record(record):
    """
    Converts a record from a comma-separated string of the form 'Ticker, YYYY, MM, DD, Open, High, Low, Volume, Close' 
    into a `LabeledPoint` where Close is the label and the rest of the fields are all features.
    """
    record_elements = record.split(',')
    return LabeledPoint(record_elements[-1], record_elements[:-1])

labeled_points = processed_rdd.map(label_record)
first_point = labeled_points.take(1)
first_point_features = first_point[0].features 
first_point_label = first_point[0].label
print(first_point_features, first_point_label)

num_features = len(first_point_features)
print(f"There are {num_features} features.")

Next, we normalize our values by scaling each feature so that all features span a similar range.

We will follow the same process as shown in the ML_Linear_Regression tutorial: `(featureValue - meanOfFeatureValues) / standardDeviationOfFeatureValues`. "For a given feature, the mean and standard deviation of its feature values are calculated. Then, the mean is subtracted from each value and the result is then divided by the standard deviation."

In [None]:
def normalize_features(labeled_point):
  """
  Normalize the features of the LabeledPoint object, labeled_point.
  """
  normalized_features = list()
  for i in range(0, len(labeled_point.features)):
    feature = (labeled_point.features[i] - broadcast_mean.value[i]) / broadcast_stdev.value[i]
    normalized_features.insert(i, feature)
  return LabeledPoint(labeled_point.label, normalized_features)


def get_normalized_rdd(non_normalized_rdd): 
    """
    Normalizes the features of the LabeledPoints contained in non_normalized_rdd.
    """
    mean_list = list()
    stdev_list = list()
    num_features = len(non_normalized_rdd.take(1)[0].features)
    for i in range(0, num_features):
        feature_rdd = non_normalized_rdd.map(lambda lp: lp.features[i])
        feature_mean = feature_rdd.mean()
        feature_stdev = feature_rdd.stdev()
        mean_list.insert(i, feature_mean)
        stdev_list.insert(i, feature_stdev)
    global broadcast_mean 
    broadcast_mean = sc.broadcast(mean_list)
    global broadcast_stdev 
    broadcast_stdev = sc.broadcast(stdev_list)
    return non_normalized_rdd.map(normalize_features)

normalized_points = get_normalized_rdd(labeled_points)
print(normalized_points.take(5))
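The formula can be checked on a tiny example. `RDD.stdev()` returns the population standard deviation (dividing by N), which corresponds to `statistics.pstdev` in the standard library.

The sketch below standardizes a hypothetical two-feature dataset column by column, so each column ends up with mean 0 and standard deviation 1. As in the notebook, a constant column would cause a division by zero.

```python
import statistics

# Hypothetical feature rows (two features per point)
rows = [[1.0, 10.0], [3.0, 30.0]]

columns = list(zip(*rows))
means = [statistics.fmean(col) for col in columns]
stdevs = [statistics.pstdev(col) for col in columns]  # population stdev, like RDD.stdev()

def standardize(row):
    """(featureValue - mean) / stdev, applied per feature."""
    return [(x - m) / s for x, m, s in zip(row, means, stdevs)]

normalized = [standardize(row) for row in rows]
print(means, stdevs)  # [2.0, 20.0] [1.0, 10.0]
print(normalized)     # [[-1.0, -1.0], [1.0, 1.0]]
```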

Now, we will split our data into training, validation, and testing sets using the `randomSplit` method.

In [None]:
weights = [.80, .15, .05] # 80/15/5 train/validate/test split
seed = 42

train_data, val_data, test_data = normalized_points.randomSplit(weights, seed)

train_data.cache()
val_data.cache()
test_data.cache()

num_train = train_data.count()
num_val = val_data.count()
num_test = test_data.count()

print(f"The original dataset has {normalized_points.count()} data points.")
print(f"There are {num_train} training data points, {num_val} validation data points, and {num_test} test data points, for a total of {num_train + num_val + num_test} data points.")

## Creating & Evaluating A Baseline Model
We start by creating a baseline model to evaluate our machine learning models against. The baseline uses a very simple technique to predict the stock Close values, and we will check whether our models perform better or worse than it.

### Average Label
This baseline model always makes the same prediction: the average label in the training set. This is a constant prediction value that is completely independent of the given data point. 

For this model, we will compute this value: the average Close price for the training set.

In [None]:
avg_close_price = (train_data.map(lambda s: s.label)).mean()
print(f"The average Close price for the training set is {avg_close_price}.")

Now, we use Root Mean Squared Error (RMSE) to evaluate how well this baseline model performs.
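For reference, the first expression is the metric that `calc_rmse` computes over $n$ points, with labels $y_i$ and predictions $\hat{y}_i$. The second is what it reduces to for the constant baseline $\hat{y}_i = \bar{y}$ on the training set:

```latex
\mathrm{RMSE} = \sqrt{\frac{1}{n}\sum_{i=1}^{n}\left(y_i - \hat{y}_i\right)^2},
\qquad
\hat{y}_i = \bar{y} = \frac{1}{n}\sum_{i=1}^{n} y_i
\;\Longrightarrow\;
\mathrm{RMSE}_{\text{train}} = \sqrt{\frac{1}{n}\sum_{i=1}^{n}\left(y_i - \bar{y}\right)^2} = \sigma_y
```

The baseline's training RMSE is therefore the population standard deviation of the training Close prices.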

In [None]:
import math
def squared_error(label, prediction):
    """
    Calculates the squared error for a single prediction.
    """
    sqr_error = (label - prediction) * (label - prediction)
    return sqr_error

def calc_rmse(labels_and_preds):
    """
    Calculates the RMSE for an `RDD` of (label, prediction) tuples.
    """
    sqr_sum = labels_and_preds.map(lambda s: squared_error(s[0], s[1])).sum()
    return math.sqrt(sqr_sum / labels_and_preds.count())

labels_and_preds_train = train_data.map(lambda s: (s.label, avg_close_price))
rmse_train_base = calc_rmse(labels_and_preds_train)
print(f"The RMSE of the baseline model on the train set is {rmse_train_base}.")

labels_and_preds_test = test_data.map(lambda s: (s.label, avg_close_price))
rmse_test_base = calc_rmse(labels_and_preds_test)
print(f"The RMSE of the baseline model on the test set is {rmse_test_base}.")

## Training Linear Regression Models Using MLlib
Now, let's train some machine learning models and see if we can get better performance.
### Linear Regression With SGD
We're going to start by using LinearRegressionWithSGD to train a model with L2 regularization and an intercept. 

In [None]:
from pyspark.mllib.regression import LinearRegressionWithSGD
# Values to use when training the linear regression model
numIters = 500  # iterations
alpha = 1.0  # step - learning rate (rate at which you change your weights in each iteration)
miniBatchFrac = 1.0  # miniBatchFraction
reg = 1e-5  # regParam
regType = 'l2'  # regType
useIntercept = True  # intercept

# train the model
firstModel = LinearRegressionWithSGD.train(train_data, numIters, alpha, miniBatchFrac, initialWeights=None, regParam=reg, regType=regType, intercept=useIntercept)

# weightsLR1 stores the model weights; interceptLR1 stores the model intercept
weightsLR1 = firstModel.weights
interceptLR1 = firstModel.intercept

print(f"Linear Regression Model 1 weights: {weightsLR1}, intercept: {interceptLR1}.")
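To illustrate what the training call optimizes, here is a minimal full-batch gradient-descent sketch for least squares with an L2 penalty and an unpenalized intercept. It is not MLlib's implementation. Among other differences, MLlib's SGD divides the step size by the square root of the iteration number and samples a `miniBatchFraction` of the data per step. Its treatment of the intercept under regularization may also differ. The data and hyperparameters are hypothetical.

```python
# Hypothetical 1-D data lying exactly on y = 2x + 1 (features already centred)
xs = [-1.0, 0.0, 1.0]
ys = [-1.0, 1.0, 3.0]

def train_linear_gd(xs, ys, step=0.5, iterations=500, reg=0.0):
    """Full-batch gradient descent on mean squared error / 2 + reg / 2 * w**2."""
    w, b = 0.0, 0.0
    n = len(xs)
    for _ in range(iterations):
        residuals = [w * x + b - y for x, y in zip(xs, ys)]
        grad_w = sum(r * x for r, x in zip(residuals, xs)) / n + reg * w
        grad_b = sum(residuals) / n  # intercept left unregularized in this sketch
        w -= step * grad_w
        b -= step * grad_b
    return w, b

w, b = train_linear_gd(xs, ys)
print(round(w, 6), round(b, 6))  # 2.0 1.0

# A strong L2 penalty shrinks the weight towards 0
w_reg, b_reg = train_linear_gd(xs, ys, reg=1.0)
```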

Now, use the `LinearRegressionModel.predict()` method to make a prediction on a sample point.

In [None]:
sample_point = train_data.take(1)[0]
sample_prediction = firstModel.predict(sample_point.features)
print(sample_prediction)

In [None]:
train_labels_and_preds = train_data.map(lambda lp: (lp.label, firstModel.predict(lp.features)))
rmse_trainLR1 = calc_rmse(train_labels_and_preds)

print(f"Baseline model train RMSE: {rmse_train_base}.")
print(f"Linear Regression model train RMSE: {rmse_trainLR1}.")

Now, let's evaluate the RMSE of this model on the test set.

In [None]:
labels_and_preds = test_data.map(lambda lp: (lp.label, firstModel.predict(lp.features)))
rmse_testLR1 = calc_rmse(labels_and_preds)

print(f"Baseline model test RMSE: {rmse_test_base}.")
print(f"Linear Regression model test RMSE: {rmse_testLR1}.")

While we do see a large improvement in performance over our baseline model, there is still room for improvement. We will try different models and hyperparameters to see if we can improve performance further.

## Random Forest Model

In [None]:
from pyspark.mllib.tree import RandomForest

dtModel = RandomForest.trainRegressor(train_data, categoricalFeaturesInfo={},
                                      numTrees=16, featureSubsetStrategy="auto",
                                      impurity='variance', maxDepth=10, maxBins=128)

train_predictions = dtModel.predict(train_data.map(lambda x: x.features)).collect()
train_labels = train_data.map(lambda x: x.label).collect()
rmseTrainDT = np.sqrt(np.mean((np.array(train_predictions)-np.array(train_labels))**2))

test_predictions = dtModel.predict(test_data.map(lambda x: x.features)).collect()
test_labels = test_data.map(lambda x: x.label).collect()
rmseTestDT = np.sqrt(np.mean((np.array(test_predictions)-np.array(test_labels))**2))

print(f"Baseline model train RMSE: {rmse_train_base}")
print(f"Random Forest Model train RMSE: {rmseTrainDT}.")

print(f"Baseline model test RMSE: {rmse_test_base}")
print(f"Random Forest Model test RMSE: {rmseTestDT}.")

Even after hyperparameter tuning, our best-performing model remains the initial Linear Regression model. Below, we visualize its predictions on the test data.

## Conclusions

In [None]:
import matplotlib.pyplot as plt
from matplotlib.colors import Normalize

# Colormap used to colour points by squared error (light = small, red = large).
# plt.get_cmap is used because matplotlib.cm.get_cmap was removed in newer matplotlib.
cmap = plt.get_cmap('YlOrRd')

def preparePlot(xticks, yticks, figsize=(10.5, 10.5), hideLabels=False, gridColor='#999999',
                gridWidth=1.0):
    """Template for generating the plot layout."""
    plt.close()
    fig, ax = plt.subplots(figsize=figsize, facecolor='white', edgecolor='white')
    ax.tick_params(labelcolor='#999999', labelsize=10)
    for axis, ticks in [(ax.get_xaxis(), xticks), (ax.get_yaxis(), yticks)]:
        axis.set_ticks_position('none')
        axis.set_ticks(ticks)
        axis.label.set_color('#999999')
        if hideLabels: axis.set_ticklabels([])
    plt.grid(color=gridColor, linewidth=gridWidth, linestyle='-')
    # Hide the plot border (map() is lazy in Python 3, so an explicit loop is needed)
    for position in ['bottom', 'top', 'left', 'right']:
        ax.spines[position].set_visible(False)
    return fig, ax

First, we display a colour-coded scatter plot comparing actual closing prices to those predicted by our best model (Linear Regression with SGD). Lighter points have lower squared errors and red points have larger ones. Apart from a few outliers, most points show a low error.

In [None]:
predictions = np.asarray(test_data
                         .map(lambda lp: firstModel.predict(lp.features))
                         .collect())

actual = np.asarray(test_data
                    .map(lambda lp: lp.label).collect())

error = np.asarray(test_data
                   .map(lambda lp: (lp.label, firstModel.predict(lp.features))).map(lambda lp: pow(lp[0] - lp[1], 2))
                   .collect())

norm = Normalize()
clrs = cmap(np.asarray(norm(error)))[:,0:3]

fig, ax = preparePlot(np.arange(0, 5000, 500), np.arange(0, 5000, 500))
ax.set_xlim(0, 5000), ax.set_ylim(20, 5000)
plt.scatter(predictions, actual, s=14**2, c=clrs, edgecolors='#888888', alpha=0.75, linewidths=.5)
ax.set_xlabel('Predicted'), ax.set_ylabel(r'Actual')
pass
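Each point's colour comes from two steps. First, `Normalize()` with no explicit limits rescales the squared errors linearly to [0, 1] using their minimum and maximum. Then the colormap turns each scaled value into an RGBA colour, and the `[:, 0:3]` slice keeps RGB. The first step, sketched in plain Python on hypothetical errors:

```python
# Hypothetical squared errors for four test points
errors = [0.0, 25.0, 50.0, 100.0]

def min_max_scale(values):
    """Linear rescale to [0, 1], as matplotlib's Normalize does when vmin/vmax are unset."""
    lo, hi = min(values), max(values)
    return [(v - lo) / (hi - lo) for v in values]

scaled = min_max_scale(errors)
print(scaled)  # [0.0, 0.25, 0.5, 1.0]
```

The scale is set by the largest error, so a single large outlier pushes the remaining points toward the light end of the colormap.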

For contrast, we also plot the predictions of our worse-performing model, the Random Forest. It still keeps errors low at lower prices (e.g. below 500), but it deviates significantly from the actual values at higher prices.

In [None]:
predictions = np.asarray(dtModel
                         .predict(test_data.map(lambda x: x.features))
                         .collect())

actual = np.asarray(test_data
                    .map(lambda lp: lp.label).collect())

error = (actual - predictions) ** 2

norm = Normalize()
clrs = cmap(np.asarray(norm(error)))[:,0:3]

fig, ax = preparePlot(np.arange(0, 5000, 500), np.arange(0, 5000, 500))
ax.set_xlim(0, 5000), ax.set_ylim(20, 5000)
plt.scatter(predictions, actual, s=14**2, c=clrs, edgecolors='#888888', alpha=0.75, linewidths=.5)
ax.set_xlabel('Predicted'), ax.set_ylabel(r'Actual')
pass