# Hidden Markov Models

Este notebook es un resumen de las nociones teóricas de un modelo oculto de Markov y sus principales algoritmos.

*Por:*
- Benjamín Vera
- Sebastián Toloza

In [None]:
import numpy as np
from matplotlib import pyplot as plt

## Notaciones

Para un modelo de Markov simple (observable) homogeneo, tenemos las siguientes notaciones:
- $t \in \mathbb{N}$: Tiempo discreto.
- $E$: Conjunto de estados posibles. En este documento $E = \{0, \dots, N-1\}$ finito.
- $X_t \in E$: Estado del sistema en el tiempo $t$.
- $\lambda_x = \mathbb{P}(X_0 = x)$: Probabilidad del estado inicial.
- $\lambda = (\lambda_x)_{x \in E}$: Distribución inicial de la cadena.
- $a_{x, y} = \mathbb{P}(X_1 = y | X_0 = x) \geq 0$: Probabilidad de transición de $x \to y$. Se satisface
$$
\forall x \in E: \sum_{y \in E} a_{x, y} = 1
$$
- $A = (a_{x,y})_{x,y \in E} \in \mathbb{R}^{N \times N}$: Matriz estocástica de transición.

Ahora, para un modelo de Markov oculto homogeneo, pensamos en los $X_t$ como desconocidos y tenemos además los siguientes elementos:
- $O$: Conjunto de observaciones posibles. En este documento $O = \{0, \dots, M-1\}$ finito.
- $V_t \in O$: Observación del sistema en el tiempo $t$.
- $b_{x, z} = \mathbb{P}(V_t = z | X_t = x) \geq 0$: Probabilidad de ver la observación $z \in O$ dado que el estado oculto es $x \in E$. Se satisface
$$
\forall x \in E: \sum_{z \in O} b_{x, z} = 1
$$
- $B = (b_{x, z})_{x \in E, z \in O} \in \mathbb{R}^{N \times M}$: Matriz estocástica de observación.

Observar que la existencia de la matriz $B$ implica que las probabilidades de observación son homogeneas en el tiempo.

Antes de implementar las clases correspondientes, escribimos una función auxiliar que, recibiendo un vector de probabilidad $v$, modela una variable aleatoria que tiene la ley dada por el vector $v$:

In [1]:
def randomSample(v):
    """ Recibe un v (numpy array unidimensional) que actúa como densidad de probabilidad.
    Retorna un índice aleatorio de v con la ley dada por sus valores"""
    assert len(v.shape) == 1, f"El vector dado no es unidimensional, tiene forma {v.size}"
    assert np.isclose(np.sum(v), 1), f"El vector dado no está normalizado, sus coordenadas suman {np.sum(v)}"
    u = np.random.random()
    j = 0
    s = 0
    s += v[0]
    while s <= u:
        j += 1
        s += v[j]
    return j

## Problemas de inferencia en CM simples

Teniendo un modelo de Markov simple $X \sim CM(\lambda, P)$, aparte de ser capaces de generar secuencias arbitrarias de observaciones, dos preguntas de inferencia que nos interesaría responder son:
- Dada una secuencia $(x_0, \dots, x_T)$ ¿Qué tan probable es que el modelo la haya producido?
- Dada una secuencia $(x_0, \dots, x_t)$, encontrar la matriz de transición $P$ que mejor se ajusta a este comportamiento observado.

A continuación, implementamos una clase con algoritmos simples que responden a estas preguntas.

In [1]:
class OMM:
    """Observable markov model: Clase que encapsula el comportamiento de una cadena de Markov
    """
    def __init__(self, N):
        """ Recibe un entero que será su dimensionalidad y crea un modelo
        de Markov observable."""
        self.N = N  # Cantidad de estados

        # Inicializamos los parámetros como uniformes
        self.lam = (1/N)*np.ones(N)
        self.A = (1/N)*np.ones((N, N))

    def simulate(self, steps):
        """ Recibe un número de pasos y simula esa cantidad de transiciones de la cadena.
        Devuelve un vector de tamaño steps + 1"""
        C = np.zeros(steps + 1)  # Inicializamos la cadena en cero
        x = randomSample(self.lam)  # Elegir un estado de la distribución inicial
        C[0] = x
        for j in range(steps):
            x = randomSample(self.A[x])  # Elegir un estado según la ley de la fila x de A
            C[j+1] = x  # Añadirlo a la cadena
        return C  # Devolver la cadena

    def seqProb(self, seq):
        """ Recibe una secuencia de estados observados de la cadena.
        Retorna la probabilidad de haber observado esa secuencia de estados"""
        assert ((0 <= seq) & (seq < self.N)).all(), "No todos los estados dados en la secuencia son estados válidos de la cadena"
        p = self.lam[seq[0]]
        for i in range(1, len(seq)):
            p *= self.A[seq[i-1], seq[i]]
        return p
    
    def train(self, seq):
        """ Recibe una secuencia de estados observados de la cadena.
        Actualiza self.A para que sea la matriz de transición que mejor explica el comportamiento observado."""
        assert ((0 <= seq) & (seq < self.N)).all(), "No todos los estados dados en la secuencia son estados válidos de la cadena"
        freq = np.zeros((self.N, self.N))
        # Recorro toda la secuencia contando las transiciones
        for i in range(len(seq)-1):
            freq[seq[i], seq[i+1]] += 1
        
        # Ahora normalizamos cada fila
        for i in range(self.N):
            S = np.sum(freq[i])
            if S != 0:
                freq[i] /= S
        self.A = freq

