# Word Disambiguation

When building a model to extract information from text, it often helps to encode some kind of meaning for each word as features for our model. A basic way to do is known as **part-of-speech** (or POS) tagging. This low level task uses various features about the text to frame a classification problem of labeling each word with the correct part of speech in a given document. Why do we do this? The intuition is that having additional information about how a word is being used will allow a model to perform better inference on some higher level task.
<br><br>
Formally, given an unlabeled sequence, POS-tagging is the task of labeling each token in that sequence with its part-of-speech. Here is an example of doing so using **`spaCy`**: 

In [2]:
import spacy

model = spacy.load('en')
doc = model('I am a happy boy.')
labeled_tokens = [(token.text, token.pos_, token.tag_) for token in doc]
print(", ".join(map(str, labeled_tokens)))    

('I', 'PRON', 'PRP'), ('am', 'VERB', 'VBP'), ('a', 'DET', 'DT'), ('happy', 'ADJ', 'JJ'), ('boy', 'NOUN', 'NN'), ('.', 'PUNCT', '.')


## Implementing our own POS-Tagger

While various methods exist of labeling a sequence of tokens, the most popular is the Hidden Markov Model (HMM). On a very high level, HMMs are a sort of supervised model that labels sequences by using a learned probability model and some markov assumptions. Given a sequence of objects, **x**, an HMM assumes the following:
> 1. Each object in **x_i** is generated as a result of some hidden state **z_i**.
> 2. There is a sequence of *hidden* states responsible for generating the observed sequence.
> 3. There is a conditional probabilistic distribution P(x|z) that governs this generation process.
> 4. The sequence of hidden states is generated by a conditional distribution as well; given the current timestep *i* and some window size *j*, the state **z_i** is only dependent on **z_i-j** to **z_i-1**. This is what is known as a Markov assumption.

Here, our words (tokens) are our **x**, while their associated part-of-speech tags are our hidden states, **z**. Thus, we must learn two distributions in order to have a functioning HMM: P(x|z) and P(z_i|z_i-j:z_i-1). Since **x** is a token, we can learn the conditional distribution P(x|z) for a specific **x_i** and **z_i** by using the probabilistic rule that: P(x|z) = P(x, z)/P(z). While we don't have the true probabilities P(x, z) and P(z), we can approximate both empirically using maximum likelihood estimation. Our P(x|z) then becomes a ratio of counts.

We can do the same for the conditional distribution of z. For simplification, we will assume that each pos-tag only depends on the tag of the previous word. Here is a model class that will fit these two distributions given a labeled sequence of tokens.

In [10]:
from collections import defaultdict

import math

# A callable for nesting a defaultdict inside another
defaultdict_int = lambda n: lambda : defaultdict(lambda : n)

START_SENTINEL = 'START'

class HmmDistribution:
    def __init__(self, seq=None, lp_smoother=1):
        self.token_counts = None
        self.label_counts = None
        self._fitted = False
        self._lp_smoother = lp_smoother
        self.N = 0
        if seq is not None:
            self.fit(seq)
    
    def is_fit(self):
        return self._fitted
    
    def _reset(self):
        self.token_counts = None
        self.label_counts = None
        self._fitted = False
        self.N = 0
    
    def fit(self, seq, reset=False):
        if not isinstance(seq[0], tuple):
            raise Exception('Data should be a sequence of (token, label) tuples.')
        
        if reset:
            self._reset()
        elif self.is_fit():
            token_counts = self.token_counts
            label_counts = self.label_counts
        else:
            token_counts = defaultdict(defaultdict_int(self._lp_smoother))
            label_counts = defaultdict(defaultdict_int(self._lp_smoother))
        
        prev_label = START_SENTINEL
        for token, label in seq:
            token_counts[token][label] += 1
            label_counts[label][prev_label] += 1
            self.N += 1
            
            prev_label = label
    
        if not self.is_fit():
            self.token_counts = token_counts
            self.label_counts = label_counts
            self._fitted = True
    
    def get_marginal_token(self, token):
        return self.get_token_count(token) / float(self.N)
    
    def get_marginal_label(self, label):
        return self.get_label_count(label) / float(self.N)
    
    def get_labels(self):
        return list(self.label_counts.keys())
    
    def get_tokens(self):
        return list(self.token_counts.keys())
    
    def get_token_count(self, token):
        return sum(self.token_counts[token].values())
    
    def get_label_count(self, label):
        return sum(self.label_counts[label].values())
    
    def get_top_token_labels(self, token, n=5):
        return sorted(self.token_counts[token].items(), key=lambda pair: pair[0], reverse=True)[:n]
    
    def get_joint(self, token, label):
        return self.token_counts[token][label] / float(self.N)
    
    def get_conditional(self, token, label):
        return self.get_joint(token, label) / self.get_token_count(token)
    
    def get_conditional_label(self, label, cond_label):
        return self.label_counts[label][cond_label] / self.get_label_count(cond_label)

