Installing the required packages

In [None]:
!pip install pyspark



Importing the required libraries

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression


1. Creating the SparkSession

In [None]:
spark = SparkSession.builder \
    .appName("Linear Regression on Stock Data") \
    .getOrCreate()

2. Loading the data

In [None]:
tesla_df = spark.read.csv("/content/TSLA Historical Data (1).csv", header=True, inferSchema=True)

In [None]:
tesla_df.show(5)
tesla_df.printSchema()

+----------+------+------+------+------+-------+--------+
|      Date| Price|  Open|  High|   Low|   Vol.|Change %|
+----------+------+------+------+------+-------+--------+
|10/31/2024|249.85|257.99|259.75|249.25| 66.58M|  -2.99%|
|10/30/2024|257.55|258.04|263.35|255.82| 53.99M|  -0.76%|
|10/29/2024|259.52|264.51|264.98|255.51| 80.52M|  -1.14%|
|10/28/2024|262.51| 270.0|273.54|262.24|107.65M|  -2.48%|
|10/25/2024|269.19|256.01|269.49|255.32|161.61M|   3.34%|
+----------+------+------+------+------+-------+--------+
only showing top 5 rows

root
 |-- Date: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Vol.: string (nullable = true)
 |-- Change %: string (nullable = true)



3. Data Preprocessing

Renaming Columns

In [None]:
tesla_df = tesla_df.withColumnRenamed("Open", "Opening_price")
tesla_df = tesla_df.withColumnRenamed("High", "Highest_price")
tesla_df = tesla_df.withColumnRenamed("Low", "Lowest_price")
tesla_df =tesla_df.withColumnRenamed("Vol.", "Volume(Million)")

In [None]:
tesla_df.describe().show()

+-------+----------+------------------+-----------------+------------------+-----------------+---------------+--------+
|summary|      Date|             Price|    Opening_price|     Highest_price|     Lowest_price|Volume(Million)|Change %|
+-------+----------+------------------+-----------------+------------------+-----------------+---------------+--------+
|  count|      1217|              1217|             1217|              1217|             1217|           1217|    1217|
|   mean|      NULL|208.21409202958108|208.2749465899753|212.91849630238275|203.3144371405097|           NULL|    NULL|
| stddev|      NULL| 79.01713259646819| 79.1937861388829| 80.76606832933288|77.33186914564232|           NULL|    NULL|
|    min|01/02/2020|             24.08|            24.98|             26.99|            23.37|        100.01M|  -0.02%|
|    max|12/31/2021|            409.97|           411.47|             414.5|           405.67|         99.94M|   9.78%|
+-------+----------+------------------+-

In [None]:
tesla_df.distinct()

DataFrame[Date: string, Price: double, Opening_price: double, Highest_price: double, Lowest_price: double, Volume(Million): string, Change %: string]

In [None]:
# Counts the number of distinct rows in dataframe
tesla_df.distinct().count()

1217

In [None]:
tesla_df.show(5)

+----------+------+-------------+-------------+------------+---------------+--------+
|      Date| Price|Opening_price|Highest_price|Lowest_price|Volume(Million)|Change %|
+----------+------+-------------+-------------+------------+---------------+--------+
|10/31/2024|249.85|       257.99|       259.75|      249.25|         66.58M|  -2.99%|
|10/30/2024|257.55|       258.04|       263.35|      255.82|         53.99M|  -0.76%|
|10/29/2024|259.52|       264.51|       264.98|      255.51|         80.52M|  -1.14%|
|10/28/2024|262.51|        270.0|       273.54|      262.24|        107.65M|  -2.48%|
|10/25/2024|269.19|       256.01|       269.49|      255.32|        161.61M|   3.34%|
+----------+------+-------------+-------------+------------+---------------+--------+
only showing top 5 rows



Converting date from string to date

In [None]:
from pyspark.sql import functions as F
tesla_df = tesla_df.withColumn("Date", F.to_date(F.col("Date"), "MM/dd/yyyy"))

In [None]:
tesla_df.show(5)
tesla_df.printSchema()

+----------+------+-------------+-------------+------------+---------------+--------+
|      Date| Price|Opening_price|Highest_price|Lowest_price|Volume(Million)|Change %|
+----------+------+-------------+-------------+------------+---------------+--------+
|2024-10-31|249.85|       257.99|       259.75|      249.25|         66.58M|  -2.99%|
|2024-10-30|257.55|       258.04|       263.35|      255.82|         53.99M|  -0.76%|
|2024-10-29|259.52|       264.51|       264.98|      255.51|         80.52M|  -1.14%|
|2024-10-28|262.51|        270.0|       273.54|      262.24|        107.65M|  -2.48%|
|2024-10-25|269.19|       256.01|       269.49|      255.32|        161.61M|   3.34%|
+----------+------+-------------+-------------+------------+---------------+--------+
only showing top 5 rows

