<a href="https://colab.research.google.com/github/yexf308/AppliedStochasticProcess/blob/main/HMM_and_NLP%2C_part_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
from scipy.special import logsumexp
import math
from typing import List, Tuple
from numpy.typing import ArrayLike
import numpy.typing as npt

In [None]:
#@title  data loader class

class IdentityDict(dict):
    """This special dictionary can map a number to itself, and is used in the 
    mappings of hidden states in the simulated data.
    """
    def __missing__(self, key):
        return key

class DataLoader(object):
    """Data loader for HMM models to infer/learn.

    This class provides list of observed sequences w/o the corresponding hidden 
    states. All strings will be UTF-8 encoded. The data has two kinds of 
    sources: 1) simulation; or 2) real world task Part-of-Speech (POS) tagging
    whose further description can be found at TODO. Format of the two types of
    data are illustrated as follows.
    
    Case 1: simulated data
        * observation: a list of integers, e.g. [1, 2, 3, 4, 0]. 
        * hidden: a list of integers, e.g. [0, 1, 2, 3, 4]. 

    Case 2: TODO

    Attributes:
        method: 
            a string indicating the source of data, could be "simulate" or
            "real"
        include_hiddens: 
            a boolen variable indicating whether hidden states will be returned 
            along with the observed sequences. 
        data_path: 
            a string indicting the path to the real world data set.
        initial: 
            a numpy array to specify the distribution over initial states, whose
            shape should be (num_hiddens,)
        transition: 
            a numpy array to specify the transition probability between
            hidden states, whose shape should be (num_hiddens, num_hiddens)
        emission: 
            a numpy array to specify the emission probability from hidden
            states to observations, whose shape should be 
            (num_observations, num_hiddents)
        D:
            an integer to fix the length of simulated sequences.
        num_hiddens:
            an integer, the number of different hiddent states.
        num_observations:
            an integer, the number of possible observations.
    Methods:
        TODO
        TODO:
            return the marginal probs of the generated sequences
    """
    
    def __init__(self, method:str='simulate', 
                include_hiddens:bool=False, 
                data_path:str=None,
                initial:npt.ArrayLike=None,
                transition:npt.ArrayLike=None,
                emission:npt.ArrayLike=None,
                D:int=None
                ) -> None:
        """Initialise the necessary parameters of DataLoader.
        
        This method initialises all necessary attributes of the class in order 
        to simulate data or read POS tagging data set.
        
        Args:
            meanings of arguments are illustrated in the comments to class
            attributes.
            
            Note that there are two mutually exclusive scenarios:
             1. method is 'simulate':
                initial, transition, and emission all need to be specified in 
                order to simulate data
             2. method is 'pos':
                data_path needs to be specified in order to read data
            
        Returns:
            None
        """
        super().__init__()
        
        # check the value of method
        assert method in ['simulate', 'pos'], \
            "Unknown source of data: {0}; please input either \'simulate\' or \
                \'pos\'.".format(method)
        self.method = method
        self.include_hiddens = include_hiddens
        
        if method == 'simulate':
            assert (initial is not None) and (transition is not None) and \
                (emission is not None), \
                "Please check the specified \'initial\', \'transition\', and" \
                    + " \'emission\'."

            assert (initial.shape[0] == transition.shape[1]) and \
                (transition.shape[0] == transition.shape[1]) and \
                    (transition.shape[0] == emission.shape[1]), \
                "Please check the dimensions of  \'initial\', \'transition\',"\
                    + " and \'emission\'."
        
            self.initial = initial
            self.transition = transition
            self.emission = emission
            self.D = D
            
            self.num_hiddens = self.transition.shape[0]
            self.num_observations = self.emission.shape[0]
            
        else:
            assert data_path is not None, \
                "Please specify the path to the POS tagging dataset."
            
            self.data_path = data_path
            # TODO: complete the pos tagging data set loader.
            
        
        self._construct_idx2str()
    
    def _construct_idx2str(self) -> None:
        """Setup the mappings from indices to hiddens/observations.
        
        This method constructs the mappings from the integers to the capital
        alphabets that will appear in the observations.
        
        Args: 
            None
            
        Returns:
            None
        """
        
        if self.method == 'simulate':
            self.hidden_dict = IdentityDict()
            self.observation_dict = IdentityDict()
        else:
            # TODO
            # create the dictionaries used in real data set
            pass
        
    def _generate_data(self):
        """Generate a sequence of hidden states and the corresponding
        observations.
        
        This method will first sample a sequence of hiddent states following
        the Markov chain specified by self.initial and self.transmition. Then
        it will sample observations for each hidden state following 
        self.emission.
        
        Args:
            None
            
        Returns:
            A **list** consists of the following two elements:
            observations:
                a list of integers
            hiddens:
                a list of integers
        """
        
        hidden_states = []
        observations = []

        end = 0
        last_transition = self.initial
        step = 0
        
        while not end:
            # sample current hidden state
            cur_h = np.random.choice(self.num_hiddens, 1, p=last_transition)[0]
            # sample current observation
            cur_o = np.random.choice(self.num_observations, 1,
                                     p=self.emission[:, cur_h])[0]
            
            # add the sampled hidden state and observation to the lists
            hidden_states.append(cur_h)
            observations.append(cur_o)
            
            step += 1
            # sample if the next hiddent state is the end of sequence
            end = (cur_o == 0) if self.D is None else int(step >= self.D)
            last_transition = self.transition[cur_h, :]
            
        hidden_states = [self.hidden_dict[x] for x in hidden_states]
        observations = [self.observation_dict[x] for x in observations]
        
        return observations, hidden_states
    
    def _read_postag_data(self):
        pass
    
    def get_true_params(self) -> \
        Tuple[npt.ArrayLike, npt.ArrayLike, npt.ArrayLike]:
        """Return a tuple consists of the true parameters of data generation.
        
        This method returns a tuple consisits of the three parameters of the HMM
        that genreates the data list.
        
        Args:
            None
            
        Returns:
            self.initial: 
                an numpy.array which specifies the distribution over initial
                states.
            self.transition: 
                an numpy.array which specifies the transition probability between hidden states.
            self.emission: 
                an numpy.array which specifies the emission probability from hidden states to observations.
            self.end:
                an numpy.array which specifies the transition probability from 
                hidden states to end of sequence.
        """
        if self.method == 'pos':
            print("Oops, we don't know the true parameters of the real-world \
                dataset.")
            return None
        else:
            return self.initial, self.transition, self.emission, self.end
    
    def get_data_list(self, n_samples:int=None):
        """Return a list of sequences as sample for HMM's inference/learning.
        
        This method returns a list of (lists of integers) to be the inputs for 
        the inference/learning modules of HMM. There will be two kinds of lists
        according to different sources of data:
        1) 'simulate': 
            * observation: a list of integers, e.g. [0, 1, 2, 1, 3];
            * hidden: a list of integers, e.g. [0, 1, 2, 1, 3].
        2) 'pos': in this case, each string would be like TODO
        
        Args:
            n_samples:
                Optional, an integer to specify the number of samples in the 
                simulated data list. 
                self.method should be 'simulate' if this argument is given.
        
        Returns:
            data_list:
                a list consists of 
        """
        
        data_list = []
        
        if self.method == 'simulate':
            assert type(n_samples) == int, \
                "Please specify the number of sample you want to generate."
            for _ in range(n_samples):
                if self.include_hiddens:
                    data_list.append(self._generate_data())
                else:
                    data_list.append(self._generate_data()[0])
        else:
            pass # TODO
            
        return data_list
    

