<a href="https://colab.research.google.com/github/kanav9063/Deep-Learning/blob/main/Kanav_HW5_CharRNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Character-RNN with PyTorch

In this homework, you will:

1. Implement a basic PyTorch RNN module
2. Train a basic character-RNN language model on
  * Shakespeare's books
  * Linux kernel code
3. Play with the two trained language models to generate text

## Task 1: Implement an RNN module

**Task 1.1:** Write a PyTorch module named `MyRNNLayer` that processes a sequence of inputs and produces a corresponding sequence of outputs. This module should replicate the functionality of PyTorch's official `RNN` with a single layer.

To validate your implementation, use the provided `test_RNNLayer` function to compare the behavior of your custom `MyRNNLayer` with PyTorch's built-in RNN layer.
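
As a point of reference, a single tanh RNN layer applies the recurrence $h_t = \tanh(W_{xh} x_t + W_{hh} h_{t-1} + b_h)$ at every timestep. The sketch below spells this out in plain Python for one example, with no batch dimension and no PyTorch. The helper names `matvec` and `rnn_step` and the toy weights are illustrative assumptions, not part of the assignment template.

```python
import math

def matvec(W, v):
    """Multiply a matrix (list of rows) by a vector (list of floats)."""
    return [sum(w * x for w, x in zip(row, v)) for row in W]

def rnn_step(Wxh, Whh, bh, x_t, h_prev):
    """One step: h_t = tanh(Wxh @ x_t + Whh @ h_prev + bh)."""
    a = matvec(Wxh, x_t)
    b = matvec(Whh, h_prev)
    return [math.tanh(ai + bi + ci) for ai, bi, ci in zip(a, b, bh)]

# Toy sizes: input_size = 2, hidden_size = 2 (values chosen arbitrarily).
Wxh = [[0.5, -0.5], [0.25, 0.0]]   # shape (hidden_size, input_size)
Whh = [[0.0, 1.0], [1.0, 0.0]]     # shape (hidden_size, hidden_size)
bh = [0.0, 0.1]

xs = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]  # three timesteps
h = [0.0, 0.0]                              # initial hidden state h0
outputs = []
for x_t in xs:
    h = rnn_step(Wxh, Whh, bh, x_t, h)
    outputs.append(h)   # the hidden state at every step is the layer output

print(len(outputs), outputs[-1] == h)
```

The last entry of `outputs` is the same object as the final hidden state, which is why the layer returns both a sequence and a final state.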

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm


class MyRNNLayer(nn.Module):
    """
    A PyTorch implementation of a single RNN layer.
    """
    def __init__(self, input_size, hidden_size):
        super(MyRNNLayer, self).__init__()

        self.input_size = input_size
        self.hidden_size = hidden_size
        self.activation_fn = torch.tanh

        #################################################
        ## TODO:  Initialize the following parameters: ##
        ## Wxh: W_xh in the class slide                ##
        ## Whh: W_hh in the class slide                ##
        ## bh: b_h in the class slide                  ##
        #################################################
        ## related document: https://pytorch.org/docs/stable/generated/torch.nn.parameter.Parameter.html

        self.Wxh = ...
        self.Whh = ...
        self.bh = ...

        ######################################
        ############ End of your code ########
        ######################################


    def forward(self, x, h0):
        """
        Forward pass for MyRNN layer.
        :param x: Input sequence (time_steps, batch_size, input_size)
        :param h0: Initial hidden state (batch_size, hidden_size)
        :return: Sequence of hidden states (time_steps, batch_size, hidden_size), final hidden state
        """
        time_steps, batch_size, _ = x.size()
        h = h0 # The hidden states at current timestep
        outputs = [] # The hidden states across all timesteps

        #################################################
        ## TODO: apply the RNN across all timesteps    ##
        #################################################

        #################################################
        ############      End of your code       ########
        #################################################
        return outputs, h

@torch.no_grad()
def test_RNNLayer():
    input_size = 4
    hidden_size = 3
    batch_size = 2
    time_steps = 5

    # Initialize custom RNN layer
    my_rnn = MyRNNLayer(input_size, hidden_size)

    # Initialize PyTorch RNN using weights of custom RNN
    rnn = nn.RNN(input_size, hidden_size, batch_first=False, nonlinearity='tanh')
    rnn.weight_ih_l0.data = my_rnn.Wxh.data.clone()
    rnn.weight_hh_l0.data = my_rnn.Whh.data.clone()
    rnn.bias_ih_l0.data = my_rnn.bh.data.clone()
    rnn.bias_hh_l0.data = torch.zeros_like(rnn.bias_hh_l0.data)

    # Random input
    x = torch.randn(time_steps, batch_size, input_size)
    h0 = torch.zeros(batch_size, hidden_size)

    # Forward pass through custom RNN
    h_basic_seq, h_basic_final = my_rnn(x, h0)

    # Forward pass through PyTorch RNN
    h0_torch = torch.zeros(1, batch_size, hidden_size)  # PyTorch expects (num_layers, batch_size, hidden_size)
    h_pytorch_seq, h_pytorch_final = rnn(x, h0_torch)

    # Compare outputs
    print("\nMy RNNLayer Final Hidden State:\n", h_basic_final)
    print("\nPyTorch RNN Final Hidden State:\n", h_pytorch_final.squeeze(0))

    # Assert similarity using torch.allclose
    seq_match = torch.allclose(h_basic_seq, h_pytorch_seq, rtol=1e-5, atol=1e-5)
    final_match = torch.allclose(h_basic_final, h_pytorch_final.squeeze(0), rtol=1e-5, atol=1e-5)

    if seq_match and final_match:
        print("\nTest Passed: MyRNNLayer matches PyTorch RNN!")
    else:
        print("\nTest Failed: Outputs do not match.")
        if not seq_match:
            print("Sequence outputs do not match.")
        if not final_match:
            print("Final hidden states do not match.")

In [None]:
# Run the RNN layer test; you should see `Test Passed` in the output.
# Below is an example test pass output:
# My RNNLayer Final Hidden State:
#  tensor([[ 0.2935,  0.0381, -0.0445],
#         [ 0.2169, -0.2411, -0.1285]])
#
# PyTorch RNN Final Hidden State:
#  tensor([[ 0.2935,  0.0381, -0.0445],
#         [ 0.2169, -0.2411, -0.1285]])
#
# Test Passed: MyRNNLayer matches PyTorch RNN!

test_RNNLayer()

**Task 1.2:** Write a PyTorch module named `MyMultiLayerRNN` that operates as a stack of multiple RNN layers, processing sequences through them.

To validate your implementation, use the provided `test_multi_layer_rnn` function to compare the behavior of your custom `MyMultiLayerRNN` with PyTorch's built-in RNN.
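
The stacking idea can be sketched independently of the layer internals: each layer consumes the full output sequence of the layer below it, starts from its own initial hidden state, and contributes one final hidden state. The example below uses a scalar toy layer to keep things short. `make_toy_layer` and `run_stack` are hypothetical helpers, not names from the template.

```python
import math

def make_toy_layer(w_x, w_h):
    """Return a scalar tanh-RNN 'layer' (hypothetical stand-in for a real layer)."""
    def layer(seq, h0):
        h, outputs = h0, []
        for x_t in seq:
            h = math.tanh(w_x * x_t + w_h * h)
            outputs.append(h)
        return outputs, h
    return layer

def run_stack(layers, seq, h0_per_layer):
    """Feed each layer's output sequence into the next layer."""
    final_states = []
    for layer, h0 in zip(layers, h0_per_layer):
        seq, h_final = layer(seq, h0)   # output sequence becomes the next input
        final_states.append(h_final)    # one final state per layer
    return seq, final_states

layers = [make_toy_layer(0.5, 0.1), make_toy_layer(1.0, -0.2)]
top_seq, finals = run_stack(layers, [1.0, 0.0, -1.0], [0.0, 0.0])
print(len(top_seq), len(finals))
```

With tensors, the per-layer final states would typically be combined with `torch.stack` to obtain the `(num_layers, batch_size, hidden_size)` shape described in the docstring.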

In [None]:
# MyMulti-Layer RNN Implementation
class MyMultiLayerRNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers):
        super(MyMultiLayerRNN, self).__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size

        #################################################
        ## TODO:  Create multiple RNN layers           ##
        #################################################
        # related document: https://pytorch.org/docs/stable/generated/torch.nn.ModuleList.html

        self.layers = ...

        ######################################
        ############ End of your code ########
        ######################################


    def forward(self, x, h0):
        """
        Forward pass through the multi-layer RNN.
        :param x: Input sequence (time_steps, batch_size, input_size)
        :param h0: Initial hidden states (num_layers, batch_size, hidden_size)
        :return: Sequence output (time_steps, batch_size, hidden_size), final hidden states (num_layers, batch_size, hidden_size)
        """
        h = h0
        seq = x
        final_hidden_states = []

        #################################################
        ## TODO: apply the stacked RNN layers          ##
        #################################################

        #################################################
        ##           End of your code                  ##
        #################################################

        return seq, final_hidden_states


@torch.no_grad()
def test_multi_layer_rnn():
    # Set parameters
    input_size = 4
    hidden_size = 3
    batch_size = 2
    time_steps = 5
    num_layers = 2

    # Initialize custom multi-layer RNN
    custom_rnn = MyMultiLayerRNN(input_size, hidden_size, num_layers)

    # Initialize PyTorch RNN using weights of custom RNN
    rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=False, nonlinearity='tanh')
    for i in range(num_layers):
        getattr(rnn, f"weight_ih_l{i}").data = custom_rnn.layers[i].Wxh.data.clone()
        getattr(rnn, f"weight_hh_l{i}").data = custom_rnn.layers[i].Whh.data.clone()
        getattr(rnn, f"bias_ih_l{i}").data = custom_rnn.layers[i].bh.data.clone()
        getattr(rnn, f"bias_hh_l{i}").data = torch.zeros_like(getattr(rnn, f"bias_hh_l{i}").data)  # Zero bias_hh

    # Random input
    x = torch.randn(time_steps, batch_size, input_size)
    h0 = torch.zeros(num_layers, batch_size, hidden_size)

    # Forward pass through custom RNN
    h_custom_seq, h_custom_final = custom_rnn(x, h0)

    # Forward pass through PyTorch RNN
    h0_torch = h0  # PyTorch expects (num_layers, batch_size, hidden_size)
    h_pytorch_seq, h_pytorch_final = rnn(x, h0_torch)

    # Compare outputs
    print("\nCustom Multi-Layer RNN Final Hidden States:\n", h_custom_final)
    print("\nPyTorch Multi-Layer RNN Final Hidden States:\n", h_pytorch_final)

    # Assert similarity using torch.allclose
    seq_match = torch.allclose(h_custom_seq, h_pytorch_seq, rtol=1e-5, atol=1e-5)
    final_match = torch.allclose(h_custom_final, h_pytorch_final, rtol=1e-5, atol=1e-5)

    if seq_match and final_match:
        print("\nTest Passed: Custom Multi-Layer RNN matches PyTorch Multi-Layer RNN!")
    else:
        print("\nTest Failed: Outputs do not match.")
        if not seq_match:
            print("Sequence outputs do not match.")
        if not final_match:
            print("Final hidden states do not match.")

In [None]:
# Run the multi-layer RNN test; you should see `Test Passed` in the output.
# Below is an example test success output
# Custom Multi-Layer RNN Final Hidden States:
# tensor([[[ 0.1253,  0.0355,  0.4360],
#         [ 0.2587,  0.0225,  0.0261]],
#
#        [[ 0.0104,  0.0608, -0.0607],
#         [-0.0098,  0.0013, -0.0216]]])
#
# PyTorch Multi-Layer RNN Final Hidden States:
# tensor([[[ 0.1253,  0.0355,  0.4360],
#         [ 0.2587,  0.0225,  0.0261]],
#
#        [[ 0.0104,  0.0608, -0.0607],
#         [-0.0098,  0.0013, -0.0216]]])
#
# Test Passed: Custom Multi-Layer RNN matches PyTorch Multi-Layer RNN!

test_multi_layer_rnn()

## Task 2: Train character-RNN language model

**Task 2.1:** Write a `CharDataset` module that processes a text file for training a character-level RNN. The module should handle the following:

1. Reading and Splitting the Text: Load the input text file and divide it into fixed-length chunks. If the remaining characters at the end of the file are not enough to form a complete chunk, discard them.

2. Building the Vocabulary: Extract all unique characters present in the text file to form the vocabulary. Create a character-to-index dictionary that maps each character to a unique ID, and a corresponding index-to-character dictionary to map IDs back to characters.

Include two special tokens in the vocabulary: `<end>` with index 0 marks the end of generation, and `<unk>` with index 1 represents an unknown character.

3. Returning Data Samples: For each chunk, append the end-of-sequence token to the input sequence. Generate the corresponding label for the chunk, where the label is the shifted version of the input sequence.
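
The three steps above can be traced on a tiny string. This is a sketch of one reading of the description, the one that matches the comment in `__getitem__` (target = input shifted by one, ending in `<end>`). Sorting the characters is an assumption made here only to get a deterministic mapping.

```python
text = "hello world"
seq_length = 4

# Vocabulary: two special tokens first, then the unique characters.
# Sorting is an assumption made here to get a deterministic mapping.
char2idx = {"<end>": 0, "<unk>": 1}
for ch in sorted(set(text)):
    char2idx[ch] = len(char2idx)
idx2char = {i: ch for ch, i in char2idx.items()}

# Chunking: keep only complete chunks, drop the leftover tail ("rld").
ids = [char2idx[ch] for ch in text]
n_chunks = len(ids) // seq_length
chunks = [ids[i * seq_length:(i + 1) * seq_length] for i in range(n_chunks)]

# One sample: the target is the input shifted left by one, ending in <end>.
x = chunks[0]
y = x[1:] + [char2idx["<end>"]]

print("".join(idx2char[i] for i in x))   # hell
print([idx2char[i] for i in y])          # ['e', 'l', 'l', '<end>']
```

In the actual dataset class the samples would be returned as `torch.long` tensors rather than Python lists, since the test cell calls `.tolist()` on them.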

In [None]:
# Run this cell to download the text files we need
!wget https://cs.stanford.edu/people/karpathy/char-rnn/shakespeare_input.txt
!wget https://cs.stanford.edu/people/karpathy/char-rnn/linux_input.txt

In [None]:
from itertools import islice

from torch.utils.data import DataLoader, Dataset
class CharDataset(Dataset):
    def __init__(self, textfile, seq_length):
        with open(textfile, 'r', encoding='utf-8', errors='ignore') as f:
            text = f.read().strip() # A string containing all characters
        all_chars = list(text)
        self.seq_length = seq_length

        self.eos_char = '<end>'
        self.unk_char = '<unk>'
        self.char2idx = {'<end>' : 0, '<unk>' : 1}
        self.idx2char = {}
        # this list holds the split character chunks; each chunk is a list of character indices
        self.char_chunks = []

        ##############################################################
        ## TODO:                                                    ##
        ## 1. Build vocab: count unique characters in the text file  ##
        ## 2. Split chunk: split the text file characters to chunk   ##
        ##############################################################

        ##############################################################
        ##           End of your code                               ##
        ##############################################################

        self.vocab_size = len(self.char2idx)

    def __len__(self):
        return len(self.char_chunks)

    def __getitem__(self, idx):
        # Returns tuple: (x, y) where x is the input sequence and y is the target sequence,
        # shifted one position with an end-of-sequence token (index 0) at the end.

        ##############################################################
        ## TODO: retrieve the idx-th chunk                          ##
        ##############################################################

        x = ...
        y = ...

        ##############################################################
        ##           End of your code                               ##
        ##############################################################

        return (x, y)

test_data = CharDataset('linux_input.txt', 100)
sample = test_data[0]
print("Sample char index: ", sample[0].tolist())
print("Sample input chars: ", "".join([test_data.idx2char[x] for x in sample[0].tolist()]))
print("Sample target chars: ", "".join([test_data.idx2char[x] for x in sample[1].tolist()]))


**Task 2.2:** Implement a character-RNN language model module.

In this task, you need to implement the `MyCharRNNLM` module, which includes the following components:

1. `embedding`: Maps each input character ID to a dense vector representation.

2. `rnn`: The backbone RNN layers, which process the input sequence and capture temporal dependencies between characters.

3. `lm_head` (named `fc` in the template below): A fully connected layer that maps the outputs of the RNN layers to logits over the vocabulary, which can later be transformed into a probability distribution using a softmax function.
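
To make the last step concrete, the sketch below turns a vector of logits into a probability distribution with a softmax. It uses plain Python and made-up logits for a hypothetical 4-character vocabulary, so the numbers are illustrative only.

```python
import math

def softmax(logits):
    """Convert raw scores into probabilities that sum to 1."""
    m = max(logits)                          # subtract the max for numerical stability
    exps = [math.exp(z - m) for z in logits]
    total = sum(exps)
    return [e / total for e in exps]

# Hypothetical logits for a 4-character vocabulary at one position.
logits = [2.0, 0.5, -1.0, 0.5]
probs = softmax(logits)
print([round(p, 3) for p in probs])
```

Note that `nn.CrossEntropyLoss` expects raw logits, so the model usually returns logits and the softmax is applied only when explicit probabilities are needed.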

In [None]:
class MyCharRNNLM(nn.Module):
    def __init__(self, vocab_size, hidden_size, n_layers=1):
        super(MyCharRNNLM, self).__init__()
        self.hidden_size = hidden_size
        self.n_layers = n_layers

        #################################################
        ## TODO:  Define the model                      ##
        #################################################
        # related document: https://pytorch.org/docs/stable/generated/torch.nn.Embedding.html

        self.embedding = ...
        self.rnn = ...
        self.fc = ...

        #################################################
        ## End of your code                            ##
        #################################################

    def forward(self, x, hidden):
        #################################################
        ## TODO:  Forward pass                         ##
        #################################################
        out, hidden = ...

        #################################################
        ## End of your code                            ##
        #################################################
        return out, hidden

    def init_hidden(self, batch_size):
        # Returns an all-zero tensor that serves as the
        # initial hidden state h0 for the RNN
        return torch.zeros(self.n_layers, batch_size, self.hidden_size)


**Task 2.3:** Train a character-RNN language model on the Shakespeare and Linux kernel files.

In this task, you need to complete the implementation of a character-level RNN language model training loop and train the model on two text files: the Shakespeare text file and the Linux kernel code file.

Specifically,
1. Fill in the missing parts of the provided training loop function.

2. Train a 3-layer RNN-based language model **separately** on the two text files.

3. Plot and report the training loss trajectory for each text file, and save the final trained model for each file. The training loss should decrease smoothly. There is no specific requirement for the final training loss, but as a reference, the final training loss in the TA's trial is around 1.5 for Shakespeare and 1.4 for the Linux kernel.


Training takes about 90 seconds per epoch on a GPU. If your training is much slower, check that the Colab runtime is a GPU runtime.
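
To help interpret the loss values, it can be useful to know the baseline: a model that assigns equal probability to every character has a per-character cross-entropy of $\ln V$ for a vocabulary of size $V$ (natural log, the convention `nn.CrossEntropyLoss` uses). The sketch below checks this in plain Python. The vocabulary size of 64 is a hypothetical value; the real one depends on the text file.

```python
import math

def cross_entropy(logits, target):
    """Negative log-probability of the target class under softmax(logits)."""
    m = max(logits)
    log_sum_exp = m + math.log(sum(math.exp(z - m) for z in logits))
    return log_sum_exp - logits[target]

vocab_size = 64                      # hypothetical; the real value comes from the dataset
uniform_logits = [0.0] * vocab_size  # an untrained model that favors no character
baseline = cross_entropy(uniform_logits, target=3)
print(round(baseline, 3), round(math.log(vocab_size), 3))
```

A training loss that starts near this baseline and falls well below it is a sign that the model is learning character statistics rather than guessing uniformly.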


In [None]:
import matplotlib.pyplot as plt
def visualize_train_loss(losses):
    plt.figure(figsize=(10, 5))
    plt.plot(losses, label="Train Loss")
    plt.xlabel("Train step")
    plt.ylabel("Train Loss")
    plt.show()

def save_model(model, save_path):
    torch.save(model.to('cpu').state_dict(), save_path)

def load_model(model, save_path):
    state_dicts = torch.load(save_path)
    model.load_state_dict(state_dicts)

def train_loop(model, dataset, save_path, batch_size = 512, num_epochs=2,  device='cuda', report_every_step=10):
    train_losses = []
    criterion = nn.CrossEntropyLoss()
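    # NOTE: learning_rate is not an argument; it is read from the global set in the hyperparameter cell.
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)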
    dataloader = DataLoader(dataset, batch_size, num_workers=8, shuffle=True)

    # Training loop
    for epoch in range(num_epochs):
        step = 0

        #################################################
        ## TODO:  Training of one epoch                ##
        #################################################
        # Remember to add following code to record training loss:
        # if step % report_every_step == 0:
        #   train_losses.append(loss.cpu().item())
        #################################################


        #################################################
        ## End of your code                            ##
        #################################################

    print(train_losses[-5:])
    visualize_train_loss(train_losses)
    save_model(model, save_path)
    return model

To prevent the need to retrain the model if your Colab session disconnects, you can mount your Google Drive and save the model weights there.

To do this, execute the following code. Note that you may be prompted to authenticate your Google account when mounting the drive.
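
Once the drive is mounted, a checkpoint path inside it can be passed as `save_path`. The helper below is a minimal sketch: `checkpoint_path` and the folder name `char_rnn_checkpoints` are hypothetical, and `/content/drive/MyDrive` is assumed to be where Colab exposes your Drive after mounting at `/content/drive`.

```python
import os

def checkpoint_path(filename, drive_root="/content/drive/MyDrive"):
    """Return a path on Google Drive if it is mounted, else a local path."""
    if os.path.isdir(drive_root):
        folder = os.path.join(drive_root, "char_rnn_checkpoints")  # hypothetical folder name
        os.makedirs(folder, exist_ok=True)
        return os.path.join(folder, filename)
    return filename  # Drive not mounted: fall back to the working directory

print(checkpoint_path("book_rnnlm.pt"))
```

For example, `checkpoint_path('book_rnnlm.pt')` could replace the literal `'book_rnnlm.pt'` argument in the `train_loop` call.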


In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Hyperparameters
learning_rate = 0.005
num_epochs = 10
save_path = "tmp_model"
batch_size = 128
seq_length = 512

num_layers = 3
hidden_size = 128
device = 'cuda' if torch.cuda.is_available() else 'cpu'

book_data = CharDataset(
    'shakespeare_input.txt', seq_length
)
code_data = CharDataset(
    'linux_input.txt', seq_length
)

In [None]:
# Run the following to train the model
# Train on the Shakespeare text
book_rnnlm = MyCharRNNLM(book_data.vocab_size, hidden_size, num_layers)
book_rnnlm = book_rnnlm.to(device)
print(book_rnnlm)
book_rnnlm = train_loop(
    book_rnnlm, book_data, 'book_rnnlm.pt', batch_size, num_epochs, device=device, report_every_step=10,
)

# Train on linux kernel code
code_loader = DataLoader(code_data, batch_size=batch_size, num_workers=8, shuffle=True)
code_rnnlm = MyCharRNNLM(code_data.vocab_size, hidden_size, num_layers)
code_rnnlm = code_rnnlm.to(device)
print(code_rnnlm)
code_rnnlm = train_loop(
    code_rnnlm, code_data, 'code_rnnlm.pt', batch_size, num_epochs, device=device, report_every_step=10,
)


## Task 3: Play with the trained RNN language model

**Task 3.1:** Load the trained model and generate sequences using it.

In this homework, we will use the simplest approach: selecting the character with the highest probability at each step. This method is known as the greedy decoding strategy for language models and is implemented in the `generate_text` function. While more advanced decoding strategies exist, we use this one as a simple demonstration.
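
The greedy loop itself is small enough to sketch without a trained model. In the example below, `toy_model` and `TABLE` are hypothetical stand-ins whose scores depend only on the last character, whereas the real model also carries a hidden state. Unlike `generate_text`, this sketch stops before appending the end token.

```python
def greedy_decode(next_logits, prefix, max_new, end_token="<end>"):
    """Repeatedly append the highest-scoring next token."""
    tokens = list(prefix)
    for _ in range(max_new):
        scores = next_logits(tokens[-1])          # dict: candidate -> score
        best = max(scores, key=scores.get)        # argmax over candidates
        if best == end_token:
            break
        tokens.append(best)
    return "".join(tokens)

# Hypothetical stand-in for a trained model: scores depend only on the last character.
TABLE = {
    "a": {"b": 2.0, "a": 0.1, "<end>": -1.0},
    "b": {"c": 1.5, "a": 1.0, "<end>": 0.0},
    "c": {"<end>": 3.0, "a": 0.5, "b": 0.2},
}

def toy_model(last_char):
    return TABLE[last_char]

print(greedy_decode(toy_model, "a", max_new=10))   # abc
```

Because softmax preserves the ordering of the logits, taking the argmax of the logits selects the same character as taking the argmax of the probabilities.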

Your task is to experiment with the two trained models by providing different prefix sequences and comparing their generated outputs. Two example prefixes are provided.

Write **one short sentence** summarizing your observation about the differences between the two models' generation results.

**One-sentence summary of your observations:**

\[Fill in your summary here\]

In [None]:
def generate_text(model, prefix_str, dataset, predict_len=100):
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model.eval()
    chars = list(prefix_str)
    input_seq = torch.tensor([dataset.char2idx.get(ch, 1) for ch in prefix_str], dtype=torch.long).unsqueeze(0).to(device)
    hidden = model.init_hidden(1).to(device)

    for _ in range(predict_len):
        output, hidden = model(input_seq, hidden)
        last_char_logits = output[0, -1]
        predicted_idx = torch.argmax(torch.softmax(last_char_logits, dim=0)).item()
        chars.append(dataset.idx2char[predicted_idx])
        input_seq = torch.tensor([[predicted_idx]], dtype=torch.long).to(device)
        if predicted_idx == 0: # early exit when model predicts to end the generation
            break

    return ''.join(chars)

In [None]:
code_prefix = "#include <"
text_prefix = "In sooth, my heart doth weigh "

with torch.inference_mode():
    #################################################
    ## TODO:  Play with the models!                ##
    #################################################
    # Call generate_text to experiment with the two models using different prefixes
    pass