In [1]:
# PySpark and others
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, stddev, mean, lit, lead, min as spark_min, max as spark_max
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors
from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator
from pyspark.sql import Window
from sklearn.metrics import confusion_matrix

import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import os

In [2]:
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Stock Data Analysis") \
    .getOrCreate()

In [3]:
all_df = spark.read.csv("nasdaq100.csv", header=True, inferSchema=True)

In [4]:
all_df.show(5)

+------+----------+-------+-------+-------+-------+
|ticker|      date|   open|   high|    low|  close|
+------+----------+-------+-------+-------+-------+
|  CSCO|2011-01-03|14.5212| 14.644|14.4741|14.5516|
|  CSCO|2011-01-04|14.6116|14.6282|14.4868|14.5722|
|  CSCO|2011-01-05|14.6116|14.8089|14.5782| 14.751|
|  CSCO|2011-01-06|14.8423|14.8806|14.7422|14.8737|
|  CSCO|2011-01-07|14.8089| 14.913|14.7932|14.8904|
+------+----------+-------+-------+-------+-------+
only showing top 5 rows



In [5]:
# Check distinct tickers
all_df.select("ticker").distinct().show()
all_df.groupBy("ticker").count().show()

+------+
|ticker|
+------+
|  CSCO|
|  MCHP|
|   ADI|
|  GILD|
|  MNST|
|  INTC|
|   MDB|
|  INTU|
|  CCEP|
|  VRTX|
|   PDD|
|  GEHC|
|  COST|
|  ISRG|
|  ABNB|
|   WBD|
|  MSTR|
|   GFS|
|   KDP|
|  WDAY|
+------+
only showing top 20 rows

+------+-----+
|ticker|count|
+------+-----+
|  CSCO| 3570|
|  MCHP| 3570|
|   ADI| 3570|
|  GILD| 3570|
|  MNST| 3570|
|  INTC| 3570|
|   MDB| 1859|
|  INTU| 3570|
|  CCEP| 3570|
|  VRTX| 3570|
|   PDD| 1667|
|  GEHC|  561|
|  COST| 3570|
|  ISRG| 3570|
|  ABNB| 1068|
|   WBD| 3570|
|  MSTR| 3570|
|   GFS|  846|
|   KDP| 3570|
|  WDAY| 3121|
+------+-----+
only showing top 20 rows



In [6]:
# Check for missing values in the dataset

from pyspark.sql.functions import sum

# Check missing values for each column in the combined dataset
all_df.select([sum(col(c).isNull().cast("int")).alias(c) for c in all_df.columns]).show()

+------+----+----+----+---+-----+
|ticker|date|open|high|low|close|
+------+----+----+----+---+-----+
|     0|   0|   0|   0|  0|    0|
+------+----+----+----+---+-----+



In [7]:
# Training set (before 2017)
train_period = all_df.filter((col("date") >= "2011-01-01") & (col("date") <= "2016-12-31"))

# Testing Periods Filtering
stable_period = all_df.filter((col("date") >= "2017-01-01") & (col("date") <= "2017-12-31"))
trade_war_period = all_df.filter((col("date") >= "2018-01-01") & (col("date") <= "2019-12-31"))
covid_period = all_df.filter((col("date") >= "2020-02-01") & (col("date") <= "2020-08-31"))
ukraine_period = all_df.filter((col("date") >= "2022-02-01") & (col("date") <= "2022-12-31"))

In [8]:
# List missing values in the datasets

# For stable period
print("Missing values in stable period:")
stable_period.select([sum(col(c).isNull().cast("int")).alias(c) for c in stable_period.columns]).show()

# For US-China Trade War period
print("Missing values in US-China Trade War testing period:")
trade_war_period.select([sum(col(c).isNull().cast("int")).alias(c) for c in trade_war_period.columns]).show()

# For COVID-19 period
print("Missing values in COVID-19 testing period:")
covid_period.select([sum(col(c).isNull().cast("int")).alias(c) for c in covid_period.columns]).show()

