In [1]:
#importing necessary libraries
from pyspark.sql import SQLContext, Window
from pyspark.sql.functions import *
import pyspark.sql.functions as f
from pyspark.ml.regression import LinearRegression
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import abs, sqrt

In [2]:
##Load the data
##An SQLContext is built on the notebook's existing SparkContext (`sc`) so SQL can be run against registered tables
sqlContext = SQLContext(sc)
##Query the table aapl__2__1028f_csv (per the original notes, created from the csv file loaded in the cluster)
##Only the date and close columns are selected, and only rows whose date falls in calendar year 2016 or 2017
##(YEAR() extracts the year from the date column; BETWEEN includes both end values)
df = sqlContext.sql(
    "SELECT date, close FROM aapl__2__1028f_csv WHERE YEAR(date) BETWEEN 2016 AND 2017"
)

In [3]:
##Preview the query result - the first 20 rows of its two columns, date and close
##(n=20 and truncate=True are the values show() uses when called with no arguments)
df.show(n=20, truncate=True)

In [4]:
##QUESTION 1
##Window over the whole dataframe with rows ordered by ascending date (no partitionBy is used)
partitionwindow = Window.orderBy("date")
##Time-lagged features: for each row, the close value found 1 row and 2 rows earlier in that date ordering
feature1 = f.lag("close", 1).over(partitionwindow)
feature2 = f.lag("close", 2).over(partitionwindow)
##Attach both features as the columns lag1 and lag2, then drop every row that contains a null
##(the earliest rows have no preceding row to take a lag from, so their lag values are null)
df = (
    df.withColumn("lag1", feature1)
      .withColumn("lag2", feature2)
      .na.drop()
)
##Display the first 20 rows of the resulting dataframe
df.show(20)

In [5]:
##PREPARING THE DATA TO PERFORM MACHINE LEARNING
##Select the relevant columns - lag1 & lag2 are the features and close is the label/observed value we are trying to predict.
##The projection is stored in its own variable instead of being written back into `df`:
##overwriting `df` would drop its date column, which makes the window/lag cell above fail when it is re-run
##and throws away the only column the rows can be ordered by
model_input = df.select('lag1', 'lag2', 'close')
##VectorAssembler combines the lag1 and lag2 values of each row into one vector column named 'features'
vectorAssembler = VectorAssembler(inputCols = ['lag1','lag2'], outputCol = 'features')
##Apply the assembler - df1 holds lag1, lag2, close plus the new 'features' column
df1 = vectorAssembler.transform(model_input)
##Keep only what the regression needs: the feature vector and the label
df2 = df1.select('features','close')
df2.show()

In [6]:
###QUESTION 2
##Random train/test split: about 70% of the rows are sampled for training and about 30% for testing
##(the weights are sampling probabilities, so the resulting sizes are approximate).
##A fixed seed is passed so the same rows land in each split on every run; without it the split,
##the fitted coefficients and every metric printed below change each time the cell is executed.
##NOTE(review): repeatability also assumes df2 is produced with the same data and partitioning each run - confirm
splits = df2.randomSplit([0.7, 0.3], seed=42)
##First split (weight 0.7) is the training data
train_df = splits[0]
##Second split (weight 0.3) is the testing data
test_df = splits[1]
##Set up the LinearRegression estimator: 'features' is the input vector column, 'close' is the label column
lr = LinearRegression(featuresCol = 'features', labelCol='close')
##Fit the estimator on the training data; the fitted model is stored in lr_model
lr_model = lr.fit(train_df)
##Print the fitted slope coefficients (one per feature) and the intercept
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))
##Training summary of the fitted model, used for the two training metrics below
trainingSummary = lr_model.summary
##RMSE is the square root of the mean squared residual - how far, on average, predictions are from the observed values, in the units of the label
print("Root Mean Squared Error (RMSE) on train data = %g" % trainingSummary.rootMeanSquaredError)
##R2 is (variance explained by the model) / (total variance); a value of 1 means the model reproduces the observed values exactly
print("R Squared (R2) on train data = %g" % trainingSummary.r2)
##Score the held-out test rows; transform adds a 'prediction' column
lr_predictions = lr_model.transform(test_df)

