In [1]:
!pip install -q pgmpy
!gdown -q --id 1b2ryMxJOhAOhiPmphsS-0sJkJKuLJ9D4
!gdown -q --id 19xqIcNf2WvcG3Hi95d4zv7CpMaJPriMe
!gdown -q --id 1pD496PHFtTydlXqnY4s8JpFanP46bO0E
!gdown -q --id 1kOSQm_PBSXR1rEr7-6xStUyfja3vXH3l

You should consider upgrading via the '/Users/umutekingezer/opt/anaconda3/bin/python3 -m pip install --upgrade pip' command.[0m


# Implemented by:
Umut Ekin Gezer - u195839
Pablo Brando Alvira - u172915

# Inference in Hidden Markov Models


In this lab session you will work with hidden Markov models (HMMs) using the [pgmpy library](http://www.pgmpy.org). By the end of the session you will be able to

- Understand how to learn **hidden Markov models** from data
- Implement the **belief propagation** to answer **filtering and prediction** queries
- **Implement the Viterbi algorithm** for finding the most likely sequence of hidden states, given some evidence
- Experiment with two tasks: a robot navigating on a grid, and how to improve a mispelled text

This practice is inspired by https://www.cs.princeton.edu/courses/archive/fall12/cos402/assignments/programs/viterbi/

## The Hidden Markov Model

A HMM is defined by a Markov chain over hidden variables $h_{1:T}=h_1,h_2,...,h_T$ and it is defined by
- a probability distribution over the initial hidden state $p(h_1)$.
- a state transition distribution $p(h_t|h_{t-1})$.

Each hidden variable $h_t$ influences a corresponding visible variable $v_t$ through the observation model $p(v_t|h_t)$. The joint distribution can be written as
$$p(v_{1:T},h_{1:T}) = p(h_1)p(v_1|h_1)\prod_{t=2}^T p(h_{t}|h_{t-1})p(v_t|h_t).$$

Let's define a class that encodes an HMM. This class will create the HMM with a predefined length ``n_vars``:

In [2]:
from pgmpy.models import FactorGraph

class HMM:
    def __init__(self, n_vars, prior_fn, transition_fn, observation_fn, h_states, v_states, h_name='h', v_name='v'):
        self.h = [f"{h_name}{i}" for i in range(n_vars)]
        self.v = [f"{v_name}{i}" for i in range(n_vars)]
        self.variables = self.h + self.v
        self.state_names = dict([(h, h_states) for h in self.h] +
                                [(v, v_states) for v in self.v])
        self.f = self.create_factors(n_vars, prior_fn, transition_fn, observation_fn)
        
    def create_factors(self, n_vars, prior_fn, transition_fn, observation_fn):
        """
        Given the amount of variables (n_vars) in the hidden markov model, it creates factors for
        the prior of h_0, for all the transitions from h_{t-1} to h_t (for t in n_vars), and for
        the observation function from h_t to v_t.
        Returns a dict where keys are (tuples of) variables and values are factors.
        E.g. {"h_0": DiscreteFactor,
              ("h_1", "h_2"): DiscreteFactor,
              ("h_1", "v_1"): DiscreteFactor,
                           ...
              }
        """
        
        factors = dict()      
        for i in range(n_vars):
            if i == 0:
                # Prior factor
                factors[self.h[i]] = prior_fn(self.h[i])
            else:
                # Transition factor
                factors[(self.h[i-1], self.h[i])] = transition_fn(self.h[i-1], self.h[i])
            # Observation factor
            factors[(self.h[i], self.v[i])] = observation_fn(self.h[i], self.v[i])
        return factors
    
    def to_factor_graph(self):
        G = FactorGraph()
        assert set(self.variables) == set(v for f in self.f.values() for v in f.variables)
        G.add_nodes_from(self.variables)
        G.add_factors(*self.f.values())
        G.add_edges_from([(v, f) for f in self.f.values() for v in f.variables])
        assert G.check_model()
        return G

## Basic setting

Before tackling larger problems, and to be able to test our implementations, we will first consider a small version of our previous hidden Markov model of weather change over three days, where we can observe if an umbrella has been used or not. We define this factor graph next:

In [3]:
from pgmpy.factors.discrete import DiscreteFactor
from pgmpy.models import FactorGraph

weather_states = ["sunny", "cloudy", "rainy"]
umbrella_states = [True, False]

def day_prior(d):
    return DiscreteFactor(variables=[d],
                          cardinality=[3],
                          values=[0.3, 0.4, 0.3],
                          state_names={d: weather_states})

def day_transition(d1, d2): # P(d2|d1)
     return DiscreteFactor(variables=[d1, d2],
                           cardinality=[3, 3],
                           values=[0.7, 0.25, 0.05,
                                   0.25, 0.35, 0.4,
                                   0.25, 0.5, 0.25],
                           state_names={d1: weather_states,
                                        d2: weather_states})

def take_umbrella_transition(d, u): # P(u|d)
    return DiscreteFactor(variables=[d, u],
                          cardinality=[3, 2],
                          values=[0.2, 0.8,
                                  0.6, 0.4,
                                  0.95, 0.05],
                          state_names={d: weather_states,
                                       u: umbrella_states})

# Expand the model for 3 days
hmm_weather_3 = HMM(n_vars=3,
                    prior_fn=day_prior,
                    transition_fn=day_transition,
                    observation_fn=take_umbrella_transition,
                    h_states=weather_states,
                    v_states=umbrella_states,
                    h_name="w",
                    v_name="u")

## Probabilistic Queries

We will implement three types of queries:

1. **Filtering** : the probability of the current hidden sate given a partial sequence of observations $p(h_t|v_{1:t})$.
2. **Prediction** : the probability of a future hidden sate given a partial sequence of observations $p(h_s|v_{1:t})$, for a $s>t$.
3. **Evidence** : the probability of a given a full sequence of observations $p(v_{1:T})$.
4. **MAP state** : the most likely hidden trajectory given a full sequence of observations $p(v_{1:T})$.

A naive way of computing the MAP of the hidden variables can be:
1. Clamp the corresponding visible variables $v_{1:t}$.
2. Run BP.
3. Compute the joint probability of the corresponding hidden variables.
3. Get the assignment with maximum probability.


The following code calculates the joint hidden trajectory for the weather example with $T=3$ using belief propagation. Clearly, this approach will not scale to larger models. However, we will use it to check that our code is correct.


In [4]:
from pgmpy.inference import BeliefPropagation
import numpy as np

bp = BeliefPropagation(hmm_weather_3.to_factor_graph())
# Compute the joint probability
joint = bp.query(variables=['w0','w1','w2'],
                 evidence={"u0":True,"u1":True, "u2":True})
# Get the index of the maximum value
amax = np.argmax(joint.values)
print("Maximum probability:", joint.values.flatten()[amax])
print("State assignment:", sorted(joint.assignment([amax])[0])) # pgmpy's assignment function gives us the states for the given index

HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))


