## Gaussian Mixture Model
A Gaussian mixture model (GMM) $\hat{p}$ approximates the ground-truth pdf $p^*$ by taking a weighted sum of $C$ *components*, $
\mathcal{N}(\mu_c, \Sigma_c)$, $c=1,...,C$. $C$ is a hyperparameter to be determined by outside knowledge or some sort of tuning. Splitting the training set into a training and validation set (or applying K-fold cross-validation) would be most appropriate in general to choose a single model, but we wished to see the test performance for a range of $C$ to understand its effect. Additionally, a GMM requires some sort of initialisation to then begin its training. We implemented two methods:
- Random initialisation: the initial means are a randomly chosen set of training points, and the initial covariance matrices are all the empirical covariance of the training data multiplied by a normalisation factor chosen through trial and error on the training set that seemed to lead to convergence
- K-means++: We first run a K-means++ clustering method on $C$ components, and then take the cluster centers as the initial means, and the within-cluster empirical variances as the initial variances.

However, we found that the random initialisation method was very unstable, often diverging (in particular for high numbers of components and low numbers of samples). We decided to only display results for 
A number of training iterations must also be selected, an appropriate method would be to set a high number and then set some termination criterion to check for convergence (in particular that the means and variances stop changing significantly)

We define some functions to aid in model training and then obtain some GMMs fitted to the training set for some chosen numbers of components (C), each initialisation method (random or K-means++), and training set size. We simulate having a data set of size $n=100, 200, 500$ and then splitting these 80/20 into training and test sets.

In [1]:
from sklearn.cluster import KMeans
import numpy as np
from scipy.stats import multivariate_normal
from utils import get_train_test_data
from MMD import calculate_mmd
from itertools import product
from collections import namedtuple
from tqdm.auto import tqdm

# Define helper functions for gmm
def random_initialisation(train_data, C, cov_normalisation=0.2, seed=None):
    if seed:
        np.random.seed(seed)
    # Weights are uniform
    weights_array = np.ones(C) * 1 / C

    # Select C data points to act as initial means
    data_indices = np.random.choice(len(train_data), C, replace=False)
    means_array = train_data[data_indices]

    # Take empirical covariance and multiply by a normalisation factor for random covariances
    data_empirical_covariance = np.cov(train_data, rowvar=False)
    covariances_array = np.repeat(data_empirical_covariance[np.newaxis], C,
                                  axis=0) * cov_normalisation

    return weights_array, means_array, covariances_array

def kmeans_initialisation(train_data, C, seed):
    # Weights are uniform
    weights_array = np.ones(C) * 1 / C

    # Perform k means clustering; initial means are cluster centers, initial variances are cluster empirical variances
    kmeans = KMeans(n_clusters=C, init='k-means++', random_state=seed).fit(train_data)

    ## Obtain means
    means_array = kmeans.cluster_centers_

    ## Obtain covariance arrays by getting empirical covariance within each
    covariances_array = np.zeros((C, 2, 2))
    for k in range(C):
        cluster_data = train_data[kmeans.labels_ == k]
        deviations = cluster_data - means_array[k]
        covariances_array[k] = np.cov(deviations, rowvar=False)

    return weights_array, means_array, covariances_array

def get_influence_array(means_array, covariances_array, weights_array, train_data, C):
    # Store component and full distributions for getting the influence array
    component_dists = [multivariate_normal(means_array[i], covariances_array[i]) for i in range(C)]

    # define pdf_values: element i,k is the probability density of datapoint i in component k
    pdf_values = np.array([[dist.pdf(data_point) for dist in component_dists] for data_point in train_data])

    # Define gamma array: gamma_i,k = 'influences of component k on instance i'
    influence_array = weights_array * pdf_values / np.sum(weights_array * pdf_values, axis=1, keepdims=True)

    return influence_array

def single_value_pdf(x, means_array, covariances_array, weights_array):
    component_dists = [multivariate_normal(means_array[i], covariances_array[i]) for i in range(len(means_array))]

    component_pdf_values = np.array([dist.pdf(x) for dist in component_dists])

    pdf_value = np.sum([weight*density for weight,density in zip(weights_array, component_pdf_values)])
    return(pdf_value)



