In [None]:
# Install the prerequisites: the `conllu` parser and a clone of the UD Spanish AnCora corpus
!pip install conllu
!git clone https://github.com/UniversalDependencies/UD_Spanish-AnCora.git

# Loading the previously trained HMM model

In [None]:
# Load the transition and emission probability tables of the previously trained HMM.
# `.item()` unwraps the object stored in each .npy file (used as a dict in the cells below).
# NOTE: allow_pickle=True unpickles arbitrary Python objects -- only load files you trust.
import numpy as np
transitionProbdict = np.load('transitionHMM.npy', allow_pickle=True).item()
emissionProbdict = np.load('emissionHMM.npy', allow_pickle=True).item()

In [None]:
# Collect the distinct POS tags: the tag is the part after '|' in every emission key
stateSet = {pair.split('|')[1] for pair in emissionProbdict}
stateSet

{'ADJ',
 'ADP',
 'ADV',
 'AUX',
 'CCONJ',
 'DET',
 'INTJ',
 'NOUN',
 'NUM',
 'PART',
 'PRON',
 'PROPN',
 'PUNCT',
 'SCONJ',
 'SYM',
 'VERB',
 '_'}

In [None]:
# Number the POS tags; each number is the row that tag occupies in the Viterbi matrix
tagStateDict = {state: i for i, state in enumerate(stateSet)}
tagStateDict

{'ADJ': 9,
 'ADP': 13,
 'ADV': 0,
 'AUX': 14,
 'CCONJ': 10,
 'DET': 12,
 'INTJ': 16,
 'NOUN': 7,
 'NUM': 11,
 'PART': 3,
 'PRON': 4,
 'PROPN': 8,
 'PUNCT': 1,
 'SCONJ': 2,
 'SYM': 5,
 'VERB': 15,
 '_': 6}

# Initial distribution of latent states

In [None]:
# Calculating the initial state distribution: the relative frequency with which
# each POS tag appears on the first token of a sentence in the corpus.
initTagStateProb = {} # \rho_i^{(0)}
from conllu import parse_incr
wordList = []
count = 0 # Number of sentences in the corpus
# The context manager closes the corpus file once parsing is done
with open("UD_Spanish-AnCora/es_ancora-ud-dev.conllu", "r", encoding="utf-8") as data_file:
  for tokenlist in parse_incr(data_file):
    count += 1
    tag = tokenlist[0]['upos']  # tag of the sentence's first token
    initTagStateProb[tag] = initTagStateProb.get(tag, 0) + 1

# Turn the raw counts into relative frequencies
for key in initTagStateProb:
  initTagStateProb[key] /= count

initTagStateProb

{'ADJ': 0.010882708585247884,
 'ADP': 0.16384522370012092,
 'ADV': 0.06287787182587666,
 'AUX': 0.022370012091898428,
 'CCONJ': 0.03325272067714631,
 'DET': 0.3633615477629988,
 'INTJ': 0.0006045949214026602,
 'NOUN': 0.02720677146311971,
 'NUM': 0.01995163240628779,
 'PART': 0.0018137847642079807,
 'PRON': 0.034461910519951636,
 'PROPN': 0.1124546553808948,
 'PUNCT': 0.07799274486094317,
 'SCONJ': 0.02418379685610641,
 'SYM': 0.0006045949214026602,
 'VERB': 0.04353083434099154,
 '_': 0.0006045949214026602}

In [None]:
# Sanity check: the initial probabilities must add up to 1 (100%)
np.array(list(initTagStateProb.values())).sum()

1.0

# Viterbi algorithm construction






Given a sequence of words $\{p_1, p_2, \dots, p_n \}$, and a set of grammatical categories given by the `upos` convention, the Viterbi probability matrix is considered as follows:

$$
\begin{array}{c c}
\begin{array}{c c c c}
\text{ADJ} \\
\text{ADV}\\
\text{PRON} \\
\vdots \\
{}
\end{array}
&
\left[
\begin{array}{c c c c}
\nu_1(\text{ADJ}) & \nu_2(\text{ADJ}) & \dots  & \nu_n(\text{ADJ})\\
\nu_1(\text{ADV}) & \nu_2(\text{ADV}) & \dots  & \nu_n(\text{ADV})\\
\nu_1(\text{PRON}) & \nu_2(\text{PRON}) & \dots  & \nu_n(\text{PRON})\\
\vdots & \vdots & \dots & \vdots \\ \hdashline
p_1 & p_2 & \dots & p_n
\end{array}
\right]
\end{array}
$$

Where the probabilities of the first column (for a category $i$) are given by:

