In [54]:
import nltk
import numpy as np
from numpy import array, ones, zeros, multiply
import sys
from ipywidgets import FloatProgress
from IPython.display import display
import pickle

# Directory holding the pickled typo corpora.
# NOTE: this is a machine-specific absolute path; change DATA_DIR to run elsewhere.
DATA_DIR = r"/Users/feidong/Desktop/M2/TC4/typos-data"


def load_pickle(file_name):
    """Unpickle ``DATA_DIR/file_name`` and return the stored object.

    The file is opened in a ``with`` block so the handle is closed even if
    unpickling fails.

    NOTE: ``pickle.load`` can execute arbitrary code embedded in the file;
    only use it on data files you trust.
    """
    with open(DATA_DIR + "/" + file_name, 'rb') as handle:
        return pickle.load(handle)


train_10 = load_pickle("train10.pkl")
train_20 = load_pickle("train20.pkl")
test_10 = load_pickle("test10.pkl")
test_20 = load_pickle("test20.pkl")
print (len(train_10),len(train_20),len(test_10),len(test_20))
#print (test_10[2][1][0])


(29057, 27184, 1501, 3374)


# This class keeps most of the first-order HMM implementation and adds the following functions for unsupervised training:
 ---def forward(self, obs)-----------------
 input: an unlabeled observation sequence
 output: 1) a list alpha with one entry per time step, holding the forward probability of each state;
         2) the probability of the observation sequence
 ---def backward(self, obs)----------------
 input: an unlabeled observation sequence
 output: a list beta with one entry per time step, holding the backward probability of each state
 ---def make_counts(self, sequences)-------
 input: list of unlabeled observation sequences
 output: 1) C_init: a numpy array accumulating the expected counts of the initial states;
         2) C_trans: a numpy array accumulating the expected counts of the transitions between states;
         3) C_obs: a numpy array accumulating the expected counts of each observation in each state.
         (the function returns them in the order C_trans, C_obs, C_init)
 ---def update(self, C_trans, C_obs, C_init)---
 this function re-estimates the internal parameters of the class from the counts:
 1) initial_state_proba; 2) transition_proba; 3) observation_proba
 ---def unsupervised_training(self, sequences)--------------
 input: list of unlabeled observation sequences
 this function runs a fixed number of iterations to estimate the parameters of the model

