In [198]:
start = [0.5, 0.5]

transition_mat = np.array([
  [.9, .1],
  [.1, .9]
])

emission_mats = [
    np.array([
        [.1, .5],
        [.1, .4],
        [.8, .1]
    ]), 
    np.array([
        [.8, .1],
        [.1, .8],
        [.1, .1]
    ])
]

def generate_sentence(lam):
    y, states = [], []
    state = np.random.choice(2, 1, p=start)[0]
    
    n = np.random.poisson(lam=lam)
    for i in range(n):
        states.append(state)
        emissions = []
        for j in range(len(emission_mats)):
            probs = emission_mats[j][:,state].reshape((3,)).tolist()
            emissions.append(np.random.choice(3, 1, p=probs)[0])
        y.append(emissions)
        
        probs = transition_mat[state,:].reshape((2,)).tolist()
        state = np.random.choice(2, 1, p=probs)[0]
        
    return y, states
    
def generate_dataset(n, lam):
    y, z = [], []
    for i in range(n):
        y_, z_ = generate_sentence(lam)
        y.append(y_)
        z.append(z_)
    return y, z

def print_expected_matrices(Y, Z):
    transition_mat = np.array([
      [0, 0],
      [0, 0]
    ])

    emission_mats = [
        np.array([
            [0, 0],
            [0, 0],
            [0, 0]
        ]),
        np.array([
            [0, 0],
            [0, 0],
            [0, 0]
        ])
    ]
    
    for i in range(len(Y)):
        prev = None
        for j in range(len(Y[i])):
            z = Z[i][j]
            if not prev is None:
                transition_mat[prev][z] += 1

            for k in range(len(emission_mats)):
                emission_mats[k][Y[i][j][k]][z] += 1
            prev = z
  
    transition_mat = transition_mat / np.sum(transition_mat, axis=-1).reshape((2, 1))
    
    print(transition_mat)
    for k in range(len(emission_mats)):
        emission_mats[k] = emission_mats[k] / np.sum(emission_mats[k], axis=0).reshape((1, 2))
        print(emission_mats[k])

In [192]:
import sys
import numpy as np

class HMM():
    def __init__(self):
        self.states = np.array(['H', 'C'])

        self.start = [.5, .5]
        self.end = [1, 1]

        self.transition_mat = np.array([
          [.6, .4],
          [.4, .6]
        ])
        
        self.emission_mats = [
            np.array([
                [.2, .5],
                [.4, .4],
                [.4, .1]
            ]), 
            # np.array([
            #     [.2, .5],
            #     [.4, .4],
            #     [.4, .1]
            # ])
        ]
        
        self.transition_mat = np.random.rand(2,2) 
        self.transition_mat = self.transition_mat / np.sum(self.transition_mat, axis=-1).reshape((2, 1))
        
        for k in range(len(self.emission_mats)):
            self.emission_mats[k] = np.random.rand(3,2) 
            self.emission_mats[k] = self.emission_mats[k] / np.sum(self.emission_mats[k], axis=0).reshape((1, 2))
            # self.emission_mats[1] = np.random.rand(3,2) 
            # self.emission_mats[1] = self.emission_mats[1] / np.sum(self.emission_mats[1], axis=0).reshape((1, 2))

        # print(self.transition_mat)
        # print(self.emission_mats[0])
        # print(self.emission_mats[1])
        
    def viterbi(self, Y):
        backprobs = np.zeros((len(Y), len(self.states)))
        backpointers = np.zeros((len(Y), len(self.states)))

        alpha = None
        for i, y in enumerate(Y):  
            if alpha is None:
                alpha = self.start 
                for j in range(len(self.emission_mats)):
                    alpha *= self.emission_mats[j][y[j]]                
                backprobs[i] = alpha
                continue

            alpha_mat = self.transition_mat
            for j in range(len(self.emission_mats)):            
                alpha_mat = alpha_mat * self.emission_mats[j][y[j]].reshape((1, 2))
            
            alpha_mat = np.transpose(alpha_mat) * alpha
            alpha = np.amax(alpha_mat, axis=1)
            pointers = np.argmax(alpha_mat, axis=1)

            backprobs[i] = np.amax(alpha_mat, axis=1)
            backpointers[i] = np.argmax(alpha_mat, axis=1)

        last_state = np.argmax(backprobs[len(Y) - 1])
        res = [last_state]
        for i in range(0, len(Y) - 1):
            last_state = int(backpointers[len(Y) - i - 1][last_state])
            res.append(last_state)

        res.reverse()
        return res        

    def forward_algorithm(self, Y):
        alphas = np.zeros((len(Y), len(self.states)))

        alpha = None
        for i, y in enumerate(Y):
            if alpha is None:
                alpha = self.start 
                for j in range(len(self.emission_mats)):
                    alpha *= self.emission_mats[j][y[j]]
                alphas[i] = alpha
                continue

            alpha = np.matmul(alpha, self.transition_mat)
            for j in range(len(self.emission_mats)):
                alpha *= self.emission_mats[j][y[j]]
            alphas[i] = alpha
        return alphas, np.matmul(alpha, np.transpose(self.end))

    def backward_algorithm(self, Y):
        betas = np.zeros((len(Y), len(self.states)))

        beta = self.end
        betas[len(Y) - 1] = beta
        for i, y in enumerate(reversed(Y)):
            for j in range(len(self.emission_mats)):
                beta *= self.emission_mats[j][y[j]]
            
            if i < len(Y) - 1:
                beta = np.matmul(beta, np.transpose(self.transition_mat))
                betas[len(Y) - i - 2] = beta

        return betas, np.matmul(beta, np.transpose(self.start))

    def forward_backward_algorithm(self, Y):
        for i in range(0, 100):  
            transition_numerator = np.zeros(shape=(2,2))
            transition_denominator = np.zeros(shape=(1,2))
            emission_numerators = [np.zeros(shape=(2,3)), np.zeros(shape=(2,3))]
            emission_denominators = [np.zeros(shape=(1,2)), np.zeros(shape=(1,2))]
            
            num_sentences = len(Y)
            for y in range(num_sentences):
                alphas, n = self.forward_algorithm(Y[y])
                betas, n = self.backward_algorithm(Y[y])
                # print(n)

                # Transition probs.
                numerator = np.matmul(np.transpose(alphas), betas) * self.transition_mat
                transition_numerator += numerator
                transition_denominator += np.sum(numerator, axis=1)

                # Emission probs.
                numerator = alphas.T * betas.T
                for j in range(len(self.emission_mats)):
                    emission_denominators[j] += np.sum(numerator, axis=1)
                        
                    unary = np.zeros((len(Y[y]), self.emission_mats[j].shape[0]))
                    for k, o in enumerate(Y[y]):
                        unary[k][o[j]] = 1

                    numerator_ = np.matmul(numerator, unary)
                    emission_numerators[j] += np.matmul(numerator, unary)
                    
            self.transition_mat = (transition_numerator.T / transition_denominator).T
            for j in range(len(self.emission_mats)):
                self.emission_mats[j] = emission_numerators[j].T / emission_denominators[j]

        print(np.round(self.transition_mat, 4))
        
        for k in range(len(self.emission_mats)):
            print(np.round(self.emission_mats[k], 4))        

