In [11]:
import numpy as np
import math

def gen():
    """Draw one random entry from {-1, 0, 1}.

    A single uniform sample on [0, 1) is bucketed so that -1 comes back with
    probability 2/3, 0 with probability 1/6 and 1 with probability 1/6.

    NOTE(review): the split is heavily skewed towards -1 -- confirm this is
    intended rather than a symmetric -1/0/+1 distribution.
    """
    sample = np.random.random((1,))
    if sample >= 5/6:
        return 1
    return -1 if sample < 2/3 else 0

def draw_u(d):
    """Return a length-``d`` NumPy vector whose entries are successive ``gen()`` draws."""
    entries = []
    for _ in range(d):
        entries.append(gen())
    return np.array(entries)

def check_angle(mat, next, n, eps):
    """Tell whether ``next`` is too well aligned with any of the first ``n`` rows of ``mat``.

    Args:
        mat: 2-D array whose rows ``0 .. n-1`` are the vectors to compare against.
        next: candidate vector (the name shadows the builtin; kept for callers).
        n: number of leading rows of ``mat`` to inspect.
        eps: threshold on the absolute cosine similarity.

    Returns:
        True as soon as one row has ``|cos(row, next)| > eps``, otherwise False
        (also when ``n`` is 0). A zero-norm vector yields a NaN cosine, which
        never exceeds ``eps``.
    """
    return any(
        abs(np.dot(mat[i], next) / np.linalg.norm(mat[i]) / np.linalg.norm(next)) > eps
        for i in range(n)
    )


def draw_set(k,d, eps):
    """Rejection-sample ``k`` random vectors of dimension ``d`` that are pairwise nearly orthogonal.

    Each new vector is redrawn with ``draw_u`` until ``check_angle`` reports that
    its absolute cosine with every previously accepted row is at most ``eps``.
    There is no retry limit, so an unsatisfiable ``eps`` loops forever.

    Returns:
        ``(k, d)`` float32 array holding the accepted vectors in draw order.
    """
    ans = np.zeros((k,d), np.float32)
    for i in range(k):
        while True:
            candidate = draw_u(d)
            if not check_angle(ans, candidate, i, eps):
                break
        ans[i] = candidate
    return ans




In [18]:
# Draw 6 vectors of dimension 30 whose pairwise |cosine| is at most 0.1.
# NOTE(review): no random seed is visible before this call, so `vectors` (and
# every output derived from it) presumably changes on each fresh kernel run.
vectors = draw_set(6, 30, 0.1)

In [26]:
# Pairwise cosine similarity between the rows of `vectors`. The matrix size is
# taken from `vectors` itself rather than repeating the literal 6 passed to
# draw_set, so the cell keeps working if the number of vectors changes.
c = np.zeros((len(vectors), len(vectors)))
for i in range(len(vectors)):
    for j in range(len(vectors)):
        c[i][j] = np.dot(vectors[i], vectors[j]) / np.linalg.norm(vectors[i]) / np.linalg.norm(vectors[j])
print(c)

np.round(c, decimals=3)

[[ 0.99999988  0.04256282  0.08512565  0.04166666 -0.08006407  0.        ]
 [ 0.04256282  0.99999988  0.08695652 -0.08512565 -0.04089304  0.08512565]
 [ 0.08512565  0.08695652  0.99999988  0.08512565 -0.08178608  0.        ]
 [ 0.04166666 -0.08512565  0.08512565  0.99999988  0.         -0.08333333]
 [-0.08006407 -0.04089304 -0.08178608  0.          1.          0.        ]
 [ 0.          0.08512565  0.         -0.08333333  0.          0.99999988]]


array([[ 1.   ,  0.043,  0.085,  0.042, -0.08 ,  0.   ],
       [ 0.043,  1.   ,  0.087, -0.085, -0.041,  0.085],
       [ 0.085,  0.087,  1.   ,  0.085, -0.082,  0.   ],
       [ 0.042, -0.085,  0.085,  1.   ,  0.   , -0.083],
       [-0.08 , -0.041, -0.082,  0.   ,  1.   ,  0.   ],
       [ 0.   ,  0.085,  0.   , -0.083,  0.   ,  1.   ]])