# Inference for Hidden Markov Models
This notebook is referred from Michael Gutmann's notes.

In this notebook, we will implement an unsupervised learning method for a hidden Markov model (HMM).

In this notebook, as before, we work with a homogeneous HMM with discrete observed and hidden states with the necessary (conditional) probability distributions represented by (conditional) probability tables.


## Introduction

When we are learning a new language, we have to learn, among many other things, the grammar of the language as well as its vocabulary. In the `HMM basics` notebook, we have introduced latent sentence templates that we can use to construct sentences, e.g. Subject -> Verb -> Object. The latent sentence templates were given by a hidden Markov chain. We can think that this hidden Markov chain, i.e. the latent variables together with the transition distribution, corresponds to a simple model of the grammar of the language. In turn, proper use of the vocabulary means knowing when one can use a given word. We can think that the emission distribution corresponds to a simple model of the proper use of the vocabulary.

In a supervised setting, e.g. when learning a language from a textbook, we may have annotations of the hidden states together with the used words. For example
 - sentences: `['I like dogs', 'I really love cats']`;
 - annotations: `[[Subject, Verb, Object], [Subject, Adverb, Verb, Object]]`.
 
When we have observations on the hidden variables, the learning problem is simpler: we can use standard maximum likelihood estimation for discrete random variables as with directed acyclic graphs. We have seen that this corresponds to frequency counting of words and annotations, and their co-occurences. Hence, given data on the state of the hidden variables, we could simply count the frequency of 
1. the states $k$ of the first hidden variable $h_1$ as $a_k$, 
2. transitions from hidden state $k'$ to $k$ as $A_{k',k}$, and
3. emitting word $m$ from hidden state $k$ as $B_{m,k}$.

Counting would thus allow us to learn the parameters of the HMM.

But, what if we do not have such annotations? The languages that we speak emerged naturally through complex social interactions and not from a concrete set of hidden grammar rules. So is it possible for computers to learn the grammar and use of vocabulary automatically without supervision? Yes, with some limitations, it can be done using HMMs and unsupervised learning.

As explained in the lecture, we can iteratively learn the parameters of an HMM model solely from observed sentences via the Baum-Welch algorithm, which consists of two steps: 
1. *Expectation*-step (E-step), where the expected log-likelihood is computed on filled-in data using the current parameters of the model and the inference algorithms implemented in the previous notebooks,
2. *Maximisation*-step (M-step), where updated parameters are computed such that they maximise the expectation from the E-step.

Before we implement the Baum-Welch algorithm, we also have to represent the parametrisation of the HMM in terms of variables in computer code which we do next.

## Parametrisation of discrete HMM

Following the slides in the lecture and the example in the `HMM basics` notebook, we model the homogeneous discrete HMM model with the following three parameters:
1. `_initial_`: the distribution over initial hidden states, which is implemented as a vector with shape `(num_hiddens,)`. It corresponds to the vector $\mathbf{a}\in\mathbb{R}^K$ introduced in the lecture where $a_k=p(h_1=k;\mathbf{a})$.
2. `_transition_`: the transition probability between the hidden states, which is implemented as a matrix with shape `(num_hiddens, num_hiddens)`. It corresponds to the matrix $\mathbf{A}\in\mathbb{R}^{K\times K}$ introduced in the lecture. In this notebook, we follow the convention $A_{k',k}=p(h_i=k | h_{i-1}=k'; \mathbf{A})$ for ease of implementation, which is the *transpose* of the matrix in the slides.
3. `_emission_`: the emission probability from a hidden state to an observation, which is implemented as a matrix with shape `(num_observations, num_states)`. It corresponds to the vector $\mathbf{B}\in\mathbb{R}^{M\times K}$ introduced in the lecture where $B_{m,k}=p(v_i=m|h_i=k; \mathbf{B})$.

In this tutorial, all of the above parameters will be stored as tensors, i.e. `np.array`, and the dimensions will be given in the corresponding sections.

In [None]:
#@title HMM class
"""
Implementation of HMMs

We take the following view:
* a  [probabilistic model]  =        a  [class]
* an [inference operation]  =        a  [public member function]
*    [inference operation]  includes    [partition / marginal / argmax / sampling]
* an [inference algorithm]  =        a  [private member function] 

For the HMM model, we have the following correspondance:
  inference operation          inference algorithm
* partition             <--->  forward
* marginal              <--->  forward-backward
* max                   <--->  viterbi
* argmax                <--->  viterbi-backtracking
* sampling              <--->  ancestral sampling
* conditional sampling  <--->  forward-filtering backward-sampling
"""



