# Tesla Stock Analysis with PySpark

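This notebook demonstrates how to use Apache Spark to analyze Tesla (TSLA) stock data.
We'll explore common Spark DataFrame operations, run data-quality checks and summary statistics, and compute technical indicators (daily returns, moving averages, volatility, and RSI) that can then be visualized.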

## 1. Environment Setup and Spark Session Initialization

In [1]:
# Import required libraries
import os
import sys
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# Spark imports
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *

# Data visualization imports
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np

# Set visualization style
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")

# Display settings
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)
pd.set_option('display.float_format', lambda x: '%.2f' % x)

print("Libraries imported successfully!")

Libraries imported successfully!


In [2]:
# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Tesla Stock Analysis") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.sql.repl.eagerEval.enabled", "true") \
    .config("spark.sql.repl.eagerEval.maxNumRows", 20) \
    .getOrCreate()

# Set log level to reduce verbosity
spark.sparkContext.setLogLevel("ERROR")

print(f"Spark Version: {spark.version}")
print(f"Spark Application ID: {spark.sparkContext.applicationId}")
print(f"Spark UI: {spark.sparkContext.uiWebUrl}")
print("\nSpark session initialized successfully!")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/09/10 09:03:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark Version: 3.3.1
Spark Application ID: local-1757520205864
Spark UI: http://iad50-br-lbe-j9-r3-xe-0-0-1-0.amazon.com:4040

Spark session initialized successfully!


## 2. Data Loading and Initial Exploration

In [3]:
# Load the TSLA CSV file
file_path = "data/tsla.csv"

# Read CSV with schema inference
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(file_path)

print("Data loaded successfully!")
print(f"Total records: {df.count():,}")
print(f"Number of columns: {len(df.columns)}")

Data loaded successfully!
Total records: 1,257
Number of columns: 6


In [4]:
# Display schema
print("DataFrame Schema:")
df.printSchema()

DataFrame Schema:
root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)



In [5]:
# Show first 10 rows
print("First 10 rows of the dataset:")
df.show(10, truncate=False)

First 10 rows of the dataset:
+-------------------+------------------+------------------+------------------+------------------+---------+
|Date               |Open              |High              |Low               |Close             |Volume   |
+-------------------+------------------+------------------+------------------+------------------+---------+
|2020-01-02 00:00:00|28.299999237060547|28.713333129882812|28.11400032043457 |28.68400001525879 |142981500|
|2020-01-03 00:00:00|29.366666793823242|30.266666412353516|29.128000259399414|29.534000396728516|266677500|
|2020-01-06 00:00:00|29.364667892456055|30.104000091552734|29.333332061767578|30.1026668548584  |151995000|
|2020-01-07 00:00:00|30.760000228881836|31.441999435424805|30.224000930786133|31.270666122436523|268231500|
|2020-01-08 00:00:00|31.579999923706055|33.232666015625   |31.215333938598633|32.80933380126953 |467164500|
|2020-01-09 00:00:00|33.13999938964844 |33.253334045410156|31.524667739868164|32.089332580566406|426606000

In [6]:
# Convert Date column to proper date type and extract date components
df = df.withColumn("Date", F.to_date(F.col("Date"))) \
       .withColumn("Year", F.year("Date")) \
       .withColumn("Month", F.month("Date")) \
       .withColumn("Day", F.dayofmonth("Date")) \
       .withColumn("DayOfWeek", F.dayofweek("Date")) \
       .withColumn("Quarter", F.quarter("Date"))

# Cache the DataFrame, since every later cell builds on it (cache() is lazy:
# rows are stored the first time an action computes them)
df.cache()

print("Date column converted and date components extracted.")
df.select("Date", "Year", "Month", "Quarter", "Day", "DayOfWeek").show(5)

Date column converted and date components extracted.
+----------+----+-----+-------+---+---------+
|      Date|Year|Month|Quarter|Day|DayOfWeek|
+----------+----+-----+-------+---+---------+
|2020-01-02|2020|    1|      1|  2|        5|
|2020-01-03|2020|    1|      1|  3|        6|
|2020-01-06|2020|    1|      1|  6|        2|
|2020-01-07|2020|    1|      1|  7|        3|
|2020-01-08|2020|    1|      1|  8|        4|
+----------+----+-----+-------+---+---------+
only showing top 5 rows
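Spark's `dayofweek` numbers days from 1 (Sunday) to 7 (Saturday), which differs from Python's `date.weekday()` (0 = Monday) and `date.isoweekday()` (1 = Monday). A minimal stdlib sketch that reproduces the Spark numbering, plus `quarter`, so the `DayOfWeek` and `Quarter` values above can be checked by hand; the helper names are hypothetical:

```python
from datetime import date


def spark_dayofweek(d: date) -> int:
    """Hypothetical helper mimicking Spark's dayofweek(): 1 = Sunday, 2 = Monday, ..., 7 = Saturday."""
    # isoweekday() is 1 = Monday ... 7 = Sunday; shift it so Sunday becomes 1.
    return d.isoweekday() % 7 + 1


def spark_quarter(d: date) -> int:
    """Hypothetical helper mimicking Spark's quarter(): months 1-3 -> 1, 4-6 -> 2, and so on."""
    return (d.month - 1) // 3 + 1


# Dates taken from the output above.
for d in [date(2020, 1, 2), date(2020, 1, 3), date(2020, 1, 6)]:
    print(d, spark_dayofweek(d), spark_quarter(d))
```

With this convention, a Monday-to-Friday filter in Spark would be `F.dayofweek("Date").between(2, 6)`.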



## 3. Data Quality and Statistics

In [7]:
# Check for null values
print("Null values in each column:")
null_counts = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns])
null_counts.show()

Null values in each column:
+----+----+----+---+-----+------+----+-----+---+---------+-------+
|Date|Open|High|Low|Close|Volume|Year|Month|Day|DayOfWeek|Quarter|
+----+----+----+---+-----+------+----+-----+---+---------+-------+
|   0|   0|   0|  0|    0|     0|   0|    0|  0|        0|      0|
+----+----+----+---+-----+------+----+-----+---+---------+-------+



In [8]:
# Basic statistics for numerical columns
print("Statistical Summary:")
df.select("Open", "High", "Low", "Close", "Volume").describe().show()

Statistical Summary:
+-------+------------------+------------------+------------------+-----------------+--------------------+
|summary|              Open|              High|               Low|            Close|              Volume|
+-------+------------------+------------------+------------------+-----------------+--------------------+
|  count|              1257|              1257|              1257|             1257|                1257|
|   mean|213.31521943910974|218.11713001248185|208.20468772244814|213.2794425815652|1.2556862633253779E8|
| stddev| 83.44924973788747| 85.22529577503349| 81.38276862266676|83.32151523455335| 8.210821218093796E7|
|    min|24.979999542236328| 26.99066734313965|23.367332458496094|24.08133316040039|            29401800|
|    max| 475.8999938964844| 488.5400085449219|  457.510009765625|479.8599853515625|           914082000|
+-------+------------------+------------------+------------------+-----------------+--------------------+
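`describe()` reports the sample standard deviation (dividing by n - 1), and `F.stddev`, used for `Volatility_30` in section 4, is likewise an alias for `stddev_samp`; `F.stddev_pop` gives the population figure. A minimal stdlib illustration of the difference, on made-up numbers rather than the TSLA data:

```python
import math
import statistics

values = [1.0, 2.0, 3.0, 4.0]  # made-up sample, not taken from the TSLA data

n = len(values)
mean = sum(values) / n
sum_sq = sum((v - mean) ** 2 for v in values)

sample_sd = math.sqrt(sum_sq / (n - 1))  # what Spark's describe() / stddev() report
population_sd = math.sqrt(sum_sq / n)    # what stddev_pop() would report

# The stdlib offers both flavours directly.
assert math.isclose(sample_sd, statistics.stdev(values))
assert math.isclose(population_sd, statistics.pstdev(values))

print(f"sample: {sample_sd:.4f}, population: {population_sd:.4f}")
```

For a dataset of 1,257 rows the two differ by well under 0.1%, but on small rolling windows such as 30 rows the gap is larger.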



In [9]:
# Date range of the dataset
date_range = df.select(
    F.min("Date").alias("Start_Date"),
    F.max("Date").alias("End_Date"),
    F.count("Date").alias("Trading_Days")
).collect()[0]

print("Dataset Date Range:")
print(f"Start Date: {date_range['Start_Date']}")
print(f"End Date: {date_range['End_Date']}")
print(f"Total Trading Days: {date_range['Trading_Days']:,}")

Dataset Date Range:
Start Date: 2020-01-02
End Date: 2024-12-30
Total Trading Days: 1,257


## 4. Technical Indicators and Calculations

In [10]:
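# Calculate daily returns and price changes (Daily_Return and Intraday_Change_Pct are in percent).
# The window has no partitionBy, so Spark moves all rows to a single partition; fine for ~1,257 rows.
window_spec = Window.orderBy("Date")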

df = df.withColumn("Prev_Close", F.lag("Close", 1).over(window_spec)) \
       .withColumn("Daily_Return", ((F.col("Close") - F.col("Prev_Close")) / F.col("Prev_Close")) * 100) \
       .withColumn("Daily_Change", F.col("Close") - F.col("Prev_Close")) \
       .withColumn("Intraday_Range", F.col("High") - F.col("Low")) \
       .withColumn("Intraday_Change_Pct", ((F.col("Close") - F.col("Open")) / F.col("Open")) * 100)

print("Technical indicators calculated.")
df.select("Date", "Close", "Daily_Return", "Daily_Change", "Intraday_Range").show(10)

Technical indicators calculated.
+----------+------------------+-------------------+--------------------+------------------+
|      Date|             Close|       Daily_Return|        Daily_Change|    Intraday_Range|
+----------+------------------+-------------------+--------------------+------------------+
|2020-01-02| 28.68400001525879|               null|                null|0.5993328094482422|
|2020-01-03|29.534000396728516| 2.9633258297920753|  0.8500003814697266|1.1386661529541016|
|2020-01-06|  30.1026668548584| 1.9254637045135075|  0.5686664581298828|0.7706680297851562|
|2020-01-07|31.270666122436523|  3.880052465815389|   1.167999267578125|1.2179985046386719|
|2020-01-08| 32.80933380126953|  4.920482578812169|  1.5386676788330078| 2.017332077026367|
|2020-01-09|32.089332580566406|-2.1945011900097318|  -0.720001220703125|1.7286663055419922|
|2020-01-10|31.876667022705078| -0.662729763317422|-0.21266555786132812|0.7493343353271484|
|2020-01-13|34.990665435791016|  9.768895885093
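
As a sanity check on the `lag`-based formula, here is a plain-Python sketch that recomputes `Daily_Return` from the first four `Close` values printed above. The first row has no previous close, so it gets `None`, just as `F.lag` yields `null`; `daily_returns_pct` is a hypothetical helper:

```python
# Close prices copied from the first four rows of the output above.
closes = [28.68400001525879, 29.534000396728516, 30.1026668548584, 31.270666122436523]


def daily_returns_pct(prices):
    """Percent change from the previous close; None for the first row, like F.lag()."""
    out = [None]
    for prev, curr in zip(prices, prices[1:]):
        out.append((curr - prev) / prev * 100)
    return out


returns = daily_returns_pct(closes)
print(returns)
```

The values agree with the `Daily_Return` column (about 2.96, 1.93, and 3.88 percent).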

In [11]:
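# Calculate moving averages of Close
# Trailing windows are measured in rows (trading days), not calendar days;
# the first N-1 rows use partial frames, so their averages cover fewer than N days.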
window_7 = Window.orderBy("Date").rowsBetween(-6, 0)
window_30 = Window.orderBy("Date").rowsBetween(-29, 0)
window_50 = Window.orderBy("Date").rowsBetween(-49, 0)
window_200 = Window.orderBy("Date").rowsBetween(-199, 0)

df = df.withColumn("MA_7", F.avg("Close").over(window_7)) \
       .withColumn("MA_30", F.avg("Close").over(window_30)) \
       .withColumn("MA_50", F.avg("Close").over(window_50)) \
       .withColumn("MA_200", F.avg("Close").over(window_200)) \
       .withColumn("Volume_MA_30", F.avg("Volume").over(window_30))

print("Moving averages calculated.")
df.select("Date", "Close", "MA_7", "MA_30", "MA_50", "MA_200").show(10)

Moving averages calculated.
+----------+------------------+------------------+------------------+------------------+------------------+
|      Date|             Close|              MA_7|             MA_30|             MA_50|            MA_200|
+----------+------------------+------------------+------------------+------------------+------------------+
|2020-01-02| 28.68400001525879| 28.68400001525879| 28.68400001525879| 28.68400001525879| 28.68400001525879|
|2020-01-03|29.534000396728516|29.109000205993652|29.109000205993652|29.109000205993652|29.109000205993652|
|2020-01-06|  30.1026668548584|29.440222422281902|29.440222422281902|29.440222422281902|29.440222422281902|
|2020-01-07|31.270666122436523|29.897833347320557|29.897833347320557|29.897833347320557|29.897833347320557|
|2020-01-08| 32.80933380126953| 30.48013343811035| 30.48013343811035| 30.48013343811035| 30.48013343811035|
|2020-01-09|32.089332580566406| 30.74833329518636| 30.74833329518636| 30.74833329518636| 30.74833329518636|
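
Because the frames are defined with `rowsBetween`, the first rows have fewer than N rows available, and `F.avg` simply averages whatever is there. That is why `MA_7`, `MA_30`, `MA_50`, and `MA_200` are identical in the first rows above. A minimal pure-Python sketch of both behaviours (`trailing_mean` is a hypothetical helper). In Spark, one way to blank out incomplete frames is `F.when(F.count("Close").over(window_200) == 200, F.avg("Close").over(window_200))`, which leaves `null` wherever the condition fails:

```python
def trailing_mean(values, window, require_full=False):
    """Trailing moving average over the current row and the (window - 1) rows before it.

    With require_full=False this mirrors Window.rowsBetween(-(window - 1), 0) + F.avg:
    early rows average however many rows exist. With require_full=True, rows whose
    frame holds fewer than `window` values get None instead.
    """
    out = []
    for i in range(len(values)):
        frame = values[max(0, i - window + 1): i + 1]
        if require_full and len(frame) < window:
            out.append(None)
        else:
            out.append(sum(frame) / len(frame))
    return out


# Close prices from the first six rows of the output above.
closes = [28.68400001525879, 29.534000396728516, 30.1026668548584,
          31.270666122436523, 32.80933380126953, 32.089332580566406]

print(trailing_mean(closes, 200))                   # partial frames: matches MA_7 ... MA_200 above
print(trailing_mean(closes, 3, require_full=True))  # None, None, then full 3-row averages
```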


In [12]:
# Calculate volatility: sample standard deviation of Daily_Return over the last 30 rows
# (trading days), expressed in percent and not annualized
df = df.withColumn("Volatility_30", F.stddev("Daily_Return").over(window_30))

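# Calculate RSI (Relative Strength Index) over a 14-row window.
# This uses simple averages of gains and losses (Cutler's variant), not Wilder's
# smoothing, so values will differ from a Wilder-smoothed RSI.
df = df.withColumn("Gain", F.when(F.col("Daily_Change") > 0, F.col("Daily_Change")).otherwise(0)) \
       .withColumn("Loss", F.when(F.col("Daily_Change") < 0, -F.col("Daily_Change")).otherwise(0))

window_14 = Window.orderBy("Date").rowsBetween(-13, 0)
# When Avg_Loss is 0, RS is a division by zero (null in Spark's default non-ANSI mode),
# so set RSI to 100 explicitly in that case.
df = df.withColumn("Avg_Gain", F.avg("Gain").over(window_14)) \
       .withColumn("Avg_Loss", F.avg("Loss").over(window_14)) \
       .withColumn("RS", F.col("Avg_Gain") / F.col("Avg_Loss")) \
       .withColumn("RSI", F.when(F.col("Avg_Loss") == 0, F.lit(100.0))
                           .otherwise(100 - (100 / (1 + F.col("RS")))))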

print("Volatility and RSI calculated.")

Volatility and RSI calculated.
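
The RSI above averages gains and losses with a plain 14-row mean. Wilder's original definition instead seeds with a 14-period simple average and then smooths recursively with weight 1/14, so the two versions diverge over time. The Spark cell also gives the first row a gain and loss of 0 (its `Daily_Change` is null) and uses partial frames before the 14th row, so its early values differ from the sketch below, which returns `None` until 14 real price changes exist. A stdlib sketch of both variants under those assumptions; the function names are hypothetical, and returning 100 when the average loss is 0 is an assumed convention:

```python
def _rsi(avg_gain, avg_loss):
    """Convert average gain/loss to RSI; zero average loss is treated as RSI = 100 (assumed convention)."""
    if avg_loss == 0:
        return 100.0
    rs = avg_gain / avg_loss
    return 100.0 - 100.0 / (1.0 + rs)


def _gains_losses(closes):
    """Split close-to-close changes into non-negative gains and losses."""
    changes = [curr - prev for prev, curr in zip(closes, closes[1:])]
    gains = [max(c, 0.0) for c in changes]
    losses = [max(-c, 0.0) for c in changes]
    return gains, losses


def rsi_simple(closes, period=14):
    """RSI from plain trailing means of the last `period` gains and losses (the notebook's approach)."""
    gains, losses = _gains_losses(closes)
    out = [None] * len(closes)
    for i in range(period, len(closes)):
        # gains[i - 1] / losses[i - 1] hold the change into day i.
        avg_gain = sum(gains[i - period:i]) / period
        avg_loss = sum(losses[i - period:i]) / period
        out[i] = _rsi(avg_gain, avg_loss)
    return out


def rsi_wilder(closes, period=14):
    """Wilder's RSI: seed with a simple mean, then smooth recursively with weight 1/period."""
    gains, losses = _gains_losses(closes)
    out = [None] * len(closes)
    if len(gains) < period:
        return out
    avg_gain = sum(gains[:period]) / period
    avg_loss = sum(losses[:period]) / period
    out[period] = _rsi(avg_gain, avg_loss)
    for i in range(period + 1, len(closes)):
        avg_gain = (avg_gain * (period - 1) + gains[i - 1]) / period
        avg_loss = (avg_loss * (period - 1) + losses[i - 1]) / period
        out[i] = _rsi(avg_gain, avg_loss)
    return out


# Made-up prices alternating between 10 and 11, not TSLA data.
prices = [10.0 if i % 2 == 0 else 11.0 for i in range(16)]
print(rsi_simple(prices)[14:])  # equal gains and losses -> [50.0, 50.0]
print(rsi_wilder(prices)[14:])  # starts at 50.0, then drifts as the smoothing reacts
```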


## 5. Summary and Cleanup

In [13]:
# Final summary
print("="*60)
print("TESLA STOCK ANALYSIS COMPLETE")
print("="*60)
print("\nDataFrame is ready for further analysis.")
print("You can now run SQL queries, create visualizations, or perform additional analysis.")
print("\nTo stop the Spark session, run: spark.stop()")

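============================================================
TESLA STOCK ANALYSIS COMPLETE
============================================================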

DataFrame is ready for further analysis.
You can now run SQL queries, create visualizations, or perform additional analysis.

To stop the Spark session, run: spark.stop()
