In [None]:
from collections import defaultdict
import numpy as np
import pandas as pd
from conllu import parse_incr, TokenList
from enum import Enum
from typing import Iterator, List, Dict, Tuple
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

In [None]:
class Decoder(Enum):
    """Decoding algorithms selectable through the module-level `decoder` setting."""
    most_frequent_tag = 1  # tag every word on its own, from its emission scores only
    viterbi = 2  # tag the whole sentence using emission and transition scores

class SmoothingStrategy(Enum):
    """Fallback emission distributions understood by `unknown_word_emission`."""
    uniform = 1  # every tag gets 1 / len(tags)
    always_other = 2  # tag 'O' gets 1, every other tag gets 0
    other_and_misc = 3  # tags 'O' and 'B-MISC' get 0.5 each, every other tag gets 0
    one_shot_word = 4  # value looked up in `one_shot_distrib` (see compute_one_shot_distrib)

# Decoding algorithm used in the decoding cell (one of the Decoder members).
decoder = Decoder.viterbi
# Smoothing strategy forwarded to unknown_word_emission(); with None that function
# returns 0 for every tag.
smoothing_strategy = None
# Very small positive float (np.finfo(float).tiny) used to keep zero probabilities
# out of np.log.
laplace_correction = np.finfo(float).tiny
# Dataset language; selects the data/<lang>/ directory the three files are read from.
lang = 'eng'
# The three splits are opened here and kept open as module-level handles for later cells.
train_set = open(f'data/{lang}/train.conllu', 'r', encoding='utf-8')
test_set = open(f'data/{lang}/test.conllu', 'r', encoding='utf-8')
val_set = open(f'data/{lang}/val.conllu', 'r', encoding='utf-8')

# Tag set. 'START' is an artificial tag used as the predecessor of the first token
# of each sentence; the learning cell removes it from this list after the
# transition matrix has been computed.
tags = ['START', 'O', 'I-LOC', 'I-MISC', 'I-ORG', 'I-PER', 'B-LOC', 'B-MISC', 'B-PER', 'B-ORG']

## Smoothing strategies

In [None]:
def unknown_word_emission(smoothing_strategy: Enum, tag: str) -> float:
    """Return the fallback emission value of `tag` under `smoothing_strategy`.

    Reads the module-level `tags` list (uniform strategy) and the module-level
    `one_shot_distrib` mapping (one_shot_word strategy).

    Args:
        smoothing_strategy: a SmoothingStrategy member; any other value
            (including None) selects no smoothing.
        tag: the tag whose fallback emission is requested.

    Returns:
        The value prescribed by the strategy for `tag`, or 0 when the strategy
        assigns nothing to this tag or no strategy matches.
    """
    strategy = smoothing_strategy

    # Every tag is equally likely.
    if strategy == SmoothingStrategy.uniform:
        return 1 / len(tags)

    # All the mass goes to 'O'.
    if strategy == SmoothingStrategy.always_other and tag == 'O':
        return 1

    # The mass is split evenly between 'O' and 'B-MISC'.
    if strategy == SmoothingStrategy.other_and_misc and tag in ('O', 'B-MISC'):
        return 0.5

    # Distribution estimated beforehand and stored in `one_shot_distrib`.
    if strategy == SmoothingStrategy.one_shot_word:
        return one_shot_distrib[tag]

    return 0

## Matrix Functions

