# Preparation

data classes, general functions

In [None]:
from __future__ import annotations

import pandas as pd 
import numpy as np
import matplotlib.pyplot as plt
import random

from enum import Enum, auto
from dataclasses import dataclass
from typing import List
from collections import Counter

In [None]:
KNN_TRAIN = "./mnist_small/train.csv"
KNN_TEST = "./mnist_small/test.csv"

names = ["label"] + [f"Pix {i}" for i in range(28*28)]

train_df = pd.read_csv(KNN_TRAIN, names=names)
test_df = pd.read_csv(KNN_TEST, names=names)

## Coding part
The following sections contain the coding part of the project

* Image visualization (helper tool)

In [None]:
def visualize_image(array: np.ndarray):
    """Visualize the given flattened image. Assume 28*28 len"""
    image = array.reshape(28, 28)

    plt.figure()
    plt.title(f"Cluster center") 
    plt.imshow(image, cmap='gray', vmin=0, vmax=255)

    plt.show(True)

def add_cluster_center_as_ax(cluster_center: np.ndarray, cluster_index: int, ax):
    image = cluster_center.reshape(28, 28)

    ax.set_title(f"Clust.{cluster_index}")
    ax.imshow(image, cmap='gray', vmin=0, vmax=255)

* trivial score computation

In [None]:
class TerminateCondition(Enum):
    CHANGED_CLUSTER = 0b01
    CHANGED_CENTER = 0b10

@dataclass
class Score():
    """Hold the score for a training set evalutation. Contain the score evalutation method"""
    total: int
    success: int
    failures: int

    accuracy: float

    @classmethod
    def compute_score(total: int, successes: int, failures: int) -> Score:
        """Compute the score from the given successes / failures
        
        Returns
        -----
            A score object containing the information
        """
        accuracy = successes / total

        return Score(total, successes, failures, accuracy)

* Custom classifier, heavily coupled to test dataset but optimized for efficient tests on k via memoization of the distance map

