<a href="https://colab.research.google.com/github/svandergoote/LGBIO2060-2021/blob/TP4/LGBIO2060_TP4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# LGBIO2060 Exercice session 4

#Hidden markov model

__Authors:__ Simon Vandergooten, Clémence Vandamme

__Content inspired from__: Neuromatch Academy github.com/NeuromatchAcademy


##Introduction and context
In this tutorial we will make the previous models a little more complex. Until now, we have used binary or continuous variables whose state was fixed in time. It is easy to see that this greatly limits our ability to represent real systems. One way to get closer to reality would be to allow our hidden states to change their values. 

Let's take the example of the fish. Previously they were either on the right or on the left. But now, we will add to the model the fact that fishes can switch side. 

It is exactly what **Hidden markov models** permits. The Markov property specifies that you can fully encapsulate the important properties of a system based on its current state at the current time, any previous history does not matter. It is memoryless. Back to the fishes, it means that the school of fish is only at one position at a time and the probability of them being on the left or on the right at time *t* depends only on the state at time *t-1* and the probabilities of transition from one state to another. It can be represented with a matrix called the **transition matrix**.

<img alt='Solution hint' align='left' width=650 height=300 src=https://raw.githubusercontent.com/svandergoote/LGBIO2060-2021/master/Solutions/HMM.png>


We can use linear algebra to express the probabilities of the current state.

$$P_i = [P(state_i = right), P(state_i = left) ] $$

To compute the vector of probabilities of the state at the time i+1, we can use linear algebra and multiply our vector of the probabilities of the current state with the transition matrix.

$$P_{i+1} = P_{i} T$$
where $T$ is our transition matrix.

This is the same formula for every step, which allows us to get the probabilities for a time more than 1 step in advance easily. If we started at $i=0$ and wanted to look at the probabilities at step $i=2$, we could do:

\begin{align*}
P_{1} &= P_{0}T\\
P_{2} &= P_{1}T = P_{0}TT = P_{0}T^2\\
\end{align*}

So, every time we take a further step we can just multiply with the transition matrix again. So, the probability vector of states at j timepoints after the current state at timepoint i is equal to the probability vector at timepoint i times the transition matrix raised to the jth power.
$$P_{i + j} = P_{i}T^j $$

By the end of this tutorial, you should be able to:
- Describe how the hidden states in a Hidden Markov model evolve over time, both in words, mathematically, and in code
- Estimate hidden states from data using forward inference in a Hidden Markov model
- Describe how measurement noise and state transition probabilities affect uncertainty in predictions in the future and the ability to estimate hidden states.

# Section 1: Binary HMM with Gaussian measurements
We will represent the hidden state *s* of the fish with the values +1 and -1 for the right and left position respectively. 

The probability of switching to state $s_t=j$ from the previous state $s_{t-1}=i$ is the conditional probability distribution $p(s_t = j| s_{t-1} = i)$.

In our case, we can summarize those transition probabilities into the **Transition matrix T**.

\begin{align*}
T = \begin{bmatrix}p(s_t = +1 | s_{t-1} = +1) & p(s_t = -1 | s_{t-1} = +1)\\p(s_t = +1 | s_{t-1} = -1)& p(s_t = -1 | s_{t-1} = -1)\end{bmatrix}
\end{align*}

###Measurements
In a Hidden Markov model, we cannot directly observe the latent states $s_t$. Instead we get noisy measurements $m_t \sim p(m|s_t)$

In [None]:
#@title Imports

import numpy as np
import time
from scipy import stats
from scipy.optimize import linear_sum_assignment
from collections import namedtuple

import matplotlib.pyplot as plt
from matplotlib import patches

In [None]:
#@title Figure Settings
# import ipywidgets as widgets       # interactive display
from IPython.html import widgets
from ipywidgets import interactive, interact, HBox, Layout,VBox
from IPython.display import HTML
%config InlineBackend.figure_format = 'retina'
plt.style.use("https://raw.githubusercontent.com/NeuromatchAcademy/course-content/NMA2020/nma.mplstyle")

In [None]:
# @title Plotting Functions

