# Map-Reduce K-Means

### Using Spark RDD and parallel work

## Map

* The map function takes an RDD of all the points and a set of k centroids.
* It finds the closest centroid to each point using Euclidean distance.
* It returns < c , p > pairs, where c is the closest centroid to point p.

In [0]:
def euc_dist(p1,p2):
    # Calculates the euclidean distance between 2 points
    return sum((value1 - value2) ** 2 for value1, value2 in zip(p1, p2)) ** 0.5

def assign_point_to_centroid(point, centroids):
    # Assigns the closest centroid to a point.
    # This is the function applied to every point in the map step.
    closest_centroid = None
    closest_distance = float('inf')
    for centroid in centroids:
        distance = euc_dist(point, centroid)
        if distance < closest_distance:
            closest_distance = distance
            closest_centroid = centroid
    return (closest_centroid, point)
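
The assignment step can be tried without a Spark cluster. The following is a minimal sketch in plain Python; the toy `centroids` and `points` are made up for illustration, and `min` with a `key` is used here in place of the explicit loop (both keep the first centroid on ties).

```python
def euc_dist(p1, p2):
    # Euclidean distance between two equal-length points
    return sum((a - b) ** 2 for a, b in zip(p1, p2)) ** 0.5

def assign_point_to_centroid(point, centroids):
    # Return (closest_centroid, point), the same pair shape as the map step
    closest = min(centroids, key=lambda c: euc_dist(point, c))
    return (closest, point)

# Hypothetical toy data: two centroids and three 2-D points
centroids = [(0.0, 0.0), (10.0, 10.0)]
points = [(1.0, 1.0), (9.0, 8.0), (4.0, 4.0)]

pairs = [assign_point_to_centroid(p, centroids) for p in points]
for centroid, point in pairs:
    print(centroid, "<-", point)
```

The point (4.0, 4.0) is about 5.66 away from (0.0, 0.0) and about 8.49 away from (10.0, 10.0), so it is paired with the first centroid.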

In [0]:
def map_function(rdd, centroids):
    # Assign each point to the closest centroid
    assigned_points = rdd.map(lambda point: assign_point_to_centroid(point,centroids))

    # Collect the assigned points as a list of tuples
    # assigned_points_list = assigned_points.collect()

    return assigned_points

## Reduce

* The reduce function takes the output of the map function.
* It calculates each new centroid by averaging all the points in its cluster.
* It returns < old_centroid , new_centroid > pairs.

In [0]:
# Function that calculates a new centroid from the points assigned to the old one
def calculate_new_centroids(points):
    points_list = list(points)
    num_points = len(points_list)
    dimensions = len(points_list[0]) # 0 is arbitrary
    centroid_sum = [0.0] * dimensions

    for point in points_list:
        centroid_sum = [centroid_sum[i] + point[i] for i in range(dimensions)]

    new_centroid = tuple([centroid_sum[i] / num_points for i in range(dimensions)])
    return new_centroid

In [0]:
def reduce_function(assigned_points):

    # Turn centroids and points into tuples so that centroids are hashable keys.
    # This runs on the executors, so nothing is collected to the driver here.
    assigned_points_rdd = assigned_points.map(lambda t: (tuple(t[0]), tuple(t[1])))

    # Group assigned points by centroid (shuffle and sort)
    grouped_points = assigned_points_rdd.groupByKey()

    # Calculate new centroids - an RDD of up to k (old_centroid, new_centroid) pairs
    old_new_centroids = grouped_points.mapValues(calculate_new_centroids)

    return old_new_centroids
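
`groupByKey` gathers every point of a cluster before averaging. A common alternative is to carry (sum, count) pairs and merge them pairwise, which is the kind of function `reduceByKey` accepts. The sketch below shows the idea on plain Python lists so it runs without Spark; `to_acc`, `combine`, `mean_from_acc` and the toy data are hypothetical names introduced for illustration, not part of the notebook.

```python
def to_acc(point):
    # Start an accumulator: (coordinate sums, number of points)
    return (tuple(point), 1)

def combine(acc1, acc2):
    # Merge two accumulators; the operation is associative and commutative
    sums = tuple(a + b for a, b in zip(acc1[0], acc2[0]))
    return (sums, acc1[1] + acc2[1])

def mean_from_acc(acc):
    # Turn (sums, count) into the mean point
    sums, count = acc
    return tuple(s / count for s in sums)

# Hypothetical (centroid, point) pairs, shaped like the output of the map step
assigned = [
    ((0.0, 0.0), (1.0, 1.0)),
    ((0.0, 0.0), (3.0, 5.0)),
    ((10.0, 10.0), (9.0, 8.0)),
]

# Local stand-in for a per-key reduction
accs = {}
for centroid, point in assigned:
    acc = to_acc(point)
    accs[centroid] = combine(accs[centroid], acc) if centroid in accs else acc

old_new = {old: mean_from_acc(acc) for old, acc in accs.items()}
print(old_new)
```

On a pair RDD the same three functions could be chained as `mapValues(to_acc).reduceByKey(combine).mapValues(mean_from_acc)`. That variant is an assumption about how the notebook could be adapted and has not been run on the Iris data.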

## K-Means Algorithm

* In the K-Means algorithm, the map and reduce steps are repeated until convergence or until the iteration limit is reached.

The algorithm starts from K points chosen at random from the data.

In [0]:
def choose_centroids(rdd,K):
    # Randomly select K points from the RDD, without replacement
    centroids = rdd.takeSample(False, K)
    return centroids

In [0]:
import numpy as np

def Kmeans(rdd, k, CT=0.0001, I=30):
    '''
    rdd - The data points
    k - The number of clusters
    CT - Convergence threshold (parameter - default is set to 0.0001)
    I - Maximum number of iterations for K-Means (parameter - default is set to 30)
    '''
    # Choose random points for the start of the algorithm
    centroids = choose_centroids(rdd, k)
    for i in range(I):
        # Map points to centroids
        assigned_points = map_function(rdd,centroids)

        # Calculate new centroids
        old_new_centroids = reduce_function(assigned_points)

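        # Adopt the new centroids first, so the returned centroids are
        # always the most recent ones, even when the loop stops early
        old_new_centroids_list = old_new_centroids.collect()
        centroids = [np.array(c[1]) for c in old_new_centroids_list]

        # Check for convergence: all k centroids moved by at most CT
        count = 0
        for (old_centroid, new_centroid) in old_new_centroids_list:
            if euc_dist(old_centroid, new_centroid) <= CT:
                count += 1
        if count == k:
            break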
        
    return centroids
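
The loop can be traced by hand on a tiny dataset. The sketch below is a plain-Python stand-in for the Spark version, with fixed starting centroids instead of a random sample so the result is reproducible; `kmeans_local` and the toy points are hypothetical and exist only for this illustration.

```python
def euc_dist(p1, p2):
    return sum((a - b) ** 2 for a, b in zip(p1, p2)) ** 0.5

def kmeans_local(points, centroids, ct=0.0001, max_iter=30):
    # Plain-Python stand-in for the map/reduce loop
    centroids = [tuple(c) for c in centroids]
    iterations = 0
    for _ in range(max_iter):
        iterations += 1
        # "Map": group points under their closest centroid
        clusters = {}
        for p in points:
            closest = min(centroids, key=lambda c: euc_dist(p, c))
            clusters.setdefault(closest, []).append(p)
        # "Reduce": average each group to get (old, new) centroid pairs.
        # A centroid with no points gets no pair and is dropped.
        old_new = [
            (old, tuple(sum(coord) / len(members) for coord in zip(*members)))
            for old, members in clusters.items()
        ]
        centroids = [new for _, new in old_new]
        # Stop once no centroid moved by more than ct
        if all(euc_dist(old, new) <= ct for old, new in old_new):
            break
    return centroids, iterations

points = [(0.0, 0.0), (0.0, 2.0), (10.0, 10.0), (10.0, 12.0)]
centroids, iterations = kmeans_local(points, centroids=[(0.0, 0.0), (10.0, 10.0)])
print(centroids, iterations)
```

In the first iteration the centroids move to (0.0, 1.0) and (10.0, 11.0). In the second they do not move at all, so the loop stops after two iterations.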
        

# Testing the algorithm

I will be using the Iris dataset.

In [0]:
dataset_name = "iris"
iris_path = '/FileStore/tables/iris.csv'
iris_df = spark.read.csv(iris_path, header=True)

In [0]:
iris_df.show(5)

+---+---+---+---+-----+
| f1| f2| f3| f4|class|
+---+---+---+---+-----+
|5.1|3.5|1.4|0.2|    0|
|4.9|  3|1.4|0.2|    0|
|4.7|3.2|1.3|0.2|    0|
|4.6|3.1|1.5|0.2|    0|
|  5|3.6|1.4|0.2|    0|
+---+---+---+---+-----+
only showing top 5 rows



## Preprocess

Casting the feature columns to float and normalizing each attribute to the [0, 1] range.

In [0]:
from pyspark.sql import functions
from sklearn.preprocessing import MinMaxScaler

def preprocess(df):
    # Preprocesses the data. Returns an RDD of normalized feature vectors.

    # Select the feature columns (every column except the last, the class label)
    selected_columns = df.columns[:-1]
    df = df.select(selected_columns)

    # Cast values to float, since the CSV is read as strings
    for c in selected_columns:
        df = df.withColumn(c, functions.col(c).cast('float'))

    # Normalize each column to [0, 1]. The data is collected once to the
    # driver, which is fine for a small dataset such as Iris.
    rows = df.collect()
    scaled = MinMaxScaler().fit_transform(rows)
    return sc.parallelize(scaled)
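
Min-max normalization maps each value x in a column to (x - min) / (max - min). As a rough illustration of what the scaling step does, here is a plain-Python sketch; `min_max_scale` and the sample rows are hypothetical, and mapping a constant column to 0.0 is a choice made for this sketch.

```python
def min_max_scale(rows):
    # Scale each column to [0, 1] using (x - min) / (max - min)
    columns = list(zip(*rows))
    mins = [min(col) for col in columns]
    maxs = [max(col) for col in columns]
    scaled = []
    for row in rows:
        scaled.append(tuple(
            # A constant column has max == min; map it to 0.0 to avoid dividing by zero
            (x - lo) / (hi - lo) if hi > lo else 0.0
            for x, lo, hi in zip(row, mins, maxs)
        ))
    return scaled

# Hypothetical rows with two features each
rows = [(5.0, 3.0), (4.0, 3.5), (7.0, 2.0)]
scaled = min_max_scale(rows)
for row in scaled:
    print(row)
```

After scaling, every column has a minimum of 0 and a maximum of 1, so no single attribute dominates the Euclidean distance because of its units.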

## Running The Algorithm

In [0]:
rdd = preprocess(iris_df)
Kmeans(rdd,k=3,I=2)

Out[10]: [array([0.2655555 , 0.70999997, 0.08      , 0.07333334]),
 array([0.15624996, 0.40234374, 0.15095339, 0.12109375]),
 array([0.5663082 , 0.37903224, 0.68069982, 0.67697132])]

## Evaluation

Evaluating the model with:
* Calinski Harabasz Score (CH)
* Adjusted Rand Index (ARI)

In [0]:
from sklearn.metrics import calinski_harabasz_score
def eval_CH(points,centroids):
    # Calculate the Calinski-Harabasz score
    assignments = []
    d = lambda p1,p2: sum((a - b) ** 2 for a, b in zip(p1, p2)) ** 0.5
    for point in points:
        distances = [d(point,centroid) for centroid in centroids]
        smallest_value_index = distances.index(min(distances))
        assignments.append(smallest_value_index)
    score = calinski_harabasz_score(points, assignments)
    return score

from sklearn.metrics import adjusted_rand_score
def eval_ARI(points, truth_classes, centroids):
    # Calculate the Adjusted Rand Index (ARI) score
    assignments = []
    d = lambda p1,p2: sum((a - b) ** 2 for a, b in zip(p1, p2)) ** 0.5
    for point in points:
        distances = [d(point,centroid) for centroid in centroids]
        smallest_value_index = distances.index(min(distances))
        assignments.append(smallest_value_index)
    ari_score = adjusted_rand_score(truth_classes, assignments)
    return ari_score
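
To make the CH score less of a black box, the sketch below computes it by hand from its usual definition: the between-cluster dispersion divided by (k - 1), over the within-cluster dispersion divided by (n - k). `ch_score` is a hypothetical helper written for this illustration; it has not been checked against `calinski_harabasz_score` on the Iris data.

```python
def ch_score(points, labels):
    # Requires at least 2 clusters and a non-zero within-cluster dispersion
    n = len(points)
    clusters = {}
    for p, label in zip(points, labels):
        clusters.setdefault(label, []).append(p)
    k = len(clusters)

    def mean(rows):
        return tuple(sum(col) / len(rows) for col in zip(*rows))

    def sq_dist(p, q):
        return sum((a - b) ** 2 for a, b in zip(p, q))

    overall = mean(points)
    between = 0.0  # sum of cluster size * squared distance of cluster mean to overall mean
    within = 0.0   # sum of squared distances of points to their cluster mean
    for members in clusters.values():
        center = mean(members)
        between += len(members) * sq_dist(center, overall)
        within += sum(sq_dist(p, center) for p in members)
    return (between / (k - 1)) / (within / (n - k))

# Hypothetical 1-D data: two tight clusters far apart
points = [(0.0,), (2.0,), (10.0,), (12.0,)]
labels = [0, 0, 1, 1]
score = ch_score(points, labels)
print(score)
```

Here the cluster means are 1 and 11 and the overall mean is 6, so the between-cluster dispersion is 2 * 25 + 2 * 25 = 100 and the within-cluster dispersion is 4, giving (100 / 1) / (4 / 2) = 50. Higher scores mean clusters that are tighter and further apart.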

In [0]:
def true_classes(df):
    return np.array(df.select(df.columns[-1]).rdd.flatMap(lambda x: x).collect())

def calculate_evaluation_statistics(ch_values, ari_values):
    # Calculate average and standard deviation for CH values
    ch_average = np.average(ch_values)
    ch_std_dev = np.std(ch_values)

    # Calculate average and standard deviation for ARI values
    ari_average = np.average(ari_values)
    ari_std_dev = np.std(ari_values)

    # Print the results
    print("CH Values:")
    print("  Average:", ch_average)
    print("  Standard Deviation:", ch_std_dev)

    print("ARI Values:")
    print("  Average:", ari_average)
    print("  Standard Deviation:", ari_std_dev)

Running multiple experiments of Kmeans and testing different k values

In [0]:
def evaluateKmeans(df, k, CT=0.0001, I=30, exp=10):
    '''
    df - The data including the class column
    k - The number of clusters for each experiment
    CT - Convergence threshold (parameter - default is set to 0.0001)
    I - Maximum number of iterations for K-Means (parameter - default is set to 30)
    exp - Number of experiments (parameter - default is set to 10)
    '''
    
    ch_values = []
    ari_values = []
    rdd = preprocess(df)
    for i in range(exp):
        centroids = Kmeans(rdd,k,CT,I)
        points = np.array(rdd.collect())
        ch_values.append(eval_CH(points,centroids))
        true_classifications = true_classes(df)
        ari_values.append(eval_ARI(points, true_classifications, centroids))
    calculate_evaluation_statistics(ch_values, ari_values)

In [0]:
for k in range(2, 7):
    print(f"k = {k}")
    evaluateKmeans(iris_df,k)

k = 2
CH Values:
  Average: 353.3674059572578
  Standard Deviation: 5.684341886080802e-14
ARI Values:
  Average: 0.5681159420289854
  Standard Deviation: 1.1102230246251565e-16
k = 3
CH Values:
  Average: 308.7014828336743
  Standard Deviation: 68.83684373816534
ARI Values:
  Average: 0.6239346481112749
  Standard Deviation: 0.1278073990035785
k = 4
CH Values:
  Average: 289.97391483091803
  Standard Deviation: 19.81000400718611
ARI Values:
  Average: 0.5888747472530399
  Standard Deviation: 0.028079484887737507
k = 5
CH Values:
  Average: 261.29696195714763
  Standard Deviation: 21.38396727253739
ARI Values:
  Average: 0.5845928449391831
  Standard Deviation: 0.07575087350687562
k = 6
CH Values:
  Average: 236.25615362598145
  Standard Deviation: 23.389486253440136
ARI Values:
  Average: 0.5207243692274759
  Standard Deviation: 0.06788437146538576


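The model performs best at k=2 and k=3.

* The highest average CH score (353.4) is at k=2, which indicates compact, well-separated clusters.
* The highest average ARI (0.62) is at k=3, which indicates the best agreement between the predicted labels and the true labels. This matches the three classes in the Iris dataset.
* The k=3 runs also vary the most (CH standard deviation of 68.8, against practically zero at k=2), so results at k=3 depend heavily on the random starting centroids.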