# Part of Speech Tagging in Python

Recently, I came across Professor Michael Collins's [collection of lectures](http://www.cs.columbia.edu/~mcollins/) on  statistical natural language processing. Inspired by his first two chapters, I've put together a simple part-of-speech tagger that incorporates many of the concepts covered.

All functions are available on GitHub, [here](https://github.com/jtgonz/simple_pos_tagger).

## The Task

**Read in a sentence, and assign the correct part of speech (noun, verb, etc.) to each word.**

Say we have a sequence of words:

$$
\underset{\displaystyle\mathstrut x_1}{\strut \text{The}}\ 
\underset{\displaystyle\mathstrut x_2}{\strut \text{big}}\ 
\underset{\displaystyle\mathstrut x_3}{\strut \text{dog}}\ 
\underset{\displaystyle\mathstrut x_4}{\strut \text{barks}}
$$

We also have a sequence of part-of-speech tags:

$$
\underset{\displaystyle\mathstrut y_1}{\strut \text{DT}}\ 
\underset{\displaystyle\mathstrut y_2}{\strut \text{JJ}}\ 
\underset{\displaystyle\mathstrut y_3}{\strut \text{NN}}\ 
\underset{\displaystyle\mathstrut y_4}{\strut \text{VB}}\ 
\underset{\displaystyle\mathstrut y_5}{\strut \text{STOP}}
$$
<br>
Our tagger will model the probabliity that the sequence of words $x_1...x_n$ is seen with the sequence of part-of-speech tags $y_1...y_{n+1}$. Then, given a sequence of words, we can use the model to choose the sequence of tags that maximizes that probability $p$.


This tagger takes the form of a **Hidden Markov Model**, shown below.
<p>

$$p(x_1...x_n,y_1...y_{n+1}) = \prod_{i=1}^{n+1}q(y_i|y_{i-2},y_{i-1}) \prod_{i=1}^ne(x_i|y_i)$$

The first product is a second-order Markov sequence. Here, we're making the the assumption that each part-of-speech tag depends on the two preceding tags (and only on the two preceeding tags). This probability is given by our **transition parameter** $q(s|u,v)$. 

The second product represents the probability of seeing a certain word given a certain tag. This is given by our **emission parameter** $e(x|s)$.

We need to estimate both of these parameters. To do so, we'll use training data from the CoNLL-2000 corpus, a collection of articles from the Wall Street Journal. Both the training and the test data are available for free [here](http://www.cnts.ua.ac.be/conll2000/chunking/).

## Split training data

Before building anything, we'll split the given training data ("train.txt") into two sets. We'll use the first set ("train_data.txt") to calculate our transition and emission parameters. Later, we'll use the second set ("dev_data.txt") as development data to create some additional smoothing parameters. The final split looks like this:

- **train_data.txt** (2.1 MB, from train.txt)
- **dev_data.txt** (723 KB, from train.txt)
- **test.txt** (639 KB)

In [8]:
# create and open output files
train_file = open('train_data.txt', 'w+')
dev_file = open('dev_data.txt', 'w+')

# split train.txt into train_data.txt and dev_data.txt
# this will be a 75/25 split
sentence = ''
with open('train.txt') as corpus:
    for i,line in enumerate(corpus):
        sentence += line
        if line == '\n':
            train_file.write(sentence) if i % 4 else dev_file.write(sentence)
            sentence = ''
            
# close output files
train_file.close()
dev_file.close()

## Get frequency counts

Our next step will be to examine the training data, and count how often certain patterns appear. In particular, we're looking for:

- **ngram_counts** - How often we see certain tag sequences in the training data
- **emiss_counts** - How often we see certain words in the training data

A more detailed explanation follows the code below.

In [472]:
from collections import defaultdict

def get_all_counts(infile, n=3):
    """
    Iterate through corpus, get n-gram counts and emission counts.
    """

    # create dictionaries to store ngram and emission counts
    ngram_counts = defaultdict(lambda: 0)
    emiss_counts = defaultdict(lambda: 0)

    # initialize tag list to ['*','*','*',...]
    tag_list = ['*'] * n

    with open(infile) as corpus:
        for line in corpus:

            # reached the end of a sentence
            if line == '\n':

                # if there is no data, just skip
                if tag_list[-1] == '*': continue

                word = False
                tag = 'STOP'

            # get word and tag from line
            else:
                word, tag, chunk = line.rstrip().split(' ')

            # add new tag to end of tag list
            tag_list.pop(0)
            tag_list.append(tag)

            # increment count by 1 when ngram appears in corpus
            for i in xrange(1,n+1):
                tag_sequence = tuple(tag_list[-i:])
                ngram_counts[tag_sequence] += 1

            # increment emmision count by 1, or reset tag list if no more words
            if word:
                emiss_counts[tag, word] += 1
                emiss_counts['_', word] += 1
            else:
                tag_list = ['*'] * n

    return ngram_counts, emiss_counts

In [473]:
ngram_counts, emiss_counts = get_all_counts('train_data.txt')

This function returns a couple of useful things. The first value, `ngram_counts`, is a dictionary mapping n-grams to their frequency in the training data. For example:

**How many times was an adjective (JJ) followed by a noun (NN) and then a past-tense verb (VBD)?**

In [60]:
ngram_counts[('JJ', 'NN', 'VBD')]

185

**How many times did we see a verb followed by an adjective?**

In [61]:
ngram_counts[('VBD','JJ')]

299

**How often did a sentence begin with a personal pronoun?**

In [62]:
ngram_counts[('*', '*','PRP')] # could also use ngram_counts[('*','PRP')]

381

We also have information on the conditional frequencies of words in the training data. The second value, `emiss_counts`, tells us how many times we saw a certain word, given a certain part-of-speech tag. For example:

**How many times did we see the word "charge" used as a noun?**

In [63]:
emiss_counts[('NN', 'charge')]

23

**How many times did we see the word "charge" used as a verb?**

In [64]:
emiss_counts[('VB', 'charge')]

5

## Building a baseline tagger

With this basic information, we can put together a simple tagger to use as a baseline. For each word, this tagger will choose the part of speech most frequently associated with that word in the training data. For example, the word "charge" will always be labeled as a noun (see example above).

In [281]:
def run_baseline(word_seq, e_counts, tag_set):
    return [max({(tag,word):emiss_counts[tag,word] \
                 for tag in tag_set}.iteritems(), key=lambda x:x[1])[0][0] for word in word_seq]

def get_unique_unigrams(ngram_counts):
    return {''.join(ngram) for ngram in ngram_counts if len(ngram) == 1 and ngram_counts[ngram]}

We can then run the baseline tagger like this:

In [282]:
tag_set = get_unique_unigrams(ngram_counts)
sentence = ['the', 'dog', 'went', 'into', 'the', 'house']

run_baseline(sentence, emiss_counts, tag_set)

['DT', 'NN', 'VBD', 'IN', 'DT', 'NNP']

## Scoring the tagger

We'll use three metrics to measure the effectiveness of our tagger.

1. **Recall.** Out of all the test words, how many were we able to tag?
2. **Precision.** Out of all the words we tagged, how many did we tag correctly?
3. **F-Score.** The geometric mean of precision and recall.

Below, we've defined a function that takes a tagger and test corpus as input, and outputs each of these three metrics.

In [751]:
from __future__ import division

def score_tagger(infile, tagger, *params):
    """
    description goes here
    Precision: # of correctly tagged words / # of tagged words
    Recall: # of tagged words / # of words
    """
    
    # store counts of tagged words
    num_correct_words = num_tagged_words = num_words = 0

    with open(infile) as corpus:
        word_seq = []
        answer = []
        
        for line in corpus:
            
            if line == '\n':

                # if there is no data, just skip
                if word_seq == []: continue

                # tagger will return either a list of tags (ex. ['DT','NN','VB'])
                # or ['*','*',...'*'] if sentence could not be tagged
                result = tagger(word_seq, *params)
                
                # get number of correctly tagged words
                num_correct_words += sum([result[i] == answer[i] for i in xrange(len(answer))])
                num_tagged_words += len(result) if result[0] != '*' else 0
                num_words += len(result)

                # reset sentence
                word_seq = []
                answer = []

                continue

            # get word and tag, add to list
            word, tag, chunk = line.rstrip().split(' ')
            word_seq.append(word)
            answer.append(tag)

    # calculate precision and recall
    precision_words = num_correct_words/num_tagged_words
    recall_words = num_tagged_words/num_words
    fscore_words = 2*precision_words*recall_words/(precision_words+recall_words)
    
    return precision_words, recall_words, fscore_words

In [752]:
score_tagger('test.txt', run_baseline, emiss_counts, tag_set)

(0.8043776516030986, 1.0, 0.8915845869499099)

The baseline tagger does a decent job, assigning a label to every word and correctly labeling around 80% of words in the corpus. Now that we have an idea of where we're starting from, we can work on building the Hidden Markov Model.

## Get maximum likelihood estimates

We can use our obtained `ngram_counts` and `emiss_counts` to calculate maximum likelihood estimates for the transition and emission parameters. These are defined as:

$$q_{ML}(s|u,v) = \frac{c(u,v,s)}{c(u,v)} =
\frac{\texttt{ngram_counts[u,v,s]}}{\texttt{ngram_counts[u,v]}}$$

<br>

$$e_{ML}(x|s) = \frac{c(s \rightarrow x)}{c(s)} =
\frac{\texttt{emiss_counts[s,x]}}{\texttt{ngram_counts[s]}}$$

In [534]:
def count_unique_unigrams(ngram_counts):
    return sum(ngram_counts[ngram] and len(ngram) == 1 for ngram in ngram_counts) - 1

def count_total_words(emiss_counts):
    return sum(key[0] == '_' and emiss_counts[key] for key in emiss_counts)

def get_ml_estimates(ngram_counts, emiss_counts):
    """
    Get maximum likelihood estimates for transition and emission
    probabilities.
    """

    # create dictionaries to store maximum likelihood estimates
    # for transition and emission parameters
    qml_est = defaultdict(lambda: 0)
    eml_est = defaultdict(lambda: 0)

    num_unique_tags = count_unique_unigrams(ngram_counts)
    num_total_words = count_total_words(emiss_counts)

    # get maximum likelihood estimates for transitions
    # qml_est[(u,v,s)] = q_ml(s|u,v) = count(u,v,s) / count(u,v)
    for ngram in ngram_counts.keys():

        if len(ngram) > 1:
            qml_est[ngram] = \
            ngram_counts[ngram] / (ngram_counts[ngram[:-1]] or ngram_counts[('STOP',)])
        else:
            qml_est[ngram] = ngram_counts[ngram] / num_unique_tags

    # get maximum likelihood estimates for emissions
    # eml_est[s][x] = e_ml(x|s) = count(s -> x) / count(s)
    for key in emiss_counts:
        eml_est[key] = emiss_counts[key] / (ngram_counts[(key[0],)] or num_total_words)

    return qml_est, eml_est

In [535]:
qml_est, eml_est = get_ml_estimates(ngram_counts, emiss_counts)

For now, we'll use these estimates (`qml_est` and `eml_est`) as our transition and emission parameters.

## Viterbi algorithm with maximum likelihood estimates

Now that we've got estimates for our transition and emission parameters, how do we find the most likely tag sequence?

One method would be examine all possible tag sequences for a given sentence, and calculate $p(x_1...x_n,y_1...y_{n+1})$ for each sequence. This method, however, is very inefficient. With $S$ possible tags for each word, and $n$ words in the sentence, we'd have to calculate $S^n$ probabilities and compare them all.

The Viterbi algorithm works much quicker, solving this problem in $nS^{k+1}$ time (where $k$ is the order of the Markov sequence).

In [498]:
from itertools import product

def run_viterbi(word_seq, q_params, e_params, tag_set, use_pseudo=False, order=2):
    """
    Implementation of the Viterbi algorithm with backpointers. Returns
    the tags that maximize the probability of a given sentence occuring,
    based on an n-gram hidden markov model.

    Args:
        q_params (defaultdict): Transition probabilities. q(s|u,v) -> q_params(u,v,s)
        e_params (defaultdict): Emission probabilities. e(x|s) -> e_params(s,x)
        word_seq (list): The sentence to be tagged.
        tag_set (set): Set of potential tags for a word.
        order (int): The order of the Markov sequence. Defaults to 2.
    """
    
    # base case for pi parameters, initialize backpointers
    pi_params = {(0,('*',) * n):1 for n in xrange(1,order+1)}
    bp_params = {}
    
    # probability words are seen with tags
    sent_prob = 0

    for k,word in enumerate(word_seq, start=1):

        # create list of tag sets for words at position k-order+1 to k
        tag_set_list = [tag_set if i+1 > 0 else {'*'} for i in xrange(k-order,k)]

        # create set of all possible tag sequences that end at position k
        tag_seq_list = {tag_seq for tag_seq in product(*tag_set_list)}
        
        # map unseen words to pseudo-words
        if use_pseudo and e_params['_',word] == 0:
            word = map_to_pseudo_word(word, k)

        # iterate through tag sequences that end at position k
        for tag_seq in tag_seq_list:

            pi_params[k, tag_seq] = 0   # initialize pi parameter

            # loop through set of tags in leftmost position
            for tag in k-order > 0 and tag_set or {'*'}:

                pi_key = k-1, (tag,) + tag_seq[:-1]     # looks like k-1,(w,u)
                q_key = (tag,) + tag_seq                # looks like w,u,v -- transition param v|w,u
                e_key = tag_seq[-1], word               # looks like v,x -- emission param x|v

                temp = pi_params[pi_key] * q_params[q_key] * e_params[e_key]
                
                # get largest pi parameter and remember arguments
                if temp > pi_params[k, tag_seq]:
                    pi_params[k, tag_seq] = temp
                    bp_params[k, tag_seq] = tag
                    
            if k == len(word_seq):

                temp_sent_prob = pi_params[k, tag_seq] * q_params[tag_seq + ('STOP',)]

                if temp_sent_prob > sent_prob:
                    sent_prob = temp_sent_prob
                    final_tag_seq = tag_seq
    
    # were we able to tag the sentence?
    if not sent_prob:
        return ['*'] * k
    
    # retrieve tags for each word in the sentence
    final_tag_seq = list(final_tag_seq)
    for k in xrange(k-order, 0, -1):
        final_tag_seq.insert(0, bp_params[k+order, tuple(final_tag_seq[:order])])

    return final_tag_seq

We can then run the tagger like this:

In [514]:
tag_set = get_unique_unigrams(ngram_counts)
sentence = ['Theodore', 'fed', 'his', 'pet', 'macaw', '.']

run_viterbi(sentence, qml_est, eml_est, tag_set, True)

['NNP', 'VBD', 'PRP$', 'JJ', 'NN', '.']

How well does it do on the test corpus?

In [521]:
tagger_1 = score_tagger('test.txt', run_viterbi, qml_est, eml_est, tag_set)

| Tagger        | F-Score       | Precision  | Recall  |
| ------------- |------------:| ----:|----:|
| Baseline      | 89.16% | 80.44% |100.0%|
| Maximum Likelihood Estimates  | 6.27%      |  98.24% |3.24%|

While we see a large jump in precision, there's an much more serious drop in recall. We're only able to find tags for around three percent of words, and the F-score suffers as a result.

This is a consequence of sparse training data. If a word does not appear in the training corpus, then the emission parameter for that word, $e_{ML}(x|s)$, will be zero. Remember that our probability $p(x_1...x_n,y_1...y_{n+1})$ is a product of the emission and transition parameters for every word in the sentence. Any unseen word in a test sentence will cause the tag sequence probability to drop to zero, regardless of what the rest of the words in the sentence are.

## Pseudo-word mapping

One way to solve this problem is by mapping low-frequency words in the training data to a set of pseudo-words. Then, when we encounter a word in the test data that we've never seen before, we can map it to a pseudo-word and use the corresponding emission parameter. This has the effect of closing the vocabulary -- every word we could encounter in the test data will be seen at least once in the training data.

I've used the mapping below for training this tagger. If a word occurs in the training data less than five times, it's changed to the approriate pseudo-word.

| Criteria        | Pseudo-Word       |
| ------------- |------------|
| First word in sentence | `__firstWord__` |
| All capital letters | `__allCaps__` |
| Capital letter followed by period | `__capPeriod__` |
| First letter is capitalized | `__initCap__` |
| All letters uncapitalized | `__lowercase__` |
| Begins with '\$', contains numbers | `__currency__` |
| Numeric value | `__number__` |
| All other words | `__other__` |

We can use [this script](https://github.com/jtgonz/simple_pos_tagger/blob/master/replace_words.py) to create a new training corpus, with low-frequency words mapped to their appropriate psuedo-words.

In [9]:
! python replace_words.py train_data.txt > train_data_replaced.txt

The result looks like this:

`Confidence in the pound is widely expected to take another share dive if trade figures for September, due for release tomorrow, fail to show a substantial improvement from July and August's near-record deficits.`

**`__firstWord__`** `in the pound is widely expected to take another share` **`__lowercase__`** `if trade figures for September, due for release tomorrow,` **`__lowercase__`** `to show a substantial improvement from July and August's` **`__other__ __lowercase__`**`.`

With the newly updated training corpus (and a slight modification to `run_viterbi`), we can re-train and re-evaluate the tagger. 

In [757]:
import re

# This is called by run_viterbi when we set pseudo=True

def map_to_pseudo_word(word, k):
    
    if k == 1:                                          # first word in sentence
        pseudo = '__firstWord__'
    elif re.match(r'[A-Z]+$', word):                    # organization
        pseudo = '__allCaps__'
    elif re.match(r'[A-Z]\.$', word):                   # person name initial
        pseudo = '__capPeriod__'
    elif re.match(r'[A-Z]\w*$', word):                  # capitalized word
        pseudo = '__initCap__'
    elif re.match(r'[a-z]\w*$', word):                  # uncapitalized word
        pseudo = '__lowercase__'
    elif re.match(r'\$[0-9][0-9,.]*$', word):           # monetary amount (dollars)
        pseudo = '__currency__'
    elif re.match(r'[0-9]+[0-9-/.,A-Za-z]*$', word):    # numeric value
        pseudo = '__number__'
    else:                                               # other
        pseudo = '__other__'
    
    return pseudo

In [741]:
ngram_counts, emiss_counts = get_all_counts('train_data_replaced.txt')
qml_est, eml_est = get_ml_estimates(ngram_counts, emiss_counts)

In [524]:
# with pseudo-words
tagger_2 = score_tagger('test.txt', run_viterbi, qml_est, eml_est, tag_set, True)

| Tagger        | F-Score       | Precision  | Recall  |
| ------------- |------------:| ----:|----:|
| Maximum Likelihood Estimates  | 6.27%      |  98.24% |3.24%|
| Maximum Likelihood Estimates w/ pseudo-words      | 86.90% | 92.71% |81.77%|

This is a large improvement over the last tagger. But there is still more that can be done to boost recall.

## Discounted maximum likelihood estimates

Sparse training data doesn't only affect the emission parameters -- it affects the transition parameters as well. They may be certain trigram tag sequences in the test data that we never see in the training data.

One way to account for this is by using discounting methods. We'll define the discounted estimate as:

$$q_D(w|u,v) = \frac{c(u,v,w)-\beta}{c(u,v)}\text{, where } \beta \text{ is between 0 and 1}$$

The intuition here is that with a pure ML estimate, we are likely overestimating trigrams that are seen in the training corpus and underestimating trigrams that are not seen. Discounting methods attempt to correct this by subtracting a small amount from our observed trigrams, and distributing that subtracted probability mass evenly among the unobserved trigrams.

To choose the value for $\beta$, we'll test the tagger on the development data and iterate through a few possible values.

In [742]:
def get_discounted_estimates(ngram_counts, tag_set, beta=0.5):

    qd_est = defaultdict(lambda: 0)
    alpha = defaultdict(lambda: 0)

    # get discounted estimates
    for ngram in ngram_counts.keys():
        if not ngram_counts[ngram] or len(ngram) < 2: continue

        qd_est[ngram] = (ngram_counts[ngram] - beta) / (ngram_counts[ngram[:-1]] or ngram_counts[('STOP',)])

        #if ngram[:-1] == ('DT','DT'):
        #    print qd_est[ngram], ngram, ngram[-1], ngram_counts[ngram], ngram_counts[ngram[:-1]]

        alpha[ngram[:-1]] += qd_est[ngram]

    # create missing mass
    for ngram in alpha.keys():
        alpha[ngram] = 1 - alpha[ngram]

    # divide missing mass in proportion to ngram estimates
    for i in xrange(2,2+order):

        # calculate denominator (sum of either ML or qd estimates)
        if i == 2:
            denom_counts = \
            {(v,):sum([ngram_counts[w,] if ngram_counts[v,w] == 0\
                       else 0 for w in tag_set]) for v in tag_set}
        else:
            tag_set_list = [tag_set - {'STOP',}] * (i-2) + [tag_set]
            denom_counts = \
            {ngram:sum([qd_est[ngram] if ngram_counts[ngram+tuple(w)] == 0\
                        else 0 for w in tag_set]) for ngram in product(*tag_set_list)}

        # calculate values for unseen ngrams
        tag_set_list = [tag_set - {'STOP',}] * (i-1) + [tag_set]
        for ngram in product(*tag_set_list):

            first = ngram[:-1]
            last = ngram[1:]

            if not ngram_counts[ngram] and i == 2:
                qd_est[ngram] = alpha[first] * ngram_counts[last] / denom_counts[first]

            elif not ngram_counts[ngram]:
                qd_est[ngram] = alpha[first] * qd_est[last] / denom_counts[first]

    return qd_est

In [743]:
# find best beta value by testing on development data
results_discounted = []
for i in xrange(1,10):
    qd_est = get_discounted_estimates(ngram_counts, tag_set, i/10)
    result = score_tagger('dev_data.txt', run_viterbi, qd_est, eml_est, tag_set, True)
    results_discounted.append(result)

We get the best results with $\beta=0.1$. With our beta value chosen, we'll now score the tagger on the test dataset.

In [755]:
# with discounted ML estimates
qd_est = get_discounted_estimates(ngram_counts, tag_set, 0.1)
tagger_3 = score_tagger('test.txt', run_viterbi, qd_est, eml_est, tag_set, True)

| Tagger        | F-Score       | Precision  | Recall  |
| ------------- |------------:| ----:|----:|
| Baseline      | 89.16% | 80.44% |100.0%|
| Maximum Likelihood Estimates  | 6.27%      |  98.24% |3.24%|
| Maximum Likelihood Estimates w/ pseudo-words      | 86.90% | 92.71% |81.77%|
| Discounted ML Estimates w/ pseudo-words  | 95.11%      |   92.70% |97.64%|

The final tagger does fairly well, achieving an F-score of **95.11%**. Around 98% of words are assigned a tag, and 93% of those tags are correct. As a point of reference, the Stanford Tagger 2.0 (the current state-of-the-art) has an F-score just a few points higher, at 97.32%. So not too bad!