In [None]:
import numpy as np
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats

In [None]:
# 5 state Markov chain, S1 to S5
# state at time t = qt
# P[qt = Si | qt-1=S1...] = P[qt = Si | qt-1 = Si]
# state transition probabilities A
# a_ij = P[qt = Sj | qt-1 = Si], 1 <= i; j <= N
#  a_ij >= 0
#  sum(j to N) a_ij = 1

# Weather example: S1 = rain (snow), S2 = cloudy, S3 = sunny

# transition probs
a = np.array([[0.4, 0.3, 0.3], [0.2, 0.6, 0.2], [0.1, 0.1, 0.8]])
# Given weather on day 1 (t = 1) is sunny (S3),
# probability of sun-sun-rain-rain-sun-cloudy-sun; i.e.,
# O = {S3, S3, S3, S1, S1, S3, S2, S3}
# P(O|model)
o = [2, 2, 2, 0, 0, 2, 1, 2]
p = 1
for prev_o, curr_o in zip(o[:-1], o[1:]):
    p *= a[prev_o, curr_o]
p

In [None]:
# What is the prob. it stays in the current state for `d` days?
# I.e., O = [Si] * d + [Sj]
i = 1
d = 10
pi_d = a[i, i] ** (d -1) * (1 - a[i, i])
# Expected number of days it'll stay the current state
expect_di = 1 / (1 - a[i, i])
pi_d, expect_di

## HMM definition