In [53]:
def draw_sample(n, seed = 0):
    """Generate ``3*n`` labelled 30-dimensional samples from three noisy linear mixtures.

    Reads the first six rows of the module-level ``vectors`` as ``u1 .. u6`` and
    reseeds NumPy's global RNG with ``seed``. Rows are produced in the order
    class 0, 1, 2, 0, 1, 2, ...; for each row two scalar normals ``v1``, ``v2``
    and a 30-dimensional standard-normal noise vector are drawn (in that order).

    Args:
        n: number of samples per class.
        seed: value passed to ``np.random.seed``.

    Returns:
        ``(3*n, 31)`` float array; columns 0..29 hold the features and column 30
        holds the class index (0, 1 or 2).
    """
    u1, u2, u3, u4, u5, u6 = (vectors[idx] for idx in range(6))

    def mix_first(v1, v2, noise):
        return -2*u1 + v1*u2 -v2*u3 + noise

    def mix_second(v1, v2, noise):
        return -1.5*u4 -0.7*v1*u5 +0.9*v2*u6 + noise

    def mix_third(v1, v2, noise):
        return 1.2*u6 - v1*(u1+u2) +v2*u5 + noise

    mixers = (mix_first, mix_second, mix_third)

    np.random.seed(seed)
    ans = np.zeros((n*3,31))
    for row in range(n*3):
        cls = row % 3
        # Draw order (v1, v2, noise) must stay fixed to reproduce the same stream.
        v1 = np.random.normal()
        v2 = np.random.normal()
        noise = np.random.standard_normal(30)
        ans[row, :30] = mixers[cls](v1, v2, noise)
        ans[row, 30] = cls
    return ans

In [55]:
class DataFrame(object):
    """Plain container bundling a train split, a test split and their metadata.

    Stores the constructor arguments unchanged as attributes:
    ``train``, ``test``, ``n_train``, ``n_test`` and ``d`` (``d`` is used by
    ``DataSection`` as the number of feature columns; defaults to 2).
    """

    def __init__(self, train, test, n_train, n_test, d:int = 2):
        self.train, self.test = train, test
        self.n_train, self.n_test = n_train, n_test
        self.d = d

class DataSection(object):
    """View onto the first ``n`` rows of either the train or the test split.

    Attributes:
        n: the requested row count (stored as given, not clipped to the split size).
        data: rows ``0 .. n-1``, columns ``0 .. data.d-1`` of the chosen split.
        label: column ``data.d`` of the same rows, copied into an int32 array.
    """

    def __init__(self, data: DataFrame, train: bool, n:int):
        self.n = n
        if train:
            source = data.train
        else:
            source = data.test
        width = data.d
        self.data = source[:n, :width]
        self.label = np.array(source[:n, width], dtype=np.int32)