def plot_hmm1(model, states, measurements, flag_m=True):
  """Plots HMM states and measurements for 1d states and measurements.

  Args:
    model (hmmlearn model):               hmmlearn model used to get state means.
    states (numpy array of floats):       Samples of the states.
    measurements (numpy array of floats): Samples of the states.
  """
  T = states.shape[0]
  nsteps = states.size
  aspect_ratio = 2
  fig, ax1 = plt.subplots(figsize=(8,4))
  states_forplot = list(map(lambda s: model.means[s], states))
  ax1.step(np.arange(nstep), states_forplot, "-", where="mid", alpha=1.0, c="green")
  ax1.set_xlabel("Time")
  ax1.set_ylabel("Latent State", c="green")
  ax1.set_yticks([-1, 1])
  ax1.set_yticklabels(["-1", "+1"])
  ax1.set_xticks(np.arange(0,T,10))
  ymin = min(measurements)
  ymax = max(measurements)

  ax2 = ax1.twinx()
  ax2.set_ylabel("Measurements", c="crimson")

  # show measurement gaussian
  if flag_m:
    ax2.plot([T,T],ax2.get_ylim(), color="maroon", alpha=0.6)
    for i in range(model.n_components):
      mu = model.means[i]
      scale = np.sqrt(model.vars[i])
      rv = stats.norm(mu, scale)
      num_points = 50
      domain = np.linspace(mu-3*scale, mu+3*scale, num_points)

      left = np.repeat(float(T), num_points)
      # left = np.repeat(0.0, num_points)
      offset = rv.pdf(domain)
      offset *= T / 15
      lbl = "measurement" if i == 0 else ""
      # ax2.fill_betweenx(domain, left, left-offset, alpha=0.3, lw=2, color="maroon", label=lbl)
      ax2.fill_betweenx(domain, left+offset, left, alpha=0.3, lw=2, color="maroon", label=lbl)
      ax2.scatter(np.arange(nstep), measurements, c="crimson", s=4)
      ax2.legend(loc="upper left")
    ax1.set_ylim(ax2.get_ylim())
  plt.show(fig)


def plot_marginal_seq(predictive_probs, switch_prob):
  """Plots the sequence of marginal predictive distributions.

    Args:
      predictive_probs (list of numpy vectors): sequence of predictive probability vectors
      switch_prob (float):                      Probability of switching states.
  """
  T = len(predictive_probs)
  prob_neg = [p_vec[0] for p_vec in predictive_probs]
  prob_pos = [p_vec[1] for p_vec in predictive_probs]
  fig, ax = plt.subplots()
  ax.plot(np.arange(T), prob_neg, color="blue")
  ax.plot(np.arange(T), prob_pos, color="orange")
  ax.legend([
    "prob in state -1", "prob in state 1"
  ])
  ax.text(T/2, 0.05, "switching probability={}".format(switch_prob), fontsize=12,
          bbox=dict(boxstyle="round", facecolor="wheat", alpha=0.6))
  ax.set_xlabel("Time")
  ax.set_ylabel("Probability")
  ax.set_title("Forgetting curve in a changing world")
  #ax.set_aspect(aspect_ratio)
  plt.show(fig)

def plot_evidence_vs_noevidence(posterior_matrix, predictive_probs):
  """Plots the average posterior probabilities with evidence v.s. no evidence

  Args:
    posterior_matrix: (2d numpy array of floats): The posterior probabilities in state 1 from evidence (samples, time)
    predictive_probs (numpy array of floats):  Predictive probabilities in state 1 without evidence
  """
  nsample, T = posterior_matrix.shape
  posterior_mean = posterior_matrix.mean(axis=0)
  fig, ax = plt.subplots(1)
  # ax.plot([0.0, T],[0.5, 0.5], color="red", linestyle="dashed")
  ax.plot([0.0, T],[0., 0.], color="red", linestyle="dashed")
  ax.plot(np.arange(T), predictive_probs, c="orange", linewidth=2, label="No evidence")
  ax.scatter(np.tile(np.arange(T), (nsample, 1)), posterior_matrix, s=0.8, c="green", alpha=0.3, label="With evidence(Sample)")
  ax.plot(np.arange(T), posterior_mean, c='green', linewidth=2, label="With evidence(Average)")
  ax.legend()
  ax.set_yticks([0.0, 0.25, 0.5, 0.75, 1.0])
  ax.set_xlabel("Time")
  ax.set_ylabel("Probability in State +1")
  ax.set_title("Gain confidence with evidence")
  plt.show(fig)


