## Import

In [136]:
import time
import psutil
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, desc, lag, to_date, expr, abs
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.regression import LinearRegression

## Pandas Settings

In [137]:
pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)

## Metric Function

In [154]:
process = psutil.Process()
def metrics_start():
    return {"time": time.time(), "mem": process.memory_info().rss, "cpu": psutil.cpu_percent(interval=None)}

def metrics_end(start, df=None):
    end_time = time.time()
    end_mem = process.memory_info().rss
    end_cpu = psutil.cpu_percent(interval=None)

    partitions = df.rdd.getNumPartitions() if df else None
    rows_per_partition = df.rdd.glom().map(len).collect() if df else None
    cores = psutil.cpu_count(logical=True)

    return {
        "time": end_time - start["time"],
        "mem_used_MB": (end_mem - start["mem"]) / (1024**2),
        "partitions": partitions,
        "rows_per_partition": rows_per_partition,
        "cores": cores
    }

## Pyspark Session

In [157]:
df = spark.read.option("header", True).option("inferSchema", True) \
        .csv("datasets/tesla_deliveries_dataset_2015_2025.csv") \
        .repartition(4)  # initial repartition
print("Initial Num partitions:", df.rdd.getNumPartitions())

Initial Num partitions: 4


## Dataset

In [141]:
df = spark.read.csv("./datasets/tesla_deliveries_dataset_2015_2025.csv", header=True, inferSchema=True)
df.cache()
df.show(5)
df.printSchema()