In [97]:
class unsupervisedHMM:
    """First-order hidden Markov model whose states and observations share one symbol set.

    The model can be estimated from labelled counts (``supervised_training``)
    or from unlabelled sequences with forward-backward expected counts
    (``unsupervised_training``).

    Matrix conventions (the estimation methods normalise every column to 1):
        transition_proba[i, j]  = Pr(Y_(t+1)=q_i | Y_t=q_j)
        observation_proba[k, i] = Pr(X_t=v_k | Y_t=q_i)
        initial_state_proba[i]  = Pr(Y_0=q_i)
    """

    # Index used for out-of-vocabulary observations. It matches the row that
    # observation_estimation has always used as its "<unk>" fallback.
    UNK_ID = 0

    def __init__(self, observation_list,
                 transition_proba = None,
                 observation_proba = None,
                 initial_state_proba = None, smoothing_obs = 0.001):
        """Build a new HMM.

        observation_list    -- iterable of symbols; it is used both as the
                               state inventory and as the observation inventory
        transition_proba    -- optional (N, N) array, see class docstring
        observation_proba   -- optional (M, N) array, see class docstring
        initial_state_proba -- optional (N,) array, see class docstring
        smoothing_obs       -- constant added to every observation count
                               before normalising

        Any distribution left to None is initialised to the uniform distribution.
        """
        print ("HMM creating with: ")
        self.N = len(observation_list)  # number of states (same symbols as the observations)
        self.M = len(observation_list)  # number of observation symbols
        print (str(self.N)+" states")
        print (str(self.M)+" observations")
        self.omega_Y = list(observation_list)  # state symbols, by index
        self.omega_X = list(observation_list)  # observation symbols, by index

        if transition_proba is None:
            self.transition_proba = ones((self.N, self.N), float)/self.N
        else:
            self.transition_proba = transition_proba
        if observation_proba is None:
            self.observation_proba = ones((self.M, self.N), float)/self.M
        else:
            self.observation_proba = observation_proba
        if initial_state_proba is None:
            self.initial_state_proba = ones((self.N,), float)/self.N
        else:
            self.initial_state_proba = initial_state_proba

        # Everything is stored in numpy arrays, so symbols are handled as
        # integer indices; keep the symbol -> index mappings for conversion.
        self.make_indexes()
        self.smoothing_obs = smoothing_obs

    def make_indexes(self):
        """Create the reverse tables mapping state/observation symbols to array indices."""
        self.Y_index = {}
        for i in range(self.N):
            self.Y_index[self.omega_Y[i]] = i
        self.X_index = {}
        for i in range(self.M):
            self.X_index[self.omega_X[i]] = i

    def get_observationIndices(self, observations):
        """Return an int array with the index of every observation.

        Unknown observations are mapped to ``UNK_ID`` (the original code
        referenced an undefined name here and raised NameError).
        """
        indices = zeros(len(observations), int)
        for k, o in enumerate(observations):
            indices[k] = self.X_index.get(o, self.UNK_ID)
        return indices

    def data2indices(self, sent):
        """Convert one labelled sequence of (observation, state) couples to indices.

        Returns ``(wordids, tagids)``, two lists of the same length as ``sent``.
        Unknown observations are mapped to ``UNK_ID`` so both lists stay
        aligned position by position. An unknown state raises KeyError.
        """
        wordids = list()
        tagids = list()
        for couple in sent:
            wordids.append(self.X_index.get(couple[0], self.UNK_ID))
            tagids.append(self.Y_index[couple[1]])
        return wordids, tagids

    def observation_estimation(self, pair_counts):
        """Estimate observation_proba from a dict {(observation, state): count}.

        Counts overwrite the matching cells of the current matrix, then
        ``smoothing_obs`` is added everywhere and each column is normalised.
        """
        for pair in pair_counts:
            k = self.X_index.get(pair[0], self.UNK_ID)
            i = self.Y_index[pair[1]]
            self.observation_proba[k, i] = pair_counts[pair]
        self.observation_proba = self.observation_proba + self.smoothing_obs
        self.observation_proba = self.observation_proba/self.observation_proba.sum(axis=0).reshape(1, self.N)

    def transition_estimation(self, trans_counts):
        """Estimate transition_proba from a dict {(previous_state, next_state): count}.

        Counts overwrite the matching cells of the current matrix, then each
        column (one per previous state) is normalised.
        """
        for trans in trans_counts:
            i = self.Y_index[trans[0]]  # previous state -> column
            j = self.Y_index[trans[1]]  # next state -> row
            self.transition_proba[j, i] = trans_counts[trans]
        self.transition_proba = self.transition_proba/self.transition_proba.sum(axis=0).reshape(1, self.N)

    def init_estimation(self, init_counts):
        """Estimate initial_state_proba from a dict {state: count}."""
        for init in init_counts:
            index = self.Y_index[init]
            self.initial_state_proba[index] = init_counts[init]
        self.initial_state_proba = self.initial_state_proba/sum(self.initial_state_proba)

    def supervised_training(self, pair_counts, trans_counts, init_counts):
        """Estimate the three distributions from labelled counts."""
        self.observation_estimation(pair_counts)
        self.transition_estimation(trans_counts)
        self.init_estimation(init_counts)

    def viterbi(self, obs):
        """Return the most probable state index sequence for ``obs``.

        ``obs`` is a sequence of observation indices. An empty sequence
        yields an empty list. Probabilities are not rescaled, so very long
        sequences may underflow.
        """
        B = self.observation_proba
        A = self.transition_proba
        T = len(obs)
        if T == 0:
            return []

        psi = zeros((T, self.N), int)  # psi[t, j]: best predecessor of state j at time t
        delta = B[obs[0]]*self.initial_state_proba
        for t in range(1, T):
            # scores[j, i] = A[j, i] * delta[i]: best path ending in i, then moving to j
            scores = A*delta
            psi[t] = scores.argmax(axis=1)
            delta = scores.max(axis=1)*B[obs[t]]

        # Follow the back-pointers from the best final state.
        state = delta.argmax()
        i_star = [state]
        for psi_t in psi[T-1:0:-1]:
            state = psi_t[state]
            i_star.append(state)
        i_star.reverse()
        return i_star

    def _forward_matrix(self, obs):
        """Return the (T, N) array alpha with alpha[t, i] = Pr(x_0..x_t, Y_t=q_i)."""
        B = self.observation_proba
        A = self.transition_proba
        T = len(obs)
        alpha = zeros((T, self.N), float)
        alpha[0] = self.initial_state_proba*B[obs[0]]
        for t in range(1, T):
            # A.dot(alpha[t-1])[i] = sum_j A[i, j] * alpha[t-1, j]
            alpha[t] = A.dot(alpha[t-1])*B[obs[t]]
        return alpha

    def _backward_matrix(self, obs):
        """Return the (T, N) array beta with beta[t, i] = Pr(x_(t+1)..x_(T-1) | Y_t=q_i)."""
        B = self.observation_proba
        A = self.transition_proba
        T = len(obs)
        beta = zeros((T, self.N), float)
        beta[T-1] = 1.0
        for t in range(T-2, -1, -1):
            # A.T.dot(v)[i] = sum_j A[j, i] * v[j]
            beta[t] = A.T.dot(beta[t+1]*B[obs[t+1]])
        return beta

    def forward(self, obs):
        """Run the forward algorithm on a sequence of observation indices.

        Returns ``(alpha, prob)`` where ``alpha`` is a list with one dict per
        time step mapping state index -> forward probability, and ``prob`` is
        the probability of the whole sequence. Raises IndexError on an empty
        sequence.
        """
        alpha = self._forward_matrix(obs)
        prob = alpha[len(obs)-1].sum()
        return [dict(enumerate(row)) for row in alpha], prob

    def backward(self, obs):
        """Run the backward algorithm on a sequence of observation indices.

        Returns a list with one dict per time step mapping state index ->
        backward probability. Raises IndexError on an empty sequence.
        """
        return [dict(enumerate(row)) for row in self._backward_matrix(obs)]

    def _make_progress_bar(self, total):
        """Display and return a notebook progress bar, or None when ipywidgets/IPython is unavailable."""
        try:
            from ipywidgets import FloatProgress
            from IPython.display import display
        except ImportError:
            return None
        bar = FloatProgress(min=0, max=total)
        display(bar)
        return bar

    def make_counts(self, sequences):
        """Accumulate forward-backward expected counts over unlabelled sequences.

        ``sequences`` is a list of sequences of observation indices.
        Returns ``(C_trans, C_obs, C_init)``:
            C_trans[i, j] -- expected number of transitions from state j to state i
            C_obs[k, i]   -- expected number of times state i emits observation k
            C_init[i]     -- expected number of sequences starting in state i

        Empty sequences, and sequences whose probability under the current
        model is not positive, are skipped because they cannot be normalised.
        """
        B = self.observation_proba
        A = self.transition_proba
        N = self.N
        M = self.M

        progress = self._make_progress_bar(len(sequences))

        C_trans = zeros((N, N), float)
        C_obs = zeros((M, N), float)
        C_init = zeros(N, float)

        for obs in sequences:
            if progress is not None:
                progress.value += 1
            T = len(obs)
            if T == 0:
                continue
            alpha = self._forward_matrix(obs)
            beta = self._backward_matrix(obs)
            prob = alpha[T-1].sum()
            if not prob > 0:  # also true for NaN
                continue

            gamma = alpha*beta/prob  # gamma[t, i] = Pr(Y_t=q_i | obs)
            C_init += gamma[0]
            for t in range(T):
                C_obs[obs[t]] += gamma[t]
            for t in range(T-1):
                # xi[i, j] = alpha[t, j] * A[i, j] * B[x_(t+1), i] * beta[t+1, i] / prob
                C_trans += np.outer(beta[t+1]*B[obs[t+1]], alpha[t])*A/prob
        return C_trans, C_obs, C_init

    def update(self, C_trans, C_obs, C_init):
        """Re-estimate the three distributions from expected counts.

        The input arrays are not modified. Where a normaliser is zero (no
        initial mass at all, or a transition column with no mass) the previous
        values are kept instead of producing NaN.
        """
        C_trans = np.asarray(C_trans, dtype=float)
        C_obs = np.asarray(C_obs, dtype=float)
        C_init = np.asarray(C_init, dtype=float)

        init_total = C_init.sum()
        if init_total > 0:
            self.initial_state_proba = C_init/init_total

        column_totals = C_trans.sum(axis=0)
        visited = column_totals > 0
        new_transition = np.array(self.transition_proba, dtype=float)
        new_transition[:, visited] = C_trans[:, visited]/column_totals[visited]
        self.transition_proba = new_transition

        # Build a new array instead of `C_obs += ...` so the caller's counts stay intact.
        smoothed = C_obs + self.smoothing_obs
        self.observation_proba = smoothed/smoothed.sum(axis=0).reshape(1, self.N)

    def unsupervised_training(self, sequences, max_iterations=20):
        """Run ``max_iterations`` rounds of expected-count estimation on unlabelled sequences.

        Each round computes the expected counts under the current parameters
        (``make_counts``) and re-estimates the parameters (``update``).

        NOTE: with the default all-uniform parameters every state receives the
        same expected counts, so the states stay interchangeable; supply
        non-uniform starting matrices to obtain distinct states.
        TODO: stop early once the parameters have converged.
        """
        for iteration in range(max_iterations):
            C_trans, C_obs, C_init = self.make_counts(sequences)
            self.update(C_trans, C_obs, C_init)


