# HMM Implementation
- https://medium.com/p/72865bda430e

In [30]:
import numpy as np
import pandas as pd


class ProbabilityVector:
    def __init__(self, probabilities: dict):
        states = probabilities.keys()
        probs  = probabilities.values()
        
        assert len(states) == len(probs), "The probabilities must match the states."
        assert len(states) == len(set(states)), "The states must be unique."
        assert abs(sum(probs) - 1.0) < 1e-12, "Probabilities must sum up to 1."
        assert len(list(filter(lambda x: 0 <= x <= 1, probs))) == len(probs), "Probabilities must be numbers from [0, 1] interval."
        
        self.states = sorted(probabilities)
        self.values = np.array(list(map(lambda x: 
            probabilities[x], self.states))).reshape(1, -1)
        
    @classmethod
    def initialize(cls, states: list):
        size = len(states)
        rand = np.random.rand(size) / (size**2) + 1 / size
        rand /= rand.sum(axis=0)
        return cls(dict(zip(states, rand)))
    
    @classmethod
    def from_numpy(cls, array: np.ndarray, state: list):
        return cls(dict(zip(states, list(array))))

    @property
    def dict(self):
        return {k:v for k, v in zip(self.states, list(self.values.flatten()))}

    @property
    def df(self):
        return pd.DataFrame(self.values, columns=self.states, index=['probability'])

    def __repr__(self):
        return "P({}) = {}.".format(self.states, self.values)

    def __eq__(self, other):
        if not isinstance(other, ProbabilityVector):
            raise NotImplementedError
        if (self.states == other.states) and (self.values == other.values).all():
            return True
        return False

    def __getitem__(self, state: str) -> float:
        if state not in self.states:
            raise ValueError("Requesting unknown probability state from vector.")
        index = self.states.index(state)
        return float(self.values[0, index])

    def __mul__(self, other) -> np.ndarray:
        if isinstance(other, ProbabilityVector):
            return self.values * other.values
        elif isinstance(other, (int, float)):
            return self.values * other
        else:
            NotImplementedError

    def __rmul__(self, other) -> np.ndarray:
        return self.__mul__(other)

    def __matmul__(self, other) -> np.ndarray:
        if isinstance(other, ProbabilityMatrix):
            return self.values @ other.values

    def __truediv__(self, number) -> np.ndarray:
        if not isinstance(number, (int, float)):
            raise NotImplementedError
        x = self.values
        return x / number if number != 0 else x / (number + 1e-12)

    def argmax(self):
        index = self.values.argmax()
        return self.states[index]

In [31]:
class ProbabilityMatrix:
    def __init__(self, prob_vec_dict: dict):
        
        assert len(prob_vec_dict) > 1, "The numebr of input probability vector must be greater than one."
        assert len(set([str(x.states) for x in prob_vec_dict.values()])) == 1, "All internal states of all the vectors must be indentical."
        assert len(prob_vec_dict.keys()) == len(set(prob_vec_dict.keys())), "All observables must be unique."

        self.states      = sorted(prob_vec_dict)
        self.observables = prob_vec_dict[self.states[0]].states
        self.values      = np.stack([prob_vec_dict[x].values \
                           for x in self.states]).squeeze() 

    @classmethod
    def initialize(cls, states: list, observables: list):
        size = len(states)
        rand = np.random.rand(size, len(observables)) \
             / (size**2) + 1 / size
        rand /= rand.sum(axis=1).reshape(-1, 1)
        aggr = [dict(zip(observables, rand[i, :])) for i in range(len(states))]
        pvec = [ProbabilityVector(x) for x in aggr]
        return cls(dict(zip(states, pvec)))

    @classmethod
    def from_numpy(cls, array: 
                  np.ndarray, 
                  states: list, 
                  observables: list):
        p_vecs = [ProbabilityVector(dict(zip(observables, x))) \
                  for x in array]
        return cls(dict(zip(states, p_vecs)))

    @property
    def dict(self):
        return self.df.to_dict()

    @property
    def df(self):
        return pd.DataFrame(self.values, 
               columns=self.observables, index=self.states)

    def __repr__(self):
        return "PM {} states: {} -> obs: {}.".format(
            self.values.shape, self.states, self.observables)

    def __getitem__(self, observable: str) -> np.ndarray:
        if observable not in self.observables:
            raise ValueError("Requesting unknown probability observable from the matrix.")
        index = self.observables.index(observable)
        return self.values[:, index].reshape(-1, 1)

In [32]:
from itertools import product
from functools import reduce


