In [10]:
import numpy as np
from scipy.special import logsumexp
import matplotlib.pyplot as plt

## Modelos ocultos de Markov.

Vamos a reutilizar la clase del ejercicio anterior pero, para evitar errores de ejecución por problemas de estabilidad numérica cuando trabajemos con secuencias largas, la modificaremos para trabajar casi todo el tiempo en escala logarítmica:

- Modificamos el valor devuelto por los métodos "forward" y "backward" para que sea directamente el logaritmo.
- Eliminamos (aunque podría mantenerse) el método "smooth" y lo sustituimos por "log_smooth", que devolverá los mismos resultados pero en escala logarítmica.

Además, vamos a implementar un método "sample" que permita muestrear una secuencia de longitud T del HMM.

In [42]:
import numpy as np
from scipy.special import logsumexp

class HMM:
    def __init__(self, phghm, ph1, pvgh):
        """
        phghm : distribución de transiciones homogéneas phghm(i,j)=p(h(t)=i|h(t-1)=j)
        ph1 : distribución inicial
        pvgh : distribución de emisión homogénea pvgh(i,j)=p(v(t)=i|h(t)=j)
        """

        self.phghm = phghm
        self.ph1 = ph1
        self.pvgh = pvgh
        self.H = len(ph1)
        self.V = len(pvgh[:, 0])
        
    def forward(self, v):
        phghm, ph1, pvgh = self.phghm, self.ph1, self.pvgh
        T, H = len(v), self.H

        logalpha = np.zeros((H, T))

        logalpha[:, 0] = np.log(pvgh[v[0], :] * ph1)
        
        for t in range(1, T):
            logalpha[:, t] = np.log(pvgh[v[t], :]) + logsumexp(
                np.log(phghm.T) + logalpha[:, t-1].reshape(H, 1), axis=0)
        
        return logalpha, logsumexp(logalpha[:, T-1])

    def backward(self, v):
        phghm, ph1, pvgh = self.phghm, self.ph1, self.pvgh
        T, H = len(v), self.H

        logbeta = np.zeros((H, T))

        for t in range(T-1, 0, -1):
            logbeta[:, t-1] = logsumexp(
                logbeta[:, t].reshape(H, 1) + 
                (np.log(pvgh[v[t], :].reshape(H, 1)) + np.log(phghm)), axis=0)
    
        return logbeta

    def log_smooth(self, v):
        log_phghm = np.log(self.phghm + 1e-10)  # Evitar log(0)
        log_pvgh = np.log(self.pvgh + 1e-10)
        T, H = len(v), self.H

        log_alpha, llik = self.forward(v)
        log_beta = self.backward(v)

        # Normalización estable para p(h(t)|v(1:T)) en escala logarítmica
        log_phtgV1T = log_alpha + log_beta - llik

        log_phthtgV1T = np.zeros((H, H, T-1))
        for t in range(T-1):
            log_phthtgV1T[:, :, t] = (
                log_alpha[:, t].reshape(H, 1) +
                log_phghm +
                log_pvgh[v[t+1], :].reshape(1, H) +
                log_beta[:, t+1].reshape(1, H) -
                llik
            )

        return log_phtgV1T, log_phthtgV1T, llik



    def viterbi(self, v):
        phghm, ph1, pvgh = self.phghm, self.ph1, self.pvgh
        T, H = len(v), self.H

        logmu = np.zeros((H, T))
        psi = np.zeros((H, T), dtype=int)

        logmu[:, 0] = np.log(ph1) + np.log(pvgh[v[0], :])

        for t in range(1, T):
            for h in range(H):
                candidates = logmu[:, t-1] + np.log(phghm[h, :])
                logmu[h, t] = np.max(candidates)
                psi[h, t] = np.argmax(candidates)
                logmu[h, t] += np.log(pvgh[v[t], h])

        path = np.zeros(T, dtype=int)
        path[T-1] = np.argmax(logmu[:, T-1])
        for t in range(T-2, -1, -1):
            path[t] = psi[path[t+1], t+1]

        return path

    def sample(self, T, rng=None):
        if rng is None:
            rng = np.random.default_rng()

        h_states = np.zeros(T, dtype=int)
        obs = np.zeros(T, dtype=int)

        h_states[0] = rng.choice(self.H, p=self.ph1)

        for t in range(T):
            obs[t] = rng.choice(self.V, p=self.pvgh[:, h_states[t]])
            if t < T-1:
                h_states[t+1] = rng.choice(self.H, p=self.phghm[:, h_states[t]])
    
        return h_states, obs


### Algoritmo EM para modelos ocultos de Markov

Aunque podría (y quizá debería) implementarse como otro método de la clase HMM, aquí lo vamos a definir en una clase propia. La implementación sigue la descripción del libro de Barber: tan solo hay que recordar que vamos a trabajar con "log_smooth" para evitar errores numéricos.

In [66]:
import numpy as np