In [56]:
def make_counts(data):
    """Count the statistics needed for supervised HMM estimation.

    ``data`` is a list of sequences; each sequence is a list of hashable
    couples whose first element is the observed symbol and whose second
    element is the hidden symbol.

    Returns five plain dicts, in this order:
        c_letter_t   -- {observed symbol: occurrences}
        c_letter_c   -- {hidden symbol: occurrences}
        c_pairs      -- {couple: occurrences}
        c_transition -- {(hidden symbol, next hidden symbol): occurrences}
        c_init       -- {hidden symbol of the first couple: number of sequences}

    Empty sequences are skipped (they previously raised IndexError).
    """
    c_letter_t = dict()
    c_letter_c = dict()
    c_pairs = dict()
    c_transition = dict()
    c_init = dict()
    for d in data:
        if len(d) == 0:
            continue
        first_hidden = d[0][1]
        c_init[first_hidden] = c_init.get(first_hidden, 0) + 1
        last = len(d) - 1
        for i in range(len(d)):
            couple = d[i]
            observed, hidden = couple[0], couple[1]
            c_letter_t[observed] = c_letter_t.get(observed, 0) + 1
            c_letter_c[hidden] = c_letter_c.get(hidden, 0) + 1
            c_pairs[couple] = c_pairs.get(couple, 0) + 1
            if i < last:
                # bigram of consecutive hidden symbols
                bigram = (hidden, d[i + 1][1])
                c_transition[bigram] = c_transition.get(bigram, 0) + 1

    return c_letter_t,c_letter_c,c_pairs,c_transition,c_init

