In [0]:
# Importing libraries
import pandas as pd
import matplotlib.pyplot as plt
import boto3
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, DoubleType, StringType, IntegerType
from pyspark.sql.functions import * 
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler 
from pyspark.ml import Pipeline 
from pyspark.ml.evaluation import RegressionEvaluator
# Import the linear regression model 
from pyspark.ml.regression import LinearRegression, LinearRegressionModel
# Import the evaluation module 
from pyspark.ml.evaluation import * 
# Import the model tuning module 
from pyspark.ml.tuning import * 
import numpy as np


from pyspark.sql.functions import lead
from pyspark.sql.window import Window


In [0]:
# List every CSV object stored under the landing/ prefix of the project bucket.

# Read the AWS credentials from the environment instead of hard-coding them in
# the notebook. When the variables are unset these values are None and boto3
# falls back to its default credential chain (shared config, instance role, ...).
aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID")
aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
session = boto3.Session(
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    # Only needed for temporary (STS) credentials; None otherwise.
    aws_session_token=os.environ.get("AWS_SESSION_TOKEN"),
)

# Bucket name and "folder" (key prefix) holding the landing files.
bucket_name = 'stockprediction-wg'
folder_prefix = 'landing/'

s3 = session.client('s3')

# Placeholders for the training / testing file lists (not filled in this cell).
training = []
testing = []

# A single list call returns at most one page of keys, so walk every page with
# a paginator and keep only the keys that end in ".csv".
paginator = s3.get_paginator('list_objects_v2')
csv = [
    obj['Key']
    for page in paginator.paginate(Bucket=bucket_name, Prefix=folder_prefix)
    for obj in page.get('Contents', [])
    if obj['Key'].endswith(".csv")
]

print(csv)


In [0]:

# Configure Spark's S3A connector and load the landing CSV files into one DataFrame.

# Credentials come from the environment (for example cluster environment
# variables or a secrets manager) rather than being pasted into the notebook.
access_key = os.environ.get("AWS_ACCESS_KEY_ID")
secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
if not access_key or not secret_key:
    raise RuntimeError(
        "Set the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment "
        "variables before running this cell."
    )

# URL-safe form of the secret key ("/" escaped); not used by the visible cells.
encoded_secret_key = secret_key.replace("/", "%2F")
# Region in which the S3 bucket is stored.
aws_region = "us-east-2"

# Point the Hadoop S3A filesystem at our credentials and regional endpoint.
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", access_key)
hadoop_conf.set("fs.s3a.secret.key", secret_key)
hadoop_conf.set("fs.s3a.endpoint", "s3." + aws_region + ".amazonaws.com")

# List reserved for DataFrames to be combined (left empty by this cell).
sdf_list = []

# Read every CSV found under the landing/ prefix into a single DataFrame.
# NOTE(review): StormGuardData.csv is also read from this prefix in a later
# cell; if its columns differ from the stock files, confirm that loading the
# whole folder here does not mix the two layouts.
file_path1 = "s3a://stockprediction-wg/landing/"
sdf = spark.read.csv(file_path1, header=True, inferSchema=True)

    


In [0]:
# Render the raw DataFrame loaded from the landing folder for a visual check.
# (`display` is not imported in this notebook; presumably the notebook
# environment's built-in table renderer.)
display(sdf)

In [0]:
# Only a cell's last expression is echoed, so print the type explicitly;
# a bare `type(sdf)` on the first line would be silently discarded.
print(type(sdf))
# Render the summary statistics as a table rather than leaving a bare
# DataFrame expression as the cell output.
display(sdf.describe())

In [0]:
# Data-cleaning phase.

# Keep a single row for each (Dates, Symbol) combination.
NDsdf = sdf.dropDuplicates(["Dates", "Symbol"])

# Discard every row that still contains a null in any column.
Nsdf = NDsdf.na.drop()

In [0]:
# Render the de-duplicated, null-free DataFrame produced by the cleaning cell.
display(Nsdf)

In [0]:
# Cast each column to its intended type, one column at a time.
# Each entry is (column name, target Spark type).
column_casts = [
    ("Dates", DateType()),
    ("Open", "double"),
    ("High", "double"),
    ("Low", "double"),
    ("Close", "double"),
    ("Volume", "double"),
    ("MSFT", "integer"),
    ("AAPL", "integer"),
    ("GOOG", "integer"),
    ("META", "integer"),
    ("IBM", "integer"),
    ("AMZN", "integer"),
    ("RSI", "double"),
    ("EMA", "double"),
    ("OBV", "double"),
]

for column_name, target_type in column_casts:
    # withColumn with an existing name replaces that column in place.
    Nsdf = Nsdf.withColumn(column_name, col(column_name).cast(target_type))

