# ECE 157B/272B Homework 1 Code
Created by: Min Jian Yang and Matthew Dupree\
Parts of the code are from:
- The PyTorch tutorial: https://pytorch.org/tutorials/beginner/transformer_tutorial.html#load-and-batch-data
- The KDnuggets tutorial: https://www.kdnuggets.com/2020/07/pytorch-lstm-text-generation-tutorial.html

## Select a GPU Runtime
* If you would like to complete this assignment on your own computer, make sure that your computer has a GPU compatible with PyTorch hardware acceleration. Without a GPU, training the models will take a very long time.
* If you are using Google Colab, you will want to limit the amount of time you spend using a GPU runtime, to prevent running up against usage limits. If you are planning to train models this session, (i.e. beyond just vocabulary exploration,) you will need to select a GPU runtime by going to `Runtime` > `Change runtime type` > `Hardware accelerator` > `GPU`.
    * For each model you intend to train and evaluate, you will need 10-15 minutes of GPU runtime, so plan in advance.
    * If your computer goes to sleep or loses its internet connection for more than a minute or two, your Google Colab runtime will be recycled, erasing your model and training progress.


## Install Libraries
The cell below installs specific versions of the PyTorch, scikit-learn, and tokenizers libraries. We're expecting to see these versions of these packages, but later versions may also work fine.

In [None]:
%pip install torch==2.1.2 scikit-learn==1.4.0 tokenizers==0.15.0

You may be asked to restart the kernel after running the above cell. If so, please do so and then continue running the cells below. (This means specifically to "Restart" the _kernel_, not to get a new runtime, as that will remove the package versions installed.)

## Import Libraries

In [None]:
from typing import List, Callable, Tuple, NamedTuple, Union, Optional, Type
from pathlib import Path

import math
import numpy as np

import torch
from torch import Tensor

from tokenizers import Tokenizer, models, pre_tokenizers, decoders, trainers, AddedToken, normalizers

from torch.nn import Module
from torch.nn import RNN, LSTM, GRU, Embedding, Linear
from torch.nn import CrossEntropyLoss

from sklearn.model_selection import train_test_split

## Use GPU if available
The below line checks if a CUDA environment (typically a GPU accelerator) is available. If so, we'll use it, otherwise we'll use the CPU.

In [None]:
# Use the first CUDA device when PyTorch reports that CUDA is available;
# otherwise fall back to the CPU. The chosen device is printed for reference.
DEVICE = 'cuda:0' if torch.cuda.is_available() else 'cpu'
print(f'Using device: {DEVICE}')

Using device: cuda:0


## Parameters to tune for your experiments
The block below defines a "Parameters" class that you can use to configure settings for training runs of your various models. We will guide you through creating training functions that will make use of these parameters to allow you to create mostly-reusable code.

We'll also define some global parameters that will be used across all of your models.

In [None]:
# Test set size as a fraction of the total dataset size
TEST_SIZE: float = 0.05

# Random seed for reproducibility
RANDOM_STATE: int = 42

# Vocabulary size cap - upper limit on the number of tokens
# the tokenizer trainer may use to break up the text (and learn)
VOCAB_SIZE_CAP: int = 19999

# Minimum token frequency - tokens that appear fewer than this number of times will be ignored
# (So single-use complex words will be ignored)
MIN_TOKEN_FREQUENCY: int = 2
# If too few tokens appear at least this many times, the vocabulary size will end up
# below VOCAB_SIZE_CAP

In [None]:
class Parameters(NamedTuple):
    """Hyperparameters for one model-building experiment.

    A single instance is handed to the model-creation and training helpers
    in this notebook so that all settings for a run live in one place.
    """
    ### Dataset and training parameters
    # How many different chunks of text to train on at the same time
    # (the number of parallel sequences produced by batchify)
    BATCH_SIZE: int
    # Token target -- how many tokens to backpropagate through at most
    # (the maximum sequence length of each training step)
    BPTT: int
    # Learning rate passed to the optimizer at the start of training
    LR: float
    # Number of epochs to train for
    EPOCHS: int

    ### Model parameters
    # Number of dimensions in the embedding of each token.
    #   More dimensions --> more meaning, but more computation
    EMBEDDING_DIM: int
    # Number of dimensions in the hidden state in the recurrent model
    HIDDEN_DIM: int
    # Number of hidden layers in the recurrent model
    NUM_LAYERS: int

## Load Dataset

