Wenqing Zhu (UIN:653942417) and Siyuan Qian (UIN:673877907)

Contributions:

Wenqing Zhu: Part I  

Siyuan Qian: Part II

In [1]:
import numpy as np
import requests
import matplotlib.pyplot as plt
import math

Part I

In [57]:

# Define the myEM function to implement the EM algorithm:
# INPUT:
#   - data: the datase
#   - G: the number of components
#   - initial parameters
#   - itmax: the number of iterations
# OUTPUT:
#   - prob: a G-dimensional probability vector (p1, ..., pG)
#   - mean: A p-by-G matrix with the k-th column being μk, the p-dimensional mean for the k-th Gaussian component.
#   - Sigma: A p-by-p covariance matrix   shared by all G components;
#   - loglik: A number equal to sum_over_N(log(sum_over_G(pk*N(x;mu,sigma))))

def myEM(data, G, mu, sigma, p, itmax):

    likelihood = np.zeros((itmax,))

    for j in range(itmax):
        
        CondProb = Estep(data, mu, sigma, p, G)
        mu, sigma, p = Mstep(data,CondProb,G)
        likelihood[j] = loglik(data, mu, sigma, p, G, CondProb)

    return mu, sigma, p, likelihood


def Estep(data, mu, sigma, p, G):

    n = data.shape[0]
    d = data.shape[1]
    sigma_invers = np.linalg.inv(sigma)
    sigma_determ = np.linalg.det(sigma)

    exponent = [] # np.zeros((n,1))
    gaussian_prob = np.zeros((n,G))
    CondProb = np.zeros((n,G))  # an nxG matrix, where each element represents a conditional probability
    factor = math.sqrt( ( (2*math.pi)**d ) * sigma_determ )
    Prob_ik = np.zeros((n,G))

    for i in range(G):
        # Compute the PDF of the multivariate normal distribution
        mu_i = mu[:,i]
        p_i = p[i]
        dis_i = data - mu_i
        Prob_matrix = (dis_i @ sigma_invers) @ dis_i.T
        Diag = np.array([Prob_matrix[i][i] for i in range(n)])
        # gaussian_prob = p_i/factor * np.exp(-0.5*Diag)
        gaussian_prob[:,i] = p_i/factor * np.exp(-0.5*Diag)
    
    gaussiam_sum = np.sum(gaussian_prob,axis = 1).reshape(n,1)
    CondProb = gaussian_prob/gaussiam_sum

    return CondProb


def Mstep(data,CondProb,G):
    n = data.shape[0]
    d = data.shape[1]

    sum_CondProb = np.sum( CondProb, axis = 0) 
    updated_p = sum_CondProb/n

    updated_mu = np.zeros((d,G))
    updated_sigma = np.zeros((d,d))

    for i in range(G):
        mu_i = []
        dis_i = []
        sigma_i = []
        CondProb_i = []
        
        CondProb_i = CondProb[:,i].reshape(n,1)
        
        mu_i = np.sum((CondProb_i * data),axis = 0)/np.sum(CondProb_i)
        updated_mu[:,i] = mu_i

        dis_i = np.sqrt(CondProb_i) * (data - mu_i)
        sigma_i = np.dot(dis_i.T,dis_i)/np.sum(CondProb_i)
        
        updated_sigma = updated_sigma + updated_p[i]*sigma_i
        
    return updated_mu, updated_sigma, updated_p


def loglik(data, mu, sigma, p, G, CondProb):

    n = data.shape[0]
    d = data.shape[1]
    sigma_invers = np.linalg.inv(sigma)
    sigma_determ = np.linalg.det(sigma)
    factor = math.sqrt( ( (2*math.pi)**d ) * sigma_determ )

    Prob_matrix = np.zeros((n,n))
    gaussian_prob = np.zeros((n,G))
    # Prob_ik = np.zeros((n,G))   

    for i in range(G):
        mu_i = mu[:,i]
        p_i = p[i]
        dis_i = data - mu_i
        CondProb_i = CondProb[:,i]
        
        Prob_matrix = (dis_i @ sigma_invers) @ dis_i.T
        Diag = np.array([Prob_matrix[i][i] for i in range(n)])
        gaussian_prob[:,i] = p_i/factor * np.exp(-0.5*Diag)
        # Prob_ik[:, i] = CondProb_i * np.log( gaussian_prob)
    
    g = np.sum( np.log( np.sum(gaussian_prob,axis = 1) ))
    
    return g