class HiddenMarkovChain:
    def __init__(self, T, E, pi):
        self.T = T  # transmission matrix A
        self.E = E  # emission matrix B
        self.pi = pi
        self.states = pi.states
        self.observables = E.observables
    
    def __repr__(self):
        return "HML states: {} -> observables: {}.".format(
            len(self.states), len(self.observables))
    
    @classmethod
    def initialize(cls, states: list, observables: list):
        T = ProbabilityMatrix.initialize(states, states)
        E = ProbabilityMatrix.initialize(states, observables)
        pi = ProbabilityVector.initialize(states)
        return cls(T, E, pi)
    
    def _create_all_chains(self, chain_length):
        return list(product(*(self.states,) * chain_length))
    
    def score(self, observations: list) -> float:
        def mul(x, y): return x * y
        
        score = 0
        all_chains = self._create_all_chains(len(observations))
        for idx, chain in enumerate(all_chains):
            expanded_chain = list(zip(chain, [self.T.states[0]] + list(chain)))
            expanded_obser = list(zip(observations, chain))
            
            p_observations = list(map(lambda x: self.E.df.loc[x[1], x[0]], expanded_obser))
            p_hidden_state = list(map(lambda x: self.T.df.loc[x[1], x[0]], expanded_chain))
            p_hidden_state[0] = self.pi[chain[0]]
            
            score += reduce(mul, p_observations) * reduce(mul, p_hidden_state)
        return score

In [33]:
class HiddenMarkovChain_FP(HiddenMarkovChain):
    def _alphas(self, observations: list) -> np.ndarray:
        alphas = np.zeros((len(observations), len(self.states)))
        alphas[0, :] = self.pi.values * self.E[observations[0]].T
        for t in range(1, len(observations)):
            alphas[t, :] = (alphas[t - 1, :].reshape(1, -1) 
                         @ self.T.values) * self.E[observations[t]].T
        return alphas
    
    def score(self, observations: list) -> float:
        alphas = self._alphas(observations)
        return float(alphas[-1].sum())

In [34]:
class HiddenMarkovChain_Simulation(HiddenMarkovChain):
    def run(self, length: int) -> (list, list):
        assert length >= 0, "The chain needs to be a non-negative number."
        s_history = [0] * (length + 1)
        o_history = [0] * (length + 1)
        
        prb = self.pi.values
        obs = prb @ self.E.values
        s_history[0] = np.random.choice(self.states, p=prb.flatten())
        o_history[0] = np.random.choice(self.observables, p=obs.flatten())
        
        for t in range(1, length + 1):
            prb = prb @ self.T.values
            obs = prb @ self.E.values
            s_history[t] = np.random.choice(self.states, p=prb.flatten())
            o_history[t] = np.random.choice(self.observables, p=obs.flatten())
        
        return o_history, s_history

In [35]:
class HiddenMarkovChain_Uncover(HiddenMarkovChain_Simulation):
    def _alphas(self, observations: list) -> np.ndarray:
        alphas = np.zeros((len(observations), len(self.states)))
        alphas[0, :] = self.pi.values * self.E[observations[0]].T
        for t in range(1, len(observations)):
            alphas[t, :] = (alphas[t - 1, :].reshape(1, -1) @ self.T.values) \
                         * self.E[observations[t]].T
        return alphas
    
    def _betas(self, observations: list) -> np.ndarray:
        betas = np.zeros((len(observations), len(self.states)))
        betas[-1, :] = 1
        for t in range(len(observations) - 2, -1, -1):
            betas[t, :] = (self.T.values @ (self.E[observations[t + 1]] \
                        * betas[t + 1, :].reshape(-1, 1))).reshape(1, -1)
        return betas
    
    def uncover(self, observations: list) -> list:
        alphas = self._alphas(observations)
        betas = self._betas(observations)
        maxargs = (alphas * betas).argmax(axis=1)
        return list(map(lambda x: self.states[x], maxargs))

In [36]:
class HiddenMarkovLayer(HiddenMarkovChain_Uncover):
    def _digammas(self, observations: list) -> np.ndarray:
        L, N = len(observations), len(self.states)
        digammas = np.zeros((L - 1, N, N))

        alphas = self._alphas(observations)
        betas = self._betas(observations)
        score = self.score(observations)
        for t in range(L - 1):
            P1 = (alphas[t, :].reshape(-1, 1) * self.T.values)
            P2 = self.E[observations[t + 1]].T * betas[t + 1].reshape(1, -1)
            digammas[t, :, :] = P1 * P2 / score
        return digammas