In [None]:
def compute_transition_matrix(tags: List[str], train_set: Iterator[TokenList]) -> np.array:
    """Estimate tag-to-tag transition probabilities from a CoNLL-U training file.

    The tag of each token is read from its 'lemma' field. The artificial tag
    'START' is counted once per sentence and paired with the sentence's first
    tag, so that a 'START' row can hold sentence-initial probabilities.

    Args:
        tags: tag names; row/column `i` of the result corresponds to `tags[i]`.
        train_set: handle passed to `parse_incr`; it must also support
            `seek(0)`, because it is rewound before returning.

    Returns:
        A `(len(tags), len(tags))` float array where entry `[i, j]` is
        count(tags[i] followed by tags[j]) / count(tags[i]). Rows of tags that
        never occur stay at zero.
    """
    tag_counter = defaultdict(int)
    transition_counter = defaultdict(int)

    for sentence in parse_incr(train_set):
        sentence_tags = [token['lemma'] for token in sentence]
        if not sentence_tags:
            # A sentence without tokens has no first tag; skip it instead of
            # failing on sentence[0].
            continue

        # Match the first tag of the sentence with the artificial 'START' tag.
        tag_counter['START'] += 1
        transition_counter[('START', sentence_tags[0])] += 1

        # Every token contributes once to the count of its own tag ...
        for tag in sentence_tags:
            tag_counter[tag] += 1

        # ... and every adjacent pair contributes one transition.
        for previous_tag, next_tag in zip(sentence_tags, sentence_tags[1:]):
            transition_counter[(previous_tag, next_tag)] += 1

    transition_matrix = np.zeros((len(tags), len(tags)), dtype=float)
    for i, t1 in enumerate(tags):
        occurrences = tag_counter.get(t1, 0)
        if occurrences == 0:
            # Tag never seen: leave its row at zero.
            continue
        for j, t2 in enumerate(tags):
            transition_matrix[i, j] = transition_counter.get((t1, t2), 0) / occurrences

    # Rewind so the same handle can be parsed again by the next consumer.
    train_set.seek(0)
    return transition_matrix


def compute_emission_probabilities(train_set: Iterator[TokenList]) -> Dict[str, float]:
    """Estimate P(word | tag) for every (word, tag) pair in the training file.

    The word is read from each token's 'form' field and the tag from its
    'lemma' field.

    Args:
        train_set: handle passed to `parse_incr`; it is rewound with `seek(0)`
            before returning.

    Returns:
        A dict keyed by `(word, tag)` tuples whose value is
        count(word, tag) / count(tag).
    """
    pair_counts = defaultdict(int)
    tag_totals = defaultdict(int)

    for sentence in parse_incr(train_set):
        for token in sentence:
            word, tag = token['form'], token['lemma']
            pair_counts[(word, tag)] += 1
            tag_totals[tag] += 1

    # Relative frequency of each pair among all occurrences of its tag.
    emission_probabilities = {}
    for pair, occurrences in pair_counts.items():
        emission_probabilities[pair] = occurrences / tag_totals[pair[1]]

    # Rewind so the handle can be parsed again.
    train_set.seek(0)
    return emission_probabilities


def compute_emission_matrix(tags: List[str], words: List[str], emission_probabilities: Dict[Tuple[str, str], float]) -> np.array:
    """Build the tag-by-word emission matrix for one sentence.

    A word counts as known when `emission_probabilities` holds an entry for it
    with at least one of `tags`. Known words get their stored probability for
    the tags they were seen with and 0 for the others. Only words unseen with
    every tag fall back to `unknown_word_emission`, driven by the module-level
    `smoothing_strategy`.

    Args:
        tags: tag names; row `i` of the result corresponds to `tags[i]`.
        words: the sentence; column `j` of the result corresponds to `words[j]`.
        emission_probabilities: mapping from `(word, tag)` to probability.

    Returns:
        A `(len(tags), len(words))` float array of emission probabilities.
    """
    emission_matrix = np.zeros((len(tags), len(words)), dtype=float)

    # The fallback depends only on the tag, so compute it once per call.
    unknown_emissions = [unknown_word_emission(smoothing_strategy, tag) for tag in tags]

    for j, word in enumerate(words):
        seen = [emission_probabilities.get((word, tag)) for tag in tags]
        if all(probability is None for probability in seen):
            # Word never observed with any tag: use the smoothing distribution.
            emission_matrix[:, j] = unknown_emissions
        else:
            # Known word: pairs that were never observed keep probability 0.
            emission_matrix[:, j] = [0.0 if probability is None else probability for probability in seen]

    return emission_matrix


