In [None]:
from copy import deepcopy
import math
import itertools
import uuid

import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import pandas as pd
import seaborn as sns

from IPython.display import display

### Model classes

In [None]:
class Node(object):
    """Node Wrapper Class"""
    def __init__(self, value, node_id=None):
        self.nid = uuid.uuid4() if not node_id else node_id
        self.value = value
        
    def __repr__(self):
        return str(self.nid)


class HMM(object):
    """Hidden Markov Model"""
    def __init__(self, hidden_state_space, observation_space, 
                 init_dist, transition_probs, emission_probs):
        self.hidden_state_space = hidden_state_space
        self.observation_space = observation_space
        self.init_dist = init_dist
        self.transition_probs = transition_probs
        self.emission_probs = emission_probs
        
        self.hidden_states = []
        self.observations = []
    
    def add_hidden_state(self, hs):
        if isinstance(hs, list):
            for _hs in hs:
                self.hidden_states.append(Node(_hs))
        else:
            self.hidden_states.append(Node(hs))
    
    def add_observation(self, obs):
        if isinstance(obs, list):
            for _obs in obs:
                self.observations.append(Node(_obs))
        else:
            self.observations.append(Node(obs))
    

### Visualization

In [None]:
def _transition_probs_to_df(hidden_state_space, transition_probs):
    df = pd.DataFrame(columns=[f'z_k_1 = \"{fr}\"' for fr in hidden_state_space], 
                      index=[f'z_k = \"{to}\"' for to in hidden_state_space])
    
    for (to, fr), val in transition_probs.items():
        to = f'z_k = \"{to}\"'
        fr = f'z_k_1 = \"{fr}\"'
        df[fr][to] = val
    
    return df


def _emission_probs_to_df(observation_space, hidden_state_space, emission_probs):
    df = pd.DataFrame(columns=[f'z_k = \"{hs}\"' for hs in hidden_state_space],
                      index=[f'x_k = \"{obs}\"' for obs in observation_space])

    for (obs, state), val in emission_probs.items():
        state = f'z_k = \"{state}\"'
        obs = f'x_k = \"{obs}\"'
        df[state][obs] = val
    
    return df


def _initial_dist_to_df(init_dist):
    df = pd.DataFrame(columns=[f'z_0 = \"{z_0}\"' for z_0 in init_dist.keys()],
                      index=['P(z_0)'])
    
    for z_0, p_z_0 in init_dist.items():
        z_0 = f'z_0 = \"{z_0}\"'
        df[z_0]['P(z_0)'] = p_z_0
        
    return df
        
        
def hmm_basic_info(hmm):
    z_0_df = _initial_dist_to_df(hmm.init_dist)
    t_df = _transition_probs_to_df(hmm.hidden_state_space, hmm.transition_probs)
    e_df = _emission_probs_to_df(hmm.observation_space, hmm.hidden_state_space, hmm.emission_probs)

    display(z_0_df)
    display(t_df)
    display(e_df)


In [None]:
def visualize_hmm(hmm, node_size):
    fig, ax = plt.subplots(figsize=(15, 8))
    ax.axis('off')
    G = nx.DiGraph()
    
    node_pos = {}
    node_labels = {}
    edge_labels = {}
    
    for idx, (hs, obs) in enumerate(zip(hmm.hidden_states, hmm.observations)):
        node_pos[hs] = (idx + 1, 0)
        node_pos[obs] = (idx + 1, -0.1)
        
        node_labels[hs] = hs.value
        node_labels[obs] = obs.value

    prev_hs = None
    for hs, obs in zip(hmm.hidden_states, hmm.observations):
        G.add_edge(hs, obs)
        edge_labels[(hs, obs)] = hmm.emission_probs[(obs.value, hs.value)]
        
        if prev_hs:
            G.add_edge(prev_hs, hs)
            edge_labels[(prev_hs, hs)] = hmm.transition_probs[(hs.value, prev_hs.value)]
            
        prev_hs = hs

    nx.draw_networkx_nodes(G, pos=node_pos, ax=ax,
                           nodelist=hmm.hidden_states, 
                           node_size=node_size,
                           node_shape='o', node_color='lightblue')
    nx.draw_networkx_nodes(G, pos=node_pos, ax=ax,
                           nodelist=hmm.observations, 
                           node_size=node_size,
                           node_shape='s', node_color='lightgrey')
    
    nx.draw_networkx_labels(G, pos=node_pos, ax=ax,
                            labels=node_labels)
    nx.draw_networkx_edges(G, pos=node_pos, ax=ax,
                           arrowsize=30)
    nx.draw_networkx_edge_labels(G, pos=node_pos, ax=ax, 
                                 edge_labels=edge_labels)
    