def plot_forward_inference(model, states, measurements, states_inferred,
                           predictive_probs, likelihoods, posterior_probs,
                           t=None,
                           flag_m=True, flag_d=True, flag_pre=True, flag_like=True, flag_post=True,
                           ):
  """Plot ground truth state sequence with noisy measurements, and ground truth states v.s. inferred ones

      Args:
          model (instance of hmmlearn.GaussianHMM): an instance of HMM
          states (numpy vector): vector of 0 or 1(int or Bool), the sequences of true latent states
          measurements (numpy vector of numpy vector): the un-flattened Gaussian measurements at each time point, element has size (1,)
          states_inferred (numpy vector): vector of 0 or 1(int or Bool), the sequences of inferred latent states
  """
  T = states.shape[0]
  if t is None:
    t = T-1
  nsteps = states.size
  fig, ax1 = plt.subplots(figsize=(11,6))
  # inferred states
  #ax1.step(np.arange(nstep)[:t+1], states_forplot[:t+1], "-", where="mid", alpha=1.0, c="orange", label="inferred")
  # true states
  states_forplot = list(map(lambda s: model.means[s], states))
  ax1.step(np.arange(nstep)[:t+1], states_forplot[:t+1], "-", where="mid", alpha=1.0, c="green", label="true")
  ax1.step(np.arange(nstep)[t+1:], states_forplot[t+1:], "-", where="mid", alpha=0.3, c="green", label="")
  # Posterior curve
  delta = model.means[1] - model.means[0]
  states_interpolation = model.means[0] + delta * posterior_probs[:,1]
  if flag_post:
    ax1.step(np.arange(nstep)[:t+1], states_interpolation[:t+1], "-", where="mid", c="grey", label="posterior")

  ax1.set_xlabel("Time")
  ax1.set_ylabel("Latent State", c="green")
  ax1.set_yticks([-1, 1])
  ax1.set_yticklabels(["-1", "+1"])
  ax1.legend(bbox_to_anchor=(0,1.02,0.2,0.1), borderaxespad=0, ncol=2)



  ax2 = ax1.twinx()
  ax2.set_ylim(
      min(-1.2, np.min(measurements)),
      max(1.2, np.max(measurements))
      )
  if flag_d:
    ax2.scatter(np.arange(nstep)[:t+1], measurements[:t+1], c="crimson", s=4, label="measurement")
    ax2.set_ylabel("Measurements", c="crimson")

  # show measurement distributions
  if flag_m:
    for i in range(model.n_components):
      mu = model.means[i]
      scale = np.sqrt(model.vars[i])
      rv = stats.norm(mu, scale)
      num_points = 50
      domain = np.linspace(mu-3*scale, mu+3*scale, num_points)

      left = np.repeat(float(T), num_points)
      offset = rv.pdf(domain)
      offset *= T /15
      # lbl = "measurement" if i == 0 else ""
      lbl = ""
      # ax2.fill_betweenx(domain, left, left-offset, alpha=0.3, lw=2, color="maroon", label=lbl)
      ax2.fill_betweenx(domain, left+offset, left, alpha=0.3, lw=2, color="maroon", label=lbl)
  ymin, ymax = ax2.get_ylim()
  width = 0.1 * (ymax-ymin) / 2.0
  centers = [-1.0, 1.0]
  bar_scale = 15

  # Predictions
  data = predictive_probs
  if flag_pre:
    for i in range(model.n_components):
      domain = np.array([centers[i]-1.5*width, centers[i]-0.5*width])
      left = np.array([t,t])
      offset = np.array([data[t,i]]*2)
      offset *= bar_scale
      lbl = "todays prior" if i == 0 else ""
      ax2.fill_betweenx(domain, left+offset, left, alpha=0.3, lw=2, color="dodgerblue", label=lbl)

  # Likelihoods
  # data = np.stack([likelihoods, 1.0-likelihoods],axis=-1)
  data = likelihoods
  data /= np.sum(data,axis=-1, keepdims=True)
  if flag_like:
    for i in range(model.n_components):
      domain = np.array([centers[i]+0.5*width, centers[i]+1.5*width])
      left = np.array([t,t])
      offset = np.array([data[t,i]]*2)
      offset *= bar_scale
      lbl = "likelihood" if i == 0 else ""
      ax2.fill_betweenx(domain, left+offset, left, alpha=0.3, lw=2, color="crimson", label=lbl)
  # Posteriors
  data = posterior_probs
  if flag_post:
    for i in range(model.n_components):
      domain = np.array([centers[i]-0.5*width, centers[i]+0.5*width])
      left = np.array([t,t])
      offset = np.array([data[t,i]]*2)
      offset *= bar_scale
      lbl = "posterior" if i == 0 else ""
      ax2.fill_betweenx(domain, left+offset, left, alpha=0.3, lw=2, color="grey", label=lbl)
  if t<T-1:
    ax2.plot([t,t],ax2.get_ylim(), color='black',alpha=0.6)
  if flag_pre or flag_like or flag_post:
    ax2.plot([t,t],ax2.get_ylim(), color='black',alpha=0.6)

    ax2.legend(bbox_to_anchor=(0.4,1.02,0.6, 0.1), borderaxespad=0, ncol=4)
  ax1.set_ylim(ax2.get_ylim())
  return fig
  # plt.show(fig)