Maximum probability: 0.16289702507051773
State assignment: [('w0', 'rainy'), ('w1', 'cloudy'), ('w2', 'rainy')]


## Task 1: Robot Navigation on a Grid

In this problem, a robot is wandering through the following small world:

<img src="https://drive.google.com/uc?export=view&id=1Rq-izqt8vkkkX6FbWrhaeCuGkgggJiPF"  alt="A robot on a maze" title="Title text" width=350 height=350>

The robot can only occupy the colored squares.  At each time step, the robot attempts to move up, down, left or right, where the choice of direction is made at random.  If the robot attempts to move onto a black square, or to leave the confines of its world, its action has no effect and it does not move at all.  The robot can only sense the color of the square it occupies.  However, its sensors are only $90\%$ accurate, meaning that $10\%$ of the time, it perceives a random color rather than the true color of the currently occupied square.  The robot begins each walk in a randomly chosen colored square.

In this problem, state refers to the location of the robot in the world in x:y coordinates, and output refers to a perceived color (r, g, b or y).  Thus, a typical random walk looks like this:

``3:3 r``
``3:3 r``
``3:4 y``
``2:4 b``
``3:4 y``
``3:3 r``
``2:3 b``
``1:3 g``
``2:3 b``
``2:4 r``
``3:4 y``
``4:4 y``

Here, the robot begins in square ``3:3`` perceiving red, attempts to make an illegal move (to the right), so stays in ``3:3``, still perceiving red.  On the next step, the robot moves up to ``3:4`` perceiving yellow, then left to ``2:4`` perceiving blue (erroneously), and so on.

We will provide the HMM model for this world. Then, given only sensor information (i.e., a sequence of colors), you will have to answer different queries regarding the actual path taken by the robot through its world.

The data for this problem is in [robot.data](files/robot.data), a file containing $200$ training sequences (random walks) and $200$ test sequences, each sequence consisting of $200$ steps.

We provide next the implementation on how to build the HMM. First, we define some functions that will become handy:

In [5]:
from pgmpy.factors.discrete import TabularCPD

def split(s, sep):
    """
    Generator that splits a sequence s into subsequences, when separator sep is found.
    """
    chunk = []
    for val in s:
        if val == sep:
            yield chunk
            chunk = []
        else:
            chunk.append(val)
    yield chunk

def to_cpd(phi, x):
    """
    Returns a TabularCPD object from a DiscreteFactor. For a given factor phi(x_0, ..., x_n)
    and a variable x_i, it interprets the factor as a CPD P(x_i|Y), where Y is the set of all
    variables in phi except x_i. I.e. P(x_i|x_0, ..., x_{i-1}, x_{i+1}, ..., x_n).
    It also checks that the factor is a valid conditional probability distribution.
    """
    assert x in phi.variables
    idx = phi.variables.index(x)
    card = list(phi.cardinality)
    var_card = card[idx]
    evidence_card = card[:idx] + card[idx+1:]
    values = np.moveaxis(phi.values, idx, 0) # move variable x to dimension 0
    return TabularCPD(variable=x,
                      variable_card=var_card,
                      evidence=phi.variables[:idx] + phi.variables[idx+1:],
                      evidence_card=evidence_card,
                      values=values.reshape(var_card, int(np.prod(evidence_card))), # int cast since np.prod([])=1.0
                      state_names=phi.state_names)

def normalize_cpd_values(variables, cardinality, values):
    """
    Normalize a numpy array of CPD values. It also accounts for unreachable states (all 0s).
    """
    assert len(variables) in (1,2)
    f = DiscreteFactor(variables=variables, cardinality=cardinality, values=values)
    # Normalize it as a CPD
    f = to_cpd(f, variables[-1]) # Get the last variable for the CPD (e.g. [ht-1, ht] -> p(ht|ht-1))
    f.normalize()
    # Remove NaNs, put 0s instead
    f.values[np.logical_not(np.isfinite(f.values))] = 0.0
    if len(f.variables) == 2:
        # The process f->cpd swapped the variables, let's swap them back
        assert f.variables[1] == variables[0]
        return np.transpose(f.values)
    return f.values
    
