# Problem 2: Part-of-speech tagging using the Viterbi algorithm

### Learning outcome
In this exercise, you learn how to implement the Viterbi algorithm on part-of-speech (POS) tagging of sentences.

### Introduction
A short introduction to POS can be found here: https://www.youtube.com/watch?v=mHEKZ8jv2SY

In this notebook, first the data constisting of a text and the corresponding grammatical tags of each word is parsed and processed. Then, a Hidden Markov Model (HMM) is implemented: The observed variables are the tokens, which are words or punctuation characters like `the`, `blue` or `.`. The hidden variables are the grammatical tags which take values like `NOUN` or `VERB` or `.` for punctuation characters. Subsequently, the probabilities of the model are learned for the given data. When only the sentences of previously unknown test data are provided, the most likely sequence of tags has to be assigned correctly to each sentence by implementing the Viterbi algorithm is implemented for the HMM.

### Passing criteria:
- For passing this exercise, the `final_test` in the last cell needs to pass when running the complete notebook. The same test is performed when the notebbok is pushed to Artemis. (Make sure that all the cells run without an error or comment them out otherwise.)

- No other libraries than numpy may be used. The `*.pkl` files used for the provided tests must not be used for anything besides these tests.

Fill out the TODOs in the stub of the code which is provided here (recommended) or feel free to create your own model or. However, the names of the provided functions and their input/output arguments must not be changed in order to run the final test.

In [1]:
#!pip install numpy  # uncomment this and rerun in case numpy is not installed
import time
import pickle
import requests
import numpy as np
from collections import Counter

# Parsing the dataset
First, the dataset is downloaded. It can either be downloaded from https://raw.githubusercontent.com/davidpitkanen/Part-of-Speech-Tagger/master/brown-universal.txt or by executing the following code.

In [2]:
filename="brown-universal.txt"

try:
    with open(filename) as f:
        print("Dataset already downloaded")
except FileNotFoundError:
    print("Downloading dataset")
    url = "https://raw.githubusercontent.com/davidpitkanen/Part-of-Speech-Tagger/master/brown-universal.txt"
    r = requests.get(url)
    with open(filename, 'wb') as f:
        f.write(r.content)

Downloading dataset


First, the data is read and - in a Machine Learning manner - split into training, validation and test data:

In [3]:
def read_dataset(filename="brown-universal.txt", number_of_sentences_to_take=None):
    """
    Reads the dataset and splits it into training, validation and test data
    The dataset is the Brown Corpus (https://en.wikipedia.org/wiki/Brown_Corpus), annotated with
    the universal tagset: http://www.nltk.org/nltk_data/
    The dataset is taken from:
        https://github.com/davidpitkanen/Part-of-Speech-Tagger/blob/master/brown-universal.txt

    :param filename: str, the filename of the dataset
    :param number_of_sentences_to_take: None or int, if the latter: for faster debugging, only read the
        first number_of_sentences_to_take into the dataset
    :return: train_sentences, val_sentences, test_sentences, each [[(str,str)]] list of sentences, each 
        sentence is a list of tuples with the first element being the token, the second element the label
    """
    sentences = []
    with open(filename, "r") as f:
        current_sentence = []
        for line in f:
            if line.__contains__("b100-"):  # start of a new sentence
                current_sentence = []
            elif line == "\n":  # end of a sentence
                sentences.append(current_sentence)
            else:  # the line contains the next token (i.e. a word ,'.', ',', ...) of 
                # the current sentence and its label
                token, label = line.split()
                current_sentence.append((token.lower(), label))
    
    # reduced numer of sentences for faster debugging:
    if number_of_sentences_to_take:
        sentences = sentences[:number_of_sentences_to_take]

    train_idx = int(np.ceil((len(sentences) * 0.9)))
    text_idx = int((np.ceil(len(sentences) * 0.95)))

    train_sentences = sentences[:train_idx]
    val_sentences = sentences[train_idx:text_idx]
    test_sentences = sentences[train_idx:]  # for final testing 

    return train_sentences, val_sentences, test_sentences

First, take a look at the dataset and use only a few sentences for faster debugging:

In [4]:
train_sentences, val_sentences, test_sentences = read_dataset()

train_sentences_debug = train_sentences[:5]
train_sentences_debug