$$
\nu_1(i) = \underbrace{\rho_i^{(0)}}_{\text{initial probability}} \times \underbrace{P(p_1 \vert i)}_{\text{emission}}
$$

then, for the second column (given a category $j$) they will be:

$$
\nu_2(j) = \max_i \{ \nu_1(i) \times \underbrace{P(j \vert i)}_{\text{transition}} \times \underbrace{P(p_2 \vert j)}_{\text{emission}} \}
$$

thus, in general, the probabilities for column $t$ are given by:

$$
\nu_{t}(j) = \max_i \{ \overbrace{\nu_{t-1}(i)}^{\text{previous state}} \times \underbrace{P(j \vert i)}_{\text{transition}} \times \underbrace{P(p_t \vert j)}_{\text{emission}} \}
$$


In [None]:
# Download NLTK's 'punkt' data and import the tokenizer that the Viterbi functions below use
import nltk
nltk.download('punkt')
from nltk import word_tokenize

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [None]:
def ViterbiMatrix(secuencia, transitionProbdict=transitionProbdict, emissionProbdict=emissionProbdict,
            tagStateDict=tagStateDict, initTagStateProb=initTagStateProb):
  """Build the Viterbi probability matrix of a sentence.

  Args:
    secuencia: the sentence as one string; it is split with `word_tokenize`.
    transitionProbdict: dict keyed by 'tag|previous_tag' with the transition probability.
    emissionProbdict: dict keyed by 'word|tag' (lower-cased word) with the emission probability.
    tagStateDict: dict mapping every tag to its row index in the matrix.
    initTagStateProb: dict mapping a tag to its probability of opening a sentence.

  Returns:
    A numpy array of shape (len(tagStateDict), number of tokens). Entry [r, t] is the
    Viterbi probability of the tag in row r at token t. It stays 0 when the 'word|tag'
    pair is missing from `emissionProbdict` or when no state of the previous column
    with non-zero probability has a known transition into the tag.
  """
  seq = word_tokenize(secuencia)
  # One row per tag (rather than a hard-coded 17) so the matrix fits any tag set
  viterbiProb = np.zeros((len(tagStateDict), len(seq)))
  if not seq:
    # Nothing to tag: return the empty matrix instead of failing on seq[0]
    return viterbiProb

  # First column initialization: initial probability times emission
  for key, tag_row in tagStateDict.items():
    word_tag = seq[0].lower()+'|'+key
    if word_tag in emissionProbdict:
      # A tag that never opens a sentence has initial probability 0
      viterbiProb[tag_row, 0] = initTagStateProb.get(key, 0.0)*emissionProbdict[word_tag]

  # Remaining columns: best previous state times transition times emission
  for col in range(1, len(seq)):
    for key, tag_row in tagStateDict.items():
      word_tag = seq[col].lower()+'|'+key
      if word_tag not in emissionProbdict:
        continue
      emission = emissionProbdict[word_tag]
      # Look at the states of the previous column that carry a non-zero probability
      possible_probs = []
      for key2, tag_row2 in tagStateDict.items():
        tag_prevtag = key+'|'+key2
        previous = viterbiProb[tag_row2, col-1]
        if previous > 0 and tag_prevtag in transitionProbdict:
          possible_probs.append(previous*transitionProbdict[tag_prevtag]*emission)
      # default=0.0 avoids "max() arg is an empty sequence" when no previous state
      # is reachable (for instance, when the preceding column is all zeros)
      viterbiProb[tag_row, col] = max(possible_probs, default=0.0)

  return viterbiProb

matrix = ViterbiMatrix('el mundo es pequeño')
matrix