In [7]:
###QUESTION 3
##Compare predicted and observed close price on the test data:
##each displayed row shows the feature vector, the observed close and the model's prediction
predicted_vs_observed = lr_predictions.select("features", "close", "prediction")
predicted_vs_observed.show()

In [8]:
###QUESTION 4
##R2 on the test data, computed by a RegressionEvaluator that compares the 'prediction' column with the 'close' label
lr_evaluator = RegressionEvaluator(
    labelCol="close",
    predictionCol="prediction",
    metricName="r2",
)
test_r2 = lr_evaluator.evaluate(lr_predictions)
print("R Squared (R2) on test data = %g" % test_r2)
##RMSE on the test data, read from the summary returned by evaluating the fitted model on test_df
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

In [9]:
###BONUS QUESTION
##Summary statistics of df2
df2.describe().show()
##Tag every row with an increasing id that serves as a row-order index for an ordered (non-random) split.
##NOTE(review): monotonically_increasing_id follows the physical row order of df2; it matches date order only if
##df2 still carries the ordering produced by the date-ordered window used for the lag features - confirm
df2 = df2.withColumn('index', f.monotonically_increasing_id())
##Size the split from the actual row count instead of hard-coding 350 and 151:
##the hard-coded sizes only partition the data when it has exactly 501 rows, otherwise the two sets overlap or leave a gap.
##Integer arithmetic gives a 70/30 split (501 rows -> 350 train, 151 test, the original sizes)
total_rows = df2.count()
train_rows = total_rows * 7 // 10
test_rows = total_rows - train_rows
##Training set = the rows with the lowest index values, test set = the rows with the highest index values
new_train = df2.sort('index').limit(train_rows)
new_test = df2.sort('index', ascending=False).limit(test_rows)
##Print the sizes - a bare .count() in the middle of a cell runs a Spark job but displays nothing
print("Rows in new_train = %d" % new_train.count())
print("Rows in new_test = %d" % new_test.count())
##Index range covered by each set, to check that the two ranges do not overlap
new_train.select(f.min('index').alias('min'), f.max('index').alias('max')).show()
new_test.select(f.min('index').alias('min'), f.max('index').alias('max')).show()

In [10]:
##Linear regression on the ordered split: 'features' is the input vector column, 'close' is the label column
lr1 = LinearRegression(labelCol='close', featuresCol='features')
##Fit on the ordered training rows; the fitted model is stored in lr_model1
lr_model1 = lr1.fit(new_train)
##Fitted slope coefficients (one per feature) and the intercept
print("Coefficients: " + str(lr_model1.coefficients))
print("Intercept: " + str(lr_model1.intercept))
##Training summary of the fitted model, source of the two training metrics printed next
trainingSummary1 = lr_model1.summary
##RMSE is the square root of the mean squared residual - the typical distance between prediction and observed value
train_rmse1 = trainingSummary1.rootMeanSquaredError
print("Root Mean Squared Error (RMSE) on train data = %g" % train_rmse1)
##R2 is (variance explained by the model) / (total variance); a value of 1 means the model reproduces the observed values exactly
train_r2_1 = trainingSummary1.r2
print("R Squared (R2) on train data = %g" % train_r2_1)
##Score the ordered test rows (adds a 'prediction' column) and show them next to the observed close
lr_predictions1 = lr_model1.transform(new_test)
lr_predictions1.select("features", "close", "prediction").show()
##R2 on the test data via a RegressionEvaluator comparing 'prediction' with 'close'
lr_evaluator1 = RegressionEvaluator(
    labelCol="close",
    predictionCol="prediction",
    metricName="r2",
)
test_r2_1 = lr_evaluator1.evaluate(lr_predictions1)
print("R Squared (R2) on test data = %g" % test_r2_1)
##RMSE on the test data, read from the summary returned by evaluating the fitted model on new_test
test_result1 = lr_model1.evaluate(new_test)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result1.rootMeanSquaredError)