# In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# 1. Initialize Spark Session
spark = SparkSession.builder.appName("CustomerKMeansClustering").getOrCreate()

# 2. Load dataset (first row is the header; column types are inferred)
file_path = "Customer_Data.csv"  # adjust as needed
df = spark.read.csv(file_path, header=True, inferSchema=True)

# 3. Choose two numeric features to cluster on (e.g. BALANCE and PURCHASES)
features = ["BALANCE", "PURCHASES"]

# 4. Assemble features into a single vector column
#    Nulls are dropped *before* assembling: VectorAssembler's default
#    handleInvalid="error" raises on a null input when the plan executes,
#    so filtering after the transform is too late. Restricting the drop to
#    `features` also keeps rows whose only nulls are in columns that are
#    not used for clustering.
assembler = VectorAssembler(inputCols=features, outputCol="rawFeatures")
assembled = assembler.transform(df.na.drop(subset=features))

# 5. Standardize the feature vectors
#    withMean=False: each feature is only divided by its standard deviation
#    (no centering), so a scaled value maps back to the original scale by
#    multiplying with the fitted std.
scaler = StandardScaler(
    inputCol="rawFeatures",
    outputCol="features",
    withStd=True,
    withMean=False
)
scaler_model = scaler.fit(assembled)
scaled = scaler_model.transform(assembled)

# 6. Elbow Method: fit one KMeans model per k in 2…10 and record its
#    training cost (WSSSE) as (k, cost) pairs in wssse_list
print("=== Elbow Method (WSSSE) ===")
wssse_list = []
for n_clusters in range(2, 11):
    # Fixed seed keeps the initialisation reproducible across runs.
    candidate = KMeans(featuresCol="features", k=n_clusters, seed=42).fit(scaled)
    wssse = candidate.summary.trainingCost
    print(f"k={n_clusters:2d}, WSSSE={wssse:.3f}")
    wssse_list.append((n_clusters, wssse))
print("============================\n")

# 7. Manually chosen number of clusters
#    Pick the "elbow" from the WSSSE values printed above and update this.
optimal_k = 3

# 8. Fit the final KMeans model and attach a "prediction" column
#    (assigned cluster index) to every row
km_final = KMeans(k=optimal_k, seed=42, featuresCol="features")
model_final = km_final.fit(scaled)
clusters = model_final.transform(scaled)

# 9. Show the first 10 rows of BALANCE, PURCHASES and assigned cluster
print("=== Sample cluster assignments ===")
preview_columns = ["BALANCE", "PURCHASES", "prediction"]
clusters.select(*preview_columns).show(n=10, truncate=False)

# 10. Collect the columns needed for plotting into a pandas DataFrame
plot_columns = ["BALANCE", "PURCHASES", "prediction"]
clusters_pd = clusters.select(*plot_columns).toPandas()

# 11. Map the centroids back onto the original feature scale
#     The scaler was configured with withMean=False, i.e. it only divides
#     by the per-feature std dev, so multiplying by that std undoes it.
std_vector = scaler_model.std.toArray()
scaled_centers = np.array(model_final.clusterCenters())
orig_centers = np.multiply(scaled_centers, std_vector)

# 12. Scatter the points coloured by cluster, then overlay the centroids
plt.figure(figsize=(8, 6))
sns.scatterplot(
    x="BALANCE",
    y="PURCHASES",
    hue="prediction",
    data=clusters_pd,
    palette="tab10",
    s=50,
)
centroid_x = orig_centers[:, 0]  # first feature: BALANCE
centroid_y = orig_centers[:, 1]  # second feature: PURCHASES
plt.scatter(
    centroid_x,
    centroid_y,
    marker="X",
    s=200,
    color="red",
    label="Centroids",
)
plt.title(f"K-Means Clustering on Customer Data (k={optimal_k})")
plt.xlabel("Balance")
plt.ylabel("Purchases")
plt.legend(title="Cluster")
plt.tight_layout()
plt.show()

# 13. Shut down the Spark session now that all Spark work is finished
spark.stop()
