#DATASCI W261: Machine Learning at Scale

# MrJob class for Kmeans

### If you want to change the code, please edit Kmeans.py directly

In [5]:
%%writefile Kmeans.py
import numpy as np
from numpy import argmin, array, random
from mrjob.job import MRJob
from mrjob.step import MRJobStep
from itertools import chain

#Calculate find the nearest centroid for data point 
def MinDist(datapoint, centroid_points):
    datapoint = array(datapoint)
    centroid_points = array(centroid_points)
    diff = datapoint - centroid_points 
    diffsq = diff**2
    
    distances = (diffsq.sum(axis = 1))**0.5
    # Get the nearest centroid for each instance
    min_idx = argmin(distances)
    return min_idx

#Check whether centroids converge
def stop_criterion(centroid_points_old, centroid_points_new,T):
    oldvalue = list(chain(*centroid_points_old))
    newvalue = list(chain(*centroid_points_new))
    Diff = [abs(x-y) for x, y in zip(oldvalue, newvalue)]
    Flag = True
    for i in Diff:
        if(i>T):
            Flag = False
            break
    return Flag


class MRKmeans(MRJob):
    centroid_points=[]
    k=3    
    def steps(self):
        return [
            MRJobStep(mapper_init = self.mapper_init, mapper=self.mapper,combiner = self.combiner,reducer=self.reducer)
               ]
    #load centroids info from file
    def mapper_init(self):
        self.centroid_points = [map(float,s.split('\n')[0].split(',')) for s in open("Centroids.txt").readlines()]
        open('Centroids.txt', 'w').close()
    #load data and output the nearest centroid index and data point 
    def mapper(self, _, line):
        D = (map(float,line.split(',')))
        idx = MinDist(D,self.centroid_points)
        norm = np.sqrt((D[0]**2)+(D[1]**2))
        yield int(idx), (norm**-1, D[0],D[1],1)
        
    #Combine sum of data points locally
    def combiner(self, idx, inputdata):
        sumx = sumy = num = 0
        for weight, x, y, n in inputdata:
            num = num + n
            sumx = sumx + x*weight
            sumy = sumy + y*weight
        yield int(idx),(sumx,sumy,num)
    #Aggregate sum for each cluster and then calculate the new centroids
    def reducer(self, idx, inputdata): 
        centroids = []
        num = [0]*self.k 
        distances = 0
        for i in range(self.k):
            centroids.append([0,0])
        for x, y, n in inputdata:
            num[idx] = num[idx] + n
            centroids[idx][0] = centroids[idx][0] + x
            centroids[idx][1] = centroids[idx][1] + y
        centroids[idx][0] = centroids[idx][0]/num[idx]
        centroids[idx][1] = centroids[idx][1]/num[idx]
        with open('Centroids.txt', 'a') as f:
            f.writelines(str(centroids[idx][0]) + ',' + str(centroids[idx][1]) + '\n')
        yield idx,(centroids[idx][0],centroids[idx][1])
        
if __name__ == '__main__':
    MRKmeans.run()

Overwriting Kmeans.py


# Driver:

Generate random initial centroids

New Centroids = initial centroids

While(1)：
+ Cacluate new centroids
+ stop if new centroids close to old centroids
+ Updates centroids 

In [6]:
%reload_ext autoreload
%autoreload 2
from numpy import random, array
from Kmeans import MRKmeans, stop_criterion
mr_job = MRKmeans(args=['Kmeandata.csv', '--file=/Users/Safyre/Documents/W261_Midterm_Prep/Centroids.txt'])

#Geneate initial centroids
centroid_points = [[0,0],[6,3],[3,6]]
k = 3
with open('Centroids.txt', 'w+') as f:
        f.writelines(','.join(str(j) for j in i) + '\n' for i in centroid_points)

# Update centroids iteratively
for i in range(10):
    # save previous centoids to check convergency
    centroid_points_old = centroid_points[:]
    print "iteration"+str(i+1)+":"
    with mr_job.make_runner() as runner: 
        runner.run()
        # stream_output: get access of the output 
        for line in runner.stream_output():
            key,value =  mr_job.parse_output_line(line)
            print key, value
            centroid_points[key] = value
    print "\n"
    i = i + 1
print "Centroids\n"
print centroid_points


iteration1:
0 [-0.6185598458783587, 0.10121214013110123]
1 [0.9741507159200979, 0.033897959732550026]
2 [0.05290992672522764, 0.9732359389851719]


iteration2:
0 [-0.9686603376120722, 0.0004953401480519062]
1 [0.9667327574053561, -0.007163503804041002]
2 [0.008537284955669622, 0.965151249337708]


iteration3:
0 [-0.9679507185802757, 0.0025351790126423666]
1 [0.9667327574053561, -0.007163503804041002]
2 [0.010756330127372938, 0.9660024871240357]


iteration4:
0 [-0.9679507185802757, 0.0025351790126423666]
1 [0.9667327574053561, -0.007163503804041002]
2 [0.010756330127372938, 0.9660024871240357]


iteration5:
0 [-0.9679507185802757, 0.0025351790126423666]
1 [0.9667327574053561, -0.007163503804041002]
2 [0.010756330127372938, 0.9660024871240357]


iteration6:
0 [-0.9679507185802757, 0.0025351790126423666]
1 [0.9667327574053561, -0.007163503804041002]
2 [0.010756330127372938, 0.9660024871240357]


iteration7:
0 [-0.9679507185802757, 0.0025351790126423666]
1 [0.9667327574053561, -0.00716350