<a href="https://colab.research.google.com/github/shebogholo/language_model_LSTM_pytorch/blob/master/language_model_LSTM_pytorch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!git clone https://github.com/shebogholo/language_model_LSTM_pytorch.git
import os
os.chdir('language_model_LSTM_pytorch')

fatal: destination path 'language_model_LSTM_pytorch' already exists and is not an empty directory.


## Import libraries

In [0]:
import os
import time
import math 
import torch
import torch.nn as nn
import warnings
warnings.filterwarnings('ignore')

## Hyperparameters

In [0]:
batch_size = 20
embedding_dim = 200
hidden_size = 200
sequence_length = 35
learning_rate = 20
num_layers = 2
dropout = 0.4
display_interval = 100
epochs = 30
clip_gradient = 0.20
tie_weights = False
eval_batch_size = 10
save = 'model.pt'

## Create a dictionary of words

In [0]:
class Dictionary(object):
    def __init__(self):
        self.word2idx = {}
        self.idx2word = []
    
    def add_word(self, word):
        if word not in self.word2idx:
            self.idx2word.append(word) 
            self.word2idx[word] = len(self.idx2word) - 1
            return self.word2idx[word]
        
    def __len__(self):
        return len(self.idx2word)

## Read the corpus 

In [0]:
class Corpus(object):

    def __init__(self, path):
        self.dictionary = Dictionary()
        self.train = self.tokenize(os.path.join(path, 'train.txt'))
        self.valid = self.tokenize(os.path.join(path, 'valid.txt'))
        self.test = self.tokenize(os.path.join(path, 'test.txt'))
    
    def tokenize(self, path):
        """Tokenizes a text file."""
        assert os.path.exists(path)
        # Add words to the dictionary
        with open(path, 'r', encoding="utf8") as file:
            tokens = 0
            for line in file:
                words = line.split() + ['<eos>']
                tokens += len(words)
                for word in words:
                    self.dictionary.add_word(word)

        # Tokenize file content
        with open(path, 'r', encoding="utf8") as file:
            ids = torch.LongTensor(tokens)
            token = 0
            for line in file:
                words = line.split() + ['<eos>']
                for word in words:
                    ids[token] = self.dictionary.word2idx[word]
                    token += 1
        return ids

In [6]:
corpus = Corpus('wikitext')
num_tokens = len(corpus.dictionary)
num_tokens

33278

## Create batches of data

In [0]:
# Stack data one after another 
def create_batch(data, batch_size):
    num_batch = data.size(0) // batch_size
    data = data.narrow(0, 0, num_batch * batch_size)
    data = data.view(batch_size, -1).t().contiguous()
    return data

## Generate batches

In [0]:
train_data = create_batch(corpus.train, batch_size)
valid_data = create_batch(corpus.valid, batch_size)
test_data =  create_batch(corpus.test, batch_size)

In [9]:
train_data.shape

torch.Size([104431, 20])

In [10]:
valid_data.shape

torch.Size([10882, 20])

In [11]:
test_data.shape

torch.Size([12278, 20])

## Create data loader

In [0]:
def get_batch(source, index):
    seq_len = min(sequence_length, len(source) - 1 - index)
    data = source[index: index+seq_len]
    target = source[index+1: index+1+seq_len].view(-1)
    return data, target

In [13]:
data, target = get_batch(train_data, 1)
data.shape, target.shape

(torch.Size([35, 20]), torch.Size([700]))

## Define the model

In [0]:
class RNNModel(nn.Module):
    def __init__(self, num_embeddings, embedding_dim, hidden_size, num_layers, dropout=0.5, tie_weights=False):
        # embedding_dim == input_size
        # num_embeddings == vocabulary_size
        # hidden_size == number of features in the hidden layer
        super(RNNModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.dropout = nn.Dropout(dropout)
        self.encoder = nn.Embedding(num_embeddings, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_size, num_layers, dropout=dropout)
        self.decoder = nn.Linear(hidden_size, num_embeddings)

        # suggested on paper
        if tie_weights:
            if hidden_size != embedding_dim:
                raise ValueError('When using tie weights hidden_size == embedding_size')
            self.decoder.weight = self.encoder.weight
        
        self.init_weights()

    def init_weights(self):
        initrange = 0.1
        self.encoder.weight.data.uniform_(-initrange, initrange)
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-initrange, initrange)

    def forward(self, input, hidden):
        embed = self.dropout(self.encoder(input))
        output, hidden = self.lstm(embed, hidden)
        decoded = self.decoder(output.view(output.size(0)*output.size(1), output.size(2)))
        return decoded.view(output.size(0), output.size(1), decoded.size(1)), hidden
    
    def init_hidden(self, batch_size):
        weight = next(self.parameters())
        return weight.new_zeros(self.num_layers, batch_size, self.hidden_size), weight.new_zeros(self.num_layers, batch_size, self.hidden_size)

model = RNNModel(num_tokens, embedding_dim, hidden_size, num_layers, dropout, tie_weights=False)

## Check the available device

In [0]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [16]:
model.to(device)

RNNModel(
  (dropout): Dropout(p=0.4)
  (encoder): Embedding(33278, 200)
  (lstm): LSTM(200, 200, num_layers=2, dropout=0.4)
  (decoder): Linear(in_features=200, out_features=33278, bias=True)
)

## Define loss function

In [0]:
criterion = nn.CrossEntropyLoss()

## Innitialize hidden layer

In [0]:
def repackage_hidden(hidden):
    if isinstance(hidden, torch.Tensor):
        return hidden.detach()
    else:
        return tuple(repackage_hidden(v) for v in hidden)

