In [1]:
import numpy as np
import pandas as pd
import time
import math
import csv
import json
import xgboost as xgb
import datetime
import sys
from pyspark import SparkContext 
from sklearn import preprocessing
import warnings
warnings.filterwarnings(action='ignore')

In [2]:
# Create the Spark context used by every RDD in this notebook (master 'local[*]', app name 'task').
sc = SparkContext('local[*]', 'task')

In [3]:
# Run configuration: input data path, number of target clusters, and result path.
input_file = "hw6_clustering.txt"
n_cluster = 10
output_file = "output.txt"

In [4]:
import random
from sklearn.cluster import KMeans

In [106]:
def random_split_5():
    """Return a bucket index in {0, 1, 2, 3, 4} chosen from one `random.random()` draw.

    The draw is compared against the cut points 0.2, 0.4, 0.6 and 0.8 in order; the
    index of the first cut point it is strictly below is returned, or 4 when the draw
    is not below any of them.
    """
    draw = random.random()
    for bucket, upper_bound in enumerate((0.2, 0.4, 0.6, 0.8)):
        if draw < upper_bound:
            return bucket
    return 4
    
def Mahalanobis_Distance(x, N, SUM, SUMSQ):
    """Return the per-dimension-normalised distance from one point to every cluster.

    For each cluster the distance is sqrt(sum_i ((x_i - c_i) / sigma_i) ** 2), where
    c_i = SUM_i / N and sigma_i = sqrt(SUMSQ_i / N - c_i ** 2).

    Args:
        x: one data row. The first two entries are skipped (the notebook treats them
            as point id and label); the remaining entries are the features.
        N: list of (cluster_id, point_count) pairs.
        SUM: list of (cluster_id, per-dimension sums) pairs.
        SUMSQ: list of (cluster_id, per-dimension sums of squares) pairs.

    Returns:
        dict mapping each cluster_id found in SUM to its distance from `x`. It is
        empty when SUM is empty. A cluster with no spread in some dimension is at
        distance `inf` unless the point sits exactly on the centroid in that
        dimension, in which case that dimension contributes nothing.
    """
    # Statistics are matched by cluster id, so the three lists need not share an order.
    count_by_id = dict(N)
    sumsq_by_id = dict(SUMSQ)

    d_to_cluster = dict()
    for cluster_id, sums in SUM:
        n_points = count_by_id[cluster_id]
        sq_sums = sumsq_by_id[cluster_id]
        tmp = 0
        for idx in range(len(x)-2):
            c_i = sums[idx]/n_points
            variance = sq_sums[idx]/n_points - c_i**2
            diff = x[idx+2] - c_i
            if variance > 0:
                sigma_i = variance**0.5
                tmp += (diff / sigma_i)**2
            elif diff != 0:
                # Zero (or rounding-negative) variance: dividing would raise or go
                # complex, so treat any offset from the centroid as infinitely far.
                tmp = float("inf")
                break
        d_to_cluster[cluster_id] = tmp**0.5

    return d_to_cluster

def Mahalanobis_Distance_btw_two_clusters(cluster_id, N, SUM, SUMSQ, dim):
    """Return the distance from one cluster's centroid to every cluster in the same set.

    The distance to a cluster is sqrt(sum_i (c_i - o_i) ** 2 / (sigma_c_i * sigma_o_i)),
    with c/sigma_c the centroid and standard deviation of `cluster_id` and o/sigma_o
    those of the other cluster.

    Args:
        cluster_id: id of the reference cluster; must be present in N, SUM and SUMSQ.
        N: list of (cluster_id, point_count) pairs.
        SUM: list of (cluster_id, per-dimension sums) pairs.
        SUMSQ: list of (cluster_id, per-dimension sums of squares) pairs.
        dim: number of feature dimensions to use.

    Returns:
        dict mapping every cluster id in SUM (including `cluster_id` itself) to its
        distance. When either cluster has no spread in a dimension, that dimension
        contributes nothing if the centroids coincide there and makes the distance
        `inf` otherwise.

    Raises:
        KeyError: if `cluster_id` is missing from the statistics.
    """
    # Build the lookups once instead of calling dict() for every element.
    count_by_id = dict(N)
    sum_by_id = dict(SUM)
    sumsq_by_id = dict(SUMSQ)

    c_n = count_by_id[cluster_id]
    c_sum = sum_by_id[cluster_id]
    c_sumsq = sumsq_by_id[cluster_id]

    c_centroid = [c_sum[idx]/c_n for idx in range(dim)]
    # max(..., 0.0) keeps a rounding-negative variance from producing a complex sigma.
    c_sigma = [max(c_sumsq[idx]/c_n - c_centroid[idx]**2, 0.0)**0.5 for idx in range(dim)]

    d_to_cluster = dict()
    for other_id, other_sum in SUM:
        other_n = count_by_id[other_id]
        other_sumsq = sumsq_by_id[other_id]
        tmp = 0
        for idx in range(dim):
            c_i = other_sum[idx]/other_n
            sigma_i = max(other_sumsq[idx]/other_n - c_i**2, 0.0)**0.5
            diff = c_centroid[idx] - c_i
            denominator = c_sigma[idx]*sigma_i
            if denominator > 0:
                tmp += (diff**2)/denominator
            elif diff != 0:
                # No spread to normalise by: distinct centroids are infinitely far.
                tmp = float("inf")
                break
        d_to_cluster[other_id] = tmp**0.5

    return d_to_cluster