root
 |-- Date: date (nullable = true)
 |-- Price: double (nullable = true)
 |-- Opening_price: double (nullable = true)
 |-- Highest_price: double (nullable = true)
 |-- Lowest_price: double (nullable

In [None]:
tesla_df.distinct()

DataFrame[Date: date, Price: double, Opening_price: double, Highest_price: double, Lowest_price: double, Volume(Million): string, Change %: string]

Dropping Missing values

In [None]:
tesla_df = tesla_df.na.drop()

In [None]:
tesla_df.count()

1217

In [None]:
tesla_df.show(5)

+----------+------+-------------+-------------+------------+---------------+--------+
|      Date| Price|Opening_price|Highest_price|Lowest_price|Volume(Million)|Change %|
+----------+------+-------------+-------------+------------+---------------+--------+
|2024-10-31|249.85|       257.99|       259.75|      249.25|         66.58M|  -2.99%|
|2024-10-30|257.55|       258.04|       263.35|      255.82|         53.99M|  -0.76%|
|2024-10-29|259.52|       264.51|       264.98|      255.51|         80.52M|  -1.14%|
|2024-10-28|262.51|        270.0|       273.54|      262.24|        107.65M|  -2.48%|
|2024-10-25|269.19|       256.01|       269.49|      255.32|        161.61M|   3.34%|
+----------+------+-------------+-------------+------------+---------------+--------+
only showing top 5 rows



Converting the Change % column to Float

In [None]:
from pyspark.sql.types import IntegerType, DoubleType
tesla_df = tesla_df.withColumn("Change %", F.regexp_replace(F.col("Change %"), "%", ""))  # Remove '%'
tesla_df = tesla_df.withColumn("Change %", F.col("Change %").cast("float"))  # Convert to Float

In [None]:
tesla_df.show(5)

+----------+------+-------------+-------------+------------+---------------+--------+
|      Date| Price|Opening_price|Highest_price|Lowest_price|Volume(Million)|Change %|
+----------+------+-------------+-------------+------------+---------------+--------+
|2024-10-31|249.85|       257.99|       259.75|      249.25|         66.58M|   -2.99|
|2024-10-30|257.55|       258.04|       263.35|      255.82|         53.99M|   -0.76|
|2024-10-29|259.52|       264.51|       264.98|      255.51|         80.52M|   -1.14|
|2024-10-28|262.51|        270.0|       273.54|      262.24|        107.65M|   -2.48|
|2024-10-25|269.19|       256.01|       269.49|      255.32|        161.61M|    3.34|
+----------+------+-------------+-------------+------------+---------------+--------+
only showing top 5 rows



Converting Volume from string datatype to float

In [None]:
tesla_df = tesla_df.withColumn("Volume(Million)", F.regexp_replace(F.col("Volume(Million)"), "M", ""))  # Remove 'M'
tesla_df = tesla_df.withColumn("Volume(Million)", F.col("Volume(Million)").cast("float"))  # Convert to Float

In [None]:
tesla_df.show(5)

+----------+------+-------------+-------------+------------+---------------+--------+
|      Date| Price|Opening_price|Highest_price|Lowest_price|Volume(Million)|Change %|
+----------+------+-------------+-------------+------------+---------------+--------+
|2024-10-31|249.85|       257.99|       259.75|      249.25|          66.58|   -2.99|
|2024-10-30|257.55|       258.04|       263.35|      255.82|          53.99|   -0.76|
|2024-10-29|259.52|       264.51|       264.98|      255.51|          80.52|   -1.14|
|2024-10-28|262.51|        270.0|       273.54|      262.24|         107.65|   -2.48|
|2024-10-25|269.19|       256.01|       269.49|      255.32|         161.61|    3.34|
+----------+------+-------------+-------------+------------+---------------+--------+
only showing top 5 rows



In [None]:
tesla_df.distinct()

DataFrame[Date: date, Price: double, Opening_price: double, Highest_price: double, Lowest_price: double, Volume(Million): float, Change %: float]

In [None]:
tesla_df.describe().show()

+-------+------------------+-----------------+------------------+-----------------+------------------+-----------------+
|summary|             Price|    Opening_price|     Highest_price|     Lowest_price|   Volume(Million)|         Change %|
+-------+------------------+-----------------+------------------+-----------------+------------------+-----------------+
|  count|              1217|             1217|              1217|             1217|              1217|             1217|
|   mean|208.21409202958108|208.2749465899753|212.91849630238275|203.3144371405097|126.53741995937627|0.268849624447898|
| stddev| 79.01713259646819| 79.1937861388829| 80.76606832933288|77.33186914564232| 83.05737419619376|4.215775210254224|
|    min|             24.08|            24.98|             26.99|            23.37|              29.4|           -21.06|
|    max|            409.97|           411.47|             414.5|           405.67|            914.08|            21.92|
+-------+------------------+----

Dropping duplicates

In [None]:
tesla_df_cleaned = tesla_df.dropDuplicates()
tesla_df_cleaned.show()

+----------+------+-------------+-------------+------------+---------------+--------+
|      Date| Price|Opening_price|Highest_price|Lowest_price|Volume(Million)|Change %|
+----------+------+-------------+-------------+------------+---------------+--------+
|2024-07-18|249.23|       251.09|       257.14|       247.2|         110.87|    0.29|
|2023-09-14|276.04|       271.32|       276.71|      270.42|         107.71|    1.75|
|2023-02-17|208.31|       199.99|       208.44|       197.5|         213.74|     3.1|
|2023-01-27| 177.9|       162.43|       180.68|      161.17|         306.59|    11.0|
|2022-03-24|337.97|       336.58|        341.5|       329.6|          68.92|    1.48|
|2021-04-01|220.58|       229.46|       230.81|      219.81|          105.9|   -0.93|
|2024-07-01|209.86|       201.02|       213.23|      200.85|         135.69|    6.05|
|2023-06-16|260.54|       258.92|        263.6|      257.21|         167.92|    1.81|
|2022-10-31|227.54|       226.19|       229.85|      2

Saving the cleaned data to a parquet file

In [None]:
tesla_df_cleaned.write.parquet("tesla_dfCleaned.parquet")

Loading the parquet Dataframe for preprocessing and model training

In [None]:
teslaDf = spark.read.parquet("/content/tesla_dfCleaned.parquet", header=True, inferSchema=True)

In [None]:
teslaDf.show(5)

+----------+------+-------------+-------------+------------+---------------+--------+
|      Date| Price|Opening_price|Highest_price|Lowest_price|Volume(Million)|Change %|
+----------+------+-------------+-------------+------------+---------------+--------+
|2024-07-18|249.23|       251.09|       257.14|       247.2|         110.87|    0.29|
|2023-09-14|276.04|       271.32|       276.71|      270.42|         107.71|    1.75|
|2023-02-17|208.31|       199.99|       208.44|       197.5|         213.74|     3.1|
|2023-01-27| 177.9|       162.43|       180.68|      161.17|         306.59|    11.0|
|2022-03-24|337.97|       336.58|        341.5|       329.6|          68.92|    1.48|
+----------+------+-------------+-------------+------------+---------------+--------+
only showing top 5 rows



In [None]:
teslaDf.describe().show()

+-------+------------------+------------------+-----------------+------------------+------------------+-----------------+
|summary|             Price|     Opening_price|    Highest_price|      Lowest_price|   Volume(Million)|         Change %|
+-------+------------------+------------------+-----------------+------------------+------------------+-----------------+
|  count|              1217|              1217|             1217|              1217|              1217|             1217|
|   mean|208.21409202958105|208.27494658997554|212.9184963023828|203.31443714050957|126.53741995937627|0.268849624447898|
| stddev| 79.01713259646819| 79.19378613888293|80.76606832933294| 77.33186914564239| 83.05737419619382|4.215775210254223|
|    min|             24.08|             24.98|            26.99|             23.37|              29.4|           -21.06|
|    max|            409.97|            411.47|            414.5|            405.67|            914.08|            21.92|
+-------+---------------

4. Feature Engineering

**Extracting Date components**


New features are created by extracting the year, month, and day from


the Date column. This can help capture seasonality or trends over time.

In [None]:
from pyspark.sql.functions import col, year, month, dayofmonth, lag, avg
teslaDf = tesla_df.withColumn("Year", year(col("Date"))) \
       .withColumn("Month", month(col("Date"))) \
       .withColumn("Day", dayofmonth(col("Date")))

Creating a Window specification for lagging and moving averages

*   A window specification defines how the data is partitioned and ordered for the window function to operate on.




In [None]:
from pyspark.sql import Window
window_spec = Window.orderBy("Date")


Creating lagged features

*   The Prev_Close feature provides context by including the closing price from the previous day. This is crucial in time series data, as today's price is often influenced by yesterday's price.
*   The daily price change is a direct measure of volatility.



In [None]:
teslaDf = teslaDf.withColumn("Prev_Close", lag("Price", 1).over(window_spec))  # Previous day's closing price
teslaDf = teslaDf.withColumn("Price_Change", col("Price") - col("Prev_Close"))  # Daily price change

Creating Moving Averages

*   Moving averages help smooth out short-term fluctuations in the data, making it easier to identify long-term trends.
*   The 5-day moving average (MA_5) captures short-term trends, while the 10-day moving average (MA_10) provides a slightly broader view.



In [None]:
teslaDf = teslaDf.withColumn("MA_5", avg("Price").over(Window.orderBy("Date").rowsBetween(-4, 0)))  # 5-day moving average
teslaDf = teslaDf.withColumn("MA_10", avg("Price").over(Window.orderBy("Date").rowsBetween(-9, 0)))  # 10-day moving average

Dropping rows with null values (due to lagging and moving averages)

In [None]:
teslaDf = teslaDf.dropna()

In [None]:
teslaDf.count()

1216

In [None]:
teslaDf.show(5, truncate=False)

+----------+-----+-------------+-------------+------------+---------------+--------+----+-----+---+----------+-------------------+------------------+------------------+
|Date      |Price|Opening_price|Highest_price|Lowest_price|Volume(Million)|Change %|Year|Month|Day|Prev_Close|Price_Change       |MA_5              |MA_10             |
+----------+-----+-------------+-------------+------------+---------------+--------+----+-----+---+----------+-------------------+------------------+------------------+
|2020-01-03|29.53|29.37        |30.27        |29.13       |266.92         |2.96    |2020|1    |3  |28.68     |0.8500000000000014 |29.105            |29.105            |
|2020-01-06|30.1 |29.36        |30.1         |29.33       |152.36         |1.93    |2020|1    |6  |29.53     |0.5700000000000003 |29.436666666666667|29.436666666666667|
|2020-01-07|31.27|30.76        |31.44        |30.22       |273.14         |3.89    |2020|1    |7  |30.1      |1.1699999999999982 |29.895            |29.895

Creating feature vector

In [None]:
assembler = VectorAssembler(inputCols=["Opening_price", "Highest_price", "Lowest_price", "Volume(Million)", "Year", "Month", "Day", "Prev_Close", "Price_Change", "MA_5", "MA_10"], outputCol="assembled_features")
feature_df = assembler.transform(teslaDf)

In [None]:
feature_df.show(10, truncate=False)

+----------+-----+-------------+-------------+------------+---------------+--------+----+-----+---+----------+-------------------+------------------+------------------+----------------------------------------------------------------------------------------------------------------------+
|Date      |Price|Opening_price|Highest_price|Lowest_price|Volume(Million)|Change %|Year|Month|Day|Prev_Close|Price_Change       |MA_5              |MA_10             |assembled_features                                                                                                    |
+----------+-----+-------------+-------------+------------+---------------+--------+----+-----+---+----------+-------------------+------------------+------------------+----------------------------------------------------------------------------------------------------------------------+
|2020-01-03|29.53|29.37        |30.27        |29.13       |266.92         |2.96    |2020|1    |3  |28.68     |0.8500000000000014 |29.105

Scaling the features

In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
scaler = StandardScaler(inputCol="assembled_features", outputCol="scaledFeatures", withMean=True, withStd=True)
scaler_model = scaler.fit(feature_df)
scaled_df = scaler_model.transform(feature_df)


In [None]:
scaled_df.show(5,truncate=False)

+----------+-----+-------------+-------------+------------+---------------+--------+----+-----+---+----------+-------------------+------------------+------------------+---------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Date      |Price|Opening_price|Highest_price|Lowest_price|Volume(Million)|Change %|Year|Month|Day|Prev_Close|Price_Change       |MA_5              |MA_10             |assembled_features                                                                                                   |scaledFeatures                                                                                                                                                                                                 

Splitting the data into training and testing sets

In [None]:
train_df, test_df = scaled_df.randomSplit([0.8, 0.2], seed=42)


In [None]:
train_df.show()

+----------+-----+-------------+-------------+------------+---------------+--------+----+-----+---+----------+--------------------+------------------+------------------+--------------------+--------------------+
|      Date|Price|Opening_price|Highest_price|Lowest_price|Volume(Million)|Change %|Year|Month|Day|Prev_Close|        Price_Change|              MA_5|             MA_10|  assembled_features|      scaledFeatures|
+----------+-----+-------------+-------------+------------+---------------+--------+----+-----+---+----------+--------------------+------------------+------------------+--------------------+--------------------+
|2020-01-03|29.53|        29.37|        30.27|       29.13|         266.92|    2.96|2020|    1|  3|     28.68|  0.8500000000000014|            29.105|            29.105|[29.37,30.27,29.1...|[-2.2648358810510...|
|2020-01-06| 30.1|        29.36|         30.1|       29.33|         152.36|    1.93|2020|    1|  6|     29.53|  0.5700000000000003|29.436666666666667|29

Creating the Linear Regression model

In [None]:
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="Price")

Creating the pipeline

In [None]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[assembler, scaler, lr])

In [None]:
# Check the current output column name
print(f"Current output column name: {assembler.getOutputCol()}")

Current output column name: assembled_features


In [None]:
# If it's 'assembled_features', change it to something unique
if assembler.getOutputCol() == "assembled_features":
    assembler.setOutputCol("assembled_features_new")

In [None]:
# Change the output column of the scaler to a unique name
scaler.setOutputCol("scaledFeatures_new")


StandardScaler_6a7c36cef492

Fitting the pipeline model

In [None]:
pipeline_model = pipeline.fit(train_df)

Making Predictions

In [None]:
predictions = pipeline_model.transform(test_df)

Model Evaluation

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol="Price", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

Model Summary

In [None]:
lr_model = pipeline_model.stages[2]  # Get the Linear Regression model from the pipeline
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))
print(f"Root Mean Squared Error (RMSE): {rmse}")
print("R2: " + str(lr_model.summary.r2))