def Init(G,data):
    
    n = data.shape[0]
    dimX = data.shape[1]
    mu = np.zeros((dimX, G))
    sigma = np.zeros((dimX, dimX))
    p = np.zeros((G,))

    if G == 2:
        p[0] = 10/n
        p[1] = 1-p[0]
        mu[:,0] = np.mean(data[0:10,:], axis = 0)
        mu[:,1] = np.mean(data[10:,:], axis = 0)
        dis_0 = data[0:10,:] - mu[:,0]
        dis_1 = data[10:,:] - mu[:,1]
        sigma = (np.dot(dis_0.T,dis_0) + np.dot(dis_1.T,dis_1))/n
        
    elif G == 3:
        p[0] = 10/n
        p[1] = 20/n
        p[2] = 1-p[0]-p[1]
        mu[:,0] = np.mean(data[0:10,:], axis = 0)
        mu[:,1] = np.mean(data[10:30,:], axis = 0)
        mu[:,2] = np.mean(data[30:,:], axis = 0)
        dis_0 = data[0:10,:] - mu[:,0]
        dis_1 = data[10:30,:] - mu[:,1]
        dis_2 = data[30:,:] - mu[:,2]
        sigma = (np.dot(dis_0.T,dis_0) + np.dot(dis_1.T,dis_1) + np.dot(dis_2.T,dis_2))/n

    else:
        print('Error: please enter G=2 or G=3 ')
    
    # print(sigma.shape)
    if sigma.shape[0] != dimX:
        print('Error: wrong array dimension!')
    
    return mu, sigma, p

In [49]:
url = 'https://liangfgithub.github.io/Data/faithful.dat'
response = requests.get(url)
if response.status_code == 200:
    data = response.text

    lines = data.split('\n')[1:]
    data_list = []

    for line in lines:
        values = line.split()  # Assuming columns are separated by spaces
        if len(values) == 3:
            data_list.append([float(val) for val in values])

    # Convert the list of lists to a NumPy array
    data_array = np.array(data_list)
    print(data_array.shape)

else:
    print("Failed to download the data.")


(272, 3)


In [59]:
X = data_array[:,1:]

G = 2
itmax = 20

mu, sigma, p = Init(G,X)
mu_out, sigma_out, p_out, likelihood_out  = myEM(X, G, mu, sigma, p, itmax)

print(p_out)
print(mu_out)
print(sigma_out)

print(likelihood_out[itmax-1])



[0.04297883 0.95702117]
[[ 3.49564188  3.48743016]
 [76.79789154 70.63205853]]
[[  1.29793612  13.92433626]
 [ 13.92433626 182.58009247]]
-1289.5693549424104


In [60]:
G3 = 3
itmax_g3 = 20

mu_g3, sigma_g3, p_g3 = Init(G3,X)

mu_g3_out, sigma_g3_out, p_g3_out, likelihood_g3_out  = myEM(X, G3, mu_g3, sigma_g3, p_g3, itmax_g3)

print(p_g3_out)
print(mu_g3_out)
print(sigma_g3_out)

print(likelihood_g3_out[itmax-1])

[0.04363422 0.07718656 0.87917922]
[[ 3.51006918  2.81616674  3.54564083]
 [77.10563811 63.35752634 71.25084801]]
[[  1.26015772  13.51153756]
 [ 13.51153756 177.96419105]]
-1289.350958862739


Part II

In [2]:
def forward(obs,A,B,w):
    T = len(obs)
    mz = A.shape[0]
    alpha = np.zeros([T,mz])
    alpha[0] = w*B[:,obs[0]]

    # alpha_test = np.zeros([T,mz])
    # alpha_test[0] = w*B[:,obs[0]]
    for t in range(1,T):
        # alpha[t] = np.sum(alpha[t-1]*A.T,axis = 1)*B[:,obs[t]]
        alpha[t] = np.dot(alpha[t-1],A)*B[:,obs[t]]
        # for i in range(mz):
        #     for j in range(mz):
        #         alpha_test[t,i] += alpha_test[t-1,j]*A[j,i]*B[i,obs[t]]
    # print(np.max(alpha-alpha_test))
    return alpha

def backward(obs,A,B):
    T = len(obs)
    mz = A.shape[0]
    beta = np.zeros([T,mz])
    beta[-1] = 1

    # beta_test = np.zeros_like(beta)
    # beta_test[-1] = 1
    for t in range(T-2,-1,-1):
        beta[t] = np.sum(A*(B[:,obs[t+1]]*beta[t+1]),axis = 1)
    #     for i in range(mz):
    #         for j in range(mz):
    #             beta_test[t,i] += A[i,j]*B[j,obs[t+1]]*beta[t+1,j]
    # print(beta==beta_test)
    return beta

