In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
import requests
import pickle
from sklearn.model_selection import train_test_split 
import os




## LSTMLanguageModel Class
This is where I define the main model I am going to use to train and generate text. I need to use an LSTM (Long Short-Term Memory) model because it's great for handling sequences like text. LSTM can remember the context of previous words and use it to predict the next word in a sequence, which is what I need for text generation. The class has an embedding layer to convert words into numbers, an LSTM layer to process the sequences, and a final layer to predict the next word in the sequence.

### Forward Pass in LSTM
The forward method describes what happens when data goes through the model. First, I convert the words (tokens) into embeddings. Then, I pass them through the LSTM to get the model’s predictions. Finally, I use a fully connected layer to output the predicted word. In this method I check how the model makes predictions.

### Initializing the Hidden States
LSTMs need to remember things from the past, so I need to initialize the hidden state and the cell state (kind of like memory) at the start. These hidden states will get updated as the model processes the data. This function just sets the initial hidden states to zeros.

In [2]:

class LSTMLanguageModel(nn.Module):
    def __init__(self, vocab_size, emb_dim, hid_dim, num_layers, dropout_rate):
        super(LSTMLanguageModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, emb_dim)  # embedding layer
        self.lstm = nn.LSTM(emb_dim, hid_dim, num_layers, dropout=dropout_rate, batch_first=True)  # lstm layer
        self.fc = nn.Linear(hid_dim, vocab_size)  # output layer
        
        self.dropout    = nn.Dropout(dropout_rate)
        self.hidden_dim = hid_dim  # hidden layer size
        self.num_layers = num_layers  # num of layers
        
    def forward(self, x, hidden):
        embed = self.embedding(x)  # token to embedding
        out, hidden = self.lstm(embed, hidden)  # lstm forward pass
        out = self.fc(out)  # final output layer
        return out, hidden

    def init_hidden(self, batch_size, device):
        return (
            torch.zeros(self.num_layers, batch_size, self.hidden_dim).to(device),  # hidden state init
            torch.zeros(self.num_layers, batch_size, self.hidden_dim).to(device)   # cell state init
        )


## Loading Harry Potter Data
I am loading the Harry Potter dataset. I using load_dataset from the Hugging Face library

In [3]:
def load_harry_potter_data():
    from datasets import load_dataset
    dataset = load_dataset("elricwan/HarryPotter")  # load dataset
    all_content = [item["content"] for item in dataset["train"]]  # get content from dataset
    return train_test_split(all_content, test_size=0.2, random_state=42)  # split data into train/test


# Loading Pride and Prejudice Data
This is a similar function to load the "Pride and Prejudice" dataset, but this time I grab the text from a URL. I split it into lines and use the train_test_split function to split it into training and validation datasets. I need both datasets to train and evaluate the model.

In [4]:
def load_pride_and_prejudice_data():
    url = "https://www.gutenberg.org/files/1342/1342-0.txt"  # url for the text
    response = requests.get(url)  # get data
    text = response.text  # read content
    return train_test_split(text.split("\n"), test_size=0.2, random_state=42)  # split into train/test


## Tokenizing and Building Vocabulary
I use the get_tokenizer from TorchText to split text into words (tokens). After that, I build a vocabulary from the tokens, so the model knows what words are in the dataset. I also define special tokens like <unk> for unknown words and <eos> for the end of a sentence. In this step I need a way to convert words into numbers that the model can understand.

In [5]:
def tokenize_and_build_vocab(train_data):
    tokenizer = get_tokenizer("basic_english")  # use basic english tokenizer
    tokenized_data = [tokenizer(text) for text in train_data]  # tokenize data
    vocab = build_vocab_from_iterator(tokenized_data, min_freq=5, specials=["<unk>", "<eos>"])  # build vocab todo: change min freq
    vocab.set_default_index(vocab["<unk>"])  # set default to <unk>
    return vocab, tokenizer


## Converting Data to Batches

Now that I have tokenized the text and built the vocabulary, I need to turn the data into batches for training. This function takes the text, tokenizes it, and converts it into a tensor that the model can process. I also make sure that the batch size is consistent, and the data is in the correct shape.

