Import Libraires

In [None]:
from keras.datasets import mnist
import numpy as np
import matplotlib.pyplot as plt

Get and format data

In [None]:
def get_data():
    (x_train, y_train), (x_test, y_test) = mnist.load_data()

    # reshape matrix to vector
    x_train, x_test = x_train.reshape(x_train.shape[0], -1), x_test.reshape(x_test.shape[0], -1)

    # scale values to 0-1
    x_train = x_train/255.0
    x_test = x_test/255.0

    train_data, train_labels = [], []
    for i in range(10):
        train_labels += [i]*100
        temp_data = x_train[np.where(y_train == i)[0]]
        np.random.shuffle(temp_data)
        train_data.extend(temp_data[:100])
    
    train_data, train_labels = np.array(train_data), np.array(train_labels)
    idx = np.random.permutation(len(train_data))
    train_data, train_labels = train_data[idx], train_labels[idx]

    idx = np.random.permutation(len(x_test))
    test_data, test_labels = x_test[idx][:50], y_test[idx][:50]
    return (train_data, train_labels), (test_data, test_labels)

(x_train, y_train), (x_test, y_test) = get_data()
print(f"N = number of training examples = {x_train.shape[0]}")
print(f"n = number of features = {x_train.shape[1]}")

N = number of training examples = 1000
n = number of features = 784


KMeans Class

In [None]:
class KMeans():
    def __init__(self, x, y, cluster_cnt = 20, random_init = 1):
        self.data = x
        self.labels = y
        self.feature_len = self.data.shape[1]
        self.train_len = self.data.shape[0]
        self.clusters = np.zeros(self.train_len, dtype=int)
        self.cluster_cnt = cluster_cnt
        self.cluster_centers = np.random.uniform(size=(self.cluster_cnt, self.feature_len))
        self.cluster_labels = -1*np.ones(self.cluster_cnt, dtype=int)
        if random_init == 0:
            self.cluster_centers = self.data[np.random.randint(0, self.train_len, self.cluster_cnt)]
        self.j_clust = []
    
    def get_distance(self, x, y):
        return np.linalg.norm(x-y, 2)
    
    def update_cluster_assignment(self):
        for i in range(self.train_len):
            self.clusters[i] = np.argmin(np.array([self.get_distance(self.data[i], c) for c in self.cluster_centers]))
    
    def update_cluster_representatives(self):
        for i in range(self.cluster_cnt):
            idx = np.where(self.clusters == i)[0]
            if idx.shape[0] == 0:
                self.cluster_centers[i] = np.zeros(self.feature_len)
            else:
                self.cluster_centers[i] = np.mean(self.data[idx], axis=0)
    
    def update_cluster_labels(self):
        self.cluster_labels = -1*np.ones(self.cluster_cnt, dtype=int)
        for i in range(self.cluster_cnt):
            freq = np.bincount(self.labels[np.where(self.clusters == i)])
            if freq.shape[0]:
                self.cluster_labels[i]=np.argmax(freq)
    
    def get_j_clust(self):
        return np.mean(np.array([self.get_distance(self.data[i], self.cluster_centers[self.clusters[i]]) for i in range(self.train_len)]))
    
    def check_convergence(self):
        if len(self.j_clust) < 2:
            return False
        return np.absolute(self.j_clust[-1] - self.j_clust[-2]) < 1e-6
    
    def get_centers(self):
        return self.cluster_centers
    
    def train(self, max_iterations = 1000, info = False):
        for i in range(max_iterations):
            self.update_cluster_assignment()
            self.update_cluster_representatives()
            self.update_cluster_labels()
            self.j_clust.append(self.get_j_clust())
            if info:
                print(f"Iteration {i+1}: j_clust = {self.j_clust[-1]}")
            if self.check_convergence():
                print(f"Converged after {i+1} iterations.")
                return self.j_clust[-1]
        print(f"Max iterations reached.")
        return self.j_clust[-1]
    
    def test(self, x, y):
        predicted_clusters = np.array( [np.argmin(np.array([self.get_distance(x_i, c) for c in self.cluster_centers])) for x_i in x] )
        predicted_values = self.cluster_labels[predicted_clusters]
        return np.mean(np.equal(predicted_values, y))

    def visualise_cluster_representatives(self):
        dims = int(np.sqrt(self.feature_len))
        images = self.cluster_centers.reshape((self.cluster_cnt, dims, dims))
        fig, axs = plt.subplots(5, 4, figsize=(14, 14))
        plt.gray()
        for i, a in enumerate(axs.flat):
            a.matshow(images[i])
            a.axis('off')
            a.text(-4.0, -2.0, f"Value = {self.cluster_labels[i]}")
        fig.suptitle("Cluster Representatives", fontsize=30)
        plt.show()
    
    def visualise_j_clust(self):
        plt.xlabel('Iterations')
        plt.ylabel('J_Clust')
        plt.plot(range(len(self.j_clust)), self.j_clust)
        plt.show()


Results and visualisation (Part a and b)

In [None]:
kmeans_trainer = KMeans(x_train, y_train) # for case(ii) pass random_init = 0
kmeans_trainer.train()
kmeans_trainer.visualise_cluster_representatives()
kmeans_trainer.visualise_j_clust()
print(f"Accuracy is: {kmeans_trainer.test(x_test, y_test)}")

Variation in number of clusters (Part c)

In [None]:
cluster_range = range(5, 21)
j_clust_values = []
for i in cluster_range:
    kmeans_trainer = KMeans(x_train, y_train, cluster_cnt = i) # for case(ii) pass random_init = 0
    j_clust_values.append(kmeans_trainer.train())

j_clust_values = np.array(j_clust_values)
min_j_clust = np.min(j_clust_values)
k_min = cluster_range[np.argmin(j_clust_values)]
print(f"Min J_clust = {min_j_clust} for k = {k_min}")
for i, val in enumerate(j_clust_values):
    print(f"k = {i+5}, J_clust = {val}")
plt.xlabel('k (number of clusters)')
plt.ylabel('J_Clust')
plt.plot(cluster_range, j_clust_values)
plt.show()