In [1]:
from __future__ import print_function
import findspark
findspark.init()
findspark.find()
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

In [2]:
# create a spark Sesssion 
spark = SparkSession.builder.appName("KmeansClusterDemo").getOrCreate()


### Read the file 

In [3]:
# download https://www.kaggle.com/shwetabh123/mall-customers

data  = spark.read.option("header", "true").option("inferSchma","true").csv("Mall_Customers.csv")

In [9]:
data.printSchema()

root
 |-- CustomerID: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Annual Income (k$): string (nullable = true)
 |-- Spending Score (1-100): string (nullable = true)



In [4]:
data.show()

+----------+------+---+------------------+----------------------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+
|      0001|  Male| 19|                15|                    39|
|      0002|  Male| 21|                15|                    81|
|      0003|Female| 20|                16|                     6|
|      0004|Female| 23|                16|                    77|
|      0005|Female| 31|                17|                    40|
|      0006|Female| 22|                17|                    76|
|      0007|Female| 35|                18|                     6|
|      0008|Female| 23|                18|                    94|
|      0009|  Male| 64|                19|                     3|
|      0010|Female| 30|                19|                    72|
|      0011|  Male| 67|                19|                    14|
|      0012|Female| 35|                19|                    99|
|      001

In [6]:
# convert the fileds into feature 

cols = data.select("Annual Income (k$)","Spending Score (1-100)").columns

print(cols)

clean_df = data.select("Annual Income (k$)","Spending Score (1-100)")

clean_df = clean_df.select(*(col(c).cast("float").alias(c) for c in cols))

vecAssembler = VectorAssembler(inputCols=cols, outputCol="features")

vector_DF =   vecAssembler.transform(clean_df)

vector_DF.show()



['Annual Income (k$)', 'Spending Score (1-100)']
+------------------+----------------------+-----------+
|Annual Income (k$)|Spending Score (1-100)|   features|
+------------------+----------------------+-----------+
|              15.0|                  39.0|[15.0,39.0]|
|              15.0|                  81.0|[15.0,81.0]|
|              16.0|                   6.0| [16.0,6.0]|
|              16.0|                  77.0|[16.0,77.0]|
|              17.0|                  40.0|[17.0,40.0]|
|              17.0|                  76.0|[17.0,76.0]|
|              18.0|                   6.0| [18.0,6.0]|
|              18.0|                  94.0|[18.0,94.0]|
|              19.0|                   3.0| [19.0,3.0]|
|              19.0|                  72.0|[19.0,72.0]|
|              19.0|                  14.0|[19.0,14.0]|
|              19.0|                  99.0|[19.0,99.0]|
|              20.0|                  15.0|[20.0,15.0]|
|              20.0|                  77.0|[20.0,77.0]|

In [7]:
#Trains a k-means model.
kmeans = KMeans().setK(6).setSeed(1)
model = kmeans.fit(vector_DF)

predictions = model.transform(vector_DF)
predictions.show()


+------------------+----------------------+-----------+----------+
|Annual Income (k$)|Spending Score (1-100)|   features|prediction|
+------------------+----------------------+-----------+----------+
|              15.0|                  39.0|[15.0,39.0]|         5|
|              15.0|                  81.0|[15.0,81.0]|         1|
|              16.0|                   6.0| [16.0,6.0]|         5|
|              16.0|                  77.0|[16.0,77.0]|         1|
|              17.0|                  40.0|[17.0,40.0]|         5|
|              17.0|                  76.0|[17.0,76.0]|         1|
|              18.0|                   6.0| [18.0,6.0]|         5|
|              18.0|                  94.0|[18.0,94.0]|         1|
|              19.0|                   3.0| [19.0,3.0]|         5|
|              19.0|                  72.0|[19.0,72.0]|         1|
|              19.0|                  14.0|[19.0,14.0]|         5|
|              19.0|                  99.0|[19.0,99.0]|       

In [8]:
#

evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
print("Cluster Centers: ")
centers = model.clusterCenters()
for center in centers:
    print(center)

Silhouette with squared euclidean distance = 0.6283328590841997
Cluster Centers: 
[63.72093023 46.1627907 ]
[25.0952381  80.04761905]
[86.53846154 82.12820513]
[89.40625 15.59375]
[47.29545455 51.40909091]
[25.14285714 19.52380952]
