# Defining the HMM

In [1]:
import nltk
from numpy import array, ones, zeros, multiply
import numpy as np
import sys

In [38]:
class HMM:
        def __init__(self, state_list, observation_list,
                 transition_proba = None,
                 observation_proba = None,
                 initial_state_proba = None, smoothing_obs = 0.01):
          
            print "HMM creating with: "
            self.N = len(state_list)       # number of states
            self.M = len(observation_list) # number of possible emissions
            print str(self.N)+" states"
            print str(self.M)+" observations"
            
            UNKid = self.M+1;  
            
            self.omega_Y = state_list
            self.omega_Y.append("*")
            self.omega_X = observation_list
            
            if transition_proba is None:
                self.transition_proba = zeros( (self.N+1, self.N+1, self.N), float) 
            else:
                self.transition_proba=transition_proba
            if observation_proba is None:
                self.observation_proba = zeros( (self.M+1, self.N), float) 
            else:
                self.observation_proba=observation_proba

            self.make_indexes() 
            self.smoothing_obs = smoothing_obs 
            
        def make_indexes(self):
            """Creates the reverse table that maps states/observations names
            to their index in the probabilities array"""
            self.Y_index = {}
            for i in range(self.N+1):
                self.Y_index[self.omega_Y[i]] = i
                
            self.X_index = {}
            for i in range(self.M):
                self.X_index[self.omega_X[i]] = i
      
        def get_observationIndices( self, observations ):
            """return observation indices, i.e 
            return [self.O_index[o] for o in observations]
            and deals with OOVs  
            """
            indices = zeros( len(observations), int )
            k = 0
            for o in observations:
                if o in self.X_index:
                    indices[k] = self.X_index[o]
                else:
                    indices[k] = UNKid
                k += 1
            return indices
    
        def data2indices(self, sent): 
            """From a word of the corpus: 
            - extract the letter and coorection 
            - returns two list of indices, one for each
            -> (letterid, correctionid)
            """
            letterids = list()
            correctionids  = list()
            for couple in sent:
                ltr = couple[0]
                crt = couple[1]
                letterids.append(self.X_index[ltr])
                correctionids.append(self.Y_index[crt])
            return letterids,correctionids
            
        def observation_estimation(self, pair_counts):

            for pair in pair_counts:
                letter=pair[0]
                correction=pair[1]
                count=pair_counts[pair]
                
                if letter in self.X_index:
                    k=self.X_index[letter]
                i=self.Y_index[correction]
                self.observation_proba[k,i]=count
            self.observation_proba=self.observation_proba+self.smoothing_obs
            self.observation_proba=self.observation_proba/self.observation_proba.sum(axis=0).reshape(1, self.N)
                        
        def transition_estimation(self, c_bitag, c_tritag):
            
            for tritag in c_tritag:
                #getting indices
                y_2=self.Y_index[tritag[0]]
                y_1=self.Y_index[tritag[1]]
                y=self.Y_index[tritag[2]]
                bigram=(tritag[0],tritag[1])       
                self.transition_proba[y_2,y_1,y]=float(c_tritag[tritag])/float(c_bitag[bigram])               
   
        def init_estimation(self, c_inits, c_inits_bitag):
            somme=float(sum(c_inits.values()))
            for correction in c_inits:
                i=self.Y_index[correction]
                j=self.Y_index["*"]
                self.transition_proba[j,j,i]=float(c_inits[correction])/somme
                
            for pair in c_inits_bitag:
                y_1=self.Y_index[pair[0]]
                y=self.Y_index[pair[1]]
                j=self.Y_index["*"]
                self.transition_proba[j,y_1,y]=float(c_inits_bitag[pair])/float(c_inits[pair[0]])
                
            
        def supervised_training_ME(self, pair_counts, c_bitag, c_tritag ,c_inits, c_inits_bitag):
            """ Train the HMM's parameters. This function wraps everything"""
            self.observation_estimation(pair_counts)
            self.transition_estimation(c_bitag, c_tritag)
            self.init_estimation(c_inits, c_inits_bitag)
            
        
        def supervised_training_perceptron(self, T, data):
            #Set initially parameters
   
            #self.transition_proba = np.zeros( (self.N+1, self.N+1, self.N), float)
            #self.observation_proba = np.zeros( (self.M+1, self.N), float)

            #self.transition_proba = np.ones( (self.N+1, self.N+1, self.N), float)
            #self.observation_proba = np.ones( (self.M+1, self.N), float)
    
    
            for iteration in range(T):
                for observations in data:
                    if len(observations) > 1:
                        prob, predicted_tags = self.viterbi(observations)
                        count_pair, count_tritag, count_tritag_predicted, count_pair_predicted = make_counts_each_observation_sequence(observations, predicted_tags)

                        for tritag in count_tritag:
                            c1 = count_tritag.get(tritag)
                            if count_tritag_predicted.has_key(tritag):
                                c2 = count_tritag_predicted.get(tritag)
                            else:
                                c2 = 0
                    
                            self.transition_proba[ self.Y_index[tritag[0]], self.Y_index[tritag[1]], self.Y_index[tritag[2]] ] = self.transition_proba[ self.Y_index[tritag[0]], self.Y_index[tritag[1]], self.Y_index[tritag[2]] ] + c1 - c2 
                
                        for pair in count_pair:
                            c1 = count_pair.get(pair)
                            if count_pair_predicted.has_key(pair):
                                c2 = count_pair_predicted.get(pair)
                            else:
                                c2 = 0
                    
                            self.observation_proba[self.X_index[pair[0]],self.Y_index[pair[1]] ] = self.observation_proba[self.X_index[pair[0]],self.Y_index[pair[1]] ] + c1 - c2
                        
        
        def get_possible_corrections(self,k):
            if k == -1:
                return set(['*'])
            if k == 0:
                return set(['*'])
            else:
                return self.omega_Y[0:26]

        def get_letter(self,word,k):
            if k < 0:
                return '*'
            else:
                return word[k][0]
    
            
        def viterbi(self,word):
            
            
            V = {}
            path = {}
            # init
            V[0,'*','*'] = 1
            path['*','*'] = []
            
            for k in range(1,len(word)+1):
                temp_path = {}
                letter = self.X_index[self.get_letter(word,k-1)]
                
                for u in self.get_possible_corrections(k-1):
                    
                    for v in self.get_possible_corrections(k):

                        i_u=self.Y_index[u]
                        i_v=self.Y_index[v]
                        
                        V[k,u,v],backpointer = max([(V[k-1,w,u] * 
                                                     self.transition_proba[self.Y_index[w],i_u,i_v] * 
                                                     self.observation_proba[letter,i_v],w) 
                                                    for w in self.get_possible_corrections(k-2)])                       
                        
                        temp_path[u,v] = path[backpointer,u] + [v]                        
                path = temp_path
                 
            prob,maxu,maxv= max([(V[k,u,v],u,v) for u in self.omega_Y[0:26] for v in self.omega_Y[0:26]])
                
            return prob, path[maxu,maxv]

