netids: nalladi2, davyji2, elcook3

We all got on shared screen calls and worked on all parts together

In [1]:
import math
import numpy as np

In [2]:
def mvgpdf(X, means, cov_mat, p, G):
    # multivariate gaussian PDF
    probs = np.zeros((X.shape[0], G)) # n x g shape
    for k, mu in enumerate(means): # exponent term
        probs[:,k] = np.sum((np.linalg.inv(cov_mat) @ (X - mu).T) * (X - mu).T, axis=0)
    probs = np.exp(probs * -0.5)
    probs /= math.sqrt(np.linalg.det(cov_mat) * (2 * math.pi) ** p)
    return probs

In [3]:
def loglik(X, mixture_weights, means, cov_mat, p, G):
    probs = mvgpdf(X, means, cov_mat, p, G)
    probs *= mixture_weights
    probs = probs.sum(1)
    return np.sum(np.log(probs))

In [4]:
def Estep(X, mixture_weights, means, cov_mat, p, G):
    probs = mvgpdf(X, means, cov_mat, p, G)
    probs *= mixture_weights
    return (probs / probs.sum(axis=1, keepdims=True))

In [5]:
def Mstep(X, probs, p, G):
    mixture_weights = np.mean(probs, axis=0)
    means = np.zeros((G, p))
    cov_mat_num = np.zeros((p, p))
    cov_mat_denom = 0
    for k in range(G):
        means[k] = np.sum((X.T * probs[:,k]).T, 0) / sum(probs[:,k])
        tmp = X - means[k]
        cov_mat_num += tmp.T @ np.diag(probs[:,k]) @ tmp
        cov_mat_denom += np.sum(probs[:,k])
    cov_mat = cov_mat_num / cov_mat_denom
    return mixture_weights, means, cov_mat

In [6]:
def myEM(data, G, mixture_weights, means, cov_mat, p, itmax):
    mixture_weights_ = mixture_weights 
    means_ = means
    cov_mat_ = cov_mat
    for i in range(itmax):
        r_ = Estep(data, mixture_weights_, means_, cov_mat_, p, G)
        mixture_weights_, means_, cov_mat_ = Mstep(data, r_, p, G)
    return mixture_weights_, means_.T, cov_mat_, loglik(data, mixture_weights_, means_, cov_mat_, p, G)

In [7]:
faithful = open('faithful.dat')
temp = []
for row in faithful:
    temp.append(row.split()[1:])
temp = temp[1:]
for i in range(len(temp)):
    for j in range(len(temp[i])):
        temp[i][j] = float(temp[i][j])
faithful = np.array(temp)

In [8]:
## Testing G = 2
itmax = 20

n = len(faithful)
X = faithful
p = 2
G = 2

p1 = 10/n
p2 = 1 - p1
mixture_weights = [p1, p2]

mu1 = X[:10].mean(0)
mu2 = X[10:].mean(0)
means = np.array([mu1, mu2])

cov_mat_L = sum([(X[l] - mu1).reshape((2, 1)) @ (X[l] - mu1).reshape((1, 2)) for l in range(10)])
cov_mat_R = sum([(X[l] - mu2).reshape((2, 1)) @ (X[l] - mu2).reshape((1, 2)) for l in range(10, len(X))])
cov_mat = (cov_mat_L + cov_mat_R) / n

In [9]:
prob, mean, Sigma, loglik_val = myEM(X, G, mixture_weights, means, cov_mat, p, 20)

print("prob")
print(prob)

print("mean")
print(mean)

print("Sigma")
print(Sigma)

print("loglik")
print(loglik_val)

prob
[0.04297883 0.95702117]
mean
[[ 3.49564188  3.48743016]
 [76.79789154 70.63205853]]
Sigma
[[  1.29793612  13.92433626]
 [ 13.92433626 182.58009247]]
loglik
-1289.5693549424104


In [10]:
## Testing G = 3
G = 3

p2 = 20/n
p3 = 1 - p1 - p2
mixture_weights = [p1, p2, p3]

mu2 = X[10:30].mean(0)
mu3 = X[30:].mean(0)
means = np.array([mu1, mu2, mu3])

