In [None]:
import findspark
# Make the local Spark installation importable before `pyspark` is imported below.
# NOTE(review): with no arguments this presumably locates Spark via the SPARK_HOME
# environment variable — confirm it is set on machines that run this notebook.
findspark.init()

In [None]:
from pyspark.sql import SparkSession

# Local Spark session (4 threads via "local[4]") with 1g memory settings.
# NOTE(review): under a local[...] master the executor runs inside the driver
# process, so "spark.executor.memory" likely has no effect — confirm before tuning it.
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("MallCustomersClustering")
    .config("spark.executor.memory", "1g")
    .config("spark.driver.memory", "1g")
    .getOrCreate()
)

In [None]:
import os

# Location of the Mall Customers CSV. Set MALL_CUSTOMERS_CSV to point elsewhere
# instead of editing a machine-specific absolute path; the default is unchanged.
MALL_CSV_PATH = os.environ.get(
    "MALL_CUSTOMERS_CSV", "/home/alper/Spark/data/Mall_Customers.csv"
)

# First row is the header; let Spark infer column types so the numeric columns
# can be fed to VectorAssembler without manual casting.
mall = spark.read \
.option("header", True) \
.option("inferSchema", True) \
.option("sep", ",") \
.csv(MALL_CSV_PATH)

In [None]:
# Peek at the first five rows as a pandas frame; limit(5) already caps the row count.
mall.limit(5).toPandas()

In [None]:
# Print the column names and the types Spark inferred (inferSchema is enabled on the read).
mall.printSchema()

In [None]:
# CustomerID is a row identifier, not a measurement: its mean/stddev carry no meaning,
# so it is named separately and excluded from the numeric summary columns.
id_col = "CustomerID"
numeric = ["Age", "Annual Income (k$)", "Spending Score (1-100)"]
categorical = ["Gender"]

In [None]:
# Summary statistics (count/mean/stddev/min/max) restricted to the numeric columns.
numeric_view = mall.select(numeric)
numeric_view.describe().show()

In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

In [None]:
# Combine the two clustering inputs into one vector column called "features".
vector_assembler = VectorAssembler(
    inputCols=["Annual Income (k$)", "Spending Score (1-100)"],
    outputCol="features",
)

In [None]:
# Rescale "features" (default StandardScaler settings) into "features_scaled",
# the column k-means and the evaluator read later.
standard_scaler = StandardScaler(
    inputCol="features",
    outputCol="features_scaled",
)

In [None]:
# Preview what the preprocessing stages defined above produce. (Re-displaying the
# raw `mall` rows here would only repeat the earlier preview, since `mall` is unchanged.)
assembled_preview = vector_assembler.transform(mall)
scaler_preview_model = standard_scaler.fit(assembled_preview)
scaler_preview_model.transform(assembled_preview).limit(5).toPandas()

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline

def clustering(mall, k, seed=4242, assembler=None, scaler=None):
    """Fit an assemble -> scale -> k-means pipeline on ``mall`` and return the fitted model.

    Parameters
    ----------
    mall : pyspark.sql.DataFrame
        Input rows; must contain the input columns the assembler stage expects.
    k : int
        Number of clusters, passed to ``KMeans.setK``.
    seed : int, optional
        Seed for k-means initialisation. Defaults to 4242, the value previously hard-coded.
    assembler : pyspark.ml stage, optional
        Feature-assembly stage. Defaults to the notebook-level ``vector_assembler``.
    scaler : pyspark.ml stage, optional
        Scaling stage. Defaults to the notebook-level ``standard_scaler``. Whatever
        is supplied must write the ``features_scaled`` column that KMeans reads.

    Returns
    -------
    pyspark.ml.PipelineModel
        The fitted pipeline. Its ``transform`` adds the k-means assignment as ``cluster``.
    """
    # Resolve defaults at call time so existing `clustering(mall, k)` calls behave as before,
    # while callers can now pass their own stages instead of relying on notebook globals.
    assembler_stage = vector_assembler if assembler is None else assembler
    scaler_stage = standard_scaler if scaler is None else scaler

    kmeans_object = KMeans() \
    .setK(k) \
    .setSeed(seed) \
    .setFeaturesCol("features_scaled") \
    .setPredictionCol("cluster")

    pipe = Pipeline() \
    .setStages([assembler_stage, scaler_stage, kmeans_object])

    pipe_model = pipe.fit(mall)
    return pipe_model

In [None]:
from pyspark.ml.evaluation import ClusteringEvaluator

# Sweep k over [K_MIN, K_MAX] and score each clustering by silhouette on the scaled features.
K_MIN, K_MAX = 2, 10

# The evaluator has no per-k state, so build it once rather than on every iteration.
evaluator = ClusteringEvaluator() \
.setFeaturesCol("features_scaled") \
.setPredictionCol("cluster") \
.setMetricName("silhouette")

# Keep the scores (k -> silhouette) so later cells can compare or plot them.
silhouette_scores = {}
for k in range(K_MIN, K_MAX + 1):
    pipe_model = clustering(mall, k)
    transformedDF = pipe_model.transform(mall)

    score = evaluator.evaluate(transformedDF)
    silhouette_scores[k] = score
    print(k, "-", score)