def compute_one_shot_distrib(val_set: Iterator[TokenList]) -> Dict[str, float]:
    """Estimate a tag distribution from (word, tag) pairs seen exactly once.

    The word is read from each token's 'form' field and the tag from its
    'lemma' field. Pairs occurring exactly once are grouped by tag and the
    per-tag counts are normalised to sum to 1.

    Args:
        val_set: handle passed to `parse_incr`; it must also support
            `seek(0)`, because it is rewound after parsing.

    Returns:
        A `defaultdict(int)` mapping tag to its share of the one-shot pairs.
        Tags without any one-shot pair are absent, so indexing them yields 0.
        The mapping is empty when there is no one-shot pair at all.
    """
    word_tag_counter = defaultdict(int)
    for sentence in parse_incr(val_set):
        for token in sentence:
            word_tag_counter[(token['form'], token['lemma'])] += 1

    # Rewind the handle, as compute_transition_matrix and
    # compute_emission_probabilities do, so that a second call does not
    # silently parse an exhausted file and return an empty distribution.
    val_set.seek(0)

    one_shot_counts = defaultdict(int)
    for (_, tag), count in word_tag_counter.items():
        if count == 1:
            one_shot_counts[tag] += 1

    total_count = sum(one_shot_counts.values())

    # When there are no one-shot pairs the loop body never runs, so no
    # division by zero can happen.
    one_shot_distrib = defaultdict(int)
    for tag, count in one_shot_counts.items():
        one_shot_distrib[tag] = count / total_count

    return one_shot_distrib

## Learning

In [None]:
emission_probabilities = compute_emission_probabilities(train_set)

# Move to log space. Zero probabilities are replaced by `laplace_correction`
# first, so unseen transitions get a very low but finite score instead of -inf.
# The replacement uses an explicit boolean mask because the `where=` keyword of
# a NumPy ufunc expects a boolean array, not a callable.
transition_probabilities = compute_transition_matrix(tags, train_set)
transition_matrix = np.log(np.where(transition_probabilities > 0, transition_probabilities, laplace_correction))

if smoothing_strategy == SmoothingStrategy.one_shot_word:
    one_shot_distrib = compute_one_shot_distrib(val_set)

# 'START' is not a tag that can be predicted, so it is dropped from the tag list
# and from the transition matrix.
# NOTE(review): the slice also discards the START -> tag row, while viterbi()
# reads row 0 of the matrix it receives as the initial distribution. After this
# slice row 0 is the 'O' row -- confirm this is intended.
tags.remove('START')
transition_matrix = transition_matrix[1:, 1:]

train_set.close()

## Definition of Decoders

In [None]:
def argmax(arr: np.array) -> Tuple[int, float]:
    """Return the position and the value of the largest element of `arr`.

    When the maximum occurs more than once, the first position along the
    first axis is returned.
    """
    peak = np.amax(arr)
    # Indices (along the first axis) of every element equal to the maximum.
    hits = np.nonzero(arr == peak)[0]
    return hits[0], peak