# For Russia-Ukraine Conflict period
print("Missing values in Russia-Ukraine Conflict testing period:")
ukraine_period.select([sum(col(c).isNull().cast("int")).alias(c) for c in ukraine_period.columns]).show()

# For train period
print("Missing values in stable period:")
train_period.select([sum(col(c).isNull().cast("int")).alias(c) for c in train_period.columns]).show()


Missing values in stable period:
+------+----+----+----+---+-----+
|ticker|date|open|high|low|close|
+------+----+----+----+---+-----+
|     0|   0|   0|   0|  0|    0|
+------+----+----+----+---+-----+

Missing values in US-China Trade War testing period:
+------+----+----+----+---+-----+
|ticker|date|open|high|low|close|
+------+----+----+----+---+-----+
|     0|   0|   0|   0|  0|    0|
+------+----+----+----+---+-----+

Missing values in COVID-19 testing period:
+------+----+----+----+---+-----+
|ticker|date|open|high|low|close|
+------+----+----+----+---+-----+
|     0|   0|   0|   0|  0|    0|
+------+----+----+----+---+-----+

Missing values in Russia-Ukraine Conflict testing period:
+------+----+----+----+---+-----+
|ticker|date|open|high|low|close|
+------+----+----+----+---+-----+
|     0|   0|   0|   0|  0|    0|
+------+----+----+----+---+-----+

Missing values in stable period:
+------+----+----+----+---+-----+
|ticker|date|open|high|low|close|
+------+----+----+----+---+-

In [9]:
# Check the number of rows in each dataset

pandas_df = train_period.toPandas()
print(f"Training dataset shape: {pandas_df.shape}")

pandas_df = stable_period.toPandas()
print(f"Satbel period dataset shape: {pandas_df.shape}")

pandas_df = trade_war_period.toPandas()
print(f"Trade war period dataset shape: {pandas_df.shape}")

pandas_df = covid_period.toPandas()
print(f"Covid period dataset shape: {pandas_df.shape}")

pandas_df = ukraine_period.toPandas()  
print(f"Ukraine period dataset shape: {pandas_df.shape}")

Training dataset shape: (124166, 6)
Satbel period dataset shape: (21887, 6)
Trade war period dataset shape: (45290, 6)
Covid period dataset shape: (13524, 6)
Ukraine period dataset shape: (22649, 6)


In [10]:
# Substep 7:

# Cache the train period DataFrame
train_period.cache()

# Cache the disruption period DataFrames
stable_period.cache()
trade_war_period.cache()
covid_period.cache()
ukraine_period.cache()

# Force caching by doing a small action like counting
stable_period.count()
trade_war_period.count()
covid_period.count()
ukraine_period.count()
train_period.count()

124166

In [11]:
# Considering the features for the model, we will use the following features:  
# - Moving averages (5 and 10 days)
# - Volatility (5 and 10 days)
# - Lagged values (1 day)
# - Returns (open-close, high-low, close-lag1)
# - Label (1 if the next day close price is higher than the current day close price, 0 otherwise)
# - Label (regression): the percentage change in the close price from the current day to the next day


from pyspark.sql.window import Window
from pyspark.sql.functions import avg, stddev, lag, lead, when

# Common window specifications
window_spec_5 = Window.partitionBy("ticker").orderBy("date").rowsBetween(-4, 0)
window_spec_10 = Window.partitionBy("ticker").orderBy("date").rowsBetween(-9, 0)
window_spec_lag = Window.partitionBy("ticker").orderBy("date")
window_spec_lead = Window.partitionBy("ticker").orderBy("date")