def get_hmm_factors(trajectories, h_states, v_states):
    """
    Gets the prior, transition and observation probabilities of an HMM from data.
    """
    prior = np.zeros(len(h_states))
    transition = np.zeros([len(h_states), len(h_states)])
    observation = np.zeros([len(h_states), len(v_states)])
    for t in trajectories:
        for i in range(len(t)):
            s, o = t[i].split(" ") # h and v come as a string "h v"
            s_i, o_i = h_states.index(s), v_states.index(o)
            if i == 0: # Prior
                prior[s_i] += 1
            else:
                prev_s_i = h_states.index(t[i-1].split(" ")[0])
                transition[prev_s_i][s_i] += 1 # Transition
            observation[s_i][o_i] += 1 # Observation

    return normalize_cpd_values(['h0'], [len(h_states)], prior), \
           normalize_cpd_values(['ht-1', 'ht'], [len(h_states), len(h_states)], transition), \
           normalize_cpd_values(['ht', 'vt'], [len(h_states), len(v_states)], observation)

Then, we extract the probabilities from the training sequences, and print one sample trajectory:

In [6]:
with open("robot.data") as f:
    robot_data = f.read().splitlines() # this accounts for '\n'

# Split into train and test data (separated by ".." in the file, end of single trajectory is marked with ".")
train_robot, test_robot = [list(split(dataset, '.')) for dataset in split(robot_data, '..')]
print(f"ROBOT: {len(train_robot) + len(test_robot)} robot trajectories read.")

size_square = 4
positions = [f"{x+1}:{y+1}" for x in range(size_square) for y in range(size_square)]
colors = ['r', 'g', 'b', 'y']
r_prior, r_transition, r_observation = get_hmm_factors(trajectories = train_robot,
                                                       h_states=positions,
                                                       v_states=colors) # This will raise a warning: invalid value encountered in true_divide

# Define factor functions
def robot_prior(pos):
    return DiscreteFactor(variables=[pos],
                          cardinality=[len(positions)],
                          values=r_prior,
                          state_names = {pos: positions})

def robot_transition(prev_pos, next_pos):
    return DiscreteFactor(variables=[prev_pos, next_pos],
                          cardinality=[len(positions), len(positions)],
                          values=r_transition,
                          state_names = {prev_pos: positions,
                                         next_pos: positions})

def robot_observation(pos, col):
    return DiscreteFactor(variables=[pos, col],
                          cardinality=[len(positions), len(colors)],
                          values=r_observation,
                          state_names = {pos: positions,
                                         col: colors})

robot_pos_trajectories = [[f"{p.split(' ')[0]}" for p in traj] for traj in test_robot][:-1] # last one is empty
robot_color_trajectories = [[f"{p.split(' ')[1]}" for p in traj] for traj in test_robot][:-1]

print(robot_pos_trajectories[-1][:10])
print(robot_color_trajectories[-1][:10])

ROBOT: 401 robot trajectories read.
['2:4', '3:4', '3:4', '4:4', '4:4', '4:4', '4:4', '4:4', '4:4', '3:4']
['r', 'y', 'y', 'b', 'b', 'r', 'b', 'b', 'b', 'y']


  tabular_cpd.values = (cpd / cpd.sum(axis=0)).reshape(tabular_cpd.cardinality)


### Questions

Given the following sequence of observations $v_{1:T}$:
```
['g', 'g', 'b', 'r', 'r', 'b', 'b', 'r', 'r', 'y', 'r', 'b', 'r', 'y', 'b']
```

Answer the following questions using your implementation of belief propagation:
1. Filtering : what is $p(h_6|v_{1:6})$?
2. Prediction : what is $p(h_7|v_{1:6})$?
3. Probability of evidence : what is $p(v_{1:T})$?

In [7]:
from functools import reduce
import operator
import numpy as np
from collections import defaultdict
from copy import deepcopy

def prod(iterable):
    
    """
      Returns the product of all the items in the iterable given in as input.
    """

    return reduce(operator.mul, iterable, 1)

