In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

### Problem 3

In [2]:
import numpy as np
from scipy.stats import multivariate_normal

class GaussianMixtureModel:
    def __init__(self, n_gaussians, max_iter=100, tolerence=1e-5):

        self.n_gaussians = n_gaussians
        self.max_iter = max_iter
        self.tolerence = tolerence
        self.means = None
        self.covariances = None
        self.weights = None

    def fit(self, X):

        n_samples, n_features = X.shape

        # Initialize parameters
        self.weights =np.random.dirichlet(alpha=np.ones(self.n_gaussians))  
        self.means = X[np.random.choice(n_samples, self.n_gaussians, replace=False)]  
        self.covariances = [np.eye(n_features) for _ in range(self.n_gaussians)]  

        prev_log_likelihood = 0

        for iteration in range(self.max_iter):
            # Compute responsibilities
            responsibilities = self.e_step(X)

            # Update parameters
            self.m_step(X, responsibilities)

            # Get log-likelihood
            log_likelihood = self.get_log_likelihood(X)

            # Check for convergence
            if np.abs(log_likelihood - prev_log_likelihood) < self.tolerence:
                print(f"It took {iteration} iterations to converge")
                break

            prev_log_likelihood = log_likelihood

    def e_step(self, X):

        n_samples = X.shape[0]
        responsibilities = np.zeros((n_samples, self.n_gaussians))

        for k in range(self.n_gaussians):
            # Compute the probability density function for each component
            responsibilities[:, k] = self.weights[k] * multivariate_normal.pdf(
                X, mean=self.means[k], cov=self.covariances[k]
            )

        # Normalize responsibilities
        responsibilities /= np.sum(responsibilities, axis=1, keepdims=True)
        return responsibilities

    def m_step(self, X, responsibilities):

        n_samples = X.shape[0]

        # Update weights
        self.weights = np.mean(responsibilities, axis=0)

        # Update means
        for k in range(self.n_gaussians):
            self.means[k] = np.sum(responsibilities[:, k].reshape(-1, 1) * X, axis=0) / np.sum(responsibilities[:, k])

        # Update covariances
        for k in range(self.n_gaussians):
            diff = X - self.means[k]
            self.covariances[k] = (
                np.dot((responsibilities[:, k].reshape(-1, 1) * diff).T, diff) / np.sum(responsibilities[:, k])
            )

    def get_log_likelihood(self, X):

        likelihood = np.zeros((X.shape[0], self.n_gaussians))
        for k in range(self.n_gaussians):
            likelihood[:, k] = self.weights[k] * multivariate_normal.pdf(
                X, mean=self.means[k], cov=self.covariances[k]
            )
        return np.sum(np.log(np.sum(likelihood, axis=1)))

    def predict(self, X):

        responsibilities = self.e_step(X)
        return np.argmax(responsibilities, axis=1)

#### Question A

In [3]:
gaussian_data_2 = pd.read_csv('2gaussian.txt',sep=' ',header=None)

In [4]:
GMM_algo = GaussianMixtureModel(n_gaussians=2)

In [5]:
X = gaussian_data_2.values
GMM_algo.fit(X)

It took 14 iterations to converge


In [6]:
GMM_algo.weights

array([0.33478522, 0.66521478])

In [7]:
GMM_algo.means

array([[2.99406954, 3.05210401],
       [7.01311596, 3.9831157 ]])

In [8]:
GMM_algo.covariances

[array([[1.01013134, 0.02720261],
        [0.02720261, 2.93787134]]),
 array([[0.97481532, 0.49750231],
        [0.49750231, 1.00116963]])]

#### Question B

In [9]:
gaussian_data_3 = pd.read_csv('3gaussian.txt',sep=' ',header=None)
GMM_algo = GaussianMixtureModel(n_gaussians=3)
X = gaussian_data_3.values
GMM_algo.fit(X)

It took 82 iterations to converge


In [10]:
GMM_algo.weights

array([0.20563136, 0.29843228, 0.49593636])

In [11]:
GMM_algo.means

array([[3.03984489, 3.0489184 ],
       [7.02159479, 4.01547869],
       [5.01179863, 7.00153459]])

In [12]:
GMM_algo.covariances

[array([[1.02862002, 0.02710779],
        [0.02710779, 3.38555991]]),
 array([[0.99035672, 0.50093779],
        [0.50093779, 0.99564625]]),
 array([[0.97962786, 0.18510425],
        [0.18510425, 0.97446071]])]

### Problem 4

In [13]:
import numpy as np
from scipy.stats import binom

class BinomialMixtureModel:
    def __init__(self, n_binomials, max_iter=100, tol=1e-6):

        self.n_binomials = n_binomials
        self.D = None  # Number of flips
        self.max_iter = max_iter
        self.tolerence = tol
        self.pi = None  # probability of selecting a coin
        self.p = None  # probability of occurence of heads

    def fit(self, X):

        n_samples,self.D = X.shape  
        X = X.sum(axis=1)
        
        np.random.seed(42)
        self.pi =  np.random.dirichlet(alpha=np.ones(self.n_binomials)) # Mixing coefficients
        self.p = np.random.uniform(low=0.2, high=0.8, size=self.n_binomials)  # Bias probabilities for each coin

        prev_log_likelihood = 0

        for iteration in range(self.max_iter):
            # Compute responsibilities
            responsibilities = self.e_step(X)

            #  Update parameters
            self.m_step(X, responsibilities)

            # Get log-likelihood
            log_likelihood = self.get_log_likelihood(X)

            # Check for convergence
            if np.abs(log_likelihood - prev_log_likelihood) < self.tolerence:
                print(f"It took {iteration} iterations to converge")
                break

            log_likelihood_prev = log_likelihood

    def e_step(self, X):

        n_samples = X.shape[0]
        responsibilities = np.zeros((n_samples, self.n_binomials))

        for k in range(self.n_binomials):
            responsibilities[:, k] = self.pi[k] * binom.pmf(X, self.D, self.p[k])

        # Normalize responsibilities
        responsibilities /= responsibilities.sum(axis=1, keepdims=True)
        return responsibilities

    def m_step(self, X, responsibilities):
  
        # Update mixing coefficients
        self.pi = responsibilities.mean(axis=0)

        # Update bias probabilities
        for k in range(self.n_binomials):
            self.p[k] = np.sum(responsibilities[:, k] * X) / (self.D * np.sum(responsibilities[:, k]))

    def get_log_likelihood(self, X):

        likelihood = np.zeros((X.shape[0], self.n_binomials))
        for k in range(self.n_binomials):
            likelihood[:, k] = self.pi[k] * binom.pmf(X, self.D, self.p[k])

        return np.sum(np.log(likelihood.sum(axis=1)))

    def predict(self, X):

        responsibilities = self.e_step(X)
        return np.argmax(responsibilities, axis=1)

In [14]:
coin_flips_outcome=pd.read_csv('coin_flips_outcome.txt',sep=' ',header=None)

In [15]:
BMM_algo = BinomialMixtureModel(n_binomials=3)
X = coin_flips_outcome.values
BMM_algo.fit(X)

In [16]:
print("Estimated selection probabilities (pi):", BMM_algo.pi)
print("Estimated bias probabilities (p):", BMM_algo.p)

Estimated selection probabilities (pi): [0.17855768 0.51462834 0.30681398]
Estimated bias probabilities (p): [0.93172853 0.61003783 0.23691867]