In [2]:
def make_counts(corpus):
    """ 
    Build different count tables to train a HMM. Each count table is a dictionnary. 
    Returns: 
    * c_letter: letter counts
    * c_correction: correction counts
    * c_pairs: count of pairs (letter,correction)
    
    * c_bitag: count of tag bigram 
    * c_tritag: count of tag trigram 
    * c_inits: count of tag found in the first position
    
    """
    c_letter = dict()
    c_correction = dict()
    c_pairs= dict()
    c_bitag = dict()
    c_tritag = dict()
    c_inits = dict()
    c_inits_bitag = dict()
    
    for word in corpus:
        for i in range(len(word)):
            couple= word[i]
            letter = couple[0]
            correction = couple[1]
            #Counting letter 
            if letter in c_letter:
                c_letter[letter] +=1
            else:
                c_letter[letter] =1  
            #Counting correction
            if correction in c_correction:
                c_correction[correction] +=1
            else:
                c_correction[correction] =1
            #Counting par(letter, correction)
            if couple in c_pairs:
                c_pairs[couple] +=1
            else :
                c_pairs[couple] =1
            #Counting bitag(corr_i, corr_(i+1))
            if i > 0 and i < len(word)-1:
                bitag = (word[i-1][1], correction)
                if bitag in c_bitag:
                    c_bitag[bitag] += 1
                else:
                    c_bitag[bitag] =1
                    
            #Counting tritag
            if i > 1:
                tritag = (word[i-2][1],word[i-1][1], correction)
                if tritag in c_tritag :
                    c_tritag[tritag] +=1
                else :
                    c_tritag[tritag] =1
                    
            if i == 0 and len(word)>1:
                if correction in c_inits:
                    c_inits[correction] +=1
                else :
                    c_inits[correction] =1
                bg_first=(correction,word[i+1][1])
                
                if bg_first in c_inits_bitag:
                    c_inits_bitag[bg_first]+=1
                else:
                    c_inits_bitag[bg_first]=1
                    
    return c_letter, c_correction, c_pairs, c_bitag, c_tritag, c_inits, c_inits_bitag

