## Project 1: POS Tagger

This notebook implements a neural part-of-speech tagger.

In [None]:
import torch

# Report whether PyTorch can see a CUDA device.
gpu_status = 'Found GPU' if torch.cuda.is_available() else 'Did not find GPU'
print(gpu_status)

Found GPU


### Part-of-Speech Tagging

In this project, we associate each word only with its most common part of speech in the [Brown Corpus](https://www1.essex.ac.uk/linguistics/external/clmt/w3c/corpus_ling/content/corpora/list/private/brown/brown.html), which has been manually labeled with part-of-speech tags.  

Words are lowercased and filtered for length and frequency. Punctuation and numbers are removed.

In [None]:
import nltk
import random
from nltk.corpus import brown
from collections import defaultdict, Counter

# Fetch the Brown corpus and the universal tagset resources.
for nltk_package in ('brown', 'universal_tagset'):
    nltk.download(nltk_package)

# Every corpus token paired with its universal part-of-speech tag.
brown_tokens = brown.tagged_words(tagset='universal')
print('Tagged tokens example: ', brown_tokens[:5])
print('Total # of word tokens:', len(brown_tokens))

# Longest word (in characters) kept by most_common_tags; make_matrices also
# uses it to size the padded character matrix.
max_word_len = 20

def most_common(s):
    "Return the element that occurs most often in the sequence s."
    ranked = Counter(s).most_common(1)
    top_element, _count = ranked[0]
    return top_element

def most_common_tags(tagged_words, min_count=3, max_len=max_word_len):
    """Map each retained word to its most frequent tag.

    Words are lowercased before counting. A word is kept only if it is purely
    alphabetic, at most max_len characters long, and seen at least min_count
    times.
    """
    tags_by_word = defaultdict(list)
    for token, tag in tagged_words:
        tags_by_word[token.lower()].append(tag)

    majority_tags = {}
    for word, tags in tags_by_word.items():
        if not word.isalpha():
            continue
        if len(word) > max_len or len(tags) < min_count:
            continue
        majority_tags[word] = most_common(tags)
    return majority_tags

# Collapse the token stream to one (word -> most common tag) entry per word type.
brown_types = most_common_tags(brown_tokens)
first_types = sorted(brown_types.items())[:5]
n_types = len(brown_types)
print('Tagged types example: ', first_types)
print('Total # of word types:', n_types)

def split(items, test_size):
    """Randomly split items into train, validation, and test sets.

    The shuffle uses a fixed seed, so the same input always produces the same
    split. The caller's sequence is left untouched: a shuffled copy is sliced.

    items - a sequence of examples
    test_size - number of examples in the validation set and in the test set

    Returns a (train, validation, test) tuple of lists. The last test_size
    shuffled items form the test set, the test_size items before them form the
    validation set, and everything else is the training set.

    Raises ValueError if test_size is negative.
    """
    if test_size < 0:
        raise ValueError('test_size must be non-negative, got %r' % (test_size,))

    shuffled = list(items)  # copy so the argument is not reordered in place
    random.Random(288).shuffle(shuffled)

    # Use explicit boundaries rather than negative slice indices: with
    # test_size == 0, items[:-0] would be empty and items[-0:] would be everything.
    n_items = len(shuffled)
    val_start = max(0, n_items - 2 * test_size)
    test_start = max(0, n_items - test_size)
    return shuffled[:val_start], shuffled[val_start:test_start], shuffled[test_start:]

# Number of word types held out for validation and, separately, for test.
val_test_size = 1000

# Sort first so the seeded shuffle inside split() always starts from the same order.
sorted_types = sorted(brown_types.items())
all_data_raw = split(sorted_types, val_test_size)
train_data_raw, validation_data_raw, test_data_raw = all_data_raw

# Tag inventory in a fixed (sorted) order.
all_tags = sorted(set(brown_types.values()))
print('Tag options:', all_tags)

[nltk_data] Downloading package brown to /root/nltk_data...
[nltk_data]   Unzipping corpora/brown.zip.
[nltk_data] Downloading package universal_tagset to /root/nltk_data...
[nltk_data]   Unzipping taggers/universal_tagset.zip.
Tagged tokens example:  [('The', 'DET'), ('Fulton', 'NOUN'), ('County', 'NOUN'), ('Grand', 'ADJ'), ('Jury', 'NOUN')]
Total # of word tokens: 1161192
Tagged types example:  [('a', 'DET'), ('aaron', 'NOUN'), ('ab', 'NOUN'), ('abandon', 'VERB'), ('abandoned', 'VERB')]
Total # of word types: 18954
Tag options: ['ADJ', 'ADP', 'ADV', 'CONJ', 'DET', 'NOUN', 'NUM', 'PRON', 'PRT', 'VERB', 'X']


We first set up basic evaluation functions and run a baseline that predicts `NOUN` for every word.



In [None]:
def noun_predictor(raw_data):
    "Baseline predictor: answer NOUN for every (word, tag) pair in raw_data."
    return ['NOUN' for _word, _tag in raw_data]

def accuracy(predictions, targets):
    """Compute the percentage of predicted tags that match the gold tags.

    predictions - predicted tags only, one per word
    targets - (word, tag) tuples, in the same order as predictions
    """
    assert len(predictions) == len(targets)
    gold_tags = [gold_tag for _word, gold_tag in targets]
    n_correct = sum(1 for guess, gold in zip(predictions, gold_tags) if guess == gold)
    return n_correct / len(targets) * 100.0

def evaluate(predictor, raw_data):
    "Run predictor on raw_data and return its accuracy percentage against the gold tags."
    predicted_tags = predictor(raw_data)
    return accuracy(predicted_tags, raw_data)

def print_sample_predictions(predictor, raw_data, k=10):
    "Show (word, predicted tag) pairs for the first k items of raw_data."
    sample = raw_data[:k]
    guesses = predictor(sample)
    pairs = []
    for (word, _gold), guess in zip(sample, guesses):
        pairs.append((word, guess))
    print('Sample predictions:', pairs)

# Score the always-NOUN baseline on the validation split, then show a few guesses.
noun_validation_accuracy = evaluate(noun_predictor, validation_data_raw)
print('noun baseline validation accuracy:', noun_validation_accuracy)
print_sample_predictions(noun_predictor, validation_data_raw)

noun baseline validation accuracy: 55.1
Sample predictions: [('salem', 'NOUN'), ('unsympathetic', 'NOUN'), ('downwind', 'NOUN'), ('exodus', 'NOUN'), ('avoiding', 'NOUN'), ('informal', 'NOUN'), ('padded', 'NOUN'), ('tantalizing', 'NOUN'), ('farce', 'NOUN'), ('berger', 'NOUN')]


### PyTorch Tagger
We now build a neural tagging model. First, we pre-process the data by padding every word to a fixed length, so that all inputs have the same shape.

In [None]:
import torch
from torch import nn
import torch.nn.functional as F

In [None]:
def make_matrices(data_raw):
    """Convert a list of (word, tag) pairs into tensors with appropriate padding.

    Each word is wrapped in '^' (start) and '$' (end) symbols and stored as
    character codes. Words longer than max_word_len are truncated to their
    first max_word_len characters so that they still fit in the fixed-width
    row instead of raising an IndexError.

    Returns a TensorDataset of three tensors:
      character_matrix - int64 character codes for each word,
        indexed as [word_index, character_index], zero-padded on the right
      character_mask - float32 mask of valid characters (1 for valid, 0 invalid),
        indexed similarly so that all inputs can have a constant length
      pos_labels - int64 part-of-speech value for each word, as an index
        into all_tags

    Raises ValueError if a tag is not present in all_tags.
    """
    max_len = max_word_len + 2  # leave room for word start/end symbols
    n_words = len(data_raw)
    character_matrix = torch.zeros(n_words, max_len, dtype=torch.int64)
    character_mask = torch.zeros(n_words, max_len, dtype=torch.float32)
    pos_labels = torch.zeros(n_words, dtype=torch.int64)
    for word_i, (word, pos) in enumerate(data_raw):
        # Truncate so that '^' + word + '$' never needs more than max_len columns.
        codes = [ord(c) for c in '^' + word[:max_word_len] + '$']
        # Fill the row with one slice assignment rather than one write per character.
        character_matrix[word_i, :len(codes)] = torch.tensor(codes, dtype=torch.int64)
        character_mask[word_i, :len(codes)] = 1
        pos_labels[word_i] = all_tags.index(pos)
    return torch.utils.data.TensorDataset(character_matrix, character_mask, pos_labels)

# Pre-process the validation split and compare one example with its raw form.
validation_data = make_matrices(validation_data_raw)
first_processed, first_raw = validation_data[0], validation_data_raw[0]
print('Sample datapoint after preprocessing:', first_processed)
print('Raw datapoint:', first_raw)

We define a predictor for a network: it takes a list of (word, tag) pairs and returns a list of predicted part-of-speech tags (strings), one per word.

In [None]:
def predict_using(network):
    """Build a predictor function backed by a network.

    network - a module called as network(chars, mask) that returns one row of
      tag scores per word.

    Returns a function that maps a list of (word, tag) pairs to a list of
    predicted tag strings.
    """
    def predictor(raw_data):
        """Return a list of part-of-speech tags as strings, one for each word.

        raw_data - a list of (word, tag) pairs.
        """
        # Run on whatever device the network lives on instead of assuming CUDA.
        first_param = next(network.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')

        processed_data = make_matrices(raw_data)
        loader = torch.utils.data.DataLoader(processed_data)

        predictions = []
        was_training = network.training
        network.eval()
        try:
            with torch.no_grad():
                for chars_batch, mask_batch, _pos_labels in loader:
                    logits = network(chars_batch.to(device), mask_batch.to(device))
                    # Take the argmax over the tag dimension so that each word
                    # in the batch gets its own prediction.
                    for tag_index in logits.argmax(dim=-1).reshape(-1).tolist():
                        predictions.append(all_tags[tag_index])
        finally:
            # Restore the mode the caller had, even if prediction fails.
            network.train(was_training)

        return predictions

    return predictor


Now we define a helper training function.

In [None]:
import tqdm

def train(network, n_epochs=25, checkpoint_path='network.pt'):
    """Train network on train_data_raw and return it with its best weights loaded.

    After every epoch the network is scored on validation_data_raw. Whenever
    the score beats the best seen so far, the weights are saved to
    checkpoint_path; the best saved weights are reloaded before returning.

    network - the module to train; it is moved to the GPU when one is
      available and otherwise stays on the CPU
    n_epochs - number of passes over the training data
    checkpoint_path - file that receives the best-scoring weights

    Returns the trained network.
    """
    # tqdm.tqdm_notebook is deprecated; tqdm.auto picks a suitable progress bar.
    from tqdm.auto import tqdm as progress_bar

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    processed_data = make_matrices(train_data_raw)
    data_loader = torch.utils.data.DataLoader(processed_data)
    network = network.to(device)
    optimizer = torch.optim.Adam(network.parameters())

    predictor = predict_using(network)

    best_score = float('-inf')
    saved_checkpoint = False

    for epoch in range(n_epochs):
        print('Epoch', epoch)
        for batch in progress_bar(data_loader, leave=False):
            chars_batch, mask_batch, pos_batch = batch
            assert network.training

            optimizer.zero_grad()
            chars_batch = chars_batch.to(device)
            mask_batch = mask_batch.to(device)
            pos_batch = pos_batch.to(device)
            output = network(chars_batch, mask_batch)
            loss = F.cross_entropy(output, pos_batch)
            loss.backward()
            optimizer.step()

        validation_score = evaluate(predictor, validation_data_raw)
        print('Validation score:', validation_score)

        # Keep the weights of the best-scoring epoch. Training itself always
        # runs for all n_epochs.
        if validation_score > best_score:
            best_score = validation_score
            torch.save(network.state_dict(), checkpoint_path)
            saved_checkpoint = True

    # With n_epochs == 0 nothing was saved, so there is nothing valid to reload.
    if saved_checkpoint:
        network.load_state_dict(torch.load(checkpoint_path, map_location=device))

    return network

In [None]:
class POSTagger(nn.Module):
    """Feed-forward part-of-speech tagger over character trigrams.

    Each character is embedded, the embeddings of every three consecutive
    characters are concatenated and passed through a hidden layer, the
    resulting trigram features are masked and averaged over the word, and two
    further linear layers produce one score per tag.
    """

    def __init__(self, n_outputs, n_chars=256, embed_dim=44, hidden_dim=256): # pass whatever arguments you need
        """Create the layers.

        n_outputs - number of part-of-speech tags to score
        n_chars - size of the character vocabulary; character ids must be
          smaller than this
        embed_dim - size of each character embedding
        hidden_dim - width of the two hidden layers
        """
        super().__init__()

        # YOUR CODE HERE
        # create Modules from torch.nn (imported as nn)

        # BEGIN SOLUTION
        self.embeddings = nn.Embedding(n_chars, embed_dim)
        # a trigram is three concatenated character embeddings
        self.linear0 = nn.Linear(3 * embed_dim, hidden_dim)
        self.linear1 = nn.Linear(hidden_dim, hidden_dim)
        self.linear2 = nn.Linear(hidden_dim, n_outputs)
        # END SOLUTION

    def forward(self, chars, mask):
        # for this network, `chars` should be an int64 tensor of character ids with size (batch, n_chars)
        # `mask` is a float32 tensor of size (batch, n_chars) that is 1.0 if the character at that position in `chars` is valid (else 0.0)
        # the function returns a float32 tensor of size (batch, n_pos)

        # YOUR CODE HERE

        # BEGIN SOLUTION
        embeds = self.embeddings(chars)
        # implementing character trigrams: position i holds characters i, i+1, i+2
        trigrams = torch.cat((embeds[:, :-2], embeds[:, 1:-1], embeds[:, 2:]), dim=2)
        # nn.Linear acts on the last dimension, so the batch dimension is kept
        # as is and any batch size works
        hidden = F.relu(self.linear0(trigrams))
        # a trigram is valid only when its last character is valid
        trigram_mask = mask[:, 2:].unsqueeze(-1)
        # note: the mean divides by the total number of trigram positions,
        # not by the number of valid ones
        pooled_masked_embeds = (hidden * trigram_mask).mean(1)
        output = F.relu(self.linear1(pooled_masked_embeds))
        output = F.dropout(output, training=self.training)
        output = self.linear2(output)

        return output
        # END SOLUTION


In [None]:
# No cell shown above defines `improved_predictor`. Build it here when it is
# missing so that the notebook still works after Restart & Run All.
try:
    improved_predictor
except NameError:
    improved_predictor = predict_using(train(POSTagger(len(all_tags))))

print_sample_predictions(improved_predictor, validation_data_raw)

# Made-up words, to see what the tagger infers from their suffixes.
print_sample_predictions(improved_predictor, [['kleining','X'], ['deneroful','X']])

Sample predictions: [('salem', 'NOUN'), ('unsympathetic', 'ADJ'), ('downwind', 'VERB'), ('exodus', 'NOUN'), ('avoiding', 'VERB'), ('informal', 'ADJ'), ('padded', 'VERB'), ('tantalizing', 'VERB'), ('farce', 'NOUN'), ('berger', 'NOUN')]
Sample predictions: [('kleining', 'VERB'), ('deneroful', 'ADJ')]


Finally, we evaluate on the test set and save the predictions to a file.

In [None]:
def save_predictions(predictions, filename):
    """Write one prediction per line to a text file.

    predictions - a list of strings
    filename - path of the file to create or overwrite
    """
    with open(filename, 'w') as out_file:
        out_file.writelines(pred + '\n' for pred in predictions)

# Predict once and reuse the result for both scoring and saving, rather than
# running the network over the test set twice.
test_predictions = improved_predictor(test_data_raw)
print('test score improved:', accuracy(test_predictions, test_data_raw))
save_predictions(test_predictions, 'predicted_test_outputs_improved.txt')