In [57]:
# Build the five count dictionaries from train_10 and report how many distinct
# keys each one holds.
c_letter_t,c_letter_c,c_pairs,c_transition,c_init = make_counts(train_10)
print (len(c_letter_t),len(c_letter_c),len(c_pairs),len(c_transition),len(c_init))

(26, 26, 127, 403, 25)


In [5]:
# Baseline without any model: over every couple of test_10, count how often
# the first element (collected in type_ids) already equals the second element
# (collected in correct_ids).
tot = 0.0
correct = 0.0
f = FloatProgress(min=0, max=len(test_10))
display(f)
type_ids = list()
correct_ids  = list()
for origin in test_10:
    f.value+=1  # one tick per test sequence, which is what the bar's max counts
    for couple in origin:
        type_ids.append(couple[0])
        correct_ids.append(couple[1])
for hyp, ref in zip(type_ids, correct_ids):
    if hyp == ref:
        correct+=1
    tot+=1
# Report 0.0 rather than dividing by zero when the test set is empty.
accuracy = correct*100/tot if tot else 0.0
print ("OK : "+str(correct)+" / "+str(tot)+ " -> "+ str(accuracy))

OK : 6575.0 / 7320.0 -> 89.8224043716


In [85]:
# Count statistics over train_10; the keys of c_words_train become the symbol
# inventory of the model.
counts_train = make_counts(train_10)
c_words_train, c_tags_train, c_pairs_train, c_transition_train, c_init_train = counts_train
print (len(c_words_train),len(c_tags_train),len(c_pairs_train),len(c_transition_train),len(c_init_train))

