In [4]:
import nltk
from numpy import array, ones, zeros, multiply
import numpy as np
import sys

In [76]:
try:
    import cPickle as pickle  # Python 2: C-accelerated pickle
except ImportError:
    import pickle  # Python 3: the C implementation is used automatically


def _load_pickle(path):
    """Unpickle and return the object stored at `path`, closing the file afterwards."""
    # NOTE(security): unpickling can execute arbitrary code; only load trusted files.
    with open(path, "rb") as handle:
        return pickle.load(handle)


# Train/test corpora; the 10/20 suffix comes from the file names.
train10 = _load_pickle("../data/train10.pkl")
test10 = _load_pickle("../data/test10.pkl")
train20 = _load_pickle("../data/train20.pkl")
test20 = _load_pickle("../data/test20.pkl")

In [6]:
class HMM:
    """Second-order (trigram) hidden Markov model trained from count tables.

    Hidden states are the corrections and observations are the letters. One
    extra state, "*", is stored after the N real states and stands for the
    positions that precede the first letter of a word.

    Attributes:
        N, M: number of real states / number of observations.
        omega_Y: state names, with "*" at index N.
        omega_X: observation names.
        Y_index, X_index: name -> index lookup tables.
        transition_proba: by default an (N+1, N+1, N) array; entry
            [y_2, y_1, y] is the estimate of P(y | y_2, y_1).
        observation_proba: by default an (M, N) array; after training, entry
            [x, y] is the estimate of P(x | y) (each column sums to 1).
        smoothing_obs: constant added to every emission count before
            normalisation.
    """

    def __init__(self, state_list, observation_list,
                 transition_proba=None,
                 observation_proba=None,
                 initial_state_proba=None, smoothing_obs=0.01):
        """Create the model and its index tables.

        Args:
            state_list: iterable of state names (without "*").
            observation_list: iterable of observation names.
            transition_proba: optional pre-built transition array.
            observation_proba: optional pre-built emission array.
            initial_state_proba: accepted for compatibility but unused; the
                initial distribution is stored in transition_proba["*", "*", :].
            smoothing_obs: additive smoothing for emission counts.
        """
        print("HMM creating with: ")
        self.N = len(state_list)        # number of states
        self.M = len(observation_list)  # number of possible emissions
        print(str(self.N) + " states")
        print(str(self.M) + " observations")

        # Work on copies: the caller's sequence must not gain the "*" sentinel,
        # and any iterable (e.g. a dict key view) is accepted.
        self.omega_Y = list(state_list) + ["*"]
        self.omega_X = list(observation_list)

        if transition_proba is None:
            self.transition_proba = zeros((self.N + 1, self.N + 1, self.N), float)
        else:
            self.transition_proba = transition_proba
        if observation_proba is None:
            self.observation_proba = zeros((self.M, self.N), float)
        else:
            self.observation_proba = observation_proba

        self.make_indexes()  # build indexes, i.e. the mapping between token and int
        self.smoothing_obs = smoothing_obs

    def make_indexes(self):
        """Creates the reverse tables that map state/observation names
        to their index in the probability arrays."""
        self.Y_index = {}
        for i, state in enumerate(self.omega_Y):
            self.Y_index[state] = i

        self.X_index = {}
        for i, observation in enumerate(self.omega_X):
            self.X_index[observation] = i

    def get_observationIndices(self, observations):
        """Return an int array holding the index of each observation.

        Out-of-vocabulary observations are not handled yet (TODO): their slot
        keeps the initial value 0, i.e. they alias the first observation.
        """
        indices = zeros(len(observations), int)
        for k, o in enumerate(observations):
            if o in self.X_index:
                indices[k] = self.X_index[o]
        return indices

    def data2indices(self, sent):
        """From a word of the corpus (a sequence of (letter, correction)
        couples), return two lists of indices: (letter ids, correction ids).

        Raises KeyError if a letter or a correction is unknown to the model.
        """
        letterids = list()
        correctionids = list()
        for couple in sent:
            letterids.append(self.X_index[couple[0]])
            correctionids.append(self.Y_index[couple[1]])
        return letterids, correctionids

    def observation_estimation(self, pair_counts):
        """Estimate the emission probabilities from (letter, correction) counts.

        Counts are written into observation_proba, smoothing_obs is added to
        every cell, and each column (one per state) is normalised to sum to 1.
        Pairs whose letter is not in the observation vocabulary are skipped.
        """
        for pair in pair_counts:
            letter = pair[0]
            correction = pair[1]

            # Skip unknown letters rather than writing the count into whatever
            # row index was left over from a previous iteration.
            if letter not in self.X_index:
                continue
            k = self.X_index[letter]
            i = self.Y_index[correction]
            self.observation_proba[k, i] = pair_counts[pair]
        self.observation_proba = self.observation_proba + self.smoothing_obs
        self.observation_proba = (self.observation_proba
                                  / self.observation_proba.sum(axis=0).reshape(1, self.N))

    def transition_estimation(self, c_bitag, c_tritag):
        """Set P(y | y_2, y_1) = count(y_2, y_1, y) / count(y_2, y_1) for every
        trigram in c_tritag. Raises KeyError if a trigram's leading bigram is
        missing from c_bitag."""
        for tritag in c_tritag:
            # getting indices
            y_2 = self.Y_index[tritag[0]]
            y_1 = self.Y_index[tritag[1]]
            y = self.Y_index[tritag[2]]
            bigram = (tritag[0], tritag[1])
            self.transition_proba[y_2, y_1, y] = float(c_tritag[tritag]) / float(c_bitag[bigram])

    def init_estimation(self, c_inits, c_inits_bitag):
        """Estimate the transitions out of the start context.

        transition_proba["*", "*", y] is the relative frequency of y in c_inits;
        transition_proba["*", y_1, y] is count(y_1, y) from c_inits_bitag divided
        by the count of y_1 in c_inits.
        """
        start = self.Y_index["*"]
        somme = float(sum(c_inits.values()))
        for correction in c_inits:
            i = self.Y_index[correction]
            self.transition_proba[start, start, i] = float(c_inits[correction]) / somme

        for pair in c_inits_bitag:
            y_1 = self.Y_index[pair[0]]
            y = self.Y_index[pair[1]]
            self.transition_proba[start, y_1, y] = float(c_inits_bitag[pair]) / float(c_inits[pair[0]])

    def supervised_training(self, pair_counts, c_bitag, c_tritag, c_inits, c_inits_bitag):
        """Train the HMM's parameters. This function wraps everything."""
        self.observation_estimation(pair_counts)
        self.transition_estimation(c_bitag, c_tritag)
        self.init_estimation(c_inits, c_inits_bitag)

    def get_possible_corrections(self, k):
        """Return the candidate states at position k (1-based): only "*" for
        positions before the word (k <= 0), every real state otherwise."""
        if k <= 0:
            return set(['*'])
        # All N real states; the "*" sentinel stored at index N is excluded.
        return self.omega_Y[:self.N]

    def get_letter(self, word, k):
        """Return the letter at 0-based position k, or "*" when k is negative."""
        if k < 0:
            return '*'
        return word[k]

    def viterbi(self, word):
        """Decode the most probable state sequence for `word`.

        Args:
            word: sequence of observations; each must be in the vocabulary
                (KeyError otherwise).

        Returns:
            (prob, path): the probability of the best sequence and the list of
            state names, one per letter. An empty word yields (1, []).
        """
        V = {}
        path = {}
        # init
        V[0, '*', '*'] = 1
        path['*', '*'] = []

        n = len(word)
        if n == 0:
            return V[0, '*', '*'], path['*', '*']

        for k in range(1, n + 1):
            temp_path = {}
            letter = self.X_index[self.get_letter(word, k - 1)]

            for u in self.get_possible_corrections(k - 1):
                i_u = self.Y_index[u]

                for v in self.get_possible_corrections(k):
                    i_v = self.Y_index[v]

                    # Best predecessor w for the state pair (u, v) at position k.
                    V[k, u, v], backpointer = max(
                        [(V[k - 1, w, u]
                          * self.transition_proba[self.Y_index[w], i_u, i_v]
                          * self.observation_proba[letter, i_v], w)
                         for w in self.get_possible_corrections(k - 2)])

                    temp_path[u, v] = path[backpointer, u] + [v]
            path = temp_path

        # Pick the best final pair among the pairs that were actually filled in
        # (for a one-letter word the previous state can only be "*").
        prob, maxu, maxv = max([(V[n, u, v], u, v)
                                for u in self.get_possible_corrections(n - 1)
                                for v in self.get_possible_corrections(n)])

        return prob, path[maxu, maxv]