cov_mat_L = sum([(X[l] - mu1).reshape((2, 1)) @ (X[l] - mu1).reshape((1, 2)) for l in range(10)])
cov_mat_R = sum([(X[l] - mu2).reshape((2, 1)) @ (X[l] - mu2).reshape((1, 2)) for l in range(10, 30)])
cov_mat_3 = sum([(X[l] - mu3).reshape((2, 1)) @ (X[l] - mu3).reshape((1, 2)) for l in range(30, len(X))])
cov_mat = (cov_mat_L + cov_mat_R + cov_mat_3) / n

In [11]:
prob, mean, Sigma, loglik_val = myEM(X, G, mixture_weights, means, cov_mat, p, itmax)
print("prob")
print(prob)

print("mean")
print(mean)

print("Sigma")
print(Sigma)

print("loglik")
print(loglik_val)

prob
[0.04363422 0.07718656 0.87917922]
mean
[[ 3.51006918  2.81616674  3.54564083]
 [77.10563811 63.35752634 71.25084801]]
Sigma
[[  1.26015772  13.51153756]
 [ 13.51153756 177.96419105]]
loglik
-1289.350958862739


## Part 2

### Baum-Welch

In [12]:
#BW
def forward(t,i,x, mz,w,A,B,alphas):
    if alphas[t,i] != 0:
        return alphas[t,i]
    elif t == 0:#changed from 1
        return w[i]*B[i,x[0]]
    else:
        s=0
        for j in range(mz):
            s+=forward(t-1,j,x, mz,w,A,B,alphas)*A[j,i]*B[i,x[t]]
        return s
        

In [13]:
def backward(t,i,x, mx, mz,w,A,B,betas):
    n = len(x)
    if betas[t,i] !=0:
        return betas[t,i]
    elif t == n-1:#changed from n
        return 1
    else:
        s = 0
        for j in range(mz):
            s+=A[i,j]*B[j,x[t+1]]*backward(t+1,j,x, mx, mz,w,A,B,betas)#changed bxt
        return s

In [14]:
def biggamma(t,i,j,x, mx,mz,w,A,B,alphas,betas):
    return forward(t,i,x, mz,w,A,B,alphas)*A[i,j]*B[j,x[t+1]]*backward(t+1,j,x, mx, mz,w,A,B,betas)

In [15]:
def BW_onestep(data, mx, mz,w,A,B):
    #Estep 
    n = len(data)
    T = len(data)
    #calculate the alphas and betas
    oldA = A.copy()
    alphas = np.zeros((n,mz))
    for t in range(n):
        for i in range(mz):
            alphas[t,i] = forward(t,i,data, mz,w,A,B, alphas)
    betas = np.zeros((n, mz))
    for t in range(n-1,-1,-1):
        for j in range(mz):
            betas[t,j] = backward(t,j,data,mx, mz,w,A,B,betas)

    gamma = np.zeros((mz,mz,T-1))
    for i in range(mz):
        for j in range(mz):
            for t in range(T-1):
                gamma[i][j][t] = biggamma(t,i,j,data, mx, mz,w,A,B,alphas,betas)
    
    for i in range(mz):
        den = np.sum(gamma[i,:,:])
        for j in range(mz):
            num = np.sum(gamma[i,j,:])
            A[i][j] = num/den
    
    margiGamma = np.zeros((T,mz))
    for t in range(T):
        for i in range(mz):
            if t==0:#changed from 1
                s = 0
                for j in range(mz):
                    s+=gamma[i,j,t]#changed from 1
                margiGamma[t,i] = s
            elif t == T-1:#changed from len(x)
                s = 0
                for j in range(mz):
                    s+=gamma[j,i,t-1]
                margiGamma[t,i] = s
            else:
                s = 0
                for j in range(mz):
                    #print(t,i,j)
                    s+=gamma[i,j,t]
                margiGamma[t,i] = s
    for i in range(mz):
        den = 0
        for t in range(T):
            den += margiGamma[t,i]
        for l in range(mx):
            num = 0
            for t in range(T):
                if data[t] == l:
                    num+=margiGamma[t,i]
            B[i,l] = num/den
            #print(num,den)
            
    return(A,B)