In [6]:
def data_to_batches(data, vocab, tokenizer, batch_size):
    tokenized_data = [torch.tensor([vocab[token] for token in tokenizer(text)] + [vocab["<eos>"]]) for text in data]  # tokenize and convert to indices
    data_tensor = torch.cat(tokenized_data)  # concatenate all tokens
    num_batches = data_tensor.size(0) // batch_size  # calculate batches
    data_tensor = data_tensor[:num_batches * batch_size]  # ensure even batch size
    data_tensor = data_tensor.view(batch_size, -1)  # reshape into batch format
    return data_tensor


## Training the Model
This function is responsible for training the model. It loops through the data, computes the loss, and adjusts the weights of the model to minimize the loss. I use gradient clipping to avoid exploding gradients, which can happen during training when the gradients become too large.

In [7]:
def train(model, data, optimizer, criterion, batch_size, seq_len, clip, device):
    model.train()  # set model to train mode
    epoch_loss = 0  # track loss
    hidden = model.init_hidden(batch_size, device)  # init hidden state
    num_batches = data.size(1) // seq_len  # calculate batches
    for idx in range(0, data.size(1) - seq_len, seq_len):  # loop through data
        src = data[:, idx:idx + seq_len].to(device)  # get input sequence
        target = data[:, idx + 1:idx + seq_len + 1].to(device)  # target sequence
        optimizer.zero_grad()  # reset gradients
        output, hidden = model(src, hidden)  # model prediction
        hidden = tuple(h.detach() for h in hidden)  # detach hidden state
        output = output.reshape(-1, output.size(-1))  # reshape output for loss
        target = target.reshape(-1)  # reshape target for loss
        loss = criterion(output, target)  # calculate loss
        loss.backward()  # backpropagate
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)  # gradient clipping
        optimizer.step()  # update weights
        epoch_loss += loss.item()  # accumulate loss
    return epoch_loss / num_batches  # return avg loss


## Evaluating the Model
After training the model, I need to check how well it's doing. This function evaluates the model using the validation data and calculates the loss. I don’t update the model weights during evaluation, so I set the model to evaluation mode and don’t track gradients.

In [8]:
def evaluate(model, data, criterion, batch_size, seq_len, device):
    model.eval()  # set model to eval mode
    epoch_loss = 0  # track loss
    hidden = model.init_hidden(batch_size, device)  # init hidden state
    num_batches = data.size(1) // seq_len  # calculate batches
    with torch.no_grad():  # no gradients during eval
        for idx in range(0, data.size(1) - seq_len, seq_len):  # loop through data
            src = data[:, idx:idx + seq_len].to(device)  # get input sequence
            target = data[:, idx + 1:idx + seq_len + 1].to(device)  # target sequence
            output, hidden = model(src, hidden)  # model prediction
            hidden = tuple(h.detach() for h in hidden)  # detach hidden state
            output = output.reshape(-1, output.size(-1))  # reshape output
            target = target.reshape(-1)  # reshape target
            loss = criterion(output, target)  # calculate loss
            epoch_loss += loss.item()  # accumulate loss
    return epoch_loss / num_batches  # return avg loss


## Traing and saving model

This function handles the entire training process for a given model. First, it loads the dataset and prepares the training and validation data. Then, it initializes the model, optimizer, and loss function. The function trains the model for a set number of epochs, calculating the training and validation losses at each step to monitor the performance. After each epoch, it saves the model if the validation loss improves. This function combines all the steps needed to train and save the model, making the training process automated and efficient.


