# Baum-Welch Implementation

In this sheet we will go through an implementation of the Baum-Welch algorithm and use it to solve some models.

We will start by creating a model

In [None]:
N = 5 # Number of states
M = 4 # Number of words
data = [
  "A A B C".split(),
  "A B B A A D".split(),
  "D A B B C".split(),
  "B D A C A".split()
] # Our corpus
T = len(data)

words = {"A": 0, "B": 1, "C": 2, "D": 3}

def word(n,i):
  return words[data[n][i-1]]



We initialize a model at random. Note that the probabilities must still be valid distributions so we normalize so they sum to one

In [None]:
import numpy as np
from random import random

A = np.zeros((N,N))
B = np.zeros((N,M))

# For convenience the start tag is at zero
A[(0,0)] = 1

for i in range(0,N):
  for j in range(1,N):
    A[(i,j)] = random()
  s = sum(A[(i,)])
  for j in range(N):
    A[(i,j)] /= s

for i in range(1,N):
  for j in range(M):
    B[(i,j)] = random()
  s = sum(B[(i,)])
  for j in range(M):
    B[(i,j)] /= s

The next step is to implement the forward algorithm to calculate $\alpha_{s,i}$

In [None]:
def forwards():
  alphas = [np.zeros((N,len(t)+1)) for t in data]
  for n in range(T):
    alphas[n][(0,0)] = 1 # Initialize start probability
    for i in range(1,len(data[n])+1):
      for s in range(N):

        alphas[n][(s,i)] = sum(alphas[n][(t,i-1)] * A[(t,s)] * B[(s,word(n,i))] for t in range(N))
  return alphas

forwards()


