# Mixture Model

In this excersise we want to implement a mixture model.
We will start to implement the Gaussian mixture model and test it on artifical data.
Furthermore, we will take a look how the mixture model converges to the final solution.

Once the Gaussian mixture model works, we want to apply it to an unsupervised task.
We take an image as input and model the color as feature with 3 components.
When a Gaussian mixture model is trained on the image, we can quantize the image.

As final task, we want to implement a Bernoulli mixture model and apply it to the suppervised MNIST task:
 - Train one Bernoulli mixture model for each digit on the training data
 - To evaluate the model, you assign the digit to the test sample from the the mixture model that most likly produced this sample.
 - You should be able to get a accuracy over 90 %


In [None]:
%matplotlib inline

import numpy as np
import scipy.stats
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

## GMM

 - Implement the missing code in the GaussianMixtureModel.
 - Test it on the artificial generated data in the cells below.

In [None]:
class GaussianMixtureModel:
    def __init__(self):
        self.means = None  # Shape (K, D)
        self.covariances = None  # Shape (K, D, D)
        self.weights = None  # Shape (K,)
    
    @classmethod
    def fit(cls, x, num_classes,* , initialization=None, iterations=100, minimum_increase=1e-2, verbose=False):
        """Fits and returns a GMM.
        
        Params:
            x: Observation with shape (N, D)
            num_classes: Scalar >0.
            iterations: Scalar >0.
        """
        N, D = x.shape
        model = cls()
        
        # Initialization
        if initialization is None:
            affiliations = ???
        else:
            affiliations = np.copy(initialization)
        
        # EM iterations
        last_log_likelihood = -np.inf
        iteration = 0
        while True:
            model._m_step(x, affiliations)
            affiliations, log_likelihood = model._e_step(x)
            log_likelihood = np.sum(log_likelihood)
            
            if log_likelihood < last_log_likelihood:
                print('Likelihood should always increase: {} < {}'.format(log_likelihood, last_log_likelihood))
            
            if np.abs(log_likelihood - last_log_likelihood) < minimum_increase:
                if verbose:
                    print('Minimum likelihood change not reached.')
                break
            
            if iteration >= iterations:
                if verbose:
                    print('Maximum number of iterations exhausted.')
                break
            
            iteration += 1
            last_log_likelihood = log_likelihood
            
        if verbose:
            print('Stopping after {} iterations.'.format(iteration))
        
        return model, affiliations, log_likelihood
    
    def _m_step(self, x, affiliations):
        N, D = x.shape
        denominator = ???
        self.weights = ???
        
        self.means = ???
        self.means /= ???
        
        difference = ???  # Shape (K, N, D)
        self.covariances = ???
        self.covariances /= ???
    
    def _e_step(self, x):
        N, D = x.shape
        K = self.weights.shape[0]
        
        # Normalization constant of the Gaussian distribution
        log_conditional_pdf = np.zeros((K, N))
        log_conditional_pdf += -1/2 * D * np.log(2 * np.pi) - 1/2 * np.log(np.linalg.det(self.covariances))[:, None]  # log_conditional_pdf += ???
        
        # Exponent of the Gaussian distribution
        difference = ???  # Shape (K, N, D)
        log_conditional_pdf += -1/2 * np.einsum('knd,kdn->kn', difference, np.linalg.solve(self.covariances, difference.transpose(0, 2, 1)))  # log_conditional_pdf += ???
        
        log_joint_pdf = ???
        affiliations = ???
        affiliations /= np.maximum(???, np.finfo(affiliations.dtype).tiny)
        
        log_likelihood = ???  # see scipy.special.logsumexp
        
        return affiliations, log_likelihood


# Generate artificial data

In [None]:
N = 1000
p = np.array([0.3, 0.6, 0.1])
K = p.shape[0]
labels = np.random.choice(range(K), size=(N,), p=p)
means = np.array([[-1, -1], [1, 1], [-2, 2]])
D = means.shape[-1]
x = np.sqrt(1 / 4) * np.random.normal(size=(N, D))

for k in range(K):
    x[labels == k, :] += means[k, :]

