<a href="https://colab.research.google.com/github/mok0na/projet-tc4/blob/master/HMM1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Chargement des données

## 1. importation des données

In [0]:
from google.colab import drive
import pickle
drive.mount('/content/gdrive')

# import dataset
data_path = '/content/gdrive/My Drive/S5/TC4/projet-tc4/typos-data/'


def load_pickle(filename):
    """Load one pickled dataset stored under ``data_path``.

    The file is opened in a ``with`` block so the handle is closed as soon as
    the object has been read (the bare ``pickle.load(open(...))`` form leaves
    the file open until garbage collection).

    NOTE: unpickling can execute arbitrary code; only use this on trusted files.
    """
    with open(data_path + filename, 'rb') as handle:
        return pickle.load(handle)


train10 = load_pickle('train10.pkl')
test10 = load_pickle('test10.pkl')
train20 = load_pickle('train20.pkl')
test20 = load_pickle('test20.pkl')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


## 2. obtention des états et observations

In [0]:
def get_states_observations(data):
    """Collect the distinct states and observations that occur in ``data``.

    ``data`` is an iterable of sentences; each sentence is an iterable of
    pairs where ``pair[0]`` is the observation and ``pair[1]`` is the state.

    Returns ``(state_list, observation_list)``. Each list contains every
    distinct symbol exactly once, in order of first appearance. A
    ``list(set(...))`` would return the same symbols but in an order that
    changes from one interpreter run to the next (string hashing is
    randomised), which makes every index derived from these lists
    irreproducible.
    """
    observations = []
    states = []
    for sentence in data:
        for pair in sentence:
            observations.append(pair[0])
            states.append(pair[1])
    # dict.fromkeys removes duplicates while preserving insertion order.
    observation_list = list(dict.fromkeys(observations))
    state_list = list(dict.fromkeys(states))
    return state_list, observation_list

In [0]:
# Build the symbol inventories from the 10%-noise training set. `train10` is
# named explicitly because no variable called `train` is defined anywhere in
# this notebook, so the cell must not rely on it after a kernel restart.
state_list, observation_list = get_states_observations(train10)

print("states :\n", state_list)
print("observations :\n", observation_list)

states :
 ['w', 'g', 'v', 'i', 'm', 'c', 'l', 'k', 'y', 'h', 'e', 'r', 'x', 's', 'p', 'f', 'n', 'z', 'd', 'j', 'o', 'b', 'u', 'q', 't', 'a']
observations :
 ['w', 'g', 'v', 'i', 'm', 'c', 'l', 'k', 'y', 'h', 'e', 'r', 'x', 's', 'p', 'f', 'n', 'z', 'd', 'j', 'o', 'b', 'u', 'q', 't', 'a']


In [0]:
# Show the first training word as a list of (observed char, true char) pairs.
# `train10` is used because no variable called `train` exists in this notebook.
print(train10[0])

[('b', 'b'), ('y', 'y')]


# I - HMM d'ordre 1

**Dry run**: 

Train a first-order HMM using the training data. This is basically what we did in the lab sessions for POS-tagging. Compute the error rate (at the character level) and compare this result with the dummiest classifier, which just does nothing. You can also compute the number of errors your model corrects and the number of errors your model creates.

## 1. implémentation

In [0]:
import nltk
import sklearn
import numpy as np
import sys