class HMM(object):
    def __init__(self, initial, transition, emission):
        """
        Args:
            initial: size=[num_state]
            transition: size=[num_state, num_state] from state -> to state
            emission: size=[num_observation, num_state]
        """
        self.initial = initial
        self.transition = transition
        self.emission = emission
        self.num_state = transition.shape[0]

        # assume the end state is the last state
        # self.end_state = self.num_state - 1
        return 

    def _forward(self, x):
        """Forward algorithm for computing the alpha and the partition,
        implemented in the log space

        Args:
          x: size=[max_len]

        Returns:
          alpha: size=[max_len, num_state]
          Z: float
        """
        T = len(x)
        N = self.num_state

        alpha = np.zeros((T, N)) 
        alpha[0] = self.emission[x[0]] + self.initial
        for t in range(1, T):
            emission_t = self.emission[x[t]]
            alpha[t] = logsumexp(alpha[t - 1].reshape(N, 1) +
                                 self.transition + 
                                 emission_t.reshape(1, N)
                                 , axis=0)

        Z = logsumexp(alpha[T - 1], axis=0)
        return alpha, Z

    def _backward(self, x):
        """Backward algorithm for computing the beta and the marginals,
        implemented in the log space,
        could be replaced by automatic differentiation

        Args:
          x: size=[max_len]

        Returns:
          beta: size=[max_len, num_state]
        """
        T = len(x)
        N = self.num_state

        beta = np.zeros((T, N))

        # t = [T - 2, T - 3 ,..., 0]
        for t in range(T - 2, -1, -1):
            emission_t_1 = self.emission[x[t + 1]]
            beta[t] = logsumexp(beta[t + 1].reshape(1, N) + 
                                self.transition + 
                                emission_t_1.reshape(1, N), 
                                axis=1)
        return beta

    def _viterbi(self, x):
        """Viterbi algorithm with back-tracking for computing the most probable
        latent state sequence.

        Args:
          x: size=[max_len]

        Returns:
          max_z: size=[max_len]
          max_p: float
        """
        T = len(x)
        N = self.num_state

        max_s = np.zeros((T, N))
        max_ptr = np.zeros((T, N)) # look up table 
        max_s[0] = self.initial + self.emission[x[0]]
        for t in range(1, T):
            emission_t = self.emission[x[t]]
            log_phi_t = self.transition + emission_t.reshape(1, N)
            max_s[t] = np.max(max_s[t-1].reshape(N, 1) + log_phi_t, axis=0)
            max_ptr[t] = np.argmax(max_s[t-1].reshape(N, 1) + log_phi_t, axis=0)
        
        # max_p = np.max(max_s[T - 1] + self.transition[:, self.end_state])
        max_p = np.max(max_s[T - 1])

        # backtracking
        max_z = np.zeros(T).astype(int)
        # max_z[T - 1] = np.argmax(max_s[T - 1] + self.transition[:, self.end_state])
        max_z[T - 1] = np.argmax(max_s[T - 1])
        for t in range(T - 2, -1, -1):
            # print(t + 1)
            # print(max_z[t + 1])
            max_z[t] = max_ptr[t + 1, max_z[t + 1]]
        return max_z, max_p

    def partition(self, x):
        """Log partition the of HMM

        Args:
          x: size=[max_len]

        Returns:
          log_z: float
        """
        _, log_z = self._forward(x)
        return log_z

    def marginal(self, x):
        """Marginal distribution of the latent sequences given the observation x

        Args:
          x: size=[max_len]

        Returns:
          node_marginal, size=[max_len, num_states] # p(h_t | x)
          edge_marginal, size=[max_len - 1, num_states, num_states] p(h_t, h_{t + 1} | x)
        """
        alpha, log_px = self._forward(x)
        beta = self._backward(x)
        node_marginal = alpha + beta - log_px

        T = alpha.shape[0]
        N = alpha.shape[1]
        emission = self.emission[x]  # size = [T, num_state]

        # log edge marginal probability at step t from state i to state j is a T * N * N tensor
        # log p(t, i, j) = alpha(t, i) + transition(i, j) + emission(t + 1, j) + beta(t, j) - Z
        if(T >= 2):
            edge_marginal = alpha[:-1].reshape(T - 1, N, 1) +\
                            self.transition.reshape(1, N, N) +\
                            emission[1:].reshape(T - 1, 1, N) +\
                            beta[1:].reshape(T - 1, 1, N) - log_px
        else: edge_marginal = None
        return alpha, beta, log_px, node_marginal, edge_marginal

    def argmax(self, x):
        """Most probable latent sequence given the observation x
        
        Args:
          x: size=[max_len]

        Returns:
          marginal: size=[batch, max_len, num_state]
        """
        max_z, max_log_prob = self._viterbi(x)
        return max_z, max_log_prob

    def log_prob(self, x):
        """Log probability of a given pair of observed x and latent z
        
        Args:
          x: size=[max_len]

        Returns:
          log_prob: size=[batch]
        """
        _, log_px = self._forward(x)
        return log_px

    def get_delta_param(self, initial, transition, emission):
        assert (initial.shape == self.initial.shape) and \
            (transition.shape == self.transition.shape) and \
            (emission.shape == self.emission.shape), """
            Please make sure the inputs have the same shape to the current\
                parameters.
            """
            
        params_current = np.vstack([self.initial, self.transition, self.emission]).flatten()
        params_new = np.vstack([initial, transition, emission]).flatten()

        return np.linalg.norm(params_new - params_current, ord=2)

## Overview of the implementation

We implement the unsupervised learning algorithm in an object-oriented manner. We here define the 
`HMMOptimiser` class and provide an overview of its attributes and methods.

