$$
\newcommand{\R}{\mathbb{R}}
\newcommand{\v}[1]{\textbf{#1}}
\newcommand{\T}{^\top}
\DeclareMathOperator*{\argmax}{arg\,max}
\DeclareMathOperator*{\argmin}{arg\,min}
\DeclareMathOperator{\Tr}{Tr}
$$

### $$
    HMM = \left\{ 
        S, O, A_0 \in [0,1]^S, A \in [0,1]^{S \times S}, Q \in [0,1]^{S \times O} 
    \right\}
$$
---
### Three Main Problems

<h4> 1. Likelihood: &emsp; $\Pr(o_1, ..., o_T) = ?$ 
</h4>

<h4> 2. Decoding (state prediction): &emsp;  $\argmax_{\v s} \left\{ \Pr( \v o \mid \v s) \right\} \quad$ where $\v o, \v s \in \R^T$
</h4>

<h4> 3. Learning: &emsp;  Infer $(A, Q)$ &ensp; given a training set &ensp; $\{\v o^{(i)}\}_{i=1..N}$
</h4>

---

Imagine we are at a casino, and the dealer switches to a positively biased coin with probability $0.1$ and a negatively biased coin with probability $0.1$.
Every turn, sher returns to the fair coin with probability .2, and remains with the biased w.p. .8.

The biased coins are 60-40 / 40-60.

In [1]:
import numpy as np

class HMM:
    
    def __init__(self, A_0, A, Q):
        self.A_0 = A_0
        self.A = A
        self.Q = Q
        
    def pr_state_trans(self, s_seq):
        T = len(s_seq)
        s_tran_probs = np.zeros(T)
        s_tran_probs[0] = self.A_0[s_seq[0]]
        s_tran_probs[1:] = self.A[s_seq[:-1], s_seq[1:]]
        return s_tran_probs

    def pr_emissions(self, o_seq, s_seq):
        return self.Q[s_seq, o_seq]
    
    def pr_obs(self, o_seq, s_seq):
        s_tran_probs = self.pr_state_trans(s_seq)
        emission_probs = self.pr_emissions(o_seq, s_seq)
        return s_tran_probs.prod() * emission_probs.prod()
        
def gen_HMM(p_bias=.95):
    A = np.array([
        [.4, .3, .3],
        [.2, .8, 0],
        [.2, 0, .8]
    ])
    A_0 = np.ones(3)/3.
    Q = np.array([
        [.5, .5],
        [p_bias, 1 - p_bias],
        [1-p_bias, p_bias]
    ])
    return HMM(A_0, A, Q)

def sample_obs_seq(hmm, T=10):
    A_0, A, Q = hmm.A_0, hmm.A, hmm.Q
    
    def sample_multinoulli(probs):
        return np.argmax(np.random.multinomial(1, probs))

    s_seq = np.ones(T, dtype=int)*-1
    o_seq = np.ones(T, dtype=int)*-1
    s_seq[0] = sample_multinoulli(hmm.A_0)
    for t in range(T):
        curr_s = s_seq[t]
        o_seq[t] = sample_multinoulli(hmm.Q[curr_s])
        if t < T-1:
            next_s = sample_multinoulli(hmm.A[curr_s])
            s_seq[t+1] = next_s

    return o_seq, s_seq

hmm = gen_HMM(p_bias=.95)

o_seq, s_seq = sample_obs_seq(hmm, T=10)
np.set_printoptions(linewidth=200, precision=2)
print(s_seq)
print(o_seq)
print(hmm.pr_state_trans(s_seq))
print(hmm.pr_emissions(o_seq, s_seq))
hmm.pr_obs(o_seq, s_seq)

[1 1 1 1 1 1 0 0 2 0]
[0 0 0 0 0 0 0 1 1 1]
[0.33 0.8  0.8  0.8  0.8  0.8  0.2  0.4  0.3  0.2 ]
[0.95 0.95 0.95 0.95 0.95 0.95 0.5  0.5  0.95 0.5 ]


4.57662330368e-05

## Likelihood

* $
\Pr(\v s) 
    = \Pr(s_1) \prod_{t=2}^T \Pr(s_t \mid s_{t-1})
    = A_0[s_1] \prod_{t=2}^T A[s_{t-1}, s_t]
$


* $
\Pr(\v o \mid \v s) 
    = \prod_{t=1}^T \Pr(o_t \mid s_t)
    = \prod_{t=1}^T Q[s_t, o_t]
$

* $
\Pr(\v o) = \Pr(o_1, ..., o_T) = \sum_{\v s \in S^T} \Pr(\v o \mid \v s)\Pr(\v s)
$

* If $x\in \Delta_S$ is the current state distribution $ x\T A $ gives the next state distribution

### The Forward Algorithm

* <b>Initialization:</b> 

&emsp; &emsp; $
    \Pr(s_1 = s \wedge o_1 ) = A_0[s] Q[s, o_1]
$

* <b>Recursion</b>: 

&emsp; &emsp; $ 
\Pr(s_t = s' \wedge o_1,..,o_t) 
    = \sum_{s \in S} \Pr(s_{t-1}=s \wedge o_1,..,o_{t-1}) \Pr(s \mid s') \Pr(o_t \mid s')
$

&emsp; &emsp; $ 
F[t, s'] 
    = \sum_{s \in S} F[t-1, s] A[s, s'] Q[s', o_t]
$

In [2]:
o_seq, s_seq = sample_obs_seq(hmm, T=30)

print(s_seq)
print(o_seq)

[2 0 1 1 1 0 0 1 1 1 0 0 1 1 0 1 1 1 1 1 1 1 1 1 0 1 1 1 1 1]
[1 0 0 0 0 0 0 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0]


In [3]:
def forward(o_seq, hmm):
    A_0, A, Q = hmm.A_0, hmm.A, hmm.Q
    T = len(o_seq)
    N_s, N_o = Q.shape

    F = np.zeros([T, N_s])

    # Initialization
    F[0, :] = A_0 * Q[range(N_s), o_seq[0]]
    
    # DP
    for t in range(1, T):
        o_t = o_seq[t]
        for s_next in range(N_s):
            q = F[t-1] * A[:, s_next] * Q[s_next, o_t]
            F[t, s_next] = q.sum()

    return F[T-1].sum()
    
forward(o_seq, hmm)

1.1117751440035405e-06

### The Viterbi Algorithm

* <b>Initialization:</b> 

&emsp; &emsp; $
    F[0, s] = A_0[s] Q[s, o_1]
$

&emsp; &emsp; $
    Bp[0, s] = -1
$

* <b>Recursion</b>: 

&emsp; &emsp; $ 
    F[t, s'] = \max_{s \in S} \{ F[t-1, s] A[s, s'] Q[s', o_t] \}
$

&emsp; &emsp; $ 
    Bp[t, s'] = \argmax_{s \in S} \{"\}
$

In [4]:
def viterbi(o_seq, hmm):
    A_0, A, Q = hmm.A_0, hmm.A, hmm.Q
    T = len(o_seq)
    N_s, N_o = Q.shape

    F = np.zeros([T, N_s])
    Bp = np.zeros([T, N_s], dtype=int)
    
    # Initialization
    F[0, :] = A_0 * Q[range(N_s), o_seq[0]]
    Bp[0, :] = -1
    
    # DP
    for t in range(1, T):
        o_t = o_seq[t]
        for s_next in range(N_s):
            q = F[t-1] * A[:, s_next] * Q[s_next, o_t]
            Bp[t, s_next] = q.argmax()
            F[t, s_next] = q.max()
    
    best_path = [F[T-1].argmax()]
    for t in range(T-1, 0, -1):
        s_prev = Bp[t, best_path[-1]]
        best_path.append(s_prev)
    best_path = np.flip(best_path)
    
    max_prob = F[T-1].max()
    return F, Bp, best_path, max_prob

F, Bp, best_path, max_prob = viterbi(o_seq, hmm)

print(f'PRED: {best_path}')
print(f'TRUE: {s_seq}')
print(f'OBS : {o_seq}')

PRED: [0 1 1 1 1 1 1 1 1 1 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1]
TRUE: [2 0 1 1 1 0 0 1 1 1 0 0 1 1 0 1 1 1 1 1 1 1 1 1 0 1 1 1 1 1]
OBS : [1 0 0 0 0 0 0 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0]