class HMM1:
    """First-order hidden Markov model estimated from labelled sequences.

    Matrix conventions (fixed by ``counts`` and the ``estimate_*`` methods and
    relied upon by ``forward``, ``backward`` and ``viterbi``):

    * ``transition_proba[i, j]``  = Pr(Y_(t+1)=q_i | Y_t=q_j), columns sum to 1
    * ``observation_proba[k, i]`` = Pr(X_t=v_k | Y_t=q_i), columns sum to 1
    * ``initial_state_proba[i]``  = Pr(Y_0=q_i)
    """

    def __init__(self, state_list, observation_list,
                 transition_proba=None,
                 observation_proba=None,
                 initial_state_proba=None,
                 epsilon=1e-16):
        """Builds a new Hidden Markov Model
        state_list is the list of state symbols [q_0...q_(N-1)]
        observation_list is the list of observation symbols [v_0...v_(M-1)]
        transition_proba is the transition probability matrix
            [a_ij] a_ij = Pr(Y_(t+1)=q_i|Y_t=q_j)
        observation_proba is the observation probability matrix
            [b_ki] b_ki = Pr(X_t=v_k|Y_t=q_i)
        initial_state_proba is the initial state distribution
            [pi_i] pi_i = Pr(Y_0=q_i)
        epsilon is the constant added to every count before normalisation
            in the estimate_* methods, so that no probability is exactly 0

        Any distribution left as None is initialised to zeros and is expected
        to be filled in by supervised_training."""
        print("HMM creating with: ")
        self.N = len(state_list)  # The number of states
        self.M = len(observation_list)  # The number of observation symbols
        print(str(self.N)+" states")
        print(str(self.M)+" observations")
        self.omega_Y = state_list  # Keep the vocabulary of states
        self.omega_X = observation_list  # Keep the vocabulary of observations
        self.epsilon = epsilon
        # Init. of the 3 distributions : observation, transition and initial states.
        # The builtin float is used as dtype: the np.float alias was removed
        # from recent NumPy releases and raises AttributeError there.
        if transition_proba is None:
            self.transition_proba = np.zeros((self.N, self.N), float)
        else:
            self.transition_proba = transition_proba
        if observation_proba is None:
            self.observation_proba = np.zeros((self.M, self.N), float)
        else:
            self.observation_proba = observation_proba
        if initial_state_proba is None:
            self.initial_state_proba = np.zeros((self.N,), float)
        else:
            self.initial_state_proba = initial_state_proba
        # Since everything will be stored in numpy arrays, it is more convenient and compact to
        # handle symbols as indices (integer) for a direct access. However, we also need
        # to keep the mapping between symbols and indices.
        self.make_indexes()

    def make_indexes(self):
        """Creates the reverse tables that map state/observation symbols
        to their index in the probability arrays"""
        self.Y_index = {state: i for i, state in enumerate(self.omega_Y)}
        self.X_index = {obs: i for i, obs in enumerate(self.omega_X)}

    def get_X_index(self, observation):
        """Return the row index of ``observation`` in ``observation_proba``.

        An observation that is not in the vocabulary falls back to index 0,
        i.e. it is treated as the first symbol of ``observation_list``."""
        return self.X_index.get(observation, 0)

    def convert_Y_indexes_to_string(self, indexes):
        """Map a sequence of state indices back to a numpy array of state symbols."""
        return np.array([self.omega_Y[ind] for ind in indexes])

    def counts(self, data):
        """Count the events needed to estimate the three distributions.

        ``data`` is an iterable of sequences of (observation, state) pairs.
        Returns ``(transition_count, observation_count, initial_state_count)``
        using the same index layout as the probability matrices:
        ``transition_count[i, j]`` counts transitions from state j to state i
        and ``observation_count[k, i]`` counts symbol k emitted in state i."""
        transition_count = np.zeros((self.N, self.N), float)
        observation_count = np.zeros((self.M, self.N), float)
        initial_state_count = np.zeros((self.N), float)
        for sentence in data:
            previous_state = None
            for position, pair in enumerate(sentence):
                # pair[0] = observation, pair[1] = state
                state_idx = self.Y_index[pair[1]]
                observation_count[self.get_X_index(pair[0]), state_idx] += 1
                if position == 0:
                    initial_state_count[state_idx] += 1
                else:
                    transition_count[state_idx, self.Y_index[previous_state]] += 1
                previous_state = pair[1]
        return transition_count, observation_count, initial_state_count

    # Three functions estimate the parameters from the counts, one per
    # distribution: observation, transition, initial state.
    def estimate_observation(self, observation_count):
        """Set ``observation_proba`` to the smoothed, column-normalised counts."""
        smoothed = np.asarray(observation_count, dtype=float) + self.epsilon
        self.observation_proba = smoothed / np.sum(smoothed, axis=0)

    def estimate_transition(self, transition_count):
        """Set ``transition_proba`` to the smoothed, column-normalised counts."""
        smoothed = np.asarray(transition_count, dtype=float) + self.epsilon
        self.transition_proba = smoothed / np.sum(smoothed, axis=0)

    def estimate_initial_state(self, initial_state_count):
        """Set ``initial_state_proba`` to the smoothed, normalised counts."""
        smoothed = np.asarray(initial_state_count, dtype=float) + self.epsilon
        self.initial_state_proba = smoothed / np.sum(smoothed, axis=0)

    # Single entry point that estimates every parameter of the HMM.
    def supervised_training(self, data):
        """Estimate all three distributions from labelled ``data``."""
        # Gather every count needed to estimate the HMM parameters
        transition_count, observation_count, initial_state_count = self.counts(data)

        # Estimate the parameters from the counts
        self.estimate_observation(observation_count)
        self.estimate_transition(transition_count)
        self.estimate_initial_state(initial_state_count)
        print("... Supervised training ...")

    def forward(self, observation):
        """Forward pass: ``alpha[i, k] = Pr(x_0..x_k, Y_k=q_i)``.

        Returns an array of shape (N, K) where K is ``len(observation)``;
        an empty observation yields an (N, 0) array."""
        obs_seq = [self.get_X_index(w) for w in observation]
        K = len(obs_seq)
        alpha = np.zeros((self.N, K), float)
        if K == 0:
            return alpha
        # init: row obs_seq[0] of observation_proba holds Pr(x_0 | each state)
        alpha[:, 0] = self.observation_proba[obs_seq[0], :] * self.initial_state_proba
        # loop: transition_proba[i, j] is Pr(next=i | prev=j), so the matrix
        # product sums over the previous state j for every destination i.
        for k in range(1, K):
            reached = np.dot(self.transition_proba, alpha[:, k-1])
            alpha[:, k] = self.observation_proba[obs_seq[k], :] * reached
        return alpha

    def backward(self, observation):
        """Backward pass: ``beta[i, k] = Pr(x_(k+1)..x_(K-1) | Y_k=q_i)``.

        Returns an array of shape (N, K) where K is ``len(observation)``;
        an empty observation yields an (N, 0) array."""
        obs_seq = [self.get_X_index(w) for w in observation]
        K = len(obs_seq)
        beta = np.zeros((self.N, K), float)
        if K == 0:
            return beta
        # init
        beta[:, K-1] = 1
        # loop: the emission of x_(k+1) belongs to the *destination* state j,
        # and transition_proba.T[i, j] is Pr(next=j | current=i).
        for k in range(K-2, -1, -1):
            emitted = self.observation_proba[obs_seq[k+1], :] * beta[:, k+1]
            beta[:, k] = np.dot(self.transition_proba.T, emitted)
        return beta

    def viterbi(self, observation):
        """Return the most probable state sequence for ``observation``.

        The result is a numpy array of state symbols with one entry per
        observation; an empty observation yields an empty array."""
        obs_seq = [self.get_X_index(w) for w in observation]
        K = len(obs_seq)
        if K == 0:
            return self.convert_Y_indexes_to_string([])
        delta = np.zeros((self.N, K), float)
        psi = np.zeros((self.N, K-1), int)
        # init
        delta[:, 0] = self.initial_state_proba * self.observation_proba[obs_seq[0]]
        # loop: scores[i, j] = Pr(next=i | prev=j) * delta[j, k]
        for k in range(K-1):
            scores = self.transition_proba * delta[:, k]
            delta[:, k+1] = self.observation_proba[obs_seq[k+1]] * np.max(scores, axis=1)
            psi[:, k] = np.argmax(scores, axis=1)
        # backtrack from the best final state through the stored predecessors
        path = np.zeros(K, int)
        path[K-1] = np.argmax(delta[:, K-1], axis=0)
        for k in range(K-2, -1, -1):
            path[k] = psi[path[k+1], k]
        return self.convert_Y_indexes_to_string(path)

    def score(self, data, algo="viterbi"):
        """Print character-level evaluation metrics for ``data``.

        ``data`` is an iterable of sequences of (observation, state) pairs.
        ``algo`` selects the predictor: "viterbi" decodes with this model,
        "dummy" returns the observations unchanged.

        Prints the error rate, weighted precision/recall/F1, the number of
        corrected characters (observation was wrong, prediction is right),
        the number of added errors (observation was right, prediction is
        wrong) and the total number of characters. Returns None.

        Raises ValueError if ``algo`` is not one of the two supported values."""
        # Explicit submodule import: a bare `import sklearn` is not guaranteed
        # to make `sklearn.metrics` available as an attribute.
        from sklearn import metrics

        if algo not in ("viterbi", "dummy"):
            raise ValueError("unknown algo {!r}: expected 'viterbi' or 'dummy'".format(algo))

        pred_y = []
        true_y = []
        n_added_errors = 0
        n_corrected = 0
        total = 0
        # Sentences have different lengths, so each one gets its own array
        # instead of packing them all into a single (ragged) numpy array.
        for sentence in data:
            observed = np.array([w[0] for w in sentence])
            expected = np.array([w[1] for w in sentence])
            if algo == "viterbi":
                predicted_y = self.viterbi(observed)
            else:
                predicted_y = observed
            pred_y.extend(predicted_y)
            true_y.extend(expected)
            n_corrected += int(np.sum((observed != expected) & (expected == predicted_y)))
            n_added_errors += int(np.sum((observed == expected) & (expected != predicted_y)))
            total += len(observed)
        # The model's own state vocabulary is used for the labels rather than
        # a notebook-level global variable.
        labels = self.omega_Y
        print("=======================SCORES with {}========================".format(algo))
        print("error rate {:.2f}%".format(100-100*metrics.accuracy_score(true_y, pred_y)))
        print("precision {:.2f}%".format(100*metrics.precision_score(true_y, pred_y, labels=labels, average='weighted')))
        print("recall {:.2f}%".format(100*metrics.recall_score(true_y, pred_y, labels=labels, average='weighted')))
        print("f1-score {:.2f}%".format(100*metrics.f1_score(true_y, pred_y, labels=labels, average='weighted')))
        print("number of corrections : {}".format(n_corrected))
        print("number of added errors : {}".format(n_added_errors))
        print("total : {}".format(total))

