# Load the data

In [1]:
import boto3
import s3fs
import pandas as pd

In [2]:
client = boto3.client('s3')

In [3]:
line = list()
with open('awskeys.txt' , 'r') as file:
    for lines in file:
        line.append(lines)

In [4]:
from boto.s3.connection import S3Connection
conn = S3Connection(line[0] ,line[1], host='s3.ap-south-1.amazonaws.com')

In [5]:
path = 's3n://stockpricebucket/Google_Stock_Price_Train.csv'

In [9]:
dataframe = pd.read_csv(path)

In [10]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession

In [11]:
sc = SparkContext()
sparkSession = SparkSession(sc)
stock_price_history = sparkSession.createDataFrame(dataframe)



In [12]:
stock_price_history.take(5)

[Row(Date='05/12/2020', Close/Last=1375.74, Volume=1390600, Open=1407.12, High=1415.0, Low=1374.77),
 Row(Date='05/11/2020', Close/Last=1403.26, Volume=1412116, Open=1378.28, High=1416.53, Low=1377.152),
 Row(Date='05/08/2020', Close/Last=1388.37, Volume=1388068, Open=1383.13, High=1398.76, Low=1375.48),
 Row(Date='05/07/2020', Close/Last=1372.56, Volume=1399759, Open=1365.94, High=1377.6, Low=1355.27),
 Row(Date='05/06/2020', Close/Last=1347.3, Volume=1215423, Open=1361.69, High=1371.1199, Low=1347.29)]

In [13]:
stock_price_history.head()

Row(Date='05/12/2020', Close/Last=1375.74, Volume=1390600, Open=1407.12, High=1415.0, Low=1374.77)

# Data exploration

In [14]:
stock_price_history.cache()

DataFrame[Date: string, Close/Last: double, Volume: bigint, Open: double, High: double, Low: double]

In [16]:
stock_price_history.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Close/Last: double (nullable = true)
 |-- Volume: long (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)



In [17]:
stock_price_history_2 = stock_price_history.withColumnRenamed("Close/Last","Close")

In [18]:
stock_price_history_filtered_datatype = stock_price_history_2.selectExpr("cast(Volume as double) Volume")
stock_price_history_filtered_datatype.printSchema()
stock_price_history_filtered_datatype.show()


root
 |-- Date: string (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)

+----------+-------+---------+-------+---------+--------+
|      Date|  Close|   Volume|   Open|     High|     Low|
+----------+-------+---------+-------+---------+--------+
|05/12/2020|1375.74|1390600.0|1407.12|   1415.0| 1374.77|
|05/11/2020|1403.26|1412116.0|1378.28|  1416.53|1377.152|
|05/08/2020|1388.37|1388068.0|1383.13|  1398.76| 1375.48|
|05/07/2020|1372.56|1399759.0|1365.94|   1377.6| 1355.27|
|05/06/2020| 1347.3|1215423.0|1361.69|1371.1199| 1347.29|
|05/05/2020|1351.11|1651533.0|1337.92|  1373.94| 1337.46|
|05/04/2020| 1326.8|1504017.0|1308.23|  1327.66|  1299.0|
|05/01/2020|1320.61|2072639.0| 1328.5|1352.0695|  1311.0|
|04/30/2020|1348.66|2668906.0|1324.88|  1352.82| 1322.49|
|04/29/2020|1341.48|3793624.0|1341.46|  1359.99| 1325.34|
|04/28/2020|1233.67|29

In [19]:
stock_price_history_filtered_datatype.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Date,1543,,,01/02/2015,12/31/2019
Close,1543,892.9923972132204,265.69879489557223,492.55,1526.69
Volume,1543,1749821.0311082308,878745.042713821,7932.0,1.11535E7
Open,1543,892.712746597537,265.1248567402656,494.65,1525.07
High,1543,900.7310641607256,268.19693888925013,495.976,1532.1063
Low,1543,884.5988007777053,262.7931625664658,487.56,1521.4


# Prepare data for Machine Learning

In [20]:
from pyspark.ml.feature import VectorAssembler

In [21]:
vectorAssembler = VectorAssembler(inputCols = ['Open','High','Low'], outputCol = 'features')

In [22]:
stock_price_history_df = vectorAssembler.transform(stock_price_history_filtered_datatype)

In [23]:
stock_price_history_df = stock_price_history_df.select(['features','Close'])

In [24]:
stock_price_history_df.show()

+--------------------+-------+
|            features|  Close|
+--------------------+-------+
|[1407.12,1415.0,1...|1375.74|
|[1378.28,1416.53,...|1403.26|
|[1383.13,1398.76,...|1388.37|
|[1365.94,1377.6,1...|1372.56|
|[1361.69,1371.119...| 1347.3|
|[1337.92,1373.94,...|1351.11|
|[1308.23,1327.66,...| 1326.8|
|[1328.5,1352.0695...|1320.61|
|[1324.88,1352.82,...|1348.66|
|[1341.46,1359.99,...|1341.48|
|[1287.93,1288.05,...|1233.67|
|[1296.0,1296.15,1...|1275.88|
|[1261.17,1280.4,1...|1279.31|
|[1271.55,1293.31,...|1276.31|
|[1245.54,1285.613...|1263.21|
|[1247.0,1254.27,1...|1216.34|
|[1271.0,1281.6,12...|1266.61|
|[1284.85,1294.43,...|1283.25|
|[1274.1,1279.0,12...|1263.47|
|[1245.61,1280.46,...|1262.47|
+--------------------+-------+
only showing top 20 rows



# splits the dataset

In [25]:
splits = stock_price_history_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

# Model training

In [26]:
from pyspark.ml.regression import LinearRegression

In [27]:
LinearRegressor = LinearRegression(featuresCol = 'features', labelCol='Close', maxIter=10, regParam=0.3, elasticNetParam=0.8)

In [28]:
LinearRegression_model = LinearRegressor.fit(train_df)

# Model Testing

In [29]:
model_predictions = LinearRegression_model.transform(test_df)
model_predictions.select("prediction","Close","features").show(5)

+-----------------+------+--------------------+
|       prediction| Close|            features|
+-----------------+------+--------------------+
|842.9454962880578|843.19|[842.88,843.88,84...|
|857.0873498191976|862.76|[851.2,863.45,849...|
| 909.009341724517|912.57|[901.94,915.68,90...|
|907.7381568790252|906.69|[904.12,914.9444,...|
|914.5006438243346|921.29|[905.1,923.33,905.0]|
+-----------------+------+--------------------+
only showing top 5 rows



In [30]:
print("Coefficients: " + str(LinearRegression_model.coefficients))
print("Intercept: " + str(LinearRegression_model.intercept))

Coefficients: [0.0,0.4729970038552547,0.5275704282605069]
Intercept: 0.3170826789033278


In [38]:
trainingSummary = LinearRegression_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 6.617300
r2: 0.999393


# Save Model

In [32]:
import sys

In [33]:
LinearRegression_model.save(sys.argv[1])