
##Part-of-speech tagging with HMMs##
Implement a bigram part-of-speech (POS) tagger based on Hidden Markov Models from scratch. Using NLTK is disallowed, except for the modules explicitly listed below. For this, you will need to develop and/or utilize the following modules:

1. Corpus reader and writer (10 points)
2. Training procedure (30 points)
3. Viterbi tagging, including unknown word handling (50 points) 
4. Evaluation (10 points)

The task is mostly very straightforward, but each step requires careful design. Thus, we suggest you proceed in the following way.


##Viterbi algorithm.##

First, implement the Viterbi algorithm for finding the optimal state (tag) sequence given a sequence of observations (words). We suggest you test your implementation on a small example for which you know the correct tag sequence, such as Eisner’s Ice Cream HMM from the lecture.
Make sure your Viterbi algorithm runs properly on the example before you proceed to the next step. Submit the best state sequence x that your Viterbi implementation finds for y = 3, 1, 3 and its joint probability P(x, y).
There are plenty of other detailed illustrations for the Viterbi algorithm on the Web from which you can take example HMMs. Please resist the temptation to copy Python code from those websites; that would be plagiarism.

In [1]:
import os
from tqdm import tqdm
from nltk.corpus.reader.conll import ConllCorpusReader
from typing import List, Any, Dict, Tuple
import numpy as np
from collections import deque, defaultdict

In [2]:
# @title HMM
class HMM:
    def __init__(
            self,
            states: List[Any],
            state_transition_probs: List[List[float]],
            initial_probs: List[float],
            emission_probs: List[Dict[Any, float]],
    ):
        """Initializes Hidden Markov Model."""

        assert len(states) == len(initial_probs)
        assert len(states) == len(state_transition_probs)
        assert len(states) == len(emission_probs)

        self.states = np.asarray(states)
        self.state_transition_probs = np.asarray(state_transition_probs)
        self.initial_probs = np.asarray(initial_probs)
        self.emission_probs = np.asarray(emission_probs)
        self._vocab = set()
        for emission_dict in self.emission_probs:
            for word in emission_dict:
                self._vocab.add(word)

    def viterbi(self, observations: List[Any]) -> Tuple[List[Any], float]:
        """Accepts list of observations (words) and returns the optimal state (tag) sequence and its probability."""
        if not observations:
            return [], 0

        n, m = len(self.states), len(observations)

        backpointers_matrix = np.zeros((n, m), 'int')
        viterbi_matrix = np.zeros((n, m))
        # .get() keeps the lookup safe when a known word was never emitted by this state
        initial_emissions_vector = \
            [self.emission_probs[state_i].get(observations[0], 0.0) for state_i in range(n)] \
                if observations[0] in self._vocab \
                else [1] * n

        viterbi_matrix[:, 0] = self.initial_probs * initial_emissions_vector

        for t in range(1, m):
            prev_viterbi_col = viterbi_matrix[:, t - 1]
            for state_j in range(n):
                # Emission probability of the current token; an unknown word gets 1 for every state.
                # .get() returns 0 when a known word was never emitted by this state.
                cur_token_prob = self.emission_probs[state_j].get(observations[t], 0.0) if observations[t] in self._vocab else 1
                # Multiply the previous Viterbi column element-wise by the transition
                # probabilities from every state into 'state_j', then scale the resulting
                # vector by the scalar emission probability of the current token.
                state_i_to_j_probs = cur_token_prob * prev_viterbi_col * self.state_transition_probs[:, state_j]

                # index of the best 'state_i' to move to the 'state_j'
                max_index = np.argmax(state_i_to_j_probs)
                # remember it in backpointers_matrix
                backpointers_matrix[state_j, t] = max_index
                # update viterbi matrix
                viterbi_matrix[state_j, t] = state_i_to_j_probs[max_index]

        last_col_viterbi = viterbi_matrix[:, m - 1]
        max_prob_index = np.argmax(last_col_viterbi)
        max_prob_value = last_col_viterbi[max_prob_index]

        # Deque is used to avoid extra list reversal using appendleft method
        path = deque([max_prob_index, ])
        for t in range(m-1, 0, -1):
            # Invariant: path[0] is t-th element of path
            path.appendleft(backpointers_matrix[path[0], t])
            # Invariant: path[0] is (t-1)-th element of path

        return [self.states[state_i] for state_i in path], max_prob_value

In [3]:
model_np = HMM(states=["H", "C"],
            state_transition_probs=[[0.7, 0.3], [0.4, 0.6]],
            initial_probs=[0.8, 0.2],
            emission_probs=[
                {1: 0.2, 2: 0.4, 3: 0.4},
                {1: 0.5, 2: 0.4, 3: 0.1},
            ])

print(model_np.viterbi(observations=[3, 1, 3]))

(['H', 'H', 'H'], 0.012544000000000001)
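
One way to gain confidence in the result above is to score every possible state sequence by brute force and compare the winner with what Viterbi returns. The sketch below does this for the Ice Cream HMM; the names `ice_cream_states`, `joint_probability`, and `brute_force_best_path` are illustrative and not part of the assignment. Brute force is exponential in the sentence length, so it only makes sense as a sanity check on tiny examples.

```python
from itertools import product

# Ice Cream HMM parameters, copied from the cell above.
ice_cream_states = ["H", "C"]
initial = {"H": 0.8, "C": 0.2}
transition = {"H": {"H": 0.7, "C": 0.3}, "C": {"H": 0.4, "C": 0.6}}
emission = {"H": {1: 0.2, 2: 0.4, 3: 0.4}, "C": {1: 0.5, 2: 0.4, 3: 0.1}}


def joint_probability(path, observations):
    """P(x, y) for one state sequence x and observation sequence y."""
    prob = initial[path[0]] * emission[path[0]][observations[0]]
    for prev, cur, obs in zip(path, path[1:], observations[1:]):
        prob *= transition[prev][cur] * emission[cur][obs]
    return prob


def brute_force_best_path(observations):
    """Score every possible state sequence and return the best one."""
    candidates = product(ice_cream_states, repeat=len(observations))
    best = max(candidates, key=lambda path: joint_probability(path, observations))
    return list(best), joint_probability(best, observations)


best_path, best_prob = brute_force_best_path([3, 1, 3])
print(best_path, best_prob)
```

For y = 3, 1, 3 this should agree with the Viterbi output: the path H, H, H with a joint probability of about 0.012544.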


##Training.##

Second, learn the parameters of your HMM from data, i.e. the initial, transition, and emission probabilities. Implement a maximum likelihood training procedure for supervised learning of HMMs.
You can get a corpus at http://www.coli.uni-saarland.de/~koller/materials/anlp/de-utb.zip. It contains a training set, a test set, and an evaluation set. The training set (de-train.tt) and the evaluation set (de-eval.tt) are written in the commonly used CoNLL format. They are text files with two columns: the first column contains the words, the second column contains the POS tags, and empty lines delimit sentences. The test set (de-test.t) is a copy of the evaluation set with the tags stripped; you should tag the test set with your tagger and then compare your results with the gold-standard tags in the evaluation set. The corpus uses the 12-tag universal POS tagset by Petrov et al. (2012). Feel free to use the NLTK module nltk.corpus.reader (and submodules) for reading the corpus.
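
To make the file format concrete, here is a minimal sketch of a reader for the two-column layout described above, written without NLTK. The function name `read_conll_sentences` is hypothetical, the sketch assumes the columns are separated by whitespace, and the sample text is hand-made for illustration rather than taken from the corpus files.

```python
from typing import Iterable, List, Tuple


def read_conll_sentences(lines: Iterable[str]) -> List[List[Tuple[str, str]]]:
    """Parse two-column CoNLL lines into sentences of (word, tag) pairs."""
    sentences, current = [], []
    for line in lines:
        line = line.strip()
        if not line:
            # An empty line closes the current sentence.
            if current:
                sentences.append(current)
                current = []
            continue
        # Assumption: columns are whitespace-separated, word first, tag second.
        word, tag = line.split()[:2]
        current.append((word, tag))
    if current:
        # The input may not end with an empty line.
        sentences.append(current)
    return sentences


# Hand-made sample with two short "sentences" separated by an empty line.
sample = "Sehr\tADV\ngute\tADJ\nBeratung\tNOUN\n\nschnelle\tADJ\nBehebung\tNOUN\n"
parsed = read_conll_sentences(sample.splitlines())
print(parsed)
```

With a real file, the same function could be fed an open file handle, since iterating over a file yields its lines.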

In [4]:
%%capture
if not os.path.isfile('de-train.tt'):
    ! wget http://www.coli.uni-saarland.de/~koller/materials/anlp/de-utb.zip
    ! unzip de-utb.zip
    ! rm de-utb.zip

In [5]:
train_corpus = ConllCorpusReader(root='.', fileids=['de-train.tt'], columntypes=['words', 'pos'])
print(train_corpus.words())
print(train_corpus.tagged_words())
print(train_corpus.tagged_sents()[0])

['Sehr', 'gute', 'Beratung', ',', 'schnelle', ...]
[('Sehr', 'ADV'), ('gute', 'ADJ'), ...]
[('Sehr', 'ADV'), ('gute', 'ADJ'), ('Beratung', 'NOUN'), (',', '.'), ('schnelle', 'ADJ'), ('Behebung', 'NOUN'), ('der', 'DET'), ('Probleme', 'NOUN'), (',', '.'), ('so', 'ADV'), ('stelle', 'VERB'), ('ich', 'PRON'), ('mir', 'PRON'), ('Kundenservice', 'NOUN'), ('vor', 'PRT'), ('.', '.')]


We need to know the set of hidden states. Here is an excerpt from Petrov's article:

"Our universal POS tagset unifies this previous work and defines the following twelve POS tags: NOUN (nouns), VERB (verbs), ADJ (adjectives), ADV (adverbs), PRON (pronouns), DET (determiners and articles), ADP (prepositions and postpositions), NUM (numerals), CONJ (conjunctions), PRT (particles), ‘.’ (punctuation marks) and X (a catch-all for other categories such as abbreviations or foreign words)."

In [6]:
states = ['NOUN', 'VERB', 'ADJ', 'ADV', 'PRON', 'DET', 'ADP', 'NUM', 'CONJ', 'PRT', '.', 'X']

Here is the theory from Jurafsky and Martin's book on supervised training of HMMs:

X: sequence of hidden states / tags $q_1, ... , q_t$ <br>
Y: the corresponding sequence of observations / words $o_1, ... ,o_t$

initial probabilities: $c_j = \frac{\text{#sentences with } X_1=q_j}{\text{#sentences}}$ <br>


transition probabilities: $a_{ij} = \frac{C(X_t = q_i, X_{t+1} = q_j)}{C(X_t = q_i)}$ <br>

emission probabilities: $b_j(o) = \frac{C(X_t = q_j, Y_t = o)}{C(X_t = q_j)}$ <br>
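
The three estimates can be traced by hand on a tiny example. The sketch below uses a hypothetical two-sentence toy corpus (not taken from the training data) and plain counters. For the transition denominator it counts only tag occurrences that have a successor, which is an assumption about how $C(X_t = q_i)$ should treat sentence-final tags.

```python
from collections import Counter

# Hypothetical two-sentence toy corpus of (word, tag) pairs.
toy_corpus = [
    [("the", "DET"), ("dog", "NOUN"), ("barks", "VERB")],
    [("dogs", "NOUN"), ("bark", "VERB")],
]

initial_counts = Counter()     # tag of the first word of each sentence
transition_counts = Counter()  # (tag_t, tag_{t+1}) bigrams
source_counts = Counter()      # tag occurrences that have a successor
emission_counts = Counter()    # (tag, word) pairs
tag_counts = Counter()         # all tag occurrences

for sentence in toy_corpus:
    tags = [tag for _, tag in sentence]
    initial_counts[tags[0]] += 1
    for prev_tag, next_tag in zip(tags, tags[1:]):
        transition_counts[(prev_tag, next_tag)] += 1
        source_counts[prev_tag] += 1
    for word, tag in sentence:
        emission_counts[(tag, word)] += 1
        tag_counts[tag] += 1

# Relative frequencies, following the three formulas above.
c_noun = initial_counts["NOUN"] / len(toy_corpus)
a_noun_verb = transition_counts[("NOUN", "VERB")] / source_counts["NOUN"]
b_noun_dog = emission_counts[("NOUN", "dog")] / tag_counts["NOUN"]
print(c_noun, a_noun_verb, b_noun_dog)
```

Here one of two sentences starts with NOUN, both NOUN occurrences are followed by VERB, and "dog" accounts for one of the two NOUN emissions, giving 0.5, 1.0, and 0.5.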

In [7]:
#@title HMM training
def train_HMM_params(states: List[Any], corpus: ConllCorpusReader) -> Tuple[
    List[List[float]], List[float], List[Dict[Any, float]]]:
    n = len(states)
    state_to_index = {state: i for i, state in enumerate(states)}

    state_occurence = np.zeros(n)
    state_occurence_with_transition = np.zeros(n)
    state_cooccurence_matrix = np.zeros((n, n))
    state_to_word_counter = [defaultdict(int) for _ in range(n)]
    states_for_sentence_start = np.zeros(n)

    for tagged_sentence in tqdm(corpus.tagged_sents()):
        for i, (observed_word, word_state) in enumerate(tagged_sentence):
            state_index = state_to_index[word_state]
            if i == 0:
                states_for_sentence_start[state_index] += 1
            if i != len(tagged_sentence) - 1:
                # we have a transition to the next tag (current state is the first item in bigram)
                state_occurence_with_transition[state_index] += 1
                next_state = tagged_sentence[i + 1][1]
                next_state_index = state_to_index[next_state]
                state_cooccurence_matrix[state_index][next_state_index] += 1
            state_to_word_counter[state_index][observed_word] += 1
            state_occurence[state_index] += 1

    # broadcasting divides each row i of the count matrix by the number of transitions leaving state i
    state_transition_probs = state_cooccurence_matrix / state_occurence_with_transition[:, None]

    initial_probs = states_for_sentence_start / len(corpus.tagged_sents())

    emission_probs = [defaultdict(int) for _ in range(n)]
    for state_i, counted_words in enumerate(state_to_word_counter):
        for word, word_count in counted_words.items():
            emission_probs[state_i][word] = word_count / state_occurence[state_i]

    return state_transition_probs, initial_probs, emission_probs

In [8]:
trained_state_transition_probs, trained_initial_probs, trained_emission_probs = train_HMM_params(states, train_corpus)

100%|██████████| 14118/14118 [00:02<00:00, 6869.75it/s]


##Evaluation.##

Once you have trained a model, evaluate it on the unseen data from the test set. Run the Viterbi algorithm with each of your models, and output a tagged corpus in the two-column CoNLL format (*.tt). We will provide an evaluation script on Classroom. Run it on the output of your tagger and the evaluation set and report your results.
Note that your tagger will initially fail to produce output for sentences that contain words you haven’t seen in training. If such a word $w$ appears at sentence position $t$, you will have $b_j(w) = 0$ for all states/tags $j$, and therefore $V_t(j) = 0$ for all $j$. Adapt your tagger by implementing the following crude approach to unknown words. Whenever you get $V_t(j) = 0$ for all $j$ because of an unknown word $w$ at position $t$, pretend that $b_j(w) = 1$ for all $j$. This effectively sets $V_t(j) = \max_i V_{t-1}(i) \cdot a_{ij}$ and lets you infer the missing POS tag from the transition probabilities alone.
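
A single Viterbi step under this rule can be sketched in isolation. The example below reuses the Ice Cream HMM: the previous column is its first Viterbi column after observing 3 (0.8 · 0.4 and 0.2 · 0.1), and the next observation is treated as unknown. The helper name `unknown_word_step` is hypothetical and only illustrates the rule.

```python
from typing import List, Tuple


def unknown_word_step(prev_col: List[float],
                      transitions: List[List[float]]) -> Tuple[List[float], List[int]]:
    """One Viterbi step for an unknown word, i.e. with b_j(w) = 1 for every state j."""
    n = len(prev_col)
    new_col, backpointers = [], []
    for j in range(n):
        # score of reaching state j from each previous state i, emission fixed to 1
        scores = [prev_col[i] * transitions[i][j] * 1.0 for i in range(n)]
        best_i = max(range(n), key=scores.__getitem__)
        new_col.append(scores[best_i])
        backpointers.append(best_i)
    return new_col, backpointers


prev_viterbi_col = [0.32, 0.02]               # V_1(H), V_1(C) for y_1 = 3
transition_probs = [[0.7, 0.3], [0.4, 0.6]]   # a_ij, row i -> column j

viterbi_col, backpointers = unknown_word_step(prev_viterbi_col, transition_probs)
print(viterbi_col, backpointers)
```

Both states are best reached from H here, so the tag chosen for the unknown position depends only on the transitions and on what follows in the sentence.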

In [9]:
test_corpus = ConllCorpusReader(root='.', fileids=['de-test.t'], columntypes=['words'])
print(test_corpus.sents()[0])

['Der', 'Hauptgang', 'war', 'in', 'Ordnung', ',', 'aber', 'alles', 'andere', 'als', 'umwerfend', '.']


In [10]:
model = HMM(states=states,
            state_transition_probs=trained_state_transition_probs,
            initial_probs=trained_initial_probs,
            emission_probs=trained_emission_probs)

In [11]:
%%capture
! wget https://raw.githubusercontent.com/tsimafeip/LCT-master-course/main/Computational_Linguistics/HW2_hmm_pos_tagger/eval.py

In [12]:
# with open('de-train-res.tt', 'w') as f:
#     for sentence in tqdm(train_corpus.sents()):
#         predicted_tags, max_prob_value = model.viterbi(observations=sentence)
    
#         for tag, word in zip(predicted_tags, sentence):
#             f.write(f"{word}\t{tag}\n")
#         f.write('\n')

In [13]:
# ! python eval.py de-train.tt de-train-res.tt

In [14]:
with open('de-test-res.tt', 'w') as f:
    for sentence in tqdm(test_corpus.sents()):
        predicted_tags, max_prob_value = model.viterbi(observations=sentence)
    
        for tag, word in zip(predicted_tags, sentence):
            f.write(f"{word}\t{tag}\n")
        f.write('\n')

100%|██████████| 1000/1000 [00:01<00:00, 522.54it/s]


In [15]:
! python eval.py de-eval.tt de-test-res.tt


Comparing gold file "de-eval.tt" and system file "de-test-res.tt"

Precision, recall, and F1 score:

  DET 0.8232 0.9755 0.8929
 NOUN 0.9296 0.9141 0.9218
 VERB 0.9202 0.9211 0.9206
  ADP 0.9348 0.9775 0.9557
    . 0.9608 1.0000 0.9800
 CONJ 0.9498 0.8974 0.9228
 PRON 0.8671 0.8364 0.8515
  ADV 0.9043 0.8058 0.8523
  ADJ 0.8099 0.7222 0.7635
  NUM 0.9905 0.7704 0.8667
  PRT 0.8712 0.9251 0.8973
    X 0.2222 0.0909 0.1290

Accuracy: 0.9095
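
For reference, the kind of numbers reported above can be computed from two aligned tag lists as sketched below. This is not the code of `eval.py`; the function name `evaluate_tags` and the convention of reporting 0 when a denominator is empty are assumptions made for this illustration.

```python
from collections import Counter
from typing import Dict, List, Tuple


def evaluate_tags(gold: List[str], predicted: List[str]
                  ) -> Tuple[float, Dict[str, Tuple[float, float, float]]]:
    """Return accuracy and per-tag (precision, recall, F1) for aligned tag lists."""
    assert len(gold) == len(predicted)
    true_pos, gold_count, pred_count = Counter(), Counter(), Counter()
    for g, p in zip(gold, predicted):
        gold_count[g] += 1
        pred_count[p] += 1
        if g == p:
            true_pos[g] += 1

    per_tag = {}
    for tag in set(gold_count) | set(pred_count):
        # Assumption: a metric with an empty denominator is reported as 0.
        precision = true_pos[tag] / pred_count[tag] if pred_count[tag] else 0.0
        recall = true_pos[tag] / gold_count[tag] if gold_count[tag] else 0.0
        total = precision + recall
        f1 = 2 * precision * recall / total if total else 0.0
        per_tag[tag] = (precision, recall, f1)

    accuracy = sum(true_pos.values()) / len(gold) if gold else 0.0
    return accuracy, per_tag


# Hand-made example: one VERB is mistagged as NOUN.
gold_tags = ["DET", "NOUN", "VERB", "NOUN"]
predicted_tags = ["DET", "NOUN", "NOUN", "NOUN"]
accuracy, per_tag = evaluate_tags(gold_tags, predicted_tags)
print(accuracy, per_tag["NOUN"])
```

In the example, three of four tags match, and NOUN has perfect recall but a precision of 2/3 because of the mistagged verb.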



##Extra credit.##

The task is challenging as it stands. However, feel free to go further for extra credit, e.g. by doing one of the following: implement better unknown word handling, use a trigram tagger, plot a learning curve for your tagger (accuracy as a function of training data size), plot a speed vs. sentence length curve.

Please submit your code, instructions for running your tagger and tagging output(s). Document any additional data you submit. With this, you will have implemented your first POS tagger! Well done!