**Hidden Markov models for cracking codes**

In this exercise you have to make a partially built HMM work and use it to solve some simple substitution ciphers. Plaintext data is provided in 'plaintext' directory. Encrypted data is in 'encrypted'. Some of the texts were originally English some of them were Russian; the sequences are also of different lengths. 

This homework is worth **15 points** and is due by the next class (**24th Oct.**), please submit the results of the **TASK 5** (a list of files and names of the author/work) to Anytask in the following format: 'filename author' where 'filename' is a file from "encrypted/\*_encrypted.txt" and 'author' is a file from "plaintext/\*.txt" (not including 'english.txt', 'russian.txt' or 'all.txt') which best matches the decrypted text.




In [1]:
# Utilities for loading data from file and converting characters to integers and back.
import numpy as np
from tqdm import tqdm_notebook as tqdm
    
def get_char_to_int_mapping(path):
    # Load data from path and get mapping from characters to integers and back.
    characters = set()
    for line in open(path):
        characters.update(set([c for c in line.strip()]))
    char_to_int_mapping = dict([(char, i) for i, char in enumerate(sorted(list(characters)))])
    int_to_char_mapping = [char for char, i in char_to_int_mapping.items()]
    return char_to_int_mapping, int_to_char_mapping

def load_sequences(path, char_to_int_mapping):
    # Load data from path and map to integers using mapping.
    return [[char_to_int_mapping[c] for c in line.strip()] for line in open(path)]

def estimate_markov_model_from_sequences(sequences, num_states):
    # Estimate a Markov model based on the sequences (integers) provided.

    # pi[i] = Pr(s_0 = i)
    pi_counts = np.zeros(num_states)

    # A[i, j] = Pr(s_t = j | s_{t-1} = i)
    A_counts = np.zeros((num_states, num_states))
    
    for n, sequence in enumerate(sequences):
        prev_char = None
        for n_char, char in enumerate(sequence):
            if n_char == 0:
                pi_counts[char] += 1
            
            if prev_char is not None:
                A_counts[prev_char, char] += 1
            
            prev_char = char
            
    pi = pi_counts / np.sum(pi_counts)
    A = A_counts / np.sum(A_counts, 1, keepdims=True)
    
    return pi, A

**TASK 1**: Make the following block run by completing the method 'estimate_markov_model_from_sequences' above.

In [2]:
# Some data to use.
plaintext = 'plaintext/english.txt'
# plaintext = 'plaintext/shakespeare.txt'
# plaintext = 'plaintext/russian.txt'

ciphertext = 'encrypted/1_encrypted.txt' # short sequences in english
# ciphertext = 'encrypted/99_encrypted.txt' # longer sequences in russian

# load a character to integer mapping and reverse                                                                                                         
char_to_int_mapping, int_to_char_mapping = get_char_to_int_mapping(plaintext)

# load sequences as ints                                                                                                                                  
plaintext_sequences = load_sequences(plaintext, char_to_int_mapping)
encrypted_sequences = load_sequences(ciphertext, char_to_int_mapping)

# estimate a markov model over characters                                                                                                                 
pi, A = estimate_markov_model_from_sequences(plaintext_sequences, len(char_to_int_mapping))

In [3]:
prev_char, next_char = np.unravel_index(A.argmax(), A.shape)
int_to_char_mapping[prev_char], int_to_char_mapping[next_char]

('q', 'u')

Below is a mostly implemented HMM.

$\alpha_{t+1}(i) = \sum_{j} \alpha_{t}(j)A_{j}(i)B_{i}(x_{t+1})$

$\beta_{t}(i) = \sum_{j}\beta_{t+1}A_{i}(j)B_{j}(x_{t+1})$

In [7]:
class HMM():

    def __init__(self, observations_to_char_mapping={}, states_to_char_mapping={}, seed=None,
                 print_test_output=True, early_stopping_thresh=None):
        # Determine number of states and observation space. 
        self.num_states = len(states_to_char_mapping)
        self.num_outputs = len(observations_to_char_mapping)
        self.states_to_char_mapping = states_to_char_mapping
        self.observations_to_char_mapping = observations_to_char_mapping
        self.print_test_output = print_test_output
        self.early_stopping_thresh = early_stopping_thresh
       
        if seed is not None:
            np.random.seed(seed)
        # Random initialization
        self.pi = np.random.rand(self.num_states)
        self.pi /= np.sum(self.pi)
        
