In [1]:
import numpy as np

In [1]:
class LBG:
    def __init__(self, n_clusters, epsilon=0.00001):
        self.n_clusters=n_clusters
        self.epsilon=epsilon

    def _get_init_cb(self, data):
        return np.array([np.mean(data, axis=0)])

    def _create_codebook(self, data, e):
        return data * (1.0 + e)

    def _average_distortion(self, data, codebook, idx):
        return np.mean([np.linalg.norm(data - codebook[i], axis=1) ** 2 for i in idx])

    def _split_codebook(self, data, codebook, init_distortion):
        tmp_cbs = np.empty((0, self.n_features))
        for cb in codebook:
            for i in range(2):
                tmp_cbs = np.append(tmp_cbs, self._create_codebook(codebook, (-1)**i * self.epsilon), axis=0)

        codebook = tmp_cbs
        distortion = init_distortion
        err = 1.0 + self.epsilon;

        while err > self.epsilon:
            norms = np.array([np.linalg.norm(data - c, axis=1)**2 for c in codebook])
            idx_min = np.argmin(norms, axis=0)
            idx_map = np.zeros((self.N, self.n_features))
            idx_map[range(len(data)), idx_min] = 1

            s = (np.sum(idx_map, axis=0, dtype=float)+1e-16).reshape(self.n_features, -1)
            codebook = np.dot(idx_map.T, data) / s

            prev_distortion = distortion
            distortion = self._average_distortion(data, codebook, idx_min)
            err = (prev_distortion - distortion) / distortion

        return codebook, idx_min

    def fit(self, data):
        shape = data.shape
        self.N = shape[0]
        self.n_features=shape[1]
        
        codebook = self._get_init_cb(data)
        distortion = self._average_distortion(data, codebook, np.array([[0]]))

        while len(codebook) < self.n_clusters:
            codebook, labels = self._split_codebook(data, codebook, distortion)
        
        self._cluster_centers=codebook

        return labels

    def predict(self, data):
        norms = np.array([np.linalg.norm(data - c, axis=1)**2 for c in self._cluster_centers])
        return np.argmin(norms, axis=0)

In [2]:
import math
from functools import reduce
from collections import defaultdict

_size_data = 0
_dim = 0


def generate(data, size_codebook, epsilon=0.00001):
    global _size_data, _dim

    _size_data = len(data)
    assert _size_data > 0

    _dim = len(data[0])
    assert _dim > 0

    codebook = []
    codebook_abs_weights = [_size_data]
    codebook_rel_weights = [1.0]

    c0 = avg_vec_of_vecs(data, _dim, _size_data)
    codebook.append(c0)

    avg_dist = avg_distortion_c0(c0, data)

    while len(codebook) < size_codebook:
        codebook, codebook_abs_weights, codebook_rel_weights, avg_dist = split_codebook(data, codebook,
                                                                                        epsilon, avg_dist)

    return codebook, codebook_abs_weights, codebook_rel_weights


def split(data, codebook, epsilon, initial_avg_dist):
    new_codevectors = []
    for c in codebook:
        c1 = new_codevector(c, epsilon)
        c2 = new_codevector(c, -epsilon)
        new_codevectors.extend((c1, c2))

    codebook = new_codevectors
    len_codebook = len(codebook)
    abs_weights = [0] * len_codebook
    rel_weights = [0.0] * len_codebook

    avg_dist = 0
    err = epsilon + 1
    num_iter = 0
    while err > epsilon:
        closest_c_list = [None] * _size_data    # list that contains the nearest codevector for each input data vector
        vecs_near_c = defaultdict(list)         # list with codevector index -> input data vector mapping
        vec_idxs_near_c = defaultdict(list)     # list with codevector index -> input data index mapping
        for i, vec in enumerate(data):  # for each input vector
            min_dist = None
            closest_c_index = None
            for i_c, c in enumerate(codebook):  # for each codevector
                d = euclid_squared(vec, c)
                if min_dist is None or d < min_dist:    # found new closest codevector
                    min_dist = d
                    closest_c_list[i] = c
                    closest_c_index = i_c
            vecs_near_c[closest_c_index].append(vec)
            vec_idxs_near_c[closest_c_index].append(i)

        for i_c in range(len_codebook): # for each codevector index
            vecs = vecs_near_c.get(i_c) or []   # get its proximity input vectors
            num_vecs_near_c = len(vecs)
            if num_vecs_near_c > 0:
                new_c = avg_vec_of_vecs(vecs, _dim)     # calculate the new center
                codebook[i_c] = new_c                   # update in codebook
                for i in vec_idxs_near_c[i_c]:          # update in input vector index -> codevector mapping list
                    closest_c_list[i] = new_c

                # update the weights
                abs_weights[i_c] = num_vecs_near_c
                rel_weights[i_c] = num_vecs_near_c / _size_data

        prev_avg_dist = avg_dist if avg_dist > 0 else initial_avg_dist
        avg_dist = avg_distortion_c_list(closest_c_list, data)

        err = (prev_avg_dist - avg_dist) / prev_avg_dist
        num_iter += 1

    return codebook, abs_weights, rel_weights, avg_dist


def avg_vec_of_vecs(vecs, dim=None, size=None):
    size = size or len(vecs)
    dim = dim or len(vecs[0])
    avg_vec = [0.0] * dim
    for vec in vecs:
        for i, x in enumerate(vec):
            avg_vec[i] += x / size

    return avg_vec


def new_codevector(c, e):
    return [x * (1.0 + e) for x in c]


def avg_distortion_c0(c0, data, size=None):
    size = size or _size_data
    return reduce(lambda s, d:  s + d / size,
                  (euclid_squared(c0, vec)
                   for vec in data),
                  0.0)


def avg_distortion_c_list(c_list, data, size=None):
    size = size or _size_data
    return reduce(lambda s, d:  s + d / size,
                  (euclid_squared(c_i, data[i])
                   for i, c_i in enumerate(c_list)),
                  0.0)


def euclid_squared(a, b):
    return sum((x_a - x_b) ** 2 for x_a, x_b in zip(a, b))

In [10]:
from skimage.io import imread, imsave
from sklearn.utils import shuffle
import time

In [13]:
img_path = 'face.png'
img = imread(img_path)
n_random=1000
n_clusters = 64

In [14]:
rows, cols, depth = img.shape
img_vect = img.reshape(rows * cols, depth)
img_train = shuffle(img_vect)[:n_random]

In [None]:
lbg = LBG(n_clusters=n_clusters)