In [None]:
# load libraries
import numpy as np
from pyspark.sql import Row
from sklearn.metrics import calinski_harabasz_score, adjusted_rand_score
from sklearn.preprocessing import MinMaxScaler
from numpy.random import seed
import pandas as pd
seed(3)

In [None]:
# load data - iris, glass and parkinsons
file_location = "/FileStore/tables/"
file_type = "csv"
delimiter = ","


def _load_csv_rdd(file_name):
    '''
    Parameters: file_name - name of a CSV file stored under file_location.
    Returns: the file read with a header row and an inferred schema, as an RDD of Rows.
    '''
    reader = (
        spark.read.format(file_type)
        .option("inferSchema", "true")
        .option("header", "true")
        .option("sep", delimiter)  # use the configured delimiter instead of a second hard-coded ","
    )
    return reader.load(file_location + file_name).rdd


iris = _load_csv_rdd("iris.csv")
glass = _load_csv_rdd("glass.csv")
parkinsons = _load_csv_rdd("parkinsons.csv")

In [None]:
# k_means func
def k_means(df, K, CT = 0.0001, I = 30, Exp = 10):
  '''
  Run K-means with random initial centroids Exp times and score the clusterings.

  The last column of every row is treated as the ground-truth class label: it is
  left out of the clustering and is only used for the adjusted Rand index (ARI).
  All other columns are min-max scaled before clustering, and the
  Calinski-Harabasz (CH) score is computed on those scaled columns.

  Parameters:
    df  - RDD of rows (tuples / pyspark Rows); every column except the last must be numeric.
    K   - number of clusters.
    CT  - convergence threshold: an experiment stops iterating once no centroid
          moved by more than CT (Euclidean distance) in an iteration.
    I   - maximum number of iterations per experiment.
    Exp - number of independent experiments (each with fresh random centroids).
  Returns: tuple (mean CH, std CH, mean ARI, std ARI) over the Exp experiments.
           The same four values are also printed.
  Raises: ValueError if df is empty or K is not between 1 and the number of rows.
  '''
  # Collect once; the scaled features, the true labels and the predictions all
  # follow this single row order.
  rows = df.collect()
  if not rows:
    raise ValueError("k_means: the input RDD is empty")
  if not 1 <= K <= len(rows):
    raise ValueError("k_means: K must be between 1 and the number of rows")

  actual = [row[-1] for row in rows]

  # Normalize the data with MinMaxScaler(). Only the feature columns are scaled,
  # so the label column may be non-numeric; the scaler works column by column,
  # so the scaled features are the same as when the label was scaled along.
  features = MinMaxScaler().fit_transform([row[:-1] for row in rows])

  # Points are shipped as plain positional lists: no Row objects are rebuilt, so
  # the column order cannot change between data points and centroids.
  points = df.context.parallelize(features.tolist()).cache()

  def nearest(point, centers):
    '''
    Parameters: point - feature vector, centers - array with one centroid per row.
    Returns: index of the closest centroid (the lowest index wins a tie).
    '''
    return int(np.argmin(np.linalg.norm(centers - np.asarray(point), axis=1)))

  def add_stats(a, b):
    '''
    Parameters: a and b are (vector sum, point count) pairs of the same cluster.
    Returns: the combined (vector sum, point count) pair.
    '''
    return (a[0] + b[0], a[1] + b[1])

  calinski = []
  ari = []

  try:
    for experiment in range(Exp):
      # takeSample ignores numpy's global seed, so draw its seed from numpy's
      # global generator: seeding numpy then makes the experiments repeatable.
      sample_seed = int(np.random.randint(0, 2**31 - 1))
      centroids = np.array(points.takeSample(False, K, sample_seed))

      for iteration in range(I):
        # One Spark job per iteration: per-cluster vector sum and point count.
        # The current centroids are bound as a default argument so the closure
        # that gets serialized always carries this iteration's values.
        stats = (points
                 .map(lambda p, c=centroids: (nearest(p, c), (np.asarray(p), 1)))
                 .reduceByKey(add_stats)
                 .collectAsMap())

        # A cluster that received no points is missing from stats and keeps its
        # previous centroid, so there are always exactly K centroids and the
        # row index keeps matching the cluster id.
        updated = centroids.copy()
        for cluster, (total, size) in stats.items():
          updated[cluster] = total / size

        largest_shift = np.linalg.norm(updated - centroids, axis=1).max()
        centroids = updated
        if largest_shift <= CT:
          break

      pred = points.map(lambda p, c=centroids: nearest(p, c)).collect()

      calinski.append(calinski_harabasz_score(features, pred))
      ari.append(adjusted_rand_score(actual, pred))
  finally:
    points.unpersist()

  ch_mean, ch_std = np.mean(np.array(calinski)), np.std(np.array(calinski))
  ari_mean, ari_std = np.mean(np.array(ari)), np.std(np.array(ari))

  print('CH: (',ch_mean,";",ch_std,')')
  print('ARI: (',ari_mean,";",ari_std,')')

  return ch_mean, ch_std, ari_mean, ari_std