## Problemas de inferencia en HMM

Por otro lado, teniendo un modelo de Markov oculto $X \sim HM(\lambda, A, B)$, aparte de ser capaces de generar secuencias arbitrarias de observaciones, las tres preguntas grandes que nos interesaría responder son:
- *Evaluación:* Dada una secuencia de observaciones $(v_0, \dots, v_{T-1})$ ¿Qué tan probable es que el modelo la haya producido?
- *Decodificación:* Dada una secuencia de observaciones $(v_0, \dots, v_{T-1})$, encontrar la cadena de estados ocultos con mayor verosimilitud para haber generado esta secuencia.
- *Aprendizaje:* Dada una secuencia de observaciones $(v_0, \dots, v_{T-1})$, encontrar las matrices de transición y observación que mejor explican el comportamiento observado.

La primera pregunta da lugar al algoritmo Forward, la segunda da lugar al algoritmo de Viterbi y la tercera al de Baum-Welch. A continuación, damos una breve explicación de estos algoritmos.

### Algoritmo Forward

En realidad consiste de dos algoritmos distintos que resuelven el problema de evaluación, pero solo vamos a implementar el forward en este documento para resolver ese problema.

El problema de evaluar la verosimilitud de una secuencia de observaciones se podría resolver a través de marginalizar sobre todas las secuencias posibles. Pero esa suma es del orden $O(N^T)$ en que $N$ son los estados posibles y $T$ es la longitud de la secuencia observada. Ya que esto es infactible, la solución consiste en utilizar una variable auxiliar definida como
$$
\alpha_{x, t} = \mathbb{P}(V_0 = v_0, \dots, V_t = v_t, X_t = x)
$$
Es decir, $\alpha_{x, t}$ es la probabilidad de haber producido la secuencia observada terminando en el estado $x \in E$. Satisface las ecuaciones
$$
\text{\textbf{Base:}}\qquad \alpha_{x, 0} = \mathbb{P}(V_0 = v_0, X_0 = x) = \lambda_x \cdot b_{x, v_0}
$$
$$
\text{\textbf{Inductivo:}}\qquad \alpha_{x, t+1} = \sum_{y \in E} \alpha_{y, t} a_{y, x} b_{x, v_{t+1}}
$$
$$
\text{\textbf{Terminal:}}\qquad \mathbb{P}(V_0 = v_0, \dots, V_{t-1} = v_{t-1}) = \sum_{x \in E} \alpha_{x, t}
$$
De modo que si implementamos $\alpha = (\alpha_{x, t})_{x \in E, 0 \leq t \leq T-1} \in \mathbb{R}^{N \times T}$, las ecuaciones base e inductiva se reducen a $\alpha_{\bullet, 0} = \lambda \odot B_{\bullet, v_0}$ y $\alpha_{x, t+1} = (\alpha_{\bullet, t}\cdot A_{\bullet, x}) b_{x, v_{t+1}}$ respectivamente. En que $\odot$ denota el producto de Hadamard entre vectores.

### Algoritmo de Viterbi

Consiste en un algoritmo que, dada unas observaciones y estados, entrega la cadena de estados con mayor verosimilitud, por lo que correspondería a la cadena óptima de estados.

Notemos que esto se podría realizar generando todas las secuencias posibles de estados y calcular su verosimilitud, cosa de rescatar la mayor, pero esto claramente no es eficiente debido a que el número de secuencias es de orden exponencial, por lo que el algoritmo se basa en la programación dinámica.

Para eso, introducimos la variable auxiliar 

