### Constrained KMeans Examples


In [None]:
!pip install torch-kmeans

In [1]:
# imports
import numpy as np
import torch
from sklearn.datasets import make_blobs
from torch_kmeans import ConstrainedKMeans


In [2]:
# function to generate some clustering data
def get_data(bs: int = 1,
             n: int = 20,
             d: int = 2,
             k: int = 4,
             different_k: bool = False,
             k_lims = (2, 5),
             add_noise: bool = True,
             fp_dtype = torch.float32,
             seed: int = 42):
    """Generate a batch of toy clustering problems with ``make_blobs``.

    Note: this re-seeds torch's global RNG with ``seed`` on every call.

    Args:
        bs: number of instances in the batch.
        n: number of points per instance.
        d: number of features per point.
        k: number of blob centers per instance (used if ``different_k`` is False).
        different_k: if True, the number of centers of each instance is drawn
            with ``torch.randint`` from ``k_lims`` (upper limit exclusive).
        k_lims: ``(low, high)`` limits for the sampled number of centers.
        add_noise: if True, standard normal noise is added to the points.
        fp_dtype: floating point dtype of the returned points.
        seed: seed for torch; blobs of instance ``i`` use ``seed + i``.

    Returns:
        Tuple ``(x, y, k)`` of the stacked points converted to ``fp_dtype``,
        the stacked blob labels and a long tensor with the number of
        centers of every instance.
    """
    torch.manual_seed(seed)

    # number of centers for every instance of the batch
    if not different_k:
        centers_per_instance = torch.empty(bs).fill_(k).long()
    else:
        k_low, k_high = k_lims
        centers_per_instance = torch.randint(low=k_low, high=k_high, size=(bs,)).long()

    # generate pseudo clustering data, one blob problem per instance
    blobs = [
        make_blobs(n_samples=n, centers=n_centers, n_features=d, random_state=seed + idx)
        for idx, n_centers in enumerate(centers_per_instance.numpy())
    ]
    x = torch.from_numpy(np.stack([points for points, _ in blobs], axis=0))
    y = torch.from_numpy(np.stack([labels for _, labels in blobs], axis=0))

    if add_noise:
        x = x + torch.randn(x.shape)

    return x.to(fp_dtype), y, centers_per_instance


In [3]:
# Build the toy data set; x is stacked as (BS, N, D),
# here a single instance with N=20 points and D=2 features each
K = 4
BS = 1
x, y, k_per_instance = get_data(
    bs=BS,
    n=20,
    d=2,
    k=K,
    different_k=False,
)


To solve a constrained clustering problem, each sample needs a weight that states how much
of a cluster's capacity it consumes.
The simplest constrained clustering task is arguably the case where each cluster can
accommodate at most a fixed number of points.

In [4]:
# A pure "max number of points per cluster" constraint is simulated by giving
# every sample the same unit weight.
# The algorithm expects normalized weights between 0 and 1, so the unit
# weights are divided by the max number of points per cluster.
MAX_POINTS = 5
w = torch.ones(x.shape[:-1]) / MAX_POINTS
print(w)

tensor([[0.2000, 0.2000, 0.2000, 0.2000, 0.2000, 0.2000, 0.2000, 0.2000, 0.2000,
         0.2000, 0.2000, 0.2000, 0.2000, 0.2000, 0.2000, 0.2000, 0.2000, 0.2000,
         0.2000, 0.2000]])


In [5]:
# weights need to be explicitly provided
model = ConstrainedKMeans()
result = model(x, k=K, weights=w)
print(result.labels)
# check if constraint is valid: count the members of every cluster separately
# for each instance of the batch (counting over the whole batch at once would
# mix instances for BS > 1) and verify that no cluster exceeds MAX_POINTS
for labels in result.labels:
    _, cnts = torch.unique(labels, return_counts=True)
    print(cnts)
    assert (cnts <= MAX_POINTS).all(), "a cluster holds more than MAX_POINTS points"


Full batch converged at iteration 6/100 with center shifts: 
tensor([0.]).
tensor([[0, 0, 3, 0, 1, 2, 0, 1, 3, 1, 2, 3, 2, 3, 2, 0, 2, 1, 3, 1]])
tensor([5, 5, 5, 5])


In [6]:
BS = 1
K = 4
x, y, k_per_instance = get_data(bs=BS, n=22, d=2, k=K, different_k=False)

# More complex constrained clustering tasks come with different weights for different samples
weights = torch.abs(torch.randn(size=y.size()))
# Slack factor > 1: after normalization the weights of each ground-truth
# cluster sum to 1 / SLACK_FACTOR, i.e. stay below the capacity of 1.
SLACK_FACTOR = 1.15
# normalize weights per cluster given by label
# (zero-initialized so that no entry can ever hold uninitialized memory)
norm_weights = torch.zeros_like(weights)
for i in range(BS):
    # iterate over the labels that actually occur instead of assuming 0..n-1
    for lbl in torch.unique(y[i]):
        msk = y[i] == lbl
        cluster_weights = weights[i, msk]
        norm_weights[i, msk] = cluster_weights / (cluster_weights.sum() * SLACK_FACTOR)
# The total weight of an instance must fit into its k clusters of capacity 1.
# Compare the float sum directly: truncating it to an integer first would
# silently accept sums of up to k + 1.
assert (norm_weights.sum(dim=-1) <= k_per_instance).all()


In [7]:
model = ConstrainedKMeans()
result = model(x, k=K, weights=norm_weights)
print(result.labels)
_, cnts = torch.unique(result.labels, return_counts=True)
print(cnts)
# Check the capacity constraint on the clusters *predicted by the model*
# (masking with the ground-truth labels y would only re-check the
# normalization of the inputs): per instance, the weights assigned to a
# cluster must not exceed 1. A small tolerance absorbs float32 rounding.
# NOTE(review): assumes ConstrainedKMeans enforces a max weight of 1 per cluster - confirm.
for b in range(BS):
    for i in range(K):
        msk = result.labels[b] == i
        w_sum = norm_weights[b][msk].sum()
        assert w_sum <= 1 + 1e-5, f"cluster {i} of instance {b} exceeds its capacity"


Full batch converged at iteration 5/100 with center shifts: 
tensor([0.]).
tensor([[0, 0, 0, 0, 2, 3, 1, 2, 3, 0, 1, 3, 2, 1, 3, 1, 3, 1, 0, 2, 2, 1]])
tensor([6, 6, 5, 5])