# Function to add all features
def add_features(df):
    return df \
        .withColumn("ma_5", avg("close").over(window_spec_5)) \
        .withColumn("ma_10", avg("close").over(window_spec_10)) \
        .withColumn("volatility_5", stddev("close").over(window_spec_5)) \
        .withColumn("volatility_10", stddev("close").over(window_spec_10)) \
        .withColumn("close_lag1", lag("close", 1).over(window_spec_lag)) \
        .withColumn("open_lag1", lag("open", 1).over(window_spec_lag)) \
        .withColumn("high_lag1", lag("high", 1).over(window_spec_lag)) \
        .withColumn("low_lag1", lag("low", 1).over(window_spec_lag)) \
        .withColumn("return_open_close", (col("close") - col("open")) / col("open")) \
        .withColumn("return_high_low", (col("high") - col("low")) / col("open")) \
        .withColumn("return_close_lag1", (col("close") - col("close_lag1")) / col("close_lag1")) \
        .withColumn("close_lead1", lead("close", 1).over(window_spec_lead)) \
        .withColumn("label_class", when(col("close_lead1") > col("close"), 1).otherwise(0)) \
        .withColumn("label_regress", (col("close_lead1") - col("close")) / col("close"))

# Apply to training set
train_period = add_features(train_period)

# Apply to each testing set
stable_period = add_features(stable_period)
trade_war_period = add_features(trade_war_period)
covid_period = add_features(covid_period)   
ukraine_period = add_features(ukraine_period)   



In [12]:
# Building the feature set for both classification and regression tasks

feature_columns = [
    "open", "high", "low", "close",
    "ma_5", "ma_10", "volatility_5", "volatility_10",
    "close_lag1", "open_lag1", "high_lag1", "low_lag1",
    "return_open_close", "return_high_low", "return_close_lag1"
]

# For classification task
classification_columns = feature_columns + ["label_class"]

# For regression task
regression_columns = feature_columns + ["label_regress"]

# For classification, select only these columns for the training period and disruptions periods
train_period_class = train_period.select(classification_columns)
stable_period_class = stable_period.select(classification_columns)
trade_war_period_class = trade_war_period.select(classification_columns)
covid_period_class = covid_period.select(classification_columns)
ukraine_period_class = ukraine_period.select(classification_columns)

# For regression, select only these columns for the training period and disruptions periods
train_period_regress = train_period.select(regression_columns)
stable_period_regress = stable_period.select(regression_columns)
trade_war_period_regress = trade_war_period.select(regression_columns)
covid_period_regress = covid_period.select(regression_columns)
ukraine_period_regress = ukraine_period.select(regression_columns)

In [13]:
# Applying VectorAssembler to create feature vectors for both classification and regression tasks

from pyspark.ml.feature import VectorAssembler

# Define assembler
assembler = VectorAssembler(
    inputCols=feature_columns,
    handleInvalid="skip",
    outputCol="features"
)

# Apply the assembler to training dataset for classification
train_period_class = assembler.transform(train_period_class)

# Apply the assembler for test datasets for classification
stable_period_class = assembler.transform(stable_period_class)
trade_war_period_class = assembler.transform(trade_war_period_class)
covid_period_class = assembler.transform(covid_period_class)
ukraine_period_class = assembler.transform(ukraine_period_class)


# Apply the assembler for regression datasets
train_period_regress = assembler.transform(train_period_regress)

# Apply the assembler for regression datasets
stable_period_regress = assembler.transform(stable_period_regress)
trade_war_period_regress = assembler.transform(trade_war_period_regress)
covid_period_regress = assembler.transform(covid_period_regress)
ukraine_period_regress = assembler.transform(ukraine_period_regress)

### Scaling

In [14]:
# Scaling the features using StandardScaler for both classification and regression tasks

scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
scaler_model = scaler.fit(train_period_class)

train_period_class_scaled = scaler_model.transform(train_period_class)
stable_period_class_scaled = scaler_model.transform(stable_period_class)
trade_war_period_class_scaled = scaler_model.transform(trade_war_period_class)
covid_period_class_scaled = scaler_model.transform(covid_period_class)
ukraine_period_class_scaled = scaler_model.transform(ukraine_period_class)