def Mahalanobis_Distance_btw_CS_DS(cluster_id, N, SUM, SUMSQ, dim, N_CS, SUM_CS, SUMSQ_CS):
    """Return the distance from one CS cluster's centroid to every DS cluster.

    The distance to a DS cluster is sqrt(sum_i (c_i - o_i) ** 2 / (sigma_c_i * sigma_o_i)),
    with c/sigma_c taken from the CS cluster and o/sigma_o from the DS cluster.

    NOTE: this notebook defines this function a second time in a later cell; keep the
    two definitions in sync or delete one of them.

    Args:
        cluster_id: id of the CS cluster; must be present in N_CS, SUM_CS and SUMSQ_CS.
        N, SUM, SUMSQ: DS statistics as lists of (cluster_id, value) pairs (count,
            per-dimension sums, per-dimension sums of squares).
        dim: number of feature dimensions to use.
        N_CS, SUM_CS, SUMSQ_CS: CS statistics in the same format.

    Returns:
        dict mapping every DS cluster id in SUM to its distance from the CS cluster.
        When either cluster has no spread in a dimension, that dimension contributes
        nothing if the centroids coincide there and makes the distance `inf` otherwise.

    Raises:
        KeyError: if `cluster_id` is missing from the CS statistics.
    """
    c_n = dict(N_CS)[cluster_id]
    c_sum = dict(SUM_CS)[cluster_id]
    c_sumsq = dict(SUMSQ_CS)[cluster_id]

    c_centroid = [c_sum[idx]/c_n for idx in range(dim)]
    # max(..., 0.0) keeps a rounding-negative variance from producing a complex sigma.
    c_sigma = [max(c_sumsq[idx]/c_n - c_centroid[idx]**2, 0.0)**0.5 for idx in range(dim)]

    # DS statistics are matched by cluster id, not by list position.
    count_by_id = dict(N)
    sumsq_by_id = dict(SUMSQ)

    d_to_cluster = dict()
    for ds_id, ds_sum in SUM:
        ds_n = count_by_id[ds_id]
        ds_sumsq = sumsq_by_id[ds_id]
        tmp = 0
        for idx in range(dim):
            c_i = ds_sum[idx]/ds_n
            sigma_i = max(ds_sumsq[idx]/ds_n - c_i**2, 0.0)**0.5
            diff = c_centroid[idx] - c_i
            denominator = c_sigma[idx]*sigma_i
            if denominator > 0:
                tmp += (diff**2)/denominator
            elif diff != 0:
                # No spread to normalise by: distinct centroids are infinitely far.
                tmp = float("inf")
                break
        d_to_cluster[ds_id] = tmp**0.5

    return d_to_cluster

def N_SUM_SSQ(CS):
    """Summarise every cluster in `CS` by its count, feature sums and squared sums.

    Args:
        CS: dict mapping cluster id -> non-empty list of rows. In each row the first
            two entries are skipped and the remaining ones are summed as features;
            the feature count is taken from the cluster's first row.

    Returns:
        Three lists of (cluster_id, value) pairs, in the iteration order of `CS`:
        point counts, per-dimension sums, and per-dimension sums of squares.
    """
    N_CS, SUM_CS, SUMSQ_CS = [], [], []
    for cluster_id, members in CS.items():
        feature_columns = range(2, len(members[0]))
        totals = [sum(member[col] for member in members) for col in feature_columns]
        squared_totals = [sum(member[col]**2 for member in members) for col in feature_columns]
        N_CS.append((cluster_id, len(members)))
        SUM_CS.append((cluster_id, totals))
        SUMSQ_CS.append((cluster_id, squared_totals))
    return N_CS, SUM_CS, SUMSQ_CS

