## Implement in Spark (PySpark) the following k-means algorithm.
1. Assign each point to a cluster at random
2. Compute the cluster centroids as the averages of the points assigned to each cluster 
3. Repeat the following lines l times 
    - Assign each point to the cluster with the closest centroid
    - Update the cluster centroids as the averages of the points assigned to each cluster

In [3]:
# make sure the JAVA_HOME is right. (~\.bach_profile)
from pyspark import SparkContext
sc = SparkContext()

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local[*]) created by __init__ at <ipython-input-1-33ce3f59c0b1>:2 

In [10]:
import random 
k = 2 # number of clusters

filename = "kmean.csv"
data = sc.textFile(filename)
points = data.map(lambda x: x.split(","))
points = points.map(lambda x: ([float(x[0]), float(x[1])])) # change the data type from str to float
print (points.collect())

[[33.3, -17.5], [40.4, -20.5], [28.0, -23.9], [29.5, -19.0], [32.8, -18.84]]


In [11]:
# 1. Assign each point to a cluster at random

clusters = points.map(lambda x: (random.randint(1, 2), x)) 
print (clusters.collect())

[(2, [33.3, -17.5]), (1, [40.4, -20.5]), (1, [28.0, -23.9]), (2, [29.5, -19.0]), (1, [32.8, -18.84])]


In [12]:
# Euclidean distance between point A & B
import math
def distance(A, B):
    dist = math.sqrt(sum([(a - b) ** 2 for a, b in zip(A, B)]))
    return (dist)

In [13]:
# 2. Compute the cluster centroids as the averages of the points assigned to each cluster

clusters = clusters.mapValues(lambda x: (x, 1)) # add the count number 1
print (clusters.collect())

# find the closest centroid for a point
def closest(point, centroids):
    best_cluster = None
    best_dist = float("inf")
    for c in centroids:
        dist = distance(c[1], point)
        if dist < best_dist:
            best_dist = dist
            best_cluster = c[0]     
    return best_cluster

[(2, ([33.3, -17.5], 1)), (1, ([40.4, -20.5], 1)), (1, ([28.0, -23.9], 1)), (1, ([29.5, -19.0], 1)), (2, ([32.8, -18.84], 1))]


In [6]:
# 3. Repeat the following lines l times
#     - Assign each point to the cluster with the closest centroid
#     - Update the cluster centroids as the averages of the points assigned to each cluster

l = 3
for i in range(l):
    print (str(i+1) + " time...")
    print (clusters.collect())
    
    # 2. Compute the cluster centroids as the averages of the points assigned to each cluster
    clusters = clusters.reduceByKey(lambda a,b: ([a[0][0]+b[0][0], a[0][1]+b[0][1]], a[1]+b[1])) # add all points in the same clusters
    centroids = clusters.mapValues(lambda x: [x[0][0]/x[1], x[0][1]/x[1]]).collect() # compute the average
    print (centroids)
    clusters =  points.map(lambda x: (closest(x, centroids), (x, 1)))

1 time...
[(2, ([33.3, -17.5], 1)), (1, ([40.4, -20.5], 1)), (1, ([28.0, -23.9], 1)), (1, ([29.5, -19.0], 1)), (1, ([32.8, -18.84], 1))]
[(2, [29.5, -19.0]), (1, [33.625, -20.185])]
2 time...
[(1, ([33.3, -17.5], 1)), (1, ([40.4, -20.5], 1)), (2, ([28.0, -23.9], 1)), (2, ([29.5, -19.0], 1)), (1, ([32.8, -18.84], 1))]
[(2, [28.75, -21.45]), (1, [35.49999999999999, -18.94666666666667])]
3 time...
[(1, ([33.3, -17.5], 1)), (1, ([40.4, -20.5], 1)), (2, ([28.0, -23.9], 1)), (2, ([29.5, -19.0], 1)), (1, ([32.8, -18.84], 1))]
[(2, [28.75, -21.45]), (1, [35.49999999999999, -18.94666666666667])]
