In [1]:
import os
from typing import Dict, List, Tuple, Union, Any
import sys

import lightning as L
import matplotlib.pyplot as plt
import numpy as np
import tensorboard
import torch
from lightning.pytorch.callbacks import (
    LearningRateMonitor,
    ModelCheckpoint,
    ModelSummary,
)
from rich import print
from torch import nn
from torch.nn import functional as F
from torch.utils.data import DataLoader, Dataset

%load_ext autoreload
%autoreload 2
%load_ext rich
%load_ext tensorboard

# Set random seed for reproducibility
seed = 42
L.seed_everything(seed)

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device: {device}")

# Set data directory
DATA_DIR = os.path.join(os.getcwd(), "data")

Seed set to 42


## Load and preprocess the dataset


In [2]:
class ShakespeareDataset(Dataset):
    """
    A custom Dataset class for Shakespeare text data.

    Attributes:
        text (str): The entire text from the file.
        chars (List[str]): A sorted list of unique characters in the text.
        vocab_size (int): The size of the vocabulary.
        s_to_i (Dict[str, int]): A dictionary mapping characters to indices.
        i_to_s (Dict[int, str]): A dictionary mapping indices to characters.
        seq_len (int): The length of the sequences.
    """

    def __init__(self, file_path: str, seq_len: int):
        """
        Initializes the ShakespeareDataset with the given file path and sequence length.

        Args:
            file_path (str): Path to the text file.
            seq_len (int): Length of the sequences.
        """
        with open(file_path, encoding="utf-8") as f:
            text = f.read()

        self.text = text
        self.chars = sorted(list(set(text)))
        self.vocab_size = len(self.chars)
        self.s_to_i = {s: i for i, s in enumerate(self.chars)}
        self.i_to_s = {i: s for i, s in enumerate(self.chars)}
        self.seq_len = seq_len

    def __len__(self) -> int:
        """
        Returns the length of the dataset.

        Returns:
            int: The number of sequences in the dataset.
        """
        return len(self.text) - self.seq_len

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Returns a tuple of input and target sequences for the given index.

        Args:
            idx (int): Index of the sequence.

        Returns:
            Tuple[torch.Tensor, torch.Tensor]: Input and target sequences as tensors.
        """
        return (
            torch.tensor(
                [self.s_to_i[c] for c in self.text[idx : idx + self.seq_len]],
                dtype=torch.long,
            ),
            torch.tensor(
                [self.s_to_i[c] for c in self.text[idx + 1 : idx + self.seq_len + 1]],
                dtype=torch.long,
            ),
        )

    def decode(self, x: Union[torch.Tensor, List[int]]) -> str:
        """
        Decodes a tensor or list of indices back to a string.

        Args:
            x (Union[torch.Tensor, List[int]]): Tensor or list of indices.

        Returns:
            str: Decoded string.
        """
        if isinstance(x, torch.Tensor):
            x = x.tolist()

        return "".join([self.i_to_s[i] for i in x])

    def encode(self, s: str) -> torch.Tensor:
        """
        Encodes a string into a tensor of indices.

        Args:
            s (str): Input string.

        Returns:
            torch.Tensor: Encoded tensor of indices.
        """
        encoded = [self.s_to_i[c] for c in s]
        return torch.tensor(encoded, dtype=torch.long).to(device)

    def collate_fn(
        self, batch: List[Tuple[torch.Tensor, torch.Tensor]]
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Collates a batch of sequences into padded tensors.

        Args:
            batch (List[Tuple[torch.Tensor, torch.Tensor]]): Batch of input and target sequences.

        Returns:
            Tuple[torch.Tensor, torch.Tensor]: Padded input and target sequences.
        """
        x, y = zip(*batch)
        x = torch.nn.utils.rnn.pad_sequence(
            x, batch_first=True, padding_value=self.s_to_i[" "]
        )
        y = torch.nn.utils.rnn.pad_sequence(
            y, batch_first=True, padding_value=self.s_to_i[" "]
        )
        return x, y

    def get_vocab_size(self) -> int:
        """
        Returns the size of the vocabulary.

        Returns:
            int: Vocabulary size.
        """
        return self.vocab_size

## Build the RNN/LSTM model