train_period_regress_scaled = scaler_model.transform(train_period_regress)
stable_period_regress_scaled = scaler_model.transform(stable_period_regress)
trade_war_period_regress_scaled = scaler_model.transform(trade_war_period_regress)
covid_period_regress_scaled = scaler_model.transform(covid_period_regress)
ukraine_period_regress_scaled = scaler_model.transform(ukraine_period_regress)





In [15]:
# remove rows with missing values

# for scaled dataset
train_period_regress_scaled = train_period_regress_scaled.filter(col("label_regress").cast("double").isNotNull())
stable_period_regress_scaled = stable_period_regress_scaled.filter(col("label_regress").cast("double").isNotNull())
trade_war_period_regress_scaled = trade_war_period_regress_scaled.filter(col("label_regress").cast("double").isNotNull())
covid_period_regress_scaled = covid_period_regress_scaled.filter(col("label_regress").cast("double").isNotNull())
ukraine_period_regress_scaled = ukraine_period_regress_scaled.filter(col("label_regress").cast("double").isNotNull())

# for unscaled dataset
# remove rows with missing values
train_period_regress = train_period_regress.filter(col("label_regress").cast("double").isNotNull())
stable_period_regress = stable_period_regress.filter(col("label_regress").cast("double").isNotNull())
trade_war_period_regress = trade_war_period_regress.filter(col("label_regress").cast("double").isNotNull())
covid_period_regress = covid_period_regress.filter(col("label_regress").cast("double").isNotNull())
ukraine_period_regress = ukraine_period_regress.filter(col("label_regress").cast("double").isNotNull())

## Classification in 4 Ways

### SCALED

In [16]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier, DecisionTreeClassifier

classifiers = {
    "LogisticRegression": LogisticRegression(featuresCol="scaled_features", labelCol="label_class"),
    "RandomForestClassifier": RandomForestClassifier(featuresCol="scaled_features", labelCol="label_class"),
    "GBTClassifier": GBTClassifier(featuresCol="scaled_features", labelCol="label_class"),
    "DecisionTreeClassifier": DecisionTreeClassifier(featuresCol="scaled_features", labelCol="label_class"),
}

for name, model in classifiers.items():
    print(f"\nTraining {name}...")
    clf_model = model.fit(train_period_class_scaled)

    # Evaluate on stable period
    preds = clf_model.transform(stable_period_class_scaled)
    cls_eval = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label_class", metricName="accuracy")
    stable_accuracy = cls_eval.evaluate(preds)
    stable_f1 = cls_eval.setMetricName("f1").evaluate(preds)
    print(f"Stable period Accuracy: {stable_accuracy:.4f}")
    print(f"Stable period F1 Score: {stable_f1:.4f}")

    # Evaluate on trade war period
    preds = clf_model.transform(trade_war_period_class_scaled)
    cls_eval = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label_class", metricName="accuracy")
    trade_war_accuracy = cls_eval.evaluate(preds)
    trade_war_f1 = cls_eval.setMetricName("f1").evaluate(preds)
    print(f"Trade War period Accuracy: {trade_war_accuracy:.4f}")
    print(f"Trade War period F1 Score: {trade_war_f1:.4f}")

    # Evaluate on COVID period
    preds = clf_model.transform(covid_period_class_scaled)
    cls_eval = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label_class", metricName="accuracy")
    covid_accuracy = cls_eval.evaluate(preds)
    covid_f1 = cls_eval.setMetricName("f1").evaluate(preds)
    print(f"COVID period Accuracy: {covid_accuracy:.4f}")
    print(f"COVID period F1 Score: {covid_f1:.4f}")

    # Evaluate on Ukraine period
    preds = clf_model.transform(ukraine_period_class_scaled)
    cls_eval = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label_class", metricName="accuracy")
    ukraine_accuracy = cls_eval.evaluate(preds)
    ukraine_f1 = cls_eval.setMetricName("f1").evaluate(preds)
    print(f"Ukraine period Accuracy: {ukraine_accuracy:.4f}")
    print(f"Ukraine period F1 Score: {ukraine_f1:.4f}")