In [None]:
class KmeansClassifier:
    """Class encapsulating the logic of the KNN algorithm"""
    #Work with arrays to be more efficient
    data: np.ndarray
    labels: np.ndarray

    cluster_map: np.ndarray
    k: int

    def __init__(self, train_df: pd.DataFrame) -> None:
        """Init the KNN classifier with the training and testing dataset"""
        self.labels = train_df.loc[:, 'label'].to_numpy()
        self.data = train_df.drop(['label'], axis=1, inplace=False).to_numpy()
        print(f"K-means loaded with {len(self.data)} data")
    
    def cluster_data(self, cluster_index: int) -> np.ndarray:
        """Return the datapoint belonging to cluster index"""
        return self.data[self.cluster_map == cluster_index]

    def single_linkage_distance(self, cluster_ind_from: int, cluster_ind_to: int):
        """Determing the min distance between pts of clst1 to points of clst2"""
        cluster_from = self.cluster_data(cluster_ind_from)
        cluster_to = self.cluster_data(cluster_ind_to)

        min_point_distance = []
        for point in cluster_from:
            distances = self.euclidian_distances(point, cluster_to)
            min_point_distance.append(np.min(distances))
        
        return np.min(min_point_distance)

    def cluster_diameter(self, cluster_index: int) -> int:
        """Return the diameter of said cluster"""
        cluster_data = self.cluster_data(cluster_index)

        max_distances = []
        for point in cluster_data:
            distances = self.euclidian_distances(point, cluster_data)
            max_distances.append(np.max(distances))
        
        return np.max(max_distances)

    def dunn_index(self):
        cluster_min_distances = []
        cluster_diameters = []
        for cluster_index in range(self.k):

            #First: Calculate minimal single linkage distance between clusters
            cluster_distances = []
            for cluster_index_to in range(self.k):
                if cluster_index_to == cluster_index:
                    continue 
                single_linkage_distance = self.single_linkage_distance(cluster_index, cluster_index_to)
                cluster_distances.append(single_linkage_distance)
            #Add this cluster's minimum linkage distance to all min distances
            cluster_min_distances.append(np.min(cluster_distances))


            #Second: Calculate diameter for each cluster
            diameter = self.cluster_diameter(cluster_index)
            cluster_diameters.append(diameter)

        dunn_index = np.min(cluster_min_distances) / np.max(cluster_diameters)
        return dunn_index

    def davis_bouldin_index(self):
        """Return the davis bouldin index"""
        #Create repository of Di, Mi per cluster
        d_m_clusters = []
        for cluster_index in range(self.k):
            cluster_data = self.cluster_data(cluster_index)

            m = np.mean(cluster_data, axis=0)
            d = np.sum(self.euclidian_distances(m, cluster_data)) / len(cluster_data)
            d_m_clusters.append((d, m))
        
        #Now calculate each Rij easily as we have the repositories of centers and distances
        Ri_list = []
        for i, (di,mi) in enumerate(d_m_clusters):
            Rij_list = []
            for j, (dj, mj) in enumerate(d_m_clusters):
                if i == j:
                    continue
                # Put into one dim array, and get first value as distance function expects to compare array to matrix
                center_distances = self.euclidian_distances([mi], [mj])[0] 
                Rij = (di + dj) / center_distances
                Rij_list.append(Rij)
            Ri_list.append(np.max(Rij_list))
        
        davis_bouldin_index = sum(Ri_list) / self.k
        return davis_bouldin_index

    def euclidian_distances(self, point: np.ndarray, matrix: np.ndarray):
        """Compute the distance as the difference in intensity level pixel-wise between the input datapoint and the given matrix.
        
        Returns
        ----
            A matrix of distance between the point and each matrix's data point.
            The matrix thus has a size of (N, 1): I column as the distance is a singular value,
            N rows as each row is the distance to a matrix's datapoint
        """
        #use numpy functions as they are made to be efficient with vecotrized input
        return np.sqrt(np.sum(np.power(np.subtract(point, matrix), 2), axis=1))

    def is_terminated(self, i: int, i_max: int, has_changed_cluster: bool, has_changed_center: bool, 
        terminate_flags: List[TerminateCondition]) -> bool:
        if i >= i_max:
            print(f"\nMax iterations reached: {i}")
            return True
        if TerminateCondition.CHANGED_CLUSTER in terminate_flags and not has_changed_cluster:
            print(f"\nNo cluster changes detected")
            return True
        if TerminateCondition.CHANGED_CENTER in terminate_flags and not has_changed_center:
            print(f"\nNo center changes detected")
            return True
        return False

    def clusterize(self, k: int, iterations: int, terminate_flags: List[TerminateCondition]) -> tuple[np.ndarray, np.ndarray]:
        """Classify each item on the test dataset and yields an evaluation score
        
        Args
        -----
            k: int - The number of expected clusters
            iterations: int - 
            terminate_condition: int - Additional terminate condition, see Terminate condition enums

        Returns
        ----
            A score object
        """
        #Chose k random centers, init clusters with single point. Avoid same-point centers.
        self.k = k
        rand_indexes = []
        for _ in range(k):
            randToss = None
            while randToss is None or randToss in rand_indexes:
                randToss = random.randrange(0, len(self.data))
            rand_indexes.append(randToss)

        centers = np.take(self.data, rand_indexes, axis=0)
            
        #Init "cluster identification" array for each datapoint
        self.cluster_map = np.zeros((len(self.data)), dtype=int)

        has_changed_cluster = True
        has_changed_center = True

        i = 0
        while not self.is_terminated(i, iterations, has_changed_cluster, has_changed_center, terminate_flags):
            i += 1
            print(f"\rIteration <{i:5}>", end="")
            has_changed_cluster = False
            has_changed_center = False

            #Reassign each point to centers
            for point_index, point in enumerate(self.data):
                #Calculate distance map
                point_centers_dist_matrix = self.euclidian_distances(point, centers)

                #Find index of nearest center, that's the new cluster index
                new_cluster_index = np.argmin(point_centers_dist_matrix)

                #Verify if it has changed
                current_cluster_index = self.cluster_map[point_index]
                if current_cluster_index != new_cluster_index:
                    has_changed_cluster = True
                    self.cluster_map[point_index] = new_cluster_index

            #Re-calculate centers
            for cluster_index in range(k):
                #Get all data assigned to this cluster using ndarray binary mask on cluster index
                cluster_points = self.data[self.cluster_map == cluster_index]
                #cluster_points = np.take(self.data, (self.cluster_map == cluster_index), axis=0)

                #Center is simply the new mean of this data
                new_cluster_mean = np.mean(cluster_points, axis=0)

                #Track wheter the center has changed
                current_cluster_mean = centers[cluster_index]
                if not np.array_equal(current_cluster_mean, new_cluster_mean):
                    has_changed_center = True
                    centers[cluster_index] = new_cluster_mean

        return self.cluster_map, centers