In [131]:
if __name__ == '__main__':
    # BFR initialisation (Steps 1-6): cluster the first random 20% of the input in
    # memory, then derive the discard-set (DS) statistics, the first compression-set
    # (CS) clusters and the retained set (RS) of isolated points.

    t = time.time()
    n = 30  # number of partitions requested for the RDDs below

    ## RDDs
    textRDD = sc.textFile(input_file, n)

    # Step 1. Load 20% of the data randomly.
    # Each row is tagged once with a bucket 0..4 and the tagged RDD is cached so the
    # later rounds can filter on the same tags.
    # NOTE(review): random_split_5() is unseeded, so the split differs between runs, and
    # it would presumably change if Spark had to recompute an evicted cached partition.
    random_data = textRDD\
        .map(lambda x : [float(feature) for feature in x.split(",")])\
        .map(lambda x : (x,random_split_5()))\
        .cache()

    random_data_0 = random_data.filter(lambda x : x[1] == 0).map(lambda x : x[0])
    # Collect once and reuse the list (the chunk used to be collected twice).
    collected_data = random_data_0.collect()
    dim = len(collected_data[0])-2  # row layout: 0 = id, 1 = label, 2.. = features
    print("num_round : " , len(collected_data))


    # Step 2. Run K-Means (e.g., from sklearn) with a large K (e.g., 5 times of the number of the input clusters) on the data in memory using the Euclidean distance as the similarity measurement.
    large_K = n_cluster * 20
    kmeans = KMeans(n_clusters=large_K, random_state=0).fit([row[2:] for row in collected_data])## 0 : id, 1 : label


    # Step 3. In the K-Means result from Step 2, move all the clusters that contain only one point to RS (outliers).
    # Labels that occur exactly once identify single-point clusters.
    RS_labels = np.flatnonzero(np.bincount(kmeans.labels_) == 1).tolist()
    singleton_labels = set(RS_labels)
    RS = []
    remaining_data = []
    for row, label in zip(collected_data, kmeans.labels_):
        if int(label) in singleton_labels:
            RS.append(row)
        else:
            remaining_data.append(row)


    # Step 4. Run K-Means again to cluster the rest of the data points with K = the number of input clusters.
    collected_data = remaining_data
    kmeans = KMeans(n_clusters=n_cluster, random_state=0).fit([row[2:] for row in collected_data])
    result = dict()  # point id -> DS cluster label
    for row, label in zip(collected_data, kmeans.labels_):
        result[int(row[0])] = int(label)


    # Step 5. Use the K-Means result from Step 4 to generate the DS clusters (i.e., discard their points and generate statistics).
    # The labels from the fit are reused instead of calling predict() once per point,
    # so the statistics and `result` are built from the same assignment.
    c = sc.parallelize(
        [(int(label), row[2:]) for row, label in zip(collected_data, kmeans.labels_)], n
    ).cache() ## [label, [a,b,c,d,e]], ....
    # Sort by cluster id so the three lists line up position by position.
    by_cluster_id = lambda pair : pair[0]
    N = sorted(c.map(lambda x : (x[0],1)).reduceByKey(lambda a,b : a+b).collect(), key=by_cluster_id)
    SUM = sorted(c.reduceByKey(lambda a,b : [a[i]+b[i] for i in range(len(a))]).collect(), key=by_cluster_id)
    SUMSQ = sorted(c.map(lambda x : (x[0], [feature**2 for feature in x[1]]))\
        .reduceByKey(lambda a,b : [a[i]+b[i] for i in range(len(a))]).collect(), key=by_cluster_id)


    print("Duration : ", time.time()-t)

    # The initialization of DS has finished, so far, you have K numbers of DS clusters (from Step 5) and some numbers of RS (from Step 3).
    # Step 6. Run K-Means on the points in the RS with a large K (e.g., 5 times of the number of the input clusters) to generate CS (clusters with more than one points) and RS (clusters with only one point).
    first_round = 1  # CS keys are offset by round * 10000 so ids from different rounds never clash
    CS = dict()
    RS_new = []
    CS_count = 0
    RS_count = 0
    if RS:
        # K-Means cannot be asked for more clusters than there are points.
        rs_K = min(n_cluster*2, len(RS))
        kmeans = KMeans(n_clusters=rs_K, random_state=0).fit([row[2:] for row in RS])## 0 : id, 1 : label
        RS_labels = np.flatnonzero(np.bincount(kmeans.labels_) == 1).tolist()
        singleton_labels = set(RS_labels)
        for row, label in zip(RS, kmeans.labels_):
            if int(label) in singleton_labels:
                RS_count+=1
                RS_new.append(row)
            else:
                CS_count+=1
                cs_key = int(label) + first_round*10000
                CS[cs_key] = CS.get(cs_key, []) + [row]
    # Points that moved into a CS cluster must leave RS, otherwise they are re-clustered
    # (and counted twice) in the next round.
    RS = RS_new
    #####
    N_CS, SUM_CS, SUMSQ_CS = N_SUM_SSQ(CS)


    print("The intermediate results:")
    ## number of DS points, number of CS set, number of CS points, number of RS points
    print("Round {}:".format(1), sum([cluster[1] for cluster in N]),",",len(CS), "," , CS_count, ",", RS_count)

    print("Duration : ", time.time()-t)

num_round :  64619
Duration :  53.7646861076355
The intermediate results:
Round 1: 64601 , 0 , 0 , 18
Duration :  54.52936601638794