plt.scatter(x[:, 0], x[:, 1])

# Artificial test with just a few iterations

In [None]:
def compute_complete_grid(xlim, ylim, steps):
    x, y = np.meshgrid(
        np.linspace(*xlim, steps),
        np.linspace(*ylim, steps)
    )
    return x, y, np.stack([x, y]).T

def plot_multivariate_normal(mean, covariance, ax=None):
    assert mean.shape == (2,), mean
    assert covariance.shape == (2, 2), covariance
    
    steps = 100
    
    xlim = [mean[0] - 2 * np.sqrt(covariance[0, 0]), mean[0] + 2 * np.sqrt(covariance[0, 0])]
    ylim = [mean[1] - 2 * np.sqrt(covariance[1, 1]), mean[1] + 2 * np.sqrt(covariance[1, 1])]
    
    x, y, features_grid = compute_complete_grid(xlim, ylim, steps)
    z = scipy.stats.multivariate_normal.pdf(features_grid, mean, covariance)
    level1std = scipy.stats.multivariate_normal.pdf(np.zeros([D]), np.zeros([D]), model.covariances[0]) * np.exp(-0.5 * 1 ** 2)
    if ax is None:
        ax = plt
    ax.contour(x, y, z, [level1std])
    ax.scatter(mean[0], mean[1], color='red')

In [None]:
K = 3

affiliations = np.random.uniform(size=(K, N))
affiliations /= affiliations.sum(axis=0)
log_likelihood_history = []
for _ in range(40):
    model, affiliations, log_likelihood = GaussianMixtureModel.fit(x, num_classes=2, initialization=affiliations, iterations=1, verbose=True)
    f = plt.figure(figsize=(3 * K, 3))
    axes = f.subplots(1, K, squeeze=False).flatten()
    for k in range(K):
        axes[k].scatter(x[:, 0], x[:, 1], c=affiliations[k, :])
        plot_multivariate_normal(model.means[k], model.covariances[k], ax=axes[k])
    plt.show()
    print(log_likelihood)
    log_likelihood_history.append(log_likelihood)
plt.plot(log_likelihood_history)
plt.show()

# Production test on aritificial data

In [None]:
model, affiliations, log_likelihood = GaussianMixtureModel.fit(x, num_classes=K, iterations=100, verbose=True)

f = plt.figure(figsize=(4 * K, 4))
axes = f.subplots(1, K, squeeze=False).flatten()
for k in range(K):
    axes[k].scatter(x[:, 0], x[:, 1], c=affiliations[k, :])
    plot_multivariate_normal(model.means[k], model.covariances[k], ax=axes[k])
    
plt.show()
print(log_likelihood)

In [None]:
model.means

In [None]:
model.weights

In [None]:
model.covariances

In [None]:
np.linalg.det(model.covariances)

# Test on real data as an image segmentation task

Now we want to apply the mixture model to an image.
The color is the feature vector.
Once it is trained we can quantize the image.

In [None]:
# Read any image. Try also some other images
image = plt.imread('liliumbulbiferum.jpg', format='jpeg')

In [None]:
plt.imshow(image)

In [None]:
K = ???
x_image = image.reshape((-1, 3)).astype(np.float32)
pixels = x_image.shape[0]
model, affiliations, log_likelihood = GaussianMixtureModel.fit(x_image, num_classes=K, iterations=100, verbose=True)
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')

# We here plot every tenth value since it takes quite long with so many data points
slicer = slice(0, None, 10)
x_image_tmp = x_image[::10, :]
ax.scatter(x_image_tmp[:, 0], x_image_tmp[:, 1], x_image_tmp[:, 2], c=affiliations[0, slicer])
plt.show()
print(log_likelihood)

In [None]:
plots = 2
f, axes = plt.subplots(1, plots, figsize=(8 * plots, 8))

new_image = np.zeros([pixels, 3])
for k in range(K):
    color = model.means[k] / 255
    new_image += affiliations[k, :, np.newaxis] * color
    
axes[0].imshow(new_image.reshape(image.shape))