The *attributes* of this class are:
 1. `model`: the HMM model whose parameters we would like to fit. The `HMM` model class is implemented includes the inference operations implemented in the previous notebooks.
 2. `num_hiddens`: an integer that specifies the number of possible hidden states. Since we do not know the *real* number of hidden states in the unsupervised setting, this is a tunable hyper-parameter in the [Baum-Welch algorithm](https://en.wikipedia.org/wiki/Baum%E2%80%93Welch_algorithm).
 3. `num_observations`: an integer that specifies the number of possible observed states. To handle the situations where the test data may contain observed states that do not appear in the training data, we can set this number to more than all the observed states in the training data.

The *methods* of the  `HMMOptimiser` class are:
1. `__init__`: initialises the optimiser with the provided arguments.
2. `baum_welch`: implements the Baum-Welch algorithm, which iteratively invokes the following two methods to update the parameters:
    1. `_e_step`: implements the expectation step by using the `marginal` method from the `HMM` class.
    2. `_m_step`: implements the maximisation step.
    3. `_initial_params_`: initialises the parameters.
    4. `_stop_criterion_`: checks if the iterations should be stopped.


In the following sections we explain how to implement the above methods in order to learn the parameters of an HMM model from data.



## Implementation
### E-step

The E-step of the Baum-Welch algorithm computes the ***expected* complete-data log-likelihood** given the observed data $\mathcal{D}$ and the current parameter estimate $\theta_{\text{old}}$.
Although we do not have the complete data, we can **probabilistically fill-in the latents** using the HMM model with current parameters $\theta_{\text{old}}$.

For the discrete-valued HMM in this notebook, we compute the EM objective derived in the lecture, i.e. 

$$
\begin{aligned}
J(\theta, \theta_{\text{old}}) =
& \sum_{j=1}^{N} \sum_{k=1}^{K} p(h_1=k|\mathcal{D}_j; \theta_{\text{old}})\log a_k\\
&+ \sum_{j=1}^{N} \sum_{i=2}^{d_j} \sum_{k=1}^{K} \sum_{k'=1}^{K} p(h_i=k, h_{i-1}=k'|\mathcal{D}_j; \theta_{\text{old}})\log A_{k',k}\\
&+ \sum_{j=1}^{N} \sum_{i=1}^{d_j} \sum_{k=1}^{K} \sum_{m=1}^{M} \mathbb{I}(v_i^{(j)}=m)p(h_i=k|\mathcal{D}_j, \theta_{old})\log B_{m,k}
\end{aligned}
$$
where $N$ is the number of sequences, $K$ is the number of possible hidden states, $d_j$ is the length of the $j$-th sequence, $M$ is the number of observed states (e.g. words), and $\theta_{\text{old}} = (\log a_k, \log A_{k',k}, \log B_{m,k})$ are the current parameters.

To compute the above objective in the E-step we must know the following distributions:
 - $p(h_1|\mathcal{D}_j; \theta_{\text{old}})$,
 - $p(h_i, h_{i-1}|\mathcal{D}_j; \theta_{\text{old}})$,
 - $p(h_i|\mathcal{D}_j, \theta_{old})$.

You should recognise these quantities from the previous notebooks: the first and last are the quantities calculated in the inference notebook, i.e. $p(h_i \mid v_{1:d})$, and the second quantity is the joint marginal of neighbouring states $p(h_i, h_{i-1} \mid v_{1:d})$ that can be computed as 

$$
p(h_i, h_{i-1} \mid v_{1:d}) = \frac{\alpha_{i-1}(h_{i-1}) \beta_{i}(h_{i}) p(h_{i} \mid h_{i-1}) p(v_i \mid h_i)}{p(v_{1:d})}
$$

This shows that the learning algorithm is based on the inference methods discussed before.
The code that computes the required quantities using the forward-backward method is implemented in the `HMM` model class.


Some useful information to understand the above code:
 1.  `hk_list` stores the values of $p(h_i = k| \mathcal{D}_j; \theta_{old}), \forall k\in[1,\dots,K], \forall i\in[1,\dots,d_j], \forall j\in[1,\dots, N]$; thus each element is an `np.array` with size [$d_j$, $K$].
 2.  `hkk_list` stores the values of $p(h_i=k, h_{i-1}=k' | D_j; \theta_{old}), \forall k,k'\in[1,\dots,K], \forall i\in[2,\dots,d_j], \ \forall j\in[1,\dots,N]$, thus each element is an `np.array` with size [$d_j$, $K$, $K$].

### M-step

The M-step of the Baum-Welch algorithm maximises the expected log-likelihood after filling-in the unobserved data. For the discrete-valued HMM parametrised using probability tables we can obtain the following closed-form solutions to the optimisation problem:
$$
\begin{aligned}
a_k & = \frac{1}{n}\sum_{j=1}^{n}p(h_1=k|\mathcal{D}_j;\theta_{\text{old}}); \\
A_{k', k} & = \frac{\sum_{j=1}^{n}\sum_{i=2}^{d_j}p(h_i=k,h_{i-1}=k'|\mathcal{D}_j;\theta_{\text{old}})}{\sum_k\sum_{j=1}^{n}\sum_{i=2}^{d_j}p(h_i=k,h_{i-1}=k'|\mathcal{D}_j;\theta_{\text{old}})} \\
B_{m,k} & = \frac{\sum_{j=1}^{n}\sum_{i=1}^{d_j}\mathbb{I}(v_i^{(j)}=m)p(h_i=k|\mathcal{D}_j;\theta_{\text{old}})}{\sum_{m}\sum_{j=1}^{n}\sum_{i=1}^{d_j}\mathbb{I}(v_i^{(j)}=m)p(h_i=k|\mathcal{D}_j;\theta_{\text{old}})}
\end{aligned}
$$

As you can see in the above equations, we enumerate through all the hidden/observed states in all sequences.
Thus, the above update bears similarity to the supervised learning case discussed above, i.e. it is similar to counting the frequency of 1) the first hidden state $h_1$, 2) transitioning from a hidden state $k'$ to state $k$, and 3) emitting symbol $m$ from hidden state $k$. However, crucially it uses the model with the current parameters to infer the probabilities over the unknown hidden states, rather than using known observations.

### Other parts
We have discussed and implemented the two main iterative steps in the Baum-Welch algorithm. We must now also address the following essential implementation questions:

- Q1. *initialisation of parameters*: how should we initialise parameters, i.e. $\mathbf{a},\mathbf{A},\mathbf{B}$?
- Q2. *stop criterion*: when should we stop the iteration of E/M-steps?


### Initialisation of parameters

Let us first consider the term $p(h_i=k|\mathcal{D}_j;\theta_{\text{old}})$, which can be computed using the alpha-beta recursion $p(h_i=k|\mathcal{D}_j;\theta_{\text{old}}) = \frac{1}{Z}\alpha_i(h_i = k)\beta_i(h_i = k)$.  
You should recall that 
$$
\alpha_i(h_i = k) = p(v_{1:i}, h_{i} = k) = \sum_{k'} \alpha_{i-1}(k')p(h_i = k \mid h_{i - 1} = k')p(v_i \mid h_i = k), \quad \text{with} \quad \alpha_1(h_1 = k) = p(h_1 = k)p(v_1 \mid h_1 = k)
$$ 
and 
$$\beta_i(h_i = k) = p(v_{i+1:T} \mid h_{i} = k) = \sum_{k'} \beta_{i+1}(k') p(h_{i+1} = k' \mid h_{i} = k) p(v_{i+1} \mid h_{i+1}=k'), \quad \text{with} \quad \beta_T(h_i = k) = 1.
$$

Now note that $\alpha_1(k)$ is a constant for all $k$ since both $p(h_1 = k) = a_k$ and $p(v_1 = m \mid h_1 = k) = B_{m, k}$ are constant for all $k$ and $m$. Similarly, by recursion $\alpha_i(k)$ for all $i$ are constant, since $p(h_i = k \mid h_{i - 1} = k') = A_{k', k}$ is constant for all $k$ and $k'$. The same argument applies to $\beta_i(k)$ for all $i$, which is constant for all $k$. Because $\alpha_i(k)$ and $\beta_i(k)$ are constant for all $k$, $p(h_i=k|\mathcal{D}_j;\theta_{\text{old}})$ is also a constant uniform distribution. 
A similar argument can be made to note that $p(h_i=k, h_{i-1}=k'|\mathcal{D}_j;\theta_{\text{old}})$ would also be constant for all $k$ and $k'$.


Given that all inferred distributions ($p(h_i=k|\mathcal{D}_j;\theta_{\text{old}})$ and $p(h_i=k, h_{i-1}=k'|\mathcal{D}_j;\theta_{\text{old}})$) would be uniform, looking at the update equations of $\mathbf{a}$ and $\mathbf{A}$ in the M-step you should note that the updated parameters $\mathbf{a}$ and $\mathbf{A}$ will remain uniform and no learning is happening. 
Moreover, because $p(h_i=k|\mathcal{D}_j;\theta_{\text{old}})$ is constant, these terms in the numerator and denominator of the update for $\mathbf{B}$ will cancel, and hence 
$$
B_{m,k} = \frac{\sum_{j=1}^{n}\sum_{i=1}^{d}\mathbb{I}(v_i^{(j)}=m)}{\sum_{m}\sum_{j=1}^{n}\sum_{i=1}^{d}\mathbb{I}(v_i^{(j)}=m)} = \frac{\text{# }m\text{ words in the dataset}}{\text{total # words in the dataset}}.
$$
In particular $\mathbf{B}_m$ vector is independent of the hidden states $k$, i.e. $B_{m, k} = B_{m, k'}$ for $\forall k, k'$. It hence corresponds to a Markov model of order 0.


### Stop criterion

Having implemented the parameter initialisation method and the E/M-steps above, we can now run the loop of the Baum-Welch algorithm.
However there is still one more remaining question: when should we stop iterating?

There are some heuristic criteria:
 1. *small change of parameters*: if a local maxima is reached, then the derivative of the parameters would be 0. Therefore, there should be no change of parameters in the M-step either, and hence we can stop the iterations.
 2. *small change of likelihood*: alternatively, we can stop the iteration if the change of (log-)likelihood of the model given the data becomes small. Note that the log-likelihood is available as a by-product from the computation of the marginal and joint of the latents that are needed in the E-step.
 3. *number of steps*: sometimes our initialisation might be particularly bad, then it may take a very long time for the learning to converge (in terms of the above criteria). To avoid excessive number of iterations we can set a maximum number of iterations and stop the loop immediately if the maximum number of iterations is exceeded.
 4. *mix*: in practice a mixture of the above criteria is most often used.
 
Below we ask you to fill in the code for the stopping criterion. The algorithm should stop when the maximum number of steps is exceeded, the change in parameters (`delta_param`) is less than $10^{-16}$, or the change in the log-likelihood (`delta_logpx`) is less than $10^{-8}$.

### Main loop of the Baum-Welch algorithm

We now have all the steps of the Baum-Welch algorithm implemented. We must now run them in the following order:
 1. initialise the parameters of HMM;
 2. compute the E-step with the current parameters;
 3. update parameters with the M-step;
 4. continue back to Step 2 if the stop criterion is not met.


### Putting it all together

We have implemented all the necessary methods of the Baum-Welch and we can finally run it on some data.
We have provided a `DataLoader` class  to make it easy to load data or generate sequences following a hidden Markov structure.

Below, we the data loader to synthesise a dataset of 300 random sequences by specifying the `initial`, `transition`, and `emission` matrices and then optimise the parameters of an `HMM` on them. We will then inspect if the learnt parameters are *close* to the true ones.


In [None]:
#@title HMM Optimiser class

class HMMOptimiser(object):
    """Optimiser for the parameters of HMM.

    This class implements the algorithms to optimise the parameters of HMM 
    model in discrete case, with either supervised or unsupervised data. 

    Attributes:
        model: 
            an HMM model 
        supervised: 
            a boolean variable that indicates the HMM model is trained in
            supervised or unsupervised fashion
        num_hiddens:
            an integer which is the number of possible hidden states
        num_observations:
            an integer which is the number of possible observed states
    
    Methods:
        __init__:
            initialise the class
        fit:
            fit the parameters of an HMM model on a given data set
        baum_welch:
            fit on unsupervised data
        _initial_params:
            initialise the parameters of HMM model in the Baum-Welch algorithm
        _e_step:
            estimation step of the Baum-Welch algorithm
        _m_step:
            maximisation step of the Baum-Welch algorithm
        _stop_criterion:
            criterion for stopping the iteration in the Baum-Welch algorithm
        counts:
            fit on supervised data
        get_trained_model:
            get an HMM class instance with parameters fit on a given data set
    """
    
    def __init__(self, 
                supervised:bool=False,
                num_hiddens:int=None,
                num_observations:int=None
                ) -> None:
        super().__init__()
        assert num_hiddens is not None and num_observations is not None
        self.model = None
        self.supervised = supervised
        self.num_hiddens = num_hiddens
        self.num_observations = num_observations
        
    def fit(self, data_loader:List) -> None:
        """Fit the parameters on the given data loader.
        
        This method will fit the three parameters of HMM model on the given 
        sequences, i.e. *initial* (distribution), *transition* probability 
        matrix (between hidden states), and *emission* probability matrix (from
        hidden states to observed states).
        
        There are two different types of learning:
        1) supervised, where true hidden states are also provided in the data
            loader;
        2) unsupervised, where only sequences of observed states are provided in
            the data loader.
            
        Note that this method doesn't return the optimised model, instead users
        need to get it through the method "get_trained_model".
            
        Args:
            data_loader:
                a list whose element should be 
                    1) a tuple containing two lists of integers, if learning is
                        supervised;
                    2) a list of integers, if learning is unsupervised
        
        Return:
            None
        """
        
        if self.supervised:
            assert type(data_loader[0]) == tuple and len(data_loader[0]) == 2, \
                """Samples in the data list should contain observations and 
                hiddens at the same time if we want to train the model in a
                supervised way."""
                
            self.counts(data_loader)
        else:
            self.baum_welch(data_loader)
    
    def _e_step(self, data_loader:complex) -> Tuple[float, List, List]:
        """E-step of the Baum-Welch algorithm
        
        This method implements the E-step which estimates the probability 
        distribution over the hidden states given a sequence of observations, 
        i.e. the following two quantities:
            1. $p(h_i, h_{i-1} | D_j; \theta_{old})$;
            2. $p(h_i | D_j; \theta_{old})$.
        Since both of the above values have been calculated in the `marginal` 
        method of the HMM model in `hmm.py`, we can directly get the results
        from it.
        
        Args:
            data_loader:
                a list of lists whose elements are integers.
        
        Return:
            loglikelihood:
                float whose value equals to the average loglikelihood of the 
                input data.
            hk_list:
                a list of np.arrays correspondding to the value of 
                $p(h_i | D_j; \theta_{old})$
            hkk_list:
                a list of np.arrays correspondding to the value of 
                $p(h_i, h_{i-1} | D_j; \theta_{old})$

        """
        
        hk_list = []
        hkk_list = []
        log_ps = []
        
        for o_seq in data_loader:
            
            _, _, log_p, hk, hkk = self.model.marginal(o_seq) 
            hk_list.append(hk)
            hkk_list.append(hkk)
            log_ps.append(log_p)
            
        return np.mean(log_ps), hk_list, hkk_list
    
    def _m_step(self, 
                data_loader:object, 
                hk_list:List, 
                hkk_list:List
                ) -> Tuple[ArrayLike, ArrayLike, ArrayLike]:
        """ M-step of the Baum-Welch algorithm
        
        This method implements the M-step which maximises the parameters of the
        HMM model (self.model) given observations and the following estimations:
            1. $p(h_i, h_{i-1} | D_j; \theta_{old})$;
            2. $p(h_i | D_j; \theta_{old})$.
        The equations used for updating the parameters are given as following:
            1. $a_k = \frac{1}{n}\sum_{j=1}^{n}p(h_1=k|\mathcal{D}_j;\theta_{\text{old}})$;
            2. $A_{k,k'} = \frac{\sum_{j=1}^{n}\sum_{i=2}^{d}p(h_i=k,h_{i-1}=k'|\mathcal{D}_j;\theta_{\text{old}})}{\sum_k\sum_{j=1}^{n}\sum_{i=2}^{d}p(h_i=k,h_{i-1}=k'|\mathcal{D}_j;\theta_{\text{old}})}$
            3. $B_{m,k} = \frac{\sum_{j=1}^{n}\sum_{i=1}^{d}\mathbb{I}(v_i^{(j)}=m)p(h_i=k|\mathcal{D}_j;\theta_{\text{old}})}{\sum_{m}\sum_{j=1}^{n}\sum_{i=1}^{d}\mathbb{I}(v_i^{(j)}=m)p(h_i=k|\mathcal{D}_j;\theta_{\text{old}})}$
        
        Args: 
            data_loader:
                a list of lists whose elements are integers.
            hk_list:
                a list of np.arrays correspondding to the value of 
                $p(h_i | D_j; \theta_{old})$
            hkk_list:
                a list of np.arrays correspondding to the value of 
                $p(h_i, h_{i-1} | D_j; \theta_{old})$
            
        Returns:
            _initial_:
                1D array with shape (self.num_hiddens,) which specifies the 
                distribution over the initial hidden states
            _transition_:
                2D array with shape (self.num_hiddens, self.num_hiddens) where
                cell (j,k) specifies the following probability:
                    $p(h_{t-1}=j, h_{t}=k | D_j; \theta_{old})$
            _emission_:
                2D array with shape (self.num_observations, self.num_hiddens) 
                where cell (j,k) specifies the following probability:
                    $p(v_i=j | h_i=k, D_j; \theta_{old})$
        """
        
        _initial_ = np.zeros(self.num_hiddens)
        _transition_ = np.zeros([self.num_hiddens, self.num_hiddens]) 
        _emission_ = np.zeros([self.num_observations, self.num_hiddens])
        
        for j, obs in enumerate(data_loader):
            # Retrieve the distributions inferred in the E-step for the current observation obs
            hk = hk_list[j]
            hkk = hkk_list[j]
            # Handle obs of length 1 for which hkk is None
            hkk = hkk if hkk is not None else float('-inf')*np.ones([1, self.num_hiddens, self.num_hiddens])

            
            _initial_ += np.exp(hk[0])
            _transition_ += np.exp(hkk).sum(axis=0)
            for m, ob in enumerate(obs):
                _emission_[ob] += np.exp(hk[m])
                #hint: not _emission_[obs] += np.exp(hk)

        # Normalise the distributions
        _initial_ /= len(data_loader)
        _transition_ /= _transition_.sum(axis=1, keepdims=True)
        _emission_ /= _emission_.sum(axis=0, keepdims=True)
        
        return _initial_, _transition_, _emission_
        
    @staticmethod
    def _stop_criterion(step:int=0, 
                        delta_param:float=1e-3,
                        delta_logpx:float=1e-1
                        ) -> bool:
        """ Criterion for stopping the iteration in Baum-Welch algorithm

        This method keeps tracking the number of steps and change of parameters/
        loglikelihood in order to check if the stop criterion has been 
        satisfied. If so, return True such that the Baum-Welch algorithm could
        stop. Otherwise, return False such that the Baum-Welch algorithm could
        keep going.
        
        Args:
            step:
                an integer indicating the index of the last iteration
            delta_param:
                a float indicating the change of parameters during the last
                iteration
            delta_logpx: 
                a float indicating the change of log-likelihood of the data
                during the last iteration
        
        Returns:
            bool:
                True for stopping the iteration, False for keeping it going.
        """
        max_steps = 100
        min_delta_param = 1e-16
        min_delta_logpx = 1e-8
        stop_condition = (step >= max_steps 
                        or delta_param < min_delta_param 
                        or delta_logpx < min_delta_logpx)
        return stop_condition
        
    def _initial_params(self) -> Tuple[ArrayLike, ArrayLike, ArrayLike]:
        """Initialisation of the parameters
        
        This method would return the parameters for initialising a discrete 
        HMM model. 
        """
        initial = np.random.uniform(size=self.num_hiddens)
        initial /= initial.sum(axis=0)
        
        transition = np.random.uniform(size=[self.num_hiddens, 
                                             self.num_hiddens])
        transition /= transition.sum(axis=1, keepdims=True)
        
        emission = np.random.uniform(size=[self.num_observations, 
                                           self.num_hiddens])
        emission /= emission.sum(axis=0, keepdims=True)
        
        # Try initialise initial/transition to constants and randomly initialise emission, see what would happen
        
        return initial, transition, emission
    
    def baum_welch(self, data_loader:List[List[int]], verbose:bool=True):
        """Unsupervised learning method of discrete HMM model, i.e. Baum-Welch algorithm.
        
        This method would fit the parameters of an HMM model in an unsupervised 
        fashion. The overall procedure has been illustrated in the lecture and
        the HMM Learning notebook.
        
        In the following implementation, the overall framework has been 
        provided. The each step has also been commented below.
        
        Args:
            data_loader:
                a list whose elements are lists of integers.
            verbose:
                a boolean variable, print loglikelihood of each step if true.
        Returns:
            loglikelihood_list:
                a list of float, stores all the loglikelihood during the fitting
                procedure.
        """
        
        # Step 1: initialise the parameters for HMM model
        initial, transition, emission = self._initial_params()
        self.model = HMM(np.log(initial), 
                         np.log(transition), 
                         np.log(emission)
                         )
        
        # Step 2: set up the following variables for repeating the e/m-steps.
        stop = False                    # flag for stopping the loop
        step = 0                        # track the number of steps
        delta_param = math.inf          # track the change of parameters
        delta_loglikelihood = math.inf  # track the change of log-likelihood
        last_loglikelihood = 0.
        loglikelihood_list = []
        
        # Step 3: repeat the e/m-steps
        while not stop:
            # step 3.1: e-step
            loglikelihood, hk_list, hkk_list = self._e_step(data_loader)
            # step 3.2: m-step
            _initial_, _transition_, _emission_ = \
                self._m_step(data_loader, hk_list, hkk_list)
            
            # step 3.3: track step and change of parameters/log-likelihoods
            step += 1
            delta_param = self.model.get_delta_param(
                                        np.log(_initial_),
                                        np.log(_transition_),
                                        np.log(_emission_)
                                    )
            delta_loglikelihood = abs(loglikelihood - last_loglikelihood)
            last_loglikelihood = loglikelihood
            
            # step 3.4: update the parameters of HMM model
            self.model.initial = np.log(_initial_)
            self.model.transition = np.log(_transition_)
            self.model.emission = np.log(_emission_)
            
            #step 3.5: check if we're going to end the loop now
            stop = self._stop_criterion(step, delta_param, delta_loglikelihood)
            
            # monitor the learning procedure
            loglikelihood_list.append(loglikelihood)
            if verbose:
                print('step:', step, '\tloglikelihood:', loglikelihood)

        self._trained_ = True
        return loglikelihood_list
    
    def counts(self, data_loader:List) -> None:
        """Supervised learning method of HMM model, i.e. counting.
        
        This method would fit the parameters of an HMM model in a supervised 
        fashion where the optimisation problem is reduced to counting. 
        
        Args:
            data_loader:
                a list whose element should be a tuple containing two lists of integers.

        Returns:
            None
        """
        _initial_ = np.zeros(self.num_hiddens)
        _transition_ = np.zeros([self.num_hiddens, self.num_hiddens]) 
        _emission_ = np.zeros([self.num_observations, self.num_hiddens])
        
        for pair in data_loader:
            observations = pair[0]
            hiddens = pair[1]
            
            _initial_[hiddens[0]] += 1
            _emission_[observations[0]][hiddens[0]] += 1
            
            for i in range(1, len(hiddens)):
                _transition_[hiddens[i]][hiddens[i-1]] += 1
                _emission_[observations[i]][hiddens[i]] += 1
                
        _initial_ / len(data_loader)
        _transition_ = _transition_ / _transition_.sum(axis=0, keepdims=True)
        _emission_ = _emission_ / _emission_.sum(axis=0, keepdims=True)
        
        self.model = HMM(np.log(_initial_), 
                         np.log(_transition_), 
                         np.log(_emission_)
                         )
    
    def get_trained_model(self) -> HMM:
        """Return the HMM model with fitted parameters
        
        This method will first check if the model has been trained. If so, it 
        will return the model as an object of class HMM from hmm.py.
        
        Args:
            None
            
        Returns:
            self.model:
                an instance of HMM class from hmm.py
        """
        
        assert self.model is not None and self._trained_, \
            "The model has not been trained yet!"
        return self.model

In [None]:
np.random.seed(1234)


In [None]:
# Step 1: specify the parameters for synthesising data
# there are 2 hidden states, and 3 possible observed states where observation
# `0` means the end of sequence (<EOS>).
initial = np.array([0.2, 0.8])
transition = np.array([[0.2, 0.8],
                       [0.6, 0.4]])
emission = np.array([[0.0, 0.1], # probability of emitting <EOS>
                     [0.3, 0.8],
                     [0.7, 0.1]])

# Step 2: synthesise sequences
dataloader = DataLoader(initial=initial,
                        transition=transition,
                        emission=emission)

data_list = dataloader.get_data_list(300)

# Step 3: fit an HMM by using HMMOptimiser class
optim = HMMOptimiser(num_hiddens=2, num_observations=3)
_ = optim.baum_welch(data_list)
hmm = optim.model

# Step 4: print the parameters fit on the synthetic data
# Note that parameters of our HMM class are in the log space
print('true initial:\n', initial)
print('fitted initial:\n', np.exp(hmm.initial))
print('true transition:\n', transition)
print('fitted transition:\n', np.exp(hmm.transition))
print('true emission:\n', emission)
print('fitted emission:\n', np.exp(hmm.emission))

step: 1 	loglikelihood: -14.981079461366814
step: 2 	loglikelihood: -14.053867634844227
step: 3 	loglikelihood: -14.053738674535843
step: 4 	loglikelihood: -14.053584575719285
step: 5 	loglikelihood: -14.053394346307233
step: 6 	loglikelihood: -14.053153174451314
step: 7 	loglikelihood: -14.052841094665018
step: 8 	loglikelihood: -14.052431227574118
step: 9 	loglikelihood: -14.051887486901876
step: 10 	loglikelihood: -14.051161646131646
step: 11 	loglikelihood: -14.050189682690764
step: 12 	loglikelihood: -14.048887396145068
step: 13 	loglikelihood: -14.047145472549225
step: 14 	loglikelihood: -14.04482450498228
step: 15 	loglikelihood: -14.041751063359504
step: 16 	loglikelihood: -14.037716807428477
step: 17 	loglikelihood: -14.032483834924248
step: 18 	loglikelihood: -14.025800662989331
step: 19 	loglikelihood: -14.01743362695463
step: 20 	loglikelihood: -14.007216474586672
step: 21 	loglikelihood: -13.995114591870875
step: 22 	loglikelihood: -13.98128904952611
step: 23 	loglikelihoo

Note that the HMM model is non-identifiable in general. Specifically, it means that you could permute the hidden state values $h$ and the corresponding matrices $\mathbf{a}, \mathbf{A}$, and $\mathbf{B}$ and obtain the same log-likelihood. As we see the hidden states in the learnt model above were permuted, i.e. the order of the values in the fitted parameters $\hat{\mathbf{a}}$ corresponds to a reversed order compared to the true parameter $\mathbf{a}$, and an equivalent permutation can also be seen in the fitted $\hat{\mathbf{A}}$ and $\hat{\mathbf{B}}$ matrices compared to the true ones $\mathbf{A}$ and $\mathbf{B}$.

### Tips on the sanity check of your implementation

Since the Baum-Welch algorithm can only guarantee that the model converges to a **local optima**, the learnt parameters might not be exactly the same to the true parameters used to synthesise data.
But, if the implementation is correct, we should be able to observe all of the following phenomena during learning:
 1. *Non-decreasing likelihood of data*: the (log-)likelihood of the sequences should never decrease during the iteration of E- and M-steps;
 2. *Variance of the learnt parameter distance to the true parameters decreases with more data*: the more observed sequences we have for learning, the smaller the variance of the distance between the learnt parameter values and the true values.
 3. *Parameters do not diverge if true parameters are used in initialisation*: if we use the true parameters to initialise the parameters of the `HMM`, the parameters should stay around the initial values during learning. Note that the true parameters might not be the optimal values for a particular dataset sample due to the finite sample size.

In the following cell we provide a function to verify the first two cases in the above list.

In [None]:
def get_true_params() -> Tuple[ArrayLike]:
    initial = np.array([0.2, 0.8])
    transition = np.array([[0.2, 0.8],
                           [0.6, 0.4]])
    emission = np.array([[0.0, 0.1], # probability of emitting <EOS>
                         [0.3, 0.8],
                         [0.7, 0.1]])
    return initial, transition, emission

def get_dist_to_true_params(params:Tuple[ArrayLike], 
                            true_params:Tuple[ArrayLike]) -> float: 
    # Compute the Euclidean distance (L2 norm) between the learnt parameters and the true parameters
    
    # To handle the non-identifiability of the HMM, for the 2D hiddens case
    # we simply consider the two possible orderings and keep whichever is smaller
    true_params_stacked = np.vstack([true_params[0],
                                     true_params[1],
                                     true_params[2]]).flatten()
    true_params_permuted = np.vstack([np.flip(true_params[0]),
                                      np.flip(true_params[1]),
                                      true_params[2][:, ::-1]]).flatten()
    
    params_vec = np.vstack([params[0], params[1], params[2]]).flatten()
    
    return min(np.linalg.norm(params_vec - true_params_stacked, ord=2),
               np.linalg.norm(params_vec - true_params_permuted, ord=2))

def sanity_check() -> List:
    num_sample_list = [50, 500]
    num_runs = 50

    true_params = get_true_params()
    dataloader = DataLoader(initial=true_params[0],
                            transition=true_params[1],
                            emission=true_params[2])
    data_list = dataloader.get_data_list(max(num_sample_list))

    result_list = []
    for num_samples in num_sample_list:
        # Only use the first num_samples data-points from the data_list
        dl = data_list[:num_samples]
        for i in range(num_runs):
            print(f'Starting run {i}/{num_runs} with data size {num_samples}.')
            optim = HMMOptimiser(num_hiddens=2,
                                 num_observations=3)
            loglikelihood_list = optim.baum_welch(dl, verbose=False)
            nondec_loglike = all(x<=y for x, y in zip(loglikelihood_list[:-1], loglikelihood_list[1:]))
            params = (np.exp(optim.model.initial),
                      np.exp(optim.model.transition),
                      np.exp(optim.model.emission))
            dist = get_dist_to_true_params(params, true_params)

            result = {
                'num_samples': num_samples,
                'distance': dist,
                'non_decreasing': 1 if nondec_loglike else -1
            }
            result_list.append(result)

    return result_list

Now, we can run the sanity check. Note that it may take quite a while to run the following sanity check as it repeats the EM fitting algorithm 50 times for sample sizes of 50 and 500, hence running a total of 100 runs.

In [None]:
result_list = sanity_check()

Starting run 0/50 with data size 50.
Starting run 1/50 with data size 50.
Starting run 2/50 with data size 50.
Starting run 3/50 with data size 50.
Starting run 4/50 with data size 50.
Starting run 5/50 with data size 50.
Starting run 6/50 with data size 50.
Starting run 7/50 with data size 50.
Starting run 8/50 with data size 50.
Starting run 9/50 with data size 50.
Starting run 10/50 with data size 50.
Starting run 11/50 with data size 50.
Starting run 12/50 with data size 50.
Starting run 13/50 with data size 50.
Starting run 14/50 with data size 50.
Starting run 15/50 with data size 50.
Starting run 16/50 with data size 50.
Starting run 17/50 with data size 50.
Starting run 18/50 with data size 50.
Starting run 19/50 with data size 50.
Starting run 20/50 with data size 50.
Starting run 21/50 with data size 50.
Starting run 22/50 with data size 50.
Starting run 23/50 with data size 50.
Starting run 24/50 with data size 50.
Starting run 25/50 with data size 50.
Starting run 26/50 wit

KeyboardInterrupt: ignored

In [None]:

distances = {r['num_samples']: [] for r in result_list}
for r in result_list:
    distances[r['num_samples']].append(r['distance'])
    
fig, ax = plt.subplots()
    
for i, (num_samples, dist) in enumerate(distances.items()):
    mean_dist = np.mean(dist)
    std_err = np.std(dist, ddof=1)/(len(dist)**(0.5))
    ax.errorbar(num_samples, mean_dist, yerr=std_err, marker='_', capsize=50, markersize=30)
    print(f'Num samples: {num_samples}, mean {mean_dist}, std. err. {std_err}')
    
ax.set_xlim(-200, 750)
ax.set_xticks(ticks=list(distances.keys()), labels=list(distances.keys()))
ax.set_ylabel('distance')
ax.set_xlabel('# samples')