Training LogisticRegression...
Stable period Accuracy: 0.5389
Stable period F1 Score: 0.4142
Trade War period Accuracy: 0.5336
Trade War period F1 Score: 0.4324
COVID period Accuracy: 0.5386
COVID period F1 Score: 0.4814
Ukraine period Accuracy: 0.4964
Ukraine period F1 Score: 0.4320

Training RandomForestClassifier...
Stable period Accuracy: 0.5305
Stable period F1 Score: 0.4803
Trade War period Accuracy: 0.5220
Trade War period F1 Score: 0.4834
COVID period Accuracy: 0.5386
COVID period F1 Score: 0.5126
Ukraine period Accuracy: 0.4917
Ukraine period F1 Score: 0.4601

Training GBTClassifier...
Stable period Accuracy: 0.5249
Stable period F1 Score: 0.5026
Trade War period Accuracy: 0.5161
Trade War period F1 Score: 0.4967
COVID period Accuracy: 0.5226
COVID period F1 Score: 0.5130
Ukraine period Accuracy: 0.4987
Ukraine period F1 Score: 0.4840

Training DecisionTreeClassifier...
Stable period Accuracy: 0.5342
Stable period F1 Score: 0.4889
Trade War period Accuracy: 0.5204
Trade War p

### NON-SCALED

In [17]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier, DecisionTreeClassifier

classifiers = {
    "LogisticRegression": LogisticRegression(featuresCol="features", labelCol="label_class"),
    "RandomForestClassifier": RandomForestClassifier(featuresCol="features", labelCol="label_class"),
    "GBTClassifier": GBTClassifier(featuresCol="features", labelCol="label_class"),
    "DecisionTreeClassifier": DecisionTreeClassifier(featuresCol="features", labelCol="label_class"),
}

for name, model in classifiers.items():
    print(f"\nTraining {name}...")
    clf_model = model.fit(train_period_class)

    # Evaluate on stable period
    preds = clf_model.transform(stable_period_class)
    cls_eval = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label_class", metricName="accuracy")
    stable_accuracy = cls_eval.evaluate(preds)
    stable_f1 = cls_eval.setMetricName("f1").evaluate(preds)
    print(f"Stable period Accuracy: {stable_accuracy:.4f}")
    print(f"Stable period F1 Score: {stable_f1:.4f}")

    # Evaluate on trade war period
    preds = clf_model.transform(trade_war_period_class)
    cls_eval = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label_class", metricName="accuracy")
    trade_war_accuracy = cls_eval.evaluate(preds)
    trade_war_f1 = cls_eval.setMetricName("f1").evaluate(preds)
    print(f"Trade War period Accuracy: {trade_war_accuracy:.4f}")
    print(f"Trade War period F1 Score: {trade_war_f1:.4f}")

    # Evaluate on COVID period
    preds = clf_model.transform(covid_period_class)
    cls_eval = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label_class", metricName="accuracy")
    covid_accuracy = cls_eval.evaluate(preds)
    covid_f1 = cls_eval.setMetricName("f1").evaluate(preds)
    print(f"COVID period Accuracy: {covid_accuracy:.4f}")
    print(f"COVID period F1 Score: {covid_f1:.4f}")

    # Evaluate on Ukraine period
    preds = clf_model.transform(ukraine_period_class)
    cls_eval = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label_class", metricName="accuracy")
    ukraine_accuracy = cls_eval.evaluate(preds)
    ukraine_f1 = cls_eval.setMetricName("f1").evaluate(preds)
    print(f"Ukraine period Accuracy: {ukraine_accuracy:.4f}")
    print(f"Ukraine period F1 Score: {ukraine_f1:.4f}")





Training LogisticRegression...
Stable period Accuracy: 0.5388
Stable period F1 Score: 0.4143
Trade War period Accuracy: 0.5337
Trade War period F1 Score: 0.4326
COVID period Accuracy: 0.5393
COVID period F1 Score: 0.4820
Ukraine period Accuracy: 0.4969
Ukraine period F1 Score: 0.4324