new_image = np.zeros([pixels, 3])
for k in range(K):
    color = model.means[k] / 255
    new_image[np.argmax(affiliations, axis=0) == k, :] = color
    
axes[1].imshow(new_image.reshape(image.shape))

In [None]:
f, axes = plt.subplots(1, K, figsize=(4 * K, 4))
for k, ax in enumerate(axes):
    ax.imshow(affiliations[k].reshape(image[:, :, 0].shape))

# Extra exercise

## Mixture Model for supervised problem

Again we look at the MNIST dataset.
But this time we quantize the image to binary values. So each pixel is either one or zero (e.g. depending if the continuous value is bigger or smaller than 0.5).
Each image, interpreted as a vector, then contains 784 binary values.

Last exercise we saw the multidimensional Bernoulli distribution.
For each label (i.e. the numbers 0 to 9) in the MNIST dataset we can learn the parameters of the Bernoulli distribution on the train set and do a Bayes decision on the test set.

This will work, but the Bernoulli distribution is not optimal to decribe the data.

To improve the model of the data, the multidimensional Bernoulli distribution can be replace with a mixture of multidimensional Bernoulli distribution, where each component is interpreted as a variant of the number.

Task:
 - Implement the multidimensional Bernoulli Mixture Model
 - Download the mnist data
 - Binarize the data 
 - Fit one Mixture Model for each number, try different K's
 - Visualize the parameters of the multidimensional Bernoulli distributions, i.e. plot the means of the mixture model
 - Do a prediction on the test data
 - What is the best accouracy that you can achieve?
 - Visualize a confusion matrix

In [None]:
class BernoulliMixtureModel(GaussianMixtureModel):
    def __init__(self):
        self.means = None  # Shape (K, D)
        self.weights = None  # Shape (K,)
    
    def _m_step(self, x, affiliations):
        N, D = x.shape
        
    
    def _e_step(self, x):
        N, D = x.shape
        K = self.weights.shape[0]
        
        
    
        
        
        return affiliations, log_likelihood

In [None]:
def get_mnist():
    # The code to download the mnist data original came from
    # https://cntk.ai/pythondocs/CNTK_103A_MNIST_DataLoader.html
    
    import gzip
    import numpy as np
    import os
    import struct

    try: 
        from urllib.request import urlretrieve 
    except ImportError: 
        from urllib import urlretrieve

    def load_data(src, num_samples):
        print("Downloading " + src)
        gzfname, h = urlretrieve(src, "./delete.me")
        print("Done.")
        try:
            with gzip.open(gzfname) as gz:
                n = struct.unpack("I", gz.read(4))
                # Read magic number.
                if n[0] != 0x3080000:
                    raise Exception("Invalid file: unexpected magic number.")
                # Read number of entries.
                n = struct.unpack(">I", gz.read(4))[0]
                if n != num_samples:
                    raise Exception(
                        "Invalid file: expected {0} entries.".format(num_samples)
                    )
                crow = struct.unpack(">I", gz.read(4))[0]
                ccol = struct.unpack(">I", gz.read(4))[0]
                if crow != 28 or ccol != 28:
                    raise Exception(
                        "Invalid file: expected 28 rows/cols per image."
                    )
                # Read data.
                res = np.frombuffer(
                    gz.read(num_samples * crow * ccol), dtype=np.uint8
                )
        finally:
            os.remove(gzfname)
        return res.reshape((num_samples, crow, ccol)) / 256


    def load_labels(src, num_samples):
        print("Downloading " + src)
        gzfname, h = urlretrieve(src, "./delete.me")
        print("Done.")
        try:
            with gzip.open(gzfname) as gz:
                n = struct.unpack("I", gz.read(4))
                # Read magic number.
                if n[0] != 0x1080000:
                    raise Exception("Invalid file: unexpected magic number.")
                # Read number of entries.
                n = struct.unpack(">I", gz.read(4))
                if n[0] != num_samples:
                    raise Exception(
                        "Invalid file: expected {0} rows.".format(num_samples)
                    )
                # Read labels.
                res = np.frombuffer(gz.read(num_samples), dtype=np.uint8)
        finally:
            os.remove(gzfname)
        return res.reshape((num_samples))


    def try_download(data_source, label_source, num_samples):
        data = load_data(data_source, num_samples)
        labels = load_labels(label_source, num_samples)
        return data, labels
    
    # Not sure why, but yann lecun's website does no longer support 
    # simple downloader. (e.g. urlretrieve and wget fail, while curl work)
    # Since not everyone has linux, use a mirror from uni server.
    #     server = 'http://yann.lecun.com/exdb/mnist'
    server = 'https://raw.githubusercontent.com/fgnt/mnist/master'
    
    # URLs for the train image and label data
    url_train_image = f'{server}/train-images-idx3-ubyte.gz'
    url_train_labels = f'{server}/train-labels-idx1-ubyte.gz'
    num_train_samples = 60000

    print("Downloading train data")
    train_features, train_labels = try_download(url_train_image, url_train_labels, num_train_samples)

    # URLs for the test image and label data
    url_test_image = f'{server}/t10k-images-idx3-ubyte.gz'
    url_test_labels = f'{server}/t10k-labels-idx1-ubyte.gz'
    num_test_samples = 10000

    print("Downloading test data")
    test_features, test_labels = try_download(url_test_image, url_test_labels, num_test_samples)
    
    return train_features, train_labels, test_features, test_labels

