In [9]:
import numpy as np
import random
import matplotlib.pyplot as plt
from tensorflow.keras.datasets import mnist
from kmeans import KMeans
import argparse

In [44]:
class KMeans():
    def __init__(
        self,
        x_train,
        y_train,
        num_clusters=3,
        max_iter=100,
        tol=1e-4,
        seed: str = None,
    ):
        """
        Initialize KMeans object.
        Arguments:
            dataset: numpy array of shape (n_samples, n_features)
            k: number of clusters
            max_iter: maximum number of iterations
            tol: tolerance for convergence
            seed: initial cluster centroids choice ['random','cluster']
        """
        self.dataset = x_train
        self.targets = y_train

        self.k = num_clusters
        self.max_iter = max_iter
        self.tol = tol

        self.num_features = x_train.shape[1]
        self.num_samples = x_train.shape[0]
        self.losses = []

        if seed == "random":
            self.centroids = np.random.uniform(
                size=(self.k, self.num_features))
        elif seed == "cluster":
            self.centroids = np.copy(self.dataset[np.random.choice(
                self.num_samples, self.k, replace=False)])
        else:
            raise ValueError("Seed must be in ['random', 'cluster']")
            
        self.old_centroids = np.copy(self.centroids)
        self.cluster_labels = np.zeros(self.num_samples, dtype=int)
        self.assign_clusters()

    def converged(self):
        if len(self.losses) < 2:
            return False
        if (abs(self.losses[-1] - self.losses[-2]) < self.tol):
            return True
        return False

    def assign_clusters(self):
        for i in range(self.num_samples):
            self.cluster_labels[i] = np.argmin(
                np.linalg.norm(self.dataset[i]-self.centroids, ord=2, axis=1))

    def get_centroid_labels(self):
        centroid_labels = np.zeros(self.k)
        for i in range(self.k):
            count = np.bincount(self.targets[self.cluster_labels == i])
            if len(count) > 0:
                centroid_labels[i] = np.argmax(count)
        return centroid_labels

    def fit(self):
        for i in range(self.max_iter):
            self.assign_clusters()
            self.update_centroids()
            loss = self.calc_loss()
            self.losses.append(loss)
#             print(f"Iteration {i} Loss: {loss}")
#             print("---------------------------")
            if self.converged() is True:
                print(f"TOTAL ITERATIONS = {i}")
                break
            self.old_centroids = np.copy(self.centroids)

    def calc_loss(self):
        loss = np.mean(np.linalg.norm(
            self.dataset - self.centroids[self.cluster_labels], ord=2, axis=1), axis=0)
        return loss

    def calculate_loss(self):
        loss = np.array(np.array([np.linalg.norm(self.data[i, :]-self.centers[int(
            self.class_labels[i]), :], ord=2) for i in range(self.num_samples)]))
        return np.mean(loss)

    def update_centroids(self):
        for i in range(self.k):
            alloted = self.dataset[self.cluster_labels == i]
            if len(alloted) > 0:
                self.centroids[i] = np.mean(alloted, axis=0)
            else:
                self.centroids[i] = np.zeros(self.num_features)

    def predict(self, x):
        labels = np.zeros(x.shape[0], dtype=int)
        for i in range(x.shape[0]):
            labels[i] = np.argmin(
                np.linalg.norm(x[i]-self.centroids, ord=2, axis=1))
        return self.get_centroid_labels()[labels]

In [53]:
def seed_everything(seed):
    random.seed(seed)
    np.random.seed(seed)

def load_data():
    (x_train, y_train), (x_test, y_test) = mnist.load_data()

    # normalize training and test data
    x_train = x_train / 255
    x_test = x_test / 255
    x_train = x_train.reshape(x_train.shape[0], -1)
    x_test = x_test.reshape(x_test.shape[0], -1)

    digits = []
    targets = []
    for i in range(10):
        images = x_train[y_train == i]
        digits.append(images[np.random.choice(
            len(images), 100, replace=False)])
        targets.append(np.full((100,), i))

    x_train = np.vstack(digits)
    y_train = np.hstack(targets)

    # shuffle the data
    permutation = np.random.permutation(x_train.shape[0])
    x_train = x_train[permutation]
    y_train = y_train[permutation]

    test_indices = np.random.choice(x_test.shape[0], 50)
    x_test = x_test[test_indices]
    y_test = y_test[test_indices]
    return (x_train, y_train), (x_test, y_test)


def plot_centroids(kmeans, centroids):
    centroid_images = np.copy(centroids.reshape(kmeans.k, 28, 28))
    centroid_images = centroid_images * 255

    centroid_labels = kmeans.get_centroid_labels()

    fig = plt.figure(figsize=(20, 20))
    nrows = 4
    ncols = kmeans.k // nrows + kmeans.k % nrows
    for i in range(kmeans.k):
        fig.add_subplot(nrows, ncols, i+1)
        plt.imshow(centroid_images[i], cmap="gray")
        plt.title(f"Label: {centroid_labels[i]}", fontsize=10)
        plt.axis("off")
    plt.show()


def classify_images(num_clusters=20, max_iter=200, tol=1e-6, seed="cluster"):
    # load the mnist data
    (x_train, y_train), (x_test, y_test) = load_data()
    # create a kmeans instance
    kmeans = KMeans(x_train, y_train,
                    num_clusters=num_clusters,
                    max_iter=max_iter,
                    tol=tol,
                    seed=seed)

    kmeans.fit()  # train the model
    # predict the labels from input labels and centroids
    predictions = kmeans.predict(x_test)
    print(f"Accuracy: {np.mean(predictions == y_test)}")  # print the accuracy
    plot_centroids(kmeans, kmeans.centroids)  # plot the centroids


def plot_jclust(max_iter=200, tol=1e-6, seed="cluster"):
    k = np.arange(start=5, stop=21, step=1, dtype=int)
    (x_train, y_train), (x_test, y_test) = load_data()
    # create a kmeans instance
    jclust = []
    accuracy = []
    for num_clusters in k:
        print("---------------------------")
        print(f"K = {num_clusters}")
        kmeans = KMeans(x_train, y_train,
                        num_clusters=num_clusters,
                        max_iter=max_iter,
                        tol=tol,
                        seed=seed)
        kmeans.fit()
        loss = kmeans.calc_loss()
        print(f"TOTAL LOSS = {loss}")
        jclust.append(loss)
        predictions = kmeans.predict(x_test)
        acc = np.mean(predictions == y_test)
        print(f"Accuracy = {acc}\n")
        accuracy.append(acc)
        
    plt.plot(k, jclust)
    plt.xlabel("Number of Clusters")
    plt.ylabel("J-Clustering Loss")
    plt.show()
    
    plt.plot(k, accuracy)
    plt.xlabel("Number of Clusters")
    plt.ylabel("Accuracy on test set")
    plt.show()