class MyBeliefPropagation:
    def __init__(self, factor_graph):
        assert factor_graph.check_model()
        self.original_graph = factor_graph
        self.variables = factor_graph.get_variable_nodes()
        
        self.state_names = dict()
        for f in self.original_graph.factors:
            self.state_names.update(f.state_names)
        self.bp_done = False

    def factor_ones(self, v):
        """
        Returns a DiscreteFactor for variable v with all ones.
        """

        return DiscreteFactor(variables=[v],
                        cardinality=[len(self.state_names[v])],
                        values=np.repeat(1.0,len(self.state_names[v])), 
                        state_names=self.state_names)
        
    def initialize_messages(self):
        """
        This function creates, for each edge factor-variable, two messages: m(f->v) and 
        m(v->f). It initiliazies each message as a DiscreteFactor with all ones. It stores all
        the messages in a dict of dict. Keys of both dicts are either factors or variables.
        Messages are indexed as messages[to][from]. For example, m(x->y) is in messages[y][x].
        It's done this way because it will be useful to get all messages that go to a variable
        or a factor.
        """

        self.messages = defaultdict(dict)
        for fact in self.working_graph.get_factors():
          if fact not in self.messages:
            self.messages[fact] = dict()
          for var in fact.variables:
            if var not in self.messages:
              self.messages[var] = dict() 
            self.messages[var][fact] = self.factor_ones(var)
            self.messages[fact][var] = self.factor_ones(var)
                
    def factor_to_variable(self, f, v):
        """
        Computes message m from factor to variable. It computes it from all messages from all
        other variables to the factor (i.e. all variables connected the factor except v).
        Returns message m.
        """
        assert v in self.variables and f in self.working_graph.factors
        variables = []
        messages = []
        for var in f.scope():
            if var != v:
                variables.append(var)
                messages.append(self.messages[f][var])
        
        #We calculate the probability value: factors * prodfunc(messages)
        m = f * prod(messages)
        message = m.marginalize(variables, inplace = False)
        return message
        

    def variable_to_factor(self, v, f):
        """
        Computes message m from variable to factor. It computes it from all messages from all
        other factors to the variable (i.e. all factors connected the variable except f).
        Returns message m.
        """
        assert v in self.variables and f in self.working_graph.factors
        factors = []
        for factor in self.messages[v]:
          if f != factor: 
            factors.append(self.messages[v][factor])
        m = prod(factors)
        return m
    
    def get_evidence_factors(self, evidence):
        """
        For each evidence variable v, create a factor with p(v=e)=1. Recieves a dict of
        evidence, where keys are variables and values are variable states. Returns a list of
        DiscreteFactor.
        """
        evidences = []        
        for variable in evidence:
          factor = DiscreteFactor(variables = [variable],
                                   cardinality = [len(self.state_names[variable])],
                                   values = [float(state == evidence[variable]) for state in self.state_names[variable]], 
                                   state_names = self.state_names) 
          evidences.append(factor)
        
        return evidences
        
    def update(self, m_to, m_from):
        """
        Performs an update of a message depending on whether it is variable-to-factor or
        factor-to-variable.
        """
        if m_from in self.working_graph.factors:
          self.messages[m_to][m_from] = self.factor_to_variable(m_from,m_to)
        else:
          self.messages[m_to][m_from] = self.variable_to_factor(m_from,m_to)
        

    
    def collect_evidence(self, node, parent=None):
        """
        Passes messages from the leaves to the root of the tree.
        The parent argument is used to avoid an infinite recursion.
        """
        for child in self.working_graph.neighbors(node):
          if child != parent:
            self.update(node,self.collect_evidence(child,node))
        return node 

    def distribute_evidence(self, node, parent=None):
        """
        Passes messages from the root to the leaves of the tree.
        The parent argument is used to avoid an infinite recursion.
        """
        for child in self.working_graph.neighbors(node):
          if child != parent:
            self.update(child,node)
            self.distribute_evidence(child,node)
    
    def set_evidence(self, evidence):
        """
        Generates a new graph with the evidence factors
        evidence (keys: variables, values: states)
        """
        evidence_factors = self.get_evidence_factors(evidence)
        self.working_graph = self.original_graph.copy()
        for factor in evidence_factors:
          self.working_graph.add_factors(factor)
          for vars in factor.variables:
            self.working_graph.add_edge(factor,vars)

        self.bp_done = False
    
    def run_bp(self, root):
        """
        After initializing the messages, this function performs Belief Propagation
        using collect_evidence and distribute_evidence from the given root node.
        """
        assert root in self.variables, "Variable not in the model"
        self.initialize_messages()
        self.collect_evidence(root)
        self.distribute_evidence(root)
        self.bp_done = True

    def get_marginal(self, variable):
        """
        To be used after run_bp. Returns p(variable | evidence) unnormalized.
        """
        assert self.bp_done, "First run BP!"
        probs = []
        for msg in self.messages[variable]:
          probs.append(self.messages[variable][msg])
        return prod(probs)

In [34]:
observed_colors = ['g', 'g', 'b', 'r', 'r', 'b', 'b', 'r', 'r', 'y', 'r', 'b', 'r', 'y', 'b']

robot_HMM = HMM(n_vars=len(observed_colors),
                prior_fn=robot_prior,
                transition_fn=robot_transition,
                observation_fn=robot_observation,
                h_states=positions,
                v_states=colors,
                h_name="position",
                v_name="color")


robot_bp = MyBeliefPropagation(robot_HMM.to_factor_graph())
#Evidences
robot_bp.set_evidence({"color0":"g","color1":"g","color2":"b","color3":"r","color4":"r","color5":"b"})
robot_bp.run_bp("position0")

#We get the marginals and the state assignments (highest probability)
#filtering
filtering = robot_bp.get_marginal("position5").normalize(inplace=False)
mymax = np.argmax(filtering.values)
print(filtering)
print("Maximum probability:", filtering.values.flatten()[mymax])
print("State assignment:", sorted(filtering.assignment([mymax])[0]))

#prediction 
predict = robot_bp.get_marginal("position6").normalize(inplace=False)
print(predict)
mymax2 = np.argmax(predict.values)
print("Maximum probability:", predict.values.flatten()[mymax2])
print("State assignment:", sorted(predict.assignment([mymax2])[0]))

#Probability of evidence p(v1:15)
robot_bp2 = MyBeliefPropagation(robot_HMM.to_factor_graph())
robot_bp2.set_evidence({"color0":"g","color1":"g","color2":"b","color3":"r","color4":"r","color5":"b",
                     "color6":"b","color7":"r","color8":"r","color9":"y","color10":"r","color11":"b",
                     "color12":"r","color13":"y","color14":"b"})

robot_bp2.run_bp("position0")
prob = robot_bp2.get_marginal("position3")
print("Probability of evidence p(v1:15)",np.sum(prob.values))

#comparing with the given BP to debug

q1 = bp.query(variables="position5", evidence={'color0':'g','color1':'g','color2':'b', 'color3':'r','color4':'b','color5':'b' })
print(q1)
amax = np.argmax(q1.values)
print("Maximum probability:", q1.values.flatten()[amax])
print("State assignment:", sorted(q1.assignment([amax])[0]))

#We have the tuple index error but we have the result,the error does not have any influence on the results.

#raise IndexError ("tuple index out of range")