def update_parameters(train_data, influence_array, C):
    n = len(train_data)
    # Update parameters
    weights_array = np.sum(influence_array, axis = 0)/n

    means_array = np.zeros((C, 2))
    for k in range(C):
        means_array[k] = np.sum((influence_array[:, k, np.newaxis]*train_data), axis = 0)/(n*weights_array[k])

    covariances_array = np.zeros((C, 2, 2))
    for k in range(C):
        deviations = train_data - means_array[k]
        covariances_array[k] = \
            np.array([influence_array[i,k]*np.outer(deviations[i], deviations[i]) for i in range(n)]).sum(axis=0) \
            /influence_array.sum(axis = 0)[k]

    return weights_array, means_array, covariances_array

def make_gmm_pdf(weights_array, means_array, covariances_array):
    def gmm_pdf(x):
        component_dists = [multivariate_normal(means_array[i], covariances_array[i]) for i in range(len(means_array))]
        pdf_value = np.sum([weight*dist.pdf(x) for weight, dist in zip(weights_array, component_dists)])
        return pdf_value

    return gmm_pdf

def train_gmm(train_data, C, num_iter, init_method, init_covariance_normalisation, seed):
    # Get initial guesses
    assert init_method in ['random', 'kmeans++'], 'Invalid initialisation method'

    if init_method == 'random':
        weights_array, means_array, covariances_array = random_initialisation(train_data,
                                                                              C,
                                                                              init_covariance_normalisation,
                                                                              seed)
    elif init_method == 'kmeans++':
        weights_array, means_array, covariances_array = kmeans_initialisation(train_data,
                                                                              C,
                                                                              seed)

    # Iterate EM algorithm
    for t in range(num_iter):
        influence_array = get_influence_array(means_array, covariances_array, weights_array, train_data, C)

        weights_array, means_array, covariances_array = update_parameters(train_data, influence_array, C)

    return weights_array, means_array, covariances_array

def sample_from_gmm(weights_array, means_array, covariances_array, n, seed):
    if seed:
        np.random.seed(seed)

    # Choose for each sample which component to sample from
    components = np.random.choice(len(weights_array), size=n, p=weights_array)

    # Sample from the corresponding components
    samples = np.array([
        multivariate_normal(means_array[k], covariances_array[k]).rvs()
        for k in components
    ])

    return samples




  from .autonotebook import tqdm as notebook_tqdm


In [2]:

# Fix data parameters
ns = [int(1.25*100), int(1.25*200), int(1.25*500)] 
init_methods = ['kmeans++']
Cs = [2,5,10]
seed = 11121
init_covariance_normalisation = 0.1 # only for random init; established through trial and error
num_iter = 50
normalise = True

# Initialise results storage variable

GMMResult = namedtuple('GMMResult', ['n', 'init_method', 'C', 'MMD'])
GMMResults = []

for n, init_method, C in tqdm(product(ns, init_methods, Cs)):
    # Get data
    train_data, test_data = get_train_test_data(n, seed)

    # Optionally Normalise
    if normalise:
        mean = np.mean(train_data, axis = 0)
        std = np.std(train_data, axis = 0)

        train_data, test_data = (train_data - mean)/std, (test_data - mean)/std

    weights_array, means_array, covariances_array = \
        train_gmm(train_data, C, num_iter, init_method, init_covariance_normalisation, seed)

    learned_samples = sample_from_gmm(weights_array, means_array, covariances_array, n, seed)

    # Caclulate MMD
    gmm_mmd = calculate_mmd(test_data, learned_samples)

    GMMResults.append(GMMResult(n=n, init_method=init_method, C=C, MMD=gmm_mmd))


for res in GMMResults:
    print(f'number of training samples: {int(0.8*res.n)}, number of components: {res.C}, MMD: {res.MMD}')

  mmd = max(mmd_squared_results.values()) ** 0.5
  mmd = max(mmd_squared_results.values()) ** 0.5
  mmd = max(mmd_squared_results.values()) ** 0.5
  mmd = max(mmd_squared_results.values()) ** 0.5
  mmd = max(mmd_squared_results.values()) ** 0.5
9it [01:18,  8.73s/it]

number of training samples: 100, number of components: 2, MMD: nan
number of training samples: 100, number of components: 5, MMD: nan
number of training samples: 100, number of components: 10, MMD: nan
number of training samples: 200, number of components: 2, MMD: 0.13008836626028494
number of training samples: 200, number of components: 5, MMD: 0.08425082728835152
number of training samples: 200, number of components: 10, MMD: 0.1411589983155624
number of training samples: 500, number of components: 2, MMD: 0.09450495247697198
number of training samples: 500, number of components: 5, MMD: nan
number of training samples: 500, number of components: 10, MMD: nan