In [132]:
    # BFR streaming rounds (Steps 7-13): each round loads one more random chunk,
    # assigns its points to DS, then CS, then RS, re-clusters RS and merges CS clusters.
    # The last round additionally merges CS clusters into DS.
    rounds = [2,3,4,5]
    for Round in rounds:
        # Step 7. Load another 20% of the data randomly.
        random_data_round = random_data.filter(lambda x : x[1] == Round-1).map(lambda x : x[0])
        print("num_round : " , random_data_round.count())

        # Step 8. For the new points, compare them to each of the DS using the Mahalanobis Distance and assign
        # them to the nearest DSclusters if the distance is <2 root 𝑑.
        #d = root(sigma( (x - SUM/N)/((SUMSQ/N - (SUM/N)^2) **0.5) ))

        # Threshold 2*sqrt(d), taken from the actual dimensionality instead of a literal 10.
        base = 2 * (dim**0.5)
        filtered_data = random_data_round\
            .map(lambda x : [x,Mahalanobis_Distance(x, N, SUM, SUMSQ)])\
            .map(lambda x :  [x[0],min(x[1].items(), key = lambda y : y[1])])\
            .map(lambda x : [x[0], x[1][0]] if x[1][1]<base else [x[0], "not DS"] ).cache()
        for row, label in filtered_data.collect():
            if label != "not DS":
                result[int(row[0])] = label

        # Statistics of the points that were just assigned to a DS cluster.
        ds_assigned = filtered_data.filter(lambda x : x[1] != "not DS")
        N_new = dict(ds_assigned.map(lambda x : (x[1], 1)).reduceByKey(lambda a,b : a+b).collect())
        SUM_new = dict(ds_assigned.map(lambda x: (x[1], x[0][2:]))\
            .reduceByKey(lambda a,b : [a[i]+b[i] for i in range(len(a))]).collect())
        SUMSQ_new = dict(ds_assigned.map(lambda x : (x[1], [feature**2 for feature in x[0][2:]]))\
            .reduceByKey(lambda a,b : [a[i]+b[i] for i in range(len(a))]).collect())

        # Fold the new statistics into the running DS statistics. A DS cluster that
        # received no point this round contributes zeros instead of raising KeyError.
        N_old, SUM_old, SUMSQ_old = dict(N), dict(SUM), dict(SUMSQ)
        zeros = [0 for idx in range(dim)]
        N = [(key, N_old[key] + N_new.get(key, 0)) for key in N_old]
        SUM = [(key, [SUM_old[key][i] + SUM_new.get(key, zeros)[i] for i in range(dim)]) for key in N_old]
        SUMSQ = [(key, [SUMSQ_old[key][i] + SUMSQ_new.get(key, zeros)[i] for i in range(dim)]) for key in N_old]


        # Step 9. For the new points that are not assigned to DS clusters, using the Mahalanobis Distance and
        # assign the points to the nearest CS clusters if the distance is <2 root 𝑑
        NOT_DS = filtered_data.filter(lambda x : x[1] == "not DS").map(lambda x : x[0])\
            .map(lambda x : [x,Mahalanobis_Distance(x, N_CS, SUM_CS, SUMSQ_CS)])\
            .map(lambda x :  [x[0],min(x[1].items(), key = lambda y : y[1])] if len(N_CS)>0 else [x[0], (0,base+1)])\
            .map(lambda x : [x[0], x[1][0]] if x[1][1]<base else [x[0], "not CS"] ).cache()
        not_ds_rows = NOT_DS.collect()  # collected once, reused for CS and RS below

        ## add rows to CS
        for row, label in not_ds_rows:
            if label != "not CS":
                CS[label] = CS.get(label,[]) + [row]

        # CS holds the member rows, so its statistics are rebuilt from them directly.
        N_CS, SUM_CS, SUMSQ_CS = N_SUM_SSQ(CS)


        # Step 10. For the new points that are not assigned to a DS cluster or a CS cluster, assign them to RS.
        NOT_CS = [row for row, label in not_ds_rows if label == "not CS"]
        RS = RS + NOT_CS

        print("num_res 70 : ",  len(result)+sum(list(dict(N_CS).values()))+len(RS))
        print("RES :", len(result))
        print("N_CS :",sum(list(dict(N_CS).values())))
        print("RS :", len(RS))
        ## number of points held in CS
        count = sum(len(CS[key]) for key in CS)
        print("num_CS : ",count)
        print("#######")  


        # Step 11. Run K-Means on the RS with a large K (e.g., 5 times of the number of the input clusters) to generate CS (clusters with more than one points) and RS (clusters with only one point).
        CS_count = 0
        RS_count = 0
        rm_row = []  # rows that leave RS for a new CS cluster
        if RS:
            # K-Means cannot be asked for more clusters than there are points.
            rs_K = min(n_cluster*2, len(RS))
            kmeans = KMeans(n_clusters=rs_K, random_state=0).fit([row[2:] for row in RS])## 0 : id, 1 : label
            # Labels that occur exactly once identify single-point clusters.
            RS_labels = np.flatnonzero(np.bincount(kmeans.labels_) == 1).tolist()
            singleton_labels = set(RS_labels)
            RS_kept = []
            for row, label in zip(RS, kmeans.labels_):
                if int(label) in singleton_labels:
                    RS_count+=1
                    RS_kept.append(row)
                else:
                    CS_count+=1
                    cs_key = int(label) + Round*10000  # round offset keeps CS ids unique
                    CS[cs_key] = CS.get(cs_key, []) + [row]
                    rm_row.append(row)
            RS = RS_kept

        N_CS, SUM_CS, SUMSQ_CS = N_SUM_SSQ(CS)
        print("num_res 94: ",  len(result)+sum(list(dict(N_CS).values()))+len(RS))
        print("RES :", len(result))
        print("N_CS :",sum(list(dict(N_CS).values())))
        print("RS :", len(RS))

        ## number of points held in CS
        count = sum(len(CS[key]) for key in CS)
        print("num_CS : ",count)
        print("#######")  

        # Step 12.Merge CS clusters that have a Mahalanobis Distance <2 root 𝑑.
        # One pair is merged per pass and the statistics are rebuilt before the next
        # pass; the loop ends when a pass merges nothing or a single cluster is left.
        merged = True
        while merged and len(CS) > 1:
            merged = False
            N_CS, SUM_CS, SUMSQ_CS = N_SUM_SSQ(CS)
            for cluster_id in list(CS):
                m_dist = Mahalanobis_Distance_btw_two_clusters(cluster_id, N_CS, SUM_CS, SUMSQ_CS, dim)
                del m_dist[cluster_id]  # a cluster is never merged with itself
                closest_cluster,mindist = min(m_dist.items(), key = lambda y: y[1])
                if mindist < base:
                    CS[cluster_id] = CS[cluster_id] + CS[closest_cluster]
                    del CS[closest_cluster]
                    merged = True
                    break

        N_CS, SUM_CS, SUMSQ_CS = N_SUM_SSQ(CS)

        ## Step 13. merge CS with DS that have a Mahalanobis Distance <2 root d
        if Round == rounds[-1]:
            cs_n, cs_sum, cs_sumsq = dict(N_CS), dict(SUM_CS), dict(SUMSQ_CS)
            del_cluster = []
            for cluster in CS:
                dist_to_DS = Mahalanobis_Distance_btw_CS_DS(cluster, N, SUM, SUMSQ, dim, N_CS, SUM_CS, SUMSQ_CS)
                closest_DS, mindist = min(dist_to_DS.items(), key = lambda y: y[1])

                if mindist < base:
                    del_cluster.append(cluster)
                    print(cluster)
                    print(closest_DS, mindist)
                    tmp_N_dict = dict(N)
                    tmp_SUM_dict = dict(SUM)
                    tmp_SUMSQ_dict = dict(SUMSQ)
                    tmp_N_dict[closest_DS] = tmp_N_dict[closest_DS] + cs_n[cluster]
                    # Element-wise addition: `+` on two lists would concatenate them.
                    tmp_SUM_dict[closest_DS] = [a + b for a, b in zip(tmp_SUM_dict[closest_DS], cs_sum[cluster])]
                    tmp_SUMSQ_dict[closest_DS] = [a + b for a, b in zip(tmp_SUMSQ_dict[closest_DS], cs_sumsq[cluster])]

                    N = list(tmp_N_dict.items())
                    SUM = list(tmp_SUM_dict.items())
                    SUMSQ = list(tmp_SUMSQ_dict.items())

                    for row in CS[cluster]:
                        result[int(row[0])] = closest_DS

            # Deleted after the loop so CS is not resized while it is being iterated.
            for cluster in del_cluster:
                del CS[cluster]

        N_CS, SUM_CS, SUMSQ_CS = N_SUM_SSQ(CS)
        print("Round {}:".format(Round), sum([cluster[1] for cluster in N]),",",len(CS), "," , sum(list(dict(N_CS).values())), ",", len(RS))
        print("num_res : ",  len(result)+sum(list(dict(N_CS).values()))+len(RS))



    print("Duration : ", time.time()-t)

