# Hidden Markov Model

In [1]:
import string

import torch
import numpy as np

from sklearn.model_selection import train_test_split

In [2]:
from importlib import reload

In [3]:
class HMM(torch.nn.Module):
    """
    Hidden Markov model with discrete states and observations.
    """
    def __init__(self, n_hidden, n_observed):
        """
        Initialize model with n_hidden states and n_observed states.
        """
        super(HMM, self).__init__()
        self.K = n_hidden
        self.V = n_observed
        self.prior_module = PriorModule(self.K)
        self.transition_module = TransitionModule(self.K)
        self.emission_module = EmissionModule(self.K, self.V)

        # enable cuda is it is available
        # if torch.cuda.is_available():
        #     self.cuda()


### Prior probabilities

The prior probability vector $\boldsymbol{\alpha}$ is defined as
$$
\alpha[k] = P(z_0 = k) .
$$

Since $P(z_0)$ is probability distribution, it must sum to 1:
$$
\sum_{k = 0}^{K - 1} P(z_0 = k) = 1 .
$$

In order to preserve numerical precision, we store $\boldsymbol{\alpha}$ as real values with range from $-\infty$ to $\infty$. We name this variable `real_alpha`. Keeping `alpha` in real space also allows us to randomly initialize it using the standard normal distribution.

Accordingly, we must normalize `real_alpha` before returning it as the prior probability vector. In other words, we must normalize `real_alpha` such that
$$
\sum_{k = 0}^{K - 1} \alpha[k] = 1 .
$$

For this, we can use the `torch.nn.functional.softmax` function.

In [4]:
class PriorModule(torch.nn.Module):
    def __init__(self, K):
        """
        Create prior vector with random initial values.
        """
        super(PriorModule, self).__init__()
        self.K = K
        self.real_alpha = torch.nn.Parameter(torch.randn(K))

    def probs(self):
        """
        Return the prior probability vector.
        """
        return torch.nn.functional.softmax(
            self.real_alpha, dim=0 #Essentially, we are normalizing the values in the vector (Making sure they are between 0 and 1)
        )

### Transition probabilities

The transition probability matrix $\boldsymbol{B}$ is defined such that
$$
B[k, l] = P(z_{t+1} = l \mid z_{t} = k) .
$$
(Note: As per standard mathematical notation, we represent a matrix by an uppercase bold letter. The uppercase of $\beta$ is $B$.)