class KMean(DataSection):
    """K-means clustering over the first ``n`` rows of one split of a ``DataFrame``.

    Attributes:
        k: number of clusters.
        d: feature dimension, copied from ``data.d``.
        large: starting value of the running minimum squared distance in
            ``init_kpp``; squared distances above it are clipped to it.
        m: ``(k, d)`` float32 array of centroids (stored in single precision).
        assign: cluster index of every row (length ``n``).
    """

    def __init__(self, data: DataFrame, train: bool, n:int, k:int, large = 1e6):
        """Slice the data through ``DataSection`` and allocate zeroed centroids and assignments."""
        super().__init__(data, train, n)
        self.k = k
        self.d = data.d
        self.large = large
        self.m = np.zeros((k,self.d), np.float32)
        self.assign = np.zeros((self.n), np.int32)

    def init_kpp(self, seed):
        """Reseed NumPy's global RNG with ``seed`` and choose centroids k-means++ style.

        The first centroid is a uniformly drawn row; every later one is drawn
        with probability proportional to the (clipped) squared distance to the
        nearest centroid chosen so far.
        """
        np.random.seed(seed)
        choices = np.random.random((self.k,))
        prob = np.ones((self.n), np.float32)
        # Running minimum squared distance of each row to the centroids chosen so
        # far; updated incrementally instead of rescanning all earlier centroids.
        nearest = [self.large] * self.n
        for i in range(self.k):
            choice = choices[i] * sum(prob)
            picked = None
            for j in range(self.n):
                choice -= prob[j]
                if choice <= 0:
                    picked = j
                    break
            if picked is None:
                # Round-off between the float32 total and the running subtraction
                # can leave `choice` slightly positive after the last row. Fall
                # back to the last row that still carries weight rather than
                # silently leaving this centroid at all zeros.
                weighted = [j for j in range(self.n) if prob[j] > 0]
                if weighted:
                    picked = weighted[-1]
                elif self.n > 0:
                    picked = self.n - 1
            if picked is not None:
                self.m[i] = self.data[picked]
            for j in range(self.n):
                diff = self.data[j] - self.m[i]
                nearest[j] = min(nearest[j], np.dot(diff, diff))
                prob[j] = nearest[j]

    def init_random(self, seed):
        """Reseed NumPy's global RNG with ``seed`` and copy ``k`` uniformly drawn rows into ``m``."""
        np.random.seed(seed)
        for i in range(self.k):
            self.m[i] = self.data[np.random.randint(self.n)]

    def optimize_assign(self):
        """Assign every row to its nearest centroid (Euclidean distance, first index on ties)."""
        # Broadcasting yields the same (k, n, d) difference tensor as before
        # without first materialising two full-size copies of data and centroids.
        diffs = self.data[np.newaxis, :, :] - self.m[:, np.newaxis, :]
        self.assign = np.argmin(np.linalg.norm(diffs, axis=2), 0)

    def test(self):
        """Return the mean squared distance between each row and its assigned centroid."""
        diff = self.data - self.m[self.assign]
        return np.sum(np.square(diff)) / self.n

    def optimize_m(self):
        """Move every centroid to the mean of the rows currently assigned to it.

        A cluster with no assigned rows keeps its previous centroid: taking the
        mean of an empty selection would store NaN, and ``argmin`` would then
        assign every row to that NaN centroid on the next step.
        """
        for i in range(self.k):
            members = self.data[self.assign == i]
            if len(members) == 0:
                continue
            self.m[i] = np.mean(members,0)

            