In [None]:
# Sanity check:
# Fix data parameters
ns = [int(1.25*100), int(1.25*200), int(1.25*500)] 
init_methods = ['kmeans++']
Cs = [2,5,10]
seed = 11121
init_covariance_normalisation = 0.1 # only for random init; established through trial and error
num_iter = 50

GMMResult = namedtuple('GMMResult', ['n', 'C', 'MMD'])
GMMResults = []


from sklearn.mixture import GaussianMixture
from tqdm.auto import tqdm
GMMResults=[]
for n, C in tqdm(product(ns, Cs)):
    # Get data
    train_data, test_data = get_train_test_data(n, seed)

    # # Optionally Normalise
    # mean = np.mean(train_data, axis = 0)
    # std = np.std(train_data, axis = 0)

    # train_data, test_data = (train_data - mean)/std, (test_data - mean)/std

    gmm = GaussianMixture(C, covariance_type='full', random_state=seed, max_iter = 50)
    gmm.fit(train_data)

    learned_samples = gmm.sample(int(0.2*n))[0]

    # Caclulate MMD
    gmm_mmd = calculate_mmd(test_data, learned_samples)

    GMMResults.append(GMMResult(n=n, C=C, MMD=gmm_mmd))

for res in GMMResults:
    print(f'number of training samples: {int(0.8*res.n)}, number of components: {res.C}, MMD: {res.MMD}')

  mmd = max(mmd_squared_results.values()) ** 0.5
  mmd = max(mmd_squared_results.values()) ** 0.5
  mmd = max(mmd_squared_results.values()) ** 0.5
  mmd = max(mmd_squared_results.values()) ** 0.5
  mmd = max(mmd_squared_results.values()) ** 0.5
  mmd = max(mmd_squared_results.values()) ** 0.5
9it [00:02,  4.46it/s]

number of training samples: 100, number of components: 2, MMD: nan
number of training samples: 100, number of components: 5, MMD: 0.338474652622339
number of training samples: 100, number of components: 10, MMD: nan
number of training samples: 200, number of components: 2, MMD: nan
number of training samples: 200, number of components: 5, MMD: nan
number of training samples: 200, number of components: 10, MMD: 0.11928378193054755
number of training samples: 500, number of components: 2, MMD: nan
number of training samples: 500, number of components: 5, MMD: 0.041908202701563535
number of training samples: 500, number of components: 10, MMD: nan





# Higher dimension:

In [None]:
from sklearn import datasets
from sklearn.mixture import GaussianMixture
import matplotlib.pyplot as plt
import numpy as np

def get_digits_train_test_data(seed = None):
    if seed:
        np.random.seed(seed)
    digits_data = datasets.load_digits()
    features, targets = digits_data['data'], digits_data['target']
    
    # Split into train/test. Take random 80/20 split
    n = len(targets)
    shuffled_indices = np.random.permutation(n)
    features, targets = features[shuffled_indices], targets[shuffled_indices]
    
    split_index = np.floor(0.8*n).astype(int)
    
    train_features, test_features = features[:split_index], features[split_index:]
    train_targets, test_targets = targets[:split_index], targets[split_index]
    
    return train_features, train_targets, test_features, test_targets




In [None]:
# Get data
train_features, train_targets, test_features, test_targets = get_digits_train_test_data(11121)

In [None]:
# Train gmm
## Fix hyperparams

C = 8
seed = 11121

## Split per digit
digit_gmms = []
for digit in range(10):
    digit_features = train_features[train_targets == digit]

    digit_gmm = GaussianMixture(C, covariance_type='full', random_state=seed)
    digit_gmm.fit(digit_features)

    digit_gmms.append(digit_gmm)

# Generate synthetic data: 10 cases per digit
synthetic_features = np.array([digit_gmm.sample(10)[0] for digit_gmm in digit_gmms])
synthetic_features = synthetic_features.reshape(-1, synthetic_features.shape[-1])
synthetic_targets = np.repeat(np.arange(10),10)

# Get MMD
calculate_mmd(test_features, synthetic_features)



In [None]:
# Train random (forest classifier) 
from sklearn.ensemble import RandomForestClassifier
randomForest = RandomForestClassifier(random_state=seed)
randomForest.fit(train_features, train_targets)
rf_predictions = randomForest.predict(synthetic_features)

In [None]:
np.sum(synthetic_targets!=rf_predictions)
# Every digit is recognisable! 

np.int64(0)