## Problem 5
### EM with Binomial Distribution:
Two biased coins mixed together:
- Coin 1: Shows heads with probability p
- Coin 2: Shows heads with probability r
- Mixing weights: pick coin 1 with probability->$$\pi$$
- M : Number of sequences
- K : number of flips per seqeuence

In [44]:
import numpy as np

### Part A - Generate Mix data and EM Algorithm:

In [45]:
def generate_coin_mix_data(pi, p, r, M = 100, K = 10):
    data = []
    true_coins = []     # this keeps track of which coin was used for verification

    for i in range(M):
        if np.random.random() < pi:
            coin = 1
            prob_heads = p
        else:
            coin = 2
            prob_heads = r

        # 1 for heads & 0 for tails
        flips = np.random.binomial(1, prob_heads, K)

        data.append(flips)
        true_coins.append(coin)

    return np.array(data), np.array(true_coins)

In [46]:
# data generation check:
pi = 0.75
p = 0.8
r = 0.4

data, true_coins = generate_coin_mix_data(pi, p, r)
print(f"Data shape: {data.shape}")
print(f"Sample sequences:\n{data[:3]}")
print(f"Number of heads per sequence: {np.sum(data, axis=1)[:10]}")

Data shape: (100, 10)
Sample sequences:
[[1 1 1 1 1 1 1 0 1 1]
 [1 1 0 1 1 1 1 0 1 1]
 [1 1 1 1 1 1 1 1 0 1]]
Number of heads per sequence: [ 9  8  9  8  9 10  6  9  7  9]


In [47]:
def EM_Binomial_Mix(data, max_iter=100, tol=1e-6):
    M, K = data.shape

    # initializing params randomly:
    pi_rand = np.random.random()
    p_rand = np.random.random()
    r_rand = np.random.random()

    print(f"Initial: pi={pi_rand:.3f}, p={p_rand:.3f}, r={r_rand:.3f}")

    log_likelihoods = []

    for iteration in range(max_iter):
        """
        E - STEP:
        Calc responsibilities => which coin likely to generate each sequence
        """

        n_heads = np.sum(data, axis=1)
        n_tails = K - n_heads

        """
        Likelihood of each sequence given coin 1
        P(sequence | coin 1) = C(K, n_heads) * p^n_heads * (1-p)^n_tails
        """
        likelihood_coin1 = (p_rand ** n_heads) * ((1-p_rand) ** n_tails)

        likelihood_coin2 = (r_rand ** n_heads) * ((1-r_rand) ** n_tails)

        """
        Joint Probability: P(sequence, coin) = P(sequence | coin) * P(coin)
        """
        joint_coin1 = likelihood_coin1 * pi_rand
        joint_coin2 = likelihood_coin2 * (1-pi_rand)

        """
        Responsibilities: P(coin|sequence) -> with bayes rule
        """
        total = joint_coin1 + joint_coin2 + 1e-10
        gamma_coin1 = joint_coin1 / total
        gamma_coin2 = joint_coin2 / total

        """
        M - STEP:
        Update parameters based on responsibilities
        Update Mixing weight
        """
        pi_rand_new = np.mean(gamma_coin1)

        """
        Update coin probabilities for both coins
        Weighted average of heads frequency
        """
        weighted_sum_coin1 = np.sum(gamma_coin1[:, np.newaxis] * data)
        weighted_total_coin1 = np.sum(gamma_coin1) * K
        p_rand_new = weighted_sum_coin1 / weighted_total_coin1

        weighted_sum_coin2 = np.sum(gamma_coin2[:, np.newaxis] * data)
        weighted_total_coin2 = np.sum(gamma_coin2) * K
        r_rand_new = weighted_sum_coin2 / weighted_total_coin2

        """
        calc log-likelihood and convergence step
        """
        log_likelihood = np.sum(np.log(total))
        log_likelihoods.append(log_likelihood)

        if iteration>0:
            if abs(pi_rand_new - pi_rand) < tol and abs(p_rand_new - p_rand) < tol and abs(r_rand_new - r_rand) < tol:
                print(f"Converged at iteration {iteration}")
                break

        pi_rand = pi_rand_new
        p_rand = p_rand_new
        r_rand = r_rand_new

        if iteration % 10 == 0:
            print(f"Iter {iteration}: pi={pi_rand:.3f}, p={p_rand:.3f}, r={r_rand:.3f}")

    return pi_rand, p_rand, r_rand, log_likelihoods

In [48]:
# Running em on data
pi_recovered, p_recovered, r_recovered, log_likelihoods = EM_Binomial_Mix(data)

