# Text Generation using LSTM

### Objective
In this project, I will implement a character-level text generation model using Long Short-Term Memory (LSTM) networks in PyTorch. The goal is to understand how LSTMs work for sequential data and how to train them effectively to generate new text based on an input sequence.

I will follow the steps below:
1. Load and preprocess a text dataset
2. Character-level encoding by constructing the vocabulary and dictionary
3. Batch generation for training 
4. Defining the character-level LSTM model 
5. Training loop
6. Text generation using the trained model

In [1]:
skip_training = False   # You can set it to True if you want to run inference on your trained model.

### Import the necessary libraries

In [3]:
import random
import re

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from tqdm import tqdm

In [4]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cuda


### 1. Load and Preprocess the Text Dataset

We will be using *Alice's Adventures in Wonderland* by Lewis Carroll as our dataset. You can download it from [Project Gutenberg](https://www.gutenberg.org/):

[Alice's Adventures in Wonderland by Lewis Carroll (Project Gutenberg Page)](https://www.gutenberg.org/ebooks/11) \
[Direct Text File Download](https://www.gutenberg.org/files/11/11-0.txt)

We chose *Alice's Adventures in Wonderland* because it is a relatively small text, which keeps training manageable even on a CPU.
This section contains the following steps:
1. Load the dataset into Python
2. Remove metadata to focus on the main part of the text
3. Clean the text by removing special characters and converting it to lowercase
   
The goal is to strip out metadata that is not part of the story, convert the text to lowercase, and remove punctuation and other non-alphanumeric characters. Afterwards (Section 2), we build a dictionary that maps each unique character to a unique integer.

#### 1.1. Load the Dataset

We start by loading the dataset, which should be a plain-text file, into Python. Inspecting a small portion of the raw text first reveals its structure and shows which metadata and special characters need to be removed during preprocessing.

In [5]:
txt_path = '/content/alice.txt' # replace 'alice.txt' with your txt path

In [6]:
with open(txt_path, 'r', encoding='utf-8') as file:  # the Gutenberg file is UTF-8 (curly quotes, etc.)
    raw_text = file.read()

print('===First 1500 characters before any processing:\n\n')
print(raw_text[:1500])

print('\n\n\n===Characters near the end, before any processing:\n')
print(raw_text[-19000:-17000])

===First 1500 characters before any processing:


*** START OF THE PROJECT GUTENBERG EBOOK 11 ***
[Illustration]




Alice’s Adventures in Wonderland

by Lewis Carroll

THE MILLENNIUM FULCRUM EDITION 3.0

Contents

 CHAPTER I.     Down the Rabbit-Hole
 CHAPTER II.    The Pool of Tears
 CHAPTER III.   A Caucus-Race and a Long Tale
 CHAPTER IV.    The Rabbit Sends in a Little Bill
 CHAPTER V.     Advice from a Caterpillar
 CHAPTER VI.    Pig and Pepper
 CHAPTER VII.   A Mad Tea-Party
 CHAPTER VIII.  The Queen’s Croquet-Ground
 CHAPTER IX.    The Mock Turtle’s Story
 CHAPTER X.     The Lobster Quadrille
 CHAPTER XI.    Who Stole the Tarts?
 CHAPTER XII.   Alice’s Evidence




CHAPTER I.
Down the Rabbit-Hole


Alice was beginning to get very tired of sitting by her sister on the
bank, and of having nothing to do: once or twice she had peeped into
the book her sister was reading, but it had no pictures or
conversations in it, “and what is the use of a book,” thought Alice
“without pictures 

#### 1.2. Remove Metadata and Focus on the Main Text

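Text files may contain introductory or closing metadata, such as copyright and license information, that is not part of the book. We want to focus only on the main body of the text. For *Alice's Adventures in Wonderland*, we remove everything before the first chapter and everything from the Project Gutenberg end marker onward. Since the table of contents also lists `CHAPTER I.`, the start marker includes the line break and the chapter title so that it only matches the beginning of the actual chapter.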

In [7]:
# For this example, we are removing everything before 'CHAPTER I.\nDown the Rabbit-Hole'
# and after the end marker
start_index = raw_text.find('CHAPTER I.\nDown the Rabbit-Hole')

end_index = raw_text.find('*** END OF THE PROJECT GUTENBERG') # closing markers of Project Gutenberg

trimmed_text = raw_text[start_index:end_index]

print('===Text after removing metadata:\n')
print(trimmed_text[:1500])

===Text after removing metadata:

CHAPTER I.
Down the Rabbit-Hole


Alice was beginning to get very tired of sitting by her sister on the
bank, and of having nothing to do: once or twice she had peeped into
the book her sister was reading, but it had no pictures or
conversations in it, “and what is the use of a book,” thought Alice
“without pictures or conversations?”

So she was considering in her own mind (as well as she could, for the
hot day made her feel very sleepy and stupid), whether the pleasure of
making a daisy-chain would be worth the trouble of getting up and
picking the daisies, when suddenly a White Rabbit with pink eyes ran
close by her.

There was nothing so _very_ remarkable in that; nor did Alice think it
so _very_ much out of the way to hear the Rabbit say to itself, “Oh
dear! Oh dear! I shall be late!” (when she thought it over afterwards,
it occurred to her that she ought to have wondered at this, but at the
time it all seemed quite natural); but when the Rabbit a
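The cell above relies on `str.find`, which returns `-1` instead of raising an error when a marker is missing; `raw_text[start_index:end_index]` would then silently produce the wrong slice (for example, everything but the last character when the end marker is missing). A minimal sketch of a stricter variant follows. `trim_gutenberg` is a hypothetical helper, and the toy string only stands in for the real file.

```python
def trim_gutenberg(text, start_marker, end_marker):
    """Return text from start_marker (inclusive) up to end_marker (exclusive).

    Hypothetical helper: unlike a bare str.find, it fails loudly when a
    marker is missing instead of slicing with index -1.
    """
    start = text.find(start_marker)
    if start == -1:
        raise ValueError(f"start marker not found: {start_marker!r}")
    end = text.find(end_marker, start)  # search only after the start marker
    if end == -1:
        raise ValueError(f"end marker not found: {end_marker!r}")
    return text[start:end]


# Toy stand-in for the raw Gutenberg file
toy = ("HEADER\nCHAPTER I.\nDown the Rabbit-Hole\nAlice...\n"
       "*** END OF THE PROJECT GUTENBERG EBOOK 11 ***\nLICENSE")
body = trim_gutenberg(toy, "CHAPTER I.\nDown the Rabbit-Hole", "*** END OF THE PROJECT GUTENBERG")
print(repr(body))  # 'CHAPTER I.\nDown the Rabbit-Hole\nAlice...\n'
```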

#### 1.3. Clean the Text

Next, we clean the text: we convert it to lowercase, replace every character that is not a letter, digit, or whitespace with a space, and collapse runs of whitespace into a single space. This shrinks the vocabulary, so the model does not have to learn separate symbols for uppercase and lowercase letters or for punctuation.

##### Steps to follow:
##### 1. Convert text to lowercase:
First, convert the text to lowercase so that uppercase and lowercase letters are not treated as different characters.
##### 2. Remove special characters:
Then replace every character that is not a letter (`a-z`), a digit (`0-9`), or whitespace (`\s`) with a space.
##### 3. Handle repeated whitespace:
After this replacement, the text may contain runs of consecutive spaces, as well as newlines left over from the original line breaks. Collapse every run of whitespace into a single space.
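Applied to a short made-up sample, the three steps behave as follows. This sketch uses the same two regular expressions as `preprocess_text`, with `str.lower()` for step 1. It also shows two side effects of replacing (rather than deleting) characters: the curly apostrophe in “Alice’s” leaves a separate `s` token, and hyphenated words such as “Rabbit-Hole” are split in two. A single trailing space can also remain.

```python
import re

sample = "Alice’s Rabbit-Hole!\n\n“Oh  dear!”"

step1 = sample.lower()                      # 1. lowercase
step2 = re.sub(r"[^a-z0-9\s]", " ", step1)  # 2. every non-alphanumeric, non-whitespace char -> space
step3 = re.sub(r"\s+", " ", step2)          # 3. collapse whitespace runs (incl. newlines) to one space

print(repr(step3))  # 'alice s rabbit hole oh dear '
```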


In [8]:
def preprocess_text(text):
    """
    Preprocesses the input text by i. converting it to lowercase,
    ii. removing non-alphanumeric characters (except spaces),
    iii. and normalizing spaces.

    Args:
    text -- The raw input text as a string

    Returns:
    cleaned_text -- The processed text where all the preprocessing steps are applied
    """
    # YOUR CODE HERE
    # 1. Convert text to lowercase
    text = text.lower()

    # 2. Replace every character that is not a-z, 0-9, or whitespace with a space
    text = re.sub(r"[^a-z0-9\s]", " ", text)

    # 3. Collapse runs of whitespace (spaces, newlines, tabs) into a single space
    cleaned_text = re.sub(r"\s+", " ", text)

    return cleaned_text

cleaned_text = preprocess_text(trimmed_text)
print('Text after cleaning and converting to lowercase:\n')
print(cleaned_text[:1000])


Text after cleaning and converting to lowercase:

chapter i down the rabbit hole alice was beginning to get very tired of sitting by her sister on the bank and of having nothing to do once or twice she had peeped into the book her sister was reading but it had no pictures or conversations in it and what is the use of a book thought alice without pictures or conversations so she was considering in her own mind as well as she could for the hot day made her feel very sleepy and stupid whether the pleasure of making a daisy chain would be worth the trouble of getting up and picking the daisies when suddenly a white rabbit with pink eyes ran close by her there was nothing so very remarkable in that nor did alice think it so very much out of the way to hear the rabbit say to itself oh dear oh dear i shall be late when she thought it over afterwards it occurred to her that she ought to have wondered at this but at the time it all seemed quite natural but when the rabbit actually took a watch 

In [9]:
def test_text_cleaning(text, cleaned_text):
    """
    Test whether the text has been properly cleaned.

    Args:
    text -- Original raw text
    cleaned_text -- The cleaned text

    Returns:
    Assertion errors if tests fail
    """
    # Test 1: Check if the text length is reduced
    assert len(cleaned_text) < len(text), 'Error: The cleaned text should be shorter than the original raw text.'

    # Test 2: All characters should be lowercase
    assert cleaned_text.islower(), 'Error: The cleaned text is not fully lowercase.'

    # Test 3: Ensure all special characters are removed
    assert all(char.isalnum() or char == ' ' for char in cleaned_text), 'Error: Special characters are still present in the cleaned text.'

    # Test 4: Ensure no consecutive spaces exist
    assert "  " not in cleaned_text, 'Error: There are multiple consecutive spaces in the cleaned text.'

    print('Text cleaning test passed successfully!')

test_text_cleaning(raw_text, cleaned_text)

Text cleaning test passed successfully!


### 2. Character-Level Encoding

In this step, we convert the cleaned text into a format that the model can understand. Since we are working with character-level encoding, each individual character will be treated as a token. This allows the LSTM to learn patterns at the character level and generate text one character at a time.



#### 2.1 Character-Level Vocabulary

In this section, I will create a vocabulary from the cleaned text and map each character to a unique integer.

##### Steps to follow:
##### 1. Create a vocabulary of unique characters:
First, I extract all the unique characters from the cleaned text.
##### 2. Construct mapping between characters and integers:
Once I have the unique characters, I create a dictionary `char_to_int` that maps each character to a unique integer, which is how characters are represented during training. I also create a reverse mapping `int_to_char` that maps integers back to characters, which is used later to decode generated text.
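On a toy string, the two mappings look like this (a sketch following the sorted-unique-characters approach described above). Sorting matters: the iteration order of a `set` of strings can change between Python runs because of hash randomization, so without sorting, the integer assigned to each character, and with it the input layout of a saved model, would not be reproducible.

```python
text = "hello world"

unique_chars = sorted(set(text))  # deterministic order: ' ' sorts before letters
char_to_int = {ch: i for i, ch in enumerate(unique_chars)}
int_to_char = {i: ch for ch, i in char_to_int.items()}  # exact inverse of char_to_int

print(char_to_int)
# {' ': 0, 'd': 1, 'e': 2, 'h': 3, 'l': 4, 'o': 5, 'r': 6, 'w': 7}
```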


In [11]:
def create_char_mappings(cleaned_text):
    """
    Creates character-to-integer and integer-to-character mappings from the cleaned text.

    Args:
    cleaned_text -- The cleaned input text as a string

    Returns:
    char_to_int -- A dictionary mapping each unique character to an integer
    int_to_char -- A dictionary mapping each integer back to its corresponding character
    """
    # YOUR CODE HERE
    # Extract unique characters and sort them
    unique_chars = sorted(set(cleaned_text))
    char_to_int = {char: idx for idx, char in enumerate(unique_chars)}
    int_to_char = {idx: char for idx, char in enumerate(unique_chars)}

    return char_to_int, int_to_char

char_to_int, int_to_char = create_char_mappings(cleaned_text)
print('Character to Integer Mapping:')
for char, idx in list(char_to_int.items()):
    print(f"'{char}' : {idx}")

Character to Integer Mapping:
' ' : 0
'a' : 1
'b' : 2
'c' : 3
'd' : 4
'e' : 5
'f' : 6
'g' : 7
'h' : 8
'i' : 9
'j' : 10
'k' : 11
'l' : 12
'm' : 13
'n' : 14
'o' : 15
'p' : 16
'q' : 17
'r' : 18
's' : 19
't' : 20
'u' : 21
'v' : 22
'w' : 23
'x' : 24
'y' : 25
'z' : 26


#### 2.2 Encode the Text into Integers

During training, the model uses the encoded representation of the cleaned text as its input. In this section, I convert each character in the cleaned text to its corresponding integer using the `char_to_int` dictionary.
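Encoding and decoding are inverse lookups. The sketch below round-trips a toy string through both dictionaries; the decoding direction is what turns the model's predicted indices back into text. The explicit `dtype=np.int64` is an assumption added for clarity, since the training loop later converts batches with `dtype=torch.long` anyway.

```python
import numpy as np

text = "hello world"
char_to_int = {ch: i for i, ch in enumerate(sorted(set(text)))}
int_to_char = {i: ch for ch, i in char_to_int.items()}

# Encode: one integer per character
encoded = np.array([char_to_int[ch] for ch in text], dtype=np.int64)
# Decode: map each integer back and join into a string
decoded = "".join(int_to_char[i] for i in encoded)

print(encoded)  # [3 2 4 4 5 0 7 5 6 4 1]
print(decoded)  # hello world
```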

In [12]:
def encode_text(cleaned_text, char_to_int):
    """
    Encodes the cleaned text into an array of integers.

    Args:
    cleaned_text -- The cleaned input text as a string
    char_to_int -- Characters to integer mapping

    Returns:
    encoded_chars -- Numpy array of integers representing the encoded characters from the text
    """
    # YOUR CODE HERE
    encoded_chars = np.array([char_to_int[char] for char in cleaned_text])
    return encoded_chars

encoded_chars = encode_text(cleaned_text, char_to_int)
print('First 100 encoded characters:')
print(encoded_chars[:100])

First 100 encoded characters:
[ 3  8  1 16 20  5 18  0  9  0  4 15 23 14  0 20  8  5  0 18  1  2  2  9
 20  0  8 15 12  5  0  1 12  9  3  5  0 23  1 19  0  2  5  7  9 14 14  9
 14  7  0 20 15  0  7  5 20  0 22  5 18 25  0 20  9 18  5  4  0 15  6  0
 19  9 20 20  9 14  7  0  2 25  0  8  5 18  0 19  9 19 20  5 18  0 15 14
  0 20  8  5]


In [13]:
def test_character_encoding_length(cleaned_text, encoded_chars, char_to_int, int_to_char):
    """
    Test whether the sizes produced by the character encoding are consistent.

    Args:
    cleaned_text -- The cleaned text
    encoded_chars -- The encoded character array
    char_to_int -- Character to integer mapping
    int_to_char -- Integer to character mapping

    Returns:
    Assertion errors if tests fail
    """
    # Test 1: Ensure char_to_int and int_to_char have the same length
    assert len(char_to_int) == len(int_to_char), 'Error: char_to_int and int_to_char dictionaries should have the same length.'

    # Test 2: Ensure that the length of encoded_chars matches the length of cleaned_text
    assert len(encoded_chars) == len(cleaned_text), 'Error: The length of encoded_chars should match the length of cleaned_text.'

    print('Character encoding length test passed successfully!')

test_character_encoding_length(cleaned_text, encoded_chars, char_to_int, int_to_char)

Character encoding length test passed successfully!


### 3. Batch Generation for Training
In this step, I will implement the function `get_batches()`, which splits the encoded data into batches for training. Each batch contains input sequences `x` and target sequences `y`, where `y` is `x` shifted by one position. The model is therefore trained to predict, at every position, the next character from the characters before it.

##### Steps to follow:
##### 1. Handle step_size:
The step size determines how much the window moves across the data after each sequence is generated. If `step_size` is not provided, it is set to `seq_length`. This means the sequences will not overlap. A smaller `step_size` allows for overlapping sequences.
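To make the effect of `step_size` concrete, here is a small sketch on a 10-character string. `sliding_windows` is a hypothetical helper that reuses the sequence-count formula from `get_batches`, `(len - seq_length) // step_size`, and builds each target by shifting the input window by one character.

```python
def sliding_windows(text, seq_length, step_size):
    """Hypothetical helper: (x, y) string pairs using get_batches' count formula."""
    num_sequences = (len(text) - seq_length) // step_size
    pairs = []
    for k in range(num_sequences):
        start = k * step_size
        x = text[start:start + seq_length]          # input window
        y = text[start + 1:start + seq_length + 1]  # same window shifted by one
        pairs.append((x, y))
    return pairs


print(sliding_windows("abcdefghij", seq_length=4, step_size=2))
# [('abcd', 'bcde'), ('cdef', 'defg'), ('efgh', 'fghi')]
print(sliding_windows("abcdefghij", seq_length=4, step_size=4))
# [('abcd', 'bcde')]
```

The formula is slightly conservative: with `step_size=4`, a second window `efgh` (target `fghi`) would still fit but is not counted. This drops at most one sequence.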

##### 2. Calculate the number of batches:
When calculating how many batches you can generate from the input data, there are some key factors to consider:
1. Sequence Length: Each input sequence in a batch will contain a specific number of tokens (representing the characters). The longer your sequence length is, the fewer total sequences you can generate from the input data.
2. Step Size: A smaller step size results in more overlap between sequences and it allows you to generate more sequences from the same input. If the step size is larger, there will be less overlap (or none at all if step size equals sequence length), leading to fewer sequences in total.
3. Batch Size: Once you generate sequences, you need to group them into batches for efficient training. The batch size defines how many sequences are grouped in each batch. A larger batch size means fewer batches because more sequences are grouped together in each batch.

Only generate full batches: leftover sequences that cannot fill a complete batch are discarded.
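The batch arithmetic can be checked by hand with a small, made-up text length. `batch_counts` is a hypothetical helper that mirrors the three formulas used in `get_batches`: the number of sequences, the number of full batches, and the length the array is trimmed to.

```python
def batch_counts(num_chars, batch_size, seq_length, step_size):
    """Hypothetical helper mirroring the arithmetic in get_batches."""
    num_sequences = (num_chars - seq_length) // step_size
    num_batches = num_sequences // batch_size              # full batches only
    kept_chars = num_batches * batch_size * step_size + seq_length
    return num_sequences, num_batches, kept_chars


print(batch_counts(1000, batch_size=4, seq_length=100, step_size=100))  # (9, 2, 900)
print(batch_counts(1000, batch_size=4, seq_length=100, step_size=50))   # (18, 4, 900)
```

Halving `step_size` doubles the number of sequences (9 to 18) and, here, the number of batches (2 to 4), the same factor of two seen between the notebook's two batch-shape tests (21 and 42 batches).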

##### 3. Trim the input array:
If the input data does not perfectly divide into batches, trim the array so it contains only full batches. Avoid having incomplete sequences at the end.

##### 4. Generate batches:
Use nested loops to generate batches:
- `x` will be the input sequence of length `seq_length`.
- `y` will be the target sequence, which is `x` shifted by one position (token).

##### 5. Store and return batches:
Store the input and target sequences in separate arrays (`x_batches` and `y_batches`) and return them as NumPy arrays to be used in training.
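The nested loops can also be expressed with NumPy fancy indexing, which builds every window in one step. The sketch below is a hypothetical alternative (`get_batches_vectorized`), not the notebook's required implementation; it uses the same count formula, so it should return arrays with the same shape and content as the loop version.

```python
import numpy as np


def get_batches_vectorized(arr, batch_size, seq_length, step_size=None):
    """Hypothetical vectorized variant of get_batches (same count formula)."""
    arr = np.asarray(arr)
    if step_size is None:
        step_size = seq_length
    num_sequences = (len(arr) - seq_length) // step_size
    num_batches = max(0, num_sequences // batch_size)
    n = num_batches * batch_size

    starts = np.arange(n) * step_size              # start index of each sequence, shape (n,)
    idx = starts[:, None] + np.arange(seq_length)  # index grid, shape (n, seq_length)

    shape = (num_batches, batch_size, seq_length)
    x_batches = arr[idx].reshape(shape)
    y_batches = arr[idx + 1].reshape(shape)        # targets: every index shifted by one
    return x_batches, y_batches


x, y = get_batches_vectorized(np.arange(1000), batch_size=4, seq_length=100, step_size=50)
print(x.shape, y.shape)  # (4, 4, 100) (4, 4, 100)
print(x[1, 2, :3])       # [300 301 302]  (sequence 1*4 + 2 = 6 starts at 6*50)
```

With `np.arange(1000)` as input, every target equals its input plus one, which makes the shift easy to verify. The loop version is easier to read; the vectorized one avoids Python-level iteration.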



In [20]:
def get_batches(arr, batch_size, seq_length, step_size=None):
    """
    Generates batches of input and target sequences from the given array.

    Args:
        arr (array-like): Encoded text as an array of integers.
        batch_size (int): Number of sequences per batch.
        seq_length (int): Number of characters in each sequence.
        step_size (int, optional): Steps to move the window for the next sequence.

    Returns:
        tuple (x_batches, y_batches): A tuple of numpy arrays of input and target sequences,
                                      each has shape (num_batches, batch_size, seq_length)
    """
    if step_size is None:
        step_size = seq_length

    # Lists that will contain the batches
    x_batches, y_batches = [], []

    # Num of sequences that we can generate
    total_number_of_sequences = (len(arr) - seq_length) // step_size

    # Calculate the number of batches
    total_number_of_batches = total_number_of_sequences // batch_size

    # Array trimming to make sure it exactly contains full batches
    arr = arr[:total_number_of_batches * batch_size * step_size + seq_length]

    # Batch Computation:
    for i in range(total_number_of_batches):
        batch_x, batch_y = [], []
        for j in range(batch_size):
            start = (i * batch_size + j) * step_size
            end = start + seq_length

            x = arr[start:end]
            y = arr[start + 1:end + 1]  # y is simply x shifted by 1

            batch_x.append(x)
            batch_y.append(y)

        x_batches.append(np.array(batch_x))
        y_batches.append(np.array(batch_y))

    x_batches = np.array(x_batches)
    y_batches = np.array(y_batches)

    return x_batches, y_batches

In [21]:
# Test cell
# Test if the shape of the generated batches is correct when there is no overlap between sequences (step_size=seq_len)
def test_batch_generation_shape_no_overlap(encoded_chars):
    # Generate batches
    x_batches, y_batches = get_batches(encoded_chars, batch_size=64, seq_length=100)

    assert len(x_batches) == len(y_batches), 'Error: The number of x_batches and y_batches should be the same.'

    # Test 2: Check the shape
    assert x_batches.shape == (21, 64, 100), (
        f'Error: The shape of x_batches is incorrect. Expected (21, 64, 100), but got {x_batches.shape}.'
    )
    assert y_batches.shape == (21, 64, 100), (
        f'Error: The shape of y_batches is incorrect. Expected (21, 64, 100), but got {y_batches.shape}.'
    )

    print('All visible tests passed successfully!')

test_batch_generation_shape_no_overlap(encoded_chars)

All visible tests passed successfully!


In [22]:
# Test cell
# Test if the shape of the generated batches is correct when there is an overlap between sequences (step_size!=seq_len)
def test_batch_generation_shape_overlap(encoded_chars):
    # Generate batches
    x_batches, y_batches = get_batches(encoded_chars, batch_size=64, seq_length=100, step_size=50)

    assert len(x_batches) == len(y_batches), 'Error: The number of x_batches and y_batches should be the same.'

    # Test 2: Check the shape
    assert x_batches.shape == (42, 64, 100), (
        f'Error: The shape of x_batches is incorrect. Expected (42, 64, 100), but got {x_batches.shape}.'
    )
    assert y_batches.shape == (42, 64, 100), (
        f'Error: The shape of y_batches is incorrect. Expected (42, 64, 100), but got {y_batches.shape}.'
    )

    # If all tests pass
    print('All visible tests passed successfully!')

test_batch_generation_shape_overlap(encoded_chars)

All visible tests passed successfully!


In [23]:
# Display the one-position shift between x and y, using overlapping windows (step_size < seq_length)
def display_batch_generation(arr, char_to_int, int_to_char):
    batch_size, seq_length, step_size = 8, 10, 5  # Setting step_size for overlap between sequences

    x_batches, y_batches = get_batches(arr, batch_size, seq_length, step_size)

    # Display every sequence in batch number 10

    print('='*50)
    print('Displaying a Single Batch')
    print('='*50)
    for i in range(batch_size):
        x_chars = ''.join([int_to_char[idx] for idx in x_batches[10][i]])
        y_chars = ''.join([int_to_char[idx] for idx in y_batches[10][i]])

        print(f"[{x_chars}]  -->  [{y_chars}]")
    print('='*50)
display_batch_generation(encoded_chars, char_to_int, int_to_char )

Displaying a Single Batch
[made her f]  -->  [ade her fe]
[her feel v]  -->  [er feel ve]
[eel very s]  -->  [el very sl]
[ery sleepy]  -->  [ry sleepy ]
[leepy and ]  -->  [eepy and s]
[ and stupi]  -->  [and stupid]
[stupid whe]  -->  [tupid whet]
[d whether ]  -->  [ whether t]


### 4. Define the Character-Level LSTM Model
In this step, I implement the `CharLSTM` class, which processes sequences of characters and predicts the next character at each position. The model learns sequential patterns in the data and carries information over time in its hidden and cell states.

##### Key Components:
##### 1. Single Multi-Layer LSTM (see [nn.LSTM](https://pytorch.org/docs/stable/generated/torch.nn.LSTM.html)):
- Use a single `nn.LSTM` module and set its `num_layers` parameter to stack several LSTM layers inside it.
- The `dropout` argument of `nn.LSTM` applies dropout to the outputs of every LSTM layer except the last (e.g., between the 1st and 2nd layers when `num_layers=2`). It therefore does not affect the output of the final LSTM layer, and with `num_layers=1` it has no effect at all (PyTorch warns in that case).
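A quick way to see the shapes `nn.LSTM` produces, and what its `dropout` argument requires, is a standalone sketch with illustrative sizes (27 inputs matches the vocabulary size printed earlier; 64 hidden units is an arbitrary choice). Note that `batch_first=True` only affects the input and output tensors: the hidden and cell states keep the layout `(num_layers, batch, hidden_size)`.

```python
import warnings

import torch
import torch.nn as nn

# Two stacked layers: the internal dropout acts on layer 1's output only
lstm = nn.LSTM(input_size=27, hidden_size=64, num_layers=2, batch_first=True, dropout=0.2)
x = torch.randn(8, 10, 27)  # (batch, seq_len, input_size) because batch_first=True
out, (h, c) = lstm(x)
print(out.shape)  # torch.Size([8, 10, 64]): top layer's output at every time step
print(h.shape)    # torch.Size([2, 8, 64]): (num_layers, batch, hidden), not batch-first

# With a single layer there is nothing "between layers", so PyTorch emits a warning
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    nn.LSTM(input_size=27, hidden_size=64, num_layers=1, dropout=0.2)
print([str(w.message) for w in caught])
```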

##### 2. Dropout Layer (see [nn.Dropout](https://pytorch.org/docs/stable/generated/torch.nn.Dropout.html)):
- Define an additional dropout layer to be applied after the final LSTM layer. This helps to prevent overfitting.

##### 3. Fully Connected Layer (see [nn.Linear](https://pytorch.org/docs/stable/generated/torch.nn.Linear.html)):
- After the LSTM layers, a fully connected layer maps each hidden state to a vector of logits, one unnormalized score per character in the vocabulary. Applying softmax to these logits gives a probability distribution over the next character.
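The sketch below shows what the fully connected layer produces and how softmax turns it into probabilities. The tensor `lstm_out` is random data standing in for real LSTM output, and the sizes are illustrative. `nn.Linear` acts on the last dimension, so it can be applied to the whole `(batch, seq_len, hidden)` tensor at once.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

torch.manual_seed(0)
hidden_dim, vocab_size = 64, 27
fc = nn.Linear(hidden_dim, vocab_size)

lstm_out = torch.randn(8, 10, hidden_dim)  # stand-in for LSTM output (batch, seq_len, hidden)
logits = fc(lstm_out)                      # (8, 10, 27): unnormalized scores
probs = F.softmax(logits, dim=-1)          # probabilities over the vocabulary at each position

print(logits.shape)              # torch.Size([8, 10, 27])
print(probs.sum(dim=-1)[0, :3])  # each position sums to 1
next_char_ids = probs[:, -1].argmax(dim=-1)  # most likely character after each sequence
print(next_char_ids.shape)       # torch.Size([8])
```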

##### 4. Hidden State Initialization:
- The LSTM's hidden and cell states are initialized to zeros with `init_hidden()` (in the training loop, once at the start of each epoch) and are updated as the model processes each sequence.
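An all-zero state of shape `(num_layers, batch_size, hidden_dim)` is also what `nn.LSTM` uses by default when no state is passed, so the two calls in this sketch (illustrative sizes, no dropout so the comparison is deterministic) give the same output. Passing the state explicitly is still useful, because it is what lets the training loop carry the final states of one batch into the next.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
num_layers, batch_size, hidden_dim = 2, 4, 16
lstm = nn.LSTM(input_size=5, hidden_size=hidden_dim, num_layers=num_layers, batch_first=True)
x = torch.randn(batch_size, 7, 5)

# Explicit zero initialization, like init_hidden()
h0 = torch.zeros(num_layers, batch_size, hidden_dim)
c0 = torch.zeros(num_layers, batch_size, hidden_dim)
out_explicit, _ = lstm(x, (h0, c0))

# No state passed: nn.LSTM defaults h_0 and c_0 to zeros
out_default, _ = lstm(x)

print(torch.allclose(out_explicit, out_default))  # True
```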

In [26]:
class CharLSTM(nn.Module):
    """
    Character-Level Multi-Layer LSTM Model

    This model processes sequences of characters and predicts the next character in the sequence.
    """

    def __init__(self, num_layers, input_dim, hidden_dim, output_dim, dropout_prob):
        """
        Initializes the CharLSTM model with the specified parameters.

        Args:
            num_layers (int): Number of LSTM layers
            input_dim (int): Dimensionality of the input (e.g. one-hot encoded input size)
            hidden_dim (int): Dimensionality of the LSTM hidden layer.
            output_dim (int): Dimensionality of the output.
            dropout_prob (float): Dropout probability, applied between stacked LSTM layers and to the output of the last LSTM layer.
        """
        super(CharLSTM, self).__init__()

        # Save hidden dimension and number of layers for hidden state initialization
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        # Define a single LSTM module configured as a multi-layer LSTM
        # Internal dropout of dropout_prob is applied between layers
        # Apply additional dropout after LSTM
        # Apply dense layer at the end
        # YOUR CODE HERE

        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True, dropout=dropout_prob)
        self.dropout = nn.Dropout(dropout_prob)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x, hidden):
        """
        Performs the forward pass of the CharLSTM model.

        Args:
            x (torch.Tensor): Input tensor of shape (batch_size, seq_length, input_dim).
            hidden (tuple): Tuple of (h0, c0), where each is a tensor of shape (num_layers, batch_size, hidden_dim).

        Returns:
            out (torch.Tensor): Output tensor of shape (batch_size, seq_length, output_dim).
            (h, c) (tuple): Updated hidden states (h, c) for each LSTM layer.
                - h (torch.Tensor): Final hidden state
                - c (torch.Tensor): Final cell state
        """

        # Pass the input through the LSTM
        # The LSTM output 'out' has shape (batch_size, seq_length, hidden_dim)
        # 'h' and 'c' represent the final hidden and cell states for each layer
        # YOUR CODE HERE
        out, (h, c) = self.lstm(x, hidden)
        out = self.dropout(out)
        out = self.fc(out)

        # Return the final output and the updated hidden states
        return out, (h, c)

    def init_hidden(self, batch_size):
        """
        Initializes the hidden and cell states to zeros for each LSTM layer.

        Args:
            batch_size (int): The batch size for the current data.

        Returns:
            (h0, c0) (tuple): Tuple of initial hidden states (h0, c0) for each LSTM layer.
            - h0 (torch.Tensor): Initial hidden state
            - c0 (torch.Tensor): Initial cell state
        """
        # Set the device to match the model's device to prevent device mismatch errors
        device = next(self.parameters()).device

        # Initialize hidden state (h0) and cell state (c0) to zeros
        # Check the expected shape of hidden states in pytorch nn.LSTM documentation
        # YOUR CODE HERE

        h0 = torch.zeros(self.num_layers, batch_size, self.hidden_dim, device=device)
        c0 = torch.zeros(self.num_layers, batch_size, self.hidden_dim, device=device)

        return (h0, c0)

In [27]:
def test_dropout_effect():
    model = CharLSTM(num_layers=2, input_dim=10, hidden_dim=100, output_dim=40, dropout_prob=0.1)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    hidden = model.init_hidden(64)
    input_seq = torch.rand(64, 50, 10).to(device)

    # Check variance in training mode
    model.train()
    output1, _ = model(input_seq, hidden)
    output2, _ = model(input_seq, hidden)
    assert not torch.equal(output1, output2), 'Dropout has no effect in training mode.'

    # Check consistency in evaluation mode
    model.eval()
    output3, _ = model(input_seq, hidden)
    output4, _ = model(input_seq, hidden)
    assert torch.equal(output3, output4), 'Outputs should be consistent in evaluation mode'

    print('Dropout test passed successfully!')

test_dropout_effect()

Dropout test passed successfully!


In [28]:
def test_lstm_model():
    model = CharLSTM(num_layers=2, input_dim=10, hidden_dim=100, output_dim=40, dropout_prob=0.1)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    hidden = model.init_hidden(64)
    input_seq = torch.rand(64, 50, 10).to(device)

    try:
        output, hidden = model(input_seq, hidden)

        # Test 1: Check if output shape matches the expected shape (batch_size, seq_length, output_dim)
        assert output.shape == (64, 50, 40), f'Expected output shape: {(64, 50, 40)}, but got: {output.shape}. Check your final output shape.'

        # Test 2: Check if hidden state shapes match expected shape based on number of layers and batch size
        (h, c) = hidden
        assert h.shape == (2, 64, 100), f'Expected h shape: {(2, 64, 100)}, but got: {h.shape}'
        assert c.shape == (2, 64, 100), f'Expected c shape: {(2, 64, 100)}, but got: {c.shape}'

        print('LSTM shape test passed successfully!')

    except RuntimeError as e:
        print('RuntimeError encountered. Check if your LSTM is handling batch_first correctly in the model definition.')
        print(f'Error details: {e}')

    print('Visible tests passed!')

test_lstm_model()


LSTM shape test passed successfully!
Visible tests passed!


### 5. Train the Model

In this task, I will implement the training loop for the CharLSTM model.

##### Steps to Follow:
At the start of each epoch, before looping over the batches, initialize the hidden states using the `init_hidden()` method.

For each iteration:
##### 1. Encoding the input sequence:
- For each batch, convert the input to one-hot representations (see [F.one_hot](https://pytorch.org/docs/stable/generated/torch.nn.functional.one_hot.html))
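A small sketch of the one-hot step with a toy vocabulary of 5 characters. `F.one_hot` expects an integer (`torch.long`) tensor and returns integers, hence the `.float()` before the LSTM. Passing `num_classes` explicitly also matters: by default `one_hot` infers the width as one more than the largest index present, so a batch that happens not to contain the last character would otherwise get a narrower encoding.

```python
import torch
import torch.nn.functional as F

vocab_size = 5
x = torch.tensor([[0, 3, 1],
                  [4, 4, 2]])  # (batch=2, seq_len=3), dtype int64

x_one_hot = F.one_hot(x, num_classes=vocab_size).float()  # (2, 3, 5), float32 for nn.LSTM

print(x_one_hot.shape)  # torch.Size([2, 3, 5])
print(x_one_hot[0, 1])  # tensor([0., 0., 0., 1., 0.])
```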

##### 2. Detach Hidden States:
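- Before each forward pass, detach the hidden states carried over from the previous batch. This keeps their values but cuts the computation graph, so gradients do not flow back into earlier batches (truncated backpropagation through time) and the cost of each backward pass stays bounded.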
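The effect of detaching can be seen in a short standalone sketch (toy sizes, squared outputs as a stand-in loss). Without the `detach()` line, the second iteration's `backward()` would try to propagate through the first iteration's graph, whose buffers were already freed, and PyTorch would raise a `RuntimeError`.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
lstm = nn.LSTM(input_size=3, hidden_size=4, batch_first=True)
hidden = (torch.zeros(1, 2, 4), torch.zeros(1, 2, 4))

for step in range(3):
    x = torch.randn(2, 5, 3)
    hidden = tuple(state.detach() for state in hidden)  # keep values, drop graph history
    out, hidden = lstm(x, hidden)
    loss = out.pow(2).mean()
    loss.backward()   # gradients stop at the start of this batch
    lstm.zero_grad()

print(hidden[0].requires_grad)           # True: produced by this step's forward pass
print(hidden[0].detach().requires_grad)  # False: cut from the graph
```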

##### 3. Forward Pass and Loss Calculation:
- For each batch, perform forward pass by passing the input x to the model.
- The model outputs logits, i.e. unnormalized scores for every character in the vocabulary at every position. `nn.CrossEntropyLoss` applies log-softmax to them internally, so no explicit softmax layer is needed.
- Use cross-entropy loss to compare the predicted output to the target y and calculate the error for the current batch.
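The training loop flattens the batch and time dimensions so that `nn.CrossEntropyLoss` sees one prediction per row. The sketch below, on random logits with illustrative sizes, checks that this gives the same mean loss as the loss function's built-in support for a `(batch, classes, seq_len)` input. It uses `reshape`, which, unlike `view`, also works on non-contiguous tensors; on the contiguous output of the linear layer both behave the same.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
batch_size, seq_length, vocab_size = 2, 4, 6
logits = torch.randn(batch_size, seq_length, vocab_size)    # model output
y = torch.randint(0, vocab_size, (batch_size, seq_length))  # target indices
criterion = nn.CrossEntropyLoss()

# Option 1 (as in train()): flatten to (N, C) logits and (N,) targets
loss_flat = criterion(logits.reshape(-1, vocab_size), y.reshape(-1))

# Option 2: pass (batch, C, seq_len) logits with (batch, seq_len) targets
loss_perm = criterion(logits.permute(0, 2, 1), y)

print(torch.allclose(loss_flat, loss_perm))  # True
```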

##### 4. Backpropagation and Parameter Update:
- After calculating the loss, reset the old gradients with `optimizer.zero_grad()` and compute new ones with a backward pass (`loss.backward()`).
- Update the model parameters using the optimizer.
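The update step in isolation, as a minimal sketch with a single linear layer standing in for `CharLSTM` and made-up data. The order matters because PyTorch accumulates gradients into `.grad` by default: without `zero_grad()`, each step would use the sum of the current and all previous gradients.

```python
import torch
import torch.nn as nn
import torch.optim as optim

torch.manual_seed(0)
model = nn.Linear(4, 3)  # stand-in for CharLSTM
optimizer = optim.Adam(model.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()

x = torch.randn(5, 4)
y = torch.tensor([0, 2, 1, 1, 0])
before = model.weight.detach().clone()

optimizer.zero_grad()          # clear gradients left over from the previous step
loss = criterion(model(x), y)  # forward pass + loss
loss.backward()                # fill .grad for every parameter
optimizer.step()               # apply the Adam update

print(torch.equal(before, model.weight.detach()))  # False: the weights moved
```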

In [29]:
def train(model, encoded_chars, vocab_size, num_epochs, batch_size,
          seq_length, step_size, learning_rate, save_path=None, verbose=True):
    """
    Train the CharLSTM model on encoded text data.

    Arguments:
    model -- The LSTM model
    encoded_chars -- Encoded data (characters)
    vocab_size -- Size of the vocabulary
    num_epochs -- Number of training epochs
    batch_size -- Batch size for training
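    seq_length -- Sequence length for each batch
    step_size -- Step between the start positions of consecutive sequences
    learning_rate -- Learning rate for the optimizer
    save_path -- Path to save the trained model (optional)
    verbose -- If True, print the average loss after every epoch

    Returns:
    The average training loss of the last epoch
    """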

    model.train()  # Set model to training mode
    device = next(model.parameters()).device  # Move batches to the device the model lives on
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)  # Initialize Adam optimizer
    criterion = nn.CrossEntropyLoss()  # Cross entropy loss function

    # Prepare batches
    x_batches, y_batches = get_batches(encoded_chars, batch_size, seq_length, step_size)
    num_batches = len(x_batches)

    for epoch in range(num_epochs):
        total_loss = 0
        # Progress bar for the current epoch
        batch_loader = tqdm(zip(x_batches, y_batches), total=num_batches,
                            leave=True, desc=f'Epoch {epoch+1}/{num_epochs}')

        # Initialize hidden states (zeros) for all LSTM layers at the start of the epoch
        # YOUR CODE HERE
        hidden = model.init_hidden(batch_size)

        for x, y in batch_loader:
            x = torch.as_tensor(x, dtype=torch.long).to(device)
            y = torch.as_tensor(y, dtype=torch.long).to(device) # target

            # Make sure to have consistent variable naming with the rest of the code (e.g., loss)
            # YOUR CODE HERE
            x_one_hot = F.one_hot(x, num_classes=vocab_size).float()
            hidden = tuple([state.detach() for state in hidden])
            logits, hidden = model(x_one_hot, hidden)

            logits = logits.view(batch_size * seq_length, vocab_size)
            y = y.view(batch_size * seq_length)

            loss = criterion(logits, y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        # Print the average loss for the current epoch
        if verbose:
            print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {total_loss / num_batches:.4f}')

        # Optional model saving
        # Save after every epoch, since training takes a while and you may want to stop partway through
        if save_path:
            torch.save(model.state_dict(), save_path)
            print(f'Your trained model at epoch {epoch + 1} is saved successfully!')

    return total_loss / num_batches

In [30]:
from unittest.mock import patch, MagicMock
from functools import partialmethod

def test_model_forward_called():
    vocab_size, hidden_dim, dropout_prob = 50, 12, 0.2
    batch_size, seq_length, num_epochs = 2, 3, 1
    test_chars = np.arange(vocab_size)
    model = CharLSTM(2, vocab_size, hidden_dim, vocab_size, dropout_prob=0.1)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    # Wrap the model's forward method in a MagicMock to check if it's called
    with patch.object(model, 'forward', wraps=model.forward) as mock_forward, \
         patch('torch.optim.Adam'), \
         patch('torch.nn.CrossEntropyLoss'), \
         patch('tqdm.tqdm.__init__', partialmethod(tqdm.__init__, disable=True)):

        try:
            # Run the train function
            train(model, test_chars, vocab_size, num_epochs=num_epochs, batch_size=batch_size, seq_length=seq_length, step_size=seq_length, learning_rate=0.001, verbose=False)
        except Exception as e:
            # Ignore any exceptions that might arise from incorrect shapes or other issues
            # We will check this in the following cells
            pass

        # Verify that forward was called, regardless of shape issues
        assert mock_forward.called, 'Expected model.forward to be called at the beginning of training, but it was not.'
        print('Test passed: model.forward was called.')

test_model_forward_called()

Test passed: model.forward was called.
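The tests here rely on `patch.object(..., wraps=...)`, which replaces a method with a `MagicMock` that records calls while still running the real method. A minimal, self-contained sketch of that pattern, using a hypothetical `Greeter` class in place of the model:

```python
from unittest.mock import patch


class Greeter:
    """Hypothetical stand-in for the model; greet plays the role of forward."""

    def greet(self, name):
        return f"hello {name}"


g = Greeter()
# wraps=... makes the mock delegate to the real bound method and record each call
with patch.object(g, "greet", wraps=g.greet) as spy:
    result = g.greet("alice")

print(result)                     # hello alice
print(spy.called, spy.call_args)  # True call('alice')
```

Because the real method still runs, the spy can check that a call happened (or inspect its arguments) without changing the behavior of the code under test.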


In [31]:
def test_input_shape():
    vocab_size, hidden_dim, dropout_prob = 50, 12, 0.2
    batch_size, seq_length, num_epochs = 2, 3, 1
    test_chars = np.arange(vocab_size)
    model = CharLSTM(2, vocab_size, hidden_dim, vocab_size, dropout_prob=0.1)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    def forward_spy(x, hidden):
        assert x.shape == (2,3,50), f'Expected x shape {(2,3,50)}, but got {x.shape}'
        return model.__class__.forward(model, x, hidden)

    # Patch the forward method
    with patch.object(model, 'forward', wraps=forward_spy), \
         patch('torch.optim.Adam'), \
         patch('torch.nn.CrossEntropyLoss'), \
         patch('tqdm.tqdm.__init__', partialmethod(tqdm.__init__, disable=True)):

        train(model, test_chars, vocab_size, num_epochs=num_epochs, batch_size=batch_size, seq_length=seq_length, step_size=seq_length, learning_rate=0.001, verbose=False)
        print('Test passed: input shape x is correctly set')

test_input_shape()

Test passed: input shape x is correctly set


In [32]:
def test_hidden_state_requires_grad():
    vocab_size, hidden_dim, dropout_prob = 50, 12, 0.2
    batch_size, seq_length, num_epochs = 2, 3, 1
    test_chars = np.arange(vocab_size)
    model = CharLSTM(2, vocab_size, hidden_dim, vocab_size, dropout_prob=0.1)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    def forward_spy(x, hidden):
        h, c = hidden

        # Check that h and c do not require gradients, i.e., that they have been detached
        assert not h.requires_grad, 'Expected hidden state h to be detached (requires_grad=False)'
        assert not c.requires_grad, 'Expected hidden state c to be detached (requires_grad=False)'

        return model.__class__.forward(model, x, hidden)

    # Patch the forward method
    with patch.object(model, 'forward', wraps=forward_spy), \
         patch('torch.optim.Adam'), \
         patch('torch.nn.CrossEntropyLoss'), \
         patch('tqdm.tqdm.__init__', partialmethod(tqdm.__init__, disable=True)):

        train(model, test_chars, vocab_size, num_epochs=num_epochs, batch_size=batch_size, seq_length=seq_length, step_size=seq_length, learning_rate=0.001, verbose=False)
        print('Test passed: hidden states are detached.')

test_hidden_state_requires_grad()

Test passed: hidden states are detached.


In [33]:
def test_criterion_argument_shapes():
    vocab_size, hidden_dim, dropout_prob = 50, 12, 0.2
    batch_size, seq_length, num_epochs = 2, 3, 1
    test_chars = np.arange(vocab_size)

    # Define the device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    mock_model = CharLSTM(2, vocab_size, hidden_dim, vocab_size, dropout_prob=0.1).to(device)

    with patch("tqdm.tqdm.__init__", partialmethod(tqdm.__init__, disable=True)), \
         patch("torch.optim.Adam"):

        def criterion_side_effect(output, y):
            assert output.shape == (batch_size * seq_length, vocab_size), \
                f"Expected model output shape ({batch_size * seq_length}, {vocab_size}), but got {output.shape}"
            assert y.shape == (batch_size * seq_length,), \
                f"Expected target (y) shape ({batch_size * seq_length},), but got {y.shape}"
            return torch.tensor(0.0, requires_grad=True, device=device)

        with patch("torch.nn.CrossEntropyLoss", return_value=criterion_side_effect), \
             patch("__main__.get_batches", return_value=(
                 torch.randint(0, 50, (11, 2, 3), device=device),
                 torch.randint(0, 50, (11, 2, 3), device=device)
             )):

            train(mock_model, test_chars, vocab_size, num_epochs=num_epochs, batch_size=batch_size, seq_length=seq_length, step_size=seq_length, learning_rate=0.001, verbose=False)
            print("Test passed: criterion arguments have expected shapes.")

test_criterion_argument_shapes()

Test passed: criterion arguments have expected shapes.
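The criterion test checks that logits are flattened to `(batch_size * seq_length, vocab_size)` and targets to `(batch_size * seq_length,)`. As a sketch with random tensors (the sizes below are arbitrary), this flattening gives the same mean loss as the other layout `F.cross_entropy` accepts, where the class dimension is moved to position 1 and the input has shape `(N, C, d1)`:

```python
import torch
import torch.nn.functional as F


def compare_losses(batch, seq, vocab, seed=0):
    """Return (flattened loss, permuted loss) for random logits and targets."""
    torch.manual_seed(seed)
    logits = torch.randn(batch, seq, vocab)           # shaped like the model output
    targets = torch.randint(0, vocab, (batch, seq))   # one class index per position

    # Layout used in train(): merge batch and time into one dimension
    loss_flat = F.cross_entropy(logits.reshape(-1, vocab), targets.reshape(-1))

    # Alternative: keep time, move classes to dim 1 -> input (N, C, d1), target (N, d1)
    loss_perm = F.cross_entropy(logits.permute(0, 2, 1), targets)
    return loss_flat, loss_perm


loss_flat, loss_perm = compare_losses(batch=2, seq=3, vocab=5)
print(torch.allclose(loss_flat, loss_perm))  # True
```

With the default `reduction='mean'`, both versions average over all `batch * seq` positions, so flattening is a layout choice rather than a change to the objective.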


#### Model Initialization


In [34]:
hidden_dim = 400
dropout_prob = 0.1
num_layers = 2
vocab_size = len(char_to_int)
model = CharLSTM(num_layers, vocab_size, hidden_dim, vocab_size, dropout_prob)
model = model.to(device)
print(model)

CharLSTM(
  (lstm): LSTM(27, 400, num_layers=2, batch_first=True, dropout=0.1)
  (dropout): Dropout(p=0.1, inplace=False)
  (fc): Linear(in_features=400, out_features=27, bias=True)
)
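As a sanity check on the summary above, the parameter count can be worked out by hand. This sketch assumes a unidirectional `nn.LSTM` with the default `bias=True` and no `proj_size`; PyTorch stores four gates per layer and two bias vectors (`bias_ih` and `bias_hh`). If `CharLSTM` has no parameters beyond the printed modules (dropout has none), the total should match `sum(p.numel() for p in model.parameters())`.

```python
def lstm_param_count(input_size, hidden_size, num_layers):
    """Parameter count of a unidirectional torch.nn.LSTM with bias=True."""
    total = 0
    for layer in range(num_layers):
        # Layer 0 reads the input; deeper layers read the previous layer's hidden state
        layer_input = input_size if layer == 0 else hidden_size
        # weight_ih: (4H, in), weight_hh: (4H, H), bias_ih and bias_hh: (4H,) each
        total += 4 * hidden_size * (layer_input + hidden_size) + 2 * 4 * hidden_size
    return total


def linear_param_count(in_features, out_features):
    """Weight matrix plus bias vector of nn.Linear."""
    return in_features * out_features + out_features


lstm_params = lstm_param_count(27, 400, 2)
fc_params = linear_param_count(400, 27)
print(lstm_params, fc_params, lstm_params + fc_params)  # 1969600 10827 1980427
```

Most of the roughly two million parameters sit in the recurrent weights, which grow with the square of `hidden_dim`.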


In [35]:
num_epochs = 50  # Train for at least 50 epochs to bring the average loss below 0.99
batch_size = 50
seq_length = 100
step_size = 100
learning_rate = 0.001
if not skip_training:
    loss = train(
        model=model,
        encoded_chars=encoded_chars,
        vocab_size=vocab_size,
        num_epochs=num_epochs,
        batch_size=batch_size,
        seq_length=seq_length,
        step_size=step_size,
        learning_rate=learning_rate,
        save_path='best_model.pth'
    )
else:
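    # A state_dict holds only tensors, so weights_only=True is sufficient and avoids unpickling arbitrary objects
    model.load_state_dict(torch.load('best_model.pth', weights_only=True, map_location=device))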
    print('Loaded weights from your saved model successfully!')

Epoch 1/50: 100%|██████████| 26/26 [00:01<00:00, 23.11it/s]


Epoch 1/50, Loss: 2.9011
Your trained model at epoch 0 is saved successfully!


Epoch 2/50: 100%|██████████| 26/26 [00:00<00:00, 38.09it/s]


Epoch 2/50, Loss: 2.8012
Your trained model at epoch 1 is saved successfully!


Epoch 3/50: 100%|██████████| 26/26 [00:00<00:00, 37.81it/s]


Epoch 3/50, Loss: 2.7706
Your trained model at epoch 2 is saved successfully!


Epoch 4/50: 100%|██████████| 26/26 [00:00<00:00, 37.55it/s]


Epoch 4/50, Loss: 2.5737
Your trained model at epoch 3 is saved successfully!


Epoch 5/50: 100%|██████████| 26/26 [00:00<00:00, 37.39it/s]


Epoch 5/50, Loss: 2.3302
Your trained model at epoch 4 is saved successfully!


Epoch 6/50: 100%|██████████| 26/26 [00:00<00:00, 37.37it/s]


Epoch 6/50, Loss: 2.2063
Your trained model at epoch 5 is saved successfully!


Epoch 7/50: 100%|██████████| 26/26 [00:00<00:00, 37.80it/s]


Epoch 7/50, Loss: 2.1153
Your trained model at epoch 6 is saved successfully!


Epoch 8/50: 100%|██████████| 26/26 [00:00<00:00, 37.67it/s]


Epoch 8/50, Loss: 2.0265
Your trained model at epoch 7 is saved successfully!


Epoch 9/50: 100%|██████████| 26/26 [00:00<00:00, 37.61it/s]


Epoch 9/50, Loss: 1.9494
Your trained model at epoch 8 is saved successfully!


Epoch 10/50: 100%|██████████| 26/26 [00:00<00:00, 37.58it/s]


Epoch 10/50, Loss: 1.8841
Your trained model at epoch 9 is saved successfully!


Epoch 11/50: 100%|██████████| 26/26 [00:00<00:00, 37.19it/s]


Epoch 11/50, Loss: 1.8226
Your trained model at epoch 10 is saved successfully!


Epoch 12/50: 100%|██████████| 26/26 [00:00<00:00, 37.35it/s]


Epoch 12/50, Loss: 1.7673
Your trained model at epoch 11 is saved successfully!


Epoch 13/50: 100%|██████████| 26/26 [00:00<00:00, 37.31it/s]


Epoch 13/50, Loss: 1.7173
Your trained model at epoch 12 is saved successfully!


Epoch 14/50: 100%|██████████| 26/26 [00:00<00:00, 37.59it/s]


Epoch 14/50, Loss: 1.6670
Your trained model at epoch 13 is saved successfully!


Epoch 15/50: 100%|██████████| 26/26 [00:00<00:00, 37.55it/s]


Epoch 15/50, Loss: 1.6228
Your trained model at epoch 14 is saved successfully!


Epoch 16/50: 100%|██████████| 26/26 [00:00<00:00, 37.40it/s]


Epoch 16/50, Loss: 1.5826
Your trained model at epoch 15 is saved successfully!


Epoch 17/50: 100%|██████████| 26/26 [00:00<00:00, 37.72it/s]


Epoch 17/50, Loss: 1.5426
Your trained model at epoch 16 is saved successfully!


Epoch 18/50: 100%|██████████| 26/26 [00:00<00:00, 37.51it/s]


Epoch 18/50, Loss: 1.5039
Your trained model at epoch 17 is saved successfully!


Epoch 19/50: 100%|██████████| 26/26 [00:00<00:00, 37.50it/s]


Epoch 19/50, Loss: 1.4693
Your trained model at epoch 18 is saved successfully!


Epoch 20/50: 100%|██████████| 26/26 [00:00<00:00, 37.54it/s]


Epoch 20/50, Loss: 1.4376
Your trained model at epoch 19 is saved successfully!


Epoch 21/50: 100%|██████████| 26/26 [00:00<00:00, 37.28it/s]


Epoch 21/50, Loss: 1.4098
Your trained model at epoch 20 is saved successfully!


Epoch 22/50: 100%|██████████| 26/26 [00:00<00:00, 37.39it/s]


Epoch 22/50, Loss: 1.3800
Your trained model at epoch 21 is saved successfully!


Epoch 23/50: 100%|██████████| 26/26 [00:00<00:00, 37.44it/s]


Epoch 23/50, Loss: 1.3517
Your trained model at epoch 22 is saved successfully!


Epoch 24/50: 100%|██████████| 26/26 [00:00<00:00, 37.57it/s]


Epoch 24/50, Loss: 1.3275
Your trained model at epoch 23 is saved successfully!


Epoch 25/50: 100%|██████████| 26/26 [00:00<00:00, 37.43it/s]


Epoch 25/50, Loss: 1.3037
Your trained model at epoch 24 is saved successfully!


Epoch 26/50: 100%|██████████| 26/26 [00:00<00:00, 37.21it/s]


Epoch 26/50, Loss: 1.2790
Your trained model at epoch 25 is saved successfully!


Epoch 27/50: 100%|██████████| 26/26 [00:00<00:00, 37.13it/s]


Epoch 27/50, Loss: 1.2582
Your trained model at epoch 26 is saved successfully!


Epoch 28/50: 100%|██████████| 26/26 [00:00<00:00, 37.02it/s]


Epoch 28/50, Loss: 1.2388
Your trained model at epoch 27 is saved successfully!


Epoch 29/50: 100%|██████████| 26/26 [00:00<00:00, 37.25it/s]


Epoch 29/50, Loss: 1.2131
Your trained model at epoch 28 is saved successfully!


Epoch 30/50: 100%|██████████| 26/26 [00:00<00:00, 36.64it/s]


Epoch 30/50, Loss: 1.1896
Your trained model at epoch 29 is saved successfully!


Epoch 31/50: 100%|██████████| 26/26 [00:00<00:00, 37.39it/s]


Epoch 31/50, Loss: 1.1677
Your trained model at epoch 30 is saved successfully!


Epoch 32/50: 100%|██████████| 26/26 [00:00<00:00, 37.22it/s]


Epoch 32/50, Loss: 1.1485
Your trained model at epoch 31 is saved successfully!


Epoch 33/50: 100%|██████████| 26/26 [00:00<00:00, 37.20it/s]


Epoch 33/50, Loss: 1.1300
Your trained model at epoch 32 is saved successfully!


Epoch 34/50: 100%|██████████| 26/26 [00:00<00:00, 37.12it/s]


Epoch 34/50, Loss: 1.1064
Your trained model at epoch 33 is saved successfully!


Epoch 35/50: 100%|██████████| 26/26 [00:00<00:00, 37.33it/s]


Epoch 35/50, Loss: 1.0825
Your trained model at epoch 34 is saved successfully!


Epoch 36/50: 100%|██████████| 26/26 [00:00<00:00, 37.05it/s]


Epoch 36/50, Loss: 1.0624
Your trained model at epoch 35 is saved successfully!


Epoch 37/50: 100%|██████████| 26/26 [00:00<00:00, 37.10it/s]


Epoch 37/50, Loss: 1.0431
Your trained model at epoch 36 is saved successfully!


Epoch 38/50: 100%|██████████| 26/26 [00:00<00:00, 36.80it/s]


Epoch 38/50, Loss: 1.0162
Your trained model at epoch 37 is saved successfully!


Epoch 39/50: 100%|██████████| 26/26 [00:00<00:00, 36.96it/s]


Epoch 39/50, Loss: 0.9943
Your trained model at epoch 38 is saved successfully!


Epoch 40/50: 100%|██████████| 26/26 [00:00<00:00, 36.81it/s]


Epoch 40/50, Loss: 0.9714
Your trained model at epoch 39 is saved successfully!


Epoch 41/50: 100%|██████████| 26/26 [00:00<00:00, 36.98it/s]


Epoch 41/50, Loss: 0.9525
Your trained model at epoch 40 is saved successfully!


Epoch 42/50: 100%|██████████| 26/26 [00:00<00:00, 37.07it/s]


Epoch 42/50, Loss: 0.9399
Your trained model at epoch 41 is saved successfully!


Epoch 43/50: 100%|██████████| 26/26 [00:00<00:00, 36.74it/s]


Epoch 43/50, Loss: 0.9171
Your trained model at epoch 42 is saved successfully!


Epoch 44/50: 100%|██████████| 26/26 [00:00<00:00, 37.17it/s]


Epoch 44/50, Loss: 0.8950
Your trained model at epoch 43 is saved successfully!


Epoch 45/50: 100%|██████████| 26/26 [00:00<00:00, 36.54it/s]


Epoch 45/50, Loss: 0.8724
Your trained model at epoch 44 is saved successfully!


Epoch 46/50: 100%|██████████| 26/26 [00:00<00:00, 36.92it/s]


Epoch 46/50, Loss: 0.8472
Your trained model at epoch 45 is saved successfully!


Epoch 47/50: 100%|██████████| 26/26 [00:00<00:00, 36.69it/s]


Epoch 47/50, Loss: 0.8232
Your trained model at epoch 46 is saved successfully!


Epoch 48/50: 100%|██████████| 26/26 [00:00<00:00, 36.72it/s]


Epoch 48/50, Loss: 0.7997
Your trained model at epoch 47 is saved successfully!


Epoch 49/50: 100%|██████████| 26/26 [00:00<00:00, 36.83it/s]


Epoch 49/50, Loss: 0.7729
Your trained model at epoch 48 is saved successfully!


Epoch 50/50: 100%|██████████| 26/26 [00:00<00:00, 36.95it/s]

Epoch 50/50, Loss: 0.7489
Your trained model at epoch 49 is saved successfully!
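The logged values are mean cross-entropy in nats (`nn.CrossEntropyLoss` uses the natural log), so they can be translated into perplexity and bits per character. The sketch below plugs in the epoch 1 and epoch 50 losses from the log above and the vocabulary size of 27 from the model summary; keep in mind these are training losses, not measurements on held-out text.

```python
import math


def perplexity(loss_nats):
    """Effective number of equally likely next characters."""
    return math.exp(loss_nats)


def bits_per_char(loss_nats):
    """Convert a loss in nats to bits."""
    return loss_nats / math.log(2)


num_chars = 27                       # vocabulary size from the model summary
uniform_loss = math.log(num_chars)   # loss of always predicting 1/27 for each character
print(f"uniform baseline: {uniform_loss:.4f} nats")

for name, loss in [("epoch 1", 2.9011), ("epoch 50", 0.7489)]:
    print(f"{name}: perplexity {perplexity(loss):.2f}, {bits_per_char(loss):.3f} bits/char")
```

By epoch 50 the model is, on average, about as uncertain as a choice among roughly two characters, compared with 27 for a uniform guess.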





### 6. Text Generation
The `generate_text()` function uses a trained model to create new text from a given starting string.

The model is run autoregressively: starting from the initial string, it produces characters one at a time and feeds each newly generated character back in as input for the next step.


1. Input start string: The starting string is converted into a sequence of integers using the `char_to_int` mapping.
2. Generate text: The model takes this sequence as input, predicts the next character, and appends it to the text. This is repeated `predict_len` times.
3. Output probabilities: At each step, the model's output logits are turned into a probability distribution with softmax, and the next character is sampled from that distribution.
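The three steps above can be sketched without a neural network. In this toy version, a hypothetical fixed table of next-character probabilities (a bigram model over a made-up three-character vocabulary) stands in for the trained LSTM; the names are prefixed with `toy_` so they do not overwrite the notebook's own `char_to_int` and `int_to_char`.

```python
import numpy as np

# Hypothetical stand-in for the model: next-character probabilities
# that depend only on the last character (rows: current char, cols: next char)
toy_vocab = ["a", "b", " "]
toy_char_to_int = {c: i for i, c in enumerate(toy_vocab)}
toy_int_to_char = {i: c for c, i in toy_char_to_int.items()}
toy_probs = np.array([
    [0.1, 0.6, 0.3],  # after "a"
    [0.7, 0.1, 0.2],  # after "b"
    [0.5, 0.5, 0.0],  # after " " (never two spaces in a row)
])


def toy_generate(start_str, predict_len, rng):
    text = start_str
    last = toy_char_to_int[start_str[-1]]   # encode the start string (only its last char matters here)
    for _ in range(predict_len):
        probs = toy_probs[last]             # output probabilities for the next character
        last = int(rng.choice(len(toy_vocab), p=probs))  # sample the next character index
        text += toy_int_to_char[last]       # append it; it becomes the next input
    return text


print(toy_generate("ab", 10, np.random.default_rng(0)))
```

An LSTM replaces the lookup table with a hidden state that can summarize the whole history, but the sample-append-feed-back loop is the same.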

##### Steps to follow:
1. One-hot encoding: Apply one-hot encoding to the input sequence, as in the training loop.

2. Forward pass: Feed the one-hot encoded input through the model to obtain the output logits and the updated hidden state.

3. Extract the last output: Only the output from the **last time step** is needed to predict the next character, so slice the output and take the last element along the sequence dimension.

4. Temperature scaling: To control the randomness of the prediction, divide the output logits by the temperature parameter. The temperature must be positive; the suggested range here is (0, 1]. A temperature of 1 leaves the distribution unchanged, lower temperatures sharpen it and produce more predictable text, and higher temperatures flatten it and produce more random text. Experiment with different values to see how the generated text changes.

If training went well, the generated text should consist mostly of real words.
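To see what temperature scaling does to the distribution, here is a small NumPy sketch using made-up logits for three characters. It computes softmax of `logits / temperature`, subtracting the maximum first for numerical stability (this does not change the result).

```python
import numpy as np


def softmax_with_temperature(logits, temperature):
    """Softmax of logits / temperature; temperature must be > 0."""
    scaled = np.asarray(logits, dtype=np.float64) / temperature
    scaled -= scaled.max()  # shift for numerical stability; softmax is shift-invariant
    exp = np.exp(scaled)
    return exp / exp.sum()


example_logits = [2.0, 1.0, 0.1]  # made-up logits for three characters
for t in (0.5, 1.0, 2.0):
    probs = softmax_with_temperature(example_logits, t)
    print(t, np.round(probs, 3))
```

Lower temperatures push probability mass toward the most likely character (approaching greedy argmax as the temperature goes to 0), while higher temperatures spread it out; the ranking of the characters never changes.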

In [36]:
def generate_text(model, start_str, char_to_int, int_to_char, vocab_size, predict_len=100, temperature=1.0):
    """
    Generate text using the trained model.

    Arguments:
    model -- Trained RNN model
    start_str -- String to start generating from
    char_to_int -- Dictionary mapping characters to integers
    int_to_char -- Dictionary mapping integers back to characters
    vocab_size -- Size of the vocabulary
    predict_len -- Number of characters to generate
    temperature -- Float controlling randomness in predictions (higher is more random)

    Returns:
    generated_text -- The generated text as a string
    """
    model.eval()  # Set model to evaluation mode

    # Encode the starting string as a (1, len(start_str)) tensor of indices
    input_seq = [char_to_int[char] for char in start_str]
    input_seq = torch.tensor(input_seq, dtype=torch.long, device=device).unsqueeze(0)  # Add batch dimension

    hidden = model.init_hidden(1)  # Batch size of 1 for generating text

    generated_text = start_str

    with torch.no_grad():  # inference only, no gradients needed
        for _ in range(predict_len):
            # One-hot encode and run the model; on the first step this processes the
            # whole start string, afterwards only the most recently generated character
            input_one_hot = F.one_hot(input_seq, num_classes=vocab_size).float()
            output, hidden = model(input_one_hot, hidden)

            # Keep only the last time step and apply temperature scaling
            output = output[:, -1, :] / temperature

            # Convert output to probabilities using softmax
            probabilities = F.softmax(output, dim=-1).cpu().numpy()

            # Randomly sample based on the output probabilities
            next_char_index = np.random.choice(vocab_size, p=probabilities.ravel())

            # Add the predicted character to the generated text
            next_char = int_to_char[next_char_index]
            generated_text += next_char

            # The hidden state already summarizes the context, so the next input is just
            # the new character (re-feeding the old window on top of it would repeat it)
            input_seq = torch.tensor([[next_char_index]], dtype=torch.long, device=device)

    return generated_text
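Because an LSTM's hidden state summarizes everything it has read so far, feeding a sequence in one call gives the same final output and state as feeding it one time step at a time while carrying `(h, c)` forward. The sketch below checks this with a small standalone, single-layer `nn.LSTM` and random inputs (not the trained `CharLSTM`). It is why, during generation, the start string only needs to be processed once, after which each new character can be fed alone together with the carried hidden state.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
lstm = nn.LSTM(input_size=4, hidden_size=6, batch_first=True).eval()
x = torch.randn(1, 5, 4)  # random stand-in for a one-hot encoded (batch, seq, features) input

with torch.no_grad():
    # Feed the whole sequence in a single call (hidden state starts at zeros)
    out_full, (h_full, c_full) = lstm(x)

    # Feed it one time step at a time, carrying the hidden state forward
    hidden = None  # None also means "start from zeros"
    for t in range(x.size(1)):
        out_step, hidden = lstm(x[:, t:t + 1, :], hidden)

print(torch.allclose(out_full[:, -1], out_step[:, -1], atol=1e-6))  # True
print(torch.allclose(h_full, hidden[0], atol=1e-6))                 # True
```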

In [37]:
start_str = 'we re all '
predict_len = 1000
temperature = 0.5
generated_text = generate_text(model,
                               start_str,
                               char_to_int,
                               int_to_char,
                               vocab_size,
                               predict_len=predict_len,
                               temperature=temperature)
print(generated_text)

we re all this time the mock turtle to say what it was over alice was the white rabbit i shan t be sure they had the other i wish i hould the mock turtle its head in a low that she went to she said the gryphon in the sea the thimble sne felt a great said the queen with from one hen i beg your have my sixe aloog the mock turtle to see if she could not all the name with the mock turtle sort of the ground and the whiter alice could not know alice looking at the far he tring and she were thing said alice that s very curious crieg the queen way it s got in another mouth and you know the cat s had found its mouth and you may not said the hatter looking about in a courtere the queen who was the began alice thought and the two me the look of in the sea she thought and began and the garden who said the hatter looking again the king said alice that the white rabbit is the fan a rage of the dormouse thought the moral of the regenting there the gryphon in the sea sho thought and i don t know whe h
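Since `generate_text` samples with `np.random.choice`, which draws from NumPy's global random generator, the output above changes on every run. Calling `np.random.seed(...)` right before `generate_text` should make a run repeatable, assuming the model's forward pass is itself deterministic on your hardware. A minimal sketch of the sampling part, using a made-up three-character distribution:

```python
import numpy as np

probs = np.array([0.5, 0.3, 0.2])  # made-up next-character distribution


def sample_indices(seed, n=20):
    np.random.seed(seed)  # seeds the global RNG that np.random.choice draws from
    return [int(np.random.choice(len(probs), p=probs)) for _ in range(n)]


print(sample_indices(42) == sample_indices(42))  # True: same seed, same samples
```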