In [4]:
import numpy as np
import string
import time

In [5]:
def read_model(path):
    
    f = open(path, "r")
    lines = f.readlines()

    pi = []
    A = []
    B = [] 

    initial_f = False
    A_f = False
    B_f = False
    l_count = 0

    for line in lines:        
        if initial_f:
            initial_f = False
            pi = [float(i) for i in line.split()]
        if A_f:
            if l_count>0:
                A.append([float(i) for i in line.split()])
                l_count -= 1
            else:
                A_f = False
        if B_f:
            if l_count>0:
                B.append([float(i) for i in line.split()])
                l_count -= 1
            else:
                B_f = False
        if line.startswith('initial'):
            initial_f = True
        if line.startswith('transition'):
            l_count = int(line.split()[1])
            A_f = True
        if line.startswith('observation'):
            l_count = int(line.split()[1])
            B_f = True
    return np.array(A,dtype='double'), np.array(B,dtype='double'), np.array(pi,dtype='double')

In [6]:
class HMM:
    def __init__(self, A, B, pi):
        self.A = A
        self.B = B
        self.pi = pi
        self.N = len(pi)
        self.dic = list(string.ascii_uppercase)[:self.N]

        
    def baum_welch(self, observations):
        N = self.N
        alpha = self.forward(observations)[0]
        beta = self.backward(observations)[0]

        T = len(observations)

        xi = np.zeros((T,N,N),dtype='double')
        gamma = np.zeros((T,N),dtype='double')

        for t in range(T):
            for i in range(N):
                gamma[t][i] = (alpha[t][i]*beta[t][i])/sum([alpha[t][k]*beta[t][k] for k in range(N)])
                for j in range(N):
                    symbol_index = self.dic.index(observations[t])
                    xi[t][i][j] = (alpha[t][i]*self.A[i][j]*self.B[symbol_index][j]*beta[t+1][j])/sum(
                        [alpha[t][k]*beta[t][k]for k in range(N)])

        new_pi = gamma[0]

        new_A = np.zeros((N,N,2),dtype='double')
        for i in range(N):
            for j in range(N):
                new_A[i][j] = (sum([xi[t][i][j] for t in range(T)]),sum([gamma[t][i] for t in range(T)]))

        new_B = np.zeros((N,N,2),dtype='double')
        for i in range(N):
            for j in range(N):
                sum1 = 0
                sum2 = 0
                for t in range(T):
                    sum2 += alpha[t][i]*beta[t][i]
                    if observations[t] == self.dic[j]:
                        sum1 += alpha[t][i]*beta[t][i]
                new_B[i][j] = (sum1,sum2)
        return new_A, new_B, new_pi
    
    
    def forward(self, observation):
        N = self.N
        T = len(observation)

        alpha = np.zeros((T+1,N),dtype='double')
        alpha[0] = self.pi

        for i in range(T):
            for j in range(N):
                symbol_index = self.dic.index(observation[i])
                s = sum([alpha[i][k]*self.A[k][j] for k in range(N)])
                alpha[i+1][j] = s*self.B[symbol_index][j]    
                
        return alpha,sum(alpha[-1])
    
    
    def backward(self, observation):
        N = self.N
        T = len(observation)

        beta = np.zeros((T+1,N),dtype='double')
        beta[T] = np.ones(N,dtype='double')

        for i in range(T,0,-1):
            for j in range(N):
                symbol_index = self.dic.index(observation[i-1])
                beta[i-1][j] = sum(self.A[j][k]*self.B[symbol_index][k]*beta[i][k] for k in range(N))

        return beta,sum([self.pi[i]*beta[0][i] for i in range(N)])

    def train_model(self, data):
        N = self.N
        d_pi = np.zeros(N, dtype='double')
        d_A = np.zeros((N,N,2), dtype='double')
        d_B = np.zeros((N,N,2), dtype='double')

        for line in data:
            d = self.baum_welch(line)
            d_A += d[0]
            d_B += d[1]
            d_pi += d[2]

        new_pi = d_pi/len(data)
        new_A = np.zeros((N,N), dtype='double')
        new_B = np.zeros((N,N), dtype='double')

        for k in range(N):
            for j in range(N): 
                new_A[k][j] = d_A[k][j][0]/d_A[k][j][1]
                new_B[k][j] = d_B[k][j][0]/d_B[k][j][1]
        
        self.A = new_A
        self.B = new_B
        self.pi = new_pi

