# KMeans Clustering

In [1]:
import findspark
findspark.init()
import pyspark

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf = pyspark.SparkConf().setAppName('Data_mining').setMaster('local')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)

In [3]:
import numpy as np
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans, KMeansModel
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.sql.functions import when, count, col, desc, asc, row_number, mean
from pyspark.sql.types import (StructField,FloatType,IntegerType,StructType)

In [4]:
df = spark.read.load("C:/Users/Tian/Documents/datasets/df_final.csv", format="csv", header="true")
df = (df.withColumn("Human_Development_Index", df["Human_Development_Index"].cast("float")) \
      .withColumn("Total_Case", df["Total_Case"].cast("float")) \
      .withColumn("Total_Death", df["Total_Death"].cast("float")) \
      .withColumn("GDP", df["GDP"].cast("float")) \
      .withColumn("GDP/Capita", df["GDP/Capita"].cast("float"))
      .withColumn("Poverty_Rate_(%)", df["Poverty_Rate_(%)"].cast("float")))
dm = df

In [5]:
train_data,test_data = df.randomSplit([0.7,0.3])
print("train_data: ", train_data.count())
print("test_data: ", test_data.count())

train_data:  91
test_data:  52


In [6]:
assembler = VectorAssembler(inputCols= df.columns, outputCol='features')
features = df.rdd.map(lambda r: r.features.array)
df = assembler.transform(df)

In [7]:
scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures', withStd=True)
scaledModel = scaler.fit(df)
df = scaledModel.transform(df)

In [8]:
kmeans = KMeans(featuresCol='scaledFeatures', k=2)
model = kmeans.fit(df)
transformed = model.transform(df)
kmeans_labeled = KMeans(featuresCol='scaledFeatures', k=2, predictionCol ="cluster")
model_labeled = kmeans_labeled.fit(df)
prediction_labeled = model_labeled.transform(df)
#print(model_labeled)
prediction_labeled.show()

+-----------------------+----------+-----------+----------+----------+----------------+--------------------+--------------------+-------+
|Human_Development_Index|Total_Case|Total_Death|GDP/Capita|       GDP|Poverty_Rate_(%)|            features|      scaledFeatures|cluster|
+-----------------------+----------+-----------+----------+----------+----------------+--------------------+--------------------+-------+
|                  0.519|   9.72973|  5.4722705|  7.255902| 124.34163|            90.9|[0.51899999380111...|[3.07824805099661...|      1|
|                  0.417|  6.295266|        0.0|  6.554254|106.777145|            89.6|[0.41699999570846...|[2.47327444968539...|      1|
|                  0.477|  8.675393|   5.198497|  6.998548| 117.34302|            89.6|[0.47699999809265...|[2.82914129478151...|      1|
|                   0.71|  9.976088|   4.804021|  8.740833| 151.44487|            86.2|[0.70999997854232...|[4.21109070570253...|      1|
|                  0.455| 7.778630

In [9]:
eval = ClusteringEvaluator()

In [10]:
silhouette = eval.evaluate(transformed)
print(f"Silhoutte with squared euclidean distance: {silhouette}")

Silhoutte with squared euclidean distance: 0.6645799501535241


In [11]:
print(dm.columns)
centers = model_labeled.clusterCenters()
print("Cluster centers:")
for c in centers:
    print(c)

['Human_Development_Index', 'Total_Case', 'Total_Death', 'GDP/Capita', 'GDP', 'Poverty_Rate_(%)']
Cluster centers:
[4.77659186 6.91714522 3.56499457 8.38187138 6.76320686 0.20133645]
[3.21169814 5.79733544 2.52525685 6.81166103 5.30521967 1.80105946]
