In [1]:
import numpy as np

In [311]:
def initialize_parameters(N, K, seed=None):
    if seed is not None:
        np.random.seed(seed)

    # Probability vector of length K
    pi = np.random.dirichlet(alpha=np.ones(K))    

    assert(np.allclose(sum(pi), 1, atol=1e-6) == 1)

    # Transition matrix (K x K), where each column sums to 1, because a column
    # represents the jump probabilities from a fixed initial state.
    gamma = np.random.dirichlet(alpha=np.ones(K), size=K).T

    assert(np.allclose(np.sum(gamma, axis = 0), np.ones(K), atol=1e-6) == 1)

    # r matrix (N+1 x K), where each column sums to 1, because a column
    # represents the firing probability of a neuron in a fixed state.
    r = np.random.dirichlet(alpha=np.ones(N + 1), size=K).T

    assert(np.allclose(np.sum(r, axis = 0), np.ones(K), atol=1e-6))

    return pi, gamma, r

def generate_hmm_data(T, N, K, pi, gamma, r):
    z = np.zeros(T, dtype=int)  
    y = np.zeros(T, dtype=int)  

    # Initialization t=0. I extract the value of K following the init prob
    z[0] = np.random.choice(K, p=pi)
    y[0] = np.random.choice(N + 1, p=r[:, z[0]])

    #data generation
    for t in range(1, T):
        # I take the column of gamma corresponding to z[t-1] and extract from this distribution
        z[t] = np.random.choice(K, p=gamma[:, z[t-1]])
        
        # I take the column of r corresponding to z[t] and extract from this prob distribution
        y[t] = np.random.choice(N + 1, p=r[:, z[t]])

    return y, z


def forward(y, pi, gamma, r, T, K):
    
    alpha = np.zeros((T, K))   # alpha has T elements of dimension K each
    alpha[0] = pi * r[y[0]]   # r[y[0]] is the first row of r: r_y1,k for each k

    for t in range(0, T-1):
        for k in range(K):
            alpha[t+1, k] = r[y[t], k] * np.sum(alpha[t] * gamma[k])      #gamma[k] is the k'th row of gamma
    return alpha + 1e-6

def backward(y, gamma, r, T, K):
    
    beta = np.zeros((T, K))
    beta[-1] = 1
    
    for t in range(T - 1, 0, -1):
        for k in range(K):
            beta[t-1, k] = r[y[t], k] * np.sum(beta[t] * gamma[k])
    return beta + 1e-6

def e_step(y, pi, gamma, r, T, K):
    
    alpha = forward(y, pi, gamma, r, T, K)
    beta = backward(y, gamma, r, T, K)
    
    """
    print("alpha = ", alpha)
    print("beta = ", beta)
    """
    
    # it must have T-1 elements because it's the P(z_t = k and z_t+1 = l | ...)
    xi = np.zeros((T-1, K, K))
    
    for t in range(T-1):
        for k in range(K):
            for l in range(K):
                xi[t, k, l] = alpha[t, k] * gamma[k, l] * r[y[t + 1], l] * beta[t + 1, l]
        #renormalization
        xi[t] /= np.sum(xi[t])


    zeta = np.zeros((T, K))
    
    for t in range(T):
        for k in range(K):
            zeta[t,k] = alpha[t, k] * beta[t, k]

        zeta[t] /= np.sum(zeta[t])

    """
    print("zeta = ", zeta)
    print("xi = ", xi)
    """
    
    return zeta, xi

def m_step(y, zeta, xi, N, K):
    
    T = len(y)
    pi = np.zeros(K)
    gamma = np.zeros((K,K))
    r = np.zeros((N + 1, K))

    # update of pi
    pi = zeta[0]
    
    # update of gamma
    for k in range(K):
        for l in range(K):
            gamma[k, l] = np.sum(xi[:, k, l]) # --> Here's a division by 0
            
    for l in range(K):
        gamma[:, l] /= np.sum(gamma[:, l])
            
    # update of r 
    for k in range(K):
        for i in range(N + 1):
            numerator = 0
            for t in range(T):
                if y[t]==i:
                    numerator += zeta[t, k]
        
            r[i,k] = numerator

        r[:, k] /= np.sum(r[:, k]) # --> Here's a division by 0

    return pi, gamma, r

def em_algorithm(y, T, N, K, pi, gamma, r, max_iter=100, tol=1e-2):
    for iteration in range(max_iter):
        # print(iteration)
        
        # E-step
        zeta, xi = e_step(y, pi, gamma, r, T, K)        
        
        # M-step
        pi_updated, gamma_updated, r_updated = m_step(y, zeta, xi, N, K)

        """
        print("pi_upd = ", pi_updated)
        print("gamma_upd = ", gamma_updated)
        print("r_upd = ", r_updated)
        """
        
        # i compute the delta using the relative distance, and using the Frobenius norm
        delta_pi = np.linalg.norm(pi_updated - pi)
        delta_gamma = np.linalg.norm(gamma_updated - gamma, ord='fro')
        delta_r = np.linalg.norm(r_updated - r, ord='fro')
        

        if delta_pi < tol and delta_gamma < tol and delta_r < tol:
            print(f"Converged at iteration {iteration}")
            break

        pi = pi_updated
        gamma = gamma_updated
        r = r_updated

    return pi_updated, gamma_updated, r_updated

In [356]:
# Example usage
N = 5  # Number of neurons
K = 3  # Number of hidden states
T = 5  # Number of time steps

pi, gamma, r = initialize_parameters(N, K)
y, z = generate_hmm_data(T, N, K, pi, gamma, r)
pi_post, gamma_post, r_post = em_algorithm(y, T, N, K, pi, gamma, r, max_iter=200)

Converged at iteration 12


In [358]:
print("pi:", pi)
print("pi_post:", pi_post)

pi: [0.28640178 0.33046428 0.38313395]
pi_post: [6.51109554e-11 6.58841946e-11 1.00000000e+00]


In [359]:
print("gamma:", gamma)
print("gamma_post:", gamma_post)

gamma: [[0.01365094 0.29716794 0.19552591]
 [0.19450255 0.53207178 0.65546815]
 [0.79184651 0.17076028 0.14900594]]
gamma_post: [[9.70693358e-42 6.04300643e-45 3.62202551e-45]
 [7.29552918e-11 1.77360484e-10 3.78496823e-09]
 [1.00000000e+00 1.00000000e+00 9.99999996e-01]]


In [361]:
print("r:", r)
print("r_post:", r_post)

r: [[0.39534712 0.03100989 0.24410058]
 [0.04404357 0.05467267 0.02756573]
 [0.33005924 0.06781042 0.04261027]
 [0.02804101 0.55605137 0.21396308]
 [0.07599049 0.27302089 0.09239797]
 [0.12651857 0.01743475 0.37936236]]
r_post: [[9.99999154e-01 9.99794301e-01 3.99953840e-01]
 [0.00000000e+00 0.00000000e+00 0.00000000e+00]
 [0.00000000e+00 0.00000000e+00 0.00000000e+00]
 [6.77181492e-07 1.95986960e-04 4.00030771e-01]
 [0.00000000e+00 0.00000000e+00 0.00000000e+00]
 [1.69316571e-07 9.71241076e-06 2.00015389e-01]]