Training RandomForestClassifier...
Stable period Accuracy: 0.5335
Stable period F1 Score: 0.4891
Trade War period Accuracy: 0.5214
Trade War period F1 Score: 0.4863
COVID period Accuracy: 0.5426
COVID period F1 Score: 0.5205
Ukraine period Accuracy: 0.4908
Ukraine period F1 Score: 0.4677

Training GBTClassifier...
Stable period Accuracy: 0.5212
Stable period F1 Score: 0.4988
Trade War period Accuracy: 0.5150
Trade War period F1 Score: 0.4938
COVID period Accuracy: 0.5327
COVID period F1 Score: 0.5173
Ukraine period Accuracy: 0.4988
Ukraine period F1 Score: 0.4804

Training DecisionTreeClassifier...
Stable period Accuracy: 0.5342
Stable period F1 Score: 0.4873
Trade War period Accuracy: 0.5203
Trade War p

## Regression in 4 Ways

### SCALED

In [18]:
from pyspark.ml.regression import LinearRegression, GBTRegressor, RandomForestRegressor, DecisionTreeRegressor

regressors = {
    "LinearRegression": LinearRegression(featuresCol="scaled_features", labelCol="label_regress"),
    "GBTRegressor": GBTRegressor(featuresCol="scaled_features", labelCol="label_regress"),
    "RandomForestRegressor": RandomForestRegressor(featuresCol="scaled_features", labelCol="label_regress"),
    "DecisionTreeRegressor": DecisionTreeRegressor(featuresCol="scaled_features", labelCol="label_regress")
}

for name, model in regressors.items():
    print(f"\nTraining {name}...")
    reg_model = model.fit(train_period_regress_scaled)

    # Evaluate on stable period
    preds = reg_model.transform(stable_period_regress_scaled)
    reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="label_regress", metricName="rmse")
    stable_rmse = reg_eval.evaluate(preds)
    stable_r2 = reg_eval.setMetricName("r2").evaluate(preds)
    print(f"Stable period RMSE: {stable_rmse}")
    print(f"Stable period R2: {stable_r2}")

    # Evaluate on trade war period
    preds = reg_model.transform(trade_war_period_regress_scaled)
    reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="label_regress", metricName="rmse")
    trade_war_rmse = reg_eval.evaluate(preds)
    trade_war_r2 = reg_eval.setMetricName("r2").evaluate(preds)
    print(f"Trade War period RMSE: {trade_war_rmse}")
    print(f"Trade War period R2: {trade_war_r2}")

    # Evaluate on COVID period
    preds = reg_model.transform(covid_period_regress_scaled)
    reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="label_regress", metricName="rmse")
    covid_rmse = reg_eval.evaluate(preds)
    covid_r2 = reg_eval.setMetricName("r2").evaluate(preds)
    print(f"COVID period RMSE: {covid_rmse}")
    print(f"COVID period R2: {covid_r2}")


    # Evaluate on Ukraine period
    preds = reg_model.transform(ukraine_period_regress_scaled)
    reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="label_regress", metricName="rmse")
    ukraine_rmse = reg_eval.evaluate(preds)
    ukraine_r2 = reg_eval.setMetricName("r2").evaluate(preds)
    print(f"Ukraine period RMSE: {ukraine_rmse}")
    print(f"Ukraine period R2: {ukraine_r2}")





Training LinearRegression...
Stable period RMSE: 0.016305635365069977
Stable period R2: -0.003796823640024538
Trade War period RMSE: 0.021225160315171663
Trade War period R2: -0.0041997473032719945
COVID period RMSE: 0.03749525499501932
COVID period R2: 0.009342365932368701
Ukraine period RMSE: 0.03149410225567793
Ukraine period R2: -0.003026647703651353