def BW_onestep(obs,parameter):
    A,B,w = parameter
    T = len(obs)

    mz = A.shape[0]
    mx = B.shape[1]

    alpha = forward(obs,A,B,w)
    beta = backward(obs,A,B)
    myGamma = np.empty([mz,mz,T-1])
    for t in range(T-1):
        for i in range(mz):
            for j in range(mz):
                myGamma[i,j,t] = alpha[t,i]*A[i,j]*B[j,obs[t+1]]*beta[t+1,j]
    Gamma = np.empty([mz,T])
    Gamma[:,:-1] = np.sum(myGamma,axis=1)
    Gamma[:,-1] = np.sum(myGamma[:,:,-1],axis = 0)
    A_new_numerator = np.sum(myGamma,axis = -1)
    A_new_denominator = np.sum(np.sum(myGamma,axis = -1),axis = -1)[:,np.newaxis]
    A_new = A_new_numerator/A_new_denominator

    B_denominator = np.sum(Gamma,axis = -1)
    B_new = np.zeros_like(B)
    for l in range(mx):
        index = np.where(obs[:-1] == l)[0]
        # print(index)
        # a = np.sum(Gamma[:,index],axis = -1)
        B_new[:,l] = np.sum(Gamma[:,index],axis = -1)/B_denominator
    # w_new = Gamma[:,0]
    return A_new,B_new

def myViterbi(obs,mx,mz,A,B,w):
    T = len(obs)
    Delta = np.zeros([T,mz])
    Delta[0,:] = w*B[:,obs[0]]
    
    for t in range(1,T):
        Delta[t] = np.max(Delta[t-1][:,np.newaxis]*A, axis = 0)*B[:,obs[t]]
    
    Z = np.zeros(T).astype(int)
    Z[-1] = np.argmax(Delta[-1])
    for t in range(T-2,-1,-1):
        Z[t] = np.argmax(Delta[t]*A[:,Z[t+1]])
    return Z
    




In [3]:
test1 = np.loadtxt('./coding4_part2_data.txt').astype(int)-1
w = np.array([0.5,0.5]).T
A = np.ones((2,2))*0.5
B = np.array([[1/9,3/9,5/9],[1/6,2/6,3/6]])
A_true = np.array([[0.49793938,0.50206062],[0.44883431, 0.55116569]])
B_true = np.array([[0.22159897,0.20266127,0.57573976],[0.34175148,0.17866665,0.47958186]])
mx = 3
mz = 2
for i in range(100):
    A_new,B_new= BW_onestep(test1,(A,B,w))
    A = A_new
    B = B_new



In [5]:
np.set_printoptions(precision=8)
print(A)

[[0.4978    0.5022   ]
 [0.4486599 0.5513401]]


In [6]:
print(np.linalg.norm(A-A_true))

0.00031574270530074146


In [7]:
print(B)

[[0.22167819 0.19702881 0.57595216]
 [0.3416422  0.17424007 0.47942261]]


In [8]:
print(np.linalg.norm(B-B_true))

0.007169930753910339


A and B are close to their true value but not exactly the same. We suppose it is because of truncation and part of A and B can not be displayed with 8 digits.

In [142]:
# Initialize an empty list to hold all values
data = []

# Open the file for reading
with open('./coding4_part2_Z.txt', 'r') as file:
    # Iterate over each line in the file
    for line in file:
        # Strip leading/trailing whitespace and split the line into columns
        # Convert each column value to int and extend the data list
        data.extend([int(value) for value in line.strip().split()])

# 'data' is now a one-dimensional list with integers
data_Z = np.array(data)


# A = np.array([[0.49793938,0.50206062],[0.44883431, 0.55116569]])
# B = np.array([[0.22159897,0.20266127,0.57573976],[0.34175148,0.17866665,0.47958186]])
# w = np.array([0.5,0.5]).T
# A = np.ones((2,2))*0.5
# B = np.array([[1/9,3/9,5/9],[1/6,2/6,3/6]])
a = myViterbi(test1,mx,mz,A,B,w)+1

In [143]:
data_Z == a

array([ True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,

No matter we try A and B from the code or the true value, all the hidden layer matches. 