In [1]:
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

builder = (
    SparkSession.builder.appName("ML Project")
    .config("spark.driver.memory", "16g")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()




In [2]:
import os
from pyspark.sql import SparkSession

# My Spark session
spark

# Path to my folder with CSV files
folder = r"C:\Users\Ranveer Verma\Desktop\ML Project"

# Getting all CSV filenames
csv_files = [f for f in os.listdir(folder) if f.endswith(".csv")]
print(f"Found {len(csv_files)} CSV files.")

# Loading each CSV into a DataFrame
dfs = []
for file in csv_files:
    path = os.path.join(folder, file)
    df = spark.read.option("header", True).option("inferSchema", True).csv(path)
    dfs.append(df)
    print(f"✅ Loaded {file} with {df.count()} rows")


Found 8 CSV files.
✅ Loaded Friday-WorkingHours-Afternoon-DDos.pcap_ISCX.csv with 225745 rows
✅ Loaded Friday-WorkingHours-Afternoon-PortScan.pcap_ISCX.csv with 286467 rows
✅ Loaded Friday-WorkingHours-Morning.pcap_ISCX.csv with 191033 rows
✅ Loaded Monday-WorkingHours.pcap_ISCX.csv with 529918 rows
✅ Loaded Thursday-WorkingHours-Afternoon-Infilteration.pcap_ISCX.csv with 288602 rows
✅ Loaded Thursday-WorkingHours-Morning-WebAttacks.pcap_ISCX.csv with 170366 rows
✅ Loaded Tuesday-WorkingHours.pcap_ISCX.csv with 445909 rows
✅ Loaded Wednesday-workingHours.pcap_ISCX.csv with 692703 rows


In [None]:
# Combining all DataFrames
if dfs:
    combined_df = dfs[0]
    for df in dfs[1:]:
        combined_df = combined_df.unionByName(df, allowMissingColumns=True)
    print("✅ Combined all CSV files into a single DataFrame")
    print(f"Total rows: {combined_df.count()}")
else:
    print("⚠️ No CSV files found")


✅ Combined all CSV files into a single DataFrame
Total rows: 2830743


**PREPROCESSING**

In [4]:
from pyspark.sql.functions import col, trim

# 1️⃣ Remove leading/trailing spaces from all column names
combined_df = combined_df.toDF(*[c.strip() for c in combined_df.columns])

# 2️⃣ Trim whitespace from the Label column (important for CIC-IDS)
df_filled = combined_df.withColumn("Label", trim(col("Label")))

print("✅ df_filled created successfully")
df_filled.printSchema()
df_filled.show(5)



✅ df_filled created successfully
root
 |-- Destination Port: integer (nullable = true)
 |-- Flow Duration: integer (nullable = true)
 |-- Total Fwd Packets: integer (nullable = true)
 |-- Total Backward Packets: integer (nullable = true)
 |-- Total Length of Fwd Packets: integer (nullable = true)
 |-- Total Length of Bwd Packets: integer (nullable = true)
 |-- Fwd Packet Length Max: integer (nullable = true)
 |-- Fwd Packet Length Min: integer (nullable = true)
 |-- Fwd Packet Length Mean: double (nullable = true)
 |-- Fwd Packet Length Std: double (nullable = true)
 |-- Bwd Packet Length Max: integer (nullable = true)
 |-- Bwd Packet Length Min: integer (nullable = true)
 |-- Bwd Packet Length Mean: double (nullable = true)
 |-- Bwd Packet Length Std: double (nullable = true)
 |-- Flow Bytes/s: double (nullable = true)
 |-- Flow Packets/s: double (nullable = true)
 |-- Flow IAT Mean: double (nullable = true)
 |-- Flow IAT Std: double (nullable = true)
 |-- Flow IAT Max: integer (nulla

In [5]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

distinct_labels = df_filled.select("Label").distinct()
window = Window.orderBy("Label")
label_mapping = distinct_labels.withColumn("label_index", row_number().over(window) - 1)


In [6]:
df_indexed = df_filled.join(label_mapping, on="Label", how="left")
df_indexed.select("Label", "label_index").show(5)


+------+-----------+
| Label|label_index|
+------+-----------+
|BENIGN|          0|
|BENIGN|          0|
|BENIGN|          0|
|BENIGN|          0|
|BENIGN|          0|
+------+-----------+
only showing top 5 rows


In [7]:
df_indexed.select("Label", "label_index").distinct().orderBy("label_index").show(50, truncate=False)

+--------------------------+-----------+
|Label                     |label_index|
+--------------------------+-----------+
|BENIGN                    |0          |
|Bot                       |1          |
|DDoS                      |2          |
|DoS GoldenEye             |3          |
|DoS Hulk                  |4          |
|DoS Slowhttptest          |5          |
|DoS slowloris             |6          |
|FTP-Patator               |7          |
|Heartbleed                |8          |
|Infiltration              |9          |
|PortScan                  |10         |
|SSH-Patator               |11         |
|Web Attack � Brute Force  |12         |
|Web Attack � Sql Injection|13         |
|Web Attack � XSS          |14         |
+--------------------------+-----------+



In [8]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

distinct_labels = df_filled.select("Label").distinct()
window = Window.orderBy("Label")
label_mapping = distinct_labels.withColumn("label_index", row_number().over(window) - 1)


In [9]:
# Get all numeric columns (excluding the label)
numeric_cols = [col_name for col_name, dtype in df_indexed.dtypes if dtype in ('int', 'double', 'float')]

# Showing first few numeric columns for verification
print(numeric_cols[:10])


['Destination Port', 'Flow Duration', 'Total Fwd Packets', 'Total Backward Packets', 'Total Length of Fwd Packets', 'Total Length of Bwd Packets', 'Fwd Packet Length Max', 'Fwd Packet Length Min', 'Fwd Packet Length Mean', 'Fwd Packet Length Std']


In [10]:
from pyspark.sql.types import IntegerType, DoubleType, FloatType, LongType
from pyspark.sql.functions import col, isnan, when, count, expr
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql import functions as F

# -----------------------------
# 1. Select numeric columns
# -----------------------------
numeric_cols = [
    f.name for f in df_indexed.schema.fields
    if isinstance(f.dataType, (IntegerType, DoubleType, FloatType, LongType))
]

# -----------------------------
# 2. Fill NaNs with 0
# -----------------------------
df_indexed = df_indexed.fillna(0, subset=numeric_cols)

# -----------------------------
# 3. Remove constant columns
# -----------------------------
non_constant_cols = []
for c in numeric_cols:
    stats = df_indexed.agg(F.max(c).alias("max_val"), F.min(c).alias("min_val")).collect()[0]
    if stats["max_val"] != stats["min_val"]:
        non_constant_cols.append(c)
numeric_cols = non_constant_cols
print("Numeric columns after removing constants:", numeric_cols[:10])

# -----------------------------
# 4. Remove zero-variance columns
# -----------------------------
stddev_cols = [
    (c, df_indexed.agg(F.stddev(c).alias("std")).collect()[0]["std"])
    for c in numeric_cols
]
non_zero_std_cols = [c for c, std in stddev_cols if std and std > 0]
print("Numeric columns after removing zero-variance:", non_zero_std_cols[:10])

# Exclude label column
feature_cols = [c for c in non_zero_std_cols if c != "label_index"]

# -----------------------------
# 5. Assemble features
# -----------------------------
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features_vector"
)
df_vector = assembler.transform(df_indexed)

# -----------------------------
# 6. Standardize features
# -----------------------------
scaler = StandardScaler(
    inputCol="features_vector",
    outputCol="scaled_features",
    withMean=False,
    withStd=True
)
scaler_model = scaler.fit(df_vector)
df_scaled = scaler_model.transform(df_vector)

# -----------------------------
# 7. Quick look at final DataFrame that we get
# -----------------------------
df_scaled.select("scaled_features", "label_index").show(5, truncate=False)

print("✅ Preprocessing complete. Data ready for ML!")



Numeric columns after removing constants: ['Destination Port', 'Flow Duration', 'Total Fwd Packets', 'Total Backward Packets', 'Total Length of Fwd Packets', 'Total Length of Bwd Packets', 'Fwd Packet Length Max', 'Fwd Packet Length Min', 'Fwd Packet Length Mean', 'Fwd Packet Length Std']
Numeric columns after removing zero-variance: ['Destination Port', 'Flow Duration', 'Total Fwd Packets', 'Total Backward Packets', 'Total Length of Fwd Packets', 'Total Length of Bwd Packets', 'Fwd Packet Length Max', 'Fwd Packet Length Min', 'Fwd Packet Length Mean', 'Fwd Packet Length Std']
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

****TRAINING THE MODEL NOW****

IMPLEMENTING LOGISTIC REGRESSION ON THE SAMPLE DATASET

In [12]:
# -----------------------------
# 1. Sample 700k rows from the dataset
# -----------------------------
sample_df = df_scaled.sample(withReplacement=False, fraction=0.25, seed=42)  
print(f"Sampled dataset count: {sample_df.count()}")

# -----------------------------
# 2. Split into train and test sets
# -----------------------------
train_df, test_df = sample_df.randomSplit([0.8, 0.2], seed=42)
print(f"Train count: {train_df.count()}, Test count: {test_df.count()}")

# -----------------------------
# 3. Training Logistic Regression on sample
# -----------------------------
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

lr = LogisticRegression(
    featuresCol="scaled_features",
    labelCol="label_index",
    maxIter=50,
    regParam=0.01,
    elasticNetParam=0.0
)

lr_model = lr.fit(train_df)

# -----------------------------
# 4. Predictions and Evaluation
# -----------------------------
predictions = lr_model.transform(test_df)

evaluator = MulticlassClassificationEvaluator(
    labelCol="label_index",
    predictionCol="prediction",
    metricName="accuracy"
)

accuracy = evaluator.evaluate(predictions)
print(f"✅ Test Accuracy on sample: {accuracy:.4f}")

#Showing the predictions
predictions.select("scaled_features", "label_index", "prediction").show(5, truncate=False)


Sampled dataset count: 708271
Train count: 567183, Test count: 141848
✅ Test Accuracy on sample: 0.9254
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+----------+
|scaled_features                                                                                                                                                                                                                                                                                                                                                                            

In [44]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# -----------------------------
# Binary Classification Evaluator
# -----------------------------
binary_evaluator = BinaryClassificationEvaluator(
    labelCol="label_index",
    rawPredictionCol="prediction",  
    metricName="areaUnderROC"        
)
roc_auc = binary_evaluator.evaluate(predictions)
print("ROC-AUC:", roc_auc)

# -----------------------------
# Multiclass Evaluator for precision, recall, f1
# -----------------------------
precision_eval = MulticlassClassificationEvaluator(
    labelCol="label_index", predictionCol="prediction", metricName="precisionByLabel"
)
recall_eval = MulticlassClassificationEvaluator(
    labelCol="label_index", predictionCol="prediction", metricName="recallByLabel"
)
f1_eval = MulticlassClassificationEvaluator(
    labelCol="label_index", predictionCol="prediction", metricName="f1"
)

precision = precision_eval.evaluate(predictions)
recall = recall_eval.evaluate(predictions)
f1 = f1_eval.evaluate(predictions)

print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-score: {f1:.4f}")


ROC-AUC: 0.851788351852596
Precision: 0.9369
Recall: 0.9801
F1-score: 0.9172


IMPLEMENTING RANDOM FOREST ON THE SAMPLE DATASET

In [24]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# -----------------------------
# 1. Defining Random Forest model
# -----------------------------
rf = RandomForestClassifier(
    featuresCol="scaled_features",
    labelCol="label_index",
    numTrees=50,            # number of trees in the forest
    maxDepth=10,            # maximum depth of each tree
    seed=42
)

# -----------------------------
# 2. Training the model
# -----------------------------
rf_model = rf.fit(train_df)

# -----------------------------
# 3. Predicting on test data
# -----------------------------
rf_predictions = rf_model.transform(test_df)

# -----------------------------
# 4. Evaluating the Accuracy
# -----------------------------
multi_evaluator = MulticlassClassificationEvaluator(
    labelCol="label_index",
    predictionCol="prediction",
    metricName="accuracy"
)
rf_accuracy = multi_evaluator.evaluate(rf_predictions)
print(f"✅ Random Forest Test Accuracy: {rf_accuracy:.4f}")

# -----------------------------
# 5. Precision, Recall, F1, ROC-AUC
# -----------------------------
binary_evaluator = BinaryClassificationEvaluator(
    labelCol="label_index",
    rawPredictionCol="prediction",
    metricName="areaUnderROC"
)
roc_auc = binary_evaluator.evaluate(rf_predictions)
precision = MulticlassClassificationEvaluator(labelCol="label_index", predictionCol="prediction", metricName="weightedPrecision").evaluate(rf_predictions)
recall = MulticlassClassificationEvaluator(labelCol="label_index", predictionCol="prediction", metricName="weightedRecall").evaluate(rf_predictions)
f1 = MulticlassClassificationEvaluator(labelCol="label_index", predictionCol="prediction", metricName="f1").evaluate(rf_predictions)

print(f"ROC-AUC: {roc_auc:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-score: {f1:.4f}")

# showing some predictions
rf_predictions.select("label_index", "prediction", "probability").show(5, truncate=False)


✅ Random Forest Test Accuracy: 0.9959
ROC-AUC: 0.9928
Precision: 0.9952
Recall: 0.9959
F1-score: 0.9953
+-----------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|label_index|prediction|probability                                                                                                                                                                                                                                                |
+-----------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0          |0.0       |[0.9071927683284616,0.0,0.013204180848055

IMPLEMENTING DECISION TREE ON THE SAMPLE DATASET

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Initialize Decision Tree
dt = DecisionTreeClassifier(
    labelCol="label_index",
    featuresCol="scaled_features",
    maxDepth=10,        # maximum depth of the tree
    seed=42
)

# Training the model
dt_model = dt.fit(train_df)

# Predictions
df_dt_preds = dt_model.transform(test_df)

# Evaluation
evaluator = MulticlassClassificationEvaluator(
    labelCol="label_index",
    predictionCol="prediction",
    metricName="accuracy"
)

accuracy = evaluator.evaluate(df_dt_preds)
print(f"✅ Decision Tree Test Accuracy: {accuracy:.4f}")

# Precision, Recall, F1
precision = evaluator.setMetricName("weightedPrecision").evaluate(df_dt_preds)
recall = evaluator.setMetricName("weightedRecall").evaluate(df_dt_preds)
f1 = evaluator.setMetricName("f1").evaluate(df_dt_preds)
print(f"Precision: {precision:.4f}, Recall: {recall:.4f}, F1-score: {f1:.4f}")



✅ Decision Tree Test Accuracy: 0.9960
Precision: 0.9957, Recall: 0.9960, F1-score: 0.9958


IMPLEMENTING NEURAL NETWORK ON THE SAMPLE DATASET

In [None]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# -----------------------------
# 1. Defining the network layers
# -----------------------------
# Input layer = 68 (number of features)
# Hidden layers = we can try [128, 64] or [100, 50] as starting point
# Output layer = number of classes
layers = [68, 128, 64, 15]

# -----------------------------
# 2. Defining the model
# -----------------------------
mlp = MultilayerPerceptronClassifier(
    featuresCol="scaled_features",
    labelCol="label_index",
    maxIter=100,
    layers=layers,
    blockSize=128,
    seed=42
)

# -----------------------------
# 3. Training the model
# -----------------------------
mlp_model = mlp.fit(train_df)

# -----------------------------
# 4. Making the predictions
# -----------------------------
df_mlp_preds = mlp_model.transform(test_df)

# -----------------------------
# 5. Evaluating
# -----------------------------
evaluator = MulticlassClassificationEvaluator(
    labelCol="label_index",
    predictionCol="prediction",
    metricName="accuracy"
)
accuracy = evaluator.evaluate(df_mlp_preds)
precision = evaluator.evaluate(df_mlp_preds, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(df_mlp_preds, {evaluator.metricName: "weightedRecall"})
f1 = evaluator.evaluate(df_mlp_preds, {evaluator.metricName: "f1"})

print(f"✅ Neural Network Test Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-score: {f1:.4f}")

# Optional: Show predictions
df_mlp_preds.select("scaled_features", "label_index", "prediction").show(5, truncate=False)


✅ Neural Network Test Accuracy: 0.9737
Precision: 0.9705
Recall: 0.9737
F1-score: 0.9719
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+----------+
|scaled_features                                                                                                                                                                                                                                                                                                                                                                                           

IMPLEMENTING K-MEANS ON THE SAMPLE DATASET

In [53]:
from pyspark.ml.clustering import KMeans
from pyspark.sql.functions import col, count, desc

# -----------------------------
# 1. Train KMeans model
# -----------------------------
k = 15   # number of attack types (clusters)
kmeans = KMeans(
    featuresCol="scaled_features",
    k=k,
    seed=42
)

kmeans_model = kmeans.fit(train_df)
print("✅ KMeans model training complete")

# -----------------------------
# 2. Assign clusters on training set
# -----------------------------
df_kmeans_train = kmeans_model.transform(train_df)

# -----------------------------
# 3. Map clusters → real labels using majority vote
# -----------------------------
cluster_map = (
    df_kmeans_train.groupBy("prediction", "label_index")
    .agg(count("*").alias("count"))
    .orderBy("prediction", desc("count"))
)

cluster_map.show(15, truncate=False)

# Extract dominant label per cluster
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window = Window.partitionBy("prediction").orderBy(desc("count"))
cluster_label_map = cluster_map.withColumn("rn", row_number().over(window)).filter(col("rn") == 1)

print("✅ Cluster → Label mapping created")
cluster_label_map.show()

# -----------------------------
# 4. Test on full test_df
# -----------------------------
df_kmeans_test = kmeans_model.transform(test_df)

# ✅ Fix: rename label column before joining to avoid ambiguity
cluster_label_map_fixed = cluster_label_map.withColumnRenamed("label_index", "mapped_label")

predicted_df = df_kmeans_test.join(
    cluster_label_map_fixed,
    on="prediction",
    how="left"
).select(
    "scaled_features",
    df_kmeans_test["label_index"].alias("true_label"),
    col("mapped_label").alias("predicted_label")
)

print("✅ Cluster → Class assignment complete (no ambiguity)")

# ✅ Fix: cast to double to avoid IllegalArgumentException
predicted_df_fixed = predicted_df.withColumn(
    "true_label", col("true_label").cast("double")
).withColumn(
    "predicted_label", col("predicted_label").cast("double")
)

# -----------------------------
# 5. Evaluate mapping accuracy
# -----------------------------
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="true_label",
    predictionCol="predicted_label"
)

accuracy = evaluator.evaluate(predicted_df_fixed, {evaluator.metricName: "accuracy"})
precision = evaluator.evaluate(predicted_df_fixed, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predicted_df_fixed, {evaluator.metricName: "weightedRecall"})
f1 = evaluator.evaluate(predicted_df_fixed, {evaluator.metricName: "f1"})

print("\n===== KMeans (Cluster-Mapped) Classification Report =====")
print(f"✅ Accuracy:  {accuracy:.4f}")
print(f"Precision:   {precision:.4f}")
print(f"Recall:      {recall:.4f}")
print(f"F1 Score:    {f1:.4f}")




✅ KMeans model training complete
+----------+-----------+------+
|prediction|label_index|count |
+----------+-----------+------+
|0         |0          |107235|
|0         |10         |31471 |
|0         |7          |793   |
|0         |3          |659   |
|0         |11         |615   |
|0         |4          |459   |
|0         |6          |406   |
|0         |2          |393   |
|0         |12         |272   |
|0         |1          |230   |
|0         |14         |133   |
|0         |5          |34    |
|0         |13         |3     |
|0         |9          |2     |
|1         |0          |197425|
+----------+-----------+------+
only showing top 15 rows
✅ Cluster → Label mapping created
+----------+-----------+------+---+
|prediction|label_index| count| rn|
+----------+-----------+------+---+
|         0|          0|107235|  1|
|         1|          0|197425|  1|
|         2|          0|    73|  1|
|         3|          0|  4701|  1|
|         4|          2|  2411|  1|
|         5|

IMPLEMENTING BISECTING K-MEANS ON THE SAMPLE DATASET

In [55]:
from pyspark.ml.clustering import BisectingKMeans
from pyspark.sql.functions import col, count, desc
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# -----------------------------
# 1. Training the Bisecting KMeans
# -----------------------------
bkmeans = BisectingKMeans(
    featuresCol="scaled_features",
    k=15,
    seed=42
)

bk_model = bkmeans.fit(train_df)
print("✅ Bisecting KMeans model training complete")

# -----------------------------
# 2. Assigning clusters on training set
# -----------------------------
df_bk_train = bk_model.transform(train_df)

# -----------------------------
# 3. Cluster → Label mapping using majority voting
# -----------------------------
cluster_map = (
    df_bk_train.groupBy("prediction", "label_index")
    .agg(count("*").alias("count"))
    .orderBy("prediction", desc("count"))
)

cluster_map.show(20, truncate=False)

# Picking the dominant class for each cluster
window = Window.partitionBy("prediction").orderBy(desc("count"))
cluster_label_map = cluster_map.withColumn("rn", row_number().over(window)).filter(col("rn") == 1)

cluster_label_map_fixed = cluster_label_map.withColumnRenamed("label_index", "mapped_label")
print("✅ Cluster → Label mapping created")

# -----------------------------
# 4. Assigning the predicted labels to test_df
# -----------------------------
df_bk_test = bk_model.transform(test_df)

predicted_df = df_bk_test.join(
    cluster_label_map_fixed, 
    on="prediction", 
    how="left"
).select(
    "scaled_features",
    df_bk_test["label_index"].alias("true_label"),
    col("mapped_label").alias("predicted_label")
)

# Casting to double (Spark evaluator requirement)
predicted_df = predicted_df.withColumn("true_label", col("true_label").cast("double")) \
                           .withColumn("predicted_label", col("predicted_label").cast("double"))

print("✅ Cluster → Class assignment complete")

# -----------------------------
# 5. Evaluation of the results
# -----------------------------
evaluator = MulticlassClassificationEvaluator(labelCol="true_label", predictionCol="predicted_label")

accuracy = evaluator.evaluate(predicted_df, {evaluator.metricName: "accuracy"})
precision = evaluator.evaluate(predicted_df, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predicted_df, {evaluator.metricName: "weightedRecall"})
f1 = evaluator.evaluate(predicted_df, {evaluator.metricName: "f1"})

print("\n===== Bisecting KMeans Classification Report =====")
print(f"✅ Accuracy:  {accuracy:.4f}")
print(f"Precision:   {precision:.4f}")
print(f"Recall:      {recall:.4f}")
print(f"F1 Score:    {f1:.4f}")


✅ Bisecting KMeans model training complete
+----------+-----------+------+
|prediction|label_index|count |
+----------+-----------+------+
|0         |0          |30924 |
|0         |10         |30902 |
|0         |4          |1619  |
|0         |7          |793   |
|0         |6          |389   |
|0         |12         |244   |
|0         |1          |222   |
|0         |14         |130   |
|0         |5          |15    |
|0         |11         |6     |
|0         |2          |3     |
|1         |0          |195614|
|1         |5          |110   |
|1         |2          |13    |
|2         |0          |50709 |
|2         |4          |13506 |
|2         |2          |9191  |
|2         |10         |843   |
|2         |3          |279   |
|2         |1          |4     |
+----------+-----------+------+
only showing top 20 rows
✅ Cluster → Label mapping created
✅ Cluster → Class assignment complete

===== Bisecting KMeans Classification Report =====
✅ Accuracy:  0.8598
Precision:   0.7956