# Main functions
Function used to perform the assignement

In [None]:
def clusterize_with_k(k: int) -> tuple[np.ndarray, np.ndarray, KmeansClassifier]:
    """Clusterize with the given K clusters.
    Uses 100 iterations as this is generally enough for the k-means to stop beforehand because of no changes detected.
    
    Returns
    -----
        The tzple (cluster_map, cluster_centers)
    """
    kmeans_classifier = KmeansClassifier(train_df)
    max_iter = 100

    cluster_map, cluster_centers = kmeans_classifier.clusterize(k, max_iter, (TerminateCondition.CHANGED_CENTER, TerminateCondition.CHANGED_CLUSTER))

    return cluster_map, cluster_centers, kmeans_classifier

## Cluster analysis
Make an analysis of the data repartition over the final clusterized dataset

In [None]:
def analyse(classifier: KmeansClassifier, labels: np.ndarray, cluster_map: np.ndarray):
    #Calculate each cluster's validity, using different methods

    #Anylse repartition with classes
    print(f"Analysing clusters...")
    for i in range(2):
        for j in range(5):
            #get all cluster values corresponding to the class
            class_value = 5*i + j
            binary_mask = (labels == class_value)
            cluster_values = cluster_map[binary_mask]
            
            #Get most used cluster
            sorted_freq_labels = Counter(cluster_values).most_common()
            most_freq_cluster = int(sorted_freq_labels[0][0])

            print(f"Label {class_value} has {np.count_nonzero(binary_mask)} elements. They are spread around {len(np.unique(cluster_values))} clusters")
            print(f"Cluster spread: {sorted_freq_labels}")
            counts = [count for val, count in sorted_freq_labels]
            together_ratio = 100 * (counts[0] / sum(counts))
            print(f"Class {class_value} is mostly represented by cluster {most_freq_cluster} : {counts[0]}/{sum(counts)} ({together_ratio:.2f})%")


## Data repartition
Plot used to see the overall data repartition between labels and clusters

In [None]:
def build_data_repartition(labels, cluster_map):
    fig, axs = plt.subplots(2)
    fig.suptitle('Vertically stacked subplots')

    axs[0].bar(np.linspace(0, len(train_df), len(train_df)), labels)
    axs[0].set_title("Index - Labels")
    axs[0].set_xlabel("Index")
    axs[0].set_ylabel("Label")

    axs[1].bar(np.linspace(0, len(train_df), len(train_df)), cluster_map)
    axs[1].set_title("Index - clusterized")
    axs[1].set_xlabel("Index")
    axs[1].set_ylabel("Cluster")

## Piechart
Used to see the volume repratition for labels and clusters


In [None]:
def build_piechart(labels, cluster_map):
    fig, axes = plt.subplots(2)
    fig.suptitle('Data repratition over initial dataset and resulting clusterized dataset')

    #Draw labels pie chart
    slices = []
    activities = []
    for i in range(10):
        slices.append(np.count_nonzero( (labels == i) )) 
        activities.append(f"nbr_{i}")

    axes[0].pie(slices,
        labels = activities,
        shadow = True)

    #Draw clusters pie chart
    slices = []
    activities = []
    for i in range(len(np.unique(cluster_map))):
        slices.append(np.count_nonzero( (cluster_map == i) ))
        activities.append(f"cluster_{i}")

    axes[1].pie(slices,
        labels = activities,
        shadow = True)

