# Getting the data

In [408]:
from pyspark import SparkContext, SparkConf
from math import sqrt

sc = SparkContext.getOrCreate()
sc.stop()

conf = SparkConf().setAppName("kmeans").setMaster("local[*]")
sc = SparkContext(conf=conf)

* Getting data from S-sets : https://cs.joensuu.fi/sipu/datasets/
* Using the s1 dataset

In [409]:
s1 = sc.textFile("../data/s1.txt")


def extract_split(x):
    splits = x.split('    ')
    return (int(splits[1]), int(splits[2]))

s1_map = s1.flatMap(lambda x: x.split('\n')).map(extract_split)



In [410]:
s1_map.takeSample(False, 20)

[(517580, 170329),
 (451670, 388347),
 (379593, 380662),
 (172842, 351663),
 (606144, 575076),
 (851216, 516166),
 (849003, 115373),
 (154412, 310120),
 (624328, 392133),
 (317331, 556086),
 (338950, 572545),
 (396520, 405610),
 (683610, 910782),
 (282622, 879222),
 (820327, 352701),
 (508535, 830710),
 (630067, 372798),
 (140989, 537156),
 (572417, 403906),
 (678856, 850293)]

# Kmeans ++ intitialization

In [411]:
cluster1_center = s1_map.takeSample(False, 1)[0]


data_length = s1_map.count()
print(data_length)
BUCKET_SIZE = 1000
BUCKET_NUMBER = int(data_length / BUCKET_SIZE)



5000


In [412]:
from random import randint
s1_map = s1_map.map(lambda x:(randint(1,BUCKET_NUMBER),) + x)

In [413]:
s1_map = s1_map.map(lambda x : x + cluster1_center)
s1_map.takeSample(False,20)

[(3, 353979, 578923, 185661, 404482),
 (5, 453261, 148432, 185661, 404482),
 (4, 415270, 775605, 185661, 404482),
 (1, 818063, 797960, 185661, 404482),
 (1, 868128, 568529, 185661, 404482),
 (1, 587478, 424878, 185661, 404482),
 (2, 799001, 302014, 185661, 404482),
 (2, 803874, 675499, 185661, 404482),
 (2, 840357, 169437, 185661, 404482),
 (3, 341775, 425156, 185661, 404482),
 (5, 376613, 159699, 185661, 404482),
 (5, 460628, 434107, 185661, 404482),
 (5, 674151, 853540, 185661, 404482),
 (1, 805390, 319676, 185661, 404482),
 (1, 418149, 790096, 185661, 404482),
 (4, 802674, 727804, 185661, 404482),
 (4, 422419, 754048, 185661, 404482),
 (4, 808555, 600511, 185661, 404482),
 (3, 803796, 314893, 185661, 404482),
 (4, 163981, 344090, 185661, 404482)]

In [414]:
def compute_distance(xi_indexes, yi_indexes):
    def bar(row):
        sum = 0
        for i in range(len(xi_indexes)):
            sum += (row[yi_indexes[i]] - row[xi_indexes[i]]) ** 2
        distance = sqrt(sum)
        return row + (distance,)
    return bar


In [415]:
def reduceByKeyMax(dist_indexes):
    def reduce_custom(x1,x2):
        dist_x1 = []
        dist_x2 = []
        for idx in dist_indexes:
            dist_x1.append(x1[idx])
            dist_x2.append(x2[idx])
        
        mindist_x1 =  min(dist_x1)
        mindist_x2 = min(dist_x2)
        if(mindist_x1 > mindist_x2):
            return x1
        else:
            return x2
    return reduce_custom



In [416]:
def compute_average(list_reduce, coord_indexes):
    result = ()
    for idx in coord_indexes:
        coord_list = [reduce_tuple[1][idx] for reduce_tuple in list_reduce]
        average = sum(coord_list)/len(coord_list)
        result += (average,)
    return result
        

In [417]:
def initCluster(data,num_clusters,num_features):
    xi_indexes = [idx for idx in range(1,num_features+1)]
    yi_indexes = [idx for idx in range(num_features+1, num_features*2 + 1)]
    coord_indexes = [idx for idx in range(num_features)]
    dist_indexes = [num_features*2]
    current_clust = 2        
    for _ in range(num_clusters-1):
        
        
        data = data.map(compute_distance(xi_indexes, yi_indexes))
        
        reduce_tuple = data.map(lambda x:(x[0], x[1:]))
        
        reduce_tuple = reduce_tuple.reduceByKey(reduceByKeyMax(dist_indexes=dist_indexes))
        
        new_cluster_point = compute_average(reduce_tuple.collect(), coord_indexes=coord_indexes)
        
        print("New cluster point : {}".format(new_cluster_point))
        
        data = data.map(lambda x:(x + new_cluster_point))
        
        current_clust += 1
        dist_indexes.append(current_clust*num_features + current_clust - 2)
        yi_indexes = [old_value + num_features + 1 for old_value in yi_indexes]
        

    return data
        


In [418]:
s1_map = initCluster(s1_map, num_features=2,num_clusters=15)
s1_map.takeSample(withReplacement=False,num=100)

