## Import the Libraries

In [None]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.mllib.clustering import KMeans, KMeansModel
from pyspark.sql import SparkSession
# Obtain the SparkSession for this notebook via the builder's getOrCreate().
spark = (
    SparkSession.builder
    .appName("K-Means Clustering")
    .getOrCreate()
)

## Download the Dataset

In [9]:
!wget https://raw.githubusercontent.com/mananparasher/Spark-Datasets/master/breast-cancer-wisconsin.csv

--2020-06-22 20:29:30--  https://raw.githubusercontent.com/mananparasher/Spark-Datasets/master/breast-cancer-wisconsin.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.52.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.52.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 24063 (23K) [text/plain]
Saving to: ‘breast-cancer-wisconsin.csv.1’


2020-06-22 20:29:31 (466 KB/s) - ‘breast-cancer-wisconsin.csv.1’ saved [24063/24063]



## Load the Data into a Spark DataFrame

In [2]:
# Read the downloaded CSV: the first line supplies the column names and
# column types are inferred from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('breast-cancer-wisconsin.csv')
)
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- Clump_Thickness: integer (nullable = true)
 |-- Cell_Size: integer (nullable = true)
 |-- Cell_Shape: integer (nullable = true)
 |-- Marginal_Adhesion: integer (nullable = true)
 |-- Epithelial_Cell_Size: integer (nullable = true)
 |-- Bare_Nuclei: double (nullable = true)
 |-- Bland_Chromatin: integer (nullable = true)
 |-- Normal_Nucleoli: integer (nullable = true)
 |-- Mitoses: integer (nullable = true)
 |-- Class: integer (nullable = true)



## Data Processing for the Machine Learning Model

In [6]:
# Columns packed, in this order, into the single 'features' vector column.
# NOTE(review): 'Class' looks like the diagnosis label -- confirm it is meant
# to be one of the clustering inputs.
feature_columns = [
    'Clump_Thickness', 'Cell_Size', 'Cell_Shape',
    'Marginal_Adhesion', 'Epithelial_Cell_Size', 'Normal_Nucleoli', 'Bland_Chromatin',
    'Bare_Nuclei', 'Mitoses', 'Class',
]
vectorAssembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

# Assemble the vector column and keep only that column.
transformed_df = vectorAssembler.transform(df).select("features")
transformed_df.show()