In [7]:
def make_counts(corpus):
    """
    Tally the count tables used to train a HMM. Each table is a dictionary.

    `corpus` is a sequence of words; a word is a sequence of
    (letter, correction) couples. Couples are used as dictionary keys.

    Returns, in this order:
    * c_letter: letter counts
    * c_correction: correction counts
    * c_pairs: counts of (letter, correction) couples

    * c_bitag: counts of (previous correction, correction), taken only at
      positions that are neither the first nor the last of the word
    * c_tritag: counts of correction trigrams ending at each position from
      the third one on
    * c_inits: counts of the first correction, for words longer than one couple
    * c_inits_bitag: counts of (first correction, second correction), for the
      same words
    """
    def bump(table, key):
        # Increment table[key], starting from zero for an unseen key.
        table[key] = table.get(key, 0) + 1

    c_letter, c_correction, c_pairs = {}, {}, {}
    c_bitag, c_tritag = {}, {}
    c_inits, c_inits_bitag = {}, {}

    for word in corpus:
        last = len(word) - 1
        for i, couple in enumerate(word):
            letter, correction = couple[0], couple[1]

            bump(c_letter, letter)
            bump(c_correction, correction)
            bump(c_pairs, couple)

            # Bigram ending here, interior positions only.
            if 0 < i < last:
                bump(c_bitag, (word[i - 1][1], correction))

            # Trigram ending here.
            if i > 1:
                bump(c_tritag, (word[i - 2][1], word[i - 1][1], correction))

            # Word-initial statistics, only when a second couple exists.
            if i == 0 and last > 0:
                bump(c_inits, correction)
                bump(c_inits_bitag, (correction, word[i + 1][1]))

    return c_letter, c_correction, c_pairs, c_bitag, c_tritag, c_inits, c_inits_bitag

