In [1]:
import numpy as np
import pandas as pd
import random
import time
from pyspark import SparkContext
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
import math


In [18]:
iterations = 20
dataset_path = r"C:\Users\Selva\Downloads\data.txt"
centroids_path = r"C:\Users\Selva\Downloads\c2.txt"
euclidean_distance = False

In [20]:
sc = SparkContext()
data = sc.textFile(dataset_path)
centroids = sc.textFile(centroids_path)

In [21]:
data = data.map(lambda x: np.array(list(x.split())).astype(np.float))
centroids = centroids.map(lambda x: np.array(list(x.split())).astype(np.float))

In [22]:
data.take(1)

[array([  0.   ,   0.64 ,   0.64 ,   0.   ,   0.32 ,   0.   ,   0.   ,
          0.   ,   0.   ,   0.   ,   0.   ,   0.64 ,   0.   ,   0.   ,
          0.   ,   0.32 ,   0.   ,   1.29 ,   1.93 ,   0.   ,   0.96 ,
          0.   ,   0.   ,   0.   ,   0.   ,   0.   ,   0.   ,   0.   ,
          0.   ,   0.   ,   0.   ,   0.   ,   0.   ,   0.   ,   0.   ,
          0.   ,   0.   ,   0.   ,   0.   ,   0.   ,   0.   ,   0.   ,
          0.   ,   0.   ,   0.   ,   0.   ,   0.   ,   0.   ,   0.   ,
          0.   ,   0.   ,   0.778,   0.   ,   0.   ,   3.756,  61.   ,
        278.   ,   1.   ])]

In [23]:
centroids.take(1)

[array([  0.   ,   0.64 ,   0.64 ,   0.   ,   0.32 ,   0.   ,   0.   ,
          0.   ,   0.   ,   0.   ,   0.   ,   0.64 ,   0.   ,   0.   ,
          0.   ,   0.32 ,   0.   ,   1.29 ,   1.93 ,   0.   ,   0.96 ,
          0.   ,   0.   ,   0.   ,   0.   ,   0.   ,   0.   ,   0.   ,
          0.   ,   0.   ,   0.   ,   0.   ,   0.   ,   0.   ,   0.   ,
          0.   ,   0.   ,   0.   ,   0.   ,   0.   ,   0.   ,   0.   ,
          0.   ,   0.   ,   0.   ,   0.   ,   0.   ,   0.   ,   0.   ,
          0.   ,   0.   ,   0.778,   0.   ,   0.   ,   3.756,  61.   ,
        278.   ,   1.   ])]

In [24]:
def eucledian_distance(a,b):
    dist = 0 
    for i in range(0,len(a)):
        dist = dist + math.pow(a[i]-b[i],2)
    return math.sqrt(dist)

In [25]:
def manhattan_distance(a,b):
    dist = 0 
    for i in range(0,len(a)):
        dist = dist + abs(a[i]-b[i])
    return dist

In [26]:
def compute_distance(a,b,euclidean_distance):
    if euclidean_distance:
        return eucledian_distance(a,b)
    else:
        return manhattan_distance(a,b)

In [27]:
def compute_cost(min_distance,euclidean_distance):
    if eucledian_distance:
        return math.pow(min_distance,2)
    else:
        return min_distance

In [28]:
def assign_centroid(data_point,centroids_list,euclidean_distance):
    min_distance = float("inf")
    c = -1
    for k in range(0,len(centroids_list)):
        temp_distance = compute_distance(data_point,centroids_list[k],euclidean_distance)
        if temp_distance <= min_distance:
            min_distance = temp_distance
            c = k
    cost = compute_cost(min_distance,euclidean_distance)
    return c,cost

In [29]:
def kmeans(data, centroids, iterations, euclidean_distance):
    distances = [(0,float("inf"))]*data.count()
    data = data.zipWithIndex()
    cost = []
    centroids_list = centroids.collect()
    measure = ""
    if euclidean_distance:
        measure = "Eucledian"
    else:
        measure = "Manhattan"
    print("Max iterations : " +str(iterations) +", distance measure: "+(measure))
    for i in range(0,iterations):
        points_with_centroids = data.map(lambda d: (assign_centroid(d[0],centroids_list,euclidean_distance)))
        cost.append(points_with_centroids.values().sum())
        for j in range(0,len(centroids_list)):
            current_cluster_data_indices = points_with_centroids.zipWithIndex().filter(lambda z: z[0][0] == j).map(lambda z: z[1])
            current_cluster_data_indices_list = current_cluster_data_indices.collect()
            current_cluster_points = data.filter(lambda z: z[1] in current_cluster_data_indices_list).map(lambda z: z[0])
            centroids_list[j] = current_cluster_points.mean()
        print("Iteration " + str(i+1) +": " + str(cost[i]))
    return cost

In [31]:
cost = kmeans(data, centroids, iterations, euclidean_distance)

Max iterations : 20, distance measure: Manhattan
Iteration 1: 655439563.7534091
Iteration 2: 413705217.4542825
Iteration 3: 328497470.8182906
Iteration 4: 293722399.34747976
Iteration 5: 277431076.7992791
Iteration 6: 261213468.0040863
Iteration 7: 250299830.35410452
Iteration 8: 228097491.03422523
Iteration 9: 214101749.5963676
Iteration 10: 209285429.9481863
Iteration 11: 207576806.19860598
Iteration 12: 205428736.4893818
Iteration 13: 203840885.21360666
Iteration 14: 201709439.45644003
Iteration 15: 199719165.45965225
Iteration 16: 197988302.80494282
Iteration 17: 196383379.32653075
Iteration 18: 195310410.03057364
Iteration 19: 194676779.46828613
Iteration 20: 193878096.18808842


In [32]:
c2_man_cost = cost
print(c2_man_cost)

[655439563.7534091, 413705217.4542825, 328497470.8182906, 293722399.34747976, 277431076.7992791, 261213468.0040863, 250299830.35410452, 228097491.03422523, 214101749.5963676, 209285429.9481863, 207576806.19860598, 205428736.4893818, 203840885.21360666, 201709439.45644003, 199719165.45965225, 197988302.80494282, 196383379.32653075, 195310410.03057364, 194676779.46828613, 193878096.18808842]