#         self.pi = np.ones((self.num_states)) / self.num_states
        
        self.A = np.random.rand(self.num_states, self.num_states)
        self.A /= np.sum(self.A, 1, keepdims=True)
        
#         self.A = np.ones((self.num_states, self.num_states)) / self.num_states
#       
        
        self.B = np.ones((self.num_states, self.num_outputs)) / self.num_outputs
        self.B = self.B + np.random.rand(self.num_states, self.num_outputs) / 100
        self.B = np.clip(self.B, 0, None)
#         self.B = np.random.rand(self.num_states, self.num_outputs)
        self.B /= np.sum(self.B, 1, keepdims=True)
        
        
    def estimate_with_em(self, sequences, parameters={}, epsilon=0.001, max_iters=100):
        # Estimates all parameters not provided in 'parameters' based on 'sequences'.
        self.fixed_pi = 'pi' in parameters
        if self.fixed_pi:
            print('pi is fixed')
            self.pi = parameters['pi']
        
        self.fixed_A = 'A' in parameters
        if self.fixed_A:
            print('A is fixed')
            self.A = parameters['A']
            
        self.fixed_B = 'B' in parameters
        if self.fixed_B:
            print('B is fixed')
            self.B = parameters['B']
    
        previous_llh = None
        iter = 0
        while True and iter < max_iters:
            # Infer expected counts.
            pi_counts, A_counts, B_counts, log_likelihood = self.e_step(sequences)

            # Update parameters based on counts.
            self.m_step(pi_counts, A_counts, B_counts)

            # Output some sequences for debugging.
            if self.print_test_output:
                self.output(sequences[:10])

            # Log likelihood should be increasing
            print('iteration %d; log likelihood %.4f' % (iter, log_likelihood))
            if previous_llh:
                assert log_likelihood >= previous_llh
                if log_likelihood - previous_llh < epsilon:
                    break
                if self.early_stopping_thresh is not None:
                    if abs(previous_llh - log_likelihood) < self.early_stopping_thresh:
                        print(f'early stopping previous_llh {previous_llh}, log_likelihood {log_likelihood}')
                        break
                        
            previous_llh = log_likelihood
        
            iter += 1


    def e_step(self, sequences):
        # Reset counters of statistics
        pi_counts = np.zeros_like(self.pi)
        A_counts = np.zeros_like(self.A) 
        B_counts = np.zeros_like(self.B) 
        total_log_likelihood = 0.0

        for sequence in sequences:
            # Run Forward-Backward dynamic program
            alpha, beta, gamma, xi, log_likelihood = self.forward_backward(sequence)
  
            # Accumulate statistics.а с
            pi_counts += gamma[:, 0]
            A_counts += xi
            for t, x in enumerate(sequence):
                B_counts[:, x] += gamma[:, t]
            
            total_log_likelihood += log_likelihood

        return pi_counts, A_counts, B_counts, total_log_likelihood

    def m_step(self, pi_counts, A_counts, B_counts):
        if not self.fixed_pi:
            self.pi = pi_counts / np.sum(pi_counts)
        if not self.fixed_A:
            self.A = A_counts / np.sum(A_counts, 1, keepdims=True)
        if not self.fixed_B:
            self.B = B_counts / np.sum(B_counts, 1, keepdims=True)
        
    def max_posterior_decode(self, sequence):
        _, _, gamma, _, log_likelihood = self.forward_backward(sequence)
        return np.argmax(gamma, 0)
        
    def forward_backward(self, sequence):
        # alpha[i][t] = p(x_1, ..., x_t, z_t = i)
        alpha = self.forward(sequence)
        
        # beta[i][t] = p(x_t+1, ..., x_T|z_t = i)
        beta = self.backward(sequence)

        # gamma[i][t] = p(z_t = i|x_1, ..., x_T)
        gamma = (alpha * beta) / np.sum(alpha * beta, 0)

        # xi[i][j] = p(z_t = i, z_{t+1} = j|x_1, ..., x_T)
        xi = np.zeros_like(self.A)
        for t in range(1, len(sequence)-1):
            this_xi = alpha[:, t].reshape(-1, 1) * self.A * beta[:, t+1] * self.B[:, sequence[t+1]]
            