num_round :  64210
num_res 70 :  128829
RES : 128790
N_CS : 0
RS : 39
num_CS :  0
#######
num_res 94:  128829
RES : 128790
N_CS : 32
RS : 7
num_CS :  32
#######
Round 2: 128790 , 13 , 32 , 7
num_res :  128829
num_round :  64348
num_res 70 :  193177
RES : 193118
N_CS : 33
RS : 26
num_CS :  33
#######
num_res 94:  193177
RES : 193118
N_CS : 45
RS : 14
num_CS :  45
#######
Round 3: 193118 , 14 , 45 , 14
num_res :  193177
num_round :  64772
num_res 70 :  257949
RES : 257872
N_CS : 53
RS : 24
num_CS :  53
#######
num_res 94:  257949
RES : 257872
N_CS : 60
RS : 17
num_CS :  60
#######
Round 4: 257872 , 17 , 60 , 17
num_res :  257949
num_round :  64363
num_res 70 :  322312
RES : 322210
N_CS : 76
RS : 26
num_CS :  76
#######
num_res 94:  322312
RES : 322210
N_CS : 87
RS : 15
num_CS :  87
#######
Round 5: 322210 , 21 , 87 , 15
num_res :  322312
Duration :  110.84398293495178


In [39]:
# Debug: show the label array of the most recently fitted KMeans model.
kmeans.labels_

array([12, 17, 18,  0, 11,  3,  7, 13, 10,  1, 14, 16,  5, 14,  6,  1,  9,
        5,  6, 13,  7, 19,  3, 12,  0,  7,  8,  1, 15, 10,  0,  6,  5,  1,
        5,  3,  4,  0,  2, 16,  4], dtype=int32)

In [108]:
# Debug: number of labelled points minus 322312 (the row count printed by the last cell).
len(result) - 322312

-101

In [44]:
# Debug: number of points currently held in RS.
len(RS)

23

In [45]:
# Debug: number of points from the last round that matched neither a DS nor a CS cluster.
len(NOT_CS)

21

In [71]:
# Debug: rows moved out of RS into CS by the last Step 11 run.
rm_row

[]

In [46]:
# Debug: dump the rows that matched neither a DS nor a CS cluster in the last round.
# NOTE(review): this prints every row in full; consider NOT_CS[:5] before sharing the notebook.
NOT_CS