In [0]:
# Load the StormGuard indicator file (header row present, column types inferred).
storm_guard_path = "s3a://stockprediction-wg/landing/StormGuardData.csv"
sgdata_sdf = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(storm_guard_path)
)
display(sgdata_sdf)

In [0]:
# Align the key column name with the StormGuard frame ("Dates" -> "Date"),
# then keep only the dates that are present in both frames.
Nsdf = Nsdf.withColumnRenamed("Dates", "Date")
joined_df = Nsdf.join(sgdata_sdf, "Date", "inner")
display(joined_df)

In [0]:
# Writing the joined Spark DataFrame as Parquet to the raw folder of my S3 bucket
# joined_df.write.parquet("s3a://stockprediction-wg/raw/Stock_Raw_Data.parquet")

In [0]:
# Feature engineering: pack the price columns and the StormGuard indicators
# (SG-Armor through Sentiment) into a single "features" vector column.
feature_columns = [
    "Open",
    "High",
    "Low",
    "SG-Armor",
    "SwanGuard",
    "Mkt-Trend",
    "Momentum",
    "Sentiment",
]
featureAssembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
output = featureAssembler.transform(joined_df)
display(output)

In [0]:
# finanlized_data = output.select("Date","features","Close").sort("Date",ascending = True)
# display(finanlized_data)

In [0]:

# Build the next-day close column.
# Partition the data by stock symbol and, within each partition, order by the Date column.
windowPartition = Window.partitionBy("Symbol").orderBy("Date")

# For each row, take the Close value of the following row in the same symbol's
# partition and store it in a new column called NextDaysClosePrice.
# This is the actual next recorded close, not a model output; where no later
# row exists, `lead` is expected to yield null (its default) - verify if needed.
df_nextday_close = output.withColumn("NextDaysClosePrice", lead("Close", 1).over(windowPartition))


In [0]:
# Hold out 30% of the assembled rows for testing; the fixed seed keeps the split repeatable.
trainingData, testData = output.randomSplit([0.7, 0.3], seed=42)

# Linear regression over the assembled feature vector, with the same-day Close as label.
# TODO: to predict the next day's close, use the NextDaysClosePrice column as the
# label instead of Close (see chapters 3 and 4 of the DataCamp machine learning course).
regression = LinearRegression(
    featuresCol='features',
    labelCol='Close',
)

# Fit on the training split.
model = regression.fit(trainingData)

# Report the fitted parameters (two spaces after the colon, as print() would emit).
print(f"Coefficients:  {model.coefficients}")
print(f"Intercept:  {model.intercept}")

# Score the held-out split; this adds a "prediction" column.
test_results = model.transform(testData)


In [0]:
# Print the price columns next to the model's prediction for the test split.
result_columns = ['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', "Symbol", 'prediction']
test_results.select(*result_columns).show(truncate=False)

In [0]:
# Save the test-set predictions to the S3 models folder (this writes predictions, not the fitted model)
# test_results.write.parquet("s3a://stockprediction-wg/models/Stock_Prediction_Model.parquet")


In [0]:
# Score the test predictions against the Close label using RMSE
# (the evaluator's default metric and prediction column, spelled out here).
rmse_evaluator = RegressionEvaluator(
    labelCol="Close",
    predictionCol="prediction",
    metricName="rmse",
)
rmse_evaluator.evaluate(test_results)

In [0]:
# Build the next-day close column (repeats the computation of the earlier identical cell).
# Partition the data by stock symbol and, within each partition, order by the Date column.
windowPartition = Window.partitionBy("Symbol").orderBy("Date")

# For each row, take the Close value of the following row in the same symbol's
# partition and store it in a new column called NextDaysClosePrice.
# This is the actual next recorded close, not a model output.
df_nextday_close = output.withColumn("NextDaysClosePrice", lead("Close", 1).over(windowPartition))

In [0]:
# Render the frame that now carries the NextDaysClosePrice column.
display (df_nextday_close)

Data visualization

In [0]:
# Chart MSFT's close against the following day's close for the earliest 30 rows.
# NextDaysClosePrice is the actual close of the next row (built with `lead`),
# not a model prediction, so it is labelled accordingly.
summary_sdf = (
    df_nextday_close
    .where(col("Symbol") == "MSFT")
    .select("Date", "Close", "NextDaysClosePrice")
    # Order before limiting: limit() alone gives no guarantee about which
    # 30 rows are returned or in what order they arrive.
    .orderBy("Date")
    .limit(30)
)
df = summary_sdf.toPandas()

# Plotting the line chart
plt.figure(figsize=(12, 10))
plt.plot(df["Date"], df['Close'], label='Closing Price', marker='|')
plt.plot(df["Date"], df['NextDaysClosePrice'], label="Next Day's Closing Price (actual)", marker='o')

# Adding labels and title
plt.xlabel("Date")
plt.ylabel("Price")
plt.title("MSFT closing price vs next day's closing price (first 30 rows)")
plt.legend()