In [3]:
def make_counts_each_observation_sequence(observations, predicted_tags):
    count_pair = dict()
    count_bitag = dict()
    count_tritag = dict()
    
    count_tritag_predicted = dict()
    count_pair_predicted = dict()

    for i in range(len(observations)):
        pair = observations[i]
        observation = pair[0]
        tag = pair[1]
            
        #Counting pair(observation, tag)
        if pair in count_pair:
            count_pair[pair] += 1
        else :
            count_pair[pair] = 1
            

        #Counting tritag(tag_(i-2), tag_(i-1), tag_i)
        if i > 1:
            tritag = (observations[i-2][1],observations[i-1][1], tag)
            if tritag in count_tritag :
                count_tritag[tritag] += 1
            else :
                count_tritag[tritag] = 1
        
        
        #Counting predicted pair
        predicted_pair = (observation, predicted_tags[i])
        
        if predicted_pair in count_pair_predicted:
            count_pair_predicted[predicted_pair] += 1
        else :
            count_pair_predicted[predicted_pair] = 1
        
        
        #Counting predicted tritag(tag_(i-2), tag_(i-1), tag_i)
        tag_predicted = predicted_tags[i]
        if i > 1:
            tritag_predicted = (predicted_tags[i-2],predicted_tags[i-1], tag_predicted)
            if tritag_predicted in count_tritag_predicted :
                count_tritag_predicted[tritag_predicted] += 1
            else :
                count_tritag_predicted[tritag_predicted] = 1

    return count_pair, count_tritag, count_tritag_predicted, count_pair_predicted

In [24]:
from __future__ import absolute_import
from __future__ import unicode_literals
import unicodedata
import re 
from nltk.tag.api import TaggerI

try:
    import pycrfsuite
except ImportError:
    pass

class CRFTagger(TaggerI):
    """
    A module for POS tagging using CRFSuite https://pypi.python.org/pypi/python-crfsuite
    
    >>> from nltk.tag import CRFTagger
    >>> ct = CRFTagger()
 
    >>> train_data = [[('University','Noun'), ('is','Verb'), ('a','Det'), ('good','Adj'), ('place','Noun')],
    ... [('dog','Noun'),('eat','Verb'),('meat','Noun')]]
    
    >>> ct.train(train_data,'model.crf.tagger')
    >>> ct.tag_sents([['dog','is','good'], ['Cat','eat','meat']])
    [[('dog', 'Noun'), ('is', 'Verb'), ('good', 'Adj')], [('Cat', 'Noun'), ('eat', 'Verb'), ('meat', 'Noun')]]
    
    >>> gold_sentences = [[('dog','Noun'),('is','Verb'),('good','Adj')] , [('Cat','Noun'),('eat','Verb'), ('meat','Noun')]] 
    >>> ct.evaluate(gold_sentences) 
    1.0
    
    Setting learned model file  
    >>> ct = CRFTagger() 
    >>> ct.set_model_file('model.crf.tagger')
    >>> ct.evaluate(gold_sentences)
    1.0
    
    """
    
    
    def __init__(self,  feature_func = None, verbose = False, training_opt = {}):
        """
        Initialize the CRFSuite tagger 
        :param feature_func: The function that extracts features for each token of a sentence. This function should take 
        2 parameters: tokens and index which extract features at index position from tokens list. See the build in 
        _get_features function for more detail.   
        :param verbose: output the debugging messages during training.
        :type verbose: boolean  
        :param training_opt: python-crfsuite training options
        :type training_opt : dictionary 
        
        Set of possible training options (using LBFGS training algorithm).  
         'feature.minfreq' : The minimum frequency of features.
         'feature.possible_states' : Force to generate possible state features.
         'feature.possible_transitions' : Force to generate possible transition features.
         'c1' : Coefficient for L1 regularization.
         'c2' : Coefficient for L2 regularization.
         'max_iterations' : The maximum number of iterations for L-BFGS optimization.
         'num_memories' : The number of limited memories for approximating the inverse hessian matrix.
         'epsilon' : Epsilon for testing the convergence of the objective.
         'period' : The duration of iterations to test the stopping criterion.
         'delta' : The threshold for the stopping criterion; an L-BFGS iteration stops when the
                    improvement of the log likelihood over the last ${period} iterations is no greater than this threshold.
         'linesearch' : The line search algorithm used in L-BFGS updates:
                           { 'MoreThuente': More and Thuente's method,
                              'Backtracking': Backtracking method with regular Wolfe condition,
                              'StrongBacktracking': Backtracking method with strong Wolfe condition
                           } 
         'max_linesearch' :  The maximum number of trials for the line search algorithm.
         
        """
                   
        self._model_file = ''
        self._tagger = pycrfsuite.Tagger()
        
        if feature_func is None:
            self._feature_func =  self._get_features
        else:
            self._feature_func =  feature_func
        
        self._verbose = verbose 
        self._training_options = training_opt
        self._pattern = re.compile(r'\d')
        
    def set_model_file(self, model_file):
        self._model_file = model_file
        self._tagger.open(self._model_file)

            
    def _get_features(self, tokens, idx):
        """
        Extract basic features about this word including 
             - Current Word 
             - Is Capitalized ?
             - Has Punctuation ?
             - Has Number ?
             - Suffixes up to length 3
        Note that : we might include feature over previous word, next word ect. 
        
        :return : a list which contains the features
        :rtype : list(str)    
        
        """ 
        token = tokens[idx]
        
        feature_list = []
        
        if not token:
            return feature_list
        