[[('mr.', 'NOUN'),
  ('podger', 'NOUN'),
  ('had', 'VERB'),
  ('thanked', 'VERB'),
  ('him', 'PRON'),
  ('gravely', 'ADV'),
  (',', '.'),
  ('and', 'CONJ'),
  ('now', 'ADV'),
  ('he', 'PRON'),
  ('made', 'VERB'),
  ('use', 'NOUN'),
  ('of', 'ADP'),
  ('the', 'DET'),
  ('advice', 'NOUN'),
  ('.', '.')],
 [('but', 'CONJ'),
  ('there', 'PRT'),
  ('seemed', 'VERB'),
  ('to', 'PRT'),
  ('be', 'VERB'),
  ('some', 'DET'),
  ('difference', 'NOUN'),
  ('of', 'ADP'),
  ('opinion', 'NOUN'),
  ('as', 'ADP'),
  ('to', 'ADP'),
  ('how', 'ADV'),
  ('far', 'ADV'),
  ('the', 'DET'),
  ('board', 'NOUN'),
  ('should', 'VERB'),
  ('go', 'VERB'),
  (',', '.'),
  ('and', 'CONJ'),
  ('whose', 'DET'),
  ('advice', 'NOUN'),
  ('it', 'PRON'),
  ('should', 'VERB'),
  ('follow', 'VERB'),
  ('.', '.')],
 [('such', 'PRT'),
  ('an', 'DET'),
  ('instrument', 'NOUN'),
  ('is', 'VERB'),
  ('expected', 'VERB'),
  ('to', 'PRT'),
  ('be', 'VERB'),
  ('especially', 'ADV'),
  ('useful', 'ADJ'),
  ('if', 'ADP'),
  ('it', 'PR

# Count transistions and emissions
In the following, several helper functions with the corresponding tests are defined. The tests load sample inputs and expected outputs from the corresponding pickle files. The tests should pass without an error. In case there are errors, use your IDEs debugger or print statements to figure out why the results are different. (Or ignore them if you think your implementation should work anyway.)

TODO: Figure out and implement what the methods should do by looking at their signatures and the PartsOfSpeechTaggingModel class below.
Wherever code needs to be inserted, it is marked with a "TODO"-comment.


In [5]:
def count_transitions(label_to_idx, sentences):
    """
    Counts the transitions from one label i to another label j

    :param label_to_idx: dict {str : int} mapping from labels to ids
    :param sentences: [[(str,str)]] list of sentences, each sentence is a list of tuples with the first element
        being the token, the second element the label
    :return: transition_counts: ndarray, shape = (#labels, #labels)
    transition_counts[i,j] : How many times follows idx_to_label[j] after idx_to_label[i] in the dataset?
    """

    transition_counts = np.zeros(shape=(len(label_to_idx), len(label_to_idx)))

    for sentence in sentences:
        before_label = None
        for _, label in sentence:
            if before_label:
                # TODO: Your code here
                #####
                transition_counts[label_to_idx[before_label], label_to_idx[label]] =
                #####
            before_label = label

    return transition_counts

In [6]:
def test_count_transitions():
    label_to_idx, sentences, transition_counts = pickle.load(open("count_transitions_test.pkl", "rb"))

    assert (count_transitions(label_to_idx, sentences) == transition_counts).all()

In [7]:
test_count_transitions()

In [8]:
def count_start_end_labels(sentences, label_to_idx):
    """
    Counts the start and end labels for the sentences

    :param sentences: [[(str,str)]] list of sentences, each sentence is a list of tuples with the 
        first element being the token, the second element the label
    :param label_to_idx: dict {str : int} mapping from labels to ids
    :return: start_label_count_vector, end_label_count_vector, both ndarray , both: shape = (#labels,)
    start_label_count_vector[i]: How many times appeared idx_to_token[i] in the beginning of a sentence 
        in the dataset?
    end_label_count_vector[i]: How many times appeared idx_to_token[i] in the end of a sentence in the dataset?
    """

    start_labels = [sentence[0][1] for sentence in
                    sentences]  # labels of the first token of each sentence
    #####
    # TODO: Your code here
    end_labels =  # labels of the last token of each sentence 
    #####
    start_label_count = dict(Counter(start_labels))
    end_label_count = dict(Counter(end_labels))

    start_label_count_vector = np.zeros(shape=len(label_to_idx))
    end_label_count_vector = np.zeros(shape=len(label_to_idx))
    for label in label_to_idx.keys():
        if label in start_label_count.keys():
            #####
            # TODO: Your code here            
            start_label_count_vector[label_to_idx[label]] 
            #####
        if label in end_label_count.keys():
            #####
            # TODO: Your code here            
            end_label_count_vector[label_to_idx[label]] 
            #####

    return start_label_count_vector, end_label_count_vector

In [9]:
def test_count_start_end_labels():
    train_sentences, label_to_idx, start_label_count_vector, end_label_count_vector = pickle.load(
        open("count_start_end_labels_test.pkl", "rb"))

    result1, result2 = count_start_end_labels(train_sentences, label_to_idx)

    assert (result1 == start_label_count_vector).all()
    assert (result2 == end_label_count_vector).all()

In [10]:
test_count_start_end_labels()

In [11]:
def count_emissions(token_list, labels_list, common_tokens, token_to_idx, label_to_idx):
    """
    Counts the emissions of labels for each token

    :param token_list:  [str] list of strings, each corresponding to a token
    :param labels_list: [str] list of strings, each corresponding to a label of a token
    :param common_tokens: [str] list of strings, each corresponding to a known token
    :param token_to_idx: dict {str : int} mapping from tokens to ids
    :param label_to_idx: dict {str : int} mapping from labels to ids
    :return: emission_counts: ndarray, shape = (#tokens (including placeholder), #labels)
    emission_counts[i,j] : How many times occur idx_to_token[i] and idx_to_label[j] together in the dataset?

    """
    emission_counts = np.zeros(shape=(len(set(common_tokens)) + 1, len(set(
        labels_list))))  # +1 for the placeholder token
    combination_counts = Counter(zip(token_list, labels_list))
    for token, label in dict(combination_counts).keys():
        if token in common_tokens:
            # TODO: Your code here
            #####
            emission_counts[token_to_idx[token], label_to_idx[label]] += 
            #####
        else:
            emission_counts[0, label_to_idx[label]] += combination_counts[
                (token, label)]  # counts as placeholder token

   
    return emission_counts

In [12]:
def test_count_emissions():
    labels_list, token_list, common_tokens, token_to_idx, label_to_idx, emission_counts = pickle.load(
        open("count_emissions_test.pkl", "rb"))
    assert (count_emissions(token_list, labels_list, common_tokens, token_to_idx,
                            label_to_idx) == emission_counts).all()

In [13]:
test_count_emissions()

# Learning parameters of the HMM model
Now, the model with the HMM can be defined and its parameters are learned. All the words/tokens which do not appear often in the training data will be mapped to a placeholder token. In this way, the model learns better how to deal with unseen tokens in the validation/test. 

In [14]:
class PartsOfSpeechTaggingModel:
    """
    This Class represents a model for performing part-of-speech tagging on sentences
    """

    def __init__(self, train_sentences, min_token_count_required=3, epsilon=1.e-5,
                 placeholder_token="UNKNOWN_PLACEHOLDER_TOKEN"):
        """
        In this init method for the PartsOfSpeechTaggingModel, the dataset is read, preprocessed
        and the 'training' is performed by creating a hidden Markov model

        :param train_sentences: [[(str,str)]] list of sentences, each sentence is a list of tuples with 
            the first element being the token, the second element the label; the data to train the model with
        :param min_token_count_required: int, the minimum number of occurrences of a token such that it 
            will be considered as 'known' and not mapped to the placeholder token
        :param epsilon: float, a small value for numerical reasons and as a bias towards rare, unseen 
            transitions
        :param placeholder_token: str, a string which will be inserted for every unknown token
        """

        start_time = time.time()

        self.placeholder_token = placeholder_token
        labels_list = [label for sentence in train_sentences for _, label in sentence]
        token_list = [token for sentence in train_sentences for token, _ in sentence]
        unique_labels_ordered = sorted(list(set(labels_list)))
        self.label_to_idx = {label: i for i, label in enumerate(unique_labels_ordered)}
        # TODO: Your code here
        #####
        self.idx_to_label: dict =
        #####
        token_counts = Counter(token_list)
        self.common_tokens = sorted(
            list(set([token for token in set(token_list) if token_counts[token] >= min_token_count_required])))
        self.token_to_idx = {token: i + 1 for i, token in
                             enumerate(self.common_tokens)}  # idx 0 is reserved for placeholder token
        self.token_to_idx[placeholder_token] = 0
        # TODO: Your code here
        #####
        self.idx_to_token: dict =
        self.idx_to_token[0] =
        #####

        emission_count_matrix = count_emissions(token_list, labels_list, self.common_tokens, self.token_to_idx,
                                                self.label_to_idx)

        start_label_count_vector, end_label_count_vector = count_start_end_labels(train_sentences,
                                                                                  self.label_to_idx)
        transition_count_matrix = count_transitions(self.label_to_idx, train_sentences)

        # for numerical reasons and bias towards rare/unseen transitions
        start_label_count_vector += epsilon
        end_label_count_vector += epsilon
        emission_count_matrix += epsilon
        transition_count_matrix += epsilon
        
        #####
        # TODO: Your code here        
        # start_label_probabilities[i] : Probability that the first label of a sentence is idx_to_label[i]
        self.start_label_probabilities =
        # analogously for last label:
        self.end_label_probabilities =        
        #####
        
        # emission_probabilities[i,j] : Probability that idx_to_token[i] 
            # has label idx_to_label[j] in the dataset
        self.emission_probabilities = emission_count_matrix / emission_count_matrix.sum(axis=1)[:, None]        
        # transition_probabilites[i,j] : Probability that idx_to_label[i] follows 
            # after idx_to_label[j] in the dataset
        self.transition_probabilites = transition_count_matrix / transition_count_matrix.sum(axis=1)        

        end_time = time.time()

        print(f"Initializing the model took { end_time - start_time:.4f} seconds")
        
  

In [15]:
def test_model(train_sentences_debug):
    reference_model_dict = pickle.load(  open("PartsOfSpeechTaggingModel_test.pkl", "rb"))
    debug_model = PartsOfSpeechTaggingModel(train_sentences=train_sentences_debug, min_token_count_required=0)
    
    assert (reference_model_dict['start_label_probabilities'] == debug_model.start_label_probabilities).all()
    assert (reference_model_dict['end_label_probabilities'] == debug_model.end_label_probabilities).all()
    assert (reference_model_dict['emission_probabilities'] == debug_model.emission_probabilities).all()
    assert (reference_model_dict['transition_probabilites'] == debug_model.transition_probabilites).all()
    assert (reference_model_dict['idx_to_label'] == debug_model.idx_to_label)
    assert (reference_model_dict['idx_to_token'] == debug_model.idx_to_token)

In [16]:
test_model(train_sentences_debug)

Initializing the model took 0.0034 seconds


# Viterbi Algorithm
Now, the viterbi algorithm can be implemented in order to obtain the most-likely sequence of tags for a given sentence.

In [17]:
def tag_sentence(hmm_model, sentence, verbose=False):
    """
    This method performs part-of-speech tagging on the given sentence using the viterbi algorithm.
    :param sentence: str, the sentence to be tagged
    :param verbose: bool, if True, the sentence, its tokenization and its predicted tags will be printed
    :return: label_sequence_text:  list of str, every element depicts the tag of the respective token in 
        the sentence
    """

    # replace unknown tokens with placeholder token use it's probabilities as a heuristics    
    tokens = sentence.lower().split()
    tokens = [token if token in hmm_model.common_tokens else hmm_model.placeholder_token for token in tokens]

    # most_probable_ancestor[i,j] :
    # If the i-th token of the sentence had label idx_to_label[j]: What's the most likely 
    # label of the token before?
    most_probable_ancestor = np.zeros(shape=(len(tokens), len(hmm_model.label_to_idx)),
                                      dtype=np.int)

    # frontier_label_probabilities[i] : Probability score of idx_to_label[i] for
    # the current token in the viterbi algorithm
    frontier_label_probabilities = hmm_model.start_label_probabilities
    
    #####
    # TODO: Your code here    
    
    # initialize with start label probabilities and first token
    frontier_label_probabilities = 
    
    for i, token in enumerate(tokens[1:]):  # other tokens
        possible_path_probabilities = 
        most_probable_ancestor[i] = np.argmax(possible_path_probabilities, axis=1)
        most_probable_ancestor_probability =

        frontier_label_probabilities = 
        
        # normalize to prevent underflows
        frontier_label_probabilities = frontier_label_probabilities \
                                       / frontier_label_probabilities.sum()        

    # end label probabilities 
    frontier_label_probabilities = 
    
    # the label for the last token achieving the highest probability
    most_probable_last_label = np.argmax(frontier_label_probabilities)

    # list of labels for the tokens, build in reversed order
    most_probable_label_sequence = [int(most_probable_last_label)]

    for i in range(1, most_probable_ancestor.shape[0]):
        last_label = 
        most_probable_label_sequence.append(int(most_probable_ancestor[-(i + 1), last_label]))
    #####

    most_probable_label_sequence.reverse()

    label_sequence_text = [hmm_model.idx_to_label[idx] for idx in most_probable_label_sequence]

    if verbose:
        print(sentence)
        print(tokens)
        print(label_sequence_text)
        

    return label_sequence_text



# Train and evaluate your model
### Small dataset for debugging
Now, a model for debugging can be instantiated:

In [18]:
debug_model = PartsOfSpeechTaggingModel(train_sentences=train_sentences_debug, min_token_count_required=0)

Initializing the model took 0.0006 seconds


The implementation of the algorithm can be tested:

In [19]:
def test_tag_sentence(model):
    sentence = "Mr. Podger had thanked him gravely ."
    label_sequence_text = tag_sentence(model,sentence, verbose = True)
    assert label_sequence_text == ['NOUN', 'NOUN', 'VERB', 'VERB', 'PRON', 'ADV', '.']

In [20]:
test_tag_sentence(debug_model)

Mr. Podger had thanked him gravely .
['mr.', 'podger', 'had', 'thanked', 'him', 'gravely', '.']
['NOUN', 'NOUN', 'VERB', 'VERB', 'PRON', 'ADV', '.']


The model should be able to overfit on the data and roughly tag the first sentence in the small debug dataset.
It does not know how to deal with unknown words though.

In [21]:
tags = tag_sentence(debug_model,"Mr. Podger had thanked him gravely , and now he made use of the advice .",
                                verbose=True)
# HINT: correct tags are ['NOUN', 'NOUN', 'VERB', 'VERB', 'PRON', 'ADV', '.', 'CONJ', 'ADV', 'PRON',
# 'VERB', 'NOUN', 'ADP', 'DET', 'NOUN', '.']

tags = tag_sentence(debug_model,"The deer is running fast .", verbose=True)
tags = tag_sentence(debug_model,"Google is a company .", verbose=True)

Mr. Podger had thanked him gravely , and now he made use of the advice .
['mr.', 'podger', 'had', 'thanked', 'him', 'gravely', ',', 'and', 'now', 'he', 'made', 'use', 'of', 'the', 'advice', '.']
['NOUN', 'NOUN', 'VERB', 'VERB', 'PRON', 'ADV', '.', '.', 'ADV', 'PRON', 'VERB', 'NOUN', 'ADP', 'VERB', 'NOUN', '.']
The deer is running fast .
['the', 'UNKNOWN_PLACEHOLDER_TOKEN', 'is', 'UNKNOWN_PLACEHOLDER_TOKEN', 'UNKNOWN_PLACEHOLDER_TOKEN', '.']
['DET', 'ADP', 'VERB', 'PRT', 'CONJ', '.']
Google is a company .
['UNKNOWN_PLACEHOLDER_TOKEN', 'is', 'a', 'UNKNOWN_PLACEHOLDER_TOKEN', '.']
['PRT', 'VERB', 'DET', 'CONJ', '.']


### Full dataset
After having debugged your implementation, the full model can be created. This might take several minutes. 


The parameters min_token_count and epsilon might be finetuned for better performance (to pass the final test).


In [22]:
pos_model = PartsOfSpeechTaggingModel(train_sentences=train_sentences, min_token_count_required=5, epsilon=0.1)

Initializing the model took 30.7614 seconds


Now, the previous example can be tagged better:

In [23]:
tags = tag_sentence( pos_model,"Mr. Podger had thanked him gravely , and now he made use of the advice .", 
                          verbose=True)
# HINT: correct tags are ['NOUN', 'NOUN', 'VERB', 'VERB', 'PRON', 'ADV', '.', 'CONJ', 'ADV', 'PRON',
# 'VERB', 'NOUN', 'ADP', 'DET', 'NOUN', '.']

tags = tag_sentence(pos_model,"The deer is running fast .", verbose=True)
tags = tag_sentence(pos_model,"Google is a company .", verbose=True)

Mr. Podger had thanked him gravely , and now he made use of the advice .
['mr.', 'podger', 'had', 'thanked', 'him', 'gravely', ',', 'and', 'now', 'he', 'made', 'use', 'of', 'the', 'advice', '.']
['NOUN', 'NOUN', 'VERB', 'VERB', 'PRON', 'ADV', '.', 'CONJ', 'ADV', 'PRON', 'VERB', 'NOUN', 'ADP', 'DET', 'NOUN', '.']
The deer is running fast .
['the', 'deer', 'is', 'running', 'fast', '.']
['DET', 'NOUN', 'VERB', 'VERB', 'ADV', '.']
Google is a company .
['UNKNOWN_PLACEHOLDER_TOKEN', 'is', 'a', 'company', '.']
['NOUN', 'VERB', 'DET', 'NOUN', '.']


### Evaluating the perfomance
Now, the model can be evaluated on the whole dataset, for which the evaluation method below is defined.

In [24]:
def evaluate(tag_sentence, model, sentences, verbose=False, estimate=False, seed=123):
    """
    Evaluates the accuracy of the model on the given sentences

    :param tag_sentence: callable function (sentence (string) , verbose (bool) -> tagging ([str])), 
    mapping a sentence to its token's tags
    :param sentences: list of list of [(str, str)], A list containing sentences, each sentence is a list of
        tuples of the token and the respective tag
    :param verbose: bool, whether the prediction should be verbose
    :param estimate: bool, whether to just use 100 randomly chosen sentences for evaluation,
     recommended if the list of sentences is very large
    :param seed: int, seed for the numpy random choice
    :return: float, the (estimated) accuracy
    """
    start_time = time.time()
    if estimate:
        np.random.seed(seed)
        idx = np.random.choice(len(sentences), 100, replace=False)
        sentences = [sentences[i] for i in idx]

    sentence_strings = [" ".join([token for token, _ in sentence]) for sentence in sentences]
    sentence_labels = [[label for _, label in sentence] for sentence in sentences]

    label_count = 0.
    correct_label_count = 0.

    for sentence_string, true_labels in zip(sentence_strings, sentence_labels):
        predicted_labels = tag_sentence(model,sentence_string, verbose=verbose)

        label_count += len(true_labels)
        correct_label_count += sum(
            [predicted_label == true_label for predicted_label, true_label
                 in zip(predicted_labels, true_labels)])

    acc = correct_label_count / label_count

    end_time = time.time()

    print("Evaluating the model on the given data took ", end_time - start_time, " seconds")

    return acc


Now we can evaluate the performance on the dataset. For good practice in Machine Learning, only touch the test set once (in the very end) and aim for a validation accuracy above 0.85 before running the evaluation in the test set.

In [25]:
train_acc = evaluate(tag_sentence, pos_model, train_sentences, estimate=True)
print("estiamated training accuracy: ", train_acc)
val_acc = evaluate(tag_sentence, pos_model, val_sentences, estimate=True)
print("estimated validation accuracy: ", val_acc)

Evaluating the model on the given data took  0.6678941249847412  seconds
estiamated training accuracy:  0.877961555654895
Evaluating the model on the given data took  0.6498968601226807  seconds
estimated validation accuracy:  0.8847715736040609


The final evaluation on the test set should yield an accuracy of >0.8 to pass this exercise. The test should pass in a few seconds.

In [None]:
def final_test():
    
    _, _, test_sentences = read_dataset()

    test_acc = evaluate(tag_sentence, pos_model, test_sentences, estimate=True, seed = 321)
    print("estimated test accuracy: ", test_acc)
    assert test_acc >= 0.8

In [None]:
final_test()