$$
\nu_t(j) = \max_{x_0,\cdots,x_{t-1}} \mathbb{P}(x_0,\cdots,x_{t-1},v_0,\cdots,v_t,x_t=j)
$$

El cual corresponde a la probabilidad de que la cadena oculta esté en el estado $j$ después de ver las primeras $t$ observaciones y pasar por la secuencia más probable de estados $x_1,...,x_{T-1}$, dados los parámetros del modelo. Sin embargo, esto se puede definir recursivamente como

$$
\nu_t(j) = \max_{i=1}^N \nu_{t-1}(j)a_{ij}b_j(v_t) 
$$

que es lo que se calculará a continuación, guardando también una matriz que indica el origen de cada máxima verosimilitud (BP), para poder recuperar el camino más probable. Así, el algoritmo viene dado por

$\textbf{Inicialización:}$

$$
\nu_0(j) = \lambda_0b_j(v_0), \hspace{3mm} BP_0(j)=-1, \hspace{3mm} 1 \leq j \leq N
$$

$\textbf{Inductivo:}$

$$
\nu_t(j) = \max_{i=1}^N \nu_{t-1}(j)a_{ij}b_j(v_t), \hspace{3mm}  1\leq j \leq N, 1< t \leq T
$$
$$
BP_t(j) = arg\max_{i=1}^N \nu_{t-1}(j)a_{ij}b_j(v_t), \hspace{3mm} 1\leq j \leq N, 1< t \leq T
$$


$\textbf{Terminal:}$

$$
\text{Mejor verosimilitud} \hspace{3mm} P^*= \max_{i=1, \dots N} \nu_{T}(i)
$$

$$
\text{Inicio de backtrace} \hspace{3mm} q_{T^*} = \text{argmax}_{i=1, \dots, N} \nu_T(i)
$$

### Algoritmo de Baum Welch

Ahora nos interesa el problema del entrenamiento, en donde, dada una secuencia de observaciones y el conjunto de los posibles estados en la cadena oculta, se tiene que determinar los parámetros $A$, $B$ y $\lambda$ del modelo.

Para poder ejecutar el algoritmo, necesitamos

Parámetro *forward*
$$
\alpha_{x, t} = \mathbb{P}(V_0 = v_0, \dots, V_t = v_t, X_t = x)
$$

Parámetro *backward*
$$
\beta_{x, t} = \mathbb{P}(V_{t+1} = v_{t+1}, \dots, V_T = v_T \mid X_t = x)
$$

Parámetro *gamma* (responde a la probabilidad de estar en $x$ a tiempo $t$ conociendo todas las observaciones (pasadas y futuras))
$$
\gamma_{x,t} = \mathbb{P}(X_t =x\mid O)
$$

Y también necesitamos definir la matriz temporal de matrices

$$
\xi_t(i,j) = \mathbb{P}(X_t = i, X_{t+1}=j \mid O) = \frac{\alpha_{i,t}a_{ij}b_j(V_{t+1})\beta_j(V_{t+1})}{\mathbb{P}(V\mid (A,B,\lambda))}
$$

que satisface

$$
\gamma_{i,t} = \sum_{j=1}^N \xi_t(i,j)
$$

De esta forma

$$
\sum_{t=1}^{T-1} \gamma_{i,t} = \text{Número esperado de transiciones desde $x_i$}
$$

$$
\sum_{t=1}^{T-1} \xi_{i,t} = \text{Número esperado de transiciones de $x_i$ a $x_j$}
$$

Por lo que nos queda determinar los nuevos parámetros $(\bar{A},\bar{B},\bar{\lambda})$.

En efecto, la distribución inicial se obtiene mediante

$$
\bar{\lambda}_i = \gamma_{i,0} = \text{Frecuencia esperada en $x_i$ a tiempo $t=0$} 
$$

Los coeficientes de $\bar{A}$ vienen dados Por

$$
\bar{a}_{ij} = \frac{\text{Número esperado de transiciones de $x_i$ a $x_j$}}{\text{Número esperado de transiciones desde $x_i$}}
$$

Es decir,

$$
\bar{a}_{ij} = \frac{\sum_{t=1}^{T-1} \xi_t(i,j)}{\sum_{t=1}^{T-1}\gamma_{i,t}} 
$$

Y para $\bar{B}$, tenemos 

$$
\bar{b}_{j,k} = \frac{\text{Número esperado de veces en el estado $j$ y observando $v_k$}}{\text{Número esperado de veces en el estado $j$}}
$$

Es decir, 

$$
\bar{b}_{j,k} = \frac{\sum_{t=1}^T \gamma_{i,t} 1_{O_t=v_k}}{\sum_{t=1}^T \gamma_{i,t}}
$$

