In [1]:
import pandas as pd
import numpy as np

# Render every column and every row when a pandas DataFrame is displayed.
pd.set_option("display.max_columns", None)
pd.set_option("display.max_rows", None)

# Import from the public IPython.display module; pulling `display` out of
# IPython.core.display is a deprecated import path.
from IPython.display import display, HTML

# Inject CSS: widen the notebook container to 97% and shrink the code font to 11px.
display(HTML("<style>.container { width:97% !important; }</style>"))
display(HTML("<style>.CodeMirror { font-size:11px !important; }</style>"))

In [2]:
import findspark
findspark.init()  # keep this ahead of the pyspark import below

from pyspark.sql import SparkSession

# Build (or reuse) a session named "kmeans" on a local master with 4 worker threads.
# NOTE(review): with a local[...] master the executor presumably lives inside the
# driver JVM, so 'spark.executor.memory' may have no effect here — confirm.
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("kmeans")
    .config('spark.driver.memory', '2g')
    .config('spark.executor.memory', '4g')
    .getOrCreate()
)

# Handle to the underlying SparkContext of the session.
sc = spark.sparkContext

In [3]:
from sklearn import datasets

# Load the iris data set: X holds the measurements, y the class ids.
iris = datasets.load_iris()
X = iris.data
y = iris.target

# Feature columns and the 'label' column side by side in one pandas frame.
irisdf = pd.concat(
    [pd.DataFrame(X, columns=iris.feature_names), pd.DataFrame(y, columns=['label'])],
    axis=1,
)

# Normalise names such as 'sepal length (cm)' to 'sepallength'.
# regex=True is passed explicitly: the implicit default is changing to False, under
# which the pattern would be treated as a literal string and nothing would be renamed.
# The pattern removes only the '(cm)' unit suffix and whitespace, instead of a
# character class that also stripped every 'c'/'m' and contained an unintended
# ')-,' range.
irisdf.columns = irisdf.columns.str.replace(r'\(cm\)|\s', '', regex=True)
irisdf.head()

  irisdf.columns = irisdf.columns.str.replace('[ ()-,cm]', '')


Unnamed: 0,sepallength,sepalwidth,petallength,petalwidth,label
0,5.1,3.5,1.4,0.2,0
1,4.9,3.0,1.4,0.2,0
2,4.7,3.2,1.3,0.2,0
3,4.6,3.1,1.5,0.2,0
4,5.0,3.6,1.4,0.2,0


In [4]:
# Convert the pandas frame into a Spark DataFrame using the active session.
irisPARK = spark.createDataFrame(irisdf) 

In [5]:
# Print the leading rows of the Spark DataFrame as a text table.
irisPARK.show()

+-----------+----------+-----------+----------+-----+
|sepallength|sepalwidth|petallength|petalwidth|label|
+-----------+----------+-----------+----------+-----+
|        5.1|       3.5|        1.4|       0.2|    0|
|        4.9|       3.0|        1.4|       0.2|    0|
|        4.7|       3.2|        1.3|       0.2|    0|
|        4.6|       3.1|        1.5|       0.2|    0|
|        5.0|       3.6|        1.4|       0.2|    0|
|        5.4|       3.9|        1.7|       0.4|    0|
|        4.6|       3.4|        1.4|       0.3|    0|
|        5.0|       3.4|        1.5|       0.2|    0|
|        4.4|       2.9|        1.4|       0.2|    0|
|        4.9|       3.1|        1.5|       0.1|    0|
|        5.4|       3.7|        1.5|       0.2|    0|
|        4.8|       3.4|        1.6|       0.2|    0|
|        4.8|       3.0|        1.4|       0.1|    0|
|        4.3|       3.0|        1.1|       0.1|    0|
|        5.8|       4.0|        1.2|       0.2|    0|
|        5.7|       4.4|    

In [6]:
# Summary statistics (count, mean, stddev, min, max) for every column.
irisPARK.describe().show()

+-------+------------------+------------------+------------------+------------------+------------------+
|summary|       sepallength|        sepalwidth|       petallength|        petalwidth|             label|
+-------+------------------+------------------+------------------+------------------+------------------+
|  count|               150|               150|               150|               150|               150|
|   mean| 5.843333333333334|3.0573333333333337|3.7580000000000005|1.1993333333333331|               1.0|
| stddev|0.8280661279778623|0.4358662849366982|1.7652982332594664|0.7622376689603466|0.8192319205190405|
|    min|               4.3|               2.0|               1.0|               0.1|                 0|
|    max|               7.9|               4.4|               6.9|               2.5|                 2|
+-------+------------------+------------------+------------------+------------------+------------------+



In [7]:
import pyspark.sql.functions as F

In [8]:
# Number of rows per class label, reported in a column named 'Count'.
(
    irisPARK
    .groupby('label')
    .agg(F.count("*").alias('Count'))
    .show()
)

+-----+-----+
|label|Count|
+-----+-----+
|    0|   50|
|    1|   50|
|    2|   50|
+-----+-----+



In [9]:
# Print each column's name, Spark type and nullability.
irisPARK.printSchema()

root
 |-- sepallength: double (nullable = true)
 |-- sepalwidth: double (nullable = true)
 |-- petallength: double (nullable = true)
 |-- petalwidth: double (nullable = true)
 |-- label: long (nullable = true)