In [80]:
# Count tables gathered from the train20 corpus.
(c_letter, c_correction, c_pairs,
 c_bitag, c_tritag,
 c_inits, c_inits_bitag) = make_counts(train20)

In [81]:
# States are the corrections seen in training, observations are the letters.
# Concrete lists are passed because the constructor extends the state list
# with the "*" start symbol, which a dict key view does not support.
# The probability tables are left at their defaults (None).
hmm = HMM(state_list=list(c_correction.keys()),
          observation_list=list(c_letter.keys()))
hmm.supervised_training(c_pairs, c_bitag, c_tritag, c_inits, c_inits_bitag)


HMM creating with: 
26 states
26 observations


In [86]:
# Split every test word into the observed letters (joined into one string)
# and the true letters (kept as a list).
# NOTE(review): assumes each letter/correction is a single character, as the
# previous byte-level split did -- confirm against the data.
wrong_words = []
true_words = []  # denotes all underlying hidden states
for sent in test20:
    # Generator expressions keep the loop variable out of the notebook namespace.
    wrong_words.append("".join(couple[0] for couple in sent))
    true_words.append(list(couple[1] for couple in sent))
wrong_words = np.array(wrong_words)
# Words have different lengths, so the labels are stored as an object array.
true_words = np.array(true_words, dtype=object)   # These are the true labels

In [None]:
# Decode every observed word; words of at most one letter are kept as typed.
correction_words = []
for word in wrong_words:
    if len(word) <= 1:
        v = list(word)
    else:
        p, v = hmm.viterbi(word)
    correction_words.append(v)
correction_words = np.array(correction_words)

In [74]:
def compute_error(correction_words,true_words):
    """Return the fraction of letters that differ between predictions and truth.

    Words are compared pairwise and position by position; the denominator is
    the total number of predicted letters.

    Args:
        correction_words: iterable of predicted words (indexable sequences).
        true_words: iterable of reference words, aligned with the predictions.

    Returns:
        The error rate as a float, or 0.0 when there is no letter to compare.

    Raises:
        IndexError: if a predicted word is longer than its reference.
    """
    error = 0
    total = 0
    for predicted, reference in zip(correction_words, true_words):
        # Identical words simply contribute no mismatch, so no pre-check is needed.
        for i in range(len(predicted)):
            if predicted[i] != reference[i]:
                error += 1
        total += len(predicted)

    if total == 0:
        # Nothing was compared; avoid dividing by zero.
        return 0.0
    return float(error) / float(total)

In [None]:
# Letter-level error rate of the decoded words; the parenthesised form prints
# the same text under Python 2 and also parses under Python 3.
print(compute_error(correction_words, true_words))