## Language models
N-gram
Implement a simple bigram language model and scorer

First collect counts of unigrams and bigrams

Then normalise each bigram by the frequency of the first unigram

This is your bigram probability

Store the output in a dict, e.g. model['want']['to'] = 0.023

Save the dict to a file using pickle

Now write a program that scores novel sentences based on the model

In [1]:
import math
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
import numpy as np
import re



In [2]:
def tokenize(s):
    o = re.sub('([^a-zA-Z0-9\']+)', ' \g<1> ', s.strip())
    return [''] + re.sub('  *', ' ', o).strip().split(' ')

In [3]:
# Bigram Neural Network Model
class BigramNNmodel(nn.Module):

    def __init__(self, vocab_size, embedding_dim, context_size, hidden_dim):
        super(BigramNNmodel, self).__init__()
        self.context_size = context_size
        self.embedding_dim = embedding_dim
        self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.linear1 = nn.Linear(context_size * embedding_dim, hidden_dim)
        self.linear2 = nn.Linear(hidden_dim, vocab_size, bias = False)

    def forward(self, inputs):
        # compute x': concatenation of x1 and x2 embeddings
        embeds = self.embeddings(inputs).view(
                (-1,self.context_size * self.embedding_dim))
        # compute h: tanh(W_1.x' + b)
        out = torch.tanh(self.linear1(embeds))
        # compute W_2.h
        out = self.linear2(out)
        # compute y: log_softmax(W_2.h)
        log_probs = F.log_softmax(out, dim=1)
        # return log probabilities
        # BATCH_SIZE x len(vocab)
        return log_probs

In [4]:
def train():
    
    training_lines = []
    training_samples = []
    
    with open("train.txt") as fp:
        training_lines = [line.rstrip() for line in fp]
    
    vocabulary = set([''])
    
    for line in training_lines:
        tokens = tokenize(line)
        for i in tokens: vocabulary.add(i) 
        training_samples.append(tokens)
    word2idx = {k: v for v, k in enumerate(vocabulary)}
    idx2word = {v: k for k, v in word2idx.items()}

    x_train = []
    y_train = []
    for tokens in training_samples:
        for i in range(len(tokens) - 1): #!!!#
            x_train.append([word2idx[tokens[i]]]) #!!!#
            y_train.append([word2idx[tokens[i+1]]]) #!!!#

    x_train = np.array(x_train)
    y_train = np.array(y_train)

    ###############################################################################

    BATCH_SIZE = 1
    NUM_EPOCHS = 10

    train_set = np.concatenate((x_train, y_train), axis=1)
    train_loader = DataLoader(train_set, batch_size=BATCH_SIZE)

    loss_function = nn.NLLLoss()
    model = BigramNNmodel(len(vocabulary), EMBEDDING_DIM, CONTEXT_SIZE, HIDDEN_DIM)
    optimiser = torch.optim.Adam(model.parameters(), lr=0.01)

    for epoch in range(NUM_EPOCHS):
        for i, data_tensor in enumerate(train_loader):
            context_tensor = data_tensor[:,0:1] #!!!#
            target_tensor = data_tensor[:,1] #!!!#

            model.zero_grad()

            log_probs = model(context_tensor)
            loss = loss_function(log_probs, target_tensor)

            loss.backward()
            optimiser.step()    

        print('Epoch:', epoch, 'loss:', float(loss))

    torch.save({'model': model.state_dict(), 'vocab': idx2word}, 'model.lm')

    print('Model saved.')

In [5]:
EMBEDDING_DIM = 4
CONTEXT_SIZE = 1
HIDDEN_DIM = 6
train()

Epoch: 0 loss: 2.8793582916259766
Epoch: 1 loss: 2.829751491546631
Epoch: 2 loss: 2.743652820587158
Epoch: 3 loss: 2.584360122680664
Epoch: 4 loss: 2.3253326416015625
Epoch: 5 loss: 1.9789296388626099
Epoch: 6 loss: 1.598945140838623
Epoch: 7 loss: 1.2473078966140747
Epoch: 8 loss: 0.9583017826080322
Epoch: 9 loss: 0.7351698279380798
Model saved.


