In [1]:
import numpy as np

# Load data

with open("data/model_init.txt") as model_init:
    model_init.readline() # "initial: 6"
    initial = np.array([float(x) for x in model_init.readline().split()])
    model_init.readline()
    model_init.readline() # "transition: 6"
    transition = np.array([[float(x) for x in model_init.readline().split()]
                           for _ in range(6)])
    model_init.readline()
    model_init.readline() # "emission: 6"
    emission = np.array([[float(x) for x in model_init.readline().split()]
                          for _ in range(6)])

vocabulary = "ABCDEF"
lookup = {letter: index for index, letter in enumerate(vocabulary)}
train_data = []
for i in range(1, 6):
    train_data_i = []
    with open(f"data/seq_model_0{i}.txt") as data:
        for line in data:
            train_data_i.append(np.array([lookup[letter] for letter in line.rstrip()]))
    train_data.append(np.array(train_data_i))
    print(f"Train data {i} loaded: {train_data[i-1].shape}")

with open(f"data/dev_data.txt") as data:
    dev_data = []
    for line in data:
        sequence, label = line.split("\t")
        sequence = np.array([lookup[letter] for letter in sequence])
        label = int(label) - 1
        dev_data.append((sequence, label))
print(f"Development data loaded: {len(dev_data)}")

print()
print("Starting initial probabilities:")
print(initial)
print("initial[i] is the probability of starting in state i")
print()
print("Starting transition probabilities:")
print(transition)
print("transition[i,j] is the probability of transitioning from state i to state j")
print()
print("Starting emission probabilities")
print(emission)
print("emission[v,i] is the probability of emitting v in state i")
train_data[0][0]

Train data 1 loaded: (10000, 50)
Train data 2 loaded: (10000, 50)
Train data 3 loaded: (10000, 50)
Train data 4 loaded: (10000, 50)
Train data 5 loaded: (10000, 50)
Development data loaded: 500

Starting initial probabilities:
[0.2 0.1 0.2 0.2 0.2 0.1]
initial[i] is the probability of starting in state i

Starting transition probabilities:
[[0.3 0.3 0.1 0.1 0.1 0.1]
 [0.1 0.3 0.3 0.1 0.1 0.1]
 [0.1 0.1 0.3 0.3 0.1 0.1]
 [0.1 0.1 0.1 0.3 0.3 0.1]
 [0.1 0.1 0.1 0.1 0.3 0.3]
 [0.3 0.1 0.1 0.1 0.1 0.3]]
transition[i,j] is the probability of transitioning from state i to state j

Starting emission probabilities
[[0.2 0.2 0.1 0.1 0.1 0.1]
 [0.2 0.2 0.2 0.2 0.1 0.1]
 [0.2 0.2 0.2 0.2 0.2 0.2]
 [0.2 0.2 0.2 0.2 0.2 0.2]
 [0.1 0.1 0.2 0.2 0.2 0.2]
 [0.1 0.1 0.1 0.1 0.2 0.2]]
emission[v,i] is the probability of emitting v in state i


array([0, 2, 2, 3, 3, 3, 3, 5, 5, 2, 2, 2, 2, 1, 2, 5, 5, 5, 2, 2, 2, 2,
       2, 4, 3, 0, 3, 2, 2, 0, 4, 5, 2, 2, 2, 0, 2, 3, 3, 5, 5, 2, 2, 3,
       3, 5, 5, 2, 2, 3])