#             this_xi = np.zeros_like(self.A)
#             for i in range(self.num_states):
#                 for j in range(self.num_states):
#                     this_xi[i, j] += alpha[i, t] * self.A[i, j] * beta[j, t+1] * self.B[j, sequence[t+1]]
#             assert(np.allclose(vector_this_xi, this_xi))
            xi += this_xi / np.sum(this_xi)
                
        return alpha, beta, gamma, xi, np.log(np.sum(alpha[:, len(sequence)-1]))
    
    def forward(self, sequence):
        # alpha[i][t] = p(x_1, ..., x_t, z_t = i)
        alpha = np.zeros((len(self.pi), len(sequence)))
        
        for t, xt in enumerate(sequence):
            if t == 0:
                prev_alpha = self.pi * self.B[:, xt]
                alpha_initialized = True
            else:
                prev_alpha = self.A.T.dot(prev_alpha).flatten() * self.B[:, xt]
            
#             prev_alpha = prev_alpha / np.sum(prev_alpha)
#             print(prev_alpha)
            alpha[:, t] = prev_alpha.copy()
        
        assert(alpha_initialized)
        
        return alpha
    
    def backward(self, sequence):
        # beta[i][t] = p(x_t+1, ..., x_T|z_t = i)
        num_states = len(self.pi)
        beta = np.zeros((num_states, len(sequence)))
    
        for ind, xt in enumerate(sequence[::-1]):
            t = len(sequence) - ind - 1
            if ind == 0:
                prev_beta = np.ones(num_states)
                beta_initialized = True
            else:
                prev_beta = (prev_beta * self.B[:, xt_plus1]).dot(self.A.T)
#                 for i in range(num_states):
#                     for j in range(num_states):
#                         beta[i, t] += beta[j, t+1] * self.A[j, i] * self.B[j, xt_plus1]
#             prev_beta = prev_beta / np.sum(prev_beta)
            beta[:, t] = prev_beta.copy()
            xt_plus1 = xt
    
        assert(beta_initialized)
        
        return beta

    def output(self, sequences):
        # Output some decoded states. 
        for i, sequence in enumerate(sequences):
            
            observations = [self.observations_to_char_mapping[x] for x in sequence]                
            map_states = [self.states_to_char_mapping[x] for x in self.max_posterior_decode(sequence)]
            print('(states):       %s\n(observations): %s' % (''.join(map_states), ''.join(observations)))


In [8]:
class HMM_cycles(HMM):
    def forward(self, sequence):
        # alpha[i][t] = p(x_1, ..., x_t, z_t = i)
        alpha = np.zeros((len(self.pi), len(sequence)))
        
        num_states = len(self.pi)
        for t, xt in enumerate(sequence):
            if t == 0:
                alpha[:, 0] = self.pi * self.B[:, xt]
                alpha_initialized = True
            else:
                for i in range(num_states):
                    for j in range(num_states):
                        alpha[i, t] += alpha[j, t - 1] * self.A[j, i] * self.B[i, xt]
    
        assert(alpha_initialized)
        
        return alpha
    
    def backward(self, sequence):
        # beta[i][t] = p(x_t+1, ..., x_T|z_t = i)
        num_states = len(self.pi)
        beta = np.zeros((num_states, len(sequence)))
        
        for ind, xt in enumerate(sequence[::-1]):
            t = len(sequence) - ind - 1
            if ind == 0:
                beta[:, t] = 1.0
                beta_initialized = True
            else:
                for i in range(num_states):
                    for j in range(num_states):
                        beta[i, t] += beta[j, t + 1] * self.A[i, j] * self.B[j, xt_plus1]
            xt_plus1 = xt

        assert(beta_initialized)
        
        return beta

Compare numpy and cycles

In [9]:
# # Since it's a substitution cipher we assume hidden states and observations have same alphabet.
# state_to_char_mapping = int_to_char_mapping
# observation_to_char_mapping = int_to_char_mapping