In [6]:
def test():
    blob = torch.load('model.lm')
    idx2word = blob['vocab']
    word2idx = {k: v for v, k in idx2word.items()}
    vocabulary = set(idx2word.values())

    model = BigramNNmodel(len(vocabulary), EMBEDDING_DIM, CONTEXT_SIZE, HIDDEN_DIM)
    model.load_state_dict(blob['model'])

    ###############################################################################

    BATCH_SIZE = 1

    lines = []
    
    with open("test.txt") as fp:
        lines = [line.rstrip() for line in fp]
    
    for line in lines:
        tokens = tokenize(line)
        
        x_test = []
        y_test = []
        for i in range(len(tokens) - 1): #!!!#
            x_test.append([word2idx[tokens[i]]]) #!!!#
            y_test.append([word2idx[tokens[i+1]]]) #!!!#
        
        x_test = np.array(x_test)
        y_test = np.array(y_test)
        
        test_set = np.concatenate((x_test, y_test), axis=1)
        test_loader = DataLoader(test_set, batch_size=BATCH_SIZE)
        
        total_prob = 1.0
        for i, data_tensor in enumerate(test_loader):
            context_tensor = data_tensor[:,0:1] #!!!#
            target_tensor = data_tensor[:,1] #!!!#
            log_probs = model(context_tensor)
            probs = torch.exp(log_probs)
            predicted_label = int(torch.argmax(probs, dim=1)[0])
        
            true_label = y_test[i][0]
            true_word = idx2word[true_label]
        
            prob_true = float(probs[0][true_label])
            total_prob *= prob_true
        
        print('%.6f\t%.6f\t' % (total_prob, math.log(total_prob)), tokens)

In [7]:
test()

0.000850	-7.070470	 ['', 'where', 'are', 'you', '?']
0.000529	-7.543928	 ['', 'were', 'you', 'in', 'england', '?']
0.002621	-5.944099	 ['', 'are', 'you', 'in', 'mexico', '?']
0.000039	-10.157038	 ['', 'i', 'am', 'in', 'mexico', '.']
0.000100	-9.211725	 ['', 'are', 'you', 'still', 'in', 'mexico', '?']


## Training and testing a trigram model

In [8]:
# Trigram Neural Network Model
class TrigramNNmodel(nn.Module):

    def __init__(self, vocab_size, embedding_dim, context_size, hidden_dim):
        super(TrigramNNmodel, self).__init__()
        self.context_size = context_size
        self.embedding_dim = embedding_dim
        self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.linear1 = nn.Linear(context_size * embedding_dim, hidden_dim)
        self.linear2 = nn.Linear(hidden_dim, vocab_size, bias = False)

    def forward(self, inputs):
        # compute x': concatenation of x1 and x2 embeddings
        embeds = self.embeddings(inputs).view(
                (-1,self.context_size * self.embedding_dim))
        # compute h: tanh(W_1.x' + b)
        out = torch.tanh(self.linear1(embeds))
        # compute W_2.h
        out = self.linear2(out)
        # compute y: log_softmax(W_2.h)
        log_probs = F.log_softmax(out, dim=1)
        # return log probabilities
        # BATCH_SIZE x len(vocab)
        return log_probs