##Coding exercice 1.1 : Hidden states
You will first implement the function `generate_state`. This function will create the vector of states *S* based on the model parameters.

In [None]:
def generate_state(switch_proba, start_proba, n_states):
  '''
  Create an HMM binary state variable.
  Args:
    switch_proba (array): the probabilities to switch from states. [rightToLeft, LeftToRight]
    start_proba (array): the initial probabilities of being on each side. [p_right, p_left]
    n_states (int): the number of time steps

  Returns:
    S (array): the vector of state for each time step.
  '''
  ##########################
  ##### Your code here #####
  ##########################
  #Initialize S
  S = ...

  #Step 1: Initial state (Hint: np.random.choice)
  S[0] = ...

  #Step 2: Transition matrix
  T = ...
  
  #Step 3: Iterate on each time step to find the new state s[t] based on S[t-1]



  return S

#Set random seed
np.random.seed(54)

#Set parameters of HMM
switch_proba = np.array([0.4, 0.7])
start_proba = np.array([0, 1]) #The initial state is right (+1)
n_states = 50

#Generate the hidden states vector
S = generate_state(switch_proba, start_proba, n_states)

#Print values
print(S[:5])

You should see that the first five states are:
 
 `[-1.  1.  1.  1.  1.]`

##Coding exercice 1.2 : Noisy measurements
Now that you have created the hidden states, you will create the noisy Gaussian measurements vector *M* from it.

Recall that in reality we don't have access to the hidden states but only to noisy measurements that give us information about those hidden states we want to infer.

You will implement the function `sample` that generates samples $m_t$ based on the hidden states $s_t$. 
- if hidden state $s_t = +1 $ : $m_t \sim N(+1,\sigma)$
- if hidden state $s_t = -1 $ : $m_t \sim N(-1,\sigma)$



In [None]:
def sample(means, var, S):
  '''
  Create a Gaussian measurement from HMM states

    Args: 
      means (array): Mean mesurement for each state [right, left].
      var (float): Variance of measurement models. Same for each state.
      S (array): The series of hidden states.
    
    Returns:
      M (array): The series of measurements.
  '''
  ##########################
  ##### Your code here #####
  ##########################
  
  #Initialize measurements vector M
  M = ...

  #Calculate measurements conditioned on the latent states (Hint: np.random.normal)
  

  return M

#Set parameters of Gaussian 
means = np.array([1, -1])
variance = 4

#Generate the measurements vector M
M = sample(means, variance, S)

#Print values
print(M[:5])

You should see that the first five measurements are:
 
 `[0.62973701 1.46418237 2.15507524 1.17004125 0.40396364]`

# Interactive Demo 1.2: Binary HMM

In the demo below, we simulate and plot a similar HMM. You can change the probability of switching states and the noise level (the standard deviation of the Gaussian distributions for measurements). You can click the empty box to also visualize the measurements.