In [37]:
class HiddenMarkovModel:
    def __init__(self, hml: HiddenMarkovLayer):
        self.layer = hml
        self._score_init = 0
        self.score_history = []

    @classmethod
    def initialize(cls, states: list, observables: list):
        layer = HiddenMarkovLayer.initialize(states, observables)
        return cls(layer)

    def update(self, observations: list) -> float:
        alpha = self.layer._alphas(observations)
        beta = self.layer._betas(observations)
        digamma = self.layer._digammas(observations)
        score = alpha[-1].sum()
        gamma = alpha * beta / score 

        L = len(alpha)
        obs_idx = [self.layer.observables.index(x) \
                  for x in observations]
        capture = np.zeros((L, len(self.layer.states), len(self.layer.observables)))
        for t in range(L):
            capture[t, :, obs_idx[t]] = 1.0

        pi = gamma[0]
        T = digamma.sum(axis=0) / gamma[:-1].sum(axis=0).reshape(-1, 1)
        E = (capture * gamma[:, :, np.newaxis]).sum(axis=0) / gamma.sum(axis=0).reshape(-1, 1)

        self.layer.pi = ProbabilityVector.from_numpy(pi, self.layer.states)
        self.layer.T = ProbabilityMatrix.from_numpy(T, self.layer.states, self.layer.states)
        self.layer.E = ProbabilityMatrix.from_numpy(E, self.layer.states, self.layer.observables)
            
        return score

    def train(self, observations: list, epochs: int, tol=None):
        self._score_init = 0
        self.score_history = (epochs + 1) * [0]
        early_stopping = isinstance(tol, (int, float))

        for epoch in range(1, epochs + 1):
            score = self.update(observations)
            print("Training... epoch = {} out of {}, score = {}.".format(epoch, epochs, score))
            if early_stopping and abs(self._score_init - score) / score < tol:
                print("Early stopping.")
                break
            self._score_init = score
            self.score_history[epoch] = score

In [38]:
np.random.seed(42)

observations = ['3L', '2M', '1S', '3L', '3L', '3L']

states = ['1H', '2C']
observables = ['1S', '2M', '3L']

hml = HiddenMarkovLayer.initialize(states, observables)
hmm = HiddenMarkovModel(hml)

hmm.train(observations, 25)

Training... epoch = 1 out of 25, score = 0.0013006714785001687.
Training... epoch = 2 out of 25, score = 0.005509420540117939.
Training... epoch = 3 out of 25, score = 0.005530858882821673.
Training... epoch = 4 out of 25, score = 0.005567116674468161.
Training... epoch = 5 out of 25, score = 0.005630618250468592.
Training... epoch = 6 out of 25, score = 0.005741158382934683.
Training... epoch = 7 out of 25, score = 0.0059280441106285015.
Training... epoch = 8 out of 25, score = 0.006225866084209232.
Training... epoch = 9 out of 25, score = 0.006652847604517528.
Training... epoch = 10 out of 25, score = 0.00717178787364084.
Training... epoch = 11 out of 25, score = 0.007683641981602105.
Training... epoch = 12 out of 25, score = 0.008100845554754063.
Training... epoch = 13 out of 25, score = 0.008410987791562213.
Training... epoch = 14 out of 25, score = 0.008650795046039236.
Training... epoch = 15 out of 25, score = 0.00886102147011807.
Training... epoch = 16 out of 25, score = 0.00907

# Unfair Casino Problem

- consider an unfair casino that:
    - most of the time uses a fair 6-sided die where:
        - P(1) = P(2) = P(3) = P(4) = P(5) = P(6) = 1/6 = 0.16
    - sometimes uses a loaded 6-sided die where:
        - P(6) = 0.5
        - P(1) = P(2) = P(3) = P(4) = P(5) = 0.1
    - before each roll the casino will switch dice with probability:
        - P(Fair --> Fair) = 0.95
        - P(Fair --> Loaded) = 0.05
        - P(Biased --> Loaded) = 0.9
        - P(Biased --> Fair) = 0.1

In [11]:

import random
outcomes = '123456'
    