In [3]:
class ShakespeareModel(nn.Module):
    def __init__(
        self, vocab_size: int, embedding_dim: int, hidden_dim: int, num_layers: int
    ):
        """
        Initializes the ShakespeareModel.

        Args:
            vocab_size (int): Size of the vocabulary.
            embedding_dim (int): Dimension of the embedding layer.
            hidden_dim (int): Dimension of the hidden layer.
            num_layers (int): Number of LSTM layers.
        """
        super(ShakespeareModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(
            embedding_dim, hidden_dim, num_layers, batch_first=True, dropout=0.1
        )
        self.fc = nn.Linear(hidden_dim, vocab_size)

        self.init_weights()

    def init_weights(self) -> None:
        """
        Initializes the weights of the model.
        """
        for name, param in self.named_parameters():
            if "weight" in name:
                nn.init.xavier_normal_(param)
            else:
                nn.init.zeros_(param)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Forward pass of the model.

        Args:
            x (torch.Tensor): Input tensor.

        Returns:
            torch.Tensor: Output tensor.
        """
        x = self.embedding(x)
        x, hc = self.lstm(x)
        x = self.fc(x)

        return x

    def generate(self, x: torch.Tensor, n: int) -> torch.Tensor:
        """
        Generates a sequence of tokens.

        Args:
            x (torch.Tensor): Input tensor.
            n (int): Number of tokens to generate.

        Returns:
            torch.Tensor: Generated sequence.
        """
        self.eval()
        for _ in range(n):
            logits = self(x)
            logits = logits[:, -1, :]
            probs = F.softmax(logits, dim=-1)

            idx_next = torch.multinomial(probs, num_samples=1)
            x = torch.cat([x, idx_next], dim=1)

        return x.squeeze()

## Train the model

In [4]:
class ShakespeareModule(L.LightningModule):
    def __init__(
        self,
        dataset: ShakespeareDataset,
        model_hparams: Dict[str, Any],
        other_hparams: Dict[str, Any],
        optimizer_name: str,
        optimizer_hparams: Dict[str, Any],
    ) -> None:
        """
        Initializes the ShakespeareModule.

        Args:
            dataset (ShakespeareDataset): The dataset to be used.
            model_hparams (Dict[str, Any]): Hyperparameters for the model.
            other_hparams (Dict[str, Any]): Other hyperparameters.
            optimizer_name (str): Name of the optimizer to be used.
            optimizer_hparams (Dict[str, Any]): Hyperparameters for the optimizer.
        """
        super(ShakespeareModule, self).__init__()
        self.save_hyperparameters()

        self.dataset = dataset
        self.model = ShakespeareModel(**model_hparams)

        self.loss_fn = nn.CrossEntropyLoss()

        self.example_input_array = torch.randint(0, 10, (1, 32))  # dummy input

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Forward pass of the model.

        Args:
            x (torch.Tensor): Input tensor.

        Returns:
            torch.Tensor: Output tensor.
        """
        return self.model(x)

    def configure_optimizers(self) -> Dict[str, Any]:
        """
        Configures the optimizers and learning rate scheduler.

        Returns:
            Dict[str, Any]: Dictionary containing the optimizer and scheduler.
        """
        if self.hparams.optimizer_name == "adam":
            optimizer = torch.optim.Adam(
                self.parameters(), **self.hparams.optimizer_hparams
            )

        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer, mode="min", factor=0.1, patience=10, verbose=True
        )

        return {
            "optimizer": optimizer,
            "scheduler": scheduler,
            "monitor": "val_loss",
        }

    def training_step(
        self, batch: Tuple[torch.Tensor, torch.Tensor], batch_idx: int
    ) -> torch.Tensor:
        """
        Training step.

        Args:
            batch (Tuple[torch.Tensor, torch.Tensor]): Batch of data.
            batch_idx (int): Index of the batch.

        Returns:
            torch.Tensor: Loss value.
        """
        x, y = batch

        logits = self(x)

        # Reshape logits and labels to (N, C) shape for loss function
        loss = self.loss_fn(logits.view(-1, logits.size(-1)), y.view(-1))

        self.log(
            "train_loss", loss, on_step=True, on_epoch=True, prog_bar=True, logger=True
        )

        return loss

    def validation_step(
        self, batch: Tuple[torch.Tensor, torch.Tensor], batch_idx: int
    ) -> torch.Tensor:
        """
        Validation step.

        Args:
            batch (Tuple[torch.Tensor, torch.Tensor]): Batch of data.
            batch_idx (int): Index of the batch.

        Returns:
            torch.Tensor: Loss value.
        """
        x, y = batch

        logits = self(x)

        # Reshape logits and labels to (N, C) shape for loss function
        loss = self.loss_fn(logits.view(-1, logits.size(-1)), y.view(-1))

        self.log(
            "val_loss", loss, on_step=True, on_epoch=True, prog_bar=True, logger=True
        )

        if batch_idx == 0:
            predicted = self.generate_text(x, n=100)

            predicted_text = self.dataset.decode(predicted[0])

            self.logger.experiment.add_text(
                "input_text", self.dataset.decode(x[0]), self.current_epoch
            )
            self.logger.experiment.add_text(
                "val_text", predicted_text, self.current_epoch
            )

        return loss

    def generate_text(self, x: torch.Tensor, n: int) -> torch.Tensor:
        """
        Generates text using the model.

        Args:
            x (torch.Tensor): Input tensor.
            n (int): Number of tokens to generate.

        Returns:
            torch.Tensor: Generated text tensor.
        """
        return self.model.generate(x, n)

In [5]:
SEQ_LEN = 16
BATCH_SIZE = 128

# Create dataset and dataloaders
s = ShakespeareDataset("./input.txt", seq_len=SEQ_LEN)

L.seed_everything(seed)

# Train-val split
train_data, val_data = torch.utils.data.random_split(
    s, [int(0.9 * len(s)), len(s) - int(0.9 * len(s))]
)

train_loader = DataLoader(
    train_data,
    batch_size=BATCH_SIZE,
    shuffle=True,
)