+--------------------+
|            features|
+--------------------+
|[5.0,1.0,1.0,1.0,...|
|[5.0,4.0,4.0,5.0,...|
|[3.0,1.0,1.0,1.0,...|
|[6.0,8.0,8.0,1.0,...|
|[4.0,1.0,1.0,3.0,...|
|[8.0,10.0,10.0,8....|
|[1.0,1.0,1.0,1.0,...|
|[2.0,1.0,2.0,1.0,...|
|[2.0,1.0,1.0,1.0,...|
|[4.0,2.0,1.0,1.0,...|
|[1.0,1.0,1.0,1.0,...|
|[2.0,1.0,1.0,1.0,...|
|[5.0,3.0,3.0,3.0,...|
|[1.0,1.0,1.0,1.0,...|
|[8.0,7.0,5.0,10.0...|
|[7.0,4.0,6.0,4.0,...|
|[4.0,1.0,1.0,1.0,...|
|[4.0,1.0,1.0,1.0,...|
|[10.0,7.0,7.0,6.0...|
|[6.0,1.0,1.0,1.0,...|
+--------------------+
only showing top 20 rows



In [7]:
# Bring the first 20 assembled rows to the driver as plain Python lists of floats.
# toArray() expands dense and sparse vectors alike to their full length, whereas
# `.values` on a SparseVector holds only the non-zero entries and would produce
# rows of differing lengths.
array_rows = [row["features"].toArray().tolist() for row in transformed_df.take(20)]

array_rows

[[5.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0, 1.0, 1.0, 2.0],
 [5.0, 4.0, 4.0, 5.0, 7.0, 2.0, 3.0, 10.0, 1.0, 2.0],
 [3.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0, 2.0, 1.0, 2.0],
 [6.0, 8.0, 8.0, 1.0, 3.0, 7.0, 3.0, 4.0, 1.0, 2.0],
 [4.0, 1.0, 1.0, 3.0, 2.0, 1.0, 3.0, 1.0, 1.0, 2.0],
 [8.0, 10.0, 10.0, 8.0, 7.0, 7.0, 9.0, 10.0, 1.0, 4.0],
 [1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0, 10.0, 1.0, 2.0],
 [2.0, 1.0, 2.0, 1.0, 2.0, 1.0, 3.0, 1.0, 1.0, 2.0],
 [2.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 5.0, 2.0],
 [4.0, 2.0, 1.0, 1.0, 2.0, 1.0, 2.0, 1.0, 1.0, 2.0],
 [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 3.0, 1.0, 1.0, 2.0],
 [2.0, 1.0, 1.0, 1.0, 2.0, 1.0, 2.0, 1.0, 1.0, 2.0],
 [5.0, 3.0, 3.0, 3.0, 2.0, 4.0, 4.0, 3.0, 1.0, 4.0],
 [1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0, 3.0, 1.0, 2.0],
 [8.0, 7.0, 5.0, 10.0, 7.0, 5.0, 5.0, 9.0, 4.0, 4.0],
 [7.0, 4.0, 6.0, 4.0, 6.0, 3.0, 4.0, 1.0, 1.0, 4.0],
 [4.0, 1.0, 1.0, 1.0, 2.0, 1.0, 2.0, 1.0, 1.0, 2.0],
 [4.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0, 1.0, 1.0, 2.0],
 [10.0, 7.0, 7.0, 6.0, 4.0, 1.0, 4.0, 10

In [8]:
# Distribute the collected rows as an RDD for KMeans.train.
# The SparkContext is taken from `spark` because the cells above never define
# an `sc` variable, so a bare `sc` raises NameError on a fresh kernel.
spark_rdd = spark.sparkContext.parallelize(array_rows)
spark_rdd.take(10)

[[5.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0, 1.0, 1.0, 2.0],
 [5.0, 4.0, 4.0, 5.0, 7.0, 2.0, 3.0, 10.0, 1.0, 2.0],
 [3.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0, 2.0, 1.0, 2.0],
 [6.0, 8.0, 8.0, 1.0, 3.0, 7.0, 3.0, 4.0, 1.0, 2.0],
 [4.0, 1.0, 1.0, 3.0, 2.0, 1.0, 3.0, 1.0, 1.0, 2.0],
 [8.0, 10.0, 10.0, 8.0, 7.0, 7.0, 9.0, 10.0, 1.0, 4.0],
 [1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0, 10.0, 1.0, 2.0],
 [2.0, 1.0, 2.0, 1.0, 2.0, 1.0, 3.0, 1.0, 1.0, 2.0],
 [2.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 5.0, 2.0],
 [4.0, 2.0, 1.0, 1.0, 2.0, 1.0, 2.0, 1.0, 1.0, 2.0]]

## Model Implementation   

In [9]:
# Fit a 4-cluster model with at most 10 iterations and random initial centers.
# A fixed seed pins the random initialisation so repeated runs start from the
# same centers.
kmeans = KMeans.train(spark_rdd, 4, maxIterations=10, initializationMode="random", seed=42)

# Cluster index assigned to one sample point. A plain list is passed because
# `array` is never imported in this notebook; predict() converts the list to a
# vector itself.
kmeans.predict([5.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0, 1.0, 1.0, 2.0])

1

## Clusters Formed   

In [10]:
# Display the centers of the fitted model, one array per cluster.
kmeans.clusterCenters

[array([ 3. ,  2.5,  2.5,  3. ,  4.5,  1.5,  3. , 10. ,  1. ,  2. ]),
 array([3.16666667, 1.08333333, 1.08333333, 1.16666667, 1.91666667,
        1.        , 2.58333333, 1.25      , 1.33333333, 2.        ]),
 array([6.        , 5.        , 5.66666667, 2.66666667, 3.66666667,
        4.66666667, 3.66666667, 2.66666667, 1.        , 3.33333333]),
 array([8.66666667, 8.        , 7.33333333, 8.        , 6.        ,
        4.33333333, 6.        , 9.66666667, 2.33333333, 4.        ])]