<a href="https://colab.research.google.com/github/yonglanliu/unsupervised_learning/blob/main/KmeansPlusPlus.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
from sklearn.datasets import make_blobs

In [8]:
class KmeanPlusPlus():
  """
  1. Randomly choose one of the observatins to be a cluster center
  2. For each observation x, determine d(x), where d(x) denotes the minimal distance from x to a current cluster center
  3. Choose next cluster center from the data points, with probability of making an observation x a cluster center proportional to d(x)**2
  4. Repeat 2 and 3 until you have choosen the right number of clusters
  """
  def __init__(self, inputs:np.ndarray, k: int=3, random_seed: int=258):
    self.inputs = inputs
    self.centroids_list =[]
    self.centroids_idx_list = []
    self.random_seed=random_seed
    self.k = k

  def get_centroids_id(self):
    return self.centroids_idx_list

  def get_centroids(self):
    return self.centroids_list

  def update_centroids(self, idx, centroid):
    self.centroids_idx_list.append(idx)
    self.centroids_list.append(centroid)
    return self.centroids_idx_list, self.centroids_list

  def _first_centroid(self):
    """randomly select a data sample as the first centroid"""
    np.random.seed(self.random_seed)
    idx = np.random.choice(self.inputs.shape[0])
    centroid = list(self.inputs[idx])
    return idx, centroid

  def find_next_centroid(self):
    """
    centroids_coord: centriod coordinates
    inputs: inputs
    """
    centroids = np.array(self.centroids_list)
    # add one dimension to centriod
    centroids = centroids[:, None]
    # Apply numpy Broadcasting operation to determine the minimum distances
    min_distances = np.min(np.sum((centroids - self.inputs)**2, axis=-1), axis=0)
    # calculate the probability
    probability = min_distances / np.sum(min_distances)
    # select the sample with the largest probability as the next centroid
    dict_prob = {idx: prob for idx, prob in enumerate(probability)}
    dict_prob = {idx: prob for idx, prob in dict_prob.items() if idx not in self.centroids_idx_list}
    next_centroid_idx = max(dict_prob, key=dict_prob.get)
    next_centroid = list(self.inputs[next_centroid_idx])
    return next_centroid_idx, next_centroid

  def _call_(self):
    assert self.k >= 1
    first_centroid_id, first_centroid = self._first_centroid()
    centroids_list, centroids_idx_list = self.update_centroids(first_centroid_id, first_centroid)
    if self.k > 1:
      for i in range(1, self.k):
        next_centroid_idx, next_centroid = self.find_next_centroid()
        centroids_list, centroids_idx_list = self.update_centroids(next_centroid_idx, next_centroid)
    return centroids_list, centroids_idx_list

In [9]:
inputs,y = make_blobs(n_samples = 500,n_features = 2,centers = 3,random_state = 23)

In [10]:
kpp = KmeanPlusPlus(inputs, k=3)

In [11]:
centroids_list, centroids_idx_list = kpp._call_()

In [12]:
centroids_list

[199, 483, 30]

In [13]:
centroids_idx_list

[[-4.608795558215633, 5.7051538418033125],
 [6.4704644511626475, -6.806252906653327],
 [1.0228177939849923, 12.126754762862621]]