array([[0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
       [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
       [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
       [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
       [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
       [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
       [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
       [0.00000000e+00, 1.47448483e-04, 2.91245828e-10, 0.00000000e+00],
       [0.00000000e+00, 2.00411724e-05, 0.00000000e+00, 0.00000000e+00],
       [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 1.01922433e-10],
       [0.00000000e+00, 0.00000000e+00, 5.02871314e-09, 0.00000000e+00],
       [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
       [8.76142797e-02, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
       [0.00000000e+00, 0.00000000e+00, 0.00000000e

In [None]:
def ViterbiTags(secuencia, transitionProbdict=transitionProbdict, emissionProbdict=emissionProbdict,
            tagStateDict=tagStateDict, initTagStateProb=initTagStateProb):
  """Tag every token of a sentence with a POS tag using the Viterbi matrix.

  Args:
    secuencia: the sentence as one string; it is split with `word_tokenize`.
    transitionProbdict: dict keyed by 'tag|previous_tag' with the transition probability.
    emissionProbdict: dict keyed by 'word|tag' (lower-cased word) with the emission probability.
    tagStateDict: dict mapping every tag to its row index in the matrix.
    initTagStateProb: dict mapping a tag to its probability of opening a sentence.

  Returns:
    A list of (token, tag) tuples, one per token. The tag of a token is the one whose
    row holds the largest value in that token's column (a per-column argmax; no
    back-pointers are stored). A column that is all zeros resolves to the tag in row 0,
    because `np.argmax` returns the first maximum. An empty sentence yields [].
  """
  seq = word_tokenize(secuencia)
  # One row per tag (rather than a hard-coded 17) so the matrix fits any tag set
  viterbiProb = np.zeros((len(tagStateDict), len(seq)))
  if not seq:
    # Nothing to tag: avoid failing on seq[0]
    return []

  # First column initialization: initial probability times emission
  for key, tag_row in tagStateDict.items():
    word_tag = seq[0].lower()+'|'+key
    if word_tag in emissionProbdict:
      # A tag that never opens a sentence has initial probability 0
      viterbiProb[tag_row, 0] = initTagStateProb.get(key, 0.0)*emissionProbdict[word_tag]

  # Next columns computation: best previous state times transition times emission
  for col in range(1, len(seq)):
    for key, tag_row in tagStateDict.items():
      word_tag = seq[col].lower()+'|'+key
      if word_tag not in emissionProbdict:
        continue
      emission = emissionProbdict[word_tag]
      # Look at the states of the previous column that carry a non-zero probability
      possible_probs = []
      for key2, tag_row2 in tagStateDict.items():
        tag_prevtag = key+'|'+key2
        previous = viterbiProb[tag_row2, col-1]
        if previous > 0 and tag_prevtag in transitionProbdict:
          possible_probs.append(previous*transitionProbdict[tag_prevtag]*emission)
      # default=0.0 avoids "max() arg is an empty sequence" when no previous state
      # is reachable (for instance, when the preceding column is all zeros)
      viterbiProb[tag_row, col] = max(possible_probs, default=0.0)

  # Tagging the sequence. This runs once, after every column is filled: placed inside
  # the column loop it would never execute for a one-token sentence, leaving `res`
  # undefined at the return statement.
  rowToTag = {row: tag for tag, row in tagStateDict.items()}
  res = []
  for i, p in enumerate(seq):
    best_row = int(np.argmax(viterbiProb[:, i]))
    res.append((p, rowToTag[best_row]))

  return res

ViterbiTags('el mundo es muy pequeño')

[('el', 'DET'),
 ('mundo', 'NOUN'),
 ('es', 'AUX'),
 ('muy', 'ADV'),
 ('pequeño', 'ADJ')]

In [None]:
# Second example sentence for the Viterbi tagger
ViterbiTags('estos instrumentos han de rasgar')

[('estos', 'DET'),
 ('instrumentos', 'NOUN'),
 ('han', 'AUX'),
 ('de', 'ADP'),
 ('rasgar', 'VERB')]

# Direct training of HMM with NLTK

* NLTK's Python class for HMMs: https://www.nltk.org/_modules/nltk/tag/hmm.html

In [None]:
#@title English Treebank
# Download NLTK's `treebank` corpus and keep its first 3900 tagged sentences as training data
import nltk
nltk.download('treebank')
from nltk.corpus import treebank
train_data = treebank.tagged_sents()[:3900]

[nltk_data] Downloading package treebank to /root/nltk_data...
[nltk_data]   Package treebank is already up-to-date!


In [None]:
#@title Data Structure of the Training Data
# A list of sentences, each sentence being a list of (word, tag) tuples
train_data

[[('Pierre', 'NNP'), ('Vinken', 'NNP'), (',', ','), ('61', 'CD'), ('years', 'NNS'), ('old', 'JJ'), (',', ','), ('will', 'MD'), ('join', 'VB'), ('the', 'DT'), ('board', 'NN'), ('as', 'IN'), ('a', 'DT'), ('nonexecutive', 'JJ'), ('director', 'NN'), ('Nov.', 'NNP'), ('29', 'CD'), ('.', '.')], [('Mr.', 'NNP'), ('Vinken', 'NNP'), ('is', 'VBZ'), ('chairman', 'NN'), ('of', 'IN'), ('Elsevier', 'NNP'), ('N.V.', 'NNP'), (',', ','), ('the', 'DT'), ('Dutch', 'NNP'), ('publishing', 'VBG'), ('group', 'NN'), ('.', '.')], ...]

In [None]:
#@title Pre-built HMM in NLTK
# Supervised training: the HMM tagger is fitted on the tagged sentences in train_data
from nltk.tag import hmm
tagger = hmm.HiddenMarkovModelTrainer().train_supervised(train_data)
tagger

<HiddenMarkovModelTagger 46 states and 12385 output symbols>

In [None]:
# Tag a new sentence, passed as a list of tokens, with the trained HMM
tagger.tag("Pierre Vinken will get old".split())

[('Pierre', 'NNP'),
 ('Vinken', 'NNP'),
 ('will', 'MD'),
 ('get', 'VB'),
 ('old', 'JJ')]

In [None]:
#@title training accuracy
# Scored on the same 3900 sentences used for training, so this is not a held-out estimate.
# NOTE(review): recent NLTK releases deprecate `evaluate()` in favour of `accuracy()` -- confirm for the installed version
tagger.evaluate(treebank.tagged_sents()[:3900])

0.9815403947224078

## Practice Exercise

**Objective:** Train an HMM using the `hmm.HiddenMarkovModelTrainer()` class on the `UD_Spanish-AnCora` dataset.

1. **Pre-processing:** In the previous example, we used the English `treebank` dataset, which has a different structure compared to `AnCora`. In this part, write code to transform the structure of `AnCora` so that it matches the structure of the `treebank` dataset as follows:

$$\left[ \left[ (\text{'El'}, \text{'DET'}), (\dots), \dots\right], \left[\dots \right] \right]$$


In [None]:
# Solving practice exercises
# These two commands repeat the setup from the first cell of the notebook
!pip install conllu
!git clone https://github.com/UniversalDependencies/UD_Spanish-AnCora.git
from conllu import parse_incr

In [None]:
# Convert the AnCora training split into the treebank structure:
# a list of sentences, each one a list of (word form, UPOS tag) tuples.
data_array = []
# The context manager closes the corpus file once parsing is done
with open("UD_Spanish-AnCora/es_ancora-ud-train.conllu", "r", encoding="utf-8") as data_file:
  for tokenlist in parse_incr(data_file):
    data_array.append([(token['form'], token['upos']) for token in tokenlist])

In [None]:
# Number of sentences read from the training split
len(data_array)

2. **Training:** Once the dataset is in the correct structure, use the `hmm.HiddenMarkovModelTrainer()` class to train with 80% of the dataset as the `training` set and 20% as the `test` set.

**Hint:** For the separation between training and test sets, you can use the function from Scikit Learn:

https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.train_test_split.html

At this point, the Machine Learning with Scikit Learn course is a good complement to better understand the functionalities of Scikit Learn: https://platzi.com/cursos/scikitlearn-ml/


In [None]:
# Develop your code here
# The import needs its own line: fused into the comment above it never ran,
# leaving `train_test_split` undefined.
from sklearn.model_selection import train_test_split
# Hold out 20% of the sentences as the test set; the fixed seed makes the split repeatable
train_data, test_data = train_test_split(data_array, test_size=0.2, random_state=42)
print(len(train_data))
print(len(test_data))


In [None]:
# Fit the NLTK HMM tagger on the 80% training portion of AnCora
from nltk.tag import hmm
tagger = hmm.HiddenMarkovModelTrainer().train_supervised(train_data)
tagger

3. **Model validation:** Once the `tagger` is trained, compute the model's performance (using `tagger.evaluate()`) on the `training` and `test` sets.



In [None]:
# Performance on the sentences the tagger was trained on
tagger.evaluate(train_data)

In [None]:
# Performance on the held-out 20% of the sentences
tagger.evaluate(test_data)

## Important Observations

* If you use the `es_ancora-ud-dev.conllu` dataset, you will notice that it is very small. You can try with `es_ancora-ud-train.conllu`.

* In practice, it is customary to train the model with `es_ancora-ud-train.conllu` and evaluate it on `es_ancora-ud-test.conllu`. The `es_ancora-ud-dev.conllu` file is typically used for quick prototyping of the model.


In [None]:
# Retrain the tagger on every sentence in data_array (the whole es_ancora-ud-train.conllu split)
tagger = hmm.HiddenMarkovModelTrainer().train_supervised(data_array)

In [None]:
# Load the test split in the same structure: a list of sentences, each one a list of
# (word form, UPOS tag) tuples. The file read here must be es_ancora-ud-test.conllu;
# reading es_ancora-ud-train.conllu would make `test_array` a copy of the training data.
test_array = []
# The context manager closes the corpus file once parsing is done
with open("UD_Spanish-AnCora/es_ancora-ud-test.conllu", "r", encoding="utf-8") as data_file:
  for tokenlist in parse_incr(data_file):
    test_array.append([(token['form'], token['upos']) for token in tokenlist])
len(test_array)