# K-Means with Spark

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=4f67f8a9dca2ab44d0c7fb588c93a6622a8919c33dd1dc265f9d692bcc028959
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [2]:
from pyspark.mllib.clustering import KMeans
from pyspark import SparkConf, SparkContext
from numpy import array, random
from math import sqrt
from sklearn.preprocessing import scale

In [3]:
conf = SparkConf().setMaster('local').setAppName('SparkKMeans')
sc = SparkContext(conf = conf)

In [8]:
def createClusteredData(N,k):
    random.seed(10)
    pointsPerCluster = float(N)/k
    X = []
    for i in range(k):
        incomeCentroid = random.uniform(20000.0, 200000.0)
        ageCentroid = random.uniform(20.0, 70.0)
        for j in range(int(pointsPerCluster)):
            X.append([random.normal(incomeCentroid,10000.0),
                      random.normal(ageCentroid, 2.0)])
    X = array(X)

    return X

In [9]:
K=5
data = sc.parallelize(scale(createClusteredData(100,K)))

In [26]:
clusters = KMeans.train(data, K,
                        maxIterations=10,
                        #runs = 10,
                        initializationMode='random')

In [27]:
resultRDD = data.map(lambda point: clusters.predict(point)).cache()
print("Counts by value: ") #count unique cluster
counts = resultRDD.countByValue()
print(counts)

Counts by value: 
defaultdict(<class 'int'>, {0: 20, 2: 14, 1: 6, 3: 40, 4: 20})


In [28]:
print("Cluster assignments")
results = resultRDD.collect()
print(results)

Cluster assignments
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 2, 2, 1, 2, 2, 1, 1, 2, 2, 1, 2, 1, 2, 2, 2, 2, 2, 2, 1, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3]


In [31]:
# evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
  center = clusters.centers[clusters.predict(point)]
  return sqrt(sum([x**2 for x in (point - center)]))

In [32]:
WSSSE = data.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = "+ str(WSSSE))

Within Set Sum of Squared Error = 22.25445343635214