In [2]:
class HMM:
    def __init__(self, initial, transition, emission):
        self.initial = initial.copy()
        self.transition = transition.copy()
        self.emission = emission.copy()
        self.num_emissions, self.num_states = emission.shape
    
    def save(self, filename):
        with open(filename, "wb") as out_file:
            np.savez(out_file, initial=self.initial, transition=self.transition, emission=self.emission)
    
    @classmethod
    def load(cls, filename):
        with open(filename, "rb") as in_file:
            arrays = np.load(in_file)
            hmm = cls(arrays["initial"], arrays["transition"], arrays["emission"])
            arrays.close()
        return hmm       
    
    def forward(self,obs):
        a=self.transition
        b=self.emission
        pi=self.initial
        nStates = np.shape(b)[0]
        T = np.shape(obs)[0]

        alpha = np.zeros((nStates,T))

        alpha[:,0] = pi*b[:,obs[0]]

        for t in range(1,T):
            for s in range(nStates):
                alpha[s,t] = b[s,obs[t]] * np.sum(alpha[:,t-1] * a[:,s])

        return alpha 
    def forward_batch(self, observations):
        nStates = np.shape(self.emission)[0]
        batch=50
        t=np.zeros((50,nStates,50))
        for i in range(batch):
            t[i,:]=self.forward(observations[i])
        return t

       
    def backward(self,obs):
        a=self.transition
        b=self.emission
        nStates = np.shape(b)[0]
        T = np.shape(obs)[0]

        beta = np.zeros((nStates,T))

        beta[:,T-1] = 1.0 #aLast

        for t in range(T-2,-1,-1):
            for s in range(nStates):
                beta[s,t] = np.sum(b[:,obs[t+1]] * beta[:,t+1] * a[s,:])

        return beta 
    def backward_batch(self, observations):
        nStates = np.shape(b)[0]
        batch=50
        BB=np.zeros((batch,nStates,batch))
        for i in range(50):
            BB[i,:]=self.backward(observations[i])
        return BB
    
    def viterbi(self,obs):
        V=obs
        a=self.transition
        b=self.emission
        initial_distribution=self.initial
        T = V.shape[0]
        M = a.shape[0]

        omega = np.zeros((T, M))
        omega[0, :] = np.log(initial_distribution * b[:, V[0]])

        prev = np.zeros((T - 1, M))

        for t in range(1, T):
            for j in range(M):
                # Same as Forward Probability
                probability = omega[t - 1] + np.log(a[:, j]) + np.log(b[j, V[t]])

                # This is our most probable state given previous state at time t (1)
                prev[t - 1, j] = np.argmax(probability)

                # This is the probability of the most probable state (2)
                omega[t, j] = np.max(probability)

        # Path Array
        S = np.zeros(T)

        # Find the most probable last hidden state
        last_state = np.argmax(omega[T - 1, :])

        S[0] = last_state

        backtrack_index = 1
        for i in range(T - 2, -1, -1):
            S[backtrack_index] = prev[i, int(last_state)]
            last_state = prev[i, int(last_state)]
            backtrack_index += 1

        # Flip the path array since we were backtracking
        S = np.flip(S, axis=0)
        return S,omega
    
    
    def baum_welch(self,obs):
        nStates = 6
        # Initialise pi, a, b randomly
        pi = 1./nStates*np.ones((nStates))
        a = np.random.rand(nStates,nStates)
        b = np.random.rand(nStates,np.max(obs)+1)
        nStates = np.shape(b)[0]
        T = np.shape(obs)[0]
        xi = np.zeros((nStates,nStates,T))
        tol = 1e-2
        error = tol+1
        maxits = 50
        nits = 0
        pi_l=np.zeros((6,6))
        for i in range(50):
            while ((error > tol) & (nits < maxits)):
                nits += 1
                oldpi = pi.copy()
                olda = a.copy()
                oldb = b.copy()

                # E step
                alpha = self.forward(obs[i])
                beta = self.backward(obs[i])

                for t in range(T-1):
                    for i in range(nStates):
                        for j in range(nStates):
                            xi[i,j,t] = alpha[i,t]*a[i,j]*b[j,obs[i][t+1]]*beta[j,t+1]
                    xi[:,:,t] /= np.sum(xi[:,:,t])

                # The last step has no b, beta in
                for i in range(nStates):
                    for j in range(nStates):
                        xi[i,j,T-1] = alpha[i,T-1]*a[i,j]
                xi[:,:,T-1] /= np.sum(xi[:,:,T-1])

                # M step
                for i in range(nStates):
                    pi[i] = np.sum(xi[i,:,0])
                    for j in range(nStates):
                        a[i,j] = np.sum(xi[i,j,:T-1])/np.sum(xi[i,:,:T-1])

                    for k in range(max(obs[0])):
                        found = (obs==k).nonzero()
                        b[i,k] = np.sum(xi[i,:,found])/np.sum(xi[i,:,:])

                error = (np.abs(a-olda)).max() + (np.abs(b-oldb)).max()
            
            pi_l+=pi

        return pi_l/50


In [3]:
import os
from tqdm.notebook import tqdm
hmms = [HMM(initial, transition, emission) for i in range(5)]
batch_size = 50

os.makedirs("models", exist_ok=True)

for n,(dataset, hmm) in enumerate(zip(train_data, hmms)):
    pbar = tqdm(total=len(dataset), ncols="100%")
    for i in range(0, len(dataset), batch_size):
        likelihood = hmm.baum_welch(dataset[i:i+batch_size])
        pbar.set_description(f"log-likelihood: {np.mean(np.log(likelihood)):.2f}")
        pbar.update(batch_size)
    pbar.close()
    hmm.save(os.path.join("models", f"model_{n+1}"))


HBox(children=(IntProgress(value=0, layout=Layout(flex='2'), max=10000), HTML(value='')), layout=Layout(displa…




HBox(children=(IntProgress(value=0, layout=Layout(flex='2'), max=10000), HTML(value='')), layout=Layout(displa…




HBox(children=(IntProgress(value=0, layout=Layout(flex='2'), max=10000), HTML(value='')), layout=Layout(displa…




HBox(children=(IntProgress(value=0, layout=Layout(flex='2'), max=10000), HTML(value='')), layout=Layout(displa…




HBox(children=(IntProgress(value=0, layout=Layout(flex='2'), max=10000), HTML(value='')), layout=Layout(displa…




In [4]:
import os
from tqdm.notebook import tqdm
hmms = [HMM(initial, transition, emission) for i in range(5)]
correct = 0
pbar = tqdm(total=len(dev_data))
for sample, label in dev_data:
    decision = np.argmax([hmm.viterbi(sample)[1] for hmm in hmms])
    correct += (decision == label)
    pbar.update()
pbar.close()
print(f"Accuracy: {correct / len(dev_data):.1%}")

HBox(children=(IntProgress(value=0, max=500), HTML(value='')))


Accuracy: 20.8%
