In [1]:
import numpy as np
import matplotlib.pyplot as plt

from itertools import product
from scipy.special import comb
from scipy.optimize import minimize

In [2]:
observations = np.array(list(map(lambda string: np.array(list(map(lambda x: 1 if x == 'H' else 0, string))),
                                 ['HTTTHHTHTH', 'HHHHTHHHHH', 'HTHHHHHTHH', 'HTHTTTHHTT', 'THHHTHHHTH'])))

hidden_parameters = np.array(list(product(*([[1, 0]]*(len(observations))))))

#### Likelihood Maximization

In [3]:
Nhead = observations.sum(axis=1)
Ntot = 10
coefs = comb(Ntot, Nhead) / 2**Ntot

def likelihood(p1: float, p2: float) -> float:
    probas = np.array([p1, p2])
    return sum([(coefs * np.power(probas[hidden], Nhead) * np.power(1.0 - probas[hidden], Ntot - Nhead)).prod()
                for hidden in hidden_parameters])

vlikelihood = np.vectorize(likelihood)

In [22]:
initial_simplex = ([[0.2, 0.9], [0.3, 0.6], [0.6, 0.9]])
result = minimize(fun=lambda x: -vlikelihood(x[0], x[1]),
                  method='Nelder-Mead',
                  x0=[0.7, 0.4],
                  options={'initial_simplex' : initial_simplex})
p1, p2 = result['x']
print(f'{result['message']}\n{p1=}, {p2=}')

Optimization terminated successfully.
p1=0.5195682006547262, p2=0.7967582123327023


#### EM Algorithm

In [7]:
def loss(p1, p2):
    return np.log(vlikelihood(p1, p2))

def p_inner(z: np.ndarray, p1: float, p2: float) -> np.ndarray:
    probas = np.array([p1, p2])
    return np.array([(coefs * np.power(probas[hidden], Nhead) * np.power(1.0 - probas[hidden], Ntot - Nhead)).prod()
                     for hidden in hidden_parameters])

def p_outer(p1: float, p2: float) -> float:
    return np.sum(p_inner(hidden_parameters, p1, p2))

def delta(theta: tuple[float, float], theta_n: tuple[float, float]) -> float:
    return np.sum(np.multiply(p_inner(hidden_parameters, *theta_n),
                              np.log(np.divide(p_inner(hidden_parameters, *theta),
                                               np.multiply(p_inner(hidden_parameters, *theta_n),
                                                           p_outer(*theta_n))))))

In [75]:
class EMAlgorithm(object):
    def __init__(self, tolerance: float, n_iter: int) -> None:
        self.tolerance = tolerance
        self.n_iter = n_iter

    def _estimate(self, observation: np.ndarray, guess: tuple[float, float]) -> np.ndarray:
        p0, p1 = guess
        n0, n1 = np.unique(observation, return_counts=True)[1]
        prob = np.array([p0**n1 * (1 - p0)**n0,
                         p1**n1 * (1 - p1)**n0])
        return prob / np.sum(prob)
    
    def _find_coeff(self, observation: np.ndarray, probas: np.ndarray) -> list[float]:
        n0, n1 = np.unique(observation, return_counts=True)[1]
        return [n1 * probas[0], n0 * probas[0], n1 * probas[1], n0 * probas[1]]


    def _find_next_guess(self, observations: np.ndarray, guess: tuple[float, float]) -> tuple[float, float]:
        estimations = np.array([self._estimate(observation, guess) for observation in observations])
        coeff = np.sum(np.array([self._find_coeff(observation, estimation)
                                 for observation, estimation in zip(observations, estimations)]),
                                 axis=0)
        return (coeff[0]/(coeff[0]+coeff[1]),
                coeff[2]/(coeff[2]+coeff[3]))
    
    def optimize(self, observations: np.ndarray, initial_guess: tuple[float, float]) -> tuple[float, float]:
        guess = initial_guess
        for _ in range(self.n_iter):
            guess = self._find_next_guess(observations, guess)
        return guess

In [78]:
p1, p2 = EMAlgorithm(1e-8, 2).optimize(observations, (0.5, 0.8))
print(f'{p1=}, {p2=}')

p1=0.5164223903331018, p2=0.7970029411867086