print('Expected matrices:')
Y, Z = generate_dataset(2500, 10)
print_expected_matrices(Y, Z)
  
print('Actual matrices:')
print(transition_mat)
print(emission_mats[0])
# print(emission_mats[1])    
print()

Expected matrices:
[[ 0.90386941  0.09613059]
 [ 0.10345148  0.89654852]]
[[ 0.09980598  0.49068582]
 [ 0.09771052  0.40840364]
 [ 0.80248351  0.10091053]]
Actual matrices:
[[ 0.9  0.1]
 [ 0.1  0.9]]
[[ 0.1  0.5]
 [ 0.1  0.4]
 [ 0.8  0.1]]



In [193]:
print('Expected matrices:') 
print_expected_matrices(Y, Z)
print()

print('Calculated matrices:')    
hmm = HMM()
hmm.forward_backward_algorithm(Y)
print()

print('Sentences:')  
for i in range(1):
    actual = np.array([z for z in Z[i]])
    preds = np.array(hmm.viterbi(Y[i]))
    print(actual)
    print(preds)
    
    acc = np.sum(actual == preds) / float(len(actual))
    print('Accuracy:', acc)

Expected matrices:
[[ 0.90386941  0.09613059]
 [ 0.10345148  0.89654852]]
[[ 0.09980598  0.49068582]
 [ 0.09771052  0.40840364]
 [ 0.80248351  0.10091053]]

Calculated matrices:
[[ 0.3537  0.6463]
 [ 0.9154  0.0846]]
[[ 0.  0.]
 [ 0.  0.]
 [ 1.  1.]]

Sentences:
[0 0 0 1 1 0 0 1 1 1 1 0 0 0 0 0]
[1 0 1 1 0 0 0 0 0 0 0 0 0 0 0 0]
Accuracy: 0.5625


In [242]:
import numpy as np
from hmmlearn import hmm

Y, Z = generate_dataset(300, 10)
# print_expected_matrices(Y, Z)

transmat = [
    [0.7, 0.3],
    [0.1, 0.9]
]
model = hmm.GaussianHMM(2)

samples = []
lengths = []
actual = []

for i in range(len(Y)):
    samples += [y_ for y_ in Y[i]]
    actual += [z_ for z_ in Z[i]]
    lengths.append(len(Y[i]))

samples = np.array(samples)
# print(samples)

model.fit(samples, lengths)
preds = model.predict(samples, lengths)

print(actual)
print(preds.tolist())
acc = np.sum(actual == preds) / float(len(actual))
print('Accuracy:', acc)

[1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 

In [263]:
def load_raw_dataset(f):
  with open(f, 'r') as f:
    data = f.read().strip()

    sentences = data.split('\n\n')
    sentences = [s for s in sentences if not s.startswith('-DOCSTART-')]
    X = [[t.split(' ') for t in s.split('\n') if len(s) > 0] for s in sentences]
    Y = []
    T = []
    for i, s in enumerate(X):
      tkns, labels = [], []
      for j, t in enumerate(s):
        l = ['O', 'B-PER', 'I-PER'].index(t[1])
        labels.append(0 if l == 0 else 1)
        tkns.append(t[0])
        X[i][j] = [float(x) for x in X[i][j][3:12]]

      Y.append(labels)
      T.append(tkns)

    return X, Y, T
    
Y, Z, _ = load_raw_dataset('../data/ner_on_html/test')

Y = Y
Z = Z

samples = []
lengths = []
actual = []

for i in range(len(Y)):
    samples += [y_ for y_ in Y[i]]
    actual += [z_ for z_ in Z[i]]
    lengths.append(len(Y[i]))    
    
model = hmm.GaussianHMM(2)

samples = np.array(samples)
model.fit(samples, lengths)
preds = model.predict(samples, lengths)

# print(actual)
# print(preds.tolist())
acc = np.sum(actual == preds) / float(len(actual))
print('Accuracy:', acc)

Accuracy: 0.263042750307