Con lo cual de forma iterativa generamos nuevos parámetros (volviéndolos a usar en el modelo como input) hasta que estos convergan a un "punto fijo".

A continuación se implementa la clase HMM que incorpora todos estos algoritmos.

In [None]:
class HMM:
    """ Hidden markov model: Clase que encapsula el comportamiento de una cadena oculta de Markov"""
    def __init__(self, N, M):
        """Recibe el número de estados ocultos y de observables. Crea un Modelo de Markov Oculto
        Cuyos parámetros son inicialmente distribuciones de probabilidad uniformes"""
        self.N = N
        self.M = M

        # Inicializamos los parámetros aleatoriamente y normalizamos
        # Para obtener matrices estocásticas.
        self.lam = np.random.rand(self.N)
        self.lam /= np.sum(self.lam)
        self.A = np.random.rand(self.N, self.N)
        self.B = np.random.rand(self.N, self.M)
        for i in range(self.N):
            self.A[i] /= np.sum(self.A[i])
            self.B[i] /= np.sum(self.B[i])

    def simulate(self, steps):
        """ Recibe un número de pasos y simula esa cantidad de pasos del modelo.
        Devuelve un vector de observaciones de tamaño steps + 1"""
        H = np.zeros(steps + 1, dtype=np.int8)  # Inicializamos la cadena en cero
        x = randomSample(self.lam)  # Elegir un estado de la distribución inicial
        H[0] = x
        for j in range(steps):
            x = randomSample(self.A[x])  # Elegir un estado según la ley de la fila x de A
            H[j+1] = x  # Añadirlo a la cadena
        
        # Genial, ahora H es una cadena de estados ocultos. Para cada uno de ellos escogemos una observación
        O = np.zeros(steps + 1)
        for j in range(steps + 1):
            O[j] = randomSample(self.B[H[j]])
        return O  # Devolver la cadena de observaciones

    def forward(self, obs):  
        """ Recibe una secuencia de estados observables de la cadena.
        Retorna la verosimilitud de esa secuencia con los parámetros de la cadena
        Implementando el algoritmo Forward"""
        assert ((obs >= 0) & (obs < self.M)).all(), "Las observaciones dadas no son válidas"
        T = len(obs)
        # Creamos matriz forward de tamaño MxN 
        ForwardMatrix = np.zeros((self.N, T))
        # Inicializamos la matriz 
        ForwardMatrix[:,0] = self.lam * self.B[:,obs[0]]
        # Trabajamos recursión    
        for t in range(1,T):
            ForwardMatrix[:, t] = ((self.A * self.B[:, obs[t]]).T).dot(ForwardMatrix[:, t-1])
        # Entregamos la verosimilitud de la observación        
        return np.sum(ForwardMatrix[:,T-1]), ForwardMatrix
    
    def viterbi(self, obs, states):
        ''' Recibe observaciones y estados, y entrega la cadena óptima de estados (es decir, en el 
        sentido de ser el camino más probable)'''
        T = len(obs)
        ViterbiMatrix = np.zeros([self.N,T])
        Backpointer = np.zeros([self.N,T])
        for s in range(self.N):
            ViterbiMatrix[s,0] =  lam[s]*self.B[s,obs[0]]
            Backpointer[s,0] = -1 # Estado inicial corresponde a distribución inicial 
            # -1 dado que el 0 confunde con el primer estado posible de la cadena 
        for t in range(1,T):
            for s in range(self.N):
                maxVit = []
                for i in range(self.N):
                    maxVit.append(ViterbiMatrix[i,t-1]*self.A[i,s]*self.B[s,obs[t]])   
                ViterbiMatrix[s,t] = max(maxVit)     
                Backpointer[s,t] = np.argmax(maxVit)
        VitFinal = ViterbiMatrix[:,T-1]   
        BestPathProb = max(VitFinal)     
        BestPathPointer = np.argmax(VitFinal)
        # Falta reconstruir camino óptimo de estados
        Camino = np.append(Backpointer[BestPathPointer],BestPathPointer)
        CaminoEstados = []
        for i in Camino:
            if i==-1:
                CaminoEstados.append('Origen')
            else:
                CaminoEstados.append(states[int(i)])                        
        return BestPathProb, CaminoEstados
    
    def backward(self,obs):
        T = len(obs)
        Beta = np.zeros([self.N,T])
        for i in range(self.N):
            Beta[i,T-1] = 1  
        for t in reversed(range(T-1)):
            for i in range(self.N):
                Beta[i,t] = 0
                for j in range(self.N):
                    Beta[i,t] = Beta[i,t] + self.A[i,j]*self.B[j,obs[t+1]]*Beta[j,t+1]
        P = 0
        for j in range(self.N):
            P = P + self.lam[j]*self.B[j,obs[0]]*Beta[0,j]
        return P, Beta

    def baumwelch(self, obs):
        ''' Realiza 1 iteración del algoritmo de Baum Welch, encontrando matrices A_bar y B_bar
        para el entrenamiento del modelo'''
        T = len(obs)

        A_bar = np.zeros([self.N,self.N])
        B_bar = np.zeros([self.N,self.M])
        lam_bar = np.zeros([self.N])

        # Definimos matrices alfa y beta según lo anterior

        Alfa = self.forward(obs)[1]
        Beta = self.backward(obs)[1]
        
        # Creamos Xi_t(i,j)

        Xi = []
        for i in range(T-1):
            Xi.append(np.zeros([self.N,self.N]))
        for t in range(T-1):
            for j in range(self.N):
                for i in range(self.N):
                    P = 0
                    for s in range(self.N):
                        P = P + Alfa[s,t]*Beta[s,t]      
                    Xi[t][i,j] = (Alfa[i,t]*self.A[i,j]*self.B[j,obs[t+1]]*Beta[j,t+1])/P        

        # Cálculo de A_bar

        # Numerador
        for j in range(self.N):
            for i in range(self.N):
                Xi_sum = sum(Xi)
                A_bar[i,j] = Xi_sum[i,j]

        # Denominador
        for j in range(self.N):
            for i in range(self.N):
                Den = 0
                for t in range(T-1):
                    Den = Den + sum(Xi[t][i])
                A_bar[i,j] = A_bar[i,j]/Den    

        # Cálculo de B_bar y lam_bar

        Gamma = np.zeros([self.N,T])

        for t in range(T):
            for j in range(self.N):
                P = 0
                for s in range(self.N):
                    P = P + Alfa[s,t]*Beta[s,t] 
                Gamma[j,t] = Alfa[j,t]*Beta[j,t]/P        

        for k in range(self.M):
            for j in range(self.N):
                S1 = 0
                S2 = 0
                for t in range(T):
                    if obs[t] == k:
                        S1 = S1 + Gamma[j,t]
                    S2 = S2 + Gamma[j,t]
                B_bar[j,k] = S1/S2 

        for i in range(self.N):
            lam_bar[i] = Gamma[i,0]

        # Actualizamos los atributos del modelo con los valores nuevos
        self.A = A_bar
        self.B = B_bar
        self.lam = lam_bar

    def train(self, seq, eps, num, verbose=False):
        """ Recibe una secuencia de observaciones de la cadena, un grado de tolerancia y un número
        máximo de iteraciones. Itera el algoritmo de B-W hasta que o bien la distancia entre las tres
        matrices sucesivas es menor a {eps}, o bien se ha alcanzado el número máximo de iteraciones {num}."""
        counter = 0
        while counter < num:
            counter += 1

            lam_0 = np.copy(self.lam)
            A_0 = np.copy(self.A)
            B_0 = np.copy(self.B)

            self.baumwelch(seq)

            lam_1 = self.lam
            A_1 = self.A
            B_1 = self.B

            dA = np.linalg.norm(A_1 - A_0, ord='fro')
            dB = np.linalg.norm(B_1 - B_0, ord='fro')
            dlam = np.linalg.norm(lam_1 - lam_0, ord=2)
            if verbose:
                print(f"Iteración {counter} | dA = {dA:.2f} | dB = {dB:.2f} | dlam = {dlam:.2f}")

            if max([dA, dB, dlam]) < eps:
                if verbose:
                    print(f"Convergencia alcanzada. El proceso termina a las {counter} iteraciones")
                break
        
        print("Número máximo de iteraciones alcanzado.")



## Testing

A continuación, se dejan bloques de código que han sido útiles durante el proceso de implementación de los algoritmos de HMM.

In [None]:
# Ejemplo 
A = np.array([[0.6,0.4],[0.5, 0.5]])
B = np.array([[0.2,0.4,0.4],[0.5,0.4,0.1]])
lam = np.array([0.8,0.2])

Modelo = HMM(lam, A, B)

In [None]:
Modelo.viterbi(np.array([2,0,2]),np.array(['Caliente','Frio']))

(0.012800000000000004, ['Origen', 'Caliente', 'Frio', 'Caliente'])

In [None]:
Modelo.baumwelch(np.array([2,0,2]))

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=15d2711e-8488-4966-b405-4363c7f8973c' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>