# Remove for typos correction            
#         # Capitalization 
#         if token[0].isupper():
#             feature_list.append('CAPITALIZATION')
        
#         # Number 
#         if re.search(self._pattern, token) is not None:
#             feature_list.append('HAS_NUM') 
        
#         # Punctuation
#         punc_cat = set(["Pc", "Pd", "Ps", "Pe", "Pi", "Pf", "Po"])
#         if all (unicodedata.category(x) in punc_cat for x in token):
#             feature_list.append('PUNCTUATION')
        
        # Suffix up to length 3
        if len(token) > 1:
            feature_list.append('SUF_' + token[-1:]) 
        if len(token) > 2: 
            feature_list.append('SUF_' + token[-2:])    
        if len(token) > 3: 
            feature_list.append('SUF_' + token[-3:])
            
        feature_list.append('WORD_' + token )
        
        return feature_list
        
    def tag_sents(self, sents):
        '''
        Tag a list of sentences. NB before using this function, user should specify the mode_file either by 
                       - Train a new model using ``train'' function 
                       - Use the pre-trained model which is set via ``set_model_file'' function  
        :params sentences : list of sentences needed to tag. 
        :type sentences : list(list(str))
        :return : list of tagged sentences. 
        :rtype : list (list (tuple(str,str))) 
        '''
        if self._model_file == '':
            raise Exception(' No model file is found !! Please use train or set_model_file function')
        
        # We need the list of sentences instead of the list generator for matching the input and output
        result = []  
        for tokens in sents:
            features = [self._feature_func(tokens,i) for i in range(len(tokens))]
            labels = self._tagger.tag(features)
                
            if len(labels) != len(tokens):
                raise Exception(' Predicted Length Not Matched, Expect Errors !')
            
            tagged_sent = list(zip(tokens,labels))
            result.append(tagged_sent)
            
        return result 

    
    def train(self, train_data, model_file):
        '''
        Train the CRF tagger using CRFSuite  
        :params train_data : is the list of annotated sentences.        
        :type train_data : list (list(tuple(str,str)))
        :params model_file : the model will be saved to this file.     
         
        '''
        trainer = pycrfsuite.Trainer(verbose=self._verbose)
        if "algorithm" in self._training_options:
            trainer.select(self._training_options["algorithm"])
        #print trainer.get_params()
        #trainer.set_params(self._training_options)
        
        for sent in train_data:
            tokens,labels = zip(*sent)
            features = [self._feature_func(tokens,i) for i in range(len(tokens))]
            trainer.append(features,labels)
                        
        # Now train the model, the output should be model_file
        trainer.train(model_file)
        # Save the model file
        self.set_model_file(model_file) 


    def tag(self, tokens):
        '''
        Tag a sentence using Python CRFSuite Tagger. NB before using this function, user should specify the mode_file either by 
                       - Train a new model using ``train'' function 
                       - Use the pre-trained model which is set via ``set_model_file'' function  
        :params tokens : list of tokens needed to tag. 
        :type tokens : list(str)
        :return : list of tagged tokens. 
        :rtype : list (tuple(str,str)) 
        '''
        
        return self.tag_sents([tokens])[0]