+----+-----+-------------+-------+--------------------+----------------+-------------+--------------------+--------+--------------+--------------------+-----------------+
|Year|Month|       Region|  Model|Estimated_Deliveries|Production_Units|Avg_Price_USD|Battery_Capacity_kWh|Range_km|CO2_Saved_tons|         Source_Type|Charging_Stations|
+----+-----+-------------+-------+--------------------+----------------+-------------+--------------------+--------+--------------+--------------------+-----------------+
|2023|    5|       Europe|Model S|               17646|           17922|     92874.27|                 120|     704|       1863.42|Interpolated (Month)|            12207|
|2015|    2|         Asia|Model X|                3797|            4164|     62205.65|                  75|     438|        249.46|  Official (Quarter)|             7640|
|2019|    1|North America|Model X|                8411|            9189|    117887.32|                  82|     480|        605.59|Interpolated (

## Analytics

In [158]:
metrics_sp = []

In [159]:
analytics_results = []

# 1. Time-Series: Year-Month + 12M Rolling Avg
print("=== 1. Time-Series Analysis ===")
start = metrics_start()
ts = df.withColumn("Date", to_date(expr("concat(Year,'-',Month,'-01')")))
window_12 = Window.orderBy("Date").rowsBetween(-11, 0)
ts = ts.groupBy("Date").agg(sum("Estimated_Deliveries").alias("Monthly_Deliveries")) \
       .repartition(4)
ts = ts.withColumn("Rolling_12M_Avg", avg("Monthly_Deliveries").over(window_12)).orderBy("Date")
ts_metrics = metrics_end(start, ts)
ts.show(3, False)
print("Metrics:", ts_metrics)
analytics_results.append(("Time-Series", ts_metrics))

# 2. Z-Score Anomaly Detection
print("=== 2. Z-Score Anomaly Detection ===")
start = metrics_start()
stats = df.select(mean("Estimated_Deliveries").alias("mean_del"),
                  stddev("Estimated_Deliveries").alias("std_del")).collect()[0]
mean_del, std_del = stats
outliers = df.withColumn("Zscore", (col("Estimated_Deliveries") - mean_del)/std_del) \
             .filter(abs(col("Zscore")) > 3).repartition(4)
outliers_metrics = metrics_end(start, outliers)
outliers.show(3)
print("Metrics:", outliers_metrics)
analytics_results.append(("Z-Score Anomaly", outliers_metrics))

# 3. Model-Level Z-Score
print("=== 3. Model Z-Score ===")
w = Window.partitionBy("Model")
start = metrics_start()
df_model_z = df.withColumn("mean_del", mean("Estimated_Deliveries").over(w)) \
               .withColumn("std_del", stddev("Estimated_Deliveries").over(w)) \
               .withColumn("Zscore", (col("Estimated_Deliveries") - col("mean_del")) / col("std_del")) \
               .filter(abs(col("Zscore")) > 3).repartition(4)
model_z_metrics = metrics_end(start, df_model_z)
df_model_z.show(3)
print("Metrics:", model_z_metrics)
analytics_results.append(("Model Z-Score", model_z_metrics))

# 4. Supply Chain Stress Index
print("=== 4. Supply Chain Stress Index ===")
start = metrics_start()
supply_chain = df.groupBy("Year").agg(
    sum("Production_Units").alias("Prod"),
    sum("Estimated_Deliveries").alias("Del")
).withColumn("Stress_Index", (col("Prod") - col("Del")) / col("Del")) \
 .orderBy("Year").repartition(4)
supply_chain_metrics = metrics_end(start, supply_chain)
supply_chain.show(3)
print("Metrics:", supply_chain_metrics)
analytics_results.append(("Supply Chain Stress", supply_chain_metrics))

# 5. CO2 Savings Intensity
print("=== 5. CO2 Savings Intensity ===")
start = metrics_start()
env_index = df.groupBy("Model").agg(
    (sum("CO2_Saved_tons") / sum("Estimated_Deliveries")).alias("CO2_per_Car"),
    avg("Range_km").alias("Avg_Range")
).orderBy(desc("CO2_per_Car")).repartition(4)
env_index_metrics = metrics_end(start, env_index)
env_index.show(3)
print("Metrics:", env_index_metrics)
analytics_results.append(("CO2 Savings Intensity", env_index_metrics))

# 6. Linear Regression Forecast
print("=== 6. Linear Regression Forecast ===")
yearly = df.groupBy("Year").agg(sum("Estimated_Deliveries").alias("Deliveries")).repartition(4)
assembler = VectorAssembler(inputCols=["Year"], outputCol="features")
train_df = assembler.transform(yearly)
start = metrics_start()
lr = LinearRegression(featuresCol="features", labelCol="Deliveries")
model = lr.fit(train_df)
pred_df = spark.createDataFrame([(2026,), (2027,)], ["Year"])
pred = model.transform(assembler.transform(pred_df))
lr_metrics = metrics_end(start, pred)
pred.show()
print("Metrics:", lr_metrics)
analytics_results.append(("Linear Regression", lr_metrics))

# 7. Price Elasticity Correlation
print("=== 7. Price Elasticity Correlation ===")
start = metrics_start()
elasticity = df.stat.corr("Avg_Price_USD", "Estimated_Deliveries")
elasticity_metrics = metrics_end(start, df)
print("Price–Demand Correlation:", elasticity)
print("Metrics:", elasticity_metrics)
analytics_results.append(("Price Elasticity Correlation", elasticity_metrics))

# 8. K-Means Clustering
print("=== 8. K-Means Clustering ===")
vec = VectorAssembler(inputCols=["Estimated_Deliveries", "Battery_Capacity_kWh", "Range_km", "Avg_Price_USD"],
                      outputCol="features_raw").transform(df)
scaler = StandardScaler(inputCol="features_raw", outputCol="features", withMean=True, withStd=True)
scaled = scaler.fit(vec).transform(vec).repartition(4)
start = metrics_start()
kmeans = KMeans(k=4, seed=42)
km_model = kmeans.fit(scaled)
clusters = km_model.transform(scaled)
clusters_metrics = metrics_end(start, clusters)
clusters.groupBy("prediction").count().show()
print("Metrics:", clusters_metrics)
analytics_results.append(("K-Means Clustering", clusters_metrics))

# 9. Regional Performance Index
print("=== 9. Regional Performance Index ===")
start = metrics_start()
region_perf = df.groupBy("Region").agg(
    sum("Estimated_Deliveries").alias("Deliveries"),
    sum("CO2_Saved_tons").alias("CO2"),
    avg("Range_km").alias("Avg_Range")
).withColumn("Score", col("Deliveries")*0.5 + col("CO2")*0.3 + col("Avg_Range")*0.2).orderBy(desc("Score")).repartition(4)
region_perf_metrics = metrics_end(start, region_perf)
region_perf.show(3)
print("Metrics:", region_perf_metrics)
analytics_results.append(("Regional Performance", region_perf_metrics))

# 10. Model Lifecycle Analysis
print("=== 10. Model Lifecycle Analysis ===")
w = Window.partitionBy("Model").orderBy("Year")
start = metrics_start()
lifecycle = df.groupBy("Model", "Year").agg(
    sum("Estimated_Deliveries").alias("Del"),
    sum("Production_Units").alias("Prod")
).withColumn("YoY_Del_Growth", (col("Del") - lag("Del").over(w))/lag("Del").over(w)) \
 .withColumn("YoY_Prod_Growth", (col("Prod") - lag("Prod").over(w))/lag("Prod").over(w)) \
 .orderBy("Model", "Year").repartition(4)
lifecycle_metrics = metrics_end(start, lifecycle)
lifecycle.show(3)
print("Metrics:", lifecycle_metrics)
analytics_results.append(("Model Lifecycle", lifecycle_metrics))


=== 1. Time-Series Analysis ===
+----------+------------------+---------------+
|Date      |Monthly_Deliveries|Rolling_12M_Avg|
+----------+------------------+---------------+
|2015-01-01|183180            |183180.0       |
|2015-02-01|165053            |174116.5       |
|2015-03-01|184567            |177600.0       |
+----------+------------------+---------------+
only showing top 3 rows

Metrics: {'time': 0.0657191276550293, 'mem_used_MB': 0.125, 'partitions': 1, 'rows_per_partition': [132], 'cores': 14}
=== 2. Z-Score Anomaly Detection ===
+----+-----+-----------+-------+--------------------+----------------+-------------+--------------------+--------+--------------+--------------------+-----------------+------------------+
|Year|Month|     Region|  Model|Estimated_Deliveries|Production_Units|Avg_Price_USD|Battery_Capacity_kWh|Range_km|CO2_Saved_tons|         Source_Type|Charging_Stations|            Zscore|
+----+-----+-----------+-------+--------------------+----------------+-----

## Metrics

In [160]:
import pandas as pd
summary_df = pd.DataFrame([{"analytics": a[0], **a[1]} for a in analytics_results])
print("\n=== PySpark Analytics Summary ===")
print(summary_df)


=== PySpark Analytics Summary ===
                      analytics      time  mem_used_MB  partitions  \
0                   Time-Series  0.065719        0.125           1   
1               Z-Score Anomaly  0.102352        0.000           4   
2                 Model Z-Score  0.018796        0.000           4   
3           Supply Chain Stress  0.027906        0.000           4   
4         CO2 Savings Intensity  0.015924        0.000           4   
5             Linear Regression  0.349423        0.125          14   
6  Price Elasticity Correlation  0.096234        0.000           4   
7            K-Means Clustering  1.333870        0.000           4   
8          Regional Performance  0.032422        0.000           4   
9               Model Lifecycle  0.035321        0.000           4   

                           rows_per_partition  cores  
0                                       [132]     14  
1                                [1, 2, 0, 1]     14  
2                            