In [9]:
def train_and_save_model(data_loader, model_filename, vocab_filename, tokenizer_filename):
    train_data, valid_data = data_loader()  # load data
    print(f"Training data size: {len(train_data)}")  # print train data size
    print(f"Validation data size: {len(valid_data)}")  # print validation data size

    vocab, tokenizer = tokenize_and_build_vocab(train_data)  # tokenizing and vocab building

    batch_size = 32  # batch size
    train_batches = data_to_batches(train_data, vocab, tokenizer, batch_size)  # convert data to batches
    valid_batches = data_to_batches(valid_data, vocab, tokenizer, batch_size)  # convert validation data to batches

    # torch.set_num_threads(os.cpu_count())  # Set to all available cores
    device = torch.device("mps")

    vocab_size = len(vocab)
    emb_dim = 1024                # 400 in the paper
    hid_dim = 1024                # 1150 in the paper
    num_layers = 2                # 3 in the paper
    dropout_rate = 0.65              
    lr = 1e-3   
    clip = 1  # gradient clipping
    n_epochs = 5  # number of epochs
    seq_len = 32  # sequence length

    # vocab_size = len(vocab)
    # emb_dim = 256
    # hid_dim = 256
    # num_layers = 1
    # dropout_rate = 0.3
    # lr = 1e-3
    # n_epochs = 1
    # seq_len = 25
    # clip = 0.25
    
    print(f"Vocabulary Size: {len(vocab)}")
    model = LSTMLanguageModel(vocab_size, emb_dim, hid_dim, num_layers, dropout_rate).to(device)  # initialize model
    optimizer  = optim.Adam(model.parameters(), lr=lr) # Adam optimizer
    criterion = nn.CrossEntropyLoss()  # loss function


    for epoch in range(n_epochs):  # loop over epochs
        train_loss = train(model, train_batches, optimizer, criterion, batch_size, seq_len, clip, device)  # training
        valid_loss = evaluate(model, valid_batches, criterion, batch_size, seq_len, device)  # evaluation
        print(f"Epoch {epoch + 1}/{n_epochs}")  # epoch info
        print(f"\tTrain Perplexity: {torch.exp(torch.tensor(train_loss)):.3f}")  # print train perplexity
        print(f"\tValid Perplexity: {torch.exp(torch.tensor(valid_loss)):.3f}")  # print validation perplexity


     # Save model and vocab using torch.save
    torch.save({"model_state": model.state_dict(), "vocab": vocab}, model_filename)
    
    # Save tokenizer and vocab using pickle
    with open(vocab_filename, 'wb') as f:
        pickle.dump(vocab, f)
    
    with open(tokenizer_filename, 'wb') as f:
        pickle.dump(tokenizer, f)

    print(f"Model, vocab, and tokenizer saved to {model_filename}, {vocab_filename}, and {tokenizer_filename}")
    return model, valid_batches, criterion, batch_size, seq_len, vocab, tokenizer  # return model and data


## Generating Text
This function generates text given a prompt. I pass the prompt through the model and sample the next words one by one. The temperature parameter controls how random the predictions are. A higher temperature leads to more randomness, and a lower temperature makes the model more confident in its predictions.

In [10]:
def generate(prompt, max_seq_len, temperature, model, tokenizer, vocab, device, seed=None):
    if seed is not None:
        torch.manual_seed(seed)  # set random seed if given
    
    model.eval()  # set model to eval mode
    tokens = tokenizer(prompt)  # tokenize the prompt
    indices = [vocab[t] for t in tokens]  # convert tokens to indices
    batch_size = 1  # batch size is 1 for generation
    hidden = model.init_hidden(batch_size, device)  # initialize hidden state
    
    with torch.no_grad():  # no gradients during generation
        for i in range(max_seq_len):  # generate for max_seq_len
            src = torch.LongTensor([indices]).to(device)  # input sequence
            prediction, hidden = model(src, hidden)  # model prediction
            probs = torch.softmax(prediction[:, -1] / temperature, dim=-1)  # apply temperature
            prediction = torch.multinomial(probs, num_samples=1).item()  # sample from probabilities

            while prediction == vocab['<unk>']:  # avoid unknown tokens
                prediction = torch.multinomial(probs, num_samples=1).item()

            if prediction == vocab['<eos>']:  # stop if end of sentence token
                break

            indices.append(prediction)  # append predicted word index

    itos = vocab.get_itos()  # get index-to-token mapping
    tokens = [itos[i] for i in indices]  # convert indices back to tokens
    return tokens  # return generated tokens


##  Training
This chunk trains two separate models: one for the Harry Potter dataset and one for Pride and Prejudice. First, I load and train the Harry Potter model using train_and_save_model(), saving it to harry_potter_lstm.pt. Then, I repeat the same process for the Pride and Prejudice model, saving it to pride_prejudice_lstm.pt. The function also returns the necessary data, loss function, batch size, sequence length, vocabulary, and tokenizer for both models. This train and save both models for later use.

In [11]:
print("Training Harry Potter Model")
hp_model, hp_valid_data, hp_criterion, hp_batch_size, hp_seq_len, hp_vocab, hp_tokenizer = train_and_save_model(
    load_harry_potter_data, "harry_potter_lstm.pt", "harry_potter_vocab.pkl", "harry_potter_tokenizer.pkl"
)

