# A Recurrent Neural Network POS Tagger

In [33]:
from __future__ import print_function
import sys

import torch
import torch.nn as nn
import torch.autograd as autograd
import torch.nn.functional as F
import torch.optim as optim

## Corpus Processing

In [34]:
def read_dataset(filename='./penn.train.pos'):
    fp = open(filename, 'r')
    dataset = []
    for line in fp:
        tokens = line.strip().split()
        dataset.append(([t.rsplit('/', 1)[0] for t in tokens], [t.rsplit('/', 1)[1] for t in tokens]))
    return dataset

training_data = read_dataset("train.pos")[:200]
devel_data = read_dataset("test.pos")[:100]

word_to_ix = {'UNK':0}
tag_to_ix = {'UNK':0}
for sent, tags in training_data:
    for word, tag in zip(sent, tags):
        if word not in word_to_ix:
            word_to_ix[word] = len(word_to_ix)
        if tag not in tag_to_ix:
            tag_to_ix[tag] = len(tag_to_ix)

def prepare_sequence(seq, to_ix):
    idxs = map(lambda w: to_ix[w] if w in to_ix else to_ix['UNK'], seq)
    tensor = torch.LongTensor(idxs)
    return autograd.Variable(tensor)

## RNN Tagger

<img src="images/rnn.png" style="width: 500px;"/>

In [35]:
class RNNTagger(nn.Module):

    def __init__(self, embedding_dim, hidden_dim, vocab_size, tagset_size):
        super(RNNTagger, self).__init__()

        self.hidden_dim = hidden_dim
        
        # The word embedding layer map words (IDs) to their embedding vectors
        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim)
        
        # The RNN takes word embeddings as inputs, and outputs hidden states
        # with dimensionality hidden_dim.
        self.rnn = nn.RNNCell(embedding_dim, hidden_dim)
        
        # The linear layer maps hidden state space to the tag space
        self.hidden2tag = nn.Linear(hidden_dim, tagset_size)
        self.hidden = self.init_hidden()

    def init_hidden(self):
        # Before we've done anything, we dont have any hidden state.
        # The axes semantics are (num_layers, minibatch_size, hidden_dim)
        return autograd.Variable(torch.zeros(1, self.hidden_dim))

    def forward(self, sentence):
        # Embedding layer: map input words to embeddings
        # The axes semantics are (len_sentence, minibatch_size, embedding_dim)
        embeds = self.word_embeddings(sentence).view(len(sentence), 1, -1)
        
        # Incrementally obtain the RNN hidden states (left to right)
        rnn_hidden = []
        for i in xrange(len(sentence)):
            self.hidden = self.rnn(embeds[i], self.hidden)
            rnn_hidden.append(self.hidden)
        
        # Softmax layer: map RNN hidden states to the tag space
        tag_space = self.hidden2tag(torch.stack(rnn_hidden).view(len(sentence), -1))
        tag_scores = F.log_softmax(tag_space)
        return tag_scores

## Training and Testing

In [37]:
torch.manual_seed(66666)

EMBEDDING_DIM = 10
HIDDEN_DIM = 10

model = RNNTagger(EMBEDDING_DIM, HIDDEN_DIM, len(word_to_ix), len(tag_to_ix))

# Negative Log-likelihood loss
loss_function = nn.NLLLoss()
optimizer = optim.SGD(model.parameters(), lr=0.1)

def train(epoch):
    model.train()
    train_loss = torch.Tensor([0])
    for sent, tags in training_data:
        # Step 1. Remember that Pytorch accumulates gradients.  We need to clear them out
        # before each instance
        model.zero_grad()
        
        # Also, we need to clear out the hidden state of the LSTM, detaching it from its
        # history on the last instance.
        model.hidden = model.init_hidden()
        
        # Step 2. Get our inputs ready for the network, that is, turn them into Variables
        # of word indices.
        sent_in = prepare_sequence(sent, word_to_ix)
        targets = prepare_sequence(tags, tag_to_ix)
        
        # Step 3. Run our forward pass.
        tag_scores = model(sent_in)
        
        # Step 4. Compute the loss, gradients, and update the parameters by calling
        # optimizer.step()
        loss = loss_function(tag_scores, targets)
        train_loss += loss.data
        loss.backward()
        optimizer.step()
        
    print("Epoch %d: loss = %.2f, " % (epoch, train_loss[0]), end='')

def test(epoch):
    model.eval()
    test_loss = torch.Tensor([0])
    correct = 0
    tot_words = 0
    for sent, tags in devel_data:
        sent_in = prepare_sequence(sent, word_to_ix)
        targets = prepare_sequence(tags, tag_to_ix)
        tag_scores = model(sent_in)
        pred = tag_scores.data.max(dim=1)[1]
        correct += pred.eq(targets.data).sum()
        tot_words += len(sent)
        
    print("Testing accuracy = %.2f%%" % (100. * correct / tot_words))

for epoch in xrange(30):
    train(epoch)
    test(epoch)

Epoch 0: loss = 1369.36, Testing accuracy = 35.24%
Epoch 1: loss = 1138.58, Testing accuracy = 40.18%
Epoch 2: loss = 1048.72, Testing accuracy = 41.98%
Epoch 3: loss = 987.34, Testing accuracy = 44.22%
Epoch 4: loss = 942.78, Testing accuracy = 45.45%
Epoch 5: loss = 909.83, Testing accuracy = 46.10%
Epoch 6: loss = 884.15, Testing accuracy = 46.35%
Epoch 7: loss = 863.34, Testing accuracy = 47.12%
Epoch 8: loss = 845.14, Testing accuracy = 47.45%
Epoch 9: loss = 828.16, Testing accuracy = 48.92%
Epoch 10: loss = 812.36, Testing accuracy = 49.33%
Epoch 11: loss = 798.16, Testing accuracy = 49.78%
Epoch 12: loss = 785.26, Testing accuracy = 50.10%
Epoch 13: loss = 773.38, Testing accuracy = 50.71%
Epoch 14: loss = 762.24, Testing accuracy = 51.29%
Epoch 15: loss = 751.65, Testing accuracy = 51.86%
Epoch 16: loss = 741.55, Testing accuracy = 52.06%
Epoch 17: loss = 731.94, Testing accuracy = 52.47%
Epoch 18: loss = 722.80, Testing accuracy = 52.63%
Epoch 19: loss = 714.14, Testing accur