class HMM_EM:
    def __init__(self, v, H, V):
        """
        v : lista de arrays, de manera que v[3][5] corresponde a la secuencia 3, paso 5
        H : número de estados ocultos
        V : número de estados visibles (observables) 
        """
        self.v = v
        self.V = V
        self.H = H
        self.N = len(v)  # Número de secuencias

    def em(self, maxit):
        v, H, V, N = self.v, self.H, self.V, self.N

        # Inicialización aleatoria con normalización.
        phghm = np.random.rand(H, H)  # Distribución de transiciones p(h(t)|h(t-1))
        phghm /= phghm.sum(axis=1, keepdims=True)  # Normalizar filas
        pvgh = np.random.rand(V, H)  # Distribución de emisión p(v(t)|h(t))
        pvgh /= pvgh.sum(axis=0, keepdims=True)  # Normalizar columnas
        ph1 = np.random.rand(H)  # p(h) inicial
        ph1 /= ph1.sum()  # Normalizar

        llik = np.zeros(maxit)  # Log-verosimilitudes a lo largo de las iteraciones

        for emloop in range(maxit):
            A = np.zeros((H, H))  # Transiciones esperadas
            a = np.zeros(H)       # Estados iniciales esperados
            B = np.zeros((V, H))  # Emisiones esperadas
            llik[emloop] = 0

            # Paso E: Calcular las probabilidades suavizadas y acumular los estadísticos
            for n in range(N):
                hmm = HMM(phghm, ph1, pvgh)  # Crear modelo HMM con los parámetros actuales
                log_phtgV1T, log_phthtgV1T, loglik = hmm.log_smooth(v[n])  # Probabilidades suavizadas

                # Convertir a escala normal
                phtgV1T = np.exp(log_phtgV1T)  # p(h(t)|v(1:T))
                phthtgV1T = np.exp(log_phthtgV1T)  # p(h(t),h(t+1)|v(1:T))

                # Acumular log-verosimilitud
                llik[emloop] += loglik

                # Calcular estadísticos para el paso M
                a += phtgV1T[:, 0]  # Número esperado de veces que h(1) = i
                A += phthtgV1T.sum(axis=2)  # Transiciones esperadas de h(t) a h(t+1)
                for t in range(len(v[n])):
                    B[v[n][t], :] += phtgV1T[:, t]  # Número esperado de veces que h(t) genera v(t)
            
            epsilon = 1e-3  # Pequeño valor para evitar ceros
            # Paso M: Actualizar los parámetros
            ph1 = (a + epsilon) / (a.sum() + H * epsilon)  # Actualizar distribución inicial
            phghm = ((A.T + epsilon) / (A.sum(axis=1) + H * epsilon)).T  # Normalizar filas de la matriz de transición
            pvgh = (B + epsilon) / (B.sum(axis=0, keepdims=True) + V * epsilon)  # Normalizar columnas de la matriz de emisión

        # Devolver los parámetros finales y la última log-verosimilitud
        return ph1, phghm, pvgh, llik[-1]


## El casino fraudulento.

En este casino, la banca hace trampas: utiliza un dado normal en que las seis caras son equiprobables pero de vez en cuando lo cambian por uno trucado en el que el 6 sale la mitad de las veces. Para evitar sospechas, no se cambia de dado muy a menudo y una vez se hace se tiende a seguir usándolo durante bastante tiempo. Más concretamente:
- si se está jugando con el dado normal, se sigue jugando con él con probabilidad 0.95 y se cambia con probabilidad 0.05;
- si se está utilizando el dado trucado, se sigue jugando con él con probabilidad 0.90 y se cambia con probabilidad 0.10. 
- se empieza jugando con la misma probabilidad con cualquiera de los dos dados.


In [67]:
# Formalización de las probabilidades.
# El dado normal es el 0 y el trucado, el 1.
ph1 = np.array([0.5, 0.5])
phghm = np.array([[0.95, 0.10],
                  [0.05, 0.90]])
pvgh = np.array([[1/6,  1/6,  1/6,  1/6,  1/6,  1/6],      # dado normal
                 [1/10, 1/10, 1/10, 1/10, 1/10, 5/10]]).T  # dado trucado

### Muestrear datos del modelo real.

Aunque conocemos los datos reales, vamos a intentar aprenderlos a partir de observaciones de juegos en el casino. Para ello vamos a muestrear algunas secuencias de lanzamientos en el casino real.

In [68]:
num_batches = 5
num_timesteps = 2000
casino = HMM(phghm, ph1, pvgh)

batch_h = np.zeros((num_batches,num_timesteps), dtype='int')
batch_v = np.zeros((num_batches,num_timesteps), dtype='int')
rng = np.random.default_rng(seed=42)  # Crear RNG con una semilla específica
for t in range(num_batches):
    batch_h[t], batch_v[t] = casino.sample(num_timesteps, rng)

Ahora aplicamos el algoritmo EM para averigüar las probabilidades.

In [69]:
casino_EM = HMM_EM(batch_v, 2, 6)

# Unos minutos.
tries = 4
best_llik = -np.inf
for i in range(tries):
    ph1, phghm, pvgh, llik = casino_EM.em(100)
    if llik > best_llik:
        best_llik = llik
        ph1_l, phghm_l, pvgh_l = ph1, phghm, pvgh

Comprobemos si los valores obtenidos se parecen a los reales.

In [70]:
# Distribución de emisiones
pvgh_l[:,0], pvgh_l[:,1]

(array([3.42996715e-04, 5.38744413e-01, 3.42996715e-04, 1.95513042e-01,
        1.95812453e-01, 6.92440984e-02]),
 array([0.14304165, 0.14768597, 0.13794016, 0.14808619, 0.14778602,
        0.27546001]))

In [71]:
# Distribución de emisiones
phghm_l[:,0], phghm_l[:,1]

(array([1.00132823e-07, 1.00108346e-07]), array([0.9999999, 0.9999999]))