# # Initialize a HMM with the correct state/output spaces.
# hmm = HMM(observation_to_char_mapping, state_to_char_mapping)
# hmm_cycles = HMM_cycles(observation_to_char_mapping, state_to_char_mapping)

# hmm_cycles.pi = hmm.pi.copy()
# hmm_cycles.A = hmm.A.copy()
# hmm_cycles.B = hmm.B.copy()

# for sequence in tqdm(encrypted_sequences[:100]):
#     assert(np.allclose(hmm.forward(sequence), hmm_cycles.forward(sequence)))
#     assert(np.allclose(hmm.backward(sequence), hmm_cycles.backward(sequence)))

**TASK 2**: Implement the assertions in 'forward' and 'backward' methods on the HMM class so that the following block passes.

In [15]:
# Since it's a substitution cipher we assume hidden states and observations have same alphabet.
state_to_char_mapping = int_to_char_mapping
observation_to_char_mapping = int_to_char_mapping

# Initialize a HMM with the correct state/output spaces.
hmm = HMM(observation_to_char_mapping, state_to_char_mapping, seed=None, print_test_output=False, early_stopping_thresh=1)

# Estimate the parameters and decode the encrypted sequences.
hmm.estimate_with_em(encrypted_sequences[:100],
                     parameters={'A':A, 'pi':pi},
                    )

pi is fixed
A is fixed
iteration 0; log likelihood -12069.1866
iteration 1; log likelihood -10426.0091
iteration 2; log likelihood -10414.7779
iteration 3; log likelihood -10401.6116
iteration 4; log likelihood -10383.4688
iteration 5; log likelihood -10355.2655
iteration 6; log likelihood -10308.6621
iteration 7; log likelihood -10231.4049
iteration 8; log likelihood -10108.4666
iteration 9; log likelihood -9927.1098
iteration 10; log likelihood -9696.0348
iteration 11; log likelihood -9465.2005
iteration 12; log likelihood -9280.0454
iteration 13; log likelihood -9140.8159
iteration 14; log likelihood -9028.4377
iteration 15; log likelihood -8930.9627
iteration 16; log likelihood -8847.6208
iteration 17; log likelihood -8779.9658
iteration 18; log likelihood -8727.4448
iteration 19; log likelihood -8688.0165
iteration 20; log likelihood -8658.8629
iteration 21; log likelihood -8636.3325
iteration 22; log likelihood -8618.3976
iteration 23; log likelihood -8605.4603
iteration 24; log 

In [16]:
hmm.output(encrypted_sequences[:10])

(states):       than such a roman
(observations): noeixjtcoxexhwfei
(states):       cassous brutus bait not me
(observations): cejjgtjxkhtntjxkegnxiwnxfq
(states):       ill not endure it you forget yourself
(observations): gddxiwnxqi thqxgnxbwtxpwhvqnxbwthjqdp
(states):       to hedge me in i am a soldier i
(observations): nwxoq vqxfqxgixgxefxexjwd gqhxg
(states):       older in prackice abler than yourself
(observations): wd qhxgixyhecngcqxekdqhxnoeixbwthjqdp
(states):       to mave conditions
(observations): nwxfeaqxcwi gngwij
(states):       brutus go to you are not cassous
(observations): khtntjxvwxnwxbwtxehqxiwnxcejjgtj
(states):       cassous i am
(observations): cejjgtjxgxef
(states):       brutus i say you are not
(observations): khtntjxgxjebxbwtxehqxiwn
(states):       cassous urge me no more i thall forget myself
(observations): cejjgtjxthvqxfqxiwxfwhqxgxjoeddxpwhvqnxfbjqdp


**TASK 3**: Some of the encrypted sequences are quite long. Try decoding some from 'encrypted/99_encrypted.txt' (note these are in Russian).

In [18]:
# Some data to use.
plaintext = 'plaintext/russian.txt'

ciphertext = 'encrypted/99_encrypted.txt' # longer sequences in russian

# load a character to integer mapping and reverse                                                                                                         
char_to_int_mapping, int_to_char_mapping = get_char_to_int_mapping(plaintext)

