In [1]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

In [2]:
init_prob = np.array([.5, .5])
transition_prob = np.array(([0.95, .05], [.05, .95]))
emission_prob = np.array(([1/6, 1/6, 1/6, 1/6, 1/6, 1/6], [1/10, 1/10, 1/10, 1/10, 1/10, 1/2]))
states = np.array([0,1])

In [3]:
data1 = pd.read_csv('~/Documents/StatsMLClass/data/hw11pb1.csv', header=None)
data2 = pd.read_csv('~/Documents/StatsMLClass/data/hw11pb2.csv', header=None)

In [4]:
data1 = data1.values.flatten()
data2 = data2.values.flatten()
data1 = np.asarray(data1)
data2 = np.asarray(data2)

In [5]:
class hidden_markov_model:
    def __init__(self, start_probs, transition_probs, emission_probs, state_set, observations):
        self.start = start_probs
        self.transition = transition_probs
        self.emission = emission_probs
        self.states = state_set
        self.obs = observations
        self.n_states = len(self.states)
        self.t_obs = len(observations)
        self.alpha = np.zeros((self.t_obs, self.n_states))
        self.beta = np.zeros((self.t_obs, self.n_states))
        self.underflow = np.ones(len(self.obs))
    
    def get_gammas(self, alphas, betas, T, N):
        gammas = np.zeros((len(self.obs)-1, self.n_states))
        xs = np.zeros((len(self.obs)-1, self.n_states, self.n_states))
        for t in range(self.t_obs-1):
            xinum = np.array([self.alpha[t]]*2).T * self.transition * np.array([self.beta[t+1]]*2)*np.array([self.emission[:, self.obs[t]-1]]*2)
            xs[t] = xinum/np.sum(xinum)
            gammas[t] = np.sum(xs[t], axis=1)
        
        return gammas, xs
        
    def EM(self, N, M, T, pi, A, B):
        alphas = self.forward()
        betas = self.backward()
        
        gammas, xs = self.get_gammas(alphas, betas, T, N)
        
        new_pi = gammas[0, :]
        
        new_a = np.sum(xs, axis=0)/(np.array([np.sum(gammas, axis=0)]*2).T)
       
        xinds = np.zeros((len(self.obs), len(self.emission[0])))
        xinds[np.arange(len(self.obs)), self.obs-1] = 1
        
        new_b = (gammas.T@xinds[:-1])/np.array([np.sum(gammas, axis=0)]*6).T
        
        return new_pi, new_a, new_b
        
    def baumwelch(self, tolerance, maxiter):
        T = self.t_obs
        N = self.n_states
        M = len(self.states)
        likely = []
        
        pi = np.log(self.start)
        A = np.log(self.transition)
        B = np.log(self.emission)
        
        iter_ = 0
        err = np.inf
        while iter_ < maxiter or err > tolerance:
            iter_ += 1
            if iter_ % 100 == 0:
                print('Iter:', iter_)
            new_pi, new_a, new_b = self.EM(N, M, T, pi, A, B)
            
            err = np.abs(np.linalg.norm(A - new_a) + np.linalg.norm(B - new_b))/2
            
            pi, A, B = new_pi, new_a, new_b
            
        return pi, A, B
    
    def forward(self):
        self.alpha[0] = self.start * self.emission[:, self.obs[0]-1]
        for t in range(1, self.t_obs):
            for k in self.states:
                self.alpha[t, k] = (self.alpha[t-1]@self.transition[k]) * self.emission[k, self.obs[t]-1]
            self.underflow[t] = np.sum(self.alpha[t])
            self.alpha[t] = self.alpha[t]/self.underflow[t]
        return self.alpha
    
    def backward(self):
        self.beta[:, -1:] = 1
        
        for t in range(2, len(self.obs)+1):
            for k in self.states:
                self.beta[-t, k] = np.sum(self.beta[-t+1] * self.transition[k] * self.emission[:, self.obs[-t+1]-1])
            self.beta[-t] = self.beta[-t]/self.underflow[-t]
        return self.beta
    
    def viterbi(self):
        N = self.transition.shape[0]
        delta = np.zeros((self.t_obs, N))
        psi = np.zeros((self.t_obs, N))
        delta[0] = self.start * self.emission[:, self.obs[0]-1]
        for tt in range(1, self.t_obs):
            for j in range(N):
                delta[tt, j] = np.max(delta[tt-1] * self.transition[:, j] * self.emission[j, self.obs[tt]-1])
                psi[tt, j] = np.argmax(delta[tt-1] * self.transition[:, j])
                
        states = np.zeros(self.t_obs)
        states[self.t_obs-1] = np.argmax(np.exp(delta[self.n_states-1]))
        states = states.astype('int')
        for t in range(self.t_obs-2, -1, -1):
            states[t] = psi[t+1, states[t+1]]
        return states
    
    

In [6]:
hmm1 = hidden_markov_model(init_prob, transition_prob, emission_prob, states, data1)

In [7]:
guess = hmm1.viterbi()
empt = np.empty(guess.shape, dtype='str')
f_inds = np.where(guess == 0)
l_inds = np.where(guess==1)
empt[f_inds] = 'F'
empt[l_inds] = 'L'
empt

array(['F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F',
       'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F',
       'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F',
       'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F',
       'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F',
       'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'L', 'L', 'L',
       'L', 'L', 'L', 'L', 'L', 'L', 'L', 'L', 'L', 'L', 'L', 'L', 'L',
       'L', 'L', 'L', 'L', 'L', 'L', 'L', 'L', 'L', 'L', 'L', 'L', 'L',
       'L', 'L', 'L', 'L', 'L', 'L', 'L', 'L', 'L', 'L', 'L', 'L', 'L',
       'L', 'L', 'L', 'L', 'L', 'L', 'F', 'F', 'F', 'F', 'F', 'F', 'F',
       'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F',
       'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F',
       'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F',
       'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F', 'F

In [8]:
alph = hmm1.forward()

In [9]:
bet = hmm1.backward()

In [10]:
alph[111, 0]/alph[111, 1]

0.07238525813824889

In [11]:
bet[111, 0]/bet[111, 1]

0.21634608629264707

In [13]:
A = np.array([[0.95, 0.05], [0.05, 0.95]])
B = np.array(([[1/6, 1/6, 1/6, 1/6, 1/6, 1/6], [.1, .1, .1, .1, .1, .5]]))
pi = np.array([1/2, 1/2])

In [14]:
hmm2 = hidden_markov_model(pi, A, B, states, data2)

In [15]:
solved_pi, solved_A, solve_B = hmm2.baumwelch(1e-2, 100)

Iter: 100


In [16]:
print('Pi found through Baum Welch:', solved_pi)
print('Transition probabilities found through Baum Welch:', solved_A)
print('Emission probabilities found through Baum Welch:', solve_B)

Pi found through Baum Welch: [0.42671712 0.57328288]
Transition probabilities found through Baum Welch: [[0.99065118 0.00934882]
 [0.16993597 0.83006403]]
Emission probabilities found through Baum Welch: [[0.20145444 0.20746921 0.19321639 0.2003796  0.1272622  0.07021816]
 [0.10984433 0.10203983 0.1023266  0.10441057 0.06631893 0.51505975]]
