# Viterbi Algorithm with Python

## Setup

In [39]:
import numpy as np
from conllu import parse_incr

import nltk
from nltk import word_tokenize
from nltk.corpus import treebank
from nltk.tag import hmm

from sklearn.model_selection import train_test_split

In [2]:
nltk.download("punkt")
nltk.download("treebank")

[nltk_data] Downloading package punkt to
[nltk_data]     /Users/mmenendezg/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package treebank to
[nltk_data]     /Users/mmenendezg/nltk_data...
[nltk_data]   Package treebank is already up-to-date!


True

In [3]:
ANCORA_PATH = "data/UD_Spanish-AnCora/es_ancora-ud-dev.conllu"

## Load pretrained model

In [4]:
transition_prob_dict = np.load("npy_files/transition_hmm.npy", allow_pickle=True).item()
emission_prob_dict = np.load("npy_files/emission_hmm.npy", allow_pickle=True).item()

In [5]:
state_set = sorted(set([w.split("|")[1] for w in list(emission_prob_dict.keys())]))

In [6]:
tag_state_dict = {}
for i, state in enumerate(state_set):
    tag_state_dict[state] = i
tag_state_dict

{'ADJ': 0,
 'ADP': 1,
 'ADV': 2,
 'AUX': 3,
 'CCONJ': 4,
 'DET': 5,
 'INTJ': 6,
 'NOUN': 7,
 'NUM': 8,
 'PART': 9,
 'PRON': 10,
 'PROPN': 11,
 'PUNCT': 12,
 'SCONJ': 13,
 'SYM': 14,
 'VERB': 15,
 '_': 16}

## Initial distribution of hidden states

In [7]:
wordlist = []
data_file = open(ANCORA_PATH, "r", encoding="utf-8")
count = 0
init_tag_state_prob = {} #\rho_i^(o)

In [8]:
for tokenlist in parse_incr(data_file):
    count += 1
    tag = tokenlist[0]["upos"]
    if tag in init_tag_state_prob.keys():
        init_tag_state_prob[tag] += 1
    else:
        init_tag_state_prob[tag] = 1

for key in init_tag_state_prob.keys():
    init_tag_state_prob[key] /= count

init_tag_state_prob

{'DET': 0.36275695284159615,
 'PROPN': 0.1124546553808948,
 'ADP': 0.15538089480048367,
 'PRON': 0.06348246674727932,
 'SCONJ': 0.02418379685610641,
 'ADV': 0.056831922611850064,
 'PUNCT': 0.08222490931076179,
 'VERB': 0.021160822249093107,
 'ADJ': 0.010882708585247884,
 'CCONJ': 0.032648125755743655,
 'NOUN': 0.02720677146311971,
 '_': 0.009068923821039904,
 'INTJ': 0.0006045949214026602,
 'AUX': 0.019347037484885126,
 'NUM': 0.01995163240628779,
 'PART': 0.0018137847642079807}

In [9]:
# Verify that all probs sum 1
np.array([value for value in init_tag_state_prob.values()]).sum()

1.0

## Construction of the Viterbi Algorithm

Dada una secuencia de palabras $\{p_1, p_2, \dots, p_n \}$, y un conjunto de categorias gramaticales dadas por la convención `upos`, se considera la matriz de probabilidades de Viterbi así:

$$
\begin{array}{c c}
\begin{array}{c c c c}
\text{ADJ} \\
\text{ADV}\\
\text{PRON} \\
\vdots \\
{}
\end{array} 
&
\left[
\begin{array}{c c c c}
\nu_1(\text{ADJ}) & \nu_2(\text{ADJ}) & \dots  & \nu_n(\text{ADJ})\\
\nu_1(\text{ADV}) & \nu_2(\text{ADV}) & \dots  & \nu_n(\text{ADV})\\ 
\nu_1(\text{PRON}) & \nu_2(\text{PRON}) & \dots  & \nu_n(\text{PRON})\\
\vdots & \vdots & \dots & \vdots \\ \hdashline
p_1 & p_2 & \dots & p_n 
\end{array}
\right] 
\end{array}
$$

