In [1]:
from pyspark.mllib.clustering import KMeans
from numpy import array, random
from math import sqrt
from pyspark import SparkConf, SparkContext
from sklearn.preprocessing import scale

In [2]:
k =5 # no of clusters

In [3]:
conf = SparkConf().setMaster("local").setAppName("KMeansApp")
sc = SparkContext(conf=conf)


In [7]:
def createClusteredData(N, k):
    random.seed(10)
    pointsPerCluster = float(N)/k
    X = []
    for i in range (k):
        incomeCentroid = random.uniform(20000.0, 200000.0)
        ageCentroid = random.uniform(20.0, 70.0)
        for j in range(int(pointsPerCluster)):
            X.append([random.normal(incomeCentroid, 10000.0), random.normal(ageCentroid, 2.0)])
    X = array(X)
    return X

random.seed(0)


In [8]:
data = sc.parallelize(scale(createClusteredData(100, k)))

In [9]:
clusters = KMeans.train(data, k, maxIterations=10, initializationMode="random")

In [10]:
resultRDD = data.map(lambda point: clusters.predict(point)).cache()

In [11]:
print("Counts")
counts = resultRDD.countByValue()
print(counts)

print("Clusters")
results = resultRDD.collect()
print(results)

Counts
defaultdict(<class 'int'>, {3: 20, 4: 40, 0: 16, 2: 9, 1: 15})
Clusters
[3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 0, 0, 0, 2, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 2, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 1, 1, 1, 2, 1, 1, 2, 1, 1, 2, 1, 1, 1, 1, 1, 2, 2, 1, 1, 1]


In [12]:
#Evaluating using sum of squared errors
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

SumOfSquaredErrors = data.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Sum "+str(SumOfSquaredErrors))

Sum 32.79296695821939