In [None]:
train_features, train_labels, test_features, test_labels = get_mnist()

In [None]:
train_features.shape

In [None]:
models = {}

train_features_binary = np.array(train_features > 0.5, dtype=np.float64)

for i in range(10):
    train_subset_X = train_features_binary[train_labels == i]
    models[i] = ???


In [None]:
for i, model_X in models.items():
    for image, ax in zip(model_X.means.reshape(-1, 28, 28), plt.subplots(1, len(model_X.means), squeeze=False)[1].ravel()):
        ax.imshow(image)

In [None]:
predict = ???
acc = np.mean(predict == test_labels)
acc

In [None]:
from sklearn.metrics import confusion_matrix
confusion_matrix(test_labels, predict)

In [None]:
plt.imshow(confusion_matrix(test_labels, predict))

In [None]:
def plot_confusion_matrix(y_true, y_pred, classes,
                          normalize=False,
                          title=None,
                          cmap=plt.cm.Blues):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    
    https://scikit-learn.org/stable/auto_examples/model_selection/plot_confusion_matrix.html#sphx-glr-auto-examples-model-selection-plot-confusion-matrix-py
    """
    from sklearn import svm, datasets
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import confusion_matrix
    from sklearn.utils.multiclass import unique_labels
    
    if not title:
        if normalize:
            title = 'Normalized confusion matrix'
        else:
            title = 'Confusion matrix, without normalization'

    # Compute confusion matrix
    cm = confusion_matrix(y_true, y_pred)
    # Only use the labels that appear in the data
    classes = classes[unique_labels(y_true, y_pred)]
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)

    fig, ax = plt.subplots()
    im = ax.imshow(cm, interpolation='nearest', cmap=cmap)
    ax.figure.colorbar(im, ax=ax)
    # We want to show all ticks...
    ax.set(xticks=np.arange(cm.shape[1]),
           yticks=np.arange(cm.shape[0]),
           # ... and label them with the respective list entries
           xticklabels=classes, yticklabels=classes,
           title=title,
           ylabel='True label',
           xlabel='Predicted label')

    # Rotate the tick labels and set their alignment.
    plt.setp(ax.get_xticklabels(), rotation=45, ha="right",
             rotation_mode="anchor")

    # Loop over data dimensions and create text annotations.
    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            ax.text(j, i, format(cm[i, j], fmt),
                    ha="center", va="center",
                    color="white" if cm[i, j] > thresh else "black")
    fig.tight_layout()
    return ax


np.set_printoptions(precision=2)

# Plot non-normalized confusion matrix
plot_confusion_matrix(test_labels, predict, classes=np.arange(10),
                      title='Confusion matrix, without normalization')

# Plot normalized confusion matrix
plot_confusion_matrix(test_labels, predict, classes=np.arange(10), normalize=True,
                      title='Normalized confusion matrix')