# Day 20 notebook

The objectives of this notebook are to practice 

* estimating the parameters of an HMM in the fully observed scenario
* constructing and training an HMM for an application with real data

In [2]:
# Modules used in this activity
import math      # for log
import functools # for reduce
import random    # for simulations

## A `HiddenMarkovModel` class

In this activity, we will continue to expand on the `HiddenMarkovModel` class that we developed in the previous activities.  Compared to the class in the last activity, the constructor (`__init__`) has been modified slightly in two ways:
1. The parameter arguments to the constructor are now optional, defaulting to `None`.  When the parameters are not
   given, the HMM will need to be trained later using the `estimate_parameters` method.
2. A helper method `_compute_log_parameters` is called to pre-compute the log parameters.  This method should be 
   called anytime the parameters are updated.
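
As a small illustration of why the log parameters are worth pre-computing, the sketch below compares a direct product of many probabilities with the sum of their logs. The helper name `log_or_neg_inf` is hypothetical (it is not part of the class); it mirrors the zero-handling convention of `log_transform_vector` defined below.

```python
import math

def log_or_neg_inf(p):
    """Natural log of a probability, mapping 0 to -inf.

    Hypothetical helper for illustration; same convention as
    log_transform_vector in this notebook.
    """
    return math.log(p) if p != 0 else float("-inf")

# Multiplying many small probabilities underflows to 0.0 ...
probs = [0.1] * 400
product = 1.0
for p in probs:
    product *= p

# ... whereas summing their logs stays well within floating-point range.
log_sum = sum(log_or_neg_inf(p) for p in probs)

print(product)   # 0.0
print(log_sum)   # approximately -921.03
```

A probability of exactly zero becomes `-inf`, which behaves sensibly under addition and `max`, so impossible paths are never selected by the Viterbi recurrence.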

In [3]:
class HiddenMarkovModel:
    def __init__(self, states, chars, 
                 transition_prob_matrix=None, initial_probs=None, emission_prob_matrix=None):
        """Initializes a HiddenMarkovModel
        
        Models represented by this class do not explicitly represent a begin state and do
        not allow for an end state.
                
        Args:
            states: a string giving the characters representing the hidden states
                of the model (1 character per state)
            chars: a string giving the set of characters possibly emitted by the
                states of the model
            transition_prob_matrix: a list of lists of probabilities representing a
                transition probability matrix. transition_prob_matrix[s][t] should equal 
                P(pi_i = t | pi_{i-1} = s). Row s is thus the conditional probability 
                distribution P(pi_i | pi_{i-1} = s). The indices in this matrix correspond 
                to the indices of the states in the states argument.  If None, then this
                model will need to be trained via estimate_parameters.
            initial_probs: a list of probabilities representing the initial state 
                probabilities. Entry s of this list is P(pi_1 = s), i.e., the probability that
                the first hidden state in the chain is s.  The indices of this list correspond to the
                indices of the states in the states argument.  If None, then this
                model will need to be trained via estimate_parameters.
            emission_prob_matrix: a list of lists of probabilities representing an emission
                probability matrix.  emission_prob_matrix[s][c] should equal 
                P(X_i = c | pi_i = s), i.e., the probability of state s emitting character c. 
                Row s is thus the conditional probability distribution P(X_i | pi_i = s).
                The row indices of this matrix correspond to the indices of the states in
                the states argument.  The column indices of the matrix correspond to the 
                indices of the characters in the chars argument.  If None, then this
                model will need to be trained via estimate_parameters.
        """
        self.states = states
        self.chars = chars
        self.transition_prob_matrix = transition_prob_matrix
        self.initial_probs = initial_probs
        self.emission_prob_matrix = emission_prob_matrix
        
        # precompute log parameters if the raw parameters were specified
        if None not in (transition_prob_matrix, initial_probs, emission_prob_matrix):
            self._compute_log_parameters()
    
    def _compute_log_parameters(self):
        """Computes and stores log-transformations of the model parameters.
        
        This method should be run whenever the parameters of the model are updated.
        """
        self.log_transition_prob_matrix = log_transform_matrix(self.transition_prob_matrix)
        self.log_initial_probs = log_transform_vector(self.initial_probs)
        self.log_emission_prob_matrix = log_transform_matrix(self.emission_prob_matrix)
    
    def encode_states(self, state_sequence):
        """Encodes a string of state characters as a list of indices of the states."""
        return [self.states.index(char) for char in state_sequence]

    def decode_states(self, indices):
        """Decodes a sequence of state indices into a string of the state characters."""
        return "".join(self.states[index] for index in indices)

    def encode_sequence(self, sequence):
        """Encodes a string of observed characters as a list of indices of the characters."""
        return [self.chars.index(char) for char in sequence]

    def decode_sequence(self, indices):
        """Decodes a sequence of observed character indices into a string of characters."""
        return "".join(self.chars[index] for index in indices)

    def estimate_parameters(self, training_data, pseudocount=0):
        """Estimates (and sets) the parameters of the model given observed sequences and state paths.
        
        Computes maximum likelihood parameters for the completely observed scenario.

        Args:
            training_data: A list of tuples of the form (state_string, char_string) where
                state_string is a string of state characters and char_string is a string
                of observed characters.
            pseudocount: a pseudocount to add to each total observed count when computing the 
                parameter values.  The default is zero, which corresponds to maximum 
                likelihood estimates without smoothing.  A value of one for this parameter
                corresponds to Laplace smoothing.
        """
        ###
        ### YOUR CODE HERE
        transition_count_matrix = matrix(len(self.states), len(self.states), pseudocount)
        initial_counts = [pseudocount] * len(self.states)
        emission_count_matrix = matrix(len(self.states), len(self.chars), pseudocount)
        
        for state_path, sequence in training_data:
            encoded_sequence = self.encode_sequence(sequence)
            encoded_state_path = self.encode_states(state_path)
            # count the initial state and the transitions
            if encoded_state_path:
                initial_counts[encoded_state_path[0]] += 1
            for k, l in zip(encoded_state_path, encoded_state_path[1:]):
                transition_count_matrix[k][l] += 1
            # count emissions
            for k, c in zip(encoded_state_path, encoded_sequence):
                emission_count_matrix[k][c] += 1
        
        # set all parameters to the normalized counts
        self.initial_probs = normalize_vector(initial_counts)
        self.transition_prob_matrix = normalize_matrix_rows(transition_count_matrix)
        self.emission_prob_matrix = normalize_matrix_rows(emission_count_matrix)
        ###
        
        
        self._compute_log_parameters()
    
    def simulate(self, length):
        """Simulates a sequence of hidden states and emitted characters of
        the given length from this HMM.
        
        Args:
            length: the length of the sequence to simulate
        Returns:
            A tuple of the form (hidden_state_string, char_string) where hidden_state_string is a
            string of state characters and char_string is a string of observed characters.
        """
        state_indices = [None] * length
        char_indices = [None] * length
        for i in range(length):
            state_probs = self.transition_prob_matrix[state_indices[i - 1]] if i > 0 else self.initial_probs
            state_indices[i] = sample_categorical(state_probs)
            char_indices[i] = sample_categorical(self.emission_prob_matrix[state_indices[i]])
            
        return (self.decode_states(state_indices), self.decode_sequence(char_indices))
        
    def log_joint_probability(self, hidden_state_string, char_string):
        """Calculates the (natural) log joint probability of a path of hidden states
        and an observed sequence given this HMM.
        
        Args:
            hidden_state_string: a string representing the sequence of hidden states (pi)
            char_string: a string representing the sequence of observed characters (X)
        Returns:
            log(P(hidden_states, observed_chars))
        """
        state_indices = self.encode_states(hidden_state_string)
        char_indices = self.encode_sequence(char_string)

        log_p = 0.0
        last_state_index = None
        for state_index, char_index in zip(state_indices, char_indices):
            if last_state_index is None:
                log_p += self.log_initial_probs[state_index]
            else:
                log_p += self.log_transition_prob_matrix[last_state_index][state_index]
            log_p += self.log_emission_prob_matrix[state_index][char_index]
            last_state_index = state_index
        return log_p

    def viterbi_matrix(self, char_string):
        """Computes the (log-transformed) Viterbi dynamic programming matrix V for
        the given observed sequence.

        Args:
            char_string: a string representing the sequence of observed characters (X)
        Returns:
            A matrix (list of lists) representing the Viterbi dynamic programming matrix,
            with rows corresponding to states and columns corresponding to positions in the
            sequence.
        """
        char_indices = self.encode_sequence(char_string)
        
        # Initialize the viterbi dynamic programming matrix
        # the entry V[k][i] corresponds to the subproblem V_k(i+1)
        # where i is a 0-based index (e.g., V[k][0] corresponds to the subproblem
        # of the most probable path of the prefix of length = 1). We will not explicitly
        # represent the begin or end states.  As a result, we will not explicitly store the
        # initialization values described in the textbook and lecture.
        V = matrix(len(self.states), len(char_string))
        if not char_string: return V
        
        # initialization (first position in sequence)
        for ell in range(len(self.states)):    # loop over hidden state indices
            V[ell][0] = (self.log_initial_probs[ell] + 
                         self.log_emission_prob_matrix[ell][char_indices[0]])

        # main fill stage
        for i in range(1, len(char_string)):    # loop over positions
            for ell in range(len(self.states)): # loop over hidden state indices
                V[ell][i] = (self.log_emission_prob_matrix[ell][char_indices[i]] + 
                             max(V[k][i - 1] + self.log_transition_prob_matrix[k][ell]
                                 for k in range(len(self.states))))

        return V
    
    def viterbi_traceback(self, V):
        """Computes a most probable path given a (log) Viterbi dynamic programming matrix.
        
        Uses a traceback procedure that does not require traceback pointers.  In the case of
        ties, this traceback prefers the state with the largest index.
        
        Args:
            V: A matrix (list of lists) representing the Viterbi dynamic programming matrix
               containing log-transformed values.
        Returns:
            A string representing a most probable sequence of hidden states
        """
        L = len(V[0])               # deduce the length of the sequence from # columns in V
        if L == 0: return ""        # empty string base case
        state_indices = [None] * L  # initialize hidden state path
        
        # determine the state at the last position in a most probable path
        max_prob, max_state = max((V[k][L - 1], k) for k in range(len(self.states)))
        state_indices[L - 1] = max_state
        
        # traceback from this last state by redoing the recurrence calculation at each step
        # the emission probabilities are not included in the calculations because they are
        # irrelevant for determining the maximizing state
        for i in range(L - 1, 0, -1):
            max_prob, max_state = max((V[k][i - 1] + self.log_transition_prob_matrix[k][max_state], k)
                                      for k in range(len(self.states)))
            state_indices[i - 1] = max_state
            
        # return string representation of hidden state path
        return self.decode_states(state_indices)        

    def most_probable_path(self, char_string):
        """Computes a most probable path of hidden states for the observed sequence.

        Args:
            char_string: a string representing the sequence of observed characters (X)
        Returns:
            A string representing a most probable sequence of hidden states.
        """
        V = self.viterbi_matrix(char_string)
        return self.viterbi_traceback(V)    



def sample_categorical(distribution):
    """Randomly sample from a categorical distribution (a discrete distribution over K categories).
    
    Args:
        distribution: a list of probabilities representing a discrete distribution over K categories.
    Returns:
        The index of the category sampled.
    """

    r = random.random()
    for i, prob in enumerate(distribution):
        if r < prob:
            return i
        else:
            r -= prob
    # in case we encounter floating point issues return the last index
    return len(distribution) - 1    

def log_transform_vector(v):
    """Returns a new vector (a list) with log-transformed values"""
    return [math.log(x) if x != 0 else float("-inf") for x in v]

def log_transform_matrix(m):
    """Returns a new matrix (a list of lists) with log-transformed values"""
    return list(map(log_transform_vector, m))

def round_matrix(m, digits=2):
    """Returns a new matrix (a list of lists) with rounded values"""
    return [round_vector(v, digits) for v in m]
    
def round_vector(v, digits=2):
    """Returns a new vector (a list) with rounded values"""
    return [round(x, digits) for x in v]

def matrix(num_rows, num_cols, initial_value=None):
    """Constructs a matrix (a list of lists)"""
    return [[initial_value] * num_cols for i in range(num_rows)]

def normalize_vector(v):
    """Returns a new vector with entries scaled such that the sum of the entries is one."""
    s = sum(v)
    return [x / s for x in v]

def normalize_matrix_rows(m):
    """Returns new matrix with entries scaled such that each row sums to one."""
    return list(map(normalize_vector, m))

def print_matrix(m, precision=3, width=8):
    """Prints a matrix with values formatted to the given precision and spaced to the given width."""
    for row in m:
        print(''.join("{:{}.{}g}".format(x, width, precision) for x in row))

# Using the class above, we construct an HMM for the occasionally dishonest casino example
# described in the lecture and textbook
casino_states = "FL"     # F = fair die, L = loaded die
casino_chars = "123456"  # the six sides of the die
casino_initial_probs = [0.5, 0.5]
casino_transition_prob_matrix = [
    [0.95, 0.05],
    [0.10, 0.90]
]

casino_emission_prob_matrix = [
    [ 1/6,  1/6,  1/6,  1/6,  1/6, 1/6],
    [1/10, 1/10, 1/10, 1/10, 1/10, 1/2]
]
casino_hmm = HiddenMarkovModel(casino_states, 
                               casino_chars, 
                               casino_transition_prob_matrix, 
                               casino_initial_probs,
                               casino_emission_prob_matrix)

# States and characters of another HMM that emits a DNA sequence
dna_states = "EIN"
dna_chars = "ACGT"

## PROBLEM 1: Estimating parameters of an HMM in the fully observed case (3 POINTS)

Implement the `estimate_parameters` method of the `HiddenMarkovModel` class, which computes (optionally smoothed) maximum likelihood estimates for the model given a set of training data in the fully observed scenario (i.e., when both the sequences and state paths are given).  Note that this method should also *set* the parameters of the model to these maximum likelihood estimates. You may find the following utility functions, defined above, of use in your implementation:
* `matrix`
* `normalize_vector`
* `normalize_matrix_rows`
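
To make the count-then-normalize idea concrete, here is a minimal standalone sketch for the transition probabilities only. The function name `estimate_transitions` is hypothetical and independent of the class; it starts every count at the pseudocount, tallies adjacent state pairs, and normalizes each row.

```python
def estimate_transitions(states, state_paths, pseudocount=0):
    """Estimates a transition matrix from fully observed state paths.

    Hypothetical helper for illustration: counts[k][l] is the number of
    times state k is immediately followed by state l, plus the pseudocount.
    """
    index = {s: i for i, s in enumerate(states)}
    counts = [[pseudocount] * len(states) for _ in states]
    for path in state_paths:
        for prev, curr in zip(path, path[1:]):
            counts[index[prev]][index[curr]] += 1
    # normalize each row so that it sums to one
    return [[c / sum(row) for c in row] for row in counts]

probs = estimate_transitions("FL", ["FFLLLLLFFL"], pseudocount=1)
print([[round(p, 2) for p in row] for row in probs])
# [[0.5, 0.5], [0.29, 0.71]]
```

For this path the raw counts are F→F: 2, F→L: 2, L→F: 1, L→L: 4; adding a pseudocount of one gives the rows [3, 3] and [2, 5] before normalization. With a pseudocount of zero, a state that never occurs (or is never followed by another state) has an all-zero row, and the normalization divides by zero.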

In [4]:
tiny_dataset = [('FFLLLLLFFL', 
                 '2456166656')]

small_dataset = [('FFFFFFFFFF', 
                  '4416531624'),
                 ('FFFFLLLLLL', 
                  '1245466666'),
                 ('LFFFFFFFFF', 
                  '6553321132'),
                 ('FFFLFFFFFF', 
                  '4544163311'),
                 ('LFFFFFFFFF', 
                  '6541416123')]

long_dataset = [
    ('FFFFLFFLLLLLFFFFFFLLFFFFFFFFFFFLFFFFLLLFFFFFFFFFFFFFFFFFFLLLLLLLFFFFFFFLLLFLLFFFFFLLFFFFFLLFFLLLLFFF',
     '3423612666222146536634655556613623346661122323413226235445665663456631266656664533662121662322666666'),
    ('FFFFLFFFFLLLLLLLLLLLFLLLFFLFFFFFLFFFLLLLLLLLLLFLLFFFLLLLLFFFFFFFFFFFFLFLLLFFFFFFFLLLLFLLLLFFFFFFLFFF',
     '4424361422656666662616461565531162256646151266424444656664141651441416446655453526616166635313246162'),
    ('FFFFFFFFFFFFFFFFLLLFFFFFFFFFFFFFFLFFLLLLLLLLLLLLLFFFFFFFFFLLLLFFLLLFFFLLLLFFLLLLFFFFFFFFFLLFFFLLLLLL',
     '1151666324121112666245665164434166456662666616166632324144662615665216366664466366526251266143446666'),
    ('LFFFFFFFLLFFFFFFFFFFFLFFFFLLLLLFLFFLFFLFFFFFLLLLLFFFFLLFFFLFFFFFFFFLLFFLLLLLFLLLLLLLFFFFLLLLFFFFFFFL',
     '6511631166663446212416411166666465363614122562566314161641634423623632666166256154662335363613226414'),
    ('LLLLFFFFFFFFFLFFFLLFFLFFFFFLLLFFFFFLFFFFFLLLFLLLFFFLLLLFFLFLLLLLLLFFFFLLLLLLLLLFFFFFFLLFFFFLLFFFFLLL',
     '6626136241445642566146151343663552163264466113332556666216165666654333265666546611515654236662321633'),
    ('FFFFFFFFFFFFFFFLLLLLFFFFFFFFLFFFLLLLLLLLLLLLFFLLLLLFFLLLLLLLFFFFLLLLLLFFFFFFFFFFFFLLLLLLLFFFFFLFFFFF',
     '1463624613423236666113353212455365566666666624666666666666661313663366466454525516466664644146362254'),
    ('LLLLLLLLLLLLLLFLLLLLLLLLLLLLFFFLLLLLFFFFFFFLLLFLFFFFFLLLLLLFFFFFLFFFLLLLLLLLLLLLFFFFFFLLFFFFFFFFFLLF',
     '6666664662646661666466666664156662665312515665364213146646663415514666661616666665316615465415236561'),
    ('FFLLLLFFFFLLFFFFLLFFFFFFFFFFFFLLFFFFFFFLLFFFFLFFFLFFFLLFFFLLLFFLLLLLFFFFFFFFFFFFFFFFFFFFFFFFFLLLFFLL',
     '5361664234262515665463155244266646514656632436652652326114636146166221536123613646512655325151665466'),
    ('LLFFFFLLLFLFFFFLFFFLLLLLLLLLFFFFFFFLLLLFFFFFFFFFFFFLLLLLLLLLFLLLFFLLLLFFFFFFLFFFLLLFFFFFFFFFFFFFFFFL',
     '4413665663512626261666566656622453166661245132442236646661663666236656246364636466622131225533644616'),
    ('LFLLLFFFLFFLLLLFFLLLLLLFFLLLFFFLLFLLFFLFFFFFFFFFFFFFFFFFFFFFFFFFFLLLLLLFFFFFFFFFLFFFLLLLLFFFFFFFFFLF',
     '2664664565163633266656652632566261663464243214633262142164543661166366151256125266446662644662652666')]

dna_dataset = [
    ('EENNEINIIE', 
     'GCATGGCCAT'),
    ('NEEINNIENI', 
     'GATCGAATGA')]

In [5]:
# tests for estimation of initial_probs
tiny_estimated_casino_hmm = HiddenMarkovModel(casino_states, casino_chars)
tiny_estimated_casino_hmm.estimate_parameters(tiny_dataset, pseudocount=1)
small_estimated_casino_hmm = HiddenMarkovModel(casino_states, casino_chars)
small_estimated_casino_hmm.estimate_parameters(small_dataset, pseudocount=1)
long_estimated_casino_hmm = HiddenMarkovModel(casino_states, casino_chars)
long_estimated_casino_hmm.estimate_parameters(long_dataset)
dna_hmm = HiddenMarkovModel(dna_states, dna_chars)
dna_hmm.estimate_parameters(dna_dataset)

assert round_vector(tiny_estimated_casino_hmm.initial_probs, 2) == [0.67, 0.33]
assert round_vector(small_estimated_casino_hmm.initial_probs, 2) == [0.57, 0.43]
assert round_vector(long_estimated_casino_hmm.initial_probs, 2) == [0.5, 0.5]
assert round_vector(dna_hmm.initial_probs, 2) == [0.5, 0.0, 0.5]
print("SUCCESS: estimation of initial_probs passed all tests")

SUCCESS: estimation of initial_probs passed all tests


In [6]:
# tests for estimation of transition_prob_matrix
tiny_estimated_casino_hmm = HiddenMarkovModel(casino_states, casino_chars)
tiny_estimated_casino_hmm.estimate_parameters(tiny_dataset, pseudocount=1)
small_estimated_casino_hmm = HiddenMarkovModel(casino_states, casino_chars)
small_estimated_casino_hmm.estimate_parameters(small_dataset, pseudocount=1)
long_estimated_casino_hmm = HiddenMarkovModel(casino_states, casino_chars)
long_estimated_casino_hmm.estimate_parameters(long_dataset)
dna_hmm = HiddenMarkovModel(dna_states, dna_chars)
dna_hmm.estimate_parameters(dna_dataset)

assert (round_matrix(tiny_estimated_casino_hmm.transition_prob_matrix, 2) == 
        [[0.5, 0.5], 
         [0.29, 0.71]])
assert (round_matrix(small_estimated_casino_hmm.transition_prob_matrix, 2) == 
        [[0.92, 0.08],
         [0.4,  0.6]])
assert (round_matrix(long_estimated_casino_hmm.transition_prob_matrix, 2) == 
        [[0.81, 0.19], 
         [0.28, 0.72]])
assert (round_matrix(dna_hmm.transition_prob_matrix, 2) == 
        [[0.33, 0.33, 0.33], 
         [0.4 , 0.2 , 0.4 ], 
         [0.29, 0.43, 0.29]])
print("SUCCESS: estimation of transition_prob_matrix passed all tests")

SUCCESS: estimation of transition_prob_matrix passed all tests


In [7]:
# tests for estimation of emission_prob_matrix
tiny_estimated_casino_hmm = HiddenMarkovModel(casino_states, casino_chars)
tiny_estimated_casino_hmm.estimate_parameters(tiny_dataset, pseudocount=1)
small_estimated_casino_hmm = HiddenMarkovModel(casino_states, casino_chars)
small_estimated_casino_hmm.estimate_parameters(small_dataset, pseudocount=1)
long_estimated_casino_hmm = HiddenMarkovModel(casino_states, casino_chars)
long_estimated_casino_hmm.estimate_parameters(long_dataset)
dna_hmm = HiddenMarkovModel(dna_states, dna_chars)
dna_hmm.estimate_parameters(dna_dataset)

assert (round_matrix(tiny_estimated_casino_hmm.emission_prob_matrix, 2) == 
        [[0.1, 0.2, 0.1, 0.2, 0.2, 0.2], 
         [0.17, 0.08, 0.08, 0.08, 0.17, 0.42]])
assert (round_matrix(small_estimated_casino_hmm.emission_prob_matrix, 2) == 
        [[0.26, 0.13, 0.17, 0.19, 0.15, 0.11], 
         [0.07, 0.07, 0.07, 0.2, 0.07, 0.53]])
assert (round_matrix(long_estimated_casino_hmm.emission_prob_matrix, 2) ==
        [[0.19, 0.17, 0.15, 0.18, 0.14, 0.16], 
         [0.05, 0.05, 0.05, 0.06, 0.06, 0.73]])
assert (round_matrix(dna_hmm.emission_prob_matrix, 2) ==
        [[0.14, 0.14, 0.29, 0.43], 
         [0.5,  0.33, 0.17, 0.0 ], 
         [0.29, 0.14, 0.43, 0.14]])
print("SUCCESS: estimation of emission_prob_matrix passed all tests")

SUCCESS: estimation of emission_prob_matrix passed all tests


## Application activity: Predicting text messages from numeric keyboard entries

Back in the "old" days, people used to type text messages on their "dumb" phones using the numeric keypad.  The numbers 2-9 mapped to the letters of the alphabet and 0 mapped to a space character (see the layout in the image below).  Unfortunately, there are more letters of the alphabet than numbers on the keypad, and so each number generally maps to more than one character.  To specify a particular character, the user would typically have to press the number corresponding to that character multiple times until the correct character was selected.  This was very tedious and it is amazing that we survived this stone age of cell phone technology.

![Telephone keypad](phone_keypad.png)

An interesting task presented by this system of typing text messages on a phone is to predict the string of characters that the user is attempting to type given a sequence of numbers, where the user only presses the number corresponding to each character once.  For example, given the sequence of numbers 2, 2, 8, we might predict that the user was typing the word "cat" (although there are other possibilities, such as "bat" and "act").

This task maps well to a hidden Markov model, where the characters that the user is trying to type are the "hidden" states and the observed sequence is the sequence of numbers pressed.  In this part of the activity, we will train such an HMM using a set of real text messages, and see how well it does in predicting text messages given numeric keypad sequences.
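
The mapping from hidden characters to observed key presses can be sketched as follows. This assumes the standard telephone keypad letter layout, and it assumes that a digit appearing in a message is typed by pressing its own key; the notebook does not say how the provided data set encodes digits, so treat that part as a guess. The names `KEYPAD`, `CHAR_TO_KEY`, and `to_keypresses` are hypothetical.

```python
# Standard keypad layout (assumed to match the image above); 0 is the space key.
KEYPAD = {
    "2": "abc", "3": "def", "4": "ghi", "5": "jkl",
    "6": "mno", "7": "pqrs", "8": "tuv", "9": "wxyz",
    "0": " ",
}

# Invert the layout: each character maps to the single key that produces it.
CHAR_TO_KEY = {ch: key for key, letters in KEYPAD.items() for ch in letters}

# Assumption: a digit in the message is produced by pressing that digit's key.
for digit in "0123456789":
    CHAR_TO_KEY[digit] = digit

def to_keypresses(message):
    """Converts a message (hidden states) to its key sequence (observations)."""
    return "".join(CHAR_TO_KEY[ch] for ch in message)

print(to_keypresses("cat"))                        # 228
print(to_keypresses("bat"), to_keypresses("act"))  # 228 228
```

Because each character is produced by exactly one key under these assumptions, a trained emission matrix has little to disambiguate on its own; the ambiguity among "cat", "bat", and "act" has to be resolved by the transition probabilities between characters.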

### The text message data set

I have downloaded and cleaned a dataset of SMS text messages from the [SMS Spam Collection](http://www.dt.fee.unicamp.br/~tiago/smsspamcollection/).  For each text message, I have computed the numeric sequence corresponding to it as the "observed" sequence.  The text messages are partitioned into a large training set, with which we can estimate parameters of an HMM, and a test set, on which we can analyze the predictions of the model.

In [8]:
def read_text_message_set(filename):
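    """Reads a tab-separated file of text messages, returning one list of fields per line."""
    with open(filename) as f:
        return [line.rstrip().split("\t") for line in f]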

text_message_training_set = read_text_message_set("text_message_training_set.txt")
text_message_test_set = read_text_message_set("text_message_test_set.txt")

### Your task

1. Create an HMM for this task.  The HMM states should be all lowercase letters, numeric digits, and the space character.  The emission characters should be the numeric digits (0-9).
2. Train your HMM with the provided training set.
3. Use your trained HMM and the Viterbi algorithm (or posterior decoding, if you like) to predict on the test set. 
4. How well does the HMM do in predicting the true messages? Does adding pseudocounts help?
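
One possible starting point for steps 1 and 4 is sketched below: it builds the state and emission alphabets described above and defines a simple per-character accuracy. The names `hmm_states`, `hmm_chars`, and `char_accuracy` are hypothetical, and per-character accuracy is only one of several reasonable ways to score predictions.

```python
import string

# States: lowercase letters, digits, and the space character (37 in total).
hmm_states = string.ascii_lowercase + string.digits + " "
# Emissions: the ten numeric digits.
hmm_chars = string.digits

def char_accuracy(true_message, predicted_message):
    """Fraction of positions at which the predicted character is correct."""
    if len(true_message) != len(predicted_message):
        raise ValueError("messages must have the same length")
    if not true_message:
        return 1.0
    matches = sum(t == p for t, p in zip(true_message, predicted_message))
    return matches / len(true_message)

print(len(hmm_states))              # 37
print(char_accuracy("cat", "bat"))  # 2 of 3 characters match
```

These strings could be passed as the `states` and `chars` arguments of `HiddenMarkovModel`. When training, check which column of the data files holds the message and which holds the digit sequence, since `estimate_parameters` expects `(state_string, char_string)` pairs. Note also that with a pseudocount of zero, any state that never appears in the training data produces an all-zero count row that `normalize_vector` cannot normalize, so a positive pseudocount may be needed just to train the model.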

*Super bonus activity: Sadly, a basic first-order HMM does not perform too well for this task.  One can improve performance by using a higher-order model, such as a second-order (bigram) HMM.  Construct and train a bigram HMM for this task and see how it performs.  You can use the same HMM class as before, but your states will not be single characters (they should be two-character strings, representing the current character and the previous character); instead of passing a string as the `states` argument to the HMM constructor, pass a list of strings, where each string represents one of the possible bigrams.*
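
A rough sketch of how the bigram states might be built is given below. Two choices here are assumptions rather than requirements of the activity: each state is written as previous character followed by current character, and the first position is padded with a space as its "previous" character. The names `bigram_states` and `to_bigram_path` are hypothetical.

```python
import itertools
import string

alphabet = string.ascii_lowercase + string.digits + " "

# One state per ordered pair (previous character, current character).
bigram_states = ["".join(pair) for pair in itertools.product(alphabet, repeat=2)]

def to_bigram_path(message, pad=" "):
    """Converts a message into a list of bigram states.

    Assumption: the first character is paired with `pad` as its predecessor.
    """
    previous = pad
    path = []
    for ch in message:
        path.append(previous + ch)
        previous = ch
    return path

print(len(bigram_states))     # 1369
print(to_bigram_path("cat"))  # [' c', 'ca', 'at']
```

Since `encode_states` iterates over its argument and looks each item up in `states`, the state path in each training example would need to be a list of bigram strings like the one returned here, not a plain string. Most of the 1369 × 1369 possible transitions are structurally impossible (the second character of one state must equal the first character of the next), so the choice of pseudocount matters more for this model than for the first-order one.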

In [None]:
###
### Construct/train/predict with/assess your HMM here
###