class Model:
    def __init__(self, pFL, pLF):
        self.emit = list(outcomes)
        self.fairProb = [1.0/6] * 6
        self.loadProb = [0.1] * 5 + [0.5]
        self.tranProb = { 'F':pFL,'L':pLF }
                                
    def setState(self,which=None):
        if which:  self.state = which
        else:      self.state = random.choice('FL')
    
    def rollDice(self):
        if self.state == 'F':  L = self.fairProb
        else:                  L = self.loadProb
        f = random.random()
        S = 0
        for i in range(len(L)):
            S += L[i]
            if f < S:  break
        return self.emit[i]

    def transit(self):
        def switch():
            if self.state == 'F':  self.state = 'L'
            else:                  self.state = 'F'
        p = self.tranProb[self.state]
        f = random.random()
        if f < p:  switch()
            
    def sequence(self,N=50):
        rolls = list()
        states = list()
        for i in range(N):
            rolls.append(self.rollDice())
            states.append(self.state)
            self.transit()
        return ''.join(rolls),''.join(states)

In [12]:
outcomes = '123456'

def checkEStats(m):
    N = 10000
    m.state = 'F'
    fairL = [m.rollDice() for i in range(N)]
    m.state = 'L'
    loadL = [m.rollDice() for i in range(N)]
    for o in outcomes:
        print (o, '  F',) 
        print (str(round(fairL.count(o) * 1.0 / N, 3)).ljust(5)),
        print    ('  L',) 
        print (str(round(loadL.count(o) * 1.0 / N, 3)).ljust(5))
    print()
            
def checkTStats(m):
    N = 10000
    L = list()
    for i in range(N):
        L.append(m.state)
        m.transit()
    totalF = L.count('F')
    totalL = L.count('L')
    D = { 'F':0,'L':0 }
    c1 = L[0]
    for c2 in L[1:]:
        if c1 != c2:  D[c1] += 1
        c1 = c2
        
    print ('F', totalF,) 
    print (str(round(D['F'] * 1.0 / totalF, 3)).ljust(5))
    print ('L', totalL,) 
    print (str(round(D['L'] * 1.0 / totalL, 3)).ljust(5))

In [20]:
m = Model(pFL=0.05,pLF=0.1)
m.setState('F')
L = [m.sequence() for i in range(2)]
for r,s in L:   print (s + '\n' + r + '\n')

checkEStats(m)
checkTStats(m)

FFFFFFFFFFFFFFFFFLLLLLLLLLLLLFFFFFFFFFFFFFFFFFFLLL
13563561163521232266656166163556334655145166214456

LLLLLLLLLLLLLFFFFLLFFFFFFFFFFLLLLLLFFFFFFFFFFFFFFF
14623666356216255653323364613663166456521253632563

1   F
0.162
  L
0.103
2   F
0.171
  L
0.103
3   F
0.165
  L
0.096
4   F
0.166
  L
0.099
5   F
0.169
  L
0.093
6   F
0.167
  L
0.506

F 6855
0.048
L 3145
0.104


# Modified Unfair Casino Problem
- consider an unfair casino that:
    - most of the time uses a fair 2-sided coin where:
        - P(H) = P(T) = 0.5
    - sometimes uses a loaded 2-sided coin where:
        - P(H) = 0.75
        - P(T) = 0.25
    - before each roll the casino will switch dice with probability:
        - P(Fair --> Fair) = 0.9
        - P(Fair --> Loaded) = 0.1
        - P(Biased --> Loaded) = 0.9
        - P(Biased --> Fair) = 0.1

In [45]:
import random
outcomes = 'HT'
    
class Model:
    def __init__(self, pFL, pLF):
        self.emit = list(outcomes)
        self.fairProb = [0.5, 0.5]
        self.loadProb = [0.75, 0.25]
        self.tranProb = { 'F':pFL,'L':pLF }
                                
    def setState(self,which=None):
        if which:  self.state = which
        else:      self.state = random.choice('FL')
            
    def flipCoin(self):
        if self.state == 'F':  L = self.fairProb
        else:                  L = self.loadProb
        f = random.random()
        S = 0
        for i in range(len(L)):
            S += L[i]
            if f < S:  break
        return self.emit[i]

    def transit(self):
        def switch():
            if self.state == 'F':  self.state = 'L'
            else:                  self.state = 'F'
        p = self.tranProb[self.state]
        f = random.random()
        if f < p:  switch()
            
    def sequence(self,N=50):
        rolls = list()
        states = list()
        for i in range(N):
            rolls.append(self.flipCoin())
            states.append(self.state)
            self.transit()
        return ''.join(rolls),''.join(states)

In [46]:
outcomes = 'TH'

def checkEStats(m):
    N = 10000
    m.state = 'F'
    fairL = [m.flipCoin() for i in range(N)]
    m.state = 'L'
    loadL = [m.flipCoin() for i in range(N)]
    for o in outcomes:
        print (o, '  F',) 
        print (str(round(fairL.count(o) * 1.0 / N, 3)).ljust(5)),
        print    ('  L',) 
        print (str(round(loadL.count(o) * 1.0 / N, 3)).ljust(5))
    print()
            