[array([[1.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
         0.00000000e+00],
        [0.00000000e+00, 2.95842141e-02, 4.68160038e-03, 9.40436407e-04,
         3.58607447e-04],
        [0.00000000e+00, 6.01229351e-03, 1.06792129e-03, 7.18616985e-04,
         5.93193274e-04],
        [0.00000000e+00, 5.14040257e-02, 3.89700192e-03, 2.78708837e-03,
         2.22401179e-04],
        [0.00000000e+00, 2.00610402e-02, 1.00146184e-02, 1.89137217e-03,
         1.05298614e-03]]),
 array([[1.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
         0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
        [0.00000000e+00, 2.95842141e-02, 5.38756829e-03, 1.65330428e-03,
         4.41694970e-04, 7.63520835e-05, 2.79737043e-05],
        [0.00000000e+00, 6.01229351e-03, 5.12014827e-03, 1.63357595e-03,
         1.05430201e-04, 1.40782937e-05, 1.09586743e-05],
        [0.00000000e+00, 5.14040257e-02, 1.21230311e-02, 3.71781065e-03,
         3.64280049e-04, 8.33069954e-

Let's quickly verify this implementation works as a language model:

In [None]:
def lm():
  a = forwards()
  return np.sum([a[n][:,len(data[n])] for n in range(T)], axis=1)

lm_scores_old = lm()

Similarly we implement the backwards algorithm to calculate $\beta_{s,i}$

In [None]:
def backwards():
  betas = [np.zeros((N, len(t)+1)) for t in data]
  for n in range(T):
    for s in range(1,N):
      betas[n][(s,len(data[n]))] = 1
    for i in reversed(range(0, len(data[n]))):
      for s in range(N):
        betas[n][(s,i)] = sum(betas[n][(t,i+1)] * A[(s,t)] * B[(t,word(n,i+1))] for t in range(N))
  return betas

backwards()

[array([[0.00222719, 0.01151216, 0.08821106, 0.17921754, 0.        ],
        [0.00405032, 0.02138238, 0.11663247, 0.32226689, 1.        ],
        [0.00326397, 0.0175322 , 0.09463925, 0.379882  , 1.        ],
        [0.00380577, 0.02036852, 0.09925946, 0.373101  , 1.        ],
        [0.00418016, 0.02204145, 0.1191538 , 0.3231841 , 1.        ]]),
 array([[5.28741342e-05, 3.74400801e-04, 1.25499908e-03, 3.00500595e-03,
         1.70544908e-02, 1.24163935e-01, 0.00000000e+00],
        [9.78486187e-05, 5.30446645e-04, 1.74680128e-03, 5.47693744e-03,
         2.91259911e-02, 1.56406214e-01, 1.00000000e+00],
        [7.99125807e-05, 4.27392620e-04, 1.37594168e-03, 4.43145833e-03,
         2.41276047e-02, 1.95904081e-01, 1.00000000e+00],
        [9.29463922e-05, 4.61354569e-04, 1.49171447e-03, 5.15883463e-03,
         2.72221365e-02, 1.63215715e-01, 1.00000000e+00],
        [1.00895565e-04, 5.43152052e-04, 1.79131877e-03, 5.65010453e-03,
         2.98562157e-02, 1.42542870e-01, 1.00000000

Let's consider these two variables we have calculated. $\alpha_{s,i}$ represents **the probability of all subsequences from $1,\ldots,i$ where the $i^{th}$ tag is $s$**.

Similarly $\beta_{s,i}$ represents **the probability of all sequences for $i,\ldots,n$ where the $i^{th}$ tag is $s$**

It then follows that if we multiply these values we get **$\alpha_{s,i}\beta_{s,i}$ the probability of all sequnces where the $i^{th}$ tag is $s$**

In [None]:
alpha = forwards()
beta = backwards()

[alpha[0][(s,2)] * beta[0][(s,2)] for s in range(N)]

[0.0,
 0.0005460266143265999,
 0.00010106727064248265,
 0.00038681432141839556,
 0.0011932798305450373]

If we normalize this, we get **the probability that the $i^{th}$ tag is $s$**. We call this $\gamma_{i}$

In [None]:
def gamma(n,i,alpha,beta):
  s = np.array([alpha[n][(s,i)] * beta[n][(s,i)] for s in range(N)])
  return s / sum(s)

print(gamma(2,0,alpha,beta))
print(gamma(2,1,alpha,beta))

[1. 0. 0. 0. 0.]
[0.         0.48917994 0.17061257 0.33199091 0.00821659]


If we wish we can know figure out how many times a particular tag is expected to be used in the corpus. Remember that 

$E[c(x)] = \sum p(x)$

Hence we have that

$E[c(t)] = \sum_i \gamma_i(t)$

However, for the hidden Markov model we actually wish to find the expected count per output word, $k$. We do this by only summing the probabilities for positions where we have word $k$, e.g.,

$E[c(k,t)] = \sum_{i | w_i = k} \gamma_i(t)$

We can use this to calculate our new prediction for $b$

$b^*_{t,k} = \frac{\sum_{i|w_i = k} \gamma_i(t)}{\sum_i \gamma_i(t)}$

In [None]:
def b_star(alpha,beta):
  b_star = np.zeros((N,M))
  s = np.zeros((N))
  for n in range(T):
    for i in range(1,len(data[n])+1):
      g = gamma(n,i,alpha,beta)
      b_star[:,word(n,i)] += g
      s += g
  for i in range(1,N):
    for j in range(M):
      b_star[(i,j)] /= s[i]
  return b_star

b_star(alpha,beta)

array([[0.        , 0.        , 0.        , 0.        ],
       [0.40377193, 0.18471569, 0.0970583 , 0.31445408],
       [0.15953337, 0.31455106, 0.27767608, 0.24823949],
       [0.37203064, 0.45009411, 0.05642139, 0.12145386],
       [0.51603654, 0.25020163, 0.22364532, 0.01011651]])

For the transition probabilities we need to know how often we see a tag $s$ followed by a tag $t$, we can use the probability $\alpha_{s,i}$ and the probability $\beta_{t,i+1}$, which including the relevant probabilities for word $i+1$ is:

$\alpha_{s,i}\beta_{t,i+1}a_{s,t}b_{t,w_{i+1}}$

We normalize this to get the probability $\xi_i$

In [None]:
def xi(n,i,alpha,beta):
  s = np.array([[alpha[n][s,i] * beta[n][t,i+1] * A[s,t] * B[t,word(n,i)] for t in range(N)] for s in range(N)])
  return s / sum(sum(s))

print(xi(0,0,alpha,beta))
print(xi(0,2,alpha,beta))

[[0.         0.22894112 0.26627852 0.28517903 0.21960133]
 [0.         0.         0.         0.         0.        ]
 [0.         0.         0.         0.         0.        ]
 [0.         0.         0.         0.         0.        ]
 [0.         0.         0.         0.         0.        ]]
[[0.         0.         0.         0.         0.        ]
 [0.         0.05305068 0.00884204 0.07173576 0.10693426]
 [0.         0.01434789 0.00626464 0.0048129  0.01874946]
 [0.         0.04489464 0.01536729 0.02673438 0.0981234 ]
 [0.         0.09794419 0.01497941 0.16356243 0.25365664]]


As before we use the expected counts in order to estimate our next probability function:

$a^*_{s,t} = \frac{E(c(s,t))}{E(c(s))} = \frac{\sum_i\xi_i(s,t)}{\sum_i\gamma_i(s)}$

In [None]:
def a_star(alpha, beta):
  a_star = np.zeros((N,N))
  s = np.zeros(N)
  for n in range(len(data)):
    for i in range(len(data[n])):
      x = xi(n,i,alpha,beta)
      a_star += x
      s += np.sum(x,axis=1)
  for i in range(N):
    for j in range(1,N):
      a_star[(i,j)] /= s[i]
  return a_star


We can now define the Baum-Welch algorithm. For this simple version we will just run it for 5 iterations

In [None]:
def baum_welch():
  global A,B
  for _ in range(5):
    alpha = forwards()
    beta = backwards()
    a_st = a_star(alpha,beta)
    b_st = b_star(alpha,beta)
    A = a_st
    B = b_st

baum_welch()

We can verify that this has improved the model by looking at the language model scores:

In [None]:
print(lm() / lm_scores_old)

[ 3.64809603 14.12747231  1.55176063  1.97437677]


As all (or nearly all) of the values are greater than 1 in the above we can see that the model now thinks the current data is more likely, showing that the model has fitted to the current data. 🎉