In [4]:
'''
Here we can test our class against some POS-labeled text data from Kaggle.
'''
import csv

url = 'https://www.kaggle.com/abhinavwalia95/entity-annotated-corpus/downloads/ner_dataset.csv'

csv_path = 'data/ner_dataset.csv'
with open(csv_path, 'r') as csv_file:
    sequence = []
    line = csv_file.readline()
    
    # There are some bytes in the csv that cannot be decoded with the standard 'utf-8'
    while line:
        try:
            row = line.split(',')
            if len(row) == 4:
                labeled_pair = (row[1], row[2])
            sequence.append(labeled_pair)
            line = csv_file.readline()
        except UnicodeDecodeError as e:
            continue

sequence[:5]

[('Word', 'POS'),
 ('Thousands', 'NNS'),
 ('of', 'IN'),
 ('demonstrators', 'NNS'),
 ('have', 'VBP')]

In [11]:
dist = HmmDistribution(sequence)

print('Verb count: %i' % dist.get_label_count('VBP'))
print('Noun count: %i' % dist.get_label_count('NNS'))

dist.get_top_token_labels('I')

Verb count: 15970
Noun count: 79415


[('PRP', 141), ('NNP', 43)]

# Tagging using HMM Inference

Now that we have a model capable of fitting conditional distributions to a labeled sequence, we can use it to calculate the probability of a sequence of tokens and their hidden POS tags. This is simplified by the markov assumption that HMMs make: the transition from one state to the next is determined completely by fixed size window of prior states (in our case 1). Assuming the window of dependence is 1:

> At a given step the probability of observing a word *w* and tag *t_i* is P(w|t_i)P(t_i|t_i-1)
> The joint probability of a sequence of tags **t** can be factorized using the markov assumption:
>> P(**t**) = P(t_0) * product from i=1 to T of P(t_i|t_i-1)
> The joint probability for a labeled sequence is then:
>> P(**w**, **t**) = P(w_0|t_0) * P(t_0) * product from i=1 to T of P(t_i|t_i-1)P(w_i|t_i)

The code for this is:

In [12]:
import math

def joint_log_probability(model, seq):
    '''
    Calculate the log joint likelihood of a labeled sequence. Using log prevents underflow.
    '''
    N = model.N
    
    # Get the joint probability for the first step in the sequence
    word, tag = seq[0]
    prob = math.log(model.get_marginal_label(tag) * model.get_conditional(word, tag))
    
    # Iteratively compute the likelihood through the remaining sequence
    for w, t in seq[1:]:
        prob += math.log(model.get_conditional_label(t, tag) * model.get_conditional(w, t))
        tag = t
    
    return prob

sentence = "I have a dream"
tags = "NNP VBP DT NN"
labeled_seq = list(zip(sentence.split(' '), tags.split(' ')))
print("Joint probability is: {0:.9f}".format(joint_log_probability(dist, labeled_seq)))

Joint probability is: -66.009707223


# The Viterbi Algorithm
Now that we have a method of computing joint log probabilities for a labeled sequence, we can compare labels for a word sequence and find the optimal one... right?!

