In [0]:
#Import Torch and necessary packages
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from collections import Counter
import os
from argparse import Namespace

#Initialize parameters for RNN/LSTM
flags = Namespace(
    train_file='mobydick.txt', #We use Moby Dick as the training text for the RNN/LSTM
    seq_size=32,
    batch_size=16,
    embedding_size=64,
    lstm_size=64,
    gradients_norm=5,
    initial_words=['I', 'am'], #Feed initial words into model to predict the next 50 words based on training text
    predict_top_k=5,
    checkpoint_path='checkpoint',
)

In [7]:
#Upload Moby Dick corpus text
from google.colab import files
train_file = files.upload()


Saving mobydick.txt to mobydick (1).txt


In [0]:
#Create function to preprocess text
def get_data_from_file(train_file, batch_size, seq_size):
    with open(train_file, 'r') as f:
        text = f.read()
    text = text.split()

    word_counts = Counter(text)
    sorted_vocab = sorted(word_counts, key=word_counts.get, reverse=True)
    int_to_vocab = {k: w for k, w in enumerate(sorted_vocab)}
    vocab_to_int = {w: k for k, w in int_to_vocab.items()}
    n_vocab = len(int_to_vocab)

    print('Vocabulary size', n_vocab)

    int_text = [vocab_to_int[w] for w in text]
    num_batches = int(len(int_text) / (seq_size * batch_size))
    in_text = int_text[:num_batches * batch_size * seq_size]
    out_text = np.zeros_like(in_text)
    out_text[:-1] = in_text[1:]
    out_text[-1] = in_text[0]
    in_text = np.reshape(in_text, (batch_size, -1))
    out_text = np.reshape(out_text, (batch_size, -1))
    return int_to_vocab, vocab_to_int, n_vocab, in_text, out_text

In [0]:
#Create function to generate batches of text for training
def get_batches(in_text, out_text, batch_size, seq_size):
    num_batches = np.prod(in_text.shape) // (seq_size * batch_size)
    for i in range(0, num_batches * seq_size, seq_size):
        yield in_text[:, i:i+seq_size], out_text[:, i:i+seq_size]

In [0]:
#Create architecture in Torch RNNModule
class RNNModule(nn.Module):
    #Define each layer
    def __init__(self, n_vocab, seq_size, embedding_size, lstm_size): 
        super(RNNModule, self).__init__()
        self.seq_size = seq_size
        self.lstm_size = lstm_size
        
        #Create word embedding layer, given total vocabulary of corpus and embedding size input
        self.embedding = nn.Embedding(n_vocab, embedding_size)
        
        #Create LSTM layer
        self.lstm = nn.LSTM(embedding_size,
                            lstm_size,
                            batch_first=True)
        
        #Create dense/fully-connected layer to arrive at prediction for next word in the sequence
        self.dense = nn.Linear(lstm_size, n_vocab)

    #Take an input sequence, the previous hidden state and cell states to produce a new hidden state 
    #and cell state for the next step, utilizing a forget gate, input gate, and output gate
    def forward(self, x, prev_state):
        embed = self.embedding(x)
        output, state = self.lstm(embed, prev_state)
        logits = self.dense(output)

        return logits, state

    #Reset state at the beginning of each new epoch
    def zero_state(self, batch_size):
        return (torch.zeros(1, batch_size, self.lstm_size),
                torch.zeros(1, batch_size, self.lstm_size))

In [0]:
#Get loss function and training optimizer
def get_loss_and_train_op(net, lr=0.001):
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(net.parameters(), lr=lr)

    return criterion, optimizer

