In [1]:
import numpy as np
import math

In [9]:
class EM:
    def __init__(self, pi_init, p_init, q_init, eps=1e-6):
        self.pi = pi_init
        self.p = p_init
        self.q = q_init
        self.eps = eps
    
    def fit(self, samples):
        samples = np.array(samples)
        thetas = [np.array([self.pi, self.p, self.q])]
        while True:
            mu = (self.pi * np.power(self.p*np.ones_like(samples), samples) * np.power((1 - self.p)*np.ones_like(samples), 1 - samples))/(self.pi * np.power(self.p*np.ones_like(samples), samples) * np.power((1 - self.p)*np.ones_like(samples), 1 - samples) + (1 - self.pi) * np.power(self.q*np.ones_like(samples), samples) * np.power((1-self.q)*np.ones_like(samples), 1-samples))
            self.pi = np.mean(mu)
            self.p = np.dot(mu, samples.T)/np.sum(mu)
            self.q = np.dot(1-mu, samples.T)/np.sum(1-mu)
            thetas.append(np.array([self.pi, self.p, self.q]))
            if np.sqrt(np.sum((thetas[-1] - thetas[-2]) ** 2)) < self.eps:
                break
        return thetas


In [13]:
em = EM(0.46,0.55,0.67)
samples=[1,1,0,1,0,0,1,0,1,1]
thetas = em.fit(samples)
print(thetas)

[array([0.46, 0.55, 0.67]), array([0.46186284, 0.534595  , 0.65613464]), array([0.46186284, 0.534595  , 0.65613464])]


In [41]:
def gaussian(mu, sigma, y):
    prob = 1 / (np.sqrt(2 * math.pi) * sigma) * np.exp(-(y-mu)*(y-mu)/(2 * sigma * sigma))
    return prob

def calculate_distance(theta1, theta2):
    sum_dis = 0
    for i in range(len(theta1)):
        sum_dis += np.sqrt(np.sum((theta2[i] - theta1[i])**2))
    return sum_dis

class gaussian_EM:
    def __init__(self, alphas, mus, sigmas, eps=1e-12):
        self.alphas = np.array(alphas)
        self.mus = np.array(mus)
        self.sigmas = np.array(sigmas)
        self.eps = eps
    
    def fit(self, samples):
        samples = np.array(samples)
        thetas = [[self.alphas, self.mus, self.sigmas]]
        gammas = np.zeros([len(samples), len(self.alphas)])
        while True:
            for j in range(len(samples)):
                for k in range(len(self.alphas)):
                    gammas[j][k] = self.alphas[k] * gaussian(self.mus[k],self.sigmas[k], samples[j])
                gammas[j] = gammas[j] / np.sum(gammas[j])
            mus = np.array([np.dot(samples,gammas[:,k])/np.sum(gammas[:,k]) for k in range(len(self.mus))])
            self.sigmas = np.sqrt(np.array([np.dot((samples-self.mus[k])**2, gammas[:,k])/np.sum(gammas[:,k]) for k in range(len(self.sigmas))]))
            self.alphas = np.array([np.sum(gammas[:,k])/len(samples) for k in range(len(self.alphas))])
            self.mus = mus
            thetas.append([self.alphas, self.mus, self.sigmas])
            if calculate_distance(thetas[-1], thetas[-2]) < self.eps:
                break
        return thetas[-1]
        

In [44]:
gem = gaussian_EM(alphas=[0.5,0.5], mus=[20,20],sigmas=[10,10])
samples=[-67,-48,6,8,14,16,23,24,28,29,41,49,56,60,75]
theta = gem.fit(samples)
theta

[array([0.5, 0.5]),
 array([20.93333333, 20.93333333]),
 array([36.46453376, 36.46453376])]