In [1]:
import typing as t
from scipy.spatial.distance import cdist
import numpy as np
import numpy.ma as ma
import pandas as pd

In [2]:
def generate_dataset(
    size: int = 20,
    dim: int = 2,
    min: int = -10,
    max: int = 10,
    seed: int = 0,
) -> np.ndarray:
    return (max - min) * np.random.default_rng(seed).random((size, dim)) + min

In [13]:
class KMeans:
    def __init__(self, n_clusters: int = 8, max_iter: int = 300):
        self.n_clusters = n_clusters
        self.max_iter = max_iter

        self.history = []
        self.centroids = None
        self.labels = None

    def fit(self, X: np.ndarray):
        data = X
        self.centroids = self._initialize_clusters(
            data=data, n_clusters=self.n_clusters
        )
        self.history = [self.centroids]

        for _ in range(self.max_iter):
            self.labels = self._assign_points(data=data, centroids=self.centroids)
            self.centroids = KMeans._update_centroids(
                data=data, assignations=self.labels, n_clusters=2
            )
            self.history.append(self.centroids)
        return self

    @staticmethod
    def _initialize_clusters(
        data: np.ndarray, n_clusters: int, seed: int = 0
    ) -> np.ndarray:
        if len(data.shape) != 2:
            raise ValueError(f"Expected data of shape 2, shape={data.shape}")

        min = data.min()
        max = data.max()
        dim = data.shape[1]
        return (max - min) * np.random.default_rng(seed).random((n_clusters, dim)) + min

    @staticmethod
    def _assign_points(data: np.ndarray, centroids: np.ndarray) -> np.ndarray:
        # e.g. [ [0.71, 12.6] , [12.3, 0.40] ... ]
        distances = cdist(data, centroids)
        # e.g. array([0, 1, ... ])
        return np.argmin(distances, axis=1)

    @staticmethod
    def _update_centroids(
        data: np.ndarray, assignations: np.ndarray, n_clusters: int
    ) -> np.ndarray:
        new_centroids = []
        for i in range(n_clusters):
            mask = assignations == i
            data_in_cluster = data[mask]
            # WARNING: if no points are within the cluster, we should probably reset it?
            new_centroid = data_in_cluster.mean(axis=0)
            new_centroids.append(new_centroid)
        return np.array(new_centroids)

In [14]:
data = generate_dataset(size=5)
print(data.shape)
data

(5, 2)


array([[ 2.73923375, -4.60426572],
       [-9.18052952, -9.66944729],
       [ 6.26540478,  8.25511155],
       [ 2.13271552,  4.58993122],
       [ 0.87249983,  8.70144848]])

In [15]:
kmeans = KMeans(n_clusters=3, max_iter=5)

In [16]:
kmeans.fit(X=data)

<__main__.KMeans at 0x16c2414d0>

In [17]:
kmeans.history

[array([[ 2.03210947, -4.71322369],
        [-8.91672695, -9.36581982],
        [ 5.2710555 ,  7.09869028]]),
 array([[ 2.73923375, -4.60426572],
        [-9.18052952, -9.66944729]]),
 array([[ 3.00246347,  4.23555638],
        [-9.18052952, -9.66944729]]),
 array([[ 3.00246347,  4.23555638],
        [-9.18052952, -9.66944729]]),
 array([[ 3.00246347,  4.23555638],
        [-9.18052952, -9.66944729]]),
 array([[ 3.00246347,  4.23555638],
        [-9.18052952, -9.66944729]])]

In [18]:
kmeans.labels

array([0, 1, 0, 0, 0])