- $N$: number of states in the model (e.g., # of urns)
- $S=S_{1...N}$: the states in the model
- $q_t$: the state at time $t$
- $M$: the number of observation symbols per state (i.e., alphabet size)
- $V=v_{1...M}$: the observation symbols
- $A=\{a_{ij}\}; a_{ij} = P[q_{t+1} = S_j|q_t = S_i]$: state transition probability distribution
- $B=\{b_j(k)\}; b_j(k) = P[v_k\text{ at }t|q_t = S_j]$: observation symbol probability distribution in state $j$
- $\pi=\{\pi_i\}; \pi_i = P[q_1 = S_i]$: initial state distribution

In [None]:
# Wikipedia examples

states = ('Healthy', 'Fever')
 
observations = ('normal', 'cold', 'dizzy')
 
start_probability = {'Healthy': 0.6, 'Fever': 0.4}
 
transition_probability = {
   'Healthy' : {'Healthy': 0.7, 'Fever': 0.3},
   'Fever' : {'Healthy': 0.4, 'Fever': 0.6},
   }
 
emission_probability = {
   'Healthy' : {'normal': 0.5, 'cold': 0.4, 'dizzy': 0.1},
   'Fever' : {'normal': 0.1, 'cold': 0.3, 'dizzy': 0.6},
}

def fwd_bkw(x, states, a_0, a, e):
    L = len(x)
 
    fwd = []
    f_prev = {}
    # forward part of the algorithm
    for i, x_i in enumerate(x):
        f_curr = {}
        for st in states:
            if i == 0:
                # base case for the forward part
                prev_f_sum = a_0[st]
            else:
                prev_f_sum = sum(f_prev[k]*a[k][st] for k in states)
 
            f_curr[st] = e[st][x_i] * prev_f_sum
 
        fwd.append(f_curr)
        f_prev = f_curr
    
    end_st = 'Fever'
    p_fwd = sum(f_curr[k]*a[k][end_st] for k in states)
 
    bkw = []
    b_prev = {}
    # backward part of the algorithm
    for i, x_i_plus in enumerate(reversed(x[1:]+(None,))):
        b_curr = {}
        for st in states:
            if i == 0:
                # base case for backward part
                b_curr[st] = a[st][end_st]
            else:
                b_curr[st] = sum(a[st][l]*e[l][x_i_plus]*b_prev[l] for l in states)
 
        bkw.insert(0,b_curr)
        b_prev = b_curr
 
    p_bkw = sum(a_0[l] * e[l][x[0]] * b_curr[l] for l in states)
 
    # merging the two parts
    posterior = []
    for i in range(L):
        posterior.append({st: fwd[i][st]*bkw[i][st]/p_fwd for st in states})
 
    #assert p_fwd == p_bkw
    return fwd, bkw, posterior

def example():
    return fwd_bkw(observations,
                   states,
                   start_probability,
                   transition_probability,
                   emission_probability)

print "--- fwd-bkwd-posterior"
for line in example():
    print(' '.join(map(str, line)))

def viterbi(obs, states, start_p, trans_p, emit_p):
    V = [{}]
    path = {}
    
    # Initialize base cases (t == 0)
    for y in states:
        V[0][y] = start_p[y] * emit_p[y][obs[0]]
        path[y] = [y]
    
    # Run Viterbi for t > 0
    for t in range(1, len(obs)):
        V.append({})
        newpath = {}

        for y in states:
            (prob, state) = max((V[t-1][y0] * trans_p[y0][y] * emit_p[y][obs[t]], y0) for y0 in states)
            V[t][y] = prob
            newpath[y] = path[state] + [y]

        # Don't need to remember the old paths
        path = newpath
    
    # Return the most likely sequence over the given time frame
    n = len(obs) - 1
    print_dptable(V)
    (prob, state) = max((V[n][y], y) for y in states)
    return (prob, path[state])

def print_dptable(V):
    s = "    " + " ".join(("%7d" % i) for i in range(len(V))) + "\n"
    for y in V[0]:
        s += "%.5s: " % y
        s += " ".join("%.7s" % ("%f" % v[y]) for v in V)
        s += "\n"
    print(s)

print "--- viterbi"
def example():
    return viterbi(observations,
                   states,
                   start_probability,
                   transition_probability,
                   emission_probability)
print(example())

In [None]:
def sample_pdf(pdf, size=1):
    states = np.arange(pdf.size)
    dist = stats.rv_discrete(name='pdf', values=(states, pdf))
    return dist.rvs(size)

def observe(n_t, a, b, pi):
    q_t = sample_pdf(pi)  # initial state
    os = []
    qs = [q_t]
    for t in xrange(n_t):
        o_t = sample_pdf(b[q_t])
        q_t = sample_pdf(a[q_t])
        os.append(o_t)
        qs.append(q_t)


class HMM(object):
    def __init__(self, a, b, pi):
        assert a.shape[0] == a.shape[1]
        assert b.shape[0] == a.shape[0]
        assert pi.size == a.shape[0]
        self.a = a
        self.b = b
        self.pi = pi
        self.N = self.a.shape[0]
        self.M = self.b.shape[1]
        self.states = np.arange(self.N, dtype=int)

    def observe(self, n_t):
        return observe(n_t, self.a, self.b, self.pi)

    def forward(self, obs):
        # forward variable;
        # alpha_t(i) = P(past O sequence, qt = Si | hmm)
        f_prev = self.b.T[obs[0]] * self.pi
        fwd = [[f_prev]]
        for o_i in obs[1:]:
            f_curr = np.zeros(self.N)
            prev_f_sum = np.sum(f_prev * self.a.T, axis=1)
            f_curr = self.b.T[o_i] * prev_f_sum
            fwd.append([f_curr])
            f_prev = f_curr
        return np.concatenate(fwd, axis=0)

    def backward(self, obs):
        # backward variable;
        # beta_t(i) = P(future O sequence|qt = Si, hmm)
        b_prev = np.ones(self.N)
        bkw = [[b_prev]]
        for i, o_i_plus in enumerate(reversed(obs[1:])):
            b_curr = np.sum(a * self.b.T[o_i_plus] * b_prev, axis=1)
            bkw.insert(0, [b_curr])
            b_prev = b_curr
        return np.concatenate(bkw, axis=0)

    def obs_probability(self, obs):
        """Computed with the forward-backward procedure."""
        fwd = self.forward(obs)
        return np.sum(fwd[-1])

    def optimal_path(self, obs):
        """Computed with Viterbi algorithm."""
        return self.viterbi(obs)[1]

    def viterbi(self, obs):
        V = [self.pi * self.b.T[obs[0]]]
        path = [[s] for s in self.states]

        for t in xrange(1, len(obs)):
            v = V[t-1] * (self.a * self.b.T[obs[t]]).T
            prob, state = np.amax(v, axis=1), np.argmax(v, axis=1)
            V.append(prob)
            path = [path[state[y]] + [y] for y in self.states]
        # Return the most likely sequence
        prob, state = np.amax(V[-1]), np.argmax(V[-1])
        return prob, path[state]

    def update(self, obs, n_iters=1):
        """Update a, b, pi so that `obs` is more likely.

        Uses the Baum-Welch algorithm.
        """
        obs = np.asarray(obs)
        print("Before P(obs) = %f" % self.obs_probability(obs))

        for _ in xrange(n_iters):
            alpha = self.forward(obs)
            beta = self.backward(obs)

            xi = np.zeros((self.N, self.N, len(obs) - 1))
            for t in xrange(len(obs) - 1):
                denom = np.dot(np.dot(alpha[t], self.a) * self.b.T[obs[t+1]],
                               beta[t+1])
                for i in xrange(self.N):
                    numer = (alpha[t, i]
                            * self.a[i]
                            * self.b.T[obs[t+1]]
                            * beta[t+1])
                    xi[i, :, t] = numer / denom
  
            # gamma_t(i) = P(q_t = S_i | O, hmm)
            gamma = np.squeeze(np.sum(xi, axis=1))
            # Need final gamma element for new B
            prod = (alpha[-1] * beta[-1]).reshape((-1,1))
            gamma = np.hstack((gamma,  prod / np.sum(prod))) # append one more to gamma!!!

            new_pi = gamma.T[0]
            new_a = np.sum(xi, axis=2) / np.sum(gamma[:, :-1], axis=1).reshape((-1,1))
            new_b = np.array(b)

            if False:
                plt.figure()
                plt.plot(gamma[1])
                plt.ylim(-0.1,1.1)
                plt.legend(('Probability State=1'))
                plt.xlabel('Time')
            
            n_levels = self.b.shape[1]
            sumgamma = np.sum(gamma, axis=1)
            for lev in xrange(n_levels):
                ix = obs == lev
                new_b.T[lev] = np.sum(gamma[:, ix], axis=1) / sumgamma

            self.pi[...] = new_pi
            self.a[...] = new_a
            self.b[...] = new_b
        print("After P(obs) = %f" % self.obs_probability(obs))

N = 2
M = 3
pi = np.array([0.6, 0.4])
a = np.array([[0.7, 0.3], [0.4, 0.6]])
b = np.array([[0.5, 0.4, 0.1], [0.1, 0.3, 0.6]])
obs = np.array([0, 1, 2])

hmm = HMM(a, b, pi)
fwd = hmm.forward(obs)
bwd = hmm.backward(obs)
posterior = fwd * bwd / np.sum(fwd[-1])
# print fwd
# print bwd
# print posterior
# print hmm.obs_probability([2])
hmm.viterbi(obs)
hmm.update([0, 0, 0], n_iters=20)

In [None]:
print hmm.a
print np.sum(hmm.a, axis=1)
print hmm.b
print np.sum(hmm.b, axis=1)
print hmm.pi
print np.sum(hmm.pi)