### Viterbi algorithm

In [None]:
def run_viterbi(hmm, observations):
    all_hs = hmm.hidden_state_space
    
    # Forward step - calculate u[(k, z_k)]
    u = {}
    
    for z_0 in all_hs:
        u[(0, z_0)] = (math.log(hmm.init_dist[z_0]) + math.log(hmm.emission_probs[(observations[0], z_0)]), 
                       None)
        
    for k in range(1, len(observations)):
        for z_k in all_hs:
            max_u = (-9999, None)
            for z_k_1 in all_hs:
                u_k = math.log(hmm.emission_probs[(observations[k], z_k)]) + \
                      math.log(hmm.transition_probs[(z_k, z_k_1)]) + \
                      u[(k - 1, z_k_1)][0]
                        
                if u_k > max_u[0]:
                    max_u = (u_k, z_k_1)
            
            u[(k, z_k)] = max_u
    
    # Backward step - build hidden state sequence
    most_probable_hidden_states = []
    k_max = len(observations) - 1

    max_entry = (None, -9999, None)
    for hs in all_hs:
        if u[(k_max, hs)][0] > max_entry[1]:
            max_entry = (hs, *u[(k_max, hs)])
    
    most_probable_hidden_states.append(max_entry[0])
    
    prev_hs = max_entry[2]
    for k in reversed(range(0, len(observations) - 1)):
        most_probable_hidden_states.append(prev_hs)
        prev_hs = u[(k, prev_hs)][1]
            
    most_probable_hidden_states = list(reversed(most_probable_hidden_states))
    return most_probable_hidden_states, u

In [None]:
def _u_to_df(u, k_max, hidden_state_space):
    df = pd.DataFrame(columns=[f'k = {k}' for k in range(k_max)], 
                      index=[f'z_k = \"{z_k}\"' for z_k in hidden_state_space])
    
    for (k, z_k), v in u.items():
        k = f'k = {k}'
        z_k = f'z_k = \"{z_k}\"'
        df[k][z_k] = (np.round(v[0], 4), v[1])
    
    return df

### Usage

In [None]:
def get_weather_hmm():
    _hmm = HMM(
        hidden_state_space=['Rainy', 'Sunny'],
        observation_space=['Walk', 'Shop', 'Clean'],
        init_dist={'Rainy': 0.6, 'Sunny': 0.4},
        # P(z_k | z_k_1)
        transition_probs={('Rainy', 'Rainy'): 0.7, ('Sunny', 'Rainy'): 0.3,
                          ('Sunny', 'Sunny'): 0.6, ('Rainy', 'Sunny'): 0.4}, 
        # P(x_k | z_k)
        emission_probs={('Walk', 'Rainy'): 0.1, ('Shop', 'Rainy'): 0.4, ('Clean', 'Rainy'): 0.5, 
                        ('Walk', 'Sunny'): 0.6, ('Shop', 'Sunny'): 0.3, ('Clean', 'Sunny'): 0.1}
    )
    
    return _hmm

In [None]:
def sample_viterbi_run(hmm, observations):
    model = deepcopy(hmm)
    hidden_states, u = run_viterbi(model, observations)
    
    model.add_observation(observations)
    model.add_hidden_state(hidden_states)
    
    hmm_basic_info(model)
    display(_u_to_df(u, len(observations), hmm.hidden_state_space))
    visualize_hmm(model, node_size=1100)

In [None]:
sample_viterbi_run(get_weather_hmm(), observations=['Clean', 'Shop', 'Walk', 'Walk'])

### Learning

In [None]:
def get_robot_dataset():
    ds_robot_small = {
        'observation_space': ['r', 'g', 'b', 'y'],
        'hidden_state_space': [f'{i}:{j}' for i in range(1, 5) for j in range(1, 5)],

        'train': {
            'Z': [['3:3', '3:3', '3:4', '2:4', '3:4'], 
                  ['1:3', '2:3', '3:3', '2:3', '2:3', '2:4', '2:4' '2:4'],
                  ['2:1', '2:1', '2:1']],
            'X': [['r', 'r', 'y', 'r', 'y'],
                  ['g', 'b', 'r', 'b', 'b', 'r', 'r', 'r'],
                  ['g', 'g', 'g']]
        },

        'test': {
            'Z': [['2:1', '2:1', '2:1', '2:1'],
                  ['2:4', '2:3', '2:3', '3:3', '2:3']],
            'X': [['b', 'g', 'g', 'b'],
                  ['r', 'b', 'b', 'r', 'b']]
        }
    }
    
    return ds_robot_small


