In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Install Spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop3.2.tgz

# Unzip the Spark file to the current folder
!tar xf spark-3.0.3-bin-hadoop3.2.tgz

# Install findspark
!pip install -q findspark

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop3.2"

# Start a SparkSession
import findspark
findspark.init()

# Import SparkSession
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Test Spark
df = spark.createDataFrame([{"hello": "world"} for x in range(1000)])
df.show(3)



+-----+
|hello|
+-----+
|world|
|world|
|world|
+-----+
only showing top 3 rows



In [None]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.ml.feature import VectorAssembler, StandardScaler

In [None]:
# Read the raw CSV with a header row. No schema is supplied or inferred, so the
# columns arrive as strings (see the printed schema below).
df = spark.read.csv(
    '/content/customer_shopping_data.csv',
    header=True,
    escape="\"",
)

# Work on a seeded sample (fraction 0.1, without replacement), capped at 10,000 rows.
df_sample = df.sample(withReplacement=False, fraction=0.1, seed=42)
df_sample = df_sample.limit(10000)

# Inspect the sample as loaded, before any type conversion.
df_sample.printSchema()
df_sample.show(5)

# Cast the three model inputs to numeric types, then drop every row that has a
# null in any column.
df_sample = (
    df_sample
    .withColumn("quantity", col("quantity").cast(DoubleType()))
    .withColumn("price", col("price").cast(DoubleType()))
    .withColumn("age", col("age").cast(IntegerType()))
    .dropna()
)

# Pack the numeric columns into a single vector column named "features".
feature_columns = ["quantity", "price", "age"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
assembled_df = assembler.transform(df_sample)

# Scale by standard deviation only (withStd=True, withMean=False: no centring).
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withStd=True,
    withMean=False,
)
scaler_model = scaler.fit(assembled_df)
scaled_df = scaler_model.transform(assembled_df)

# Show raw and scaled vectors side by side as a quick sanity check.
scaled_df.select("features", "scaledFeatures").show(5)

root
 |-- invoice_no: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- price: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- invoice_date: string (nullable = true)
 |-- shopping_mall: string (nullable = true)

+----------+-----------+------+---+---------------+--------+------+--------------+------------+-----------------+
|invoice_no|customer_id|gender|age|       category|quantity| price|payment_method|invoice_date|    shopping_mall|
+----------+-----------+------+---+---------------+--------+------+--------------+------------+-----------------+
|   I293112|    C176086|Female| 32|       Clothing|       2|600.16|   Credit Card|  13/01/2021| Mall of Istanbul|
|   I294687|    C300786|  Male| 65|          Books|       2|  30.3|    Debit Card|  16/01/2021|        Metrocity|
|   I993048|  

In [None]:
from pyspark.ml.clustering import KMeans, GaussianMixture
from pyspark.ml.evaluation import ClusteringEvaluator

In [None]:
# One evaluator is shared by both models so their scores are computed the same
# way, on the same scaled feature column.
evaluator = ClusteringEvaluator(featuresCol="scaledFeatures")

# K-means: fit on the scaled features, assign clusters, then score.
kmeans = KMeans(featuresCol="scaledFeatures", k=5, seed=13)
kmeans_model = kmeans.fit(scaled_df)
kmeans_predictions = kmeans_model.transform(scaled_df)
kmeans_silhouette = evaluator.evaluate(kmeans_predictions)

# Gaussian Mixture Model: same k and seed as K-means, same three steps.
gmm = GaussianMixture(featuresCol="scaledFeatures", k=5, seed=13)
gmm_model = gmm.fit(scaled_df)
gmm_predictions = gmm_model.transform(scaled_df)
gmm_silhouette = evaluator.evaluate(gmm_predictions)

# Report both scores together.
print(f"K-means Silhouette Score = {kmeans_silhouette}")
print(f"GMM Silhouette Score = {gmm_silhouette}")



K-means Silhouette Score = 0.484626984314677
GMM Silhouette Score = 0.1719341034165961


In [None]:
# Davies-Bouldin and Calinski-Harabasz indices.
# Both are computed with scikit-learn on the driver, so the selected columns
# are collected into pandas first.
def _collect_features_and_labels(predictions, features_col, prediction_col="prediction"):
    """Collect a feature column and a label column to the driver.

    Args:
        predictions: Spark DataFrame holding both columns.
        features_col: Name of the column with one feature vector per row.
        prediction_col: Name of the column with the cluster label per row.

    Returns:
        Tuple ``(X, labels)``: ``X`` is a 2-D float NumPy array with one row
        per record, ``labels`` is the pandas Series of cluster labels.

    Raises:
        ValueError: If the DataFrame has no rows.
    """
    import numpy as np

    pdf = predictions.select(features_col, prediction_col).toPandas()
    # Spark ML vectors expose toArray(); calling it directly avoids re-wrapping
    # every row in a new DenseVector. Plain sequences (e.g. an array column)
    # are still accepted through np.asarray.
    rows = [
        v.toArray() if hasattr(v, "toArray") else np.asarray(v, dtype=float)
        for v in pdf[features_col]
    ]
    if not rows:
        raise ValueError("predictions contains no rows to score")
    return np.vstack(rows), pdf[prediction_col]


def davies_bouldin_index(predictions, features_col, prediction_col="prediction"):
    """Return scikit-learn's Davies-Bouldin score for a clustered Spark DataFrame.

    Args:
        predictions: Spark DataFrame with feature vectors and cluster labels.
        features_col: Name of the feature-vector column.
        prediction_col: Name of the cluster-label column.

    Returns:
        The value returned by ``sklearn.metrics.davies_bouldin_score``.
    """
    from sklearn.metrics import davies_bouldin_score

    X, labels = _collect_features_and_labels(predictions, features_col, prediction_col)
    return davies_bouldin_score(X, labels)


def calinski_harabasz_index(predictions, features_col, prediction_col="prediction"):
    """Return scikit-learn's Calinski-Harabasz score for a clustered Spark DataFrame.

    Args:
        predictions: Spark DataFrame with feature vectors and cluster labels.
        features_col: Name of the feature-vector column.
        prediction_col: Name of the cluster-label column.

    Returns:
        The value returned by ``sklearn.metrics.calinski_harabasz_score``.
    """
    from sklearn.metrics import calinski_harabasz_score

    X, labels = _collect_features_and_labels(predictions, features_col, prediction_col)
    return calinski_harabasz_score(X, labels)


# Davies-Bouldin Index
kmeans_dbi = davies_bouldin_index(kmeans_predictions, "scaledFeatures")
gmm_dbi = davies_bouldin_index(gmm_predictions, "scaledFeatures")
print(f"K-means Davies-Bouldin Index = {kmeans_dbi}")
print(f"GMM Davies-Bouldin Index = {gmm_dbi}")

# Calinski-Harabasz Index
kmeans_chi = calinski_harabasz_index(kmeans_predictions, "scaledFeatures")
gmm_chi = calinski_harabasz_index(gmm_predictions, "scaledFeatures")
print(f"K-means Calinski-Harabasz Index = {kmeans_chi}")
print(f"GMM Calinski-Harabasz Index = {gmm_chi}")

# Stop the Spark session. This is the last step: anything that needs `spark`
# afterwards requires creating a new session first.
spark.stop()

K-means Davies-Bouldin Index = 0.9934456903165045
GMM Davies-Bouldin Index = 1.8933869200209297
K-means Calinski-Harabasz Index = 5447.753674244457
GMM Calinski-Harabasz Index = 2048.7572789884725