def viterbi(words: List[str], tags: List[str], Tm: np.array, Em: np.array) -> List[str]:
    """Return the highest-scoring tag sequence for `words` (Viterbi decoding).

    Scores are summed, so `Tm` and `Em` are expected to hold log-probabilities.

    Args:
        words: the sentence to tag; only its length is used.
        tags: tag names; index `k` in the matrices corresponds to `tags[k]`.
        Tm: `(T, T)` array, `Tm[k, j]` scores moving from tag `k` to tag `j`.
            Row 0 is additionally used as the score of each tag for the first
            word.
            NOTE(review): the notebook passes a matrix whose 'START' row has
            been sliced away, so row 0 is the row of the first real tag --
            confirm that this is the intended initial distribution.
        Em: `(T, W)` array, `Em[k, i]` scores tag `k` emitting `words[i]`.

    Returns:
        A list with one tag per word; an empty list when `words` is empty.
    """
    W = len(words)
    T = len(tags)

    if W == 0:
        # Nothing to decode; also avoids indexing column 0 of an empty matrix.
        return []

    # -np.inf rather than np.NINF: the alias was removed in NumPy 2.0.
    viterbi_matrix = np.full((T, W), -np.inf, dtype=float)
    backpointer = np.zeros((T, W), dtype=int)

    # Score of each tag for the first word.
    viterbi_matrix[:, 0] = Em[:, 0] + Tm[0, :]

    for i in range(1, W):
        # candidates[k, j]: score of reaching tag j at word i coming from tag k.
        candidates = viterbi_matrix[:, i - 1][:, np.newaxis] + Tm
        # np.argmax keeps the first maximum, so ties go to the lowest tag index.
        backpointer[:, i] = np.argmax(candidates, axis=0)
        viterbi_matrix[:, i] = np.max(candidates, axis=0) + Em[:, i]

    # Best final tag, then walk the backpointers towards the first word.
    k = int(np.argmax(viterbi_matrix[:, -1]))
    best_path = [tags[k]]
    for i in range(W - 1, 0, -1):
        k = int(backpointer[k, i])
        best_path.append(tags[k])

    best_path.reverse()
    return best_path


def most_frequent_tag(words: List[str], tags: List[str], Em: np.array) -> List[str]:
    """Tag each word independently with its highest-scoring tag.

    Args:
        words: the sentence to tag; only its length is used.
        tags: tag names; row `k` of `Em` corresponds to `tags[k]`.
        Em: emission scores, one column per word.

    Returns:
        A list with one tag per word, taken from the best row of each column
        as reported by `argmax`.
    """
    return [tags[argmax(Em[:, column])[0]] for column in range(len(words))]

## Decoding

In [None]:
predictions = list()
reference = list()

# Rewind first, so that re-running this cell decodes the whole test set again
# instead of silently iterating an exhausted file handle.
test_set.seek(0)

for sentence in parse_incr(test_set):
    correct_tags = [token['lemma'] for token in sentence]
    words = [token['form'] for token in sentence]

    if not words:
        # A sentence without tokens has nothing to decode.
        continue

    # Move to log space. Zero probabilities are replaced by `laplace_correction`
    # first, using an explicit boolean mask (the `where=` keyword of a NumPy
    # ufunc expects a boolean array, not a callable).
    emission_probs = compute_emission_matrix(tags, words, emission_probabilities)
    emission_matrix = np.log(np.where(emission_probs > 0, emission_probs, laplace_correction))

    if decoder == Decoder.viterbi:
        prediction = viterbi(words, tags, transition_matrix, emission_matrix)
    elif decoder == Decoder.most_frequent_tag:
        prediction = most_frequent_tag(words, tags, emission_matrix)
    else:
        # Fail loudly rather than reusing a stale (or undefined) `prediction`.
        raise ValueError(f'Unsupported decoder: {decoder!r}')

    predictions.extend(prediction)
    reference.extend(correct_tags)

## Evaluation

In [None]:
accuracy = accuracy_score(reference, predictions)
# `labels=tags` fixes the row order of the report. With `target_names` alone the
# names are matched positionally against the sorted labels found in the data,
# which does not follow the order of `tags` and mislabels the rows.
report = classification_report(reference, predictions, labels=tags, target_names=tags)
confusion = confusion_matrix(reference, predictions, labels=tags)

print(f'Total words: {len(predictions)}')
print(f'Accuracy: {round(accuracy * 100, 2)}%')
print('\nClassification Report')
print(report)
print('\nConfusion Matrix')
# Rows are the reference tags, columns the predicted tags.
print(pd.DataFrame(confusion, index=tags, columns=tags))