<a href="https://colab.research.google.com/github/scsanjay/ml_from_scratch/blob/main/11.%20K-means%20Clustering/KMeans.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Custom implementation of KMeans

In [1]:
import numpy as np

In [167]:
class KMeans():
  """
  KMeans clustering with Lloyd's algorithm.

  Parameters
  ----------
  n_clusters : int, default=8
      The number of clusters to form.

  init : {'k-means++', 'random'} or array of shape (n_clusters, n_features), \
default='k-means++'
      Initialisation of the cluster centers.
      'k-means++' picks the first center uniformly at random and every
      following one with probability proportional to the squared distance
      to the nearest center chosen so far. Any other string selects
      `n_clusters` distinct data points uniformly at random. An array is
      used directly as the initial centers.

  max_iter : int, default=300
      Maximum number of iterations.

  tol : float, default=1e-4
      Tolerance to declare convergence with Frobenius norm
      of the difference in the cluster centers of two consecutive iterations.

  Attributes
  ----------
  labels_ : list of length n_samples
      Cluster label of each data point.

  cluster_centers_ : array of shape (n_clusters, n_features)
      Coordinates of the cluster centers after the last update step.
  """

  def __init__ (self, n_clusters=8, init='k-means++', max_iter=300, tol=1e-4):
    ''' Store the hyper-parameters; no computation happens here. '''
    self.n_clusters = n_clusters
    self.init = init
    self.max_iter = max_iter
    self.tol = tol

  def _getDistance(self, x_i, X):
    ''' Euclidean distance from the point x_i to every row of X. '''
    return np.sqrt(np.sum(np.square(x_i - X), axis=1))

  def _getInitialCentroids(self, X):
    '''
    Choose the initial centroids according to the string value of `init`.

    Returns the indices (rows of X) of the selected points.
    '''
    X = np.asarray(X, dtype=float)
    n_samples = len(X)

    if isinstance(self.init, str) and self.init == 'k-means++':
      # first centroid is drawn uniformly at random
      chosen = [int(np.random.choice(n_samples))]
      # squared distance of every point to its nearest chosen centroid;
      # kept up to date incrementally instead of storing an n x n matrix
      closest_sq = np.square(self._getDistance(X[chosen[0]], X))

      for _ in range(self.n_clusters - 1):
        total = closest_sq.sum()
        if total > 0:
          # D^2 sampling: far away points are more likely to be picked,
          # already chosen points have probability zero
          nxt = np.random.choice(n_samples, p=closest_sq / total)
        else:
          # every point coincides with a chosen centroid, so the weights
          # cannot be normalised; fall back to a uniform draw among the
          # points that were not selected yet
          remaining = np.setdiff1d(np.arange(n_samples), chosen)
          nxt = np.random.choice(remaining)
        nxt = int(nxt)
        chosen.append(nxt)
        closest_sq = np.minimum(closest_sq,
                                np.square(self._getDistance(X[nxt], X)))
      return chosen

    # any other value: select all centroids uniformly without replacement
    return np.random.choice(n_samples, size=self.n_clusters, replace=False)

  def _getFrobeniusNorm(self, d):
    ''' Frobenius norm: sqrt of the sum of squares of all elements of d. '''
    return np.sqrt(np.sum(np.square(np.asarray(d))))

  def _assignLabels(self, X, centroids):
    ''' Index of the nearest centroid for every row of X (ties -> lowest index). '''
    distances = np.array([self._getDistance(c, X) for c in centroids])
    return np.argmin(distances, axis=0)

  def fit(self, X):
    """
    It will find the specified number of clusters.

    Parameters
    ----------
    X : array of shape (n_samples, n_features)

    Returns
    -------
    self : object

    Raises
    ------
    ValueError
        If X is not 2-dimensional, if n_clusters is not between 1 and
        n_samples, or if an array `init` has the wrong shape.
    """
    X = np.asarray(X, dtype=float)
    if X.ndim != 2:
      raise ValueError("X must be of shape (n_samples, n_features)")
    n_samples = len(X)
    if not 1 <= self.n_clusters <= n_samples:
      raise ValueError(
          "n_clusters=%r must be between 1 and n_samples=%d"
          % (self.n_clusters, n_samples))

    # get initial centroids
    if isinstance(self.init, str):
      centroids = X[self._getInitialCentroids(X)]
    else:
      # explicit initial centers; copy so the caller's array is never modified
      centroids = np.array(self.init, dtype=float)
      if centroids.shape != (self.n_clusters, X.shape[1]):
        raise ValueError(
            "init array must have shape (n_clusters, n_features)")

    labels = None
    for _ in range(self.max_iter):
      # ASSIGN: every point goes to its nearest centroid
      labels = self._assignLabels(X, centroids)

      # UPDATE: each centroid becomes the mean of its members
      new_centroids = centroids.copy()
      for k in range(self.n_clusters):
        members = X[labels == k]
        # an empty cluster keeps its previous centroid; the mean of an
        # empty set would be NaN and poison every later distance
        if len(members):
          new_centroids[k] = members.mean(axis=0)

      # convergence is measured on how far the centroids moved, i.e. the
      # norm of the difference (not the difference of the two norms)
      shift = self._getFrobeniusNorm(new_centroids - centroids)
      centroids = new_centroids
      if shift <= self.tol:
        break

    if labels is None:
      # max_iter < 1: no iteration ran, label against the initial centroids
      labels = self._assignLabels(X, centroids)

    self.cluster_centers_ = centroids
    self.labels_ = list(labels.astype(np.int16))

    # return self
    return self

  def fit_predict (self, X):
    """
    It will find the clusters and return labels.

    Parameters
    ----------
    X : array of shape (n_samples, n_features)

    Returns
    -------
    labels : list of length n_samples
        Cluster label of each data point, in the range [0, n_clusters).
    """
    # fit the data and return labels
    return self.fit(X).labels_

# Validate the implementation

In [159]:
# Seven 2-D sample points used by the validation cells.
X = np.array([
    [1, 2],
    [2, 2],
    [2, 3],
    [8, 7],
    [8, 8],
    [7, 8],
    [-1, 0],
])

## Custom implementation output

In [160]:
# Fit two clusters with the default initialisation, then print the labels
# stored by that fit and the labels returned by a second fit via fit_predict.
kmeans = KMeans(n_clusters=2)
kmeans.fit(X)
print(kmeans.labels_)
print(kmeans.fit_predict(X))

[0, 0, 0, 1, 1, 1, 0]
[0, 0, 0, 1, 1, 1, 0]


In [166]:
# Same check with init='random': fit once, then print the labels of a re-fit.
kmeans = KMeans(n_clusters=2, init='random')
kmeans.fit(X)
print(kmeans.fit_predict(X))

[0, 0, 0, 1, 1, 1, 0]


## sklearn's implementation output

In [161]:
from sklearn import cluster

In [165]:
# Reference run with sklearn's KMeans on the same data, printed the same way.
kmeans = cluster.KMeans(n_clusters=2)
kmeans.fit(X)
print(kmeans.labels_)
print(kmeans.fit_predict(X))

[0 0 0 1 1 1 0]
[0 0 0 1 1 1 0]


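**We get the same outputs from both implementations on this toy dataset, which suggests that the custom implementation is working correctly.** (Cluster label numbers are arbitrary, so in general the two results only need to agree up to a relabelling of the clusters.)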