<a href="https://colab.research.google.com/github/teymour-aldridge/NN/blob/master/text/pos_tagger/GRU_tagger.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
from google.colab import drive
drive.mount('/content/gdrive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=email%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdocs.test%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive.photos.readonly%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fpeopleapi.readonly&response_type=code

Enter your authorization code:
··········
Mounted at /content/gdrive


In [0]:
import torch
from torch import nn
import torch.nn.functional as F
import nltk
from nltk.corpus import brown

In [4]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

# Lookup table

This is essentially a lookup table, mapping each word to a number. Numbers are useful, because they can be input into a neural network (letters can't).

The table is built by looking through every word in the corpus, assigning a number to every word. 

In [0]:
class Lang:
  def __init__(self):
    # Create a dictionary to map words to numbers
    self.word2index = {}
    # Create a dictionary to count the frequency of each word
    self.word2count = {}
    # Create a dictionary to map numbers to words
    self.index2word = {}
    self.n_words = 0
  def add_word(self, word):
    # Check if the word is already assigned to a number
    if word in self.word2index:
      self.word2count[word] += 1
    # Otherwise add it to the dictionary
    else:
      self.word2index[word] = self.n_words
      self.index2word[self.n_words] = word
      self.word2count[word] = 1
      # Increase the total number of words by 1
      self.n_words += 1
  def add_sentence(self, sentence):
    for word in sentence:
      self.add_word(word)

# Preprocess the dataset

The python NLTK (natural language toolkit) is a handly collation of natural language processing tools and datasets. We are interested in a dataset called the Brown Corpus, the first computer-readable general corpus of texts in the english langauge. 

In [6]:
# Download the dataset
nltk.download('brown')
# Load the sentences
brown_dataset = brown.tagged_sents()

[nltk_data] Downloading package brown to /root/nltk_data...
[nltk_data]   Unzipping corpora/brown.zip.


In [0]:
from torch.utils.data import Dataset
REMOVE_TOKENS = "',:!01234567.-_+=-?" + '"'
class POSDataset(Dataset):
  def __init__(self, sentences, tagged_sentences, sentence_language, tagged_language):
    self.sentences = sentences
    self.tagged_sentences = tagged_sentences
    self.sents = sentence_language
    self.pos_tags = tagged_language
    
  def __getitem__(self, i):
    item, item_tags = self.sentences[i], self.tagged_sentences[i]
    item2index = []
    item_tags2index = []
    for i, token in enumerate(item):
      item2index.append(self.sents.word2index[token])
    for i, token in enumerate(item_tags):
      item_tags2index.append(self.pos_tags.word2index[token])
    return torch.tensor(item2index, dtype=torch.long, device=device), torch.tensor(item_tags2index, dtype=torch.long, device=device)
  
  @classmethod
  def from_corpus(cls, corpus):
    sentences = []
    tagged_sentences = []
    for item in corpus:
      sentences.append([pair[0].strip(REMOVE_TOKENS) for pair in item if len(pair[0].strip(REMOVE_TOKENS)) > 0])
      tagged_sentences.append([pair[1] for pair in item if len(pair[0].strip(REMOVE_TOKENS)) > 0])
    sents, pos_tags = Lang(), Lang()
    for item in sentences:
      sents.add_sentence(item)
    for item in tagged_sentences:
      pos_tags.add_sentence(item)
    return POSDataset(sentences, tagged_sentences, sents, pos_tags)

In [0]:
brown_dataset = POSDataset.from_corpus(brown_dataset)

# Defining the model

The model looks something like ```INPUT => WORD EMBEDDING => GATED RECURRENT UNIT(S) => FEEDFORWARD LAYER = > OUTPUTS ```

In [0]:
class POSTagger(nn.Module):
  def __init__(self, embedding_dim, hidden_dim, vocab_size, tagset_size, n_layers, device=device):
    super(POSTagger, self).__init__()
    self.embedding = nn.Embedding(vocab_size, embedding_dim)
    self.gru = nn.GRU(embedding_dim, hidden_dim, n_layers)
    self.word2tag = nn.Linear(hidden_dim, tagset_size)
  def forward(self, x):
    x.to(device)
    x = self.embedding(x)
    x = x.unsqueeze(0)
    x, _ = self.gru(x)
    x = self.word2tag(x.view(len(sentence), -1))
    x = F.log_softmax(x, dim=1)
    return x

# Training loop

This loop is where we compute our partial derivatives.

In [13]:
import os
import time
model_save_path = '/content/gdrive/My Drive/Computing/ML/Text/POS tagger/'
N_EPOCHS = 100
EMBEDDING_DIM = 256
HIDDEN_DIM = 12

# Work out the vocabulary size
vocab_size = len(brown_dataset.sents.word2index)
# Work out the size of the output vocabulary
tagset_size = len(brown_dataset.pos_tags.word2index)
# Initialize the model and (if necessary) move it to the GPU
model = POSTagger(EMBEDDING_DIM, HIDDEN_DIM, vocab_size, tagset_size, 1).to(device)
# Define a loss function
loss_fn = nn.NLLLoss().to(device)
# Use stochastic gradient desent
optimizer = torch.optim.SGD(model.parameters(), lr=0.001)

# Pick up from the last epoch
epoch_n = 0 if not os.path.isfile(os.path.join(model_save_path, 'pos_tagger_epoch_count.txt')) else int(open(os.path.join(model_save_path, 'pos_tagger_epoch_count.txt')).read())
print("Epoch: {}".format(epoch_n))
# Check if there are some saved model weights, and if so, use them. 
if os.path.isfile(os.path.join(model_save_path, 'pos_tagger_epoch_count.txt')):
  model.load_state_dict(torch.load(os.path.join(os.path.join(model_save_path, 'gru_tagger_model_weights.pt'))))
  print("Loaded weights.")
for i in range(epoch_n, N_EPOCHS):
  # Get the time
  t = time.time()
  # Loop through the entire dataset
  for sentence, tags in brown_dataset:
    # Ignore the training sample if it is of length 0 or less
    
    if len(sentence) == 0 or len(tags) == 0:
      continue
      
    # Remove accumulated gradients
    model.zero_grad()
    # Make a prediction
    pred = model(sentence)
    # Work out how far off the prediction was
    loss = loss_fn(pred, tags)
    # Update the model
    loss.backward()
    
    optimizer.step()
  # Print the time taken per epoch
  print("Epoch: {}, Loss: {}, Time: {}".format(i, loss.item(), time.time() - t))
  # Save the weights
  torch.save(model.state_dict(), os.path.join(model_save_path, 'gru_tagger_model_weights.pt'))
  # Save the epoch count
  with open(os.path.join(model_save_path, 'pos_tagger_epoch_count.txt'), 'w+') as f:
    f.write(str(i))

Epoch: 99
Loaded weights.


KeyboardInterrupt: ignored

# Make a prediction

The meaning of each token is given in the [instruction manual](http://clu.uni.no/icame/manuals/BROWN/INDEX.HTM#bc6).

In [0]:
sentence = "The shops were walked to by me".split(' ')
sentence_tokenized = [brown_dataset.sents.word2index[word] for word in sentence]

In [21]:
sentence_tokenized = torch.tensor(sentence_tokenized, dtype=torch.long, device=device)
pred = model(sentence_tokenized)
pred = [torch.argmax(row).item() for row in pred]
pred

[0, 22, 67, 4, 24, 7, 35]

In [22]:
# Correct!
[brown_dataset.pos_tags.index2word[index] for index in pred]

['AT', 'NP', 'BED', 'VBD', 'TO', 'IN', 'PPO']