## 2. score

In [122]:
# Train on the 10%-noise training set, then evaluate on the matching test set
# with the model's Viterbi decoding and with the do-nothing baseline.
hmm1 = HMM1(state_list, observation_list)
hmm1.supervised_training(train10)
for algo in ("viterbi", "dummy"):
    hmm1.score(test10, algo)

HMM creating with: 
26 states
26 observations
... Supervised training ...
error rate 6.80%
precision 93.28%
recall 93.20%
f1-score 93.21%
number of corrections : 310
number of added errors : 63
total : 7320
error rate 10.18%
precision 91.79%
recall 89.82%
f1-score 90.50%
number of corrections : 0
number of added errors : 0
total : 7320


In [123]:
# Evaluate on the 20%-noise test set with Viterbi decoding and with the
# do-nothing baseline.
# NOTE(review): the model is trained on train10 but scored on test20 -- confirm
# whether training on train20 was intended here.
hmm1 = HMM1(state_list, observation_list)
hmm1.supervised_training(train10)
for algo in ("viterbi", "dummy"):
    hmm1.score(test20, algo)

HMM creating with: 
26 states
26 observations
... Supervised training ...
error rate 13.23%
precision 87.09%
recall 86.77%
f1-score 86.87%
number of corrections : 1229
number of added errors : 199
total : 16691
error rate 19.41%
precision 85.44%
recall 80.59%
f1-score 82.34%
number of corrections : 0
number of added errors : 0
total : 16691


# II - HMM d'ordre 2