In [7]:
def measure_precision_on_testing_set1(models, test_data):
    dummy, result = measure_p_o_lambda(models,test_data)
    temp = [a for (a,b) in result]
    f = open("hmm_data\\testing_answer.txt", "r")
    answer1 = [line.strip() for line in f.readlines()]
    correct = 0
    all_count = 0
    for l in range(len(temp)):
        all_count += 1
        if temp[l] == answer1[l]:
            correct += 1
    return correct/all_count

In [8]:
def measure_p_o_lambda(models,test_data):
    p_o_lambda1 = {'model_0{}'.format(i):[] for i in range(1,6)}
    result1 = []
    for line in test_data:
        p_max = 0
        for i in range(1,6):
            dummy,p = models['HMM_model{}'.format(i)].backward(line)
            p_o_lambda1['model_0{}'.format(i)].append(p)
            if p >= p_max:
                p_max = p
                model_max = 'model_0{}.txt'.format(i)
        result1.append((model_max,p_max))
    return p_o_lambda1,result1

In [9]:
def write_results_to_file(models,test_data):
    for i in range(1,3):
        pol,result = measure_p_o_lambda(models,test_data['test_lines{}'.format(i)])
        with open('result{}.txt'.format(i), 'w') as f:
            for item in result:
                f.write(item[0]+' {}\n'.format(item[1]))

In [10]:
def write_models_to_files(models):
    for i in range(1,6):
        with open('model_0{}.txt'.format(i), 'w') as f:
            f.write('initial: 6\n')
            for p in models['HMM_model{}'.format(i)].pi:
                f.write('{}\t'.format(p))
            f.write('\n\ntransition: 6')
            for row in models['HMM_model{}'.format(i)].A:
                f.write('\n')
                for a in row:
                    f.write('{}\t'.format(a))
            f.write('\n\nobservation: 6')
            for row in models['HMM_model{}'.format(i)].B:
                f.write('\n')
                for b in row:
                    f.write('{}\t'.format(b)) 

In [11]:
A,B,pi = read_model("hmm_data\model_init.txt")

In [12]:
train_data = {}
for i in range(1,6):
    f = open("hmm_data\\seq_model_0{}.txt".format(i), "r")
    train_data['train_lines{}'.format(i)] = [line.strip() for line in f.readlines()]

test_data = {}
for i in range(1,3):
    f = open("hmm_data\\testing_data{}.txt".format(i), "r")
    test_data['test_lines{}'.format(i)] = [line.strip() for line in f.readlines()]

In [13]:
models = {}
start = time.time()
for i in range(1,6):
    print('Training HMM {} ...'.format(i))
    models['HMM_model{}'.format(i)] = HMM(A,B,pi)
    models['HMM_model{}'.format(i)].train_model(train_data['train_lines{}'.format(i)])
end = time.time()
print('\nTime spent to train 5 HMMs in seconds : ', end-start)

Training HMM 1 ...
Training HMM 2 ...
Training HMM 3 ...
Training HMM 4 ...
Training HMM 5 ...

Time spent to train 5 HMMs in seconds :  998.0047299861908


In [14]:
print('Accuracy on testing_data1 : ', measure_precision_on_testing_set1(models, test_data['test_lines1']))
#write_results_to_file(models,test_data)
#write_models_to_files(models)

Accuracy on testing_data1 :  0.1944