print("\n" + "="*50)
print("True parameters:")
print(f"  pi={pi:.3f}, p={p:.3f}, r={r:.3f}")
print("Recovered parameters:")
print(f"  pi={pi_recovered:.3f}, p={p_recovered:.3f}, r={r_recovered:.3f}")

Initial: pi=0.540, p=0.498, r=0.535
Iter 0: pi=0.463, p=0.684, r=0.753
Iter 10: pi=0.289, p=0.447, r=0.833
Iter 20: pi=0.275, p=0.436, r=0.829
Iter 30: pi=0.275, p=0.436, r=0.829
Converged at iteration 34

True parameters:
  pi=0.750, p=0.800, r=0.400
Recovered parameters:
  pi=0.275, p=0.436, r=0.829


### Part - B : Testing EM with multiple param sets

In [49]:
import numpy as np
import pandas as pd

def test_EM_multi_param():
    """
    Part B: Test EM algorithm with multiple parameter sets
    Report results in a comparison table
    """
    test_cases = [
        {'pi': 0.75, 'p': 0.8, 'r': 0.4, 'description': 'Well separated'},
        {'pi': 0.5, 'p': 0.7, 'r': 0.3, 'description': 'Equal mixing'},
        {'pi': 0.3, 'p': 0.6, 'r': 0.2, 'description': 'Unequal mixing (pi<0.5)'},
        {'pi': 0.6, 'p': 0.65, 'r': 0.35, 'description': 'Similar probabilities'},
        {'pi': 0.8, 'p': 0.9, 'r': 0.1, 'description': 'Extreme probabilities'},
        {'pi': 0.4, 'p': 0.55, 'r': 0.45, 'description': 'Very similar coins'},
        {'pi': 0.7, 'p': 0.85, 'r': 0.25, 'description': 'Moderate separation'},
        {'pi': 0.25, 'p': 0.5, 'r': 0.1, 'description': 'Low mixing weight'}
    ]

    results_list = []

    print("="*80)
    print("TESTING EM ALGORITHM WITH MULTIPLE PARAMETER SETS")
    print("="*80)

    for idx, params in enumerate(test_cases, 1):
        print(f"\n--- Test Case {idx}: {params['description']} ---")
        print(f"True Parameters: π={params['pi']:.3f}, p={params['p']:.3f}, r={params['r']:.3f}")

        # Generate data with true parameters
        data, _ = generate_coin_mix_data(params['pi'], params['p'], params['r'], M=100, K=10)

        best_result = None
        best_log_likelihood = -np.inf

        for run in range(5):
            pi_rec, p_rec, r_rec, log_liks = EM_Binomial_Mix(data, max_iter=100)

            if len(log_liks) > 0 and log_liks[-1] > best_log_likelihood:
                best_log_likelihood = log_liks[-1]
                best_result = (pi_rec, p_rec, r_rec)

        pi_rec, p_rec, r_rec = best_result

        error1 = abs(params['pi'] - pi_rec) + abs(params['p'] - p_rec) + abs(params['r'] - r_rec)
        error2 = abs(params['pi'] - (1-pi_rec)) + abs(params['p'] - r_rec) + abs(params['r'] - p_rec)

        if error2 < error1:
            pi_rec = 1 - pi_rec
            p_rec, r_rec = r_rec, p_rec
            print("(Label switching detected and corrected)")

        print(f"Recovered:       π={pi_rec:.3f}, p={p_rec:.3f}, r={r_rec:.3f}")

        results_list.append({
            'Test Case': idx,
            'Description': params['description'],
            'True π': params['pi'],
            'Recovered π': round(pi_rec, 4),
            'π Error': round(abs(params['pi'] - pi_rec), 4),
            'True p': params['p'],
            'Recovered p': round(p_rec, 4),
            'p Error': round(abs(params['p'] - p_rec), 4),
            'True r': params['r'],
            'Recovered r': round(r_rec, 4),
            'r Error': round(abs(params['r'] - r_rec), 4),
            'Total Error': round(abs(params['pi'] - pi_rec) +
                               abs(params['p'] - p_rec) +
                               abs(params['r'] - r_rec), 4)
        })

    results_df = pd.DataFrame(results_list)

    print("\n" + "="*80)
    print("COMPARISON TABLE: TRUE VS RECOVERED PARAMETERS")
    print("="*80)
    print(results_df.to_string(index=False))

    print("\n" + "="*80)
    print("SUMMARY STATISTICS")
    print("="*80)
    print(f"Average π error: {results_df['π Error'].mean():.4f}")
    print(f"Average p error: {results_df['p Error'].mean():.4f}")
    print(f"Average r error: {results_df['r Error'].mean():.4f}")
    print(f"Average total error: {results_df['Total Error'].mean():.4f}")

    return results_df