In [16]:
def myBW(data, mx, mz,w,A,B, loop):
    for i in range(loop):
        A,B = BW_onestep(data, mx, mz,w,A,B)
    return (A,B)

In [17]:
#test data
part2 = open('coding4_part2_data.txt')
temp = []
for row in part2:
    temp.append(row.split())
for i in range(len(temp)):
    temp[i] = int(temp[i][0])-1#changed because it was one indexed
part2 = np.array(temp)
w = np.array([.5,.5])
A = np.array([[.5,.5],
              [.5,.5]])
B = np.array([[1/9,3/9,5/9],
              [1/6,2/6,3/6]])
mz = 2
mx=3

In [18]:
A,B = myBW(part2, mx, mz,w,A,B, 100)
print("A")
print(A)
print("B")
print(B)

A
[[0.49793938 0.50206062]
 [0.44883431 0.55116569]]
B
[[0.22159897 0.20266127 0.57573976]
 [0.34175148 0.17866665 0.47958186]]


### Viterbi

In [19]:
def viterbi(x, A, B, w):
    K = A.shape[0]
    T = len(x)
    
    delta = np.zeros((K, T))
    T2 = np.zeros((K, T))

    # Use w to have initial values in matrix
    delta[:, 0] = w * B[:, x[0]]
    T2[:, 0] = 0

    # Go through observations and edit the matrices
    for i in range(1, T):
        delta[:, i] = np.max(delta[:, i - 1] * A.T * B[np.newaxis, :, x[i]].T, 1)
        T2[:, i] = np.argmax(delta[:, i - 1] * A.T, 1)

    # Build the output, optimal model trajectory
    z = np.empty(T, 'B')
    z[-1] = np.argmax(delta[:, T - 1])
    for i in reversed(range(1, T)):
        z[i - 1] = T2[z[i], i]

    return z

In [20]:
temp = []
part2_z = open('Coding4_part2_Z.txt', 'r')
for line in part2_z.readlines():
    for num in line:
        if num.isdigit() == True:
            temp.append(int(num))
        
part2_zseq = np.array(temp)
# Generated sequence using Viterbi:
print(part2_zseq)

[1 1 1 1 1 1 1 2 1 1 1 1 1 2 2 1 1 1 1 1 1 1 2 2 2 2 2 1 1 1 1 1 1 1 2 1 1
 1 1 1 1 1 1 2 2 1 1 1 1 1 1 2 2 2 1 1 1 1 2 2 2 2 1 1 1 1 1 1 1 1 2 2 2 2
 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 1 1 1 1 2 1 1 1 1 1 1 1 1 1 1 1 1 1 2 1
 1 1 1 2 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 2 2 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2
 2 2 2 1 1 1 2 2 2 2 2 2 1 1 1 1 1 2 2 2 2 2 2 2 2 2 1 1 1 2 2 2 1 1 1 1 1
 1 1 1 2 2 2 2 2 1 1 1 1 1 1 1]


In [21]:
z_calc = viterbi(part2, A, B, w)
# Print the calculated Z sequence (+1 because of indexing)
print(z_calc + 1)

[1 1 1 1 1 1 1 2 1 1 1 1 1 2 2 1 1 1 1 1 1 1 2 2 2 2 2 1 1 1 1 1 1 1 2 1 1
 1 1 1 1 1 1 2 2 1 1 1 1 1 1 2 2 2 1 1 1 1 2 2 2 2 1 1 1 1 1 1 1 1 2 2 2 2
 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 1 1 1 1 2 1 1 1 1 1 1 1 1 1 1 1 1 1 2 1
 1 1 1 2 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 2 2 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2
 2 2 2 1 1 1 2 2 2 2 2 2 1 1 1 1 1 2 2 2 2 2 2 2 2 2 1 1 1 2 2 2 1 1 1 1 1
 1 1 1 2 2 2 2 2 1 1 1 1 1 1 1]


In [22]:
# Compare generated sequence with given sequence. 1.0 means complete accuracy
print( np.sum((part2_zseq - 1) == z_calc) / part2_zseq.size )

1.0
