In [14]:
import pyspark
import os
import matplotlib.pyplot as plt

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import Imputer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.window import Window

In [6]:
spark = SparkSession.builder.appName("Stock Price Prediction").getOrCreate()

In [7]:
companies = ['AAPL', 'MSFT', 'AMZN', 'GOOGL', 'META', 'TSLA', 'NVDA']

In [8]:
def data_preprocessing(company):
    df = spark.read.csv(f"{company}.csv", header=True, inferSchema=True)
    w = Window.partitionBy().orderBy("Date")
    df = df.withColumn("lag_close_1", lag(df.Close).over(w))
    df = df.withColumn("daily_return", (df.Close - df.Open) / df.Open)
    df = df.withColumn("intra_day_volatility", df.High - df.Low)
    df = df.withColumn("daily_volatility", df.Close - df.lag_close_1)
    df = df.withColumn("7_day_avg_close", avg(df.Close).over(w.rowsBetween(-6, 0)))
    df = df.dropna()
    feature_columns = ["Open", "High", "Low", "Close", "Volume", "daily_return",
                       "intra_day_volatility", "daily_volatility", "7_day_avg_close"]
    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    df_assembled = assembler.transform(df)
    scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
    scaler_model = scaler.fit(df_assembled)
    scaled_data = scaler_model.transform(df_assembled)
    final_data = scaled_data.select("scaledFeatures", col("Adj Close").alias("label"))
    return final_data


In [9]:
def model_training(model, train_data, test_data):
    model = model.fit(train_data)
    predictions = model.transform(test_data)
    evaluator_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
    rmse = evaluator_rmse.evaluate(predictions)
    return rmse


In [10]:
results = {}

In [11]:
for company in companies: 
    df = data_preprocessing(company)
    train_data = df.orderBy("date").limit(int(df.count() * 0.8))
    test_data = df.subtract(train_data)
    linear_reg = LinearRegression(featuresCol="scaledFeatures", labelCol="label")
    decision_tree = DecisionTreeRegressor(featuresCol="scaledFeatures", labelCol="label")
    random_forest = RandomForestRegressor(featuresCol="scaledFeatures", labelCol="label")
    gbt = GBTRegressor(featuresCol="scaledFeatures", labelCol="label")
    models = {"Linear Regression": linear_reg, "Decision Tree": decision_tree, 
              "Random Forest": random_forest, "GBT": gbt}
    results[company] = {}
    for model_name, model in models.items():
        rmse = model_training(model, train_data, test_data)
        results[company][model_name] = {"RMSE": rmse}
        print(f"{company} : {model_name} - RMSE score : {rmse}")


AAPL : Linear Regression - RMSE score : 12.129645972906863
AAPL : Decision Tree - RMSE score : 86.63013376362115
AAPL : Random Forest - RMSE score : 86.78508379232147
AAPL : GBT - RMSE score : 87.6408346318461
MSFT : Linear Regression - RMSE score : 46.9277706908987
MSFT : Decision Tree - RMSE score : 184.85613459278397
MSFT : Random Forest - RMSE score : 184.971228429868
MSFT : GBT - RMSE score : 185.47273803384186
AMZN : Linear Regression - RMSE score : 4.395605246964923e-11
AMZN : Decision Tree - RMSE score : 56.08705725465829
AMZN : Random Forest - RMSE score : 54.74716415261936
AMZN : GBT - RMSE score : 53.47767382072862
GOOGL : Linear Regression - RMSE score : 3.1596457775756857e-06
GOOGL : Decision Tree - RMSE score : 53.49376535463721
GOOGL : Random Forest - RMSE score : 53.62812434151932
GOOGL : GBT - RMSE score : 53.12322010492594
META : Linear Regression - RMSE score : 0.17601801000650485
META : Decision Tree - RMSE score : 46.66482695450893
META : Random Forest - RMSE score

In [17]:
def select_model(results):
    best_model = {}
    for company, metrics in results.items():
        sorted_metrics = sorted(metrics.items(),key=lambda x:x[1]['RMSE'])
        model_name, best_metric = sorted_metrics[0]
        best_rmse = best_metric['RMSE']
        if model_name == "Linear Regression" and best_rmse <= 11e-1:
            model_name, best_metric = sorted_metrics[1]
        best_model[company] = model_name
    return best_model

In [19]:
best_models = select_model(results)
for company, model in best_models.items():
    print(f"{company} : Best Model - {model}")

AAPL : Best Model - Linear Regression
MSFT : Best Model - Linear Regression
AMZN : Best Model - GBT
GOOGL : Best Model - GBT
META : Best Model - GBT
TSLA : Best Model - GBT
NVDA : Best Model - GBT