Training GBTRegressor...
Stable period RMSE: 0.016909762135287557
Stable period R2: -0.07955646317310894
Trade War period RMSE: 0.021240768840201284
Trade War period R2: -0.00567722401927373
COVID period RMSE: 0.03775366868664583
COVID period R2: -0.004359722950431744
Ukraine period RMSE: 0.03147866397766223
Ukraine period R2: -0.002043529735952454

Training RandomForestRegressor...
Stable period RMSE: 0.01631624076800345
Stable period R2: -0.005103013994769645
Trade War period RMSE: 0.021185528944662343
Trade War period R2: -0.00045318834625063253
COVID period RMSE: 0.03767425098725071
COVID period R2: -0.00013867389252930629
Ukrain

### NON-SCALED

In [19]:
from pyspark.ml.regression import LinearRegression, GBTRegressor, RandomForestRegressor, DecisionTreeRegressor

regressors = {
    "LinearRegression": LinearRegression(featuresCol="features", labelCol="label_regress"),
    "GBTRegressor": GBTRegressor(featuresCol="features", labelCol="label_regress"),
    "RandomForestRegressor": RandomForestRegressor(featuresCol="features", labelCol="label_regress"),
    "DecisionTreeRegressor": DecisionTreeRegressor(featuresCol="features", labelCol="label_regress")
}

for name, model in regressors.items():
    print(f"\nTraining {name}...")
    reg_model = model.fit(train_period_regress)

    # Evaluate on stable period
    preds = reg_model.transform(stable_period_regress)
    reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="label_regress", metricName="rmse")
    stable_rmse = reg_eval.evaluate(preds)
    stable_r2 = reg_eval.setMetricName("r2").evaluate(preds)
    print(f"Stable period RMSE: {stable_rmse}")
    print(f"Stable period R2: {stable_r2}")

    # Evaluate on trade war period
    preds = reg_model.transform(trade_war_period_regress)
    reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="label_regress", metricName="rmse")
    trade_war_rmse = reg_eval.evaluate(preds)
    trade_war_r2 = reg_eval.setMetricName("r2").evaluate(preds)
    print(f"Trade War period RMSE: {trade_war_rmse}")
    print(f"Trade War period R2: {trade_war_r2}")

    # Evaluate on COVID period
    preds = reg_model.transform(covid_period_regress)
    reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="label_regress", metricName="rmse")
    covid_rmse = reg_eval.evaluate(preds)
    covid_r2 = reg_eval.setMetricName("r2").evaluate(preds)
    print(f"COVID period RMSE: {covid_rmse}")
    print(f"COVID period R2: {covid_r2}")

    # Evaluate on Ukraine period
    preds = reg_model.transform(ukraine_period_regress)
    reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="label_regress", metricName="rmse")
    ukraine_rmse = reg_eval.evaluate(preds)
    ukraine_r2 = reg_eval.setMetricName("r2").evaluate(preds)
    print(f"Ukraine period RMSE: {ukraine_rmse}")
    print(f"Ukraine period R2: {ukraine_r2}")






Training LinearRegression...
Stable period RMSE: 0.016305635365085842
Stable period R2: -0.0037968236419778645
Trade War period RMSE: 0.021225160315190048
Trade War period R2: -0.004199747305011492
COVID period RMSE: 0.03749525499524335
COVID period R2: 0.009342365920530726
Ukraine period RMSE: 0.03149410225567299
Ukraine period R2: -0.0030266477033362715

Training GBTRegressor...
Stable period RMSE: 0.016482643545111985
Stable period R2: -0.025708838926584265
Trade War period RMSE: 0.021437545714302113
Trade War period R2: -0.024396949113624444
COVID period RMSE: 0.037800707565974215
COVID period R2: -0.006864029899543489
Ukraine period RMSE: 0.03163402047625961
Ukraine period R2: -0.01195869785363235

Training RandomForestRegressor...
Stable period RMSE: 0.016329918752623743
Stable period R2: -0.006788885903619812
Trade War period RMSE: 0.021183716409454496
Trade War period R2: -0.000282007436749776
COVID period RMSE: 0.037706048901469835
COVID period R2: -0.0018276653847977276
Ukra