def get_robot_hmm(hss, os):
    # Initial distibution probs
    ind = {}
    p = np.random.randint(1, len(hss), size=len(hss))
    p = p / sum(p)
    
    for idx, z_k in enumerate(hss):
        ind[z_k] = p[idx]
    
    # Random transition probs
    tp = {}
    for z_k_1 in hss:
        p = np.random.randint(1, len(hss), size=len(hss))
        p = p / sum(p)
        
        for idx, z_k in enumerate(hss):
            tp[(z_k, z_k_1)] = p[idx]
            
    # Random emission probs
    ep = {}
    for z_k in hss:
        p = np.random.randint(1, len(os), size=len(os))
        p = p / sum(p)
        
        for idx, obs in enumerate(os):
            ep[(obs, z_k)] = p[idx]
    
    _hmm = HMM(
        hidden_state_space=hss,
        observation_space=os,
        init_dist=ind,
        # P(z_k | z_k_1)
        transition_probs=tp,
        # P(x_k | z_k)
        emission_probs=ep,
    )
    
    return _hmm

#### Utils

In [None]:
def parameter_loss(prev_theta, theta):
    if prev_theta == (None, None, None):
        return 1
    
    prev_indp, prev_tp, prev_ep = prev_theta
    indp, tp, ep = theta
    
    loss = 0
    
    # Initial dist loss
    for k in prev_indp.keys():
        loss += (prev_indp[k] - indp[k]) ** 2
    
    # Transition prob. loss
    for k in prev_tp.keys():
        loss += (prev_tp[k] - tp[k]) ** 2
        
    # Emission prob. loss
    for k in prev_ep.keys():
        loss += (prev_ep[k] - ep[k]) ** 2
    
    return loss


def init_transition_counts(hss):
    tc = {}
    
    for z_k_1, z_k in itertools.product(hss, repeat=2):
        tc[(z_k_1, z_k)] = 0  
    
    return tc


def get_transition_counts(hs_seq, tc):
    tc = deepcopy(tc)
    
    for idx in range(len(hs_seq) - 1):
        tc[hs_seq[idx], hs_seq[idx + 1]] += 1
        
    return tc


def get_transition_probs(tc, hss):
    tp = {}
    
    for (z_k, z_k_1), v in tc.items():
        tp[(z_k, z_k_1)] = (v + 1) / sum([tc[(hs, z_k_1)] + 1 for hs in hss])
    
    return tp


def init_emission_counts(hss, os):
    ec = {}
    
    for obs in os:
        for hs in hss:
            ec[(obs, hs)] = 0
    
    return ec


def get_emission_counts(hs_seq, x, ec):
    ec = deepcopy(ec)
    
    for obs, hs in zip(x, hs_seq):
        ec[(obs, hs)] += 1
    
    return ec


def get_emission_probs(ec, os):
    ep = {}
    
    for (x_k, z_k), v in ec.items():
        ep[(x_k, z_k)] = (v + 1) / sum([ec[(obs, z_k)] + 1 for obs in os])
    
    return ep


def calc_acc(z_true, z_pred):
    eq = 0
    
    for idx in range(len(z_true)):
            if z_true[idx] == z_pred[idx]:
                eq += 1
                
    acc = eq / len(z_true)
    
    return acc


def init_initial_dist_counts(hss):
    indc = {}
    
    for z_k in hss:
        indc[z_k] = 0  
    
    return indc


def get_initial_dist_counts(hs_seq, indc):
    indc = deepcopy(indc)
    
    for z_k in hs_seq:
        indc[z_k] += 1
        
    return indc


def get_initial_dist_probs(indc):
    indp = {}
    total = sum([v + 1 for v in indc.values()])
    
    for z_k, v in indc.items():
        indp[z_k] = (v + 1) / total
    
    return indp


#### Training

