In [1]:
import time
import random
import numpy as np
import pandas as pd
from sklearn import cluster, datasets, metrics
from sklearn.metrics import accuracy_score
from sklearn import preprocessing
from pyspark.sql import SparkSession
from pyspark.sql import Row

In [5]:
spark = SparkSession \
                .builder \
                .appName("RDD_and_DataFrame") \
                .config("spark.some.config.option", "some-value") \
                .getOrCreate()
sc = spark.sparkContext

# Data Preprocessing

## Wine Dataset

In [6]:
'''
line = sc.textFile("./dataset/wine.txt")
data_x = spark.createDataFrame(line.map(lambda r : r.split(",")).collect()).toPandas()
converge_dist = 0.05
K = 5
max_iter = 30
'''

'\nline = sc.textFile("./dataset/wine.txt")\ndata_x = spark.createDataFrame(line.map(lambda r : r.split(",")).collect()).toPandas()\nconverge_dist = 0.05\nK = 5\nmax_iter = 30\n'

In [7]:

line = sc.textFile("./dataset/c20d6n200000.txt")
data_x = spark.createDataFrame(line.map(lambda r : r.split(",")).collect()).toPandas()
converge_dist = 0.05
K = 6
max_iter = 30

In [19]:
def cal_distance(p,cent):
    bestIndex = 0
    closest = float("+inf")
    for i in range(len(cent)):
        tempDist = np.sum((p - cent[i]) ** 2)
        if tempDist < closest:
            closest = tempDist
            bestIndex = i
            
    return bestIndex

## Here comes our k-means

In [20]:
start = time.time()
#initialize tables
data = spark.createDataFrame(data_x).rdd.map(lambda r : np.array([float(x) for x in r])).cache()
cent = data.takeSample(False,K,1)
table= np.zeros(K)   
                
#first iteration    
iterl = 0
curr_sse = 1.0
        
#update centroid & data clustering
while curr_sse > converge_dist :
    mapping_data = data.map(lambda point : (cal_distance(point,cent) , (point,1)))
    table = mapping_data.reduceByKey(lambda p1,p2 : (p1[0]+p2[0] , p1[1]+p2[1]))
            
    result = table.map(lambda t : (t[0], t[1][0]/t[1][1])).collect()
    curr_sse = sum(np.sum((cent[i] - p) ** 2) for (i, p) in result)
    print(curr_sse)
    for(i,p) in result:
        cent[i] = p
       
    iterl+=1

    
print("Centers:")
print(cent)

end = time.time()

6108622.779842365
547535.184891067
839508.7118191819
164834.42361638905
162882.82855949653
25912.811660002117
53157.03667683278
235680.4705556058
4935.017946732099
48.40335176661537
0.0
Centers:
[array([ 7.95288247e+02, -4.47944551e-01, -3.89555710e+00, -8.00832993e+02,
        5.18257871e+00,  7.94670979e+02]), array([-810.61109498,    7.12569174, -162.71551508,  477.47833818,
       -158.16925449,  815.58238568]), array([ 273.32250583,    1.8657367 ,  801.13996693,  260.179773  ,
        800.19293462, -789.76087213]), array([ 812.90152437,   -5.91932219, -792.84848696,  804.38335069,
        803.47185115,   -9.5690966 ]), array([ 806.81381469,   -3.66555665,  271.05345376, -258.4474487 ,
       -789.06499369, -792.46230403]), array([-807.45525233,   -2.99750816,  271.34414787, -807.42332491,
        266.73722204, -795.97859691])]


In [21]:
print("Total time : ", end="")
print(end-start)
print(iterl)

Total time : 2.683225154876709
11


In [None]:

result_x = np.zeros(len(data_x.index))
j=0
for i in range(len(data_x.index)):
    result_x[i] = cal_distance(data_x.iloc[i].astype(float),cent)


In [None]:
silhouette_avg = metrics.silhouette_score(data_x,result_x)
print(silhouette_avg)

In [8]:
start = time.time()
kmeans_fit = cluster.KMeans(n_clusters = 3).fit(data_x)
end = time.time()
print("Total time : ", end="")
print(end-start)

Total time : 2.4522459506988525


In [9]:
cluster_labels = kmeans_fit.labels_

In [10]:
kmeans_fit.cluster_centers_

array([[-4.41388125e-01, -1.15025020e+02, -3.40724663e+02,
         5.71321088e+02, -8.01191326e+02, -3.42139507e+02],
       [ 2.40792208e-01, -1.12869019e+02,  5.73883737e+02,
        -1.14939787e+02,  1.15355360e+02,  8.00508832e+02],
       [-8.75471359e-01, -1.66314491e+00, -1.81747204e+00,
        -6.34670648e-01,  8.00333399e+02, -7.97792346e+02]])

In [11]:
silhouette_avg_2 = metrics.silhouette_score(data_x, cluster_labels)
print(silhouette_avg_2)

0.2396542543497579


In [21]:
spark.stop()