# Text Generation Using LSTM

We'll be using TorchText, which is PyTorch text library providing advanced functionalities on language processing.

The dataset we'll be using is AG News. AG News (AG’s News Corpus) is a subdataset of AG's corpus of news articles constructed by assembling titles and description fields of articles from the 4 largest classes (“World”, “Sports”, “Business”, “Sci/Tech”) of AG’s Corpus. The AG News contains 30,000 training and 1,900 test samples per class.

In [12]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
import io
import re
from tqdm.notebook import trange, tqdm

import torch
import torch.nn as nn
from torch import optim
from torch.utils.data import DataLoader
from torch.utils.data.dataset import Dataset
import torch.nn.functional as F
from torch.distributions import Categorical

from torchtext.data.functional import generate_sp_model
from torchtext.datasets import WikiText2, EnWik9, AG_NEWS
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
import torchtext.transforms as T
from torch.hub import load_state_dict_from_url
from torchtext.data.functional import sentencepiece_tokenizer, load_sp_model
from AGNewsDataset import AGNews

Define the hyperparameters

In [2]:
learning_rate = 1e-4  # Learning rate for the optimizer
nepochs = 20  # Number of training epochs
batch_size = 32  # Batch size for training
max_len = 64  # Maximum length of input sequences

Define the AGNews Dataset class, that performs some filtering and error correction on the AGNews dataset CSV file

In [13]:
# Create AGNews dataset instances for training and testing
dataset_train = AGNews("../../data/train.csv")
dataset_test = AGNews("../../data/test.csv")

## Tokenization

We are going to train a TorchText sentence-piece tokenizer model. The model is trained to segment sentences with the least number of tokens possible, in order to build a vocabulary of arbitrary size.
This is a more advanced technique than segmenting words using the space character as separator.

In [None]:
with open("../../data/train.csv") as f:
    with open(("./vocab/data.txt"), "w") as f2:
        for i, line in enumerate(f):
            text_only = "".join(line.split(",")[1:])
            filtered = re.sub(r'\\|\\n|;', ' ', text_only.replace('"', ' ').replace('\n', ' ')) # remove newline characters
            f2.write(filtered.lower() + "\n")

generate_sp_model(("./vocab/data.txt"), 
                  vocab_size=20000, model_prefix='./vocab/spm_ag_news')

Now we can define a generator function `yield_tokens` to yield tokens from the data iterator, and then build the vocabulary out of these tokens.

In [15]:
# Define a function to yield tokens from a file
def yield_tokens(file_path):
    # Open the file in UTF-8 encoding
    with io.open(file_path, encoding='utf-8') as f:
        # Iterate over each line in the file
        for line in f:
            # Yield the token split by tab character
            yield [line.split("\t")[0]]

# Build vocabulary from the iterator of tokens
# We will also add "special" tokens that we'll use to signal something to our model
# <pad> is a padding token that is added to the end of a sentence to ensure 
# the length of all sequences in a batch is the same
# <sos> signals the "Start-Of-Sentence" aka the start of the sequence
# <eos> signals the "End-Of-Sentence" aka the end of the sequence
# <unk> "unknown" token is used if a token is not contained in the vocab
vocab = build_vocab_from_iterator(
    yield_tokens("./vocab/spm_ag_news.vocab"),
    # Define special tokens with special_first=True to place them at the beginning of the vocabulary
    specials=['<pad>', '<sos>', '<eos>', '<unk>'],
    special_first=True
)

# Set default index for out-of-vocabulary tokens
vocab.set_default_index(vocab['<unk>'])

Define a text transformation pipeline using TorchText Sequential Transform

In [16]:
# Define a transformation pipeline for training data
train_transform = T.Sequential(
    # Tokenize sentences using pre-existing SentencePiece tokenizer model
    T.SentencePieceTokenizer("./vocab/spm_ag_news.model"),
    # Convert tokens to indices based on given vocabulary
    T.VocabTransform(vocab=vocab),
    # Add <sos> token at the beginning of each sentence (index 1 in vocabulary)
    T.AddToken(1, begin=True),
    # Crop the sentence if it is longer than the max length
    T.Truncate(max_seq_len=max_len),
    # Add <eos> token at the end of each sentence (index 2 in vocabulary)
    T.AddToken(2, begin=False),
    # Convert the list of lists to a tensor and pad sentences with the <pad> token if shorter than max length
    T.ToTensor(padding_value=0)
)

# Define a transformation pipeline for generation (without truncation)
gen_transform = T.Sequential(
    # Tokenize sentences using pre-existing SentencePiece tokenizer model
    T.SentencePieceTokenizer("./vocab/spm_ag_news.model"),
    # Convert tokens to indices based on given vocabulary
    T.VocabTransform(vocab=vocab),
    # Add <sos> token at the beginning of each sentence (index 1 in vocabulary)
    T.AddToken(1, begin=True),
    # Convert the list of lists to a tensor and pad sentences with the <pad> token if shorter than max length
    T.ToTensor(padding_value=0)
)