In [19]:
def compute_error(correction_words,true_words):
    """Compares the corrections and true_vals"""
    error=0
    total=0
    for f, b in zip(correction_words, true_words):
        if cmp(f,b)!=0:
            for i in range(len(f)):
                if f[i]!=b[i]:
                    error+=1
        total+=len(f)

    return float(error)/float(total)  

# Loading data and making counts

In [6]:
import cPickle as pickle
train10=pickle.load( open( "typos-data/train10.pkl", "rb" ))
test10=pickle.load( open( "typos-data/test10.pkl", "rb" ))
train20=pickle.load( open( "typos-data/train20.pkl", "rb" ))
test20=pickle.load( open( "typos-data/test20.pkl", "rb" ))

In [25]:
c_letter, c_correction, c_pairs, c_bitag, c_tritag, c_inits, c_inits_bitag=make_counts(train10)

# Creating the HMM and supervised training ME

In [37]:
hmm = HMM(state_list=c_correction.keys(), observation_list=c_letter.keys(),
                 transition_proba = None,
                 observation_proba = None,
                 initial_state_proba = None)
hmm.supervised_training_ME( c_pairs, c_bitag, c_tritag ,c_inits, c_inits_bitag)

HMM creating with: 
26 states
26 observations


# Testing HMM with ML parameter estimates

In [39]:
wrong_words=[]
true_words=[] #denotes all underlying hidden states
for sent in test10:
    data_test = np.asarray(sent)
    obs,states = np.hsplit(data_test,2)
    wrong_words.append(obs.tostring())
    true_words.append(list(states.tostring()))
wrong_words = np.array(wrong_words)
true_words = np.array(true_words)   #These are the true lables

In [40]:
correction_words=[]
for word in wrong_words:
    if(len(word)>1):
        p,v=hmm.viterbi(word)
    else:
        v=list(word)
    correction_words.append(v)
correction_words=np.array(correction_words)

print compute_error(correction_words,true_words)

KeyboardInterrupt: 

# Training and testing HMM with Perceptron parameter estimates 

In [41]:
hmm.supervised_training_perceptron(1,train10)

KeyboardInterrupt: 

In [36]:
correction_words=[]
for word in wrong_words:
    if(len(word)>1):
        p,v=hmm.viterbi(word)
    else:
        v=list(word)
    correction_words.append(v)
correction_words=np.array(correction_words)

print compute_error(correction_words,true_words)

0.99043715847


# Testing CRF

In [23]:
#Modify original code: Remove sentence modification (Punctuation) -> Typos Correction

# Download: pip install python-crfsuite

ct = CRFTagger() #For default ML-estimation
#ct = CRFTagger(training_opt={"algorithm":"ap"}) #For changing the training method to perceptron
ct.train(train10,'model.crf.tagger')

correction_words_new=[]
for word in wrong_words:
    v = ct.tag_sents(list(word))
    correct = []
    for c in v:
        correct.append(c[0][1])
        
    correction_words_new.append(correct)
    
print compute_error(correction_words_new,true_words)

['feature.minfreq', 'feature.possible_states', 'feature.possible_transitions', 'max_iterations', 'epsilon']
0.201912568306


# MaxEnt model with Perceptron training

In [32]:
from nltk.tag.perceptron import PerceptronTagger
tagger = PerceptronTagger(load=False)
tagger.train(train10)
correction_words_new=[]

for word in wrong_words:
    v = tagger.tag(list(word))
    correct = []
    for c in v:
        correct.append(c[1])
    correction_words_new.append(correct)
    
print compute_error(correction_words_new,true_words)

0.0520491803279
