In [1]:
import numpy as np
import matplotlib.pyplot as plt

In [2]:
np.random.seed(42)
%matplotlib inline

In [13]:
class KMeans:
    def __init__(self, k=5, max_iter=100, plot_steps=False):
        self.k = k
        self.max_iter = max_iter
        self.plot_steps = plot_steps
        # list of sample indices for each cluster
        self.clusters = [[] for _ in range(k)]
        # mean feature vectors for each cluster
        self.centroids = []

    def _euclidean_distance(self, x, y):
        return np.sqrt(np.sum((x - y) ** 2))
 
    def _closest_centroid(self, sample, centroids):
        distances = [self._euclidean_distance(sample, c) for c in centroids]
        closest_centroid = np.argmin(distances)
        return closest_centroid

    def _create_clusters(self, centroids):
        clusters = [[] for _ in range(self.k)]
        for idx, sample in enumerate(self.x):
            closest_centroid = self._closest_centroid(sample, centroids)
            clusters[closest_centroid].append(idx)
        return clusters
    
    def _update_centroids(self):
        centroids = np.zeros((self.k, self.num_feature))
        for i, cluster in enumerate(self.clusters):
            if len(cluster) == 0:
                continue
            cluster_mean = np.mean(self.x[cluster], axis=0)
            centroids[i] = cluster_mean

        return centroids
    
    def _has_converged(self, prev_centroids):
        total_changes = [self._euclidean_distance(self.centroids[idx], prev_centroids[idx]) for idx in range(self.k)]
        total_change = np.sum(total_changes) < 1e-6
        return total_change

    def _centroid_distance(self):
        for clusterIdx, cluster in enumerate(self.clusters):
            distances = []
            if len(cluster) == 0:
                continue
            for pointIdx in cluster:
                distances.append(self._euclidean_distance(self.centroids[clusterIdx], self.x[pointIdx]))

        return np.sum(distances)

    def predict(self, X):
        self.x = X
        self.num_sample, self.num_feature = X.shape

        # initialize centroids
        # get k random samples from a list 0 to num_sample
        random_sample_idxs = np.random.choice(
            self.num_sample, self.k, replace=False)
        self.centroids = [X[idx] for idx in random_sample_idxs]

        # optimize centroids
        for _ in range(self.max_iter):
            # update clusters
            self.clusters = self._create_clusters(self.centroids)

            # update centroids
            prev_centroids = self.centroids
            self.centroids = self._update_centroids()
            if self.plot_steps:
                self.plot()

            # check convergence
            if self._has_converged(prev_centroids):
                break

        # return cluster labels
        return self._centroid_distance()
    
    def plot(self):
        fig, ax = plt.subplots(figsize=(12, 8))

        for clusterIdx, pointIdx in enumerate(self.clusters):
            point = self.x[pointIdx].T
            ax.scatter(*point)

        for centroid in self.centroids:
            ax.scatter(*centroid, c='black', marker='x', linewidths=2)

        plt.show()



In [22]:
total_distances = []

# Elbow method
for k in range(1, 8):
    kmeans = KMeans(k=k, max_iter=100, plot_steps=False)
    dataset = np.array([
        [1, 2],
        [2, 3],
        [3, 1],
        [4, 2],
        [5, 4],
        [6, 5],
        [7, 6],
        [8, 7],
    ])

    total_distance = kmeans.predict(dataset)
    total_distances.append(total_distance)

print(total_distances)

[22.126942800463553, 5.23606797749979, 5.23606797749979, 3.9436506316151, 1.4142135623730951, 0.0, 0.0]