val_loader = DataLoader(val_data, batch_size=64, shuffle=True)

Seed set to 42


In [6]:
x, y = next(iter(train_loader))

for i in range(4):
    print(f"Input: {s.decode(x[i])}")
    print(f"Target: {s.decode(y[i])}")
    print("-" * 50)


In [7]:
model = ShakespeareModule(
    dataset=s,
    model_hparams={
        "vocab_size": s.get_vocab_size(),
        "embedding_dim": 128,
        "hidden_dim": 256,
        "num_layers": 3,
    },
    other_hparams={
        "seq_len": SEQ_LEN,
        "batch_size": BATCH_SIZE,
    },
    optimizer_name="adam",
    optimizer_hparams={"lr": 1e-3, "weight_decay": 1e-6},
)

# Parameters
print(f"Number of parameters: {sum(p.numel() for p in model.parameters())}")


In [None]:
trainer = L.Trainer(
    accelerator="auto",
    max_epochs=50,
    callbacks=[
        ModelCheckpoint(save_weights_only=True, mode="min", monitor="val_loss"),
        LearningRateMonitor(
            logging_interval="epoch", log_momentum=True, log_weight_decay=True
        ),
    ],
)

trainer.logger._log_graph = True
trainer.logger._default_hp_metric = None

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


In [24]:
L.seed_everything(42)
trainer.fit(model, train_loader, val_loader)


Seed set to 42
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
c:\Users\Lakshya Agarwal\AppData\Local\Programs\Python\Python310\lib\site-packages\lightning\pytorch\core\optimizer.py:376: Found unsupported keys in the optimizer configuration: {'scheduler'}

  | Name    | Type             | Params | In sizes | Out sizes  
----------------------------------------------------------------------
0 | model   | ShakespeareModel | 1.5 M  | [1, 32]  | [1, 32, 65]
1 | loss_fn | CrossEntropyLoss | 0      | ?        | ?          
----------------------------------------------------------------------
1.5 M     Trainable params
0         Non-trainable params
1.5 M     Total params
5.892     Total estimated model params size (MB)


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

c:\Users\Lakshya Agarwal\AppData\Local\Programs\Python\Python310\lib\site-packages\lightning\pytorch\trainer\connectors\data_connector.py:492: Your `val_dataloader`'s sampler has shuffling enabled, it is strongly recommended that you turn shuffling off for val/test dataloaders.
c:\Users\Lakshya Agarwal\AppData\Local\Programs\Python\Python310\lib\site-packages\lightning\pytorch\trainer\connectors\data_connector.py:441: The 'val_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=11` in the `DataLoader` to improve performance.
c:\Users\Lakshya Agarwal\AppData\Local\Programs\Python\Python310\lib\site-packages\lightning\pytorch\trainer\connectors\data_connector.py:441: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=11` in the `DataLoader` to improve performance.


Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

c:\Users\Lakshya Agarwal\AppData\Local\Programs\Python\Python310\lib\site-packages\lightning\pytorch\trainer\call.py:54: Detected KeyboardInterrupt, attempting graceful shutdown...


## Evaluate and predict with the model

In [8]:
model = ShakespeareModule.load_from_checkpoint(
    "./lightning_logs/version_1/checkpoints/epoch=21-step=172546.ckpt",
)

trainer = L.Trainer(
    accelerator="auto",
    logger=False,
)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


In [25]:
trainer.validate(model, val_loader)

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Validation: |          | 0/? [00:00<?, ?it/s]

[1m[[0m[1m{[0m[32m'val_loss_epoch'[0m: [1;36m1.2201244831085205[0m[1m}[0m[1m][0m

In [9]:
# Sample text generation from model
model.eval()
x = s.encode("ANTONIO: ").unsqueeze(0)

MAX_N = 1000

with torch.no_grad():
    for _ in range(MAX_N):
        logits = model(x)[:, -1, :]
        probs = F.softmax(logits, dim=-1)
        idx_next = torch.multinomial(probs, num_samples=1)

        sys.stdout.write(s.decode(idx_next[0]))
        sys.stdout.flush()

        x = torch.cat([x, idx_next], dim=1)

I
will affliction will draw them and he, precious!

CORIOLANUS:
Fellow, she is not one.

POLIXENES:
To you, mispredition man, more than this again; come, sit down;
For having hours discreeth,
Whose case is marvellous chance to make the cold; by the said was Bohemia; thou hast masquing blush and presence at the matter.

MENENIUS:
A hundred thousand thoughts of yonder: if things expect'd?

DUKE OF YORK:
Lay her volubtle, sir, and you hope I see the assembling slave:
But that I want you do protest.

Second Watchman:
Sovery one shoeld
In thy happy buried, this strokes.

TYBALT:
An I do find break out forth the war; but then I'ld be writ,
Lest base to complain accused leisure but underneath the king
As myself in trouble him
To look on me! then Clarence issued and end
With thission of the people.

CORIOLANUS:
Indeaver, upon your voices by guilty, counsel and a fool
Is bore this design. Let's clove.

FRIAR POLIXENES:
O,
that your hands,
What if it be so straight
A horses' eyes ot blow: your f