print(type(c_tags_train))

# The three distributions are left unspecified, so the constructor initialises
# them uniformly. Supervised training from the counts is currently disabled:
#hmm_train.supervised_training(c_pairs_train, c_transition_train,c_init_train)
hmm_train = unsupervisedHMM(observation_list=c_words_train)

# Sanity checks: column sums of the two matrices, total initial mass, sizes.
for proba in (hmm_train.observation_proba, hmm_train.transition_proba):
    print (proba.sum(axis =0))
print (sum(hmm_train.initial_state_proba))
for proba in (hmm_train.observation_proba,
              hmm_train.transition_proba,
              hmm_train.initial_state_proba):
    print (len(proba))

(26, 26, 127, 403, 25)
<type 'dict'>
HMM creating with: 
26 states
26 observations
[ 1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.
  1.  1.  1.  1.  1.  1.  1.  1.]
[ 1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.  1.
  1.  1.  1.  1.  1.  1.  1.  1.]
1.0
26
26
26


# `sequences` holds the observations of train_10 (as indices) and is used as the training data for unsupervised training.

In [59]:
# Convert every training example to index form: `sequences` receives the
# observation indices and `tags` the state indices, one list per example.
indexed_train = [hmm_train.data2indices(train) for train in train_10]
sequences = [pair[0] for pair in indexed_train]
tags = [pair[1] for pair in indexed_train]

In [8]:
# Evaluate Viterbi decoding on test_10: per-position accuracy plus a
# confusion matrix indexed as [predicted state, reference state].
tot_1 = 0.0
correct_1 = 0.0
confusion_1 = zeros([hmm_train.N,hmm_train.N])
f = FloatProgress(min=0, max=len(test_10))
display(f)
for test in test_10:
    f.value += 1
    wordids_test, tagids_test = hmm_train.data2indices(test)
    tagids_pre = hmm_train.viterbi(wordids_test)
    # Walk predictions and references in lockstep.
    for hyp, ref in zip(tagids_pre, tagids_test):
        if hyp == ref:
            correct_1 += 1
        confusion_1[hyp, ref] += 1
        tot_1 += 1
print ("OK : "+str(correct_1)+" / "+str(tot_1)+ " -> "+ str(correct_1*100/tot_1))

OK : 6822.0 / 7320.0 -> 93.1967213115
