k-means 는 머신러닝 알고리즘 중 간단한 편에 속하는 알고리즘이며, 알고리즘 구현의 연습용으로 매우 좋은 예입니다. 이 튜토리얼은 kmeans 를 직접 구현하는 연습을 합니다.

함수는 기능별로 나누어 구현하는 것이 좋습니다. `kmeans()` 함수는 `initialize()`, `kmeans_core()` 함수를 호출합니다.

`initialize()` 함수는 initial centroid 를 선택합니다. 

`kmeans_core()` 는 `reassign()`, `update_centroid()`, `check_convergence()` 함수를 호출합니다. 때로는 kmeans 학습 도중 어떤 점도 할당되지 않는 군집이 발생하는데, 이때는 군집에 할당된 거리가 가장 먼 한 점을 새로운 군집의 centroid 로 만들도록 하는 함수 `reinitialize_empty_cluster_with_distant()` 를 추가하였습니다.

kmeans 는 모든 데이터를 반드시 k 개의 군집으로 나누는 partitioning 알고리즘입니다. 하지만 centroid 와의 거리가 너무 멀면 그 데이터는 노이즈로 처리할 수도 있습니다. 이러한 constraint 를 추가하려면 `reassign()` 함수의 일부분을 수정할 수도 있습니다.

이에 대한 예시 코드는 아래의 repository 에 있으니 직접 해당 기능을 구현해보고, 아래의 코드로부터 구현체를 비교해 보시기 바랍니다.

https://github.com/lovit/ekmeans

In [1]:
import numpy as np
import scipy as sp
from sklearn.metrics import pairwise_distances_argmin_min
from sklearn.metrics.pairwise import paired_distances


def kmeans(X, n_clusters, metric, init='random',
    random_state=None, max_iter=10, tol=0.001, verbose=False):

    # initialize
    centers = initialize(X, n_clusters, init, random_state)
    labels = -np.ones(X.shape[0])

    # train
    centers, labels = kmeans_core(X, centers, metric,
        labels, max_iter, tol, verbose)

    return centers, labels

def kmeans_core(X, centers, metric, labels, max_iter, tol, verbose):
    n_clusters = centers.shape[0]

    # repeat
    for i_iter in range(1, max_iter + 1):

        # training
        labels_, dist = reassign(X, centers, metric)
        centers_, cluster_size = update_centroid(X, centers, labels_)

        # convergence check
        diff, n_changes, early_stop = check_convergence(
            centers, labels, centers_, labels_, tol, metric)
        if i_iter == max_iter:
            early_stop = False

        # reinitialize empty clusters
        n_empty_clusters = np.where(cluster_size == 0)[0].shape[0]
        if n_empty_clusters > 0:
            centers_ = reinitialize_empty_cluster_with_distant(
                X, centers_, cluster_size, dist)

        centers = centers_
        labels = labels_

        # verbose
        if verbose:
            # TODO
            # do something make better verbose message
            print(f'iter = {i_iter}/{max_iter}')

        if early_stop:
            break

    return centers, labels

def reassign(X, centers, metric):
    labels, dist = pairwise_distances_argmin_min(X, centers, metric=metric)

    # TODO
    # do something make better clusters

    return labels, dist

def update_centroid(X, centers, labels):
    n_clusters = centers.shape[0]
    centers_ = np.zeros(centers.shape, dtype=np.float)
    cluster_size = np.bincount(
        labels[np.where(labels >= 0)[0]],
        minlength = n_clusters
    )

    for label, size in enumerate(cluster_size):
        if size == 0:
            centers_[label] == centers[label]
        else:
            idxs = np.where(labels == label)[0]
            centers_[label] = np.asarray(X[idxs,:].sum(axis=0)) / idxs.shape[0]
    return centers_, cluster_size

def reinitialize_empty_cluster_with_distant(X, centers, cluster_size, dist):
    cluster_indices = np.where(cluster_size == 0)[0]
    n_empty = cluster_indices.shape[0]
    data_indices = dist.argsort()[-n_empty:]
    initials = X[data_indices,:]
    if sp.sparse.issparse(initials):
        initials = np.asarray(initials.todense())
    centers[cluster_indices,:] = initials
    return centers

def initialize(X, n_clusters, init, random_state):
    np.random.seed(random_state)
    if isinstance(init, str) and init == 'random':
        seeds = np.random.permutation(X.shape[0])[:n_clusters]
        if sp.sparse.issparse(X):
            centers = X[seeds,:].todense()
        else:
            centers = X[seeds,:]
    elif hasattr(init, '__array__'):
        centers = np.array(init, dtype=X.dtype)
        if centers.shape[0] != n_clusters:
            raise ValueError('the number of customized initial points '
                'should be same with n_clusters parameter')
    elif callable(init):
        centers = init(X, n_clusters, random_state=random_state)
        centers = np.asarray(centers, dtype=X.dtype)
    else:
        raise ValueError("init method should be "
            "['random', 'callable', 'numpy.ndarray']")
    return centers

def check_convergence(centers, labels, centers_, labels_, tol, metric):
    n_data = labels.shape[0]
    reassign_threshold = n_data * tol
    difference_threshold = tol
    diff = paired_distances(centers, centers_, metric=metric).mean()
    n_changes = np.where(labels != labels_)[0].shape[0]
    early_stop = (diff < difference_threshold) or (n_changes < reassign_threshold)
    return diff, n_changes, early_stop


직접 구현한 kmeans 가 제대로 작동하는지 테스트해봅니다.

In [2]:
import numpy as np
from soydata.data.clustering import make_circular_clusters
from soydata.visualize import scatterplot
from soydata.visualize import use_notebook

use_notebook()


In [3]:
X, labels = make_circular_clusters(n_clusters=10, r_min=0.05, r_max=0.15,
    equal_density=True, noise=0.05, seed=0, size_min=150, size_max=250)

data_indices = np.where(labels >= 0)[0]
noise_indices = np.where(labels == -1)[0]

p = scatterplot(X[data_indices], labels=labels, size=3, title='Circular clusters',
    show_inline=False, toolbar_location=None)
p = scatterplot(X[noise_indices], p=p, color='lightgrey')

In [4]:
centers, labels = kmeans(X, n_clusters=15, metric='euclidean', verbose=True)
data_indices = np.where(labels >= 0)[0]
noise_indices = np.where(labels == -1)[0]

p = scatterplot(X[data_indices], labels=labels, size=3, title='Circular clusters',
    show_inline=False, toolbar_location=None)
p = scatterplot(X[noise_indices], p=p, color='lightgrey')

iter = 1/10
iter = 2/10
iter = 3/10
iter = 4/10
iter = 5/10
iter = 6/10
iter = 7/10
iter = 8/10
iter = 9/10
iter = 10/10