# Run Part B
results = test_EM_multi_param()

TESTING EM ALGORITHM WITH MULTIPLE PARAMETER SETS

--- Test Case 1: Well separated ---
True Parameters: π=0.750, p=0.800, r=0.400
Initial: pi=0.016, p=0.960, r=0.711
Iter 0: pi=0.034, p=0.972, r=0.708
Iter 10: pi=0.403, p=0.890, r=0.601
Iter 20: pi=0.713, p=0.827, r=0.443
Iter 30: pi=0.781, p=0.810, r=0.386
Iter 40: pi=0.790, p=0.807, r=0.378
Iter 50: pi=0.791, p=0.807, r=0.376
Iter 60: pi=0.791, p=0.807, r=0.376
Converged at iteration 69
Initial: pi=0.869, p=0.842, r=0.705
Iter 0: pi=0.733, p=0.801, r=0.486
Iter 10: pi=0.782, p=0.810, r=0.385
Iter 20: pi=0.790, p=0.807, r=0.378
Iter 30: pi=0.791, p=0.807, r=0.376
Iter 40: pi=0.791, p=0.807, r=0.376
Converged at iteration 48
Initial: pi=0.576, p=0.127, r=0.017
Iter 0: pi=0.972, p=0.717, r=0.116
Iter 10: pi=0.819, p=0.798, r=0.348
Iter 20: pi=0.795, p=0.806, r=0.373
Iter 30: pi=0.792, p=0.807, r=0.376
Iter 40: pi=0.791, p=0.807, r=0.376
Iter 50: pi=0.791, p=0.807, r=0.376
Converged at iteration 53
Initial: pi=0.421, p=0.552, r=0.312
Ite

### Part - C : T param

In [50]:
def generate_T_coins_data(pi_vector, p_vector, M=100, K=10):
    """
    Args:
        pi_vector: mixing weights for T coins (must sum to 1)
        p_vector: head probabilities for T coins
        M: number of sequences
        K: flips per sequence
    """
    T = len(pi_vector)
    data = []
    true_coins = []

    for i in range(M):
        """select coin based on mixing weights:"""
        coin = np.random.choice(T, p=pi_vector)
        prob_heads = p_vector[coin]

        # flip coin K times:
        flips = np.random.binomial(1, prob_heads, K)
        data.append(flips)
        true_coins.append(coin)
    return np.array(data), np.array(true_coins)

In [51]:
def EM_T_coins(data, T, max_iter=100, tol=1e-6):
    """
    Args:
        data: M x K matrix of coin flips
        T: number of coins in mixture
        max_iter: max iterations
        tol: tolerance
    """
    M, K = data.shape
    pi = np.random.dirichlet(np.ones(T))
    p = np.random.random(T) # head prob for each coin
    log_likelihoods = []

    for iteration in range(max_iter):
        """E-STEP"""
        n_heads = np.sum(data, axis=1)
        n_tails = K - n_heads

        gamma = np.zeros((M, T)) # responsibilities for all T coins

        for t in range(T):
            likelihood = (p[t] ** n_heads) * ((1-p[t]) ** n_tails)
            gamma[:, t] = likelihood * pi[t]

        gamma_sum = np.sum(gamma, axis=1, keepdims=True) + 1e-10
        gamma = gamma / gamma_sum

        """M-STEP"""
        pi_new = np.mean(gamma, axis=0)
        p_new = np.zeros(T)

        for t in range(T):
            weighted_sum = np.sum(gamma[:, t:t+1] * data)
            weighted_total = np.sum(gamma[:, t]) * K
            p_new[t] = weighted_sum / (weighted_total + 1e-10)

        log_likelihood = np.sum(np.log(gamma_sum))
        log_likelihoods.append(log_likelihood)

        # convergence check:
        if np.max(np.abs(pi_new - pi)) < tol and np.max(np.abs(p_new - p)) < tol:
            print(f"Converged at iteration {iteration}")
            break

        pi = pi_new
        p = p_new

        if iteration % 20 == 0:
            print(f"Iter {iteration}: π={pi}, p={p}")

    return pi, p, log_likelihoods

In [52]:
def match_coins_simple(pi_true, p_true, pi_rec, p_rec):
    """
    Match recovered coins to true coins based on p values
    """
    T = len(p_true)

    true_idx = np.argsort(p_true)
    rec_idx = np.argsort(p_rec)

    reorder = np.zeros(T, dtype=int)
    for i in range(T):
        reorder[true_idx[i]] = rec_idx[i]

    return pi_rec[reorder], p_rec[reorder]