Donde las probabilidades de la primera columna (para una categoria $i$) están dadas por: 

$$
\nu_1(i) = \underbrace{\rho_i^{(0)}}_{\text{probabilidad inicial}} \times \underbrace{P(p_1 \vert i)}_{\text{emisión}}
$$

luego, para la segunda columna (dada una categoria $j$) serán: 

$$
\nu_2(j) = \max_i \{ \nu_1(i) \times \underbrace{P(j \vert i)}_{\text{transición}} \times \underbrace{P(p_2 \vert j)}_{\text{emisión}} \}
$$

así, en general las probabilidades para la columna $t$ estarán dadas por: 

$$
\nu_{t}(j) = \max_i \{ \overbrace{\nu_{t-1}(i)}^{\text{estado anterior}} \times \underbrace{P(j \vert i)}_{\text{transición}} \times \underbrace{P(p_t \vert j)}_{\text{emisión}} \}
$$

In [10]:
# Get the Viterbi Matrix


def viterbi_matrix(
    sequence,
    transition_prob_dict=transition_prob_dict,
    emission_prob_dict=emission_prob_dict,
    tag_state_dict=tag_state_dict,
    init_tag_state_prob=init_tag_state_prob,
):
    seq = word_tokenize(sequence.lower())
    viterbi_prob = np.zeros((17, len(seq)))

    # Initialize the first column
    for key in tag_state_dict.keys():
        tag_row = tag_state_dict[key]
        word_tag = f"{seq[0]}|{key}"
        if word_tag in emission_prob_dict.keys():
            viterbi_prob[tag_row, 0] = (
                init_tag_state_prob[key] * emission_prob_dict[word_tag]
            )

    # Probs of the following columns
    for col in range(1, len(seq)):
        for key in tag_state_dict.keys():
            tag_row = tag_state_dict[key]
            word_tag = f"{seq[col]}|{key}"
            if word_tag in emission_prob_dict.keys():
                possible_prob = []
                for key_2 in tag_state_dict.keys():
                    tag_row_2 = tag_state_dict[key_2]
                    tag_prev_tag = f"{key}|{key_2}"
                    if tag_prev_tag in transition_prob_dict.keys():
                        if viterbi_prob[tag_row_2, col - 1] > 0:
                            possible_prob.append(
                                viterbi_prob[tag_row_2, col - 1]
                                * transition_prob_dict[tag_prev_tag]
                                * emission_prob_dict[word_tag]
                            )
                viterbi_prob[tag_row, col] = max(possible_prob) if possible_prob else 0

    return viterbi_prob


In [11]:
matrix = viterbi_matrix("El mundo es pequeño")
matrix

array([[0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        ],
       [0.12461736, 0.        , 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.00034087, 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.00015494, 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        ]])

In [12]:
# Get the Viterbi Matrix