print("Training Pride and Prejudice Model")
pp_model, pp_valid_data, pp_criterion, pp_batch_size, pp_seq_len, pp_vocab, pp_tokenizer = train_and_save_model(
    load_pride_and_prejudice_data, "pride_prejudice_lstm.pt", "pride_prejudice_vocab.pkl", "pride_prejudice_tokenizer.pkl"
)


Training Harry Potter Model


  from .autonotebook import tqdm as notebook_tqdm


Training data size: 6
Validation data size: 2
Vocabulary Size: 13034
Epoch 1/5
	Train Perplexity: 115.891
	Valid Perplexity: 74.714
Epoch 2/5
	Train Perplexity: 59.298
	Valid Perplexity: 58.712
Epoch 3/5
	Train Perplexity: 43.454
	Valid Perplexity: 52.555
Epoch 4/5
	Train Perplexity: 34.643
	Valid Perplexity: 49.894
Epoch 5/5
	Train Perplexity: 29.026
	Valid Perplexity: 48.190
Model, vocab, and tokenizer saved to harry_potter_lstm.pt, harry_potter_vocab.pkl, and harry_potter_tokenizer.pkl
Training Pride and Prejudice Model
Training data size: 11627
Validation data size: 2907
Vocabulary Size: 1912
Epoch 1/5
	Train Perplexity: 158.256
	Valid Perplexity: 96.283
Epoch 2/5
	Train Perplexity: 81.226
	Valid Perplexity: 72.676
Epoch 3/5
	Train Perplexity: 63.721
	Valid Perplexity: 67.398
Epoch 4/5
	Train Perplexity: 54.038
	Valid Perplexity: 64.144
Epoch 5/5
	Train Perplexity: 46.867
	Valid Perplexity: 63.702
Model, vocab, and tokenizer saved to pride_prejudice_lstm.pt, pride_prejudice_vocab.p

## Testing
Testing the text generation capability of both the Harry Potter and Pride and Prejudice models. For each model, I use a prompt and generate text with varying temperatures (from 0.5 to 1.0) to explore how temperature affects the creativity and randomness of the output. I evaluate how well each model can generate text that continues from the given prompt, with different temperatures providing insights into the model's ability to produce diverse or more predictable results.

In [12]:
# Test the text generation with a prompt for Harry Potter model
prompt_hp = 'Harry Potter is '
max_seq_len = 30
seed = 0
temperatures = [0.5, 0.7, 0.75, 0.8, 1.0]
print("Harry Potter Model Text Generation:")
for temperature in temperatures:
    generation_hp = generate(prompt_hp, max_seq_len, temperature, hp_model, hp_tokenizer, hp_vocab, "mps", seed)
    print(f"Temperature {temperature}\n{' '.join(generation_hp)}\n")

# Test the text generation with a prompt for Pride and Prejudice model
prompt_pp = 'Charlotte Lucas and her father '
print("Pride and Prejudice Model Text Generation:")
for temperature in temperatures:
    generation_pp = generate(prompt_pp, max_seq_len, temperature, pp_model, pp_tokenizer, pp_vocab, "mps", seed)
    print(f"Temperature {temperature}\n{' '.join(generation_pp)}\n")


Harry Potter Model Text Generation:
Temperature 0.5
harry potter is merciful , and i hope you’re biding a hundred thousand things , but i have already presented my forces to divine support . and so i shall be sure that

Temperature 0.7
harry potter is really good ! ” “i didn’t mean to change , ” said harry , “but i don’t understand . . . . ” and so soon it was , though

Temperature 0.75
harry potter is really good ! ” “i beg your pardon , potter , ” said dumbledore , but he did not care what sirius was saying . harry glanced in the mirror

Temperature 0.8
harry potter is really good ! ” “i beg your pardon , potter , ” said dumbledore , but dumbledore smiled . “the accused thing he and me . . . ” “it

Temperature 1.0
harry potter is really good ! ” “but you’re not supposed to be hiding out for that , it seems four years later —” “but i am afraid i did in , though

Pride and Prejudice Model Text Generation:
Temperature 0.5
charlotte lucas and her father were at

Temperature 0.7
charlotte