## Function used to show repartition of labeled data into their clusters
This repartition is focused on original data, this would not be possible with trully unsupervised value.
This is a way to see where each of the drawn number went into.

In [None]:
def build_data_per_label(labels, cluster_map, cluster_centers):
    fig, axs = plt.subplots(2, 5)

    for i in range(2):
        for j in range(5):
            #get all cluster values corresponding to the class
            class_value = 5*i + j
            binary_mask = (labels == class_value)
            cluster_values = cluster_map[binary_mask]
            
            #Get most used cluster
            sorted_freq_labels = Counter(cluster_values).most_common()
            most_freq_cluster = int(sorted_freq_labels[0][0])

            #Display class's main cluster 
            center = cluster_centers[most_freq_cluster]
            add_cluster_center_as_ax(center, most_freq_cluster, axs[i][j])

## Display all clusters to have an idea of the final repartition
It is interesting to plot all the final cluster's centers in order to visualize each "cluster mean", or "cluster target".

In [None]:
def build_clusters_visuals(cluster_centers):
    cols = 5
    rows = int(np.ceil(len(cluster_centers) / cols))
    fig, axs = plt.subplots(rows, cols)

    for i in range(rows):
        for j in range(cols):
            cluster_ind = 5*i + j
            if cluster_ind < len(cluster_centers):
                add_cluster_center_as_ax(cluster_centers[cluster_ind], cluster_ind, axs[i][j] if rows>1 else axs[j])

# Assignnement
Test the model accuracy with different cluster validation methods

* Validation Values for K = {5, 7, 9, 10, 12, 15}

In [None]:
import os

labels = train_df.loc[:, 'label'].to_numpy()
out_path =  os.path.join(os.path.abspath(''), "out")
if os.path.exists(out_path):
    archive_path = os.path.join(os.path.abspath(''), "archive")
    if os.path.exists(archive_path):
        os.remove(archive_path)
    os.rename(out_path, archive_path)
os.mkdir(out_path)

Ks = [5, 7, 9, 10, 12, 15]
dunn_indexes = []
davis_bouldin_indexes = []

for k in Ks:
    print(f"Iteration k={k}")
    full_path = os.path.join(out_path, f"{k}")    
    os.mkdir(full_path)
    
    cluster_map, cluster_centers, classifier = clusterize_with_k(k)

    print(f"Calculating cluster validation...")
    dunn_index = classifier.dunn_index()
    print(f"Dunn index (The larger = the better): {dunn_index}")
    dunn_indexes.append(dunn_index)

    davis_bouldin_index = classifier.davis_bouldin_index()
    print(f"Davis bouldin index (The lower = the better): {davis_bouldin_index}")
    davis_bouldin_indexes.append(davis_bouldin_index)

    build_data_repartition(labels, cluster_map)
    plt.savefig(f'./out/{k}/data_repartition.pdf')

    build_piechart(labels, cluster_map)
    plt.savefig(f'./out/{k}/piecharts.pdf')

    build_data_per_label(labels, cluster_map, cluster_centers)
    plt.savefig(f'./out/{k}/label_repartition.pdf')

    build_clusters_visuals(cluster_centers)
    plt.savefig(f'./out/{k}/clusters.pdf')
    print()

print(f"Computation finished.")

dunn_ordered = sorted(zip(Ks, dunn_indexes), key = lambda k_dun: k_dun[1], reverse=True)
print(f"Best clusters according to Dunn: {dunn_ordered[0]}, ordered: {dunn_ordered}")

davis_sorted = sorted(zip(Ks, davis_bouldin_indexes), key = lambda k_davis: k_davis[1])
print(f"Best clusters according to davis bouldin: {davis_sorted[0]}, ordered: {davis_sorted}")