In [1]:
import numpy as np
import seaborn as sns
import pandas as pd
from scipy import stats
from collections import Counter

In [2]:
def generate_mixGauss(n, mu_1 = 0, mu_2 = 2, sig_1 = 1, sig_2 = 1, alpha = 0.2, set_seed = None, form= 'mix-norm'):
    """ Generate mixture of two 1D-Gaussians with given parameters"""
    X = np.zeros(n)
    assert (0 <= alpha <= 1), "alpha proportion outside of [0,1] range"

    if isinstance(set_seed, int):
        np.random.seed(set_seed)
    
    if form == 'mix-norm':
        for i in range(n):
            draw = np.random.uniform()
            if draw < alpha:
                X[i] = np.random.normal(mu_1, sig_1)
            else:
                X[i] = np.random.normal(mu_2, sig_2)

    return X

def GaussianPrior(x, loc = 1, sigma = 1, lambd = 0.1):
    return stats.norm(loc, np.sqrt((sigma**2)/lambd)).pdf(x)

def GaussianLlhood(x, mu, sigma = 1):
    return stats.norm(mu, sigma).pdf(x)

In [None]:
def GaussianMixturePMC(X, N, V, T, n_parameters = 2, start_values = None, baseline_threshold = 0.01, alpha = 0.5):
    """ PMC Sampler for a simple Gaussian Mixture 

    X = Observations vector
    N = length of each set of sampled values at each iteration (fixed in this implementation across iterations)
    V = Variances vector
    T = number of iterations
    n_parameters = number of parameters(means) to be estimated
    start_values = (N x n_parameters) matrix, if None: take initial values of means as the means of the observations X, otherwise starting means
    baseline_threshold = percentage of generated samples at each iteration to be kept for each variance level
    alpha = mixture proportions
    """
    p = len(V)

    baseline = int(N*baseline_threshold*p) #Used to avoid certain variances from vanishing

    assert (N%p == 0), "N%p is not divisible" 
    R = [Counter(V)]

    if start_values is None:
        mu0 = np.full((N, n_parameters+1), np.mean(X))
        mu0[:, -1] = [i for sublist in [[x]*int(N/p) for x in V] for i in sublist]
        mu0_base = np.full((baseline, n_parameters+1), np.mean(X))
        mu0_base[:, -1] = [i for sublist in [[x]*int(baseline/p) for x in V] for i in sublist]
        mu0 = np.concatenate([mu0, mu0_base])
    else:
        # start_values = np.array(start_values)
        # if start_values.shape == (n_parameters,):
        #     mu0 = np.tile(start_values, (N+baseline, 1))
        #     m

        #np.assert(start_values.shape == (N+baseline, n_parameters+1)
        mu0 = start_values

    mu = mu0
    mus = [mu0]

    W = np.zeros(N + baseline)
    for t in range(T):
        if t%10 == 0:
            print('iteration: ', t)
        mu_star = mu.copy()
        for n in range(N+baseline):
            for i in range(len(mu[n]) - 1):
                #Perform isotropic perturbation of previous means batch, each sample with assigned variance
                mu_star[n][i] = np.random.normal(mu[n][i], np.sqrt(mu[n][-1]))

            #Assign each weight 
            W[n] = np.sum(np.log(GaussianLlhood(X, mu_star[n][0], sigma = np.sqrt(1))*alpha + GaussianLlhood(X, mu_star[n][1], sigma = np.sqrt(1))*(1-alpha))) +\
            np.log(GaussianPrior(mu_star[n][0])*GaussianPrior(mu_star[n][1])) -\
            (np.log(GaussianLlhood(mu_star[n][0], mu = mu[n][0], sigma = np.sqrt(mu[n][-1]))) + \
            np.log(GaussianLlhood(mu_star[n][1], mu = mu[n][1], sigma = np.sqrt(mu[n][-1]))))

        P = np.exp(W-np.max(W))
        weights_normalized = P/np.sum(P)   

        resample_idx = np.random.choice(N+baseline, N+baseline, p  = weights_normalized)
        mu = np.array([mu_star[x] for x in resample_idx])

        mu[-baseline:, -1] = [i for sublist in [[x]*int(baseline/p) for x in V] for i in sublist] #TO UPDATE IN FINAL VERSION CAREFUL!!!!
        V_assignments = mu[:, -1].copy()
        R.append(Counter(V_assignments))
        mus.append(mu.copy())
        #Reshuffle Variances
        np.random.shuffle(V_assignments)
        mu[:, -1] = V_assignments

    return mus, pd.DataFrame(R)