In [None]:
def train_hmm(hmm, Z, X, eps=0.001, max_step=10):
    model = deepcopy(hmm)
    
    hss = model.hidden_state_space
    os = model.observation_space
    
    prev_theta = (None, None, None)
    theta = (model.init_dist,
             model.transition_probs,
             model.emission_probs)
    
    losses = []
    accs = []
    step = 0
    
    while True:
        loss = parameter_loss(prev_theta, theta)
        losses.append(loss)
        
        if loss < eps or step > max_step:
            break  
            
        indc = init_initial_dist_counts(hss)
        tc = init_transition_counts(hss)    
        ec = init_emission_counts(hss, os)
        acc = 0
            
        for z, x in zip(Z, X):
            hs_seq, _ = run_viterbi(deepcopy(model), x)
            
            acc += calc_acc(z, hs_seq)
            
            indc = get_initial_dist_counts(hs_seq, indc)
            tc = get_transition_counts(hs_seq, tc)
            ec = get_emission_counts(hs_seq, x, ec)
            
        indp = get_initial_dist_probs(indc)
        tp = get_transition_probs(tc, hss)
        ep = get_emission_probs(ec, os)
        
        prev_theta, theta = theta, (indp, tp, ep)
    
        model.init_dist = theta[0]
        model.transition_probs = theta[1]
        model.emission_probs = theta[2]
        
        accs.append(acc / len(Z))
        
        # Logging
        print(f'Step: {step} => Loss: {np.round(loss, 5)}, Training accuracy: {np.round(100 * accs[-1], 2)}')
        
        step += 1
    
    return model, losses, accs

#### Testing

In [None]:
def test_hmm(hmm, Z, X):
    acc = 0
    
    for z, x in zip(Z, X):
        z_pred, _ = run_viterbi(deepcopy(hmm), x)
        acc += calc_acc(z, z_pred)
#         print(f'Z: {z} | Z_pred: {z_pred}')
    
    acc /= len(Z)
    
    return acc

In [None]:
def sample_viterbi_learning_run(ds, eps, max_step):
    hss = ds['hidden_state_space']
    os = ds['observation_space']
    hmm = get_robot_hmm(hss, os)
    
    hmm_trained, losses, accs = train_hmm(hmm, ds['train']['Z'], ds['train']['X'], eps, max_step)
    hmm_basic_info(hmm_trained)
    
    fig, ax = plt.subplots(ncols=2, figsize=(15, 4))
    ax[0].plot(range(len(losses)), losses, marker='x', linestyle='--')
    ax[0].set_xlabel('Step')
    ax[0].set_ylabel('Loss')
#     ax[0].set_ylim((0, 1))
    
    ax[1].plot(range(len(accs)), accs, marker='x', linestyle='--')
    ax[1].set_xlabel('Step')
    ax[1].set_ylabel('Train Accuracy')
#     ax[1].set_ylim((0, 1))
    
    acc = test_hmm(hmm_trained, ds['test']['Z'], ds['test']['X'])
    print(f'Test Accuracy: {np.round(100 * acc, 2)} (%)')

In [None]:
sample_viterbi_learning_run(get_robot_dataset(), eps=0.001, max_step=20)

In [None]:
# Robot dataset reader
def read_big_robot_dataset():
    ds = {
        'observation_space': ['r', 'g', 'b', 'y'],
        'hidden_state_space': [f'{i}:{j}' for i in range(1, 5) for j in range(1, 5)],
        'train': {'Z': [], 'X': []},
        'test': {'Z': [], 'X': []}
    }

    content = []
    
    with open('robot_no_momentum.data', 'r') as f:
        content = f.read().split('\n')
        
    is_train = True
    X = []
    Z = []
    idx = 1
    
    for entry in content:
        if entry == '':
            continue
    
        elif entry == '.':
            ds['train' if is_train else 'test']['Z'].append(Z)
            ds['train' if is_train else 'test']['X'].append(X)
            Z = []
            X = []
        
        elif entry == '..':
            is_train = False
            
        else:
            z, x = entry.split(' ')
            Z.append(z)
            X.append(x)
        
        idx += 1

    return ds


In [None]:
big_robot_ds = read_big_robot_dataset()

In [None]:
def clip_ds(ds, nb_clipped):
    ds = deepcopy(ds)
    
    ds['train']['X'] = ds['train']['X'][:nb_clipped]
    ds['train']['Z'] = ds['train']['Z'][:nb_clipped]

    ds['test']['X'] = ds['test']['X'][:nb_clipped]
    ds['test']['Z'] = ds['test']['Z'][:nb_clipped]
    
    return ds

In [None]:
sample_viterbi_learning_run(clip_ds(big_robot_ds, 1), eps=0.001, max_step=50)