In [53]:
def test_T_coins():
    print("="*80)
    print("PART C: TESTING EM WITH T COINS")
    print("="*80)

    # Test case 1: 3 coins
    print("\n--- Test Case 1: Three Coins ---")
    pi_true = np.array([0.5, 0.3, 0.2])
    p_true = np.array([0.8, 0.5, 0.2])
    print(f"True parameters:")
    print(f"  π = {pi_true}")
    print(f"  p = {p_true}")

    data, _ = generate_T_coins_data(pi_true, p_true, M=200, K=10)
    pi_rec, p_rec, _ = EM_T_coins(data, T=3)

    # Match coins to handle label switching
    pi_matched, p_matched = match_coins_simple(pi_true, p_true, pi_rec, p_rec)

    print(f"Recovered parameters:")
    print(f"  π = {pi_rec}")
    print(f"  p = {p_rec}")
    print(f"After matching:")
    print(f"  π = {pi_matched}")
    print(f"  p = {p_matched}")

    # Test case 2: 4 coins
    print("\n--- Test Case 2: Four Coins ---")
    pi_true = np.array([0.4, 0.3, 0.2, 0.1])
    p_true = np.array([0.9, 0.6, 0.4, 0.1])
    print(f"True parameters:")
    print(f"  π = {pi_true}")
    print(f"  p = {p_true}")

    data, _ = generate_T_coins_data(pi_true, p_true, M=300, K=10)
    pi_rec, p_rec, _ = EM_T_coins(data, T=4)
    pi_matched, p_matched = match_coins_simple(pi_true, p_true, pi_rec, p_rec)

    print(f"Recovered parameters:")
    print(f"  π = {pi_rec}")
    print(f"  p = {p_rec}")
    print(f"After matching:")
    print(f"  π = {pi_matched}")
    print(f"  p = {p_matched}")

    # Test case 3: Compare performance with different T values
    print("\n--- Test Case 3: Performance Comparison ---")
    results = []

    for T in [2, 3, 4, 5]:
        # Generate true parameters
        random_vals = np.random.random(T)
        pi_true = random_vals / np.sum(random_vals)
        p_true = np.sort(np.random.random(T))[::-1]

        data, _ = generate_T_coins_data(pi_true, p_true, M=100*T, K=10)

        # Run EM multiple times
        best_ll = -np.inf
        best_params = None

        for run in range(3):
            pi_rec, p_rec, log_liks = EM_T_coins(data, T=T)
            if len(log_liks) > 0 and log_liks[-1] > best_ll:
                best_ll = log_liks[-1]
                best_params = (pi_rec, p_rec)

        pi_rec, p_rec = best_params

        # Match coins before calculating error
        pi_matched, p_matched = match_coins_simple(pi_true, p_true, pi_rec, p_rec)

        # Calculate errors after matching
        pi_error = np.sum(np.abs(pi_true - pi_matched))
        p_error = np.sum(np.abs(p_true - p_matched))

        results.append({
            'T (# coins)': T,
            'Data points': 100*T,
            'π error (sum)': round(pi_error, 4),
            'p error (sum)': round(p_error, 4),
            'Log-likelihood': round(best_ll, 2)
        })

    results_df = pd.DataFrame(results)
    print("\nPerformance vs Number of Coins:")
    print(results_df.to_string(index=False))

test_T_coins()

PART C: TESTING EM WITH T COINS

--- Test Case 1: Three Coins ---
True parameters:
  π = [0.5 0.3 0.2]
  p = [0.8 0.5 0.2]
Iter 0: π=[0.55739007 0.10286061 0.33974889], p=[0.75362344 0.12937635 0.46984623]
Iter 20: π=[0.52069029 0.14386034 0.33544927], p=[0.78616268 0.17731401 0.47144005]
Iter 40: π=[0.49194932 0.16437978 0.3436708 ], p=[0.7935512  0.1909235  0.49823556]
Iter 60: π=[0.46731252 0.17876597 0.35392141], p=[0.79986508 0.20002514 0.51835038]
Iter 80: π=[0.44769977 0.18872338 0.36357675], p=[0.804896   0.20617455 0.53286751]
Recovered parameters:
  π = [0.43344761 0.19529206 0.37126023]
  p = [0.80856388 0.21017623 0.54270321]
After matching:
  π = [0.43344761 0.37126023 0.19529206]
  p = [0.80856388 0.54270321 0.21017623]

--- Test Case 2: Four Coins ---
True parameters:
  π = [0.4 0.3 0.2 0.1]
  p = [0.9 0.6 0.4 0.1]
Iter 0: π=[0.62381071 0.08518042 0.14401541 0.1469932 ], p=[0.77310635 0.78986298 0.10642152 0.41127074]
Iter 20: π=[0.3897983  0.12080883 0.15592389 0.333468