In [10]:
from pyspark.ml.feature import VectorAssembler
# Pack the four measurement columns into a single vector column called "features";
# 'label' is deliberately not part of the inputs.
vecAssembler = VectorAssembler(
    inputCols=[
        "sepallength",
        "sepalwidth",
        "petallength",
        "petalwidth",
    ],
    outputCol="features",
)

In [11]:
# Append the assembled "features" vector column to the original columns.
vected = vecAssembler.transform(irisPARK)
# Print the leading rows to check the new column.
vected.show()

+-----------+----------+-----------+----------+-----+-----------------+
|sepallength|sepalwidth|petallength|petalwidth|label|         features|
+-----------+----------+-----------+----------+-----+-----------------+
|        5.1|       3.5|        1.4|       0.2|    0|[5.1,3.5,1.4,0.2]|
|        4.9|       3.0|        1.4|       0.2|    0|[4.9,3.0,1.4,0.2]|
|        4.7|       3.2|        1.3|       0.2|    0|[4.7,3.2,1.3,0.2]|
|        4.6|       3.1|        1.5|       0.2|    0|[4.6,3.1,1.5,0.2]|
|        5.0|       3.6|        1.4|       0.2|    0|[5.0,3.6,1.4,0.2]|
|        5.4|       3.9|        1.7|       0.4|    0|[5.4,3.9,1.7,0.4]|
|        4.6|       3.4|        1.4|       0.3|    0|[4.6,3.4,1.4,0.3]|
|        5.0|       3.4|        1.5|       0.2|    0|[5.0,3.4,1.5,0.2]|
|        4.4|       2.9|        1.4|       0.2|    0|[4.4,2.9,1.4,0.2]|
|        4.9|       3.1|        1.5|       0.1|    0|[4.9,3.1,1.5,0.1]|
|        5.4|       3.7|        1.5|       0.2|    0|[5.4,3.7,1.

In [12]:
# Seeded random split with 0.8 / 0.2 weights; the weights are proportions,
# so the resulting row counts are approximate rather than exact.
train, test = vected.randomSplit(weights=[0.8, 0.2], seed=48)

# Number of rows that landed in the training split.
train.count()

123

In [13]:
# Number of rows in the held-out split.
test.count()

27

In [14]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import ClusteringEvaluator

In [15]:
# k-means estimator: 3 clusters, fixed seed, reading the "features" vector column
# and writing the assigned cluster id to a "cluster" column.
kmeans = KMeans(
    k=3,
    seed=48,
    featuresCol="features",
    predictionCol="cluster",
)

In [16]:
# Fit the k-means estimator on the training split.
model = kmeans.fit(train)

In [17]:
# Assign each held-out row to a cluster; the result gains a "cluster" column.
pred_df = model.transform(test)

# Collect the predictions to the driver as a pandas DataFrame for display.
pred_df.toPandas()

Unnamed: 0,sepallength,sepalwidth,petallength,petalwidth,label,features,cluster
0,4.6,3.6,1.0,0.2,0,"[4.6, 3.6, 1.0, 0.2]",1
1,4.8,3.0,1.4,0.1,0,"[4.8, 3.0, 1.4, 0.1]",1
2,4.8,3.4,1.6,0.2,0,"[4.8, 3.4, 1.6, 0.2]",1
3,4.9,3.1,1.5,0.2,0,"[4.9, 3.1, 1.5, 0.2]",1
4,5.0,3.4,1.5,0.2,0,"[5.0, 3.4, 1.5, 0.2]",1
5,5.1,3.5,1.4,0.2,0,"[5.1, 3.5, 1.4, 0.2]",1
6,5.4,3.4,1.7,0.2,0,"[5.4, 3.4, 1.7, 0.2]",1
7,5.4,3.7,1.5,0.2,0,"[5.4, 3.7, 1.5, 0.2]",1
8,5.5,4.2,1.4,0.2,0,"[5.5, 4.2, 1.4, 0.2]",1
9,4.5,2.3,1.3,0.3,0,"[4.5, 2.3, 1.3, 0.3]",1


In [18]:
# Number of held-out rows assigned to each cluster.
pred_df.groupBy('cluster').count().show()

+-------+-----+
|cluster|count|
+-------+-----+
|      1|   12|
|      2|    8|
|      0|    7|
+-------+-----+



In [19]:
# Sweep k = 2..10: fit k-means on the training split, assign clusters to the
# held-out split, and print the silhouette score of those assignments.

# The evaluator's configuration does not depend on k, so it is built once
# instead of once per iteration.
evaluator = ClusteringEvaluator(
    featuresCol="features",
    predictionCol="cluster",
    metricName="silhouette",
)

# NOTE: the loop rebinds `kmeans`, `model` and `pred_df`; once it finishes they
# refer to the k=10 run, not the k=3 model fitted in the earlier cells.
for k in range(2, 11):
    kmeans = KMeans(
        k=k,
        seed=48,
        featuresCol="features",
        predictionCol="cluster",
    )

    model = kmeans.fit(train)

    # The score is computed on the test-split predictions, not on the training data.
    pred_df = model.transform(test)

    score = evaluator.evaluate(pred_df)

    print(k, score)

2 0.843481585592055
3 0.8078282316788999
4 0.614919838384644
5 0.4989573104941077
6 0.42917962895504586
7 0.49141691856535913
8 0.5510252277651395
9 0.6227684290871175
10 0.5070393849996856