Unfortunately, this is intractable. If we had a sequence of *S* words, and a vocabulary size of *V*, the space we would be comparing over is *V^S*! Luckily, there is a better way to do this without searching over all permutations and it makes use, again, of the Markov assumption.

Observing that each hidden state depends only on the state before it for a *one-off* HMM, the transition from, say, 'NNP' to 'VBP' is the same, regardless of what tag preceded 'NNP'. This means that if we are computing the likelihood of a tag sequence that ends with (...,'NNP', 'VBP'), we would be wasting precious time recomputing the probability for all sequences (..., 'NNP') and then multiplying by P('VBP'|'NNP') for each; instead for any tag, e.g. 'VBP', we can see that the sequence with the highest probability will consist of the subsequence ending at X appended with 'VBP', whereby x is the tag that yields the largest probability P(t_1...x) * P('VBP'|x). Notice that in order to find the best tag x from all possible tags, we must do the same thing we did with 'VBP', that is find the sequence ending at X with the largest probability, and then compare amongst all x's. Thus we have a dynamic programming solution that requires evaluating only two steps of the sequence at each step.
> 1. At step 1, store a table of pointers for each tag to their joint probability with generating the first word.
> 2. For each subsequent step i > 1, we compute the max likelihood for any sequence ending at step i with some tag t_i and storing this probability (log probability in practice) with a pointer to tag t_i.
> 3. This max likelihood is calculated for each t_i, over all possible previous tags, t_i-1 (which will have their largest sequence probabilities stored).
> 4. We will also maintain a matrix of pointers NxT, where the optimal previous tag for each current tag is stored so that we can backtrack and recover the optimal tag sequence at the end of the algorithm.

Here is the code:

In [15]:
import numpy as np # For fast vector computations


class ViterbiTagger:
    def __init__(self, seq=None):
        self.model = HmmDistribution()
        self.seq = seq
        self.tags = None
        self.tokens = None
        self.tags_to_index = None
        if seq is not None:
            self.fit(seq)
    
    def fit(self, seq):
        self.model.fit(seq)
        self.tags = self.model.get_labels()
        self.tags_to_index = dict(((self.tags[i], i) for i in range(len(self.tags))))
        self.tokens = self.model.get_tokens()
        
    def viterbi(self, seq):
        priors = [math.log(self.model.get_joint(seq[0], tag)) for tag in self.tags]
        pointers = []
        
        for i, token in enumerate(seq):
            new_priors = []
            current_pointers = []
            
            for j, t_j in enumerate(self.tags):
                max_prob, max_pointer = 0, 0
                for k, t_k in enumerate(self.tags):
                    prob = math.log(self.model.get_conditional_label(t_j, t_k)) + \
                           priors[k]
                    if prob > max_prob:
                        max_prob = prob
                        max_pointer = k
                new_priors.append(max_prob + math.log(self.model.get_conditional(token, t_j)))
                current_pointers.append(max_pointer)
            
            priors = new_priors
            pointers.append(current_pointers)
        
        return priors, pointers
    
    def backtrack_tags(self, probs, pointers):
        T = len(pointers)
        max_prob, max_index = 0, 0
        for i, prob in enumerate(probs):
            if prob > max_prob:
                max_probs = prob
                max_index = i
        
        tags = [self.tags[max_index]]
        for t in range(T-2, -1, -1):
            max_index = pointers[t][max_index]
            tags.append(self.tags[max_index])
        
        return tags[::-1]
    
    def tag_sequence(self, tokens):
        probs, pointers = self.viterbi(tokens)
        tags = self.backtrack_tags(probs, pointers)
        
        return tags

In [16]:
tagger = ViterbiTagger()

tagger.fit(sequence)
tokens_train, tags_train = zip(*sequence[:100])

def accuracy(y_true, y_pred):
    correct = 0
    for i in range(len(y_true)):
        if y_true[i] == y_pred[i]:
            correct += 1
    return float(correct) / len(y_true)

tags_predict = tagger.tag_sequence(tokens_train)
print(accuracy(tags_train, tags_predict))

0.07