Coefficients: [1.1227382821134022e-10,-1.0585612625919515e-10,-1.2803443149736153e-10,-1.6053112794989037e-13,1.3488910610710684e-12,3.101858049241241e-13,-7.027392345158488e-14,79.04061066936444,8.450759197769797,-9.075964729656563e-11,3.1626062188642287e-11]
Intercept: 208.36173519736855
Root Mean Squared Error (RMSE): 6.669899274428932e-12
R2: 1.0


Calculating the RMSE Baseline

In [None]:
mean_value = train_df.agg({"Price": "mean"}).collect()[0][0]
print(f"Mean value of the target variable: {mean_value}")

Mean value of the target variable: 207.49309145129217


In [None]:
# Create a DataFrame with mean predictions
baseline_predictions = train_df.select("Price").withColumn("prediction", F.lit(mean_value))

In [None]:
# Evaluate RMSE for baseline predictions
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="Price", predictionCol="prediction", metricName="rmse")
baseline_rmse = evaluator.evaluate(baseline_predictions)

print(f"RMSE for Baseline Model: {baseline_rmse}")

RMSE for Baseline Model: 78.2934392393008


Showing Predictions

In [None]:
predictions.select("Price", "prediction").show()

+-----+------------------+
|Price|        prediction|
+-----+------------------+
|31.27|  31.2699999999991|
|34.99| 34.98999999999768|
|34.57| 34.56999999999812|
|38.15| 38.14999999999884|
|43.37| 43.37000000000003|
|49.93|49.929999999991765|
|53.34|53.339999999997985|
|53.33| 53.32999999999569|
|43.02| 43.01999999999853|
|42.28| 42.27999999999906|
|37.37| 37.36999999999935|
|29.67|29.669999999996264|
|24.08| 24.07999999999808|
|33.67| 33.66999999999999|
| 30.3|30.299999999997198|
|47.33| 47.32999999999706|
|48.34| 48.33999999999864|
|53.72| 53.71999999999758|
|66.06| 66.05999999999707|
|66.73| 66.72999999999905|
+-----+------------------+
only showing top 20 rows



Saving the trained Pipeline Model

In [None]:
import os


In [None]:
# Get the current working directory
current_directory = os.getcwd()

print(f"Current working directory: {current_directory}")

Current working directory: /content


In [None]:
from google.colab import drive

# Mount your Google Drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Saving the trained pipeline model to Google Drive
model_path = '/content/drive/My Drive/pipeline_model'
pipeline_model.write().overwrite().save(model_path)

In [None]:
spark.stop()