# load sequences as ints                                                                                                                                  
plaintext_sequences = load_sequences(plaintext, char_to_int_mapping)
encrypted_sequences = load_sequences(ciphertext, char_to_int_mapping)

# estimate a markov model over characters                                                                                                                 
pi, A = estimate_markov_model_from_sequences(plaintext_sequences, len(char_to_int_mapping))

In [19]:
# Since it's a substitution cipher we assume hidden states and observations have same alphabet.
state_to_char_mapping = int_to_char_mapping
observation_to_char_mapping = int_to_char_mapping

# Initialize a HMM with the correct state/output spaces.
hmm = HMM(observation_to_char_mapping, state_to_char_mapping, seed=17, print_test_output=False, early_stopping_thresh=0.1)

# Estimate the parameters and decode the encrypted sequences.
hmm.estimate_with_em(encrypted_sequences[:100],
                     parameters={'A':A, 'pi':pi},
                    )

pi is fixed
A is fixed




iteration 0; log likelihood -inf
iteration 1; log likelihood nan


AssertionError: 

**TASK 4**: Make your implementation of forward and backward more efficient by removing all but the outermost for-loop.

**TASK 5**: Try to classify the author of each text. 

---

In [None]:
# sequence = [1, 2, 3, 27]
# num_states = 27
# num_output_states = 28

# alpha = np.random.rand(num_states, len(sequence))
# prev_alpha = np.random.rand(num_states, 1)
# A = np.random.rand(num_states, num_states)
# B = np.random.rand(num_states, num_output_states)

def multiply_non_numpy_way(alpha, A, B, sequence):
    num_states = len(alpha)
    
    res = np.zeros((num_states, len(sequence)))
    for t, xt in enumerate(sequence):
        for i in range(num_states):
            for j in range(num_states):
                res[i, t] += alpha[j, t] * A[i, j] * B[i, xt]
    return res

def multiply_numpy_way(alpha, A, B, sequence):
    num_states = len(alpha)
    
    res = np.zeros((num_states, len(sequence)))
    for t, xt in enumerate(sequence):
        res[:, t] = A.dot(alpha[:, t]) * B[:, xt]
    return res

try:
    np.allclose(multiply_numpy_way(alpha, A, B, sequence), multiply_non_numpy_way(alpha, A, B, sequence))
except NameError:
    print('skip check')
    
def multiply_non_numpy_way_prev(prev_alpha, A, B, sequence):
    num_states = len(prev_alpha)
    
    res = np.zeros((num_states, len(sequence)))
    for t, xt in enumerate(sequence):
        for i in range(num_states):
            for j in range(num_states):
                res[i, t] += prev_alpha[j, 0] * A[i, j] * B[i, xt]
        prev_alpha = res[:, t].reshape(-1, 1)
        
    return res

def multiply_numpy_way_prev(prev_alpha, A, B, sequence):
    num_states = len(alpha)
    
    res = np.zeros((num_states, len(sequence)))
    for t, xt in enumerate(sequence):
        res[:, t] = A.dot(prev_alpha).flatten() * B[:, xt]
        prev_alpha = res[:, t].copy()
        
    return res

try:
    np.allclose(multiply_non_numpy_way_prev(prev_alpha, A, B, sequence), multiply_numpy_way_prev(prev_alpha, A, B, sequence))
except NameError:
    print('skip check')
    

def B_non_numpy_way(beta, A, B, sequence):
    num_states = len(beta)
    res = np.zeros((num_states, len(sequence)))
    
    for t, xt in enumerate(sequence):
        for i in range(num_states):
            for j in range(num_states):
                res[i, t] += beta[j, t] * A[j, i] * B[j, xt]
    return res

def B_numpy_way(beta, A, B, sequence):
    num_states = len(beta)
    res = np.zeros((num_states, len(sequence)))
    
    for t, xt in enumerate(sequence):
#         res[i, t] += beta[j,t] * A[j,i] * B[j, xt]
        res[:, t] = (beta[:,t] * B[:, xt]).dot(A)
    return res


try:
    assert(np.allclose(B_numpy_way(beta, A, B, sequence), B_non_numpy_way(beta, A, B, sequence)))
except NameError:
    print('skip check')