# Show the plot
plt.show()



In [0]:
# Interesting chart: MSFT closing price from 2019-12-31 through 2021-12-31.
msft_covid_rows = (
    (col("Symbol") == "MSFT")
    & (col("Date") >= "2019-12-31")
    & (col("Date") <= "2021-12-31")
)
summary_sdf = df_nextday_close.where(msft_covid_rows).select("Date", "Close", "NextDaysClosePrice")
df = summary_sdf.toPandas()

# Draw the closing-price line on an explicit figure/axes pair.
fig, ax = plt.subplots(figsize=(12, 10))
ax.plot(df["Date"], df['Close'], label='Closing Price', marker='|')

# Axis labels, title and legend.
ax.set_xlabel("Date")
ax.set_ylabel("Price")
ax.set_title('How MSFT stock perfom from the begining of Covid')
ax.legend()

plt.show()

In [0]:
# Echo the pandas frame most recently assigned to `df`.
df

In [0]:
# Interesting chart: AAPL closing price from 2019-12-31 through 2021-12-31.
summary_sdf = (
    df_nextday_close
    .where((col("Symbol") == "AAPL") & (col("Date") >= "2019-12-31") & (col("Date") <= "2021-12-31"))
    .select("Date", "Close", "NextDaysClosePrice")
    # Sort explicitly so the line is drawn in chronological order.
    .orderBy("Date")
)
df = summary_sdf.toPandas()

# Plotting the line chart
plt.figure(figsize=(12, 10))
plt.plot(df["Date"], df['Close'], label='Closing Price', marker='|')

# Adding labels and title (the title now names the symbol that is filtered above)
plt.xlabel("Date")
plt.ylabel("Price")
plt.title('How AAPL stock performed from the beginning of Covid')
plt.legend()

# Show the plot
plt.show()

In [0]:
# Interesting chart: GOOG closing price from 2019-12-31 through 2021-12-31.
goog_covid_rows = (
    (col("Symbol") == "GOOG")
    & (col("Date") >= "2019-12-31")
    & (col("Date") <= "2021-12-31")
)
summary_sdf = df_nextday_close.where(goog_covid_rows).select("Date", "Close", "NextDaysClosePrice")
df = summary_sdf.toPandas()

# Draw the closing-price line on an explicit figure/axes pair.
fig, ax = plt.subplots(figsize=(12, 10))
ax.plot(df["Date"], df['Close'], label='Closing Price', marker='|')

# Axis labels, title and legend.
ax.set_xlabel("Date")
ax.set_ylabel("Price")
ax.set_title('How Google stock perfom from the begining of Covid')
ax.legend()

plt.show()

In [0]:
# Interesting chart: TSLA closing price from 2019-12-31 through 2021-12-31.
# NOTE(review): the schema cell only casts indicator columns for MSFT, AAPL,
# GOOG, META, IBM and AMZN - confirm that TSLA rows exist in the data; an
# empty result would draw an empty chart.
summary_sdf = (
    df_nextday_close
    .where((col("Symbol") == "TSLA") & (col("Date") >= "2019-12-31") & (col("Date") <= "2021-12-31"))
    .select("Date", "Close", "NextDaysClosePrice")
    # Sort explicitly so the line is drawn in chronological order.
    .orderBy("Date")
)
df = summary_sdf.toPandas()

# Plotting the line chart
plt.figure(figsize=(12, 10))
plt.plot(df["Date"], df['Close'], label='Closing Price', marker='|')

# Adding labels and title (the title now names the symbol that is filtered above)
plt.xlabel("Date")
plt.ylabel("Price")
plt.title('How TSLA stock performed from the beginning of Covid')
plt.legend()

# Show the plot
plt.show()

In [0]:

# Total traded volume per symbol between 2019-12-31 and 2023-05-11.
group_Data = df_nextday_close.where((col("Date") >= "2019-12-31") & (col("Date") <= "2023-05-11"))
# Only Symbol and Volume are used below, so collect just those two columns
# to pandas instead of every column of the frame.
df = group_Data.select("Symbol", "Volume").toPandas()

# Group the data by stock symbol and sum the volumes for each group
grouped_data = df.groupby('Symbol')['Volume'].sum()

# Create the bar chart. Plot the sums divided by 1e9 so the values agree with
# the "(in billion)" axis label; `grouped_data` itself keeps the raw sums.
fig, ax = plt.subplots()
(grouped_data / 1e9).plot(kind='bar', stacked=True, ax=ax)

# Customize the chart
ax.set_title('Volume Stack Bar Chart by Stock Symbol')
ax.set_xlabel('Stock Symbol')
ax.set_ylabel('Volume (in billion)')

# Show the plot
plt.show()

In [0]:
# Echo the per-symbol volume sums computed in the previous cell.
grouped_data