In [0]:
def main():
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


    #Get the training data, create the network, loss function and the training optimizer
    int_to_vocab, vocab_to_int, n_vocab, in_text, out_text = get_data_from_file(
    flags.train_file, flags.batch_size, flags.seq_size)

    net = RNNModule(n_vocab, flags.seq_size,
                    flags.embedding_size, flags.lstm_size)
    net = net.to(device)

    criterion, optimizer = get_loss_and_train_op(net, 0.01)

    iteration = 0

    #For each epoch, loop through the batches to compute loss values and update network’s parameters
    for e in range(100):
        batches = get_batches(in_text, out_text, flags.batch_size, flags.seq_size)
        state_h, state_c = net.zero_state(flags.batch_size)
        state_h = state_h.to(device)
        state_c = state_c.to(device)
        for x, y in batches:
            iteration += 1
            net.train()

            #Reset all gradients
            optimizer.zero_grad()

            x = torch.tensor(x).to(device)
            y = torch.tensor(y).to(device)

            #Compute output, loss value, accuracy
            logits, (state_h, state_c) = net(x, (state_h, state_c))
            loss = criterion(logits.transpose(1, 2), y)
            loss_value = loss.item()

            #Perform back-propagation and update the network’s parameters
            loss.backward()
            state_h = state_h.detach()
            state_c = state_c.detach()

            _ = torch.nn.utils.clip_grad_norm_(
                net.parameters(), flags.gradients_norm)
            
            optimizer.step()

            if iteration % 100 == 0:
                print('Epoch: {}/{}'.format(e, 100),
                      'Iteration: {}'.format(iteration),
                      'Loss: {}'.format(loss_value))
                
            #Print predictive keyboard sequence after every 500 iterations
            if iteration % 500 == 0:
                predict(device, net, flags.initial_words, n_vocab,
                        vocab_to_int, int_to_vocab, top_k=5)
                #torch.save(net.state_dict(),
                #           'checkpoint_pt/model-{}.pth'.format(iteration))


In [0]:
def predict(device, net, words, n_vocab, vocab_to_int, int_to_vocab, top_k=5):
    net.eval()
    words = ['How', 'about']

    state_h, state_c = net.zero_state(1)
    state_h = state_h.to(device)
    state_c = state_c.to(device)
    for w in words:
        ix = torch.tensor([[vocab_to_int[w]]]).to(device)
        output, (state_h, state_c) = net(ix, (state_h, state_c))

    _, top_ix = torch.topk(output[0], k=top_k)
    choices = top_ix.tolist()
    choice = np.random.choice(choices[0])

    words.append(int_to_vocab[choice])

    #Predict the next word in sequence 50 times and print final sequence
    for _ in range(50):
        ix = torch.tensor([[choice]]).to(device)
        output, (state_h, state_c) = net(ix, (state_h, state_c))

        _, top_ix = torch.topk(output[0], k=top_k)
        choices = top_ix.tolist()
        choice = np.random.choice(choices[0])
        words.append(int_to_vocab[choice])

    print(' '.join(words).encode('utf-8'))

In [0]:
if __name__ == '__main__':
    main()

Vocabulary size 33585
Epoch: 0/200 Iteration: 100 Loss: 7.716070175170898
Epoch: 0/200 Iteration: 200 Loss: 7.500507354736328
Epoch: 0/200 Iteration: 300 Loss: 7.29447603225708
Epoch: 0/200 Iteration: 400 Loss: 7.419355392456055
Epoch: 1/200 Iteration: 500 Loss: 6.854187965393066
b'How about an boat was to a good The Sperm Whale\xe2\x80\x99s of a few whale is a little the old whale, to his head of the whale was a good The ship to have seen that in this old whale and in their boat is an ship and to the Sperm of his'
Epoch: 1/200 Iteration: 600 Loss: 6.7079997062683105
Epoch: 1/200 Iteration: 700 Loss: 6.3766913414001465
Epoch: 1/200 Iteration: 800 Loss: 6.330689430236816
Epoch: 2/200 Iteration: 900 Loss: 6.361871719360352
Epoch: 2/200 Iteration: 1000 Loss: 6.093411922454834
b'How about the old burden, The ship would get the other boats in the old Manxman, of this same whale was to his hand and his head, and the same whale had a few of his hammock, in his own own own old acquaintances th