hidden = model.init_hidden(batch_size)

In [0]:
hidden = repackage_hidden(hidden)

## Function to evaluate a model

In [0]:
# model evaluation
def evaluate(data_source):
    model.eval()
    total_loss = 0.
    hidden = model.init_hidden(batch_size)
    
    with torch.no_grad():
        for index in range(0, data_source.size(0) - 1, sequence_length):
            data, targets = get_batch(data_source, index)
            data, targets = data.to(device), targets.to(device)                
            output, hidden = model(data, hidden)
            output_flat = output.view(-1, num_tokens)
            total_loss += len(data) * criterion(output_flat, targets).item()
            hidden = repackage_hidden(hidden)
    return total_loss / len(data_source)

## Function to train a model

In [0]:
# model training
def train(epoch):
    model.train()
    print('*'*102)
    total_loss = 0.
    start_time = time.time()
    hidden = model.init_hidden(batch_size)

    for batch, index in enumerate(range(0, train_data.size(0) - 1, sequence_length)):
        data, targets = get_batch(train_data, index)
        data, targets = data.to(device), targets.to(device)
        hidden = repackage_hidden(hidden)
        model.zero_grad()
        output, hidden = model(data, hidden)
        loss = criterion(output.view(-1, num_tokens), targets)
        loss.backward()

        # gradient clipping to avoid exploding gradient
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip_gradient)
        for p in model.parameters():
            p.data.add_(-learning_rate, p.grad.data)

        total_loss += loss.item()

        if batch % display_interval == 0 and batch > 0:
            cur_loss = total_loss / display_interval
            elapsed = time.time() - start_time
            print('| Epoch {:3d} | {:5d}/{:5d} batches | lr {:02.2f} | ms/batch {:5.2f} | train loss {:5.2f} | perplexity {:8.2f}'.format(
                epoch, 
                batch, 
                len(train_data) // sequence_length, 
                learning_rate,
                elapsed * 1000 / display_interval, 
                cur_loss, 
                math.exp(cur_loss)))
            
            total_loss = 0
            start_time = time.time()

## Train a model

In [22]:
best_val_loss = None

for epoch in range(1, epochs+1):
        epoch_start_time = time.time()
        train(epoch)
        val_loss = evaluate(valid_data)
        print('-' *103)
        print('| End of epoch {:3d} | time: {:5.2f}s | valid loss {:5.2f} | valid perplexity {:8.2f}'.format(
            epoch, 
            (time.time() - epoch_start_time),
            val_loss, math.exp(val_loss)))
        
        # Save the model if the validation loss is the best we've seen so far.
        if not best_val_loss or val_loss < best_val_loss:
            with open(save, 'wb') as file:
                torch.save(model, file)
            best_val_loss = val_loss
        else:
            # Learninig rate annealing
            # Cut-off the learning rate by the factor of 4 if no improvement has been seen in validation data.
            learning_rate /= 4.0

******************************************************************************************************
| Epoch   2 |   100/ 2983 batches | lr 20.00 | ms/batch 48.26 | train loss  8.08 | perplexity  3229.22
| Epoch   2 |   200/ 2983 batches | lr 20.00 | ms/batch 45.82 | train loss  7.30 | perplexity  1478.00
| Epoch   2 |   300/ 2983 batches | lr 20.00 | ms/batch 46.27 | train loss  6.99 | perplexity  1086.50
| Epoch   2 |   400/ 2983 batches | lr 20.00 | ms/batch 46.01 | train loss  6.76 | perplexity   858.76
| Epoch   2 |   500/ 2983 batches | lr 20.00 | ms/batch 46.17 | train loss  6.58 | perplexity   720.26
| Epoch   2 |   600/ 2983 batches | lr 20.00 | ms/batch 46.26 | train loss  6.44 | perplexity   628.14
| Epoch   2 |   700/ 2983 batches | lr 20.00 | ms/batch 46.25 | train loss  6.35 | perplexity   573.30
| Epoch   2 |   800/ 2983 batches | lr 20.00 | ms/batch 46.48 | train loss  6.24 | perplexity   514.72
| Epoch   2 |   900/ 2983 batches | lr 20.00 | ms/batch 46.63 | train los

## Test a model

In [23]:
# Load the best saved model.
with open(save, 'rb') as file:
    model = torch.load(file)
    model.lstm.flatten_parameters()

# Run on test data.
test_loss = evaluate(test_data)
print('| End of evaluation | test loss {:5.2f} | test perplexity {:8.2f}'.format( test_loss, math.exp(test_loss)))

| End of evaluation | test loss  4.78 | test perplexity   119.44


## Generate sample text

In [0]:
generated_file = 'generated.txt'
num_words = 500
temp = 1.0
display_interval = 100

In [25]:
with open(save, 'rb') as file:
  model = torch.load(file).to(device)
model.eval()

hidden = model.init_hidden(1)
input = torch.randint(num_tokens, (1, 1), dtype=torch.long).to(device)

with open(generated_file, 'w') as file:
    with torch.no_grad():
      for i in range(num_words):
        output, hidden = model(input, hidden)
               
        word_weights = output.squeeze().div(temp).exp().cpu()
        word_idx = torch.multinomial(word_weights, 1)[0]

        input.fill_(word_idx)
        word = corpus.dictionary.idx2word[word_idx]
        file.write(word + ('\n' if i % 20 == 19 else ' '))
        if i % display_interval == 0:
          print('| Generated {}/{} words'.format(i, num_words))

| Generated 0/500 words
| Generated 100/500 words
| Generated 200/500 words
| Generated 300/500 words
| Generated 400/500 words