def checkTStats(m):
    N = 10000
    L = list()
    for i in range(N):
        L.append(m.state)
        m.transit()
    totalF = L.count('F')
    totalL = L.count('L')
    D = { 'F':0,'L':0 }
    c1 = L[0]
    for c2 in L[1:]:
        if c1 != c2:  D[c1] += 1
        c1 = c2
        
    print ('F', totalF,) 
    print (str(round(D['F'] * 1.0 / totalF, 3)).ljust(5))
    print ('L', totalL,) 
    print (str(round(D['L'] * 1.0 / totalL, 3)).ljust(5))

In [53]:
m = Model(pFL=0.1,pLF=0.1)
m.setState('F')
L = [m.sequence() for i in range(2)]
for r,s in L:   print (s + '\n' + r + '\n')

checkEStats(m)
checkTStats(m)

FFFFFFFFLLLLLLFFFFLLLFFFFFFFFFFFFFFFFFFFFFFFLFFFFF
THHHTTTHTTTHTTTHTHTHHHHTHTHHHHHTTHHTHTTTTTHHTTHTHT

LLLFFFFFFFFFFFFFFFFFFFFFFLLLLFFFFFFFFFFFFFFFFFFFFF
TTHTHHHHHTHHHTHHTHTHTTHTHTHTTHTTTHTTHTTHHHTHTTHTHH

T   F
0.498
  L
0.752
H   F
0.502
  L
0.248

F 5101
0.094
L 4899
0.098


In [55]:
import numpy as np

def viterbi(A, C, B, O):
    """Viterbi algorithm for solving the uncovering problem

    Notebook: C5/C5S3_Viterbi.ipynb

    Args:
        A (np.ndarray): State transition probability matrix of dimension I x I
        C (np.ndarray): Initial state distribution  of dimension I
        B (np.ndarray): Output probability matrix of dimension I x K
        O (np.ndarray): Observation sequence of length N

    Returns:
        S_opt (np.ndarray): Optimal state sequence of length N
        D (np.ndarray): Accumulated probability matrix
        E (np.ndarray): Backtracking matrix
    """
    I = A.shape[0]    # Number of states
    N = len(O)  # Length of observation sequence

    # Initialize D and E matrices
    D = np.zeros((I, N))
    E = np.zeros((I, N-1)).astype(np.int32)
    D[:, 0] = np.multiply(C, B[:, O[0]])

    # Compute D and E in a nested loop
    for n in range(1, N):
        for i in range(I):
            temp_product = np.multiply(A[:, i], D[:, n-1])
            D[i, n] = np.max(temp_product) * B[i, O[n]]
            E[i, n-1] = np.argmax(temp_product)

    # Backtracking
    S_opt = np.zeros(N).astype(np.int32)
    S_opt[-1] = np.argmax(D[:, -1])
    for n in range(N-2, -1, -1):
        S_opt[n] = E[int(S_opt[n+1]), n]

    return S_opt, D, E

# Define model parameters
A = np.array([[0.8, 0.1, 0.1], 
              [0.2, 0.7, 0.1], 
              [0.1, 0.3, 0.6]])

C = np.array([0.6, 0.2, 0.2])

B = np.array([[0.7, 0.0, 0.3], 
              [0.1, 0.9, 0.0], 
              [0.0, 0.2, 0.8]])


O = np.array([0, 2, 0, 2, 2, 1]).astype(np.int32)
#O = np.array([1]).astype(np.int32)
#O = np.array([1, 2, 0, 2, 2, 1]).astype(np.int32)

# Apply Viterbi algorithm
S_opt, D, E = viterbi(A, C, B, O)
#
print('Observation sequence:   O = ', O)
print('Optimal state sequence: S = ', S_opt)
np.set_printoptions(formatter={'float': "{: 7.4f}".format})
print('D =', D, sep='\n')
np.set_printoptions(formatter={'float': "{: 7.0f}".format})
print('E =', E, sep='\n')

Observation sequence:   O =  [0 2 0 2 2 1]
Optimal state sequence: S =  [0 0 0 2 2 1]
D =
[[ 0.4200  0.1008  0.0564  0.0135  0.0033  0.0000]
 [ 0.0200  0.0000  0.0010  0.0000  0.0000  0.0006]
 [ 0.0000  0.0336  0.0000  0.0045  0.0022  0.0003]]
E =
[[0 0 0 0 0]
 [0 0 0 0 2]
 [0 2 0 2 2]]