Create data loaders for training and testing datasets

In [17]:
# DataLoader for training dataset
data_loader_train = DataLoader(dataset_train, batch_size=batch_size, shuffle=True, num_workers=8, drop_last=True)
# DataLoader for testing dataset
data_loader_test = DataLoader(dataset_test, batch_size=batch_size, shuffle=True, num_workers=8)

## LSTM Model Definition

In [22]:
class LSTM(nn.Module):
    def __init__(self, num_emb, num_layers=1, emb_size=128, hidden_size=128):
        super(LSTM, self).__init__()
        
        self.embedding = nn.Embedding(num_emb, emb_size)

        self.mlp_emb = nn.Sequential(nn.Linear(emb_size, emb_size),
                                     nn.LayerNorm(emb_size),
                                     nn.ELU(),
                                     nn.Linear(emb_size, emb_size))
        
        self.lstm = nn.LSTM(input_size=emb_size, hidden_size=hidden_size, 
                            num_layers=num_layers, batch_first=True, dropout=0.25)

        self.mlp_out = nn.Sequential(nn.Linear(hidden_size, hidden_size//2),
                                     nn.LayerNorm(hidden_size//2),
                                     nn.ELU(),
                                     nn.Dropout(0.5),
                                     nn.Linear(hidden_size//2, num_emb))
        
    def forward(self, input_seq, hidden_in, mem_in):
        input_embs = self.embedding(input_seq)
        input_embs = self.mlp_emb(input_embs)
                
        output, (hidden_out, mem_out) = self.lstm(input_embs, (hidden_in, mem_in))
                
        return self.mlp_out(output), hidden_out, mem_out

## Model and Optimizer Initialization

In [24]:
# Check if GPU is available, set device accordingly
device = torch.device(1 if torch.cuda.is_available() else 'cpu')

# Define embedding size and hidden size for the LSTM model
emb_size = 256
hidden_size = 1024

# Define number of layers for the LSTM model
num_layers = 2

# Create LSTM model instance
lstm_generator = LSTM(num_emb=len(vocab), num_layers=num_layers, 
                      emb_size=emb_size, hidden_size=hidden_size).to(device)

# Initialize optimizer with Adam optimizer
optimizer = optim.Adam(lstm_generator.parameters(), lr=learning_rate, weight_decay=1e-4)

# Define the loss function (Cross Entropy Loss)
loss_fn = nn.CrossEntropyLoss()

# List to store training loss during each epoch
training_loss_logger = []

# List to store entropy during training (for monitoring)
entropy_logger = []

In [25]:
# Let's see how many Parameters our Model has!
num_model_params = 0
for param in lstm_generator.parameters():
    num_model_params += param.flatten().shape[0]

print("-This Model Has %d (Approximately %d Million) Parameters!" % (num_model_params, num_model_params//1e6))

-This Model Has 29688099 (Approximately 29 Million) Parameters!


## Training

In [None]:
# Training loop
for epoch in trange(0, nepochs, leave=False, desc="Epoch"):
    # Set LSTM generator model to training mode
    lstm_generator.train()
    steps = 0
    # Iterate over batches in training data loader
    for text in tqdm(data_loader_train, desc="Training", leave=False):
        # Transform text tokens using training transform and move to device
        text_tokens = train_transform(list(text)).to(device)
        bs = text_tokens.shape[0]
        
        input_text = text_tokens[:, 0:-1]
        output_text = text_tokens[:, 1:]
        
        # Initialize the memory buffers
        hidden = torch.zeros(num_layers, bs, hidden_size, device=device)
        memory = torch.zeros(num_layers, bs, hidden_size, device=device)
        
        # Forward pass through the LSTM generator
        pred, hidden, memory = lstm_generator(input_text, hidden, memory)

        # Calculate loss
        loss = loss_fn(pred.transpose(1, 2), output_text)
        
        # Zero gradients, perform backward pass, and update weights
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # Log training loss
        training_loss_logger.append(loss.item())
        
        # Log entropy during training (for monitoring)
        with torch.no_grad():
            dist = Categorical(logits=pred)
            entropy_logger.append(dist.entropy().mean().item())

## Plot Metrics

In [None]:
_ = plt.figure(figsize=(10, 5))
_ = plt.plot(training_loss_logger[100:])
_ = plt.title("Training Loss")

In [None]:
_ = plt.figure(figsize=(10, 5))
_ = plt.plot(entropy_logger[1000:])
_ = plt.title("Distribution Entropy")