## CPSC 436N Assignment 1 - Language Models

In this assignment you will implement both a count-based N-gram language model and a simple neural language model.

For the neural model, familiarize yourself with PyTorch: https://pytorch.org/tutorials/beginner/basics/intro.html




Let's load a number of dependencies.

In [21]:
import torch
import numpy as np
import torch.nn as nn
import torch.nn.functional as F

from tqdm import tqdm
from itertools import count
from scipy.special import softmax
from torch.nn.utils import clip_grad_norm_
from collections import defaultdict, Counter

# Check if GPU is available to pytorch
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# needed for working in log space
import math

Next, we implement the vocabulary and corpus objects. The vocabulary assigns an integer index to each word type. The corpus reads a text file and stores it in `lines`, a list of lines in which every word instance is replaced by the index of its word type.

In [2]:
class Vocabulary(object):
    def __init__(self):
        self.word2idx = defaultdict(count(0).__next__)

        # Add special tokens
        _ = self.word2idx['<start>']
        _ = self.word2idx['<end>']
        _ = self.word2idx['<unk>']

    def add_word(self, word):
        _ = self.word2idx[word]

    def __len__(self):
        return len(self.word2idx)

    def idx2word(self):
        return {i: w for w, i in self.word2idx.items()}


class Corpus(object):
    def __init__(self, path, n=2, vocab=None):
        # Only initialize for the train corpus.
        # Then, for the dev and test corpus, use the vocabulary
        # from the training set
        if vocab is None:
          self.vocab = Vocabulary()
        else:
          self.vocab = vocab

        # Read the entire corpus to memory
        with open(path) as f:
            lines = f.readlines()

        # "Tokenize" (split to words) and add the <start> tokens based on the
        # number of n-grams, and the <end> tokens, for each line.
        # We will lowercase the corpus because we don't have enough data.
        lines = [['<start>'] * (n-1) + l.lower().split() + ['<end>'] for l in lines]

        # Convert word instances to word type IDs. If we are building the
        # vocabulary, add new words; otherwise map to existing words or '<unk>'.
        if vocab is None:

          # Only keep words that appeared at least twice in the corpus
          # and use <unk> for infrequent words.
          freq = dict(Counter([w for line in lines for w in line]))
          lines = [[self.vocab.word2idx[w]
                    if freq[w] > 1
                    else self.vocab.word2idx['<unk>']
                    for w in l] for l in lines]

          # Convert from defaultdict to dictionary so that we can't add more tokens
          self.vocab.word2idx = dict(self.vocab.word2idx)
        else:
          lines = [[self.vocab.word2idx.get(w, self.vocab.word2idx['<unk>'])
                    for w in l]
                   for l in lines]

        self.lines = lines
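
The `defaultdict(count(0).__next__)` idiom used by `Vocabulary` can be seen in isolation in the following minimal sketch. The toy words and the names `toy_word2idx`, `frozen`, and `lookup` are illustrative assumptions and are not part of the assignment code.

```python
from collections import defaultdict
from itertools import count

# Each unseen key is assigned the next unused integer on first access.
toy_word2idx = defaultdict(count(0).__next__)
for word in ["<start>", "<end>", "<unk>", "the", "cat", "the"]:
    _ = toy_word2idx[word]

# Freezing to a plain dict stops new words from being added silently.
frozen = dict(toy_word2idx)

def lookup(word):
    # Out-of-vocabulary words fall back to the <unk> index.
    return frozen.get(word, frozen["<unk>"])

print(frozen)         # {'<start>': 0, '<end>': 1, '<unk>': 2, 'the': 3, 'cat': 4}
print(lookup("dog"))  # 2
```

This is why `Corpus` converts `word2idx` to a regular `dict` after building the training vocabulary: lookups on the dev and test sets then cannot grow the vocabulary by accident.
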

Load the train, dev, and test sets. These are samples from Simple Wikipedia.

In [3]:
train = Corpus("data/train.txt")
dev = Corpus("data/dev.txt", vocab=train.vocab)
test = Corpus("data/test.txt", vocab=train.vocab)

for s, name in zip([train, dev, test], ["train", "dev", "test"]):
  print(f"{name} number of sentences: {len(s.lines)}, vocab size: {len(s.vocab)}")

train number of sentences: 25000, vocab size: 25692
dev number of sentences: 5000, vocab size: 25692
test number of sentences: 5000, vocab size: 25692


### Bigram (Count-based) Language Model

We will create a `CountBasedLM` object. Training computes the counts of all n-grams and (n-1)-grams in the corpus. For inference, you will need to complete the implementation of `compute_mle`.

In [12]:
def get_ngrams(s, n):
  """
  Gets a list of words and returns a list of n-grams
  """
  return zip(*[s[i:] for i in range(n)])


class CountBasedLM(object):
    def __init__(self, n=2, laplace=1):
        self.vocab = []
        self.vocab_size = len(self.vocab)
        self.laplace = laplace
        self.n = n

    def train(self, train_corpus):
        self.vocab = train_corpus.vocab
        self.vocab_size = len(self.vocab)

        # Count the n-grams in the corpus
        words = [w for line in train_corpus.lines for w in line]

        self.n_gram_count = dict(Counter(get_ngrams(words, self.n)))
        self.nm1_gram_count = dict(Counter(get_ngrams(words, self.n-1)))

    def compute_mle(self, n_gram):
        """
        Compute the MLE of P(w_i|w_{i-n+1}...w_{i-1}),
        with add-one (Laplace) smoothing.
        Please see chapter 3.5.1 of J&M 3rd Ed. for more information.
        """
        ####################################
        #   Your code here
        ####################################
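        # Laplace smoothing: add self.laplace (1 by default) to the numerator
        counts_n = self.n_gram_count.get(n_gram, 0) + self.laplace

        # The context is the (n-1)-gram prefix of the current n-gram; the
        # denominator grows by self.laplace for each word type in the vocabulary
        nm1_gram = n_gram[:-1]
        counts_nm1 = self.nm1_gram_count.get(nm1_gram, 0) + self.laplace * self.vocab_size

        prob = counts_n / counts_nm1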
        ####################################

        return prob
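
To make the add-one estimate concrete, here is a small self-contained sketch on a toy token list. It computes (count(prev, word) + 1) / (count(prev) + |V|) and checks that the smoothed probabilities for one context sum to 1. The names `toy_ngrams` and `add_one_prob` and the toy data are assumptions made for illustration only.

```python
from collections import Counter

def toy_ngrams(seq, n):
    # Same zip-based trick as get_ngrams: n staggered views of the sequence.
    return list(zip(*[seq[i:] for i in range(n)]))

tokens = ["<start>", "a", "b", "a", "c", "<end>"]
toy_vocab = sorted(set(tokens))                    # 5 word types
bigram_counts = Counter(toy_ngrams(tokens, 2))
unigram_counts = Counter(toy_ngrams(tokens, 1))

def add_one_prob(prev, word):
    numerator = bigram_counts.get((prev, word), 0) + 1
    denominator = unigram_counts.get((prev,), 0) + len(toy_vocab)
    return numerator / denominator

# P(b | a) = (1 + 1) / (2 + 5) = 2/7
print(add_one_prob("a", "b"))

# The smoothed next-word distribution for the context "a" sums to 1
# (up to floating-point rounding).
print(sum(add_one_prob("a", w) for w in toy_vocab))
```

An unseen bigram such as ("c", "a") still receives non-zero probability, here 1 / (1 + 5), which is what keeps the log probabilities finite later on.
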

Let's create and train the bigram LM.

In [13]:
bigram_lm = CountBasedLM()
bigram_lm.train(train)

In [14]:
####################################
#   Answer to Q1
####################################
token_ids = tuple([bigram_lm.vocab.word2idx[t] for t in ["national", "park"]])
bigram_lm.compute_mle(token_ids)

0.0003803872342044201

Now, you will implement a function that uses the LM to compute the probability of a sentence based on the chain rule. There are several important implementation details:

1. The function gets a (string) sentence so the first step should be to tokenize it and convert the tokens to token IDs.

2. Work in log space to avoid underflow, but return the probability (not log probability).

3. Do not normalize the probability by sentence length.

We will test this function by making sure that:

* The probability of each sentence is between 0 and 1

* The probability of a sentence is not higher when a word is added

* The probability of a grammatical sentence is higher than that of an ungrammatical sentence.

In [33]:
def compute_probability(lm, sentence):
    ####################################
    #   Your code here
    ####################################
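    # Lowercase (as in Corpus) and split the sentence into tokens, then convert
    # the tokens to token IDs; out-of-vocabulary tokens map to <unk>
    tokens = sentence.lower().split()
    unk_id = lm.vocab.word2idx['<unk>']
    token_ids = tuple(lm.vocab.word2idx.get(t, unk_id) for t in tokens)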

    # Break tokens into overlapping n-grams
    token_ngrams = get_ngrams(token_ids, lm.n)

    # Compute the log-probability using the chain rule
    log_prob = 0.0
    for ngram in token_ngrams:
        ngram_prob = lm.compute_mle(ngram)
        log_prob += math.log(ngram_prob)    

    # Convert back from log to raw probability
    probability = math.exp(log_prob)
    ####################################    
    return probability


s1 = "this is a sentence"
prob_s1 = compute_probability(bigram_lm, s1)
assert(0 <= prob_s1 <= 1)

s2 = "this is a longer sentence"
prob_s2 = compute_probability(bigram_lm, s2)
assert(prob_s2 <= prob_s1)

s3 = "this longer is a sentence"
prob_s3 = compute_probability(bigram_lm, s3)
assert(prob_s3 < prob_s2)

1.549747446191864e-08
2.4089650583948504e-12
3.694731684654674e-15
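
The reason for working in log space can be illustrated with a short sketch. The 200 per-token probabilities of 1e-5 are made-up values chosen only to trigger underflow; they do not come from the model.

```python
import math

# 200 hypothetical per-token probabilities, each 1e-5.
token_probs = [1e-5] * 200

# Multiplying directly underflows to exactly 0.0 in double precision.
direct = 1.0
for p in token_probs:
    direct *= p

# Summing logs stays finite and can still be compared across sentences.
log_prob = sum(math.log(p) for p in token_probs)

print(direct)    # 0.0
print(log_prob)  # about -2302.59
```

Exponentiating such a log probability with `math.exp` would also give 0.0, so for very long sentences the log probability is the quantity that stays informative. For the short test sentences above, converting back with `math.exp` is safe.
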


Now, complete the code for computing average perplexity on a dataset using the LM. We will then compute the perplexity on each of the training, dev, and test sets.

In [None]:
import math


def compute_perplexity(lm, corpus):
  ####################################
  #   Your code here
  ####################################

  ####################################
  return perplexity


for s, name in zip([train, dev, test], ["train", "dev", "test"]):
  ppl = compute_perplexity(bigram_lm, s)
  print(f"The average {name} perplexity for the bigram LM: {ppl}")
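
As a point of reference, one common definition of perplexity is the exponential of the average negative log probability per predicted token. The sketch below applies that definition to a toy model; `toy_perplexity` and `prob_fn` are hypothetical names, and whether to average over the whole corpus (as here) or per sentence is a choice this sketch makes on its own, not something the assignment prescribes.

```python
import math

def toy_perplexity(prob_fn, sentences, n=2):
    """exp of the average negative log probability per predicted token."""
    total_log_prob = 0.0
    num_predictions = 0
    for sent in sentences:
        # Positions before n-1 are padding and are never predicted.
        for i in range(n - 1, len(sent)):
            ngram = tuple(sent[i - n + 1 : i + 1])
            total_log_prob += math.log(prob_fn(ngram))
            num_predictions += 1
    return math.exp(-total_log_prob / num_predictions)

# A model that is uniform over 4 word types should have perplexity 4.
uniform = lambda ngram: 1 / 4
toy_sentences = [[0, 2, 3, 1], [0, 3, 1]]
print(toy_perplexity(uniform, toy_sentences))  # approximately 4
```

The uniform case is a handy sanity check: perplexity can be read as the effective number of equally likely choices the model faces at each step.
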

Finally, let's train a trigram model and compare its perplexity with that of the bigram model on the dev set (remember, lower perplexity is better).

In [None]:
# Reload the corpus for the trigram model (we need to add 2 special tokens to
# mark the beginning of the sentence)
trigram_train = Corpus("data/train.txt", n=3)
trigram_dev = Corpus("data/dev.txt", vocab=trigram_train.vocab, n=3)
trigram_test = Corpus("data/test.txt", vocab=trigram_train.vocab, n=3)

trigram_lm = CountBasedLM(n=3)
trigram_lm.train(trigram_train)

for s, name in zip([trigram_train, trigram_dev, trigram_test], ["train", "dev", "test"]):
  ppl = compute_perplexity(trigram_lm, s)
  print(f"The average {name} perplexity for the trigram LM: {ppl}")

We can now use the count-based LM to generate text. This is done by starting with an input `prompt`, computing the distribution for the next token, and sampling from it. We will implement top-k decoding, which prunes the distribution to the k most probable next tokens, re-normalizes it, and samples each token in proportion to its probability. Using `k=1` gives greedy decoding, i.e. selecting the most probable next token at each time step. We will generate the text once with greedy decoding and 5 times with `k=100`. Complete the following code.

In [None]:
def generate_text_count_based(lm, prompt, k=1, max_tokens=10):
    # Tokenize and convert the sentence to a list of IDs
    tokens = ['<start>'] * (lm.n-1) + prompt.split()
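    # Out-of-vocabulary prompt words map to <unk> instead of raising a KeyError
    input_tokens = [lm.vocab.word2idx.get(t, lm.vocab.word2idx['<unk>']) for t in tokens]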

    # Iteratively generate the next word until generating <end>
    # or until reaching max_tokens.
    generated_tokens = []

    for i in range(max_tokens):
      ####################################
      #   Your code here
      ####################################

      ####################################

      generated_tokens.append(selected_index)

      if lm.vocab.idx2word()[selected_index] == "<end>":
        break

    return " ".join([lm.vocab.idx2word()[i] for i in input_tokens + generated_tokens])


print("k=1")
print(generate_text_count_based(bigram_lm, "he was born", k=1, max_tokens=10))

print("k=100")
for _ in range(5):
  print(generate_text_count_based(bigram_lm, "he was born", k=100, max_tokens=10))
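
The top-k step described above can be sketched independently of the language model. The function below takes a plain list of next-token probabilities; `toy_top_k_sample` and the example distribution are assumptions for illustration, and the standard-library `random.choices` is just one possible way to draw the sample.

```python
import random

def toy_top_k_sample(probs, k, rng=random):
    """Sample an index from the k most probable entries of probs."""
    # Indices of the k largest probabilities.
    top = sorted(range(len(probs)), key=lambda i: probs[i], reverse=True)[:k]
    # random.choices normalizes the weights, so this samples from the
    # re-normalized truncated distribution.
    weights = [probs[i] for i in top]
    return rng.choices(top, weights=weights, k=1)[0]

toy_probs = [0.05, 0.5, 0.3, 0.15]
print(toy_top_k_sample(toy_probs, k=1))  # 1 (greedy: always the argmax)

# With k=2 only indices 1 and 2 can ever be selected.
samples = {toy_top_k_sample(toy_probs, k=2) for _ in range(200)}
print(samples <= {1, 2})                 # True
```

In the count-based setting, `probs` would be the smoothed probability of every vocabulary item given the last `n-1` tokens.
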

### Neural Language Model

The neural LM is an n-gram LM with one hidden layer, as we learned in class. Let's start by defining some hyperparameters. Feel free to change them!

In [None]:
embed_size = 64
hidden_size = 128
num_epochs = 2
learning_rate = 1e-3

Now, you will implement the class for the neural n-gram language model. You will complete the `forward` function, which gets a tensor with the (n-1) context token IDs and returns the unnormalized scores (logits) over the next token.

In [None]:
class NeuralNgramLM(nn.Module):
    def __init__(self, vocab, embed_size, hidden_size, n=2):
        super(NeuralNgramLM, self).__init__()
        self.vocab = vocab
        self.vocab_size = len(self.vocab)
        self.n = n
        self.embed = nn.Embedding(self.vocab_size, embed_size)
        self.hidden = nn.Linear((self.n-1) * embed_size, hidden_size)
        self.output = nn.Linear(hidden_size, self.vocab_size)

    def forward(self, x):
        ####################################
        #   Your code here
        ####################################

        ####################################

        return y
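
For orientation, one common formulation of a feed-forward n-gram LM with a single hidden layer is written out below. It may differ from the version presented in class: the `tanh` non-linearity in particular is an assumption, and the symbols $E$, $W_h$, $b_h$, $W_o$, $b_o$ are notation introduced here for the parameters of `self.embed`, `self.hidden`, and `self.output`, with $d$ standing for `embed_size`.

$$
\begin{aligned}
e &= \big[\,E[w_{i-n+1}];\ \dots;\ E[w_{i-1}]\,\big] \in \mathbb{R}^{(n-1)d} \\
h &= \tanh(W_h\, e + b_h) \\
y &= W_o\, h + b_o \in \mathbb{R}^{|V|}
\end{aligned}
$$

Here $y$ holds the logits. No softmax is applied inside `forward`, because `nn.CrossEntropyLoss` expects unnormalized scores.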

Next, let's create and train a neural bigram model.

In [None]:
neural_lm = NeuralNgramLM(train.vocab, embed_size, hidden_size).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(
    neural_lm.parameters(), lr=learning_rate, momentum=0.9)

In [None]:
train_ids = [torch.from_numpy(np.array(line)).unsqueeze(0) for line in train.lines]
dev_ids = [torch.from_numpy(np.array(line)).unsqueeze(0) for line in dev.lines]

neural_lm.train()

for epoch in range(num_epochs):
    total_loss = 0
    for instance in tqdm(train_ids):
        # Notice that we get a [batch_size, max_len-1] tensor,
        # which only works for a bigram LM.
        # To train an LM with n > 2, we would need to
        # add a dimension for the context.
        inputs = instance[:, :-1].to(device)
        targets = instance[:, 1:].to(device)

        # Forward pass
        # (the outputs shape should be [batch_size, max_len-1, vocab_size])
        outputs = neural_lm(inputs)
        loss = criterion(
            outputs.reshape(-1, neural_lm.vocab_size), targets.reshape(-1))
        total_loss += loss.item()

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Calculate the perplexity of the current trained model on the dev set.
    # It should decrease between epochs. We will not use our implementation of
    # perplexity; instead we exponentiate the cross-entropy loss. See J&M for
    # the relationship between the two.
    total_ppl = 0  # accumulates the per-sentence cross-entropy
    with torch.no_grad():  # no gradients are needed for evaluation
        for i in range(0, len(dev_ids)):
            dev_inputs = dev_ids[i][:, :-1].to(device)
            dev_targets = dev_ids[i][:, 1:].to(device)
            dev_outputs = neural_lm(dev_inputs)
            ce = criterion(
                dev_outputs.reshape(-1, neural_lm.vocab_size),
                dev_targets.reshape(-1))
            total_ppl += ce.item()

    print(f"\nEpoch [{epoch + 1}/{num_epochs}], " + \
          f"Training Loss: {total_loss/len(train_ids):.4f}, " + \
          f"Dev Perplexity: {np.exp(total_ppl/len(dev_ids)):5.2f}\n")
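
The comment in the training loop notes that an LM with n > 2 needs an extra context dimension. A minimal sketch of how the (context, target) pairs could be built from one padded line is shown below, in plain Python. The helper name `toy_context_windows` and the toy IDs are assumptions, and turning the result into a tensor of shape `[num_positions, n-1]` is left out.

```python
def toy_context_windows(token_ids, n):
    """Return (contexts, targets) for an n-gram model over one padded line."""
    contexts, targets = [], []
    for i in range(n - 1, len(token_ids)):
        contexts.append(token_ids[i - n + 1 : i])  # the n-1 preceding IDs
        targets.append(token_ids[i])               # the ID to predict
    return contexts, targets

# A trigram-padded line: two <start> IDs (0), two words, then <end> (1).
toy_line = [0, 0, 5, 6, 1]
contexts, targets = toy_context_windows(toy_line, n=3)
print(contexts)  # [[0, 0], [0, 5], [5, 6]]
print(targets)   # [5, 6, 1]
```

With `n=2` each context has a single ID, which corresponds to the one-position shift between `instance[:, :-1]` and `instance[:, 1:]` in the loop above.
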

Save the trained model.

In [None]:
torch.save(neural_lm, f"neural_{neural_lm.n}gram_lm.ckpt")

Finally, let's load the model and test it on the test set.

In [None]:
test_ids = [torch.from_numpy(np.array(line)).unsqueeze(0) for line in test.lines]

neural_model = torch.load(f"neural_{neural_lm.n}gram_lm.ckpt", weights_only=False)
neural_model.eval()
test_ppl = 0
with torch.no_grad():
    for i in range(0, len(test_ids)):
      inputs = test_ids[i][:, :-1].to(device)
      targets = test_ids[i][:,1:].to(device)
      # Evaluate the model that was just loaded from the checkpoint
      outputs = neural_model(inputs)
      ce = criterion(outputs.reshape(-1, neural_model.vocab_size),
                     targets.reshape(-1))
      test_ppl += ce.item()

print(f"Test Perplexity: {np.exp(test_ppl/len(test_ids)):5.2f}\n")

We can also compute the probability of sentences using the neural LM. The cross-entropy loss between the model's predictions for the first `|S|-1` tokens and the last `|S|-1` tokens is the average negative log-likelihood of the sequence, i.e. -1 times the average of the log probabilities of the tokens. Because the log function is monotonic, a lower cross-entropy loss corresponds to a higher probability for a sentence (when comparing sentences of the same length). Since we are typically interested in comparing the probabilities of sentences rather than their absolute values, we can use the CE loss directly.

In [None]:
def compute_negative_log_likelihood(neural_lm, sentence):
  ####################################
  #   Your code here
  ####################################

  ####################################

s = "this is a park"
prob_s = -compute_negative_log_likelihood(neural_lm, s)
print(prob_s)
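
The link between logits and the average negative log-likelihood can be checked by hand with a small standard-library sketch. It mirrors what a mean-reduced cross-entropy loss computes, but it is not the PyTorch implementation; `toy_avg_nll` and the toy logits are assumptions for illustration.

```python
import math

def toy_avg_nll(logits_per_step, target_ids):
    """Mean of -log softmax(logits)[target] over all time steps."""
    total = 0.0
    for logits, target in zip(logits_per_step, target_ids):
        # log-sum-exp with the max subtracted for numerical stability.
        m = max(logits)
        log_z = m + math.log(sum(math.exp(x - m) for x in logits))
        total += -(logits[target] - log_z)
    return total / len(target_ids)

toy_logits = [[2.0, 0.0, 0.0], [0.0, 3.0, 0.0]]
likely = toy_avg_nll(toy_logits, [0, 1])    # targets match the largest logits
unlikely = toy_avg_nll(toy_logits, [1, 2])  # targets have small logits
print(likely < unlikely)                    # True
```

Multiplying the average by the number of predicted tokens gives the total negative log-likelihood, which is the quantity to compare when sentences differ in length.
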

We can also use the neural LM to generate text. Complete the following code.

In [None]:
def generate_text_neural(neural_lm, prompt, k=1, max_tokens=10):
    neural_lm.eval()

    # Tokenize and convert the sentence to a list of IDs
    tokens = ['<start>'] * (neural_lm.n-1) + prompt.split()
    input_tokens = [neural_lm.vocab.word2idx.get(t, neural_lm.vocab.word2idx['<unk>']) for t in tokens]

    # Iteratively generate the next word until generating <end>
    # or until reaching max_tokens.
    generated_tokens = []

    for i in range(max_tokens):
      token_ids = torch.tensor(input_tokens + generated_tokens, device=device).unsqueeze(0)

      ####################################
      #   Your code here
      ####################################


      ####################################

      generated_tokens.append(selected_index)

      if neural_lm.vocab.idx2word()[selected_index] == "<end>":
        break

    return " ".join([neural_lm.vocab.idx2word()[i]
                     for i in input_tokens + generated_tokens])


print("k=1")
print(generate_text_neural(neural_lm, "he was born", k=1, max_tokens=10))

print("k=10")
for _ in range(5):
  print(generate_text_neural(neural_lm, "he was born", k=10, max_tokens=10))
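
For the neural model the sampling step starts from logits rather than probabilities, and only the logits at the last position are relevant for the next token. A softmax restricted to the k largest logits gives the same distribution as re-normalizing the full softmax over those k tokens. The sketch below shows this on a plain list; `toy_top_k_from_logits` and the example logits are hypothetical.

```python
import math
import random

def toy_top_k_from_logits(logits, k, rng=random):
    """Sample a token index from the k highest-scoring logits."""
    top = sorted(range(len(logits)), key=lambda i: logits[i], reverse=True)[:k]
    # Softmax restricted to the kept logits (max subtracted for stability);
    # random.choices takes care of the final normalization.
    m = logits[top[0]]
    weights = [math.exp(logits[i] - m) for i in top]
    return rng.choices(top, weights=weights, k=1)[0]

last_step_logits = [0.1, 4.0, 2.5, -1.0]
print(toy_top_k_from_logits(last_step_logits, k=1))  # 1
```
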