In [None]:
def load_data(path: Path) -> List[List[str]]:
    """Read the corpus file and group its lines into plays.

    A line that starts with three single quotes marks the beginning of a
    new play. The marker line itself is kept as the first line of the play
    it opens. Lines keep their trailing newline characters.

    Args:
        path (Path): Location of the UTF-8 encoded text file.

    Returns:
        List[List[str]]: One list of lines per play, in file order.
    """

    def preprocess_line(line: str) -> str:
        # TODO: Do any preprocessing here
        #   e.g. removing punctuation, lowercasing, etc.
        # If a line is empty after preprocessing,
        # it will be removed from the dataset.
        return line

    with open(path, 'r', encoding='utf-8') as handle:
        raw_lines = list(handle)

    # Plays are collected one at a time: `pending` holds the lines of the
    # play currently being read, and is flushed into `plays` whenever the
    # next delimiter line shows up.
    plays: List[List[str]] = []
    pending: List[str] = []
    for raw in raw_lines:
        starts_new_play = raw.startswith("'''")
        if starts_new_play and pending:
            plays.append(pending)
            pending = []

        cleaned = preprocess_line(raw)
        if cleaned:
            pending.append(cleaned)

    # The final play has no delimiter after it, so flush it here.
    if pending:
        plays.append(pending)

    return plays


# Load the corpus, grouped into plays.
# The relative path means shakespeare.txt must be in the current working directory.
data_path = Path('shakespeare.txt')
data_list = load_data(data_path)

## Split Dataset

In [None]:
def split_data(all_plays: List[List[str]]) -> Tuple[List[str], List[str]]:
    """Split the plays into training and validation sets of lines.

    The split is done at the play level (whole plays go to one side or the
    other) using scikit-learn's train_test_split with the module-level
    TEST_SIZE and RANDOM_STATE settings.

    Args:
        all_plays (List[List[str]]): One list of lines per play.

    Returns:
        Tuple[List[str], List[str]]: The training lines and the validation
        lines, each flattened into a single list.
    """
    # Only one list is given to train_test_split, so it hands back two
    # pieces of that list: the larger training part first, then the part
    # holding a TEST_SIZE fraction of the plays.
    plays_for_training, plays_for_validation = train_test_split(
        all_plays,
        test_size=TEST_SIZE,
        random_state=RANDOM_STATE,
    )

    # Join the plays of each set end to end into one long list of lines.
    training_list: List[str] = []
    for play in plays_for_training:
        training_list.extend(play)

    validation_list: List[str] = []
    for play in plays_for_validation:
        validation_list.extend(play)

    return training_list, validation_list

# Flattened lists of lines for the training plays and the validation plays
training_list, validation_list = split_data(data_list)

## Prepare the dataset for training

### Build vocabulary

* Break the given text into "tokens" using the tokenizer provided by the `tokenizers` library.
* Build a vocabulary of tokens from the training set.

In [None]:
# We'll create a "WordPiece" tokenizer -- this is the kind used by BERT.
# Anything the tokenizer cannot represent is mapped to the "<unk>" token.
tokenizer = Tokenizer(models.WordPiece(unk_token="<unk>"))

# This sequence of normalizers is the same as the one used by BERT.
#  NFD: Normalization Form D (canonical decomposition)
#       This is fancy Unicode speak for transforming accented characters
#       into their base form and the accent separately.
#       e.g. é -> e + ´
#  Lowercase: Lowercase the text
#  StripAccents: Remove the accents from the text (e.g. ´ from é)
tokenizer.normalizer = normalizers.Sequence(
    [normalizers.NFD(), normalizers.Lowercase(), normalizers.StripAccents()]
)

# We'll use a whitespace pre-tokenizer.
# This will split the text on whitespace and punctuation first,
# then tokenize the words separately using the WordPiece model.
# That way, we can keep punctuation as separate tokens,
# which might be useful for the model.
tokenizer.pre_tokenizer = pre_tokenizers.Whitespace()
# pre_tokenizers.WhitespaceSplit() would also work here,
# but it would keep punctuation attached to words,
# which might not be what we want.

# Finally, we'll use the WordPiece decoder
# to convert the tokens back into text.
tokenizer.decoder = decoders.WordPiece()

# We'll train the tokenizer on the training data.
# You can play with the VOCAB_SIZE_CAP and MIN_TOKEN_FREQUENCY
# parameters to see how they affect the tokenizer.
# "<unk>" is registered as a special token so it is always in the vocabulary.
trainer = trainers.WordPieceTrainer(
    vocab_size=VOCAB_SIZE_CAP,
    min_frequency=MIN_TOKEN_FREQUENCY,
    special_tokens=["<unk>"],
    show_progress=True,
)

# This training process isn't so bad for Shakespeare,
# but it can take a while for larger datasets scraped from the web.
# Only the training lines are used, so the validation plays
# do not influence the vocabulary.
tokenizer.train_from_iterator(training_list, trainer=trainer)

# Finally, we add a newline token to the tokenizer
# so that the model can learn to predict line breaks
# (and thus line lengths) instead of smushing it all
# together into one giant line with character names
# and stage directions and everything.
# add_tokens returns how many tokens were actually added, which we print.
print("Newline added:",tokenizer.add_tokens([AddedToken("\n", normalized=False)]))

Newline added: 1


In [None]:
# Look up the size of the trained vocabulary
vocab_size = tokenizer.get_vocab_size()
print(f"The vocabulary size is {vocab_size}.")

# Show the long list of vocabulary tokens and their corresponding integer IDs
# (comment this line out to hide the output)
tokenizer.get_vocab()


