# Object Oriented K-Means Algorithm

In [1]:
# imports
from __future__ import annotations #postpone the evaluation of type hints until runtime.
from typing import TypeVar, Generic, List, Iterable, Tuple, Iterator, Sequence
from dataclasses import dataclass
from math import sqrt
from copy import deepcopy
from functools import partial
from random import uniform
from statistics import mean, pstdev


In [2]:
# data_point.py
class DataPoint:
    def __init__(self, initial: Iterable[float]) -> None:
        self._originals: Tuple[float, ...] = initial
        self.dimensions: Tuple[float, ...] = initial

    @property
    def num_dimensions(self) -> int:
        return len(self.dimensions)

    def distance(self, other: DataPoint) -> float:
        combined: Iterator[Tuple[float,float]] = zip(self.dimensions, other.dimensions)
        differences: List[float] = [(x - y)**2 for x, y in combined]
        return sqrt(sum(differences))

    def __eq__(self, other: object) -> bool:
        if isinstance(other, DataPoint):
            return NotImplemented
        return self.dimensions == other.dimensions

    def __repr__(self) -> str:
        return self._originals.__repr__()


In [3]:
# kmeans.py
def zscores(original: Sequence[float]) -> List[float]:
    avg: float = mean(original)
    std: float = pstdev(original)

    if std == 0:
        return [0.] * len(original)

    return [(x - avg)/std for x in original]


In [4]:
# # Algorithm for kmeans # #
# 1. initialize data points and k
# 2. normalize data points
# 3. create random centeroids for each cluster
# 4. assign data points to clusters
# 5. recalculate centeroids with new cluster data points
# 6. repeat 4 and 5
# 7. stop with maximum iterations or centeroids no move

Point= TypeVar('Point', bound= DataPoint)

class KMeans(Generic[Point]):

    @dataclass
    class Cluster:
        points: List[Point]
        centeroid: Point

    def __init__(self, k:int, data_points: List[Point]) -> None:
        if k < 1:
            raise ValueError('k must be positive integer')

        self._k: int= k
        self._data_points: List[Point]= data_points
        self._zscore_normalize()

        # initialize clusters
        self._clusters: List[KMeans.Cluster] = []

        for _ in range(self._k):
            rand_point: Point = self._random_point()
            cluster: KMeans.Cluster = KMeans.Cluster([], rand_point)
            self._clusters.append(cluster)

    @property
    def _centeroids(self) -> List[Point]:
        return [cluster.centeroid for cluster in self._clusters]

    # Point dimension selector
    def _dimension_slice(self, index: int = 0) -> List[float]:
        return [point.dimensions[index] for point in self._data_points]

    def _zscore_normalize(self) -> None:
        zscored: List[List[float]] = [[] for _ in range(len(self._data_points))]
        for dim in range(self._data_points[0].num_dimensions):
            dimension_slice: List[float] = self._dimension_slice(dim)
            for i, zscore in enumerate(zscores(dimension_slice)):
                zscored[i].append(zscore)
        for i in range(len(self._data_points)):
            self._data_points[i].dimensions = tuple(zscored[i])

    def _random_point(self) -> Point:
        rand_dimensions: List[float] = []
        for i in range(self._data_points[0].num_dimensions):
            values: List[float] = self._dimension_slice(i)
            rand_values: Tuple[float] = uniform(max(values), min(values))
            rand_dimensions.append(rand_values)
        return DataPoint(rand_dimensions)

    def _assign_clusters(self) -> None:
        for point in self._data_points:
            closest_cluster: Point = min(self._centeroids, key = partial(DataPoint.distance, point))
            index: int= self._centeroids.index(closest_cluster)
            self._clusters[index].points.append(point)

    def _calculate_centeroids(self) -> None:
        for cluster in self._clusters:
            if len(cluster.points) == 0:
                continue
            means: List[float] = []
            for i in range(cluster.points[0].num_dimensions):
                dimension_slice : List[float] = [p.dimensions[i] for p in cluster.points]
                means.append(mean(dimension_slice))
                cluster.centeroid = DataPoint(means)

    def run(self, max_iterations: int = 100) -> List[KMeans.Cluster]:
        for iters in range(max_iterations):
            for cluster in self._clusters:
                cluster.points.clear()

            self._assign_clusters()
            previous_centeroids: List[Point] = deepcopy(self._centeroids)
            self._calculate_centeroids()

            centroid_shift = sum([previous_centeroid.distance(current_centeroid)
                           for previous_centeroid, current_centeroid in zip(previous_centeroids, self._centeroids)])
            if centroid_shift < 1e-18:
                print(f'Converged after {iters} iterations')
                return self._clusters

        print('Did not converged in 100 iterations')
        return self._clusters


In [31]:
if __name__ == '__main__':
    point1 = DataPoint([2., 1., 1.])
    point2 = DataPoint([2., 2., 2.])
    point3 = DataPoint([30., 1.5, 2.5])

    k_means = KMeans(2, [point1, point2, point3])
    clusters : List[KMeans.Cluster] = k_means.run()

    for i, cluster in enumerate(clusters):
        print(f'CLuster{i}, {cluster.points}')

Converged after 1 iterations
CLuster0, [[2.0, 1.0, 1.0]]
CLuster1, [[2.0, 2.0, 2.0], [30.0, 1.5, 2.5]]