In [None]:
# Result store: dataset label -> list of (K, k_means(...) result tuple).
df_dict = {'Iris': [], 'Glass': [], 'Parkinsons': []}

# One entry per dataset: (label, RDD, K values to try, echo the returned tuple?).
# Only the Iris runs echo the tuple returned by k_means.
experiment_plan = (
    ("Iris", iris, range(3, 8), True),
    ("Glass", glass, range(5, 10), False),
    ("Parkinsons", parkinsons, range(2, 7), False),
)

for label, dataset, k_values, echo_result in experiment_plan:
    print("Dataset:", label)
    for k in k_values:
        print("K =", k)
        x = k_means(dataset, k)
        df_dict[label].append((k, x))
        if echo_result:
            print(x)

Dataset: Iris
K = 3
CH: ( 324.1892815937291 ; 60.37001949609987 )
ARI: ( 0.6526737477072906 ; 0.11207520508170393 )
(324.1892815937291, 60.37001949609987, 0.6526737477072906, 0.11207520508170393)
K = 4
CH: ( 302.6870300172103 ; 15.112232709990248 )
ARI: ( 0.6097910987963282 ; 0.021499785774371994 )
(302.6870300172103, 15.112232709990248, 0.6097910987963282, 0.021499785774371994)
K = 5
CH: ( 272.9337735632501 ; 21.00976745990861 )
ARI: ( 0.5285260354522032 ; 0.06049626999676144 )
(272.9337735632501, 21.00976745990861, 0.5285260354522032, 0.06049626999676144)
K = 6
CH: ( 247.00326578164072 ; 14.673517505427004 )
ARI: ( 0.5165657123349845 ; 0.0860282450288365 )
(247.00326578164072, 14.673517505427004, 0.5165657123349845, 0.0860282450288365)
K = 7
CH: ( 232.98668891217213 ; 17.80875356614163 )
ARI: ( 0.440631853198359 ; 0.06513967814603326 )
(232.98668891217213, 17.80875356614163, 0.440631853198359, 0.06513967814603326)
Dataset: Glass
K = 5
CH: ( 79.70890163177363 ; 9.60659896908398 )
ARI:

In [None]:
# Summary table: one row per (dataset, K) run recorded in df_dict.
# The rows are derived from df_dict itself rather than from a second hard-coded
# copy of the dataset names and K ranges, so the table always matches the runs
# that actually completed (a partial df_dict no longer causes a length mismatch).
DSnames_2 = []
Kvals_2 = []
CH_Avg_CH_Std = []    # "(mean ; std)" strings of the Calinski-Harabasz score
ARI_Avg_ARI_Std = []  # "(mean ; std)" strings of the adjusted Rand index

for dataset_name, runs in df_dict.items():
    for k_value, (ch_mean, ch_std, ari_mean, ari_std) in runs:
        DSnames_2.append(dataset_name.lower())  # the table shows dataset names in lower case
        Kvals_2.append(k_value)
        CH_Avg_CH_Std.append('(' + str(round(ch_mean, 3)) + " ; " + str(round(ch_std, 3)) + ')')
        ARI_Avg_ARI_Std.append('(' + str(round(ari_mean, 3)) + " ; " + str(round(ari_std, 3)) + ')')

result_2 = { "Dataset name": DSnames_2,
          "The value of K": Kvals_2,
          "Average and std CH": CH_Avg_CH_Std,
          "Average and std ARI": ARI_Avg_ARI_Std
         }
df_final_2 = pd.DataFrame (result_2, columns = ['Dataset name','The value of K',"Average and std CH","Average and std ARI"])
display(df_final_2)


Dataset name,The value of K,Average and std CH,Average and std ARI
iris,3,(324.189 ; 60.37),(0.653 ; 0.112)
iris,4,(302.687 ; 15.112),(0.61 ; 0.021)
iris,5,(272.934 ; 21.01),(0.529 ; 0.06)
iris,6,(247.003 ; 14.674),(0.517 ; 0.086)
iris,7,(232.987 ; 17.809),(0.441 ; 0.065)
glass,5,(79.709 ; 9.607),(0.157 ; 0.034)
glass,6,(67.674 ; 10.557),(0.165 ; 0.024)
glass,7,(67.853 ; 7.837),(0.172 ; 0.038)
glass,8,(67.66 ; 7.5),(0.181 ; 0.029)
glass,9,(65.663 ; 4.306),(0.19 ; 0.026)