The vocabulary size is 19864.


{'##apped': 13196,
 '##orters': 16429,
 'laments': 10870,
 'distaff': 12314,
 'mummy': 18850,
 'bags': 7885,
 '##by': 683,
 'tough': 8748,
 'bestowed': 7884,
 'sunburn': 14813,
 'babble': 15062,
 'brute': 16847,
 'shaft': 7410,
 'foundation': 11766,
 '##flies': 12044,
 'empir': 17856,
 'lectures': 15743,
 'strains': 10069,
 'ravish': 5573,
 '##uster': 14098,
 'cons': 1298,
 'angels': 4634,
 'ja': 1235,
 '##enders': 10573,
 'prec': 8494,
 'unreverend': 15851,
 '##eth': 930,
 'bruit': 14333,
 'deluge': 17785,
 'jerkin': 7961,
 '##ool': 2764,
 'falstaff': 479,
 'sund': 5629,
 'det': 3308,
 'jar': 9747,
 'lionel': 11858,
 'frugal': 16653,
 'hatred': 7059,
 'windy': 11735,
 'alenc': 2934,
 'thither': 2828,
 'salutes': 12827,
 'unve': 12672,
 'varlet': 5579,
 'beseems': 15579,
 'matter': 1139,
 '##sc': 980,
 'mows': 13932,
 'hive': 11296,
 'unloose': 12023,
 'generous': 8527,
 '##leterre': 12709,
 'enemies': 2611,
 '##andise': 9850,
 'mockeries': 19324,
 'multiplied': 19853,
 'cobbler': 1878

### Tokenize dataset
* Use the tokenizer and vocabulary to convert the dataset into a sequence of integers.

In [None]:
def data_process(
    data_list: List[str],
    tokenizer: Callable,
) -> Tensor:
    """Convert raw text into one flat tensor of token IDs.

    Each string is encoded on its own with ``tokenizer.encode(...)`` and the
    resulting ID sequences are concatenated in input order. Strings that
    encode to no tokens are skipped.

    Args:
        data_list (List[str]): Raw text, one string per item.
        tokenizer (Callable): Despite the annotation, this is used as an
            object with an ``encode(text)`` method whose result has an
            ``ids`` attribute (in this notebook, the trained tokenizer).

    Returns:
        Tensor: A 1-D ``torch.long`` tensor holding every token ID. It is
        empty (shape ``[0]``) when the input produces no tokens at all.
    """
    encoded = (
        torch.tensor(tokenizer.encode(item).ids, dtype=torch.long)
        for item in data_list
    )
    pieces = [ids for ids in encoded if ids.numel() > 0]

    if not pieces:
        # torch.cat refuses an empty sequence of tensors, so an empty
        # data_list (or one containing only token-less strings) needs
        # an explicit empty result.
        return torch.empty(0, dtype=torch.long)

    return torch.cat(pieces)

In [None]:
# Here we convert the raw training and validation data
# into tensors of token IDs.
# Each result is one flat 1-D tensor, so the lengths printed below
# are token counts.
training_data = data_process(training_list, tokenizer)
validation_data = data_process(validation_list, tokenizer)
print(f'Quick look at the numericalized data {training_data}.')
print(f'The length of the training_data is {len(training_data)}.')
print(f'The length of the validation_data is {len(validation_data)}.')

Quick look at the numericalized data tensor([ 3197,   775,    18,  ...,    18,   543, 19863]).
The length of the training_data is 1338875.
The length of the validation_data is 79221.


## Batchify dataset
- Create batches out of the long list of numbers
- This allows for parallel computation during training
- The drawback is that the relationships between the batches are not learned

See this link for more detail: https://pytorch.org/tutorials/beginner/transformer_tutorial.html#load-and-batch-data

In [None]:
def batchify(data: Tensor, bsz: int) -> Tensor:
    """Reshape a flat token stream into bsz parallel sequences.

    Trailing elements that do not fill a complete row are dropped. Each
    column of the result is one contiguous chunk of the original stream.
    The result is moved to the module-level DEVICE.

    Args:
        data: Tensor, shape [N]
        bsz: int, batch size (number of parallel sequences)

    Returns:
        Tensor of shape [N // bsz, bsz]
    """
    rows = data.size(0) // bsz
    usable = rows * bsz

    # Lay the usable prefix out as bsz consecutive chunks (one per row),
    # then transpose so that every chunk becomes a column.
    chunks = data[:usable].view(bsz, rows)
    columns = chunks.t().contiguous()

    return columns.to(DEVICE)

## Generate input and target sequence
- Grab an input sequence with length `seq_len` from the batchified data starting at index `i`
- The target sequence has the same length `seq_len`, but its starting index is `i+1`

See this link for more detail: https://pytorch.org/tutorials/beginner/transformer_tutorial.html#functions-to-generate-input-and-target-sequence

In [None]:
def get_batch(source: Tensor, i: int, bptt: int) -> Tuple[Tensor, Tensor]:
    """Slice one input/target pair out of batchified data.

    The targets are the inputs shifted forward by one position, so the
    window is shortened near the end of the data to leave room for the
    final target row.

    Args:
        source: Tensor, shape [full_seq_len, batch_size]
        i: int, index of the first input row
        bptt: int, maximum number of rows in the window

    Returns:
        tuple (data, target), where data has shape [seq_len, batch_size] and
        target has shape [seq_len * batch_size]
    """
    # One row must remain after the window to act as the last target.
    rows_left = source.size(0) - 1 - i
    span = min(bptt, rows_left)

    window_start = i
    window_end = i + span

    inputs = source[window_start:window_end]
    shifted = source[window_start + 1:window_end + 1]

    return inputs, shifted.reshape(-1)

In [None]:
# Number of rows (time steps per sequence) when the training tokens
# are split into 64 parallel sequences
len(batchify(training_data, 64))

20919

In [None]:
# Example batch drawn from the data and decoded
# to show what the model will be predicting against
def print_example_batch():
    """Print one example input/target pair, both raw and decoded.

    The training tokens are batchified into two parallel sequences and a
    window of up to 55 rows starting at row 6485 is drawn from them. The
    targets come back flattened, so only their first 20 entries are decoded.
    """
    two_sequences = batchify(training_data, 2)
    inputs, targets = get_batch(two_sequences, 6485, 55)

    print(f"Example data: {inputs.t()}")
    print(f"Example targets: {targets}")

    # Decode each of the two parallel input sequences in turn.
    for column in (0, 1):
        decoded_inputs = tokenizer.decode(inputs[:, column].tolist())
        print(f"=== Example data[{column}] decoded ===\n{decoded_inputs}")

    decoded_targets = tokenizer.decode(targets.tolist()[:20])
    print(f"=== Example targets decoded ===\n{decoded_targets}")


print_example_batch()

# Decode the first 100 tokens of the first of two parallel sequences back into text
print(tokenizer.decode(batchify(training_data, 2).to("cpu")[:, 0].tolist()[:100]))

Example data: tensor([[  350,    18,   175,   686,   169,  2235,  1356,   416,   115,  4006,
         19863,   350,    18,   234,  6987,     5,  2927,     5, 13726,  9542,
            19, 19863,  2868,    18,   102,  1047,   119,   222,   167,  2545,
          2293,   160,   835,    19, 19863,   350,    18,   102,   256,     5,
           130,   560,   414,  2863,   414,   835,     7, 19863,   350,    18,
           343,   177,    30, 15119,   256],
        [  104,   507,     7, 19863,  1633,    18,  2084,  5170,  1574,     5,
           239,   295,  1083,    22,  6366,     1, 19863,   173,    18,  1627,
           776,    99,  4068, 19863,  1581,    18,   180,    93,  1960, 19863,
          1581,    18,  1240,  1371,    18, 19863,  1581,    18,    30,     2,
           298,  6995, 19863,  1581,    18,   102,   160,  1096,     5, 19863,
          1581,    18,   373,  3344,     5]], device='cuda:0')
Example targets: tensor([   18,   507,   175,     7,   686, 19863,   169,  1633,  2235, 

## Create recurrent neural network model
A recurrent neural network with an embedding layer, one or more recurrent layers, and a linear layer.

We use this `RecurrentModel` class to describe the behavior of the embedding and output layers, which are the same no matter which recurrent unit we use. We then pass the recurrent unit as a parameter to the `RecurrentModel` constructor to describe the behavior of the recurrent layers. This lets us easily create models with different recurrent units to compare their performance.

In [None]:
class RecurrentModel(Module):
    """A recurrent language model: embedding -> recurrent unit -> linear.

    The embedding and output layers are the same no matter which recurrent
    unit is used, so the recurrent unit is passed in ready-made. That makes
    it easy to compare RNN, LSTM and GRU variants of the same model.
    """

    # __init__ is the constructor.
    # It sets up new instances of the class.
    def __init__(
        self,
        vocab_size: int,
        recurrent_module: Union[RNN, LSTM, GRU],
    ) -> None:
        """Initialize the recurrent neural network.

        The embedding dimension, hidden size and number of layers are not
        passed separately; they are read from ``recurrent_module``.

        Note: For this model architecture, if the number of layers is greater
        than 1, then embedding size and hidden size must be equal.
        NOTE(review): this looks like a restriction chosen for the
        assignment rather than one imposed by torch -- confirm before
        relaxing it.

        Args:
            vocab_size (int): The number of tokens in the vocabulary. Used as
                the number of embedding rows and the size of the output layer.
            recurrent_module (Union[RNN, LSTM, GRU]): An already-constructed
                recurrent unit. Its ``input_size`` is used as the embedding
                dimension and its ``hidden_size`` as the input size of the
                output layer.

        Raises:
            ValueError: If the number of layers is greater than 1 and the
            embedding size is not equal to the hidden size.
        """
        # First, we call the superclass constructor
        # to give PyTorch a chance to set up the
        # parts of the object it needs to.
        super().__init__()

        # num_layers is how many layers the recurrent unit has.
        num_layers = recurrent_module.num_layers

        # hidden_size is the size of the hidden state of the recurrent unit.
        hidden_size = recurrent_module.hidden_size

        # embedding_dim is the size of the embedding output.
        embedding_dim = recurrent_module.input_size

        if (num_layers > 1) and (hidden_size != embedding_dim):
            raise ValueError(
                "When the number of layers is greater than 1, the embedding "
                "size and hidden size must be equal."
            )

        # The embedding layer turns the token IDs into embedding vectors.
        # This is a matrix of size [vocab_size, embedding_dim].
        # Each row of the matrix corresponds to one token in the vocabulary,
        #  providing a sort of "meaning in isolation" for that token.
        # The embedding layer is a trainable part of the model,
        #  so it will be updated during training.
        # The values start out initialized randomly.
        self.embedding = Embedding(
            num_embeddings=vocab_size, embedding_dim=embedding_dim
        )

        # The recurrent module is the main part of the model.
        # It takes in a sequence of embedding vectors
        #  and spreads information across the sequence.
        self.rnn = recurrent_module

        # The linear layer is the output layer of the model.
        # It takes in the output vectors from the recurrent module
        #  and produces a vector of size [vocab_size] for each of them.
        # This vector contains a score for each token in the vocabulary.
        # The token with the highest score is the one the model predicts
        #  as the next token, so the model will try to make the score
        #  for the correct token as high as possible.
        self.linear = Linear(hidden_size, vocab_size)

    # The forward method is called when we pass data through the model.
    def forward(
        self,
        x: Tensor,
        prev_state: Optional[Union[Tensor, Tuple[Tensor, Tensor]]] = None,
    ) -> Tuple[Tensor, Union[Tensor, Tuple[Tensor, Tensor]]]:
        """Pass a batch of data through the recurrent neural network model
        along with the previous state.

        Args:
            x (Tensor): Token IDs. The training code in this notebook passes
                tensors of shape [seq_len, batch_size].
            prev_state (Optional[Union[Tensor, Tuple[Tensor, Tensor]]]): The
                state returned by the previous call, or ``None`` (the
                default) to let the recurrent unit start from its own
                initial state.

        Returns:
            Tuple[Tensor, Union[Tensor, Tuple[Tensor, Tensor]]]: The logits
            (one score per vocabulary token for every input position) and
            the new state of the recurrent unit, in whatever form that unit
            returns it.
        """
        # The embedding layer turns the token IDs into embedding vectors.
        embedded = self.embedding(x)

        # The recurrent module takes in a sequence of embedding vectors
        #  and spreads information across the sequence via its hidden state.
        output, state = self.rnn(embedded, prev_state)

        # The linear layer turns every output vector of the recurrent
        #  module into a vector of scores of size [vocab_size].
        logits = self.linear(output)

        return logits, state

    def detach_state_(self, states: Union[Tensor, Tuple[Tensor, Tensor]]) -> None:
        """Detach the state of the recurrent units, in place.

        This function is used to make sure the model doesn't try to backpropagate
        through the entire history of the sequence, as our computers can't
        handle that much data.

        Anything that is neither a Tensor nor a tuple (e.g. ``None``) is
        left untouched.

        Args:
            states (Union[Tensor, Tuple[Tensor, Tensor]]): The states of the
            recurrent units.
        """
        if isinstance(states, Tensor):
            states.detach_()
        elif isinstance(states, tuple):
            for state in states:
                state.detach_()

## Define training loss
The loss function is the cross-entropy loss (that is, the negative log likelihood of the correct class under a softmax over the model's logits), since we are doing classification.

The classification problem is, based on the previous words, what is the next word.

Our target classes are all the words in the vocabulary.

In [None]:
# Shared loss function used by both the training and evaluation loops
criterion = CrossEntropyLoss()

## Define training and evaluation functions
- `train_epoch` trains the model for one epoch
- `evaluate` evaluates the model on the validation set
- `train_run` trains the model for all `EPOCHS` epochs and prints as it goes

In [None]:
def train_epoch(
    model: RecurrentModel,
    optimizer: torch.optim.Optimizer,
    train_data: Tensor,
    parameters: Parameters,
) -> Tuple[float, float]:
    """Train the recurrent model for one pass over the training data.

    The data is walked in windows of at most ``parameters.BPTT`` rows. The
    recurrent state is carried from one window to the next, but detached so
    gradients never flow across window boundaries.

    Args:
        model (RecurrentModel): The model to train.
        optimizer (torch.optim.Optimizer): Optimizer that updates the
            model's parameters after every window.
        train_data (Tensor): Batchified training data.
        parameters (Parameters): Run settings; only ``BPTT`` is read here.

    Returns:
        Tuple[float, float]: The loss averaged over the windows that were
        actually processed, and the corresponding perplexity
        (``exp`` of that average).

    Raises:
        ValueError: If ``train_data`` has too few rows to form even one
            input/target window.
    """
    # Turn on training mode which enables training-specific
    # layer functionality (e.g. dropout)
    model.train()

    # Keep track of the loss as we go
    total_loss: float = 0.

    # Count the windows as they are processed. Deriving the count from
    # len(train_data) // BPTT disagrees with the loop below whenever the
    # final window is a partial one, and is zero for short data.
    num_batches: int = 0

    # Keep track of the hidden state between batches.
    state = None

    # For each of our batches
    for i in range(0, train_data.size(0) - 1, parameters.BPTT):
        # Throw away gradients from previous step
        optimizer.zero_grad()

        # Get our input and target batches
        data, targets = get_batch(train_data, i, parameters.BPTT)

        # Evaluate the model to get logits and the new hidden state
        output, state = model(data, state)

        # Compute the loss. The number of classes is taken from the logits
        # themselves so this works for whatever vocabulary the model has.
        loss = criterion(output.view(-1, output.size(-1)), targets)

        # Make sure the state does not carry gradients
        # between batches so we avoid a runaway memory leak
        model.detach_state_(state)

        # Compute gradients
        loss.backward()

        # Clip the gradients to avoid exploding gradients
        # (Ask us TAs if you're curious why this is necessary)
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)

        # Update the parameters
        optimizer.step()

        # Keep track of the loss
        total_loss += loss.item()
        num_batches += 1

    if num_batches == 0:
        raise ValueError(
            "train_data needs at least 2 rows to form an input/target batch."
        )

    # Return the average loss across batches
    cur_loss = total_loss / num_batches
    # and the perplexity
    ppl = math.exp(cur_loss)
    return cur_loss, ppl


In [None]:
def evaluate(model: RecurrentModel, val_data: Tensor, parameters: Parameters) -> Tuple[float, float]:
    """Evaluate the loss and perplexity of the model on the validation data.

    The data is walked in windows of at most ``parameters.BPTT`` rows with
    gradient tracking disabled, carrying the recurrent state from one
    window to the next.

    Args:
        model (RecurrentModel): The model to evaluate.
        val_data (Tensor): Batchified validation data.
        parameters (Parameters): Run settings; only ``BPTT`` is read here.

    Returns:
        Tuple[float, float]: The loss averaged over the windows that were
        actually processed, and the corresponding perplexity
        (``exp`` of that average).

    Raises:
        ValueError: If ``val_data`` has too few rows to form even one
            input/target window.
    """
    # Turn on evaluation mode which disables
    # training-specific functionality (e.g. dropout)
    model.eval()

    # Keep track of the loss as we go
    total_loss: float = 0.

    # Count the windows as they are processed. Deriving the count from
    # len(val_data) // BPTT disagrees with the loop below whenever the
    # final window is a partial one, and is zero for short data.
    num_batches: int = 0

    # Keep track of the hidden state between batches.
    state = None
    with torch.no_grad(): # No need to track gradients here, since we're not training
        for i in range(0, val_data.size(0) - 1, parameters.BPTT):
            # Get our input and target batches
            data, targets = get_batch(val_data, i, parameters.BPTT)

            # Evaluate the model to get logits and the new hidden state
            output, state = model(data, state)

            # Compute the loss. The number of classes is taken from the
            # logits themselves rather than from a global.
            loss = criterion(output.view(-1, output.size(-1)), targets)
            total_loss += loss.item()
            num_batches += 1

    if num_batches == 0:
        raise ValueError(
            "val_data needs at least 2 rows to form an input/target batch."
        )

    # Return the average loss across batches
    cur_loss = total_loss / num_batches
    # and the perplexity
    ppl = math.exp(cur_loss)
    return cur_loss, ppl

In [None]:
def train_run(model: RecurrentModel, train_data: Tensor, validation_data: Tensor, parameters: Parameters, verbose: bool = True) ->None:
    """Train a model for parameters.EPOCHS epochs, evaluating after each one.

    Args:
        model (RecurrentModel): The model to train.
        train_data (Tensor): Batchified training data.
        validation_data (Tensor): Batchified validation data.
        parameters (Parameters): Run settings (LR, EPOCHS, BPTT, ...).
        verbose (bool): When True, print one progress row per epoch.
    """
    # The optimizer adjusts the parameters of the model based on the
    # gradients of the loss. AdamW is used here (LR of about 0.01).
    # Two alternatives you may want to try instead:
    #   torch.optim.Adam(model.parameters(), lr=parameters.LR)  -- LR about 0.005
    #   torch.optim.SGD(model.parameters(), lr=parameters.LR)   -- LR about 10.0,
    #     as these RNNs need a strong learning rate before plain SGD
    #     even begins to move the loss.
    optimizer = torch.optim.AdamW(model.parameters(), lr=parameters.LR)

    # The scheduler halves the learning rate when the loss it is given
    # stops decreasing (patience of one epoch), so the model can settle
    # into a minimum once it finds one instead of bouncing around.
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer,
        factor=0.5,
        patience=1,
        threshold=1e-3,
    )

    for epoch in range(1, parameters.EPOCHS + 1):
        if verbose:
            # Start the progress row with the epoch number and the
            # learning rate of the first parameter group (-1 if none).
            # end='' keeps the row open so later prints can extend it.
            lr = next((group['lr'] for group in optimizer.param_groups), -1)
            print(f'| epoch {epoch:3d} | lr {lr:03.3f} | ', end='')

        loss, ppl = train_epoch(model, optimizer, train_data, parameters)

        # The scheduler is driven by the training loss. Moving this call
        # below the evaluate() call and passing val_loss would drive it
        # by the validation loss instead.
        scheduler.step(loss)

        if verbose:
            # Training is done for this epoch; validation comes next.
            print(f'train loss {loss:5.2f} | train ppl {ppl:8.2f} | ', end='')

        val_loss, val_ppl = evaluate(model, validation_data, parameters)

        if verbose:
            # No end='' here, so this print finishes the row.
            print(f'val loss {val_loss:5.2f} | val ppl {val_ppl:8.2f} |')

In [None]:
def create_model(
    vocab_size: int,
    parameters: Parameters,
    model_type: Callable[..., Union[RNN, LSTM, GRU]],
) -> RecurrentModel:
    """Build a recurrent model and place it on DEVICE.

    Args:
        vocab_size (int): The number of vocabulary in the dataset.
        parameters (Parameters): The model parameters; EMBEDDING_DIM,
            HIDDEN_DIM and NUM_LAYERS are read here.
        model_type (Callable[..., Union[RNN, LSTM, GRU]]): The type of recurrent
            model to create (e.g. the RNN, LSTM or GRU class itself).

    Returns:
        RecurrentModel: The recurrent model.
    """
    # The model comes in two pieces so that recurrent units can be
    # swapped without rewriting anything else:
    # 1. the recurrent unit itself (RNN, LSTM or GRU), built first, and
    # 2. the RecurrentModel wrapper, which adds the embedding
    #    and output layers around that unit.
    recurrent_unit = model_type(
        input_size=parameters.EMBEDDING_DIM,
        hidden_size=parameters.HIDDEN_DIM,
        num_layers=parameters.NUM_LAYERS,
    )
    model = RecurrentModel(
        vocab_size=vocab_size,
        recurrent_module=recurrent_unit,
    )
    return model.to(DEVICE)

def train_model(
    model: RecurrentModel,
    unbatched_train_data: Tensor,
    unbatched_validation_data: Tensor,
    parameters: Parameters,
    verbose: bool = True,
) -> RecurrentModel:
    """Batchify the data and train an existing model on it.

    Args:
        model (RecurrentModel): The model to train (updated in place).
        unbatched_train_data (Tensor): Flat tensor of training token IDs.
        unbatched_validation_data (Tensor): Flat tensor of validation
            token IDs.
        parameters (Parameters): Run settings; BATCH_SIZE is used for
            batching and the rest is passed on to the training loop.
        verbose (bool): When True, print data shapes and training progress.

    Returns:
        RecurrentModel: The same model object that was passed in.
    """
    # The shapes reported are those of the tensors passed in,
    # not of any notebook-level variables.
    if verbose:
        print(f'The shape of the training data is {unbatched_train_data.shape}.')
    batched_training_data = batchify(unbatched_train_data, parameters.BATCH_SIZE)
    if verbose:
        print(f'The shape of the batched training data is {batched_training_data.shape}.')
        print(f'The shape of the validation data is {unbatched_validation_data.shape}.')
    batched_validation_data = batchify(unbatched_validation_data, parameters.BATCH_SIZE)
    if verbose:
        print(f'The shape of the batched validation data is {batched_validation_data.shape}.')

    train_run(model, batched_training_data, batched_validation_data, parameters, verbose)
    return model


def create_and_train_model(
    vocab_size: int,
    parameters: Parameters,
    model_type: Callable[..., Union[RNN, LSTM, GRU]],
    unbatched_train_data: Tensor,
    unbatched_validation_data: Tensor,
    verbose: bool = True,
) -> RecurrentModel:
    """Create and train a recurrent model.

    Args:
        vocab_size (int): The number of vocabulary in the dataset.
        parameters (Parameters): The model and training parameters.
        model_type (Callable[..., Union[RNN, LSTM, GRU]]): The type of recurrent
            model to create.
        unbatched_train_data (Tensor): Unbatched training data.
        unbatched_validation_data (Tensor): Unbatched validation data.
        verbose (bool): When True, print data shapes and training progress.

    Returns:
        RecurrentModel: The trained recurrent model.
    """
    # create_model already places the model on DEVICE,
    # so no further device move is needed here.
    model = create_model(vocab_size, parameters, model_type)

    train_model(model, unbatched_train_data, unbatched_validation_data, parameters, verbose)
    return model



## Run model-building experiments

In [None]:
# Experiment 1: a 3-layer GRU.
# EMBEDDING_DIM and HIDDEN_DIM are kept equal because RecurrentModel
# rejects differing sizes when NUM_LAYERS is greater than 1.
parameters1 = Parameters(
    BATCH_SIZE=16,
    BPTT=50,
    LR=0.01,
    EPOCHS=30,
    EMBEDDING_DIM=16,
    HIDDEN_DIM=16,
    NUM_LAYERS=3,
)
# Build the (untrained) model on DEVICE
model1 = create_model(
    vocab_size=vocab_size,
    parameters=parameters1,
    model_type=GRU,
)
# Train it, printing one progress row per epoch
train_model(
    model=model1,
    unbatched_train_data=training_data,
    unbatched_validation_data=validation_data,
    parameters=parameters1,
    verbose=True,
)

The shape of the training data is torch.Size([1338875]).
The shape of the batched training data is torch.Size([83679, 16]).
The shape of the validation data is torch.Size([79221]).
The shape of the batched validation data is torch.Size([4951, 16]).
| epoch   1 | lr 0.010 | train loss  5.40 | train ppl   220.37 | val loss  5.83 | val ppl   341.90 |
| epoch   2 | lr 0.010 | train loss  4.66 | train ppl   105.44 | val loss  5.72 | val ppl   305.23 |
| epoch   3 | lr 0.010 | train loss  4.47 | train ppl    87.41 | val loss  5.64 | val ppl   282.11 |
| epoch   4 | lr 0.010 | train loss  4.38 | train ppl    79.89 | val loss  5.66 | val ppl   288.24 |
| epoch   5 | lr 0.010 | train loss  4.33 | train ppl    75.92 | val loss  5.66 | val ppl   286.63 |
| epoch   6 | lr 0.010 | train loss  4.30 | train ppl    73.49 | val loss  5.63 | val ppl   278.41 |
| epoch   7 | lr 0.010 | train loss  4.26 | train ppl    70.90 | val loss  5.61 | val ppl   272.99 |
| epoch   8 | lr 0.010 | train loss  4.23 | 

RecurrentModel(
  (embedding): Embedding(19864, 16)
  (rnn): GRU(16, 16, num_layers=3)
  (linear): Linear(in_features=16, out_features=19864, bias=True)
)

## Test model by generating text

In [None]:
def generate_text(
    tokenizer: Tokenizer,
    model: RecurrentModel,
    input_text: str,
    num_tokens_to_generate: int,
    by_prob: bool = False,
) -> str:
    """Feed a prompt to the model and generate the tokens that follow it.

    The prompt is run through the model once to build up its state; after
    that, each generated token is fed back in as the next input. Only the
    generated tokens are returned -- the prompt itself is not included.

    Args:
        tokenizer (Tokenizer): The tokenizer used in training.
        model (RecurrentModel): A trained recurrent neural network model.
        input_text (str): The prompt to continue.
        num_tokens_to_generate (int): The number of tokens to generate.
        by_prob (bool, optional): If `True`, each token is sampled at random
            according to the model's predicted probabilities. If `False`,
            the token with the highest score is always chosen.
            Defaults to False.

    Returns:
        str: The generated tokens decoded into a text string.

    Raises:
        ValueError: If `input_text` encodes to no tokens, since the model
            then has nothing to continue from.
    """

    # Set the model to evaluation mode (no Dropout and the like)
    model.eval()

    with torch.no_grad(): # Disable gradient calculation because we won't use it

        # Process our input text
        data = data_process([input_text], tokenizer)
        if data.numel() == 0:
            raise ValueError("input_text must contain at least one token.")
        # Turn it into a batch for the model: [seq_len, 1]
        x = data.reshape((-1, 1)).to(DEVICE)

        state = None # Start with a blank state
        output_tokens: List[int] = [] # Keep track of the generated tokens
        for _ in range(num_tokens_to_generate):
            # Get the model's predicted next token
            y_pred, state = model(x, state)
            # We only care about the last token's predictions
            last_word_logits = y_pred[-1][0]

            # If we're generating by probability, we'll randomly choose a word
            if by_prob:
                # Normalize the logits so they form a probability distribution
                p = (
                    torch.nn.functional.softmax(last_word_logits, dim=0)
                    .cpu()
                    .numpy()
                )
                # Randomly choose the next word based on the probability
                sampled = np.random.choice(len(last_word_logits), p=p)
            else:
                # Otherwise, we'll just choose the word with the highest probability
                sampled = torch.argmax(last_word_logits).item()

            # Store a plain Python int, so the list given to the tokenizer
            # never contains tensors or NumPy scalars.
            word_index = int(sampled)

            # Add the generated word to our output
            output_tokens.append(word_index)

            # Add the generated word as the next input to the model
            x = torch.tensor([[word_index]], device=DEVICE)

    # Decode the generated tokens and return the result
    return tokenizer.decode(output_tokens)

In [None]:
# Greedy generation: continue the prompt with the 10 highest-scoring tokens
print(generate_text(
    tokenizer,
    model1,
    "To be or not to be",
    10,
    by_prob=False,
))


a senator, and others 
 king henry v :