class EMAlg(DataSection):
    """EM iterations for a full-covariance Gaussian mixture over one split of a ``DataFrame``.

    Attributes:
        k: number of mixture components.
        d: feature dimension as passed in. NOTE(review): the rows are sliced
            with ``data.d`` by ``DataSection``, so ``d`` presumably has to equal
            ``data.d`` -- confirm with callers.
        m: ``(k, d)`` float32 component means, all zeros after construction.
        c: ``(k, d, d)`` float32 covariances, identity after construction.
        pi: ``(k,)`` float32 mixing weights, ``1/k`` after construction.
        prob: ``(k, n)`` float32, ``pi[i] * N(x_j | m[i], c[i])`` from the last
            ``step_assignment``.
        assign: ``(k, n)`` float32 responsibilities from the last ``step_assignment``.
    """

    def __init__(self, data: DataFrame, train: bool, n:int, k:int, d:int):
        """Slice the data through ``DataSection`` and allocate the initial parameters."""
        super().__init__(data, train, n)
        self.k = k
        self.d = d
        self.m = np.zeros((k,d), np.float32)
        self.c = np.full((k,d,d), np.identity(d), np.float32)
        self.pi = np.full((k,), 1/k, np.float32)
        self.assign = np.zeros((k, self.n), np.float32)
        self.prob = np.zeros((k, self.n), np.float32)

    def step_assignment(self):
        """E-step: recompute ``prob`` and the per-row responsibilities ``assign``.

        ``prob`` is computed exactly as before. When every component's float32
        density underflows to zero for a row (its column of ``prob`` sums to
        zero), the responsibilities are taken from float64 log-weights with the
        usual max-shift instead of dividing by zero; if no log-weight is finite
        the row is split evenly across components.
        """
        self.prob = np.zeros((self.k, self.n), np.float32)
        # float64 log of the same weighted densities; only read on underflow.
        log_weight = np.full((self.k, self.n), -math.inf)
        for i in range(self.k):
            c = self.c[i]
            m = self.m[i]
            ic = np.linalg.inv(c)
            factor = math.sqrt((2 * math.pi) ** self.d * abs(np.linalg.det(c)))
            weight = float(self.pi[i])
            if weight > 0 and factor > 0:
                log_scale = math.log(weight) - math.log(factor)
            else:
                log_scale = -math.inf
            for j in range(self.n):
                x = self.data[j]
                upper = np.matmul(np.matmul(x - m, ic), x - m)
                self.prob[i,j] = self.pi[i] * np.exp(-upper / 2) / factor
                log_weight[i,j] = log_scale - upper / 2
        for j in range(self.n):
            total = sum(self.prob[:,j])
            if total > 0:
                for i in range(self.k):
                    self.assign[i,j] = self.prob[i,j] / total
            elif self.k > 0:
                column = log_weight[:, j]
                peak = float(column.max())
                if math.isfinite(peak):
                    # The largest entry maps to exp(0) = 1, so the sum is >= 1.
                    shifted = np.exp(column - peak)
                    self.assign[:, j] = shifted / shifted.sum()
                else:
                    self.assign[:, j] = 1 / self.k

    def step_update(self):
        """M-step: re-estimate means, covariances and mixing weights from ``assign``.

        A component whose responsibilities sum to zero keeps its previous mean
        and covariance and gets mixing weight 0, instead of being overwritten
        with NaN from a division by zero.
        """
        for i in range(self.k):
            neff = sum(self.assign[i])
            if not neff > 0:
                self.pi[i] = 0
                continue
            m = np.zeros((self.d,))
            for j in range(self.n):
                m += self.assign[i,j]*self.data[j]
            self.m[i] = m/neff
            c = np.zeros((self.d, self.d))
            for j in range(self.n):
                diff = self.data[j] - self.m[i]
                c += self.assign[i,j] * np.outer(diff, diff)
            self.c[i] = c/neff
            self.pi[i] = neff / self.n

    def test(self):
        """Return ``sum(assign * log(prob)) - n * sum(pi)``.

        Entries with zero responsibility are skipped (``0 * log 0`` is treated
        as 0); previously one underflowed density turned the whole score into
        NaN. An entry with positive responsibility but zero ``prob`` still
        contributes ``-inf``.
        """
        active = self.assign > 0
        sa = np.sum(self.assign[active] * np.log(self.prob[active]))
        sc = np.sum(self.pi)*self.n
        return sa - sc

In [54]:
# Train split: draw_sample(200, 0) -> 600 rows; test split: draw_sample(30000, 12345) -> 90000 rows.
# Each row has 30 feature columns followed by the class label (hence d=30).
# NOTE(review): n_test is passed as 1000 although the test split is drawn with 30000
# samples per class (n_train matches its draw count of 200). Neither field is read in
# the cells shown here -- confirm the intended value.
# NOTE(review): this cell's execution count (54) is lower than that of the class
# definitions it depends on (55); check that the notebook runs top to bottom on a fresh kernel.
data = DataFrame(draw_sample(200, 0), draw_sample(30000, 12345), 200, 1000, 30)

In [88]:
import matplotlib.pyplot as plt