+----------------+------------------+
| position5      |   phi(position5) |
| position5(1:1) |           0.0000 |
+----------------+------------------+
| position5(1:2) |           0.0109 |
+----------------+------------------+
| position5(1:3) |           0.0039 |
+----------------+------------------+
| position5(1:4) |           0.0000 |
+----------------+------------------+
| position5(2:1) |           0.0000 |
+----------------+------------------+
| position5(2:2) |           0.0000 |
+----------------+------------------+
| position5(2:3) |           0.8905 |
+----------------+------------------+
| position5(2:4) |           0.0353 |
+----------------+------------------+
| position5(3:1) |           0.0002 |
+----------------+------------------+
| position5(3:2) |           0.0081 |
+----------------+------------------+
| position5(3:3) |           0.0092 |
+----------------+------------------+
| position5(3:4) |           0.0267 |
+----------------+------------------+
| position5(

IndexError: tuple index out of range

#### SOLUTION:
Original trajectory:
```
robot_pos_trajectories[9][:15]
['1:3', '1:3', '2:3', '2:4', '2:4', '2:3', '2:3', '2:4', '2:4', '3:4', '3:3', '2:3', '2:4', '3:4', '4:4']
['g', 'g', 'b', 'r', 'r', 'b', 'b', 'r', 'r', 'y', 'r', 'b', 'r', 'y', 'b']
```

## Task 2: Correcting typos without a dictionary

The second domain deals with the problem of correcting typos in text without using a dictionary.  Here, you will be given text containing many typographical errors and the goal is to correct as many typos as possible.

In this problem, state refers to the correct letter that should have been typed, and output refers to the actual letter that was typed.  Given a sequence of outputs (i.e., actually typed letters), the problem is to reconstruct the hidden state sequence (i.e., the intended sequence of letters).  Thus, data for this problem looks like this:

<code>
i i
n n 
t t
r r
o o
d x
u u
c c
t t
i i
o i
n n
_ _
t t
h h
e e
_ _
</code>


where the left column is the correct text and the right column contains text with errors.

Data for this problem was generated as follows: we started with a text document, in this case, the Unabomber's Manifesto, which was chosen not for political reasons, but as a convenient, on-line, single-author text of about the right length.  For simplicity, all numbers and punctuation were converted to white space and all letters converted to lower case.  The remaining text is a sequence only over the lower case letters and the space character, represented in the data files by an underscore character.  Next, typos were artificially added to the data as follows: with $90\%$ probability, the correct letter is transcribed, but with $10\%$ probability, a randomly chosen neighbor (on an ordinary physical keyboard) of the letter is transcribed instead.  Space characters are always transcribed correctly.  In a harder variant of the problem, the rate of errors is increased to $20\%$.  The first (roughly) $20,000$ characters of the document have been set aside for testing.  The remaining $161,000$ characters are used for training.

As an example, the original document begins:

<code>introduction the industrial revolution and its consequences have been a disaster for the human race they have greatly increased the life expectancy of those of us who live in advanced countries but they have destabilized society
</code>    
    
With $20\%$ noise, it looks like this:

<code>introductipn the industfial revolhtjon and its consequences bafw newn a diszster rkr the yumab race thdy have grwatky increased the ljte esoectandy od thosr of is who libe in advanced coubfries but they have fewtabipuzee xociwty</code>

The error rate (fraction of  characters that are mistyped) is about $16.5\%$ (less than $20\%$ because space characters were not corrupted).

Data for this part of the assignment is in [typos10.data](files/typos10.data) and [typos20.data](files/typos20.data), representing data generated with a $10\%$ or $20\%$ error rate, respectively.

Next, we provide the code to get the HMM from the training data.

In [11]:
# 10% error rate
with open("typos10.data") as f:
    text_data = f.read().splitlines() # this accounts for '\n'

# Split into train and test data (separated by ".." in the file, end of single word is marked with "_ _")
train_typos10, test_typos10 = [list(split(dataset, '_ _')) for dataset in split(text_data, '..')]
print(f"TYPOS 10: {len(train_typos10) + len(test_typos10)} words read.")

characters = [chr(i+97) for i in range(26)]
t10_prior, t10_transition, t10_observation = get_hmm_factors(trajectories = train_typos10,
                                                             h_states=characters,
                                                             v_states=characters)

# 20% error rate
with open("typos20.data") as f:
    text_data = f.read().splitlines() # this accounts for '\n'
train_typos20, test_typos20 = [list(split(dataset, '_ _')) for dataset in split(text_data, '..')]
print(f"TYPOS 20: {len(train_typos20) + len(test_typos20)} words read.")
t20_prior, t20_transition, t20_observation = get_hmm_factors(trajectories = train_typos20,
                                                             h_states=characters,
                                                             v_states=characters)

# Define factor functions
def text10_prior(c):
    return DiscreteFactor(variables=[c],
                          cardinality=[len(characters)],
                          values=t10_prior,
                          state_names = {c: characters})

def text10_transition(prev_c, next_c):
    return DiscreteFactor(variables=[prev_c, next_c],
                          cardinality=[len(characters), len(characters)],
                          values=t10_transition,
                          state_names = {prev_c: characters,
                                         next_c: characters})

def text10_observation(c, c_typed):
    return DiscreteFactor(variables=[c, c_typed],
                          cardinality=[len(characters), len(characters)],
                          values=t10_observation,
                          state_names = {c: characters,
                                         c_typed: characters})

def text20_prior(c):
    return DiscreteFactor(variables=[c],
                          cardinality=[len(characters)],
                          values=t20_prior,
                          state_names = {c: characters})

def text20_transition(prev_c, next_c):
    return DiscreteFactor(variables=[prev_c, next_c],
                          cardinality=[len(characters), len(characters)],
                          values=t20_transition,
                          state_names = {prev_c: characters,
                                         next_c: characters})

def text20_observation(c, c_typed):
    return DiscreteFactor(variables=[c, c_typed],
                          cardinality=[len(characters), len(characters)],
                          values=t20_observation,
                          state_names = {c: characters,
                                         c_typed: characters})

# Get test text from dataset
original_text = " ".join(["".join([c.split(" ")[0] for c in word]) for word in test_typos10])
typos10_text = " ".join(["".join([c.split(" ")[1] for c in word]) for word in test_typos10])
typos20_text = " ".join(["".join([c.split(" ")[1] for c in word]) for word in test_typos20])
print("Original text: ", original_text[:95])
print("Typos 10% text:", typos10_text[:95])
print("Typos 20% text:", typos20_text[:95])

TYPOS 10: 30558 words read.
TYPOS 20: 30558 words read.
Original text:  introduction the industrial revolution and its consequences have been a disaster for the human 
Typos 10% text: introxuctiin the ibdudtrial revokufuon anf its consequences ysfe been a disaster for the hyman 
Typos 20% text: introductipn the industfial revolhtjon and its consequences bafw newn a diszster rkr the yumab 


### Questions

Given the following sequence of observations $v_{1:T}$:
```
['i', 'n', 't', 'r', 'o', 'x', 'u', 'c', 't', 'i', 'i', 'n']
```

Answer the following questions:
1. Filtering : what is $p(h_{11}|v_{1:11})$?
2. Prediction : what is $p(h_{12}|v_{1:11})$?
3. Probability of evidence : what is $p(v_{1:12})$?


In [12]:
word = ['i', 'n', 't', 'r', 'o', 'x', 'u', 'c', 't', 'i', 'i', 'n']


# the ['i', 'n', 't', 'r', 'o', 'x', 'u', 'c', 't', 'i', 'i', 'n']

word_hmm = HMM(n_vars=len(word),
               prior_fn=text10_prior,
               transition_fn=text10_transition,
               observation_fn=text10_observation,
               h_states=characters,
               v_states=characters,
               h_name="c",
               v_name="c_typed")

# ...

#
typo_bp = MyBeliefPropagation(word_hmm.to_factor_graph())
#First the evidences are set
typo_bp.set_evidence({"c_typed0":'i',"c_typed1":'n',"c_typed2":'t',
                                            "c_typed3":'r',"c_typed4":'o',"c_typed5":'x',
                                           "c_typed6":'u',"c_typed7":'c',"c_typed8":'t',
                                           "c_typed9":'i',"c_typed10":'i'})
#Belief Propagation is running.. c0:initial
typo_bp.run_bp("c0")
#filtering for the   𝑝(ℎ11|𝑣1:11): 
filtering = typo_bp.get_marginal("c10").normalize(inplace=False)
print("Filtering p(h11|v1:11):",filtering)
#Prediction for the 𝑝(ℎ12|𝑣1:11):
 
prediction = typo_bp.get_marginal("c11").normalize(inplace=False)
print("Prediction p(h12|v1:11):", prediction)

#Probability of evidence task:

typo_bp2 = MyBeliefPropagation(word_hmm.to_factor_graph())

typo_bp2.set_evidence({"c_typed0":'i',"c_typed1":'n',"c_typed2":'t',
                                            "c_typed3":'r',"c_typed4":'o',"c_typed5":'x',
                                           "c_typed6":'u',"c_typed7":'c',"c_typed8":'t',
                                           "c_typed9":'i',"c_typed10":'i'})

#Belief Propagation is running.. c0:initial
typo_bp2.run_bp("c10")
#Chosen randomly c10 to marginalization:
probability = typo_bp2.get_marginal("c10")
prob_evindence=np.sum(probability.values)
print("Probability evidence of p(v1:12)",prob_evindence)

Filtering p(h11|v1:11): +--------+------------+
| c10    |   phi(c10) |
| c10(a) |     0.0000 |
+--------+------------+
| c10(b) |     0.0000 |
+--------+------------+
| c10(c) |     0.0000 |
+--------+------------+
| c10(d) |     0.0000 |
+--------+------------+
| c10(e) |     0.0000 |
+--------+------------+
| c10(f) |     0.0000 |
+--------+------------+
| c10(g) |     0.0000 |
+--------+------------+
| c10(h) |     0.0000 |
+--------+------------+
| c10(i) |     0.1140 |
+--------+------------+
| c10(j) |     0.0000 |
+--------+------------+
| c10(k) |     0.0602 |
+--------+------------+
| c10(l) |     0.0000 |
+--------+------------+
| c10(m) |     0.0000 |
+--------+------------+
| c10(n) |     0.0000 |
+--------+------------+
| c10(o) |     0.8078 |
+--------+------------+
| c10(p) |     0.0000 |
+--------+------------+
| c10(q) |     0.0000 |
+--------+------------+
| c10(r) |     0.0000 |
+--------+------------+
| c10(s) |     0.0000 |
+--------+------------+
| c10(t) |     0



```
`# This is formatted as code`
```

## The Viterbi algorithm

We have seen how we can obtain the MAP query from a BP query, and how that approach did not scale. Let's implement now the Viterbi algorithm to perform MAP queries on a HMM. It consists of the following steps

1. Compute the factors $\mu_t$ that will recursively be used to obtain the maximum probability

$$\mu(h_{t-1}) = \max_{h_t} p(v_t|h_t)p(h_t|h_{t-1})\mu(h_t),\qquad 2\leq t\leq T,$$
$$\mu(h_T) = 1.$$

2. Obtain the desired maximum probability and backtrack to obtain the state trajectory $h^*_{1:T}$ using the previous computations

$$p_\text{max} = \text{max}_{h_1} p(v_1|h_1)p(h_1)\mu(h_1),$$
$$h_1^* = \text{argmax}_{h_1} p(v_1|h_1)p(h_1)\mu(h_1),$$
$$h_t^* = \text{argmax}_{h_t} p(v_t|h_t)p(h_t|h_{t-1}^*)\mu(h_t).$$

In [13]:
def factor_ones(v, state_names):
    """
    Returns a DiscreteFactor with all ones for variable v with its domain defined in states_names.
    """
    card = len(state_names[v])
    return DiscreteFactor(variables=[v],
                          cardinality=[card],
                          values=np.ones(card),
                          state_names=state_names)

class Viterbi:
    def max_and_argmax(self, f):
        """
        Given a factor f, returns its maximum value and the corresponding assignment. We assume that f
        is a DiscreteFactor of one variable.
        """
        assert len(f.variables)==1, "Factor connected to more than one variable: "+str(f.variables)
        v = f.variables[0]
        am = np.argmax(f.values)
        return f.values[am], list(f.state_names[v])[am]
    
    def compute_messages(self, hmm, evidence):
        """
        Given an HMM and the evidence (a list of states in order from left to right), compute the
        messages (from right to left).
        Returns a list of messages.
        """
        n_vars = len(evidence)
        messages = [None]*n_vars
        
        #https://jessicastringham.net/2018/05/13/viterbi-message-passing/
        # IMPLEMENT
        #gegeben: HMM, evidence(left to right)
        #gesucht: message (right to left)
        #REMEMBER:.reverse() function should be used at the end 
        
        #We start with last message to first one, thus reverse iteration is required.
        
        last_index=(len(hmm.h)-1) #last_index: t-1, first index=0
        
        
        for index in range(last_index,-1,-1):
            
            if index==last_index:
                #u(ht)=1
                
                messages[index]=1 
                
            else:
                #u(ht-1)= maxht[p(vt|ht)*p(ht|ht-1)*u(ht)]
                
                ht     = hmm.f[(hmm.h[index+1]),(hmm.v[index+1])] # p(vt|ht)
                ht_1   = hmm.f[(hmm.h[index]),(hmm.h[index+1])]   # p(ht|ht-1)
                ut     = messages[index+1]                    # u(ht)
                
                
                multiplication= (ht)*(ht_1)*(messages[index+1])   
                
                #Time to reduce discrete factors according to given evidences vt:
                
                reduced_multiplication= multiplication.reduce([(hmm.v[index+1],evidence[index+1])],inplace=False)
                
                
                #Then self.maximize and save in messages[index]:
                
                messages[index]= reduced_multiplication.maximize([hmm.h[index+1]],inplace=False)
                
               
            
            
           
        
        assert all(m is not None for m in messages)
        messages.reverse()
        return messages
    
    def backtrack(self, hmm, messages, evidence):
        """
        Given an HMM, the messages (computed from right to left), and the evidence (a list of states
        in order from left to right), it computes the MAP states as well as their value in the joint
        distribution.
        Returns a list of states and the joint probability of these states.
        """
        #IMPLEMENT: h0, compute MAP value
        
        #given: HMM,the messages (computed from right to left),evidence (a list of states in order from left to right)
        #return:a list of states and the joint probability of these states.
        
        #Make the message reverse
        messages.reverse()
        
        #value, h0_opt =
        
        #map_h=[]
        
        #Initialize first likely state #u(ht-1)= maxht[p(v0|h0)*p(h0)*u(h0)]
        
        initial=hmm.f[hmm.h[0],hmm.v[0]]*hmm.f[hmm.h[0]]* messages[0] 
        
        #Reducing discrete factor according to given first evidences:
        
        reduced_initial=initial.reduce([(hmm.v[0],evidence[0])],inplace=0)
        
        #We have had alread argmax function in the Class viterbi
        
        h0_value,h0_opt=self.max_and_argmax(reduced_initial)
        
        
        
        value=h0_value      
        map_h = [h0_opt]
        
        #0 index values are already implemented in above,Now we implement in general:
        
        for t in range(1, len(hmm.h)):
            
             #u(ht-1)= maxht[p(vt|ht)*p(ht|ht-1)*u(ht)
            
        
            multiplication=hmm.f[hmm.h[t],hmm.v[t]]*hmm.f[hmm.h[t-1],hmm.h[t]]* messages[t]
            reduced_multiplication=multiplication.reduce([(hmm.v[t],evidence[t]),(hmm.h[t-1],map_h[t-1])],inplace=False)
            h_value,h_opt=self.max_and_argmax(reduced_multiplication)
            
            #Here is the map_h list which is define before the iteration, the optimum trajectories appended inside:
            map_h.append(h_opt)
            #
            value=h_value
            map_h.append(h_opt)
        return map_h, value
    
    def map_query(self, hmm, evidence):
        """
        Given an hmm and the evidence (a list of states in order from left to right), returns the
        MAP states as well as the MAP probability.
        """
        assert type(evidence) in (list, tuple), "The evidence should be a list of observed states"
        assert len(evidence) == len(hmm.v), "To get the MAP of the states we need the whole sequence of observed states"
        messages = self.compute_messages(hmm, evidence)
        return self.backtrack(hmm, messages, evidence)

### MAP queries in the Basic setting
First we try it in our simple setting, and check that the result is the same as from the BP query.

In [14]:
viterbi = Viterbi()
map_states, map_prob = viterbi.map_query(hmm_weather_3, evidence=[True, True, True])
print("MAP states:", map_states)
print("MAP prob:", map_prob)

"""Maximum probability: 0.16289702507051773
State assignment: [('w0', 'rainy'), ('w1', 'cloudy'), ('w2', 'rainy')]"""

MAP states: ['rainy', 'cloudy', 'cloudy', 'rainy', 'rainy']
MAP prob: 0.38


"Maximum probability: 0.16289702507051773\nState assignment: [('w0', 'rainy'), ('w1', 'cloudy'), ('w2', 'rainy')]"

How does this method compare, in terms of complexity, to our previous, naive, approach?

Viterbi method:
MAP states: ['rainy', 'cloudy', 'cloudy', 'rainy', 'rainy']
MAP prob: 0.38



Belief Propagation:
Maximum probability: 0.16289702507051773
State assignment: [('w0', 'rainy'), ('w1', 'cloudy'), ('w2', 'rainy')]

#Viterbi algorithm has greater probability then Belief Propagation and has more states then previouse algorithm. So we can suggest that Viterbi algorithm is more sensitive and complex



### MAP queries in the robot navigation setting

To try it in the robot navigation setting, let's first define the DiscreteFactor functions for the HMM model, according to the values extracted from the training data:

In [21]:
observed_colors = robot_color_trajectories[4]
actual_trajectory = robot_pos_trajectories[4]

robot_HMM = HMM(n_vars=len(observed_colors),
                prior_fn=robot_prior,
                transition_fn=robot_transition,
                observation_fn=robot_observation,
                h_states=positions,
                v_states=colors,
                h_name="position",
                v_name="color")

map_states, map_value = viterbi.map_query(robot_HMM, evidence=observed_colors)


print("Traj:", observed_colors)
print("Infered traj.: ", map_states)
print("Solution:      ", actual_trajectory)
print("MAP value:", map_value)

def error(t1, t2):
    error = 0
    for c1, c2 in zip(t1, t2):
        if c1 != c2:
            error += 1
    return error/len(t1)

print("Map states:",map_states)
print("Error:", error(map_states, actual_trajectory))

Traj: ['g', 'g', 'r', 'r', 'g', 'r', 'g', 'g', 'r', 'r', 'g', 'r', 'r', 'r', 'r', 'r', 'g', 'r', 'b', 'g', 'g', 'r', 'r', 'r', 'r', 'r', 'r', 'r', 'r', 'r', 'r', 'r', 'r', 'r', 'r', 'r', 'r', 'r', 'r', 'r', 'b', 'y', 'r', 'g', 'r', 'r', 'g', 'g', 'r', 'r', 'r', 'r', 'r', 'g', 'g', 'g', 'b', 'b', 'r', 'b', 'g', 'g', 'g', 'g', 'g', 'y', 'b', 'b', 'y', 'g', 'y', 'y', 'y', 'b', 'y', 'g', 'y', 'g', 'g', 'r', 'b', 'g', 'r', 'r', 'r', 'r', 'r', 'r', 'r', 'g', 'g', 'b', 'r', 'r', 'r', 'b', 'r', 'y', 'y', 'g', 'g', 'b', 'r', 'r', 'b', 'g', 'b', 'g', 'r', 'y', 'r', 'y', 'b', 'g', 'r', 'b', 'r', 'y', 'b', 'y', 'y', 'r', 'g', 'b', 'y', 'y', 'y', 'g', 'b', 'y', 'b', 'b', 'b', 'y', 'g', 'y', 'b', 'y', 'g', 'y', 'y', 'b', 'b', 'y', 'y', 'y', 'b', 'b', 'y', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'y', 'y', 'y', 'r', 'y', 'y', 'g', 'y', 'y', 'r', 'g', 'g', 'b', 'b', 'b', 'b', 'b', 'b', 'y', 'y', 'y', 'g', 'r', 'b', 'g', 'g', 'g', 'g', 'g', 'b', 'r', 'b', 'b', 'b', 'g', 'g', 'b', 'r', 'b', 'r', 'b', 'g', 'y'

### MAP queries in the typo correction setting


Correct the following word containing typos with the HMM: ['i','n','t','r','o','x','u','c','t','i','i','n']

In [24]:
word = ['i', 'n', 't', 'r', 'o', 'x', 'u', 'c', 't', 'i', 'i', 'n']


map_states, map_value = viterbi.map_query(word_hmm,word)

print(map_states)
print(map_value)


#print("Error:", error(map_states, word))

['i', 'n', 'n', 't', 't', 't', 't', 'o', 'o', 's', 's', 'u', 'u', 'c', 'c', 't', 't', 'u', 'u', 'i', 'i', 'n', 'n']
0.0034127995358762954


Now, let's correct the full text. We have the original text and the one that contains typos in the variables original_text and typos10_text respectively. We provide a function above to test the error between two texts.

In [32]:
rec10_text = []
for word in typos10_text.split(" "):
    chs = [c for c in word]
    
    # ...
    # build hmm model
    word_hmm = HMM(n_vars=len(word),
               prior_fn=text10_prior,
               transition_fn=text10_transition,
               observation_fn=text10_observation,
               h_states=characters,
               v_states=characters,
               h_name="c",
               v_name="c_typed")
    
    # run viterbi to get MAP states
    #print(chs)
    #print(word)
    map_states, map_value = viterbi.map_query(word_hmm, evidence=chs)
    
    new_word = map_states
    rec10_text.append("".join(new_word))
rec10_text = " ".join(rec10_text)

print("Corrected:", error(rec10_text, original_text))
print("Not corrected:", error(typos10_text, original_text))

Corrected: 0.5594499865192775
Not corrected: 0.08248604465709729


Do the same for the case of 20% of error rate.

In [33]:
rec20_text = []
for word in typos20_text.split(" "):
    chs = [c for c in word]
    
    # ...
    # build hmm model
    word_hmm = HMM(n_vars=len(word),
               prior_fn=text20_prior,
               transition_fn=text20_transition,
               observation_fn=text20_observation,
               h_states=characters,
               v_states=characters,
               h_name="c",
               v_name="c_typed")
    
    # run viterbi to get MAP states
    #print(chs)
    #print(word)
    map_states, map_value = viterbi.map_query(word_hmm, evidence=chs)
    
    new_word = map_states
    rec20_text.append("".join(new_word))
rec20_text = " ".join(rec10_text)

print("Corrected:", error(rec20_text, original_text))
print("Not corrected:", error(typos20_text, original_text))

Corrected: 0.26446578091999823
Not corrected: 0.16143341307814993