New cluster point : (887162.8, 622126.0)
New cluster point : (721309.6, 261912.6)
New cluster point : (656269.2, 946782.2)
New cluster point : (908462.0, 159283.6)
New cluster point : (713391.4, 930685.0)
New cluster point : (908462.0, 159283.6)
New cluster point : (713391.4, 930685.0)
New cluster point : (908462.0, 159283.6)
New cluster point : (713391.4, 930685.0)
New cluster point : (908462.0, 159283.6)
New cluster point : (713391.4, 930685.0)
New cluster point : (908462.0, 159283.6)
New cluster point : (713391.4, 930685.0)
New cluster point : (908462.0, 159283.6)


[(1,
  116860,
  550810,
  185661,
  404482,
  161695.5818351262,
  908462.0,
  159283.6,
  883134.5584343079,
  908462.0,
  159283.6,
  883134.5584343079,
  908462.0,
  159283.6,
  883134.5584343079,
  908462.0,
  159283.6,
  883134.5584343079,
  908462.0,
  159283.6,
  883134.5584343079,
  908462.0,
  159283.6,
  883134.5584343079,
  908462.0,
  159283.6,
  883134.5584343079,
  908462.0,
  159283.6,
  883134.5584343079,
  908462.0,
  159283.6,
  883134.5584343079,
  908462.0,
  159283.6,
  883134.5584343079,
  908462.0,
  159283.6,
  883134.5584343079,
  908462.0,
  159283.6,
  883134.5584343079,
  908462.0,
  159283.6,
  883134.5584343079,
  908462.0,
  159283.6),
 (5,
  802394,
  87835,
  185661,
  404482,
  693271.170537186,
  908462.0,
  159283.6,
  127887.93166659628,
  908462.0,
  159283.6,
  127887.93166659628,
  908462.0,
  159283.6,
  127887.93166659628,
  908462.0,
  159283.6,
  127887.93166659628,
  908462.0,
  159283.6,
  127887.93166659628,
  908462.0,
  159283.6,
  1278

In [248]:
s1_map = s1_map.map(compute_distance([1,2],[3,4]))
s1_map.takeSample(False,20)

[(1, 213064, 290947, 351883, 890475, 615389.7444262457),
 (2, 658103, 572250, 351883, 890475, 441630.8854971536),
 (4, 579564, 428498, 351883, 890475, 515035.32528361585),
 (2, 358830, 431609, 351883, 890475, 458918.5840266223),
 (3, 621537, 390372, 351883, 890475, 568169.244437782),
 (2, 807446, 342555, 351883, 890475, 712568.5745028334),
 (3, 920249, 551171, 351883, 890475, 661941.9267367795),
 (2, 796353, 710294, 351883, 890475, 479602.7248264964),
 (1, 618009, 867493, 351883, 890475, 267116.4918158368),
 (5, 875311, 542259, 351883, 890475, 628674.2032563449),
 (4, 821151, 366545, 351883, 890475, 703359.8657330399),
 (3, 147663, 555514, 351883, 890475, 392306.869581709),
 (4, 881789, 727577, 351883, 890475, 554379.0465376555),
 (4, 610722, 418243, 351883, 890475, 538517.1211252247),
 (2, 401708, 409924, 351883, 890475, 483127.0994531356),
 (4, 841292, 155620, 351883, 890475, 882911.6831858099),
 (3, 605911, 379990, 351883, 890475, 570197.4745726256),
 (3, 498730, 183037, 351883, 890

In [340]:

s1_reduce = s1_map.map(lambda x:(x[0], x[1:6]))
s1_reduce.takeSample(False,20)


[(5, (101722, 535117, 351883, 890475, 434580.06636867276)),
 (1, (140121, 346875, 351883, 890475, 583390.1821628471)),
 (2, (308044, 164482, 351883, 890475, 727315.4019887109)),
 (4, (295537, 91032, 351883, 890475, 801426.2174180478)),
 (5, (599870, 591110, 351883, 890475, 388737.6433971889)),
 (5, (606809, 572101, 351883, 890475, 407859.37448096)),
 (4, (636291, 360636, 351883, 890475, 601346.2200637832)),
 (3, (604554, 393193, 351883, 890475, 557792.0954665815)),
 (1, (431117, 394746, 351883, 890475, 502021.183016215)),
 (3, (404782, 557913, 351883, 890475, 336742.91090533737)),
 (1, (825141, 535997, 351883, 890475, 591293.3172698639)),
 (2, (770113, 241615, 351883, 890475, 771968.673263365)),
 (5, (814561, 575281, 351883, 890475, 559837.6455009077)),
 (2, (192460, 388323, 351883, 890475, 526851.3301046131)),
 (5, (790774, 303642, 351883, 890475, 732801.6646883384)),
 (5, (236431, 844100, 351883, 890475, 124417.8641875836)),
 (2, (817410, 346944, 351883, 890475, 715640.5073009772)),


In [344]:
s1_reduce = s1_reduce.reduceByKey(reduceByKeyMax(dist_indexes=[4]))
s1_reduce.collect()

[(2, (854996, 81095, 351883, 890475, 953005.0761506992)),
 (4, (869167, 105039, 351883, 890475, 940474.585915005)),
 (1, (883864, 111281, 351883, 890475, 943476.0590481351)),
 (3, (881106, 105566, 351883, 890475, 946656.8132169123)),
 (5, (874672, 104851, 351883, 890475, 943670.1796162683))]

In [348]:
print(cluster1_center)
print (new_cluster_point)

(351883, 890475)
(872761.0, 101566.4)


In [352]:
new_cluster_point = compute_average(s1_reduce.collect(), [0,1])


[2, 3]