In [9]:
def train_trigram():
    
    training_lines = []
    training_samples = []
    
    with open("train.txt") as fp:
        training_lines = [line.rstrip() for line in fp]
    
    vocabulary = set([''])
    
    for line in training_lines:
        tokens = tokenize(line)
        for i in tokens: vocabulary.add(i) 
        training_samples.append(tokens)
    word2idx = {k: v for v, k in enumerate(vocabulary)}
    idx2word = {v: k for k, v in word2idx.items()}

    x_train = []
    y_train = []
    for tokens in training_samples:
      for i in range(len(tokens) - 2): #!!!#
          x_train.append([word2idx[tokens[i]], word2idx[tokens[i+1]]]) #!!!#
          y_train.append([word2idx[tokens[i+2]]]) #!!!#


    x_train = np.array(x_train)
    y_train = np.array(y_train)

    ###############################################################################

    BATCH_SIZE = 1
    NUM_EPOCHS = 10

    train_set = np.concatenate((x_train, y_train), axis=1)
    train_loader = DataLoader(train_set, batch_size=BATCH_SIZE)

    loss_function = nn.NLLLoss()
    model = TrigramNNmodel(len(vocabulary), EMBEDDING_DIM, CONTEXT_SIZE, HIDDEN_DIM)
    optimiser = torch.optim.Adam(model.parameters(), lr=0.01)

    for epoch in range(NUM_EPOCHS):
        for i, data_tensor in enumerate(train_loader):
            context_tensor = data_tensor[:,0:2] #!!!#
            target_tensor = data_tensor[:,2] #!!!#

            model.zero_grad()

            log_probs = model(context_tensor)
            loss = loss_function(log_probs, target_tensor)

            loss.backward()
            optimiser.step()    

        print('Epoch:', epoch, 'loss:', float(loss))

    torch.save({'model': model.state_dict(), 'vocab': idx2word}, 'model_trigram.lm')

    print('Model saved.')

In [10]:
EMBEDDING_DIM = 4
CONTEXT_SIZE = 2
HIDDEN_DIM = 6
train_trigram()

Epoch: 0 loss: 2.830050468444824
Epoch: 1 loss: 2.5215606689453125
Epoch: 2 loss: 2.180320978164673
Epoch: 3 loss: 1.8046839237213135
Epoch: 4 loss: 1.4113456010818481
Epoch: 5 loss: 1.046945333480835
Epoch: 6 loss: 0.756178081035614
Epoch: 7 loss: 0.5483984351158142
Epoch: 8 loss: 0.4094017446041107
Epoch: 9 loss: 0.3179492950439453
Model saved.


In [21]:
def test_trigram():
    blob = torch.load('model_trigram.lm')
    idx2word = blob['vocab']
    word2idx = {k: v for v, k in idx2word.items()}
    vocabulary = set(idx2word.values())

    model = TrigramNNmodel(len(vocabulary), EMBEDDING_DIM, CONTEXT_SIZE, HIDDEN_DIM)
    model.load_state_dict(blob['model'])

    ###############################################################################

    BATCH_SIZE = 1

    lines = []
    with open("test.txt") as fp:
        lines = [line.rstrip() for line in fp]
    
    for line in lines:
        tokens = tokenize(line)
        
        x_test = []
        y_test = []
        for i in range(len(tokens) - 2): #!!!#
            x_test.append([word2idx[tokens[i]], word2idx[tokens[i+1]]]) #!!!#
            y_test.append([word2idx[tokens[i+2]]]) #!!!#
        
        x_test = np.array(x_test)
        y_test = np.array(y_test)
        
        test_set = np.concatenate((x_test, y_test), axis=1)
        test_loader = DataLoader(test_set, batch_size=BATCH_SIZE)
        
        total_prob = 1.0
        for i, data_tensor in enumerate(test_loader):
            context_tensor = data_tensor[:,0:2] #!!!#
            target_tensor = data_tensor[:,2] #!!!#
            log_probs = model(context_tensor)
            probs = torch.exp(log_probs)
            predicted_label = int(torch.argmax(probs, dim=1)[0])
        
            true_label = y_test[i][0]
            true_word = idx2word[true_label]
        
            prob_true = float(probs[0][true_label])
            total_prob *= prob_true
        print('%.6f\t%.6f\t' % (total_prob, math.log(total_prob)), tokens)

In [22]:
test_trigram()

0.025998	-3.649754	 ['', 'where', 'are', 'you', '?']
0.083461	-2.483377	 ['', 'were', 'you', 'in', 'england', '?']
0.060088	-2.811947	 ['', 'are', 'you', 'in', 'mexico', '?']
0.000545	-7.514688	 ['', 'i', 'am', 'in', 'mexico', '.']
0.001444	-6.540078	 ['', 'are', 'you', 'still', 'in', 'mexico', '?']