def random_init(model: "KMean", initializer, salt: int, n:int):
    """Try ``n`` seeded initialisations and leave ``model`` in the best one.

    For every seed ``salt + i`` (``i`` in ``range(n)``) the initializer is run,
    rows are assigned and ``model.test()`` is recorded. The seed with the
    strictly lowest loss (the first one on ties) is then replayed, so the model
    ends with that initialisation and its assignment.

    Args:
        model: object exposing ``optimize_assign()`` and ``test()``.
        initializer: callable ``initializer(model, seed)`` that sets the centroids.
        salt: base seed.
        n: number of seeds to try; with ``n <= 0`` seed ``salt`` is used.

    The running best starts at +inf rather than 1e6, so the comparison also
    works when every loss is 1e6 or larger.
    """
    best_init = 0
    best_loss = float("inf")
    for i in range(n):
        initializer(model, salt + i)
        model.optimize_assign()
        loss = model.test()
        if loss < best_loss:
            best_loss = loss
            best_init = i
    # The model currently holds the last initialisation tried; replay the winner.
    initializer(model, salt + best_init)
    model.optimize_assign()

def train(model: "KMean", eps: float):
    """Alternate centroid and assignment updates until the loss stops improving.

    Starting from the model's current state, ``optimize_m`` and
    ``optimize_assign`` are repeated while the loss drops by more than ``eps``
    between consecutive iterations.

    Args:
        model: object exposing ``optimize_m()``, ``optimize_assign()``,
            ``test()`` and an ``m`` attribute.
        eps: minimum loss decrease required to keep iterating.

    Returns:
        ``{"m": model.m, "loss": [...]}`` where ``loss`` starts with the loss
        before the first update and has one entry per iteration after it.
        ``m`` is the model's own array, not a copy.

    The previous-loss sentinel is +inf rather than 1e6, so the first iteration
    also runs when the initial loss is 1e6 or larger.
    """
    prev = float("inf")
    loss = model.test()
    loss_list = [loss]
    while prev-loss > eps:
        model.optimize_m()
        model.optimize_assign()
        prev = loss
        loss = model.test()
        loss_list.append(loss)
    return {"m":model.m, "loss":loss_list}

def test_and_list(m, n, k):
    model = KMean(data, False, n, k)
    model.m = m
    model.optimize_assign()
    table = np.zeros((3,k))
    partials = np.zeros((3,k))
    tot = [0,0,0]
    for i in range(3):
        for j in range(k):
            table[i,j] = np.mean(model.assign[model.label == i] == j)*100
            partials[i,j] = np.sum(model.assign[model.label == i] == j)
    for j in range(k):
        label = np.argmax(partials[:,j])
        for i in range(3):
            if i == label:
                continue
            tot[i] += partials[i,j]
    return {"table":table,"total":tot,"test":model}

In [89]:
def train_and_test(k, func, salt = 123456, eps = 1e-4, n_init = 10):
    """Fit k-means with ``k`` clusters on the training split and score it on the test split.

    Uses the first 600 training rows and the first 90000 test rows of the
    module-level ``data``. ``func`` is the initializer handed to ``random_init``
    (seeded from ``salt + k``, ``n_init`` attempts); ``eps`` is the stopping
    threshold passed to ``train``.

    Returns a dict with keys "m", "loss" (from ``train``), "table", "total"
    (from ``test_and_list``), "model" (the trained model) and "test" (the model
    built for the test split).
    """
    model = KMean(data, True, 600, k)
    random_init(model, func, salt + k, n_init)
    fit = train(model, eps)
    report = test_and_list(fit["m"], 90000, k)
    summary = {key: fit[key] for key in ("m", "loss")}
    summary.update({key: report[key] for key in ("table", "total")})
    summary["model"] = model
    summary["test"] = report["test"]
    return summary

In [90]:
# Fit and evaluate k-means with the k-means++ initializer for k = 2..6 (base salt 12345);
# results[k-2] holds the result dict for k clusters.
results = [train_and_test(k, KMean.init_kpp, 12345) for k in range(2,7)]

In [92]:
# For each k = 2..6, print per class the percentage of its 30000 test rows that
# landed in a cluster whose most frequent class is a different one.
for k, r in zip(range(2,7), results):
    print(np.round(np.array(r["total"])/30000*100,1))

[  0.   0. 100.]
[ 0.   9.6 19.2]
[0.  7.3 3.8]
[0.1 0.  2. ]
[0.  9.8 9.5]