**First**, think about and discuss these questions:

1.   What will the states do if the switching probability from Left to Right is zero? One?
2.   What will measurements look like with high noise? Low?



**Then**, play with the demo to see if you were correct or not.

In [None]:
#@title

#@markdown Execute this cell to enable the widget!
GaussianHMM1D = namedtuple('GaussianHMM1D', ['startprob', 'transmat','means','vars','n_components'])
def create_HMM(switch_prob_RtL=0.1, switch_prob_LtR=0.1, noise_level=1e-1, startprob=[1.0, 0.0]):
  """Create an HMM with binary state variable and 1D Gaussian measurements
  The probability to switch to the other state is `switch_prob`. Two
  measurement models have mean 1.0 and -1.0 respectively. `noise_level`
  specifies the standard deviation of the measurement models.

  Args:
      switch_prob (float): probability to jump to the other state
      noise_level (float): standard deviation of measurement models. Same for
      two components

  Returns:
      model (GaussianHMM instance): the described HMM
  """

  ############################################################################
  # Insert your code here to:
  #   * Create the transition matrix, `transmat_mat` so that the odds of
  #      switching is `switch_prob`
  #		* Set the measurement model variances, to `noise_level ^ 2` for both
  #      states
  #raise NotImplementedError("`create_HMM` is incomplete")
  ############################################################################

  n_components = 2

  startprob_vec = np.asarray(startprob)

  # STEP 1: Transition probabilities
  transmat_mat = np.array([[1. - switch_prob_RtL, switch_prob_RtL], [switch_prob_LtR, 1. - switch_prob_LtR]])

  # STEP 2: Measurement probabilities

  # Mean measurements for each state
  means_vec = np.array([-1., 1.])

  # Noise for each state
  vars_vec = np.ones(2) * noise_level * noise_level

  # Initialize model
  model = GaussianHMM1D(
    startprob = startprob_vec,
    transmat = transmat_mat,
    means = means_vec,
    vars = vars_vec,
    n_components = n_components
  )

  return model

def sample(model, T):
  """Generate samples from the given HMM

  Args:
    model (GaussianHMM1D): the HMM with Gaussian measurement
    T (int): number of time steps to sample

  Returns:
    M (numpy vector): the series of measurements
    S (numpy vector): the series of latent states

  """
  ############################################################################
  # Insert your code here to:
  #   * take row i from `model.transmat` to get the transition probabilities
  #       from state i to all states
  #raise NotImplementedError("`sample` is incomplete")
  ############################################################################
  # Initialize S and M
  S = np.zeros((T,),dtype=int)
  M = np.zeros((T,))

  # Calculate initial state
  S[0] = np.random.choice([0,1],p=model.startprob)

  # Latent state at time `t` depends on `t-1` and the corresponding transition probabilities to other states
  for t in range(1,T):#permet de commencer a 1

    # STEP 3: Get vector of probabilities for all possible `S[t]` given a particular `S[t-1]`
    transition_vector = model.transmat[S[t-1],:]

    # Calculate latent state at time `t`
    S[t] = np.random.choice([0,1],p=transition_vector)

  # Calculate measurements conditioned on the latent states
  # Since measurements are independent of each other given the latent states, we could calculate them as a batch
  means = model.means[S]
  scales = np.sqrt(model.vars[S])
  M = np.random.normal(loc=means, scale=scales, size=(T,))

  return M, S

nstep = 100

@widgets.interact
def plot_samples_widget(
    switch_RtoL=widgets.FloatSlider(min=0.0, max=1.0, step=0.02, value=0.1),
    switch_LtoR=widgets.FloatSlider(min=0.0, max=1.0, step=0.02, value=0.1),
    log10_noise_level=widgets.FloatSlider(min=-1., max=1., step=.01, value=-0.3),
    flag_m=widgets.Checkbox(value=False, description='measurements', disabled=False, indent=False)
    ):
  np.random.seed(54)
  model = create_HMM(switch_RtoL,switch_LtoR,
                     noise_level=10.**log10_noise_level)
  print(model)
  observations, states = sample(model, nstep)
  
  plot_hmm1(model, states, observations, flag_m=flag_m)