def viterbi_tags(
    sequence,
    transition_prob_dict=transition_prob_dict,
    emission_prob_dict=emission_prob_dict,
    tag_state_dict=tag_state_dict,
    init_tag_state_prob=init_tag_state_prob,
):
    seq = word_tokenize(sequence.lower())
    viterbi_prob = np.zeros((17, len(seq)))

    # Initialize the first column
    for key in tag_state_dict.keys():
        tag_row = tag_state_dict[key]
        word_tag = f"{seq[0]}|{key}"
        if word_tag in emission_prob_dict.keys():
            viterbi_prob[tag_row, 0] = (
                init_tag_state_prob[key] * emission_prob_dict[word_tag]
            )

    # Probs of the following columns
    for col in range(1, len(seq)):
        for key in tag_state_dict.keys():
            tag_row = tag_state_dict[key]
            word_tag = f"{seq[col]}|{key}"
            if word_tag in emission_prob_dict.keys():
                possible_prob = []
                for key_2 in tag_state_dict.keys():
                    tag_row_2 = tag_state_dict[key_2]
                    tag_prev_tag = f"{key}|{key_2}"
                    if tag_prev_tag in transition_prob_dict.keys():
                        if viterbi_prob[tag_row_2, col - 1] > 0:
                            possible_prob.append(
                                viterbi_prob[tag_row_2, col - 1]
                                * transition_prob_dict[tag_prev_tag]
                                * emission_prob_dict[word_tag]
                            )
                viterbi_prob[tag_row, col] = max(possible_prob) if possible_prob else 0
    
    # Get the sequence of tags 
    
    res=[]
    for i, p in enumerate(seq):
        for tag in tag_state_dict.keys():
            if tag_state_dict[tag] == np.argmax(viterbi_prob[:, i]):
                res.append((p, tag))

    return res


In [13]:
viterbi_tags("Yo no me llamo Marlon")

[('yo', 'PRON'),
 ('no', 'ADJ'),
 ('me', 'ADJ'),
 ('llamo', 'ADJ'),
 ('marlon', 'ADJ')]

## Training HMM with NLTK

In [18]:
train_data = treebank.tagged_sents()[:3000]

In [19]:
train_data

[[('Pierre', 'NNP'), ('Vinken', 'NNP'), (',', ','), ('61', 'CD'), ('years', 'NNS'), ('old', 'JJ'), (',', ','), ('will', 'MD'), ('join', 'VB'), ('the', 'DT'), ('board', 'NN'), ('as', 'IN'), ('a', 'DT'), ('nonexecutive', 'JJ'), ('director', 'NN'), ('Nov.', 'NNP'), ('29', 'CD'), ('.', '.')], [('Mr.', 'NNP'), ('Vinken', 'NNP'), ('is', 'VBZ'), ('chairman', 'NN'), ('of', 'IN'), ('Elsevier', 'NNP'), ('N.V.', 'NNP'), (',', ','), ('the', 'DT'), ('Dutch', 'NNP'), ('publishing', 'VBG'), ('group', 'NN'), ('.', '.')], ...]

In [25]:
tagger = hmm.HiddenMarkovModelTagger.train(train_data)

In [26]:
tagger.tag("I will get old".split())

[('I', 'PRP'), ('will', 'MD'), ('get', 'VB'), ('old', 'JJ')]

In [27]:
# Training accuracy
valid_data = treebank.tagged_sents()[3000:3500]
tagger.evaluate(valid_data)

  Function evaluate() has been deprecated.  Use accuracy(gold)
  instead.
  tagger.evaluate(valid_data)


0.9040199439077594

## Exercise:

Train a `HiddenMarkovModelTagger` using the `UD_Spanish_AnCora` corpus.

In [38]:
ancora_corpus = open(ANCORA_PATH, "r", encoding="utf-8")

tagtype = "upos"
tagged_corpus = []
for tokenlist in parse_incr(ancora_corpus):
    tagged_sentence = []
    for token in tokenlist:
        tagged_word = (token["form"], token[tagtype])
        tagged_sentence.append(tagged_word)
    tagged_corpus.append(tagged_sentence)

len(tagged_sentence)

46

In [43]:
# Split the dataset
train_data, test_data = train_test_split(tagged_corpus, train_size=0.8, random_state=1992, shuffle=True)


In [49]:
tagger = hmm.HiddenMarkovModelTagger.train(train_data)

In [51]:
tagger.evaluate(train_data)

  Function evaluate() has been deprecated.  Use accuracy(gold)
  instead.
  tagger.evaluate(train_data)


0.9699498791730063

In [50]:
tagger.evaluate(test_data)

  Function evaluate() has been deprecated.  Use accuracy(gold)
  instead.
  tagger.evaluate(test_data)


0.9012048192771084