The `TransitionModule` will store the transition matrix as real values (i.e. their ranges are from $-\infty$ to $\infty$. It will normalize the transition matrix.

Therefore, this module needs to normalize the `real_Beta` in order to obtain proper probability values; that is, for each hidden state $i$,
$$
\sum_{l = 0}^{K-1} P(z_{t+1} = l \mid z_{t} = k) = 1 .
$$

###

In [5]:
class TransitionModule(torch.nn.Module):
    def __init__(self, K):
        """
        Create transition matrix with K hidden states and random initial values.
        """
        super(TransitionModule, self).__init__()
        self.K = K
        # sample initial values from standard normal
        self.real_Beta = torch.nn.Parameter(torch.randn(K, K)) # Initializing real_Beta with random values sampled from a standard normal distribution

    def probs(self):
        """
        Return the transition probability matrix.
        """
        return torch.nn.functional.softmax(self.real_Beta, dim=1) # Applying softmax to real_Beta along the columns to normalize each row

### Emission probabilities

The emission probability matrix $\boldsymbol{\Gamma}$ is defined as
$$
\Gamma[k, v] = P( x_t = v \mid z_t = k) .
$$

In [6]:
class EmissionModule(torch.nn.Module):
    def __init__(self, K, V):
        """
        Randomly initialize emission matirx with K hidden states and V observed states.
        """
        super(EmissionModule, self).__init__()
        self.real_Gamma = torch.nn.Parameter(torch.randn(K, V)) # Initializing real_Gamma with random values sampled from a standard normal distribution

    def probs(self):
        """
        Return the emission probability matrix.
        """
        return torch.nn.functional.softmax(self.real_Gamma, dim=1) # Applying softmax to real_Gamma to normalize


### Sampling from the model

Once the parameters $(\boldsymbol{\alpha}, \boldsymbol{B}, \boldsymbol{\Gamma})$ are fully specified, we can sample from the HMM simply by following the model definition:

For $t = 0$,
$$
\begin{aligned}
z_0 \; &\sim \; \text{Categorical}( \boldsymbol{\alpha} ) , \\
x_0 \mid z_0 = k \; &\sim \; \text{Categorical}( \boldsymbol{\gamma}_k ) ,
\end{aligned}
$$
where $\boldsymbol{\gamma}_k$ is the $k$-th row of $\boldsymbol{\Gamma}$.

For $1 < t < T$,
$$
\begin{aligned}
z_t \mid z_{t-1} \; &\sim \; \text{Categorical}( \boldsymbol{\beta}_k ) , \\
x_t \mid z_t = k \; &\sim \; \text{Categorical}( \boldsymbol{\gamma}_k ) ,
\end{aligned}
$$
where $\boldsymbol{\beta}_k$ is the $k$-th row of $\boldsymbol{B}$.

We can draw a sample from a categorical distribution using `torch.distributions.categorical.Categorical`.

In [7]:

def sample(self, T=10):
    """
    Sample from the hidden Markov model for T time points.
    """

    from torch.distributions.categorical import Categorical

    # obtain normalized probabilities
    priors = self.prior_module.probs()
    transitions = self.transition_module.probs()
    emissions = self.emission_module.probs()

    # initialize state arrays
    z = np.zeros(T, dtype=np.int8)
    x = np.zeros(T, dtype=np.int8)

    # sample initial state
    z_t = Categorical(priors).sample().item()
    z[0] = z_t

    for t in range(0, T):
        # sample emission
        x_t = Categorical(emissions[z_t]).sample().item()
        x[t] = x_t

        # sample transition
        z_t = Categorical(transitions[z[t-1]]).sample().item()
        z[t] = z_t

    return x, z

# add sampling method to HMM class
HMM.sample = sample

In [8]:
model = HMM(4, 6)

In [9]:
model.sample()

(array([2, 2, 3, 0, 2, 1, 1, 3, 5, 0], dtype=int8),
 array([3, 0, 3, 0, 0, 3, 2, 3, 2, 0], dtype=int8))

### Learning model parameters

You can hard-code HMM parameters to achieve the behaivour that
you desire. However, we would typically want to learn the parameters by training our model on data.

The PyTorch allows us to minimize a loss function using stochastic gradient descent. Thanks to automatic differentiation, we will not have to analytically derive the gradient equations.

All we need to do is to specify and implement a loss function. A sensible objective function is the model evidence $p(\boldsymbol{x})$ for the observed data $\boldsymbol{x} = (x_0 \dots x_{T-1})$.

In HMM, the model evidence follows directly from its model specifiction:
$$
p(\boldsymbol{x})
= \sum_{\boldsymbol{z}}
  p( \boldsymbol{x} \mid \boldsymbol{z} ) \;
  p( \boldsymbol{z} )
= \sum_{\boldsymbol{z}} \; p(x_0 \mid z_0) \, p( z_0 ) \;
  \prod_{t > 0} p( x_t \mid z_t ) \, p( z_t \mid z_{t-1} )
$$

We want to *maximize* the model evidence $p(\boldsymbol{x})$. To avoid numeric underflow, we can work in log scale such that we maximize $\log p(\boldsymbol{x}) $, which is mathematically maximizing $p(\boldsymbol{x})$. Since in PyTorch, we need to *minimize* a loss function, we can therefore define our loss function as
$$
L_{\theta}(\boldsymbol{x}) = - \log p_{\theta}(\boldsymbol{x}) ,
$$
where we use $\theta$ to represent the collection of all model parameters.

The model evidence $p(\boldsymbol{x})$ for HMM can be efficiently calculated using the dynamic programming algorithm, and in HMM, we call this algorithm the **forward algorithm**.

The key idea of the forward algorithm is that we can calculate the following probability efficiently:
$$
\begin{aligned}
a_{k, t}
 &\triangleq p( x_0, \ldots, x_t, z_t = k) \\
 &= p( x_t \mid z_t = k) \;
   \sum_l \;
    p( z_t = k \mid z_{t-1} = l ) \,
    p( x_0, \ldots, x_{t-1}, z_{t-1} = l ) \\
 &= \boldsymbol{\gamma}_k \;
   \sum_l \;
    \beta_{k, l} \, a_{l, t-1}
\end{aligned}
$$
And this recursive equation can be calculated efficient using dynamic programming.

After we calculate $a_{k,t}$, we can determine the model evidence by
$$
p(\boldsymbol{x}) = \sum_k a_{k, T}
$$

Again, in order to avoid numeric underflow, we work in log scale.


In [10]:
def forward_algorithm(self, x, T):
    """
    Compute log p(x) for each sample in a batch.

    x: IntTensor of shape (batch_size, T_max)
    T: IntTensor of shape (batch_size)

    x[i, :] contains a sample of input whose length is
    specified in T[i]. T_max is the maximum sample length
    across all samples in a batch.
    """

    batch_size = x.shape[0]
    T_max = x.shape[1]

    log_priors = self.prior_module()

    log_a = torch.zeros(batch_size, T_max, self.K)
    log_a[:, 0, :] = self.emission_module(x[:,0]) + log_priors
    for t in range(1, T_max):
        log_a[:, t, :] = self.emission_module(x[:,t]) + \
            self.transition_module(log_a[:, t-1, :])

    # select the sum for the final time (T - 1)
    # (each sample x can have a different T).
    log_sums = log_a.logsumexp(dim=2)
    log_probs = torch.gather(log_sums, 1, T.view(-1,1) - 1)

    return log_probs

def prior_module_forward(self):
    """
    Return log prior probability vector.
    """
    return torch.nn.functional.log_softmax(self.real_alpha, dim=0)

def transition_module_forward(self, prev_log_a):
    """
    Return log a_{0:(K-1), t} using log a_{0:(K-1), t-1}

    prev_log_a : Tensor of shape (batch_size, K); column k is log_a{k, t-1}
    """
    log_probs = torch.nn.functional.log_softmax(self.real_Beta, dim=1)
    log_a = log_matmul(log_probs, prev_log_a.transpose(0,1)).transpose(0,1)
    return log_a

def emission_module_forward(self, x_t):
    """
    Return log emission probabilities p( x_t \mid z_t) for
    observation x_t.
    """
    log_probs = torch.nn.functional.log_softmax(self.real_Gamma, dim=1)
    return log_probs[:, x_t].transpose(0,1)

def log_matmul(log_A, log_B):
	"""
    Compute log(A B) given log A and log B

	log_A: m x k
	log_B: k x n
	output: m x n matrix

    For C = A B, we have
	C_{i,j} = sum_k A_{i,k} x B_{k,j}

    Here, we compute log C by
	log C_{i,j} = logsumexp_k log_A_{i,k} + log_B_{k,j}
	"""
	m = log_A.shape[0]
	k = log_A.shape[1]
	n = log_B.shape[1]

	log_A_expanded = torch.reshape(log_A, (m,k,1))
	log_B_expanded = torch.reshape(log_B, (1,k,n))

	elementwise_sum = log_A_expanded + log_B_expanded
	out = torch.logsumexp(elementwise_sum, dim=1)

	return out

PriorModule.forward = prior_module_forward
TransitionModule.forward = transition_module_forward
EmissionModule.forward = emission_module_forward
HMM.forward = forward_algorithm


### Encoding and decoding

We will be working with text input, so we need to encode the data appropriately. For HMM, since we only use the encoded values for the purposing of indexing the emission and transition probability matrices, integer encoding is appropriate.

In [11]:
def encode(xs):
    character_set = string.ascii_lowercase
    char_to_index = {ch: idx for idx, ch in enumerate(character_set)}
    encoded_xs = [char_to_index[ch.lower()] for ch in xs if ch.lower() in char_to_index]
    return encoded_xs

def decode(ys, character_set=string.ascii_lowercase):
    character_set = string.ascii_lowercase
    index_to_char = {idx: ch for idx, ch in enumerate(character_set)}
    decoded_string = ''.join(index_to_char[idx] for idx in ys if idx in index_to_char)
    return decoded_string

In [12]:
encode("Hidden")

[7, 8, 3, 3, 4, 13]

In [13]:
decode(np.array([7, 8, 3, 3, 4, 13]))

'hidden'

In [14]:
decode(model.sample()[0])

'baacdccbee'

### Running forward algorithm

With the forward algorithm and the encoder/decoder functions implemented, we are now ready to run the forward algorithm to calculate the log model evidence. This should output a floating value that is usually negative.

In [15]:
x = torch.stack( [torch.tensor(encode("cat"))] )
T = torch.tensor([3])

model = HMM(n_hidden=2, n_observed=26)

model.forward(x, T)

tensor([[-9.8939]], grad_fn=<GatherBackward0>)

Now that we have successfully implemented the forward pass in PyTorch, we can take advantage of its automatic differentiation capability to optimize our objective function by stochastic gradient descent. No mathematical derivation needed!

### Data preprocessing

In [16]:
!mkdir -p data
!curl -L -o data/words.txt https://github.com/dwyl/english-words/raw/master/words_alpha.txt

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100 4135k  100 4135k    0     0  3119k      0  0:00:01  0:00:01 --:--:-- 3119k


In [17]:
import utils.preprocess as pp
reload(pp)

in_fname = "data/words.txt"

with open(in_fname, "r") as f:
  words_all = [x.rstrip() for x in f.readlines()]

# use only words that begin with 'q' in order to reduce training time
words = []
for word in words_all:
    if word[0] == 'q':
        words.append(word)

# split data into training and testing
train_words, valid_words = train_test_split(words, test_size=0.1, random_state=1)
train_dataset = pp.TextDataset(train_words, encode)
valid_dataset = pp.TextDataset(valid_words, encode)

# construct character set
character_set = list(pp.Counter(("".join(words))).keys())
n_observed = len(character_set)

In [18]:
train_words[0:5]

['quixotically',
 'quadruplicity',
 'quadriplicated',
 'quadriplegia',
 'quincentenary']

In [19]:
valid_words[0:5]

['qual', 'quirite', 'quatorzes', 'quantizable', 'quebracho']

### Training

In [20]:
from utils import train

reload(train)

model = HMM(4, n_observed=n_observed)

# number of training passes through the data
n_epochs = 10
trainer = train.Trainer(model, learning_rate=0.01)

for epoch in range(n_epochs):
    print("========= Epoch %d of %d =========" % (epoch+1, n_epochs))
    train_loss = trainer.train(train_dataset, decode)
    valid_loss = trainer.test(valid_dataset, decode)

    print("========= Results: epoch %d of %d =========" % (epoch+1, n_epochs))
    print("train loss: %.2f| valid loss: %.2f\n" % (train_loss, valid_loss) )



  0%|          | 0/13 [00:00<?, ?it/s]

loss: 31.077190399169922
twzsfyuyip
[3 0 2 1 1 1 1 2 1 1]
hqibhwyxlq
[2 1 1 2 2 1 1 1 2 3]
zboohoasiy
[3 3 3 0 3 3 3 1 3 3]
qyprpyqijz
[3 0 2 0 1 1 1 0 0 2]
ruvhogkyug

  self.pid = os.fork()
 46%|████▌     | 6/13 [00:00<00:00, 22.53it/s]


[2 3 0 3 3 2 1 3 3 1]


100%|██████████| 13/13 [00:00<00:00, 26.32it/s]


loss: 30.830371856689453
uomyyiwuis
[3 1 1 1 0 3 3 1 0 3]
train loss: 30.22| valid loss: 31.34



  8%|▊         | 1/13 [00:00<00:01,  8.53it/s]

loss: 30.218198776245117
uyyhvnajjp
[3 3 0 2 2 3 1 2 2 0]
xmyxbjzlzb
[3 1 2 1 1 0 0 3 1 0]
iwqitpyyqx
[3 0 0 2 1 1 1 1 1 3]
qqyofauerp
[2 1 3 1 3 1 3 0 2 2]
ipqurtrlnb
[3 2 1 0 3 3 0 3 2 1]


100%|██████████| 13/13 [00:00<00:00, 28.46it/s]


loss: 30.07282257080078
uwfoauaajl
[3 0 3 0 3 1 3 1 1 1]
train loss: 29.11| valid loss: 30.29



  8%|▊         | 1/13 [00:00<00:01,  8.24it/s]

loss: 27.671920776367188
ivfyjidrgu
[3 0 3 3 0 3 0 3 3 3]
iuwuzjgyje
[0 3 1 0 2 2 2 1 0 3]
imyiqmkkqa
[3 2 2 2 0 0 1 2 3 2]
tgrozjawzn
[3 0 3 0 3 0 3 0 3 3]
tnticuyhfj
[3 1 2 1 3 3 0 2 0 2]


100%|██████████| 13/13 [00:00<00:00, 32.49it/s]


loss: 30.218486785888672
qutiwojlxw
[3 2 2 0 3 2 1 0 3 3]
train loss: 28.18| valid loss: 29.43



 15%|█▌        | 2/13 [00:00<00:00, 18.94it/s]

loss: 26.005043029785156
ngqnyrtypi
[3 2 2 1 0 2 1 2 0 3]
uzdnugtyqw
[3 0 2 1 2 2 1 2 2 1]
qjeeioqyqu
[0 3 0 0 3 2 1 2 3 0]
rywupyiitg
[3 2 0 0 3 0 0 0 3 2]
lmqetrcrsq
[3 2 3 0 3 0 0 3 2 1]


100%|██████████| 13/13 [00:00<00:00, 45.10it/s]


loss: 29.124326705932617
diprwilqwm
[0 0 0 3 0 3 1 3 0 3]
train loss: 27.42| valid loss: 28.71



 23%|██▎       | 3/13 [00:00<00:00, 26.74it/s]

loss: 26.787975311279297
qajtiroweu
[3 3 2 1 0 3 2 0 3 0]
rylismaaer
[3 1 0 3 0 3 3 3 0 3]
iauyuisxil
[3 0 3 2 2 2 2 2 1 2]
iraxliyetu
[0 3 1 1 1 1 3 0 3 3]
roiiixavym
[3 0 0 0 0 3 3 3 0 0]


100%|██████████| 13/13 [00:00<00:00, 43.06it/s]


loss: 28.353164672851562
roaaaevqen
[3 0 3 2 2 2 2 2 1 0]
train loss: 26.79| valid loss: 28.09



 15%|█▌        | 2/13 [00:00<00:00, 19.47it/s]

loss: 25.975135803222656
mlatvitisi
[3 3 0 3 1 1 1 2 0 3]
fggrnannip
[0 3 0 0 3 2 2 1 2 1]
ijiyapxmns
[3 0 3 1 2 1 1 1 2 0]
xurnlaulll
[3 0 3 2 1 3 1 1 0 3]
tadwxhiufq
[3 3 3 2 1 2 1 1 1 1]


100%|██████████| 13/13 [00:00<00:00, 45.61it/s]


loss: 27.170148849487305
alenntweur
[0 3 3 2 0 3 1 3 0 3]
train loss: 26.23| valid loss: 27.54



 23%|██▎       | 3/13 [00:00<00:00, 26.43it/s]

loss: 25.065462112426758
njoaklllqb
[0 3 3 0 3 1 1 2 1 2]
qafuusqltu
[3 2 2 1 1 2 1 0 3 3]
liaywqoiei
[0 3 0 3 2 3 0 3 0 3]
lyerrfuith
[1 1 0 0 0 0 0 0 3 0]
iryyifnjcw
[3 1 2 1 0 3 1 0 3 2]


100%|██████████| 13/13 [00:00<00:00, 45.43it/s]


loss: 26.461307525634766
qynfeyruud
[3 1 2 2 3 0 3 3 2 2]
train loss: 25.72| valid loss: 27.03



 15%|█▌        | 2/13 [00:00<00:00, 18.94it/s]

loss: 24.702861785888672
qnhbiufemu
[3 2 1 0 3 2 1 0 3 2]
dziyscigiu
[3 0 3 0 0 0 3 2 3 0]
zuiqigutsi
[3 0 0 0 3 1 0 3 0 3]
epgyfqnour
[2 3 2 3 2 2 3 3 0 3]
pqitfcreao
[2 1 2 2 1 0 3 3 2 0]


100%|██████████| 13/13 [00:00<00:00, 44.04it/s]


loss: 26.06682586669922
fuhaxxacei
[3 2 1 2 0 0 0 0 3 3]
train loss: 25.25| valid loss: 26.59



 23%|██▎       | 3/13 [00:00<00:00, 28.08it/s]

loss: 25.380290985107422
elninfuyma
[0 3 0 3 2 1 1 0 3 3]
qneisnniey
[3 3 1 0 3 2 0 3 3 0]
qyewmayemc
[3 3 2 0 3 2 3 0 3 3]
xrmmqbuiio
[0 0 3 2 0 3 1 2 1 0]
tsuxaqutui
[3 2 2 3 2 1 0 3 2 1]


100%|██████████| 13/13 [00:00<00:00, 45.86it/s]


loss: 26.215030670166016
ioeneciuyu
[3 0 3 3 3 2 1 1 2 0]
train loss: 24.85| valid loss: 26.20



 23%|██▎       | 3/13 [00:00<00:00, 26.81it/s]

loss: 24.936466217041016
anooutlisa
[1 1 0 3 2 1 0 3 3 2]
iirneyqrfn
[0 0 3 2 1 2 3 2 1 0]
ufinmtauua
[0 0 3 2 0 3 2 0 0 3]
fclttlruqu
[1 1 2 0 0 0 3 2 3 0]
iyrudebqwn
[3 0 3 3 0 3 2 1 2 2]


100%|██████████| 13/13 [00:00<00:00, 47.01it/s]


loss: 25.76019859313965
qsamqcdtlq
[0 3 3 2 1 3 2 1 2 3]
train loss: 24.50| valid loss: 25.85



### Examining the trained model

We can examine the trained model by sampling a few words from it. We need to decode the output in order to read it.

In [30]:
for i in range(10):
    s = model.sample()
    print(
        "x:", decode(s[0]),
        ", z:", s[1]
    )

x: ruqyqsimqu , z: [3 2 3 2 3 0 3 0 3 0]
x: zaatyfnvuu , z: [0 0 3 1 0 3 2 3 3 0]
x: qulsnqufmi , z: [3 0 3 3 2 2 2 1 0 0]
x: qvqtnqrefn , z: [3 2 0 3 2 0 3 3 2 2]
x: qaoidkrmyf , z: [3 3 2 0 0 0 0 3 2 1]
x: xaddunynqq , z: [3 0 3 2 0 3 3 2 2 0]
x: tsriueaqqi , z: [0 0 0 3 0 3 2 1 1 2]
x: caitopramz , z: [3 1 0 3 2 0 3 2 3 0]
x: jsereieulv , z: [0 0 0 3 2 0 3 1 2 2]
x: icqefymzqu , z: [3 0 3 2 1 0 3 2 1 0]


### Questions

1. How do you know whether the training is complete?
2. Is it better to have more or fewer hidden states in the HMM? Why?
3. How can you improve the model further?

### Bonus Questions

4. Train a HMM to learn the restriction endonuclease
   recognition sequences from [New England Biolabs][1].
   You will need to preprocess the data appropriately.

5. Implement the Viterbi algorithm.
   (Hint: it is very similar to the forward algorithm.)

[1]: https://www.neb.com/en/tools-and-resources/selection-charts/alphabetized-list-of-recognition-specificities

**Answer 1.** The training is completed when the specified number of epcochs (in our case 10) is reached. Also essentially the changes in loss and accuracy tends to be very minor in between epochs when training is almost done.

**Answer 2.** More hidden states can capture more complex patterns but might make the model overfit the data, so rather we should increase the hidden states to a limit, else there is risk of overfitting. And to litte hidden states can result in underfitting.

**Answer 3.** We can increase the training data, also perhaps better clean data can be utilised. We can also try to use different hidden layer 'counts' to check which one tends to give us better results. Also we can perhaps decrease the learning rate (although at on point the learning rate may stop effecting the perfomance, or even effect it negativly)