[[12049.0,
  -1.0,
  926.3919413069962,
  636.1659751280349,
  717.0570669250868,
  897.9535339242541,
  788.3146958601612,
  672.7337145809307,
  590.6979270459199,
  896.0154170747182,
  887.4563722204036,
  851.8592279550278],
 [13479.0,
  -1.0,
  -818.6927782739829,
  -1075.8951994388685,
  -706.4931980353986,
  -700.733838726702,
  -1049.6729554511426,
  -584.0728398351557,
  -729.0445172010881,
  -774.6470081493524,
  -826.1585850768568,
  -609.9020514578608],
 [21290.0,
  -1.0,
  -565.980483263564,
  -417.85493100932075,
  -774.7991528975615,
  -513.2833733707218,
  -783.3612920905719,
  -772.6063431119846,
  -451.37301248632826,
  -684.2999276194215,
  -795.5561021575325,
  -592.5500281652643],
 [25402.0,
  -1.0,
  744.214368728761,
  604.4112727302727,
  777.2291258205669,
  786.831530276389,
  644.2850761775976,
  793.0963810998428,
  889.8786795470905,
  670.2389822695048,
  605.2920472429176,
  952.3381265946174],
 [44522.0,
  -1.0,
  727.9210742312863,
  855.8532659456439,

In [41]:
# Debug: K-Means labels that occurred exactly once in the last RS clustering.
RS_labels

[2, 8, 9, 11, 15, 17, 18, 19]

In [40]:
# Debug: ids of the current CS clusters.
CS.keys()

dict_keys([20012, 20000, 20003, 20007, 20013, 20010, 20001, 20014, 20016, 20005, 20006])

In [144]:
# Debug: total number of points stored across all CS clusters.
count = sum(map(len, CS.values()))
print(count)

195


In [None]:
    # Write the final assignment, one "<point id>,<cluster label>" line per point.
    # The previous version looped over `Estimation` and `stream_size`, which are
    # never defined in this notebook.
    assignment = dict(result)
    # Points still held in CS or RS never received a DS label.
    # NOTE(review): they are written with label -1, mirroring the -1.0 label column
    # seen on such rows in the input; confirm against the expected output format.
    leftover_rows = [row for members in CS.values() for row in members] + RS
    for row in leftover_rows:
        assignment.setdefault(int(row[0]), -1)

    # The `with` block closes the file; no explicit close() is needed.
    with open(output_file, 'w') as file:
        file.write("The clustering results:\n")
        for point_id in sorted(assignment):
            file.write(str(point_id) + "," + str(int(assignment[point_id])))
            file.write("\n")

In [125]:
 # Debug: second [row, assignment] pair of the points that were not assigned to DS.
 NOT_DS.collect()[1]

[[12551.0,
  9.0,
  -136.07054580881638,
  -169.10567624774328,
  -168.17118290137495,
  -142.36092082473812,
  -156.34084296633958,
  -158.51646939282458,
  178.61390504102192,
  -196.52132877411378,
  182.13353272337093,
  157.44492184066425],
 'not CS']

In [126]:
# Debug re-run of the "add rows to CS" part of Step 9 from the round loop.
# NOTE(review): running this after the loop adds the same points to CS a second time.
for row in NOT_DS.collect():
    if row[1] != "not CS":
        # Store the data row only (row[0]), as the round loop does; storing the whole
        # [row, label] pair would break N_SUM_SSQ, which indexes features from column 2.
        CS[row[1]] = CS.get(row[1],[]) + [row[0]]
        print("ha")

ha
ha
ha
ha
ha
ha
ha
ha
ha


In [30]:
# NOTE(review): stand-alone draft of Step 9; the same statements also appear inside the
# round loop, where the matched rows are additionally appended to CS. Consider deleting
# this cell.
# Step 9. For the new points that are not assigned to DS clusters, using the Mahalanobis Distance and
        # assign the points to the nearest CS clusters if the distance is <2 root 𝑑
        # Each unassigned point is labelled with its nearest CS cluster id, or "not CS"
        # when the nearest distance is not below `base` (or when there is no CS cluster).
        NOT_DS = filtered_data.filter(lambda x : x[1] == "not DS").map(lambda x : x[0])\
            .map(lambda x : [x,Mahalanobis_Distance(x, N_CS, SUM_CS, SUMSQ_CS)])\
            .map(lambda x :  [x[0],min(x[1].items(), key = lambda y : y[1])] if len(N_CS)>0 else [x[0], (0,base+1)])\
            .map(lambda x : [x[0], x[1][0]] if x[1][1]<base else [x[0], "not CS"] ).cache()

        # Per-label count, feature sums and squared feature sums of the new points
        # (the "not CS" label is aggregated too but ignored by the merge below).
        N_CS_new = NOT_DS.map(lambda x : (x[1], 1)).reduceByKey(lambda a,b : a+b).collect()
        SUM_CS_new = NOT_DS.map(lambda x: (x[1], x[0][2:]))\
                                    .reduceByKey(lambda a,b : [a[i]+b[i] for i in range(len(a))]).collect()
        SUMSQ_CS_new = NOT_DS.map(lambda x : (x[1], [feature**2 for feature in x[0][2:]]))\
            .reduceByKey(lambda a,b : [a[i]+b[i] for i in range(len(a))]).collect()

        # Add the new statistics to the existing CS statistics; clusters that received
        # no point keep their values (defaults of 0 / zero vector).
        tmp_N_dict = dict()
        tmp_SUM_dict = dict()
        tmp_SUMSQ_dict = dict()
        for key in dict(N_CS).keys():
            tmp_N_dict[key] = dict(N_CS)[key] + dict(N_CS_new).get(key, 0)
            tmp_SUM_dict[key] = [dict(SUM_CS)[key][i] + dict(SUM_CS_new).get(key, [0 for idx in range(dim)])[i] for i in range(dim)]
            tmp_SUMSQ_dict[key] = [dict(SUMSQ_CS)[key][i] + dict(SUMSQ_CS_new).get(key, [0 for idx in range(dim)])[i] for i in range(dim)]

        N_CS = list(tmp_N_dict.items())
        SUM_CS = list(tmp_SUM_dict.items())
        SUMSQ_CS = list(tmp_SUMSQ_dict.items())


dict_keys([5, 7, 16, 20015, 20000, 20007, 20002, 20003, 20011, 20004, 20010, 20013, 30003, 30006, 30001, 30013, 30012, 30002, 30015, 30004, 30019, 30016, 30009, 30008, 30014, 30010, 30011, 30000, 40000, 40005, 40007, 40017, 40019, 40001, 40018, 40010, 40013, 40002, 40014, 40015, 40011, 40003, 40006, 40004, 40009, 50009, 50003, 50016, 50000, 50010, 50001, 50005, 50013, 50018, 50014, 50019, 50015, 50006, 50007, 50012, 50011, 50002, 50008])

In [82]:
# Debug: total number of points summarised by the DS clusters.
sum(point_count for _, point_count in N)

322209

In [20]:
## Step 13. merge CS with DS that have a Mahalanobis Distance <2 root d
# NOTE(review): stand-alone draft of the Step 13 logic that also lives inside the round
# loop; consider deleting this cell.
if Round ==4:
    # Iterate over a snapshot of the ids: clusters are deleted from CS inside the loop,
    # which raises RuntimeError when iterating the dict itself.
    for cluster in list(CS):
        dist_to_DS = Mahalanobis_Distance_btw_CS_DS(cluster, N, SUM, SUMSQ, dim, N_CS, SUM_CS, SUMSQ_CS)
        closest_DS, mindist = min(dist_to_DS.items(), key = lambda y: y[1])
        print(cluster)
        print(closest_DS, mindist)
        if mindist < base:
            tmp_N_dict = dict(N)
            tmp_SUM_dict = dict(SUM)
            tmp_SUMSQ_dict = dict(SUMSQ)
            tmp_N_dict[closest_DS] = tmp_N_dict[closest_DS] + dict(N_CS)[cluster]
            # Element-wise addition: `+` on two lists would concatenate them.
            tmp_SUM_dict[closest_DS] = [a + b for a, b in zip(tmp_SUM_dict[closest_DS], dict(SUM_CS)[cluster])]
            tmp_SUMSQ_dict[closest_DS] = [a + b for a, b in zip(tmp_SUMSQ_dict[closest_DS], dict(SUMSQ_CS)[cluster])]

            # Store the merged statistics; previously they were computed and dropped.
            N = list(tmp_N_dict.items())
            SUM = list(tmp_SUM_dict.items())
            SUMSQ = list(tmp_SUMSQ_dict.items())

            # The merged points now belong to the DS cluster.
            for row in CS[cluster]:
                result[int(row[0])] = closest_DS
            del CS[cluster]
            

In [25]:
# Debug: number of CS clusters.
len(CS)

64

In [32]:
# Debug: number of points currently held in RS.
len(RS)

62

In [35]:
# Debug: number of CS clusters.
len(CS)

64

In [36]:
# Debug: distances from the last examined CS cluster to every DS cluster.
dist_to_DS

{0: 6.758848804263444,
 1: 11.259920132510954,
 2: 46.0291346228439,
 3: 27.65114840208466,
 4: 29.05665539453182,
 5: 25.60691774392599,
 6: 29.98678287542144,
 7: 17.07722540937219,
 8: 9.10432222098056,
 9: 8.097573609694457}

In [37]:
# Debug: the distance threshold used for the DS/CS assignment and merge tests.
base

6.324555320336759

In [38]:
# Debug: (cluster id, point count) pairs of the CS clusters.
N_CS

[(11, 2),
 (10, 2),
 (5, 2),
 (14, 3),
 (4, 2),
 (10019, 2),
 (10001, 4),
 (10013, 2),
 (10011, 2),
 (10004, 3),
 (10015, 3),
 (10010, 3),
 (10005, 3),
 (10002, 2),
 (10006, 2),
 (10000, 5),
 (10007, 3),
 (10016, 3),
 (10008, 2),
 (20016, 4),
 (20010, 4),
 (20004, 3),
 (20017, 2),
 (20005, 7),
 (20008, 6),
 (20012, 4),
 (20009, 3),
 (20018, 3),
 (20015, 2),
 (20001, 3),
 (20003, 3),
 (20013, 4),
 (20000, 3),
 (30008, 3),
 (30003, 4),
 (30001, 7),
 (30011, 5),
 (30015, 3),
 (30010, 2),
 (30005, 6),
 (30006, 4),
 (30013, 4),
 (30007, 2),
 (30009, 3),
 (30012, 2),
 (30004, 4),
 (30017, 2),
 (30019, 2),
 (30014, 2),
 (30002, 2),
 (40008, 5),
 (40003, 5),
 (40001, 7),
 (40011, 3),
 (40009, 2),
 (40005, 5),
 (40006, 7),
 (40012, 4),
 (40007, 2),
 (40016, 2),
 (40004, 6),
 (40018, 2),
 (40015, 2),
 (40002, 4)]

In [23]:
def Mahalanobis_Distance_btw_CS_DS(cluster_id, N, SUM, SUMSQ, dim, N_CS, SUM_CS, SUMSQ_CS):
    """Return the distance from one CS cluster's centroid to every DS cluster.

    The distance to a DS cluster is sqrt(sum_i (c_i - o_i) ** 2 / (sigma_c_i * sigma_o_i)),
    with c/sigma_c taken from the CS cluster and o/sigma_o from the DS cluster.

    NOTE: this is the second definition of this function in the notebook (an earlier
    cell defines it as well); keep the two in sync or delete one of them.

    Args:
        cluster_id: id of the CS cluster; must be present in N_CS, SUM_CS and SUMSQ_CS.
        N, SUM, SUMSQ: DS statistics as lists of (cluster_id, value) pairs (count,
            per-dimension sums, per-dimension sums of squares).
        dim: number of feature dimensions to use.
        N_CS, SUM_CS, SUMSQ_CS: CS statistics in the same format.

    Returns:
        dict mapping every DS cluster id in SUM to its distance from the CS cluster.
        When either cluster has no spread in a dimension, that dimension contributes
        nothing if the centroids coincide there and makes the distance `inf` otherwise.

    Raises:
        KeyError: if `cluster_id` is missing from the CS statistics.
    """
    c_n = dict(N_CS)[cluster_id]
    c_sum = dict(SUM_CS)[cluster_id]
    c_sumsq = dict(SUMSQ_CS)[cluster_id]

    c_centroid = [c_sum[idx]/c_n for idx in range(dim)]
    # max(..., 0.0) keeps a rounding-negative variance from producing a complex sigma.
    c_sigma = [max(c_sumsq[idx]/c_n - c_centroid[idx]**2, 0.0)**0.5 for idx in range(dim)]

    # DS statistics are matched by cluster id, not by list position.
    count_by_id = dict(N)
    sumsq_by_id = dict(SUMSQ)

    d_to_cluster = dict()
    for ds_id, ds_sum in SUM:
        ds_n = count_by_id[ds_id]
        ds_sumsq = sumsq_by_id[ds_id]
        tmp = 0
        for idx in range(dim):
            c_i = ds_sum[idx]/ds_n
            sigma_i = max(ds_sumsq[idx]/ds_n - c_i**2, 0.0)**0.5
            diff = c_centroid[idx] - c_i
            denominator = c_sigma[idx]*sigma_i
            if denominator > 0:
                tmp += (diff**2)/denominator
            elif diff != 0:
                # No spread to normalise by: distinct centroids are infinitely far.
                tmp = float("inf")
                break
        d_to_cluster[ds_id] = tmp**0.5

    return d_to_cluster

In [20]:
    # Report the seconds elapsed since `t` was set at the start of the run.
    print("Duration : ", time.time()-t)

Duration :  215.72827696800232


In [27]:
# Debug: distances from CS cluster 11 to every DS cluster.
Mahalanobis_Distance_btw_CS_DS(11, N, SUM, SUMSQ, dim, N_CS, SUM_CS, SUMSQ_CS)

{0: 160.29740342792525,
 1: 213.1414304159653,
 2: 194.5434121368717,
 3: 176.3158622719533,
 4: 137.60815251155995,
 5: 194.67216170081252,
 6: 209.77281883991662,
 7: 144.70845880840218,
 8: 149.35168149592036,
 9: 136.61521669218715}

In [21]:
# Debug: ids of the current CS clusters.
CS.keys()

dict_keys([11, 10, 5, 14, 4, 10019, 10001, 10013, 10011, 10004, 10015, 10010, 10005, 10002, 10006, 10000, 10007, 10016, 10008, 20016, 20010, 20004, 20017, 20005, 20008, 20012, 20009, 20018, 20015, 20001, 20003, 20013, 20000, 30008, 30003, 30001, 30011, 30015, 30010, 30005, 30006, 30013, 30007, 30009, 30012, 30004, 30017, 30019, 30014, 30002, 40008, 40003, 40001, 40011, 40009, 40005, 40006, 40012, 40007, 40016, 40004, 40018, 40015, 40002])

In [None]:
    # NOTE(review): stand-alone draft of Step 6 (build CS and RS from the RS points);
    # the initialisation cell contains the version that is actually used. Consider
    # deleting this cell.
    CS = dict()
    # K-Means cannot be asked for more clusters than there are points in RS.
    kmeans = KMeans(n_clusters=min(n_cluster*2, len(RS)), random_state=0).fit([row[2:] for row in RS])## 0 : id, 1 : label
    RS_new = []
    # Labels that occur exactly once identify single-point clusters.
    RS_labels = sc.parallelize(kmeans.labels_, n)\
        .map(lambda x : (x,1))\
        .reduceByKey(lambda a,b : a+b)\
        .filter(lambda x : x[1] == 1)\
        .map(lambda x : x[0]).collect()
    CS_count = 0
    RS_count = 0
    for row, label in zip(RS, kmeans.labels_):
        if label in RS_labels:
            RS_count+=1
            RS_new.append(row)
        else:
            CS_count+=1
            CS[label] = CS.get(label, []) + [row]
    #####
    # Use the shared helper instead of repeating its loop inline.
    N_CS, SUM_CS, SUMSQ_CS = N_SUM_SSQ(CS)

In [85]:
# Debug: total number of input rows. count() is evaluated by Spark and avoids pulling
# every row to the driver just to measure the list length.
random_data.count()

322312