# Final Project

I'm using an example from the book 'Machine Learning with PyTorch and Scikit-Learn' by Sebastian Raschka (Chapter 15, Project two - character-level language modeling in PyTorch) to create an RNN model and train it to generate short, statement-like texts based on Albert Einstein's book 'Relativity: The Special and General Theory'. My goal is to create functionality that resembles asking a geeky friend who has read the book to express their views on a chosen topic.

To accelerate model training I used the Google Colab environment, where I set the Runtime type to *Python 3* and the Hardware accelerator to *T4 GPU*.

This notebook is adapted to run in the Google Colab environment (the data are loaded from my GitHub repository).

## 1. Required imports

In [20]:
import numpy as np
import requests
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.distributions.categorical import Categorical

Here I'm checking the module installation in Google Colab and whether the hardware accelerator was set correctly:

In [3]:
print('torch version: ',torch.__version__)

print("GPU Available:", torch.cuda.is_available())

if torch.cuda.is_available():
  device = torch.device("cuda:0")
else:
  device = torch.device("cpu")
print('device: ', device)

torch version:  2.5.1+cu121
GPU Available: True
device:  cuda:0


## 2. Data preparation
I obtained the book text in .txt format from https://www.gutenberg.org/files/5001/old/2004-5001.txt and saved it as 'Einstein_relativity_book.txt'.\
The file is included in my GitHub repo and loaded in the cell below.

In [4]:
book_url = 'https://github.com/zuzka-szczelina/python_calc_project_sem_7/raw/refs/heads/master/data/Einstein_relativity_book.txt'
response = requests.get(book_url)
response.raise_for_status()  # stop early if the download failed
text = response.text

In [5]:

start_idx = text.find('CONTENTS')
end_idx = text.find('END OF THE PROJECT GUTENBERG')
text = text[start_idx: end_idx]
char_set = set(text)

Below is a version of the code that loads and preprocesses the text in case the repo was cloned to your local machine.

In [6]:
# with open('./data/Einstein_relativity_book.txt', 'r') as f:
#     text = f.read()
#     start_idx = text.find('CONTENTS')
#     end_idx = text.find('END OF THE PROJECT GUTENBERG')
#     text = text[start_idx: end_idx]
#     char_set = set(text)

In [7]:
print('Book text includes:')
print(f'total characters: {len(text)}\nunique characters {len(char_set)}')

Book text includes:
total characters: 186305
unique characters 87


In [8]:
chars_sorted = sorted(char_set)
char2int_encoding = {ch: i for i, ch in enumerate(chars_sorted)}
char_array = np.array(chars_sorted)
text_encoded = np.array(
    [char2int_encoding[ch] for ch in text],
    dtype=np.int32
)

In [9]:
print(f'numerical code: {text_encoded[0:17]}\n stands for {char_array[text_encoded[0:17]]}')

numerical code: [30 42 41 47 32 41 47 46  0  0 43 74 61 62 57 59 61]
 stands for ['C' 'O' 'N' 'T' 'E' 'N' 'T' 'S' '\n' '\n' 'P' 'r' 'e' 'f' 'a' 'c' 'e']


## 3. ML Dataset construction
Divide text into chunks to feed into the model.
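
As a small illustration of the chunking idea, the sketch below slides a window over a toy string instead of the encoded book. The toy text and the short `seq_length` are assumptions chosen for readability. Each chunk holds `seq_length + 1` characters so that it can later be split into an input and a target shifted by one character.

```python
# Toy illustration of sliding-window chunking (toy_text and seq_length are
# made-up values for this sketch, not the ones used for the book).
toy_text = "relativity"
seq_length = 4
chunk_size = seq_length + 1  # one extra character for the shifted target

# The "+ 1" in the range makes sure the last full window is included.
toy_chunks = [
    toy_text[i: i + chunk_size]
    for i in range(len(toy_text) - chunk_size + 1)
]

for chunk in toy_chunks[:3]:
    # input = all but the last character, target = all but the first
    print(repr(chunk[:-1]), '->', repr(chunk[1:]))

print('number of chunks:', len(toy_chunks))
```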

In [10]:
seq_length = 40
chunk_size = seq_length + 1
# "+ 1" so that the last full chunk of the text is included as well
text_chunks = [text_encoded[i: i + chunk_size] for i in range(len(text_encoded) - chunk_size + 1)]

In [11]:
class TextDataset(Dataset):
    def __init__(self, text_chunks):
        self.text_chunks = text_chunks

    def __len__(self):
        return len(self.text_chunks)

    def __getitem__(self, idx):
        text_chunk = self.text_chunks[idx]
        return text_chunk[:-1].long(), text_chunk[1:].long()

In [12]:
seq_dataset = TextDataset(torch.tensor(np.array(text_chunks)))
# seq_dataset = TextDataset(torch.tensor(text_chunks))

In [13]:
for i, (seq, target) in enumerate(seq_dataset):
    print('Model input (x): ', repr(''.join(char_array[seq])))
    print('Model target (y):', repr(''.join(char_array[target])))
    if i==5:
        break

Model input (x):  'CONTENTS\n\nPreface\n\nPart I: The Special T'
Model target (y): 'ONTENTS\n\nPreface\n\nPart I: The Special Th'
Model input (x):  'ONTENTS\n\nPreface\n\nPart I: The Special Th'
Model target (y): 'NTENTS\n\nPreface\n\nPart I: The Special The'
Model input (x):  'NTENTS\n\nPreface\n\nPart I: The Special The'
Model target (y): 'TENTS\n\nPreface\n\nPart I: The Special Theo'
Model input (x):  'TENTS\n\nPreface\n\nPart I: The Special Theo'
Model target (y): 'ENTS\n\nPreface\n\nPart I: The Special Theor'
Model input (x):  'ENTS\n\nPreface\n\nPart I: The Special Theor'
Model target (y): 'NTS\n\nPreface\n\nPart I: The Special Theory'
Model input (x):  'NTS\n\nPreface\n\nPart I: The Special Theory'
Model target (y): 'TS\n\nPreface\n\nPart I: The Special Theory '


Create a dataloader: an object used to pass data into the model in the form of 'batches' (groups of a specified size).
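
To make the batching concrete, here is a minimal sketch on made-up data. A tensor of random integer codes stands in for the encoded book, and the names `toy_chunks`, `toy_dataset` and `toy_loader` are hypothetical, not part of this notebook. It only shows what shapes one batch has.

```python
import torch
from torch.utils.data import DataLoader, TensorDataset

torch.manual_seed(1)

# Stand-in data (assumption): 1000 random "chunks" of 41 integer codes each,
# mimicking seq_length + 1 = 41 encoded characters from an 87-character vocabulary.
toy_chunks = torch.randint(low=0, high=87, size=(1000, 41))

# Split every chunk into input (first 40 codes) and target (last 40 codes).
toy_dataset = TensorDataset(toy_chunks[:, :-1], toy_chunks[:, 1:])

# drop_last=True discards the final incomplete batch, so every batch has 64 rows.
toy_loader = DataLoader(toy_dataset, batch_size=64, shuffle=True, drop_last=True)

seq_batch, target_batch = next(iter(toy_loader))
print(seq_batch.shape, target_batch.shape)
print('batches per pass over the data:', len(toy_loader))
```

With `shuffle=True` the rows of each batch come from random positions in the data, but within every row the target is still the input shifted by one position.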

In [14]:
torch.manual_seed(1)
batch_size = 64
seq_dataloader = DataLoader(seq_dataset, batch_size=batch_size, shuffle=True, drop_last=True)

## 4. Defining a RNN model

#### `__init__` method:

**self.embedding** is a table that stores an embedding vector for each unique character (we get one embedding vector per character by specifying vocab_size=len(char_array) below). Each embedding vector has length embed_dim.

**self.rnn** is the RNN we will use. We specify its properties using the *torch.nn.LSTM* class. LSTM stands for 'Long Short-Term Memory' and indicates that we will use an RNN with LSTM cells as the hidden layer. The input to the hidden layer is a specific character represented as an embedding vector, so we specify that the expected input size equals the embedding vector length (embed_dim).\
The output of the LSTM layer is:\
*output_features (the hidden state at every time step),\
(final hidden state (for each sequence in the batch),\
final cell state (for each sequence in the batch))*

**self.fc** is a fully connected (linear) layer applied to the output of the hidden layer. It maps each output vector of length rnn_hidden_size to vocab_size values (logits), one score for each possible next character.

**self.rnn_hidden_size** is the number of features in the hidden state of the RNN.

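#### forward method:
Processes one time step for the whole batch. The input character codes x are embedded and given a time dimension of length 1 (unsqueeze(1)), then passed through the LSTM together with the current hidden and cell states. The LSTM output is mapped by self.fc to logits of shape (batch_size, vocab_size). The method returns the logits and the updated hidden and cell states.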

#### init_hidden method:
This is where we initialise the hidden state and the cell state of the LSTM with zeros (they are then passed to the forward method).
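
The shapes flowing through these three layers for a single time step can be checked with a short sketch. The layer sizes mirror the ones used later in this notebook, while the random input batch is an assumption made only for illustration.

```python
import torch
import torch.nn as nn

vocab_size, embed_dim, rnn_hidden_size, batch_size = 87, 256, 512, 64

embedding = nn.Embedding(vocab_size, embed_dim)
lstm = nn.LSTM(input_size=embed_dim, hidden_size=rnn_hidden_size, batch_first=True)
fc = nn.Linear(in_features=rnn_hidden_size, out_features=vocab_size)

# One character code per sequence in the batch (random stand-in data).
x = torch.randint(0, vocab_size, (batch_size,))
hidden = torch.zeros(1, batch_size, rnn_hidden_size)
cell = torch.zeros(1, batch_size, rnn_hidden_size)

# (64, 1, 256): batch, single time step, embedding vector
emb = embedding(x).unsqueeze(1)
# out is (64, 1, 512); hidden and cell stay (1, 64, 512)
out, (hidden, cell) = lstm(emb, (hidden, cell))
# (64, 87): one score (logit) per character in the vocabulary
logits = fc(out).reshape(out.size(0), -1)

print(emb.shape, out.shape, hidden.shape, logits.shape)
```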

In [15]:
class RNN(nn.Module):
    def __init__(self, vocab_size, embed_dim, rnn_hidden_size):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.rnn = nn.LSTM(input_size=embed_dim,
                           hidden_size=rnn_hidden_size,
                           batch_first=True)
        self.fc = nn.Linear(in_features=rnn_hidden_size,
                            out_features=vocab_size)

        self.rnn_hidden_size = rnn_hidden_size

    def forward(self, x, hidden, cell):
        out = self.embedding(x).unsqueeze(1)
        out, (hidden, cell) = self.rnn(out, (hidden, cell))
        out = self.fc(out).reshape(out.size(0), -1)
        return out, hidden, cell

    def init_hidden(self, batch_size):
        hidden = torch.zeros(1, batch_size, self.rnn_hidden_size)
        cell = torch.zeros(1, batch_size, self.rnn_hidden_size)
        return hidden, cell

## 5. Creating an instance of the defined model

In [16]:
vocab_size = len(char_array)
embed_dim = 256
rnn_hidden_size = 512
model = RNN(vocab_size, embed_dim, rnn_hidden_size)
model

RNN(
  (embedding): Embedding(87, 256)
  (rnn): LSTM(256, 512, batch_first=True)
  (fc): Linear(in_features=512, out_features=87, bias=True)
)

Define the loss function and the optimizer:

In [17]:
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
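
As a sanity check on the loss values, `nn.CrossEntropyLoss` expects raw logits and integer class indices, and a model that gives the same score to all 87 characters should produce a loss of ln(87) ≈ 4.47. The sketch below verifies this on made-up targets. The first loss reported during training in the next section (4.4621) is close to this value, which is what one would expect from an untrained model.

```python
import math
import torch
import torch.nn as nn

vocab_size = 87
loss_fn = nn.CrossEntropyLoss()

# Uniform prediction: identical logits for every character (stand-in data).
logits = torch.zeros(64, vocab_size)
targets = torch.randint(0, vocab_size, (64,))

uniform_loss = loss_fn(logits, targets).item()
print(f'loss for a uniform guess: {uniform_loss:.4f}')
print(f'ln(vocab_size):           {math.log(vocab_size):.4f}')
```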

## 6. Training the model

In [18]:
import time

# num_epochs = 10000
num_epochs = 30

for epoch in range(num_epochs):
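    # Note: start_time is reset on every iteration, so the time printed
    # after the loop covers only the last iteration.
    start_time = time.time()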
    hidden, cell = model.init_hidden(batch_size)
    seq_batch, target_batch = next(iter(seq_dataloader))
    optimizer.zero_grad()
    loss = 0
    for c in range(seq_length):
        pred, hidden, cell = model(seq_batch[:, c], hidden, cell)
        loss += loss_fn(pred, target_batch[:, c])
    loss.backward()
    optimizer.step()
    loss = loss.item()/seq_length
    if epoch % 10 == 0:
        print(f'Epoch {epoch} loss: {loss:.4f}')
print('time passed: ', time.time() - start_time)

Epoch 0 loss: 4.4621
Epoch 10 loss: 3.0273
Epoch 20 loss: 2.6790
time passed:  0.4783666133880615


In the above cell, each epoch processes a single batch. For each epoch we:
1. Initialise the hidden and cell states.
2. Use seq_dataloader as an iterator and load one batch (a set of 64 input sequences and the corresponding target sequences).
3. Reset the gradients of all optimised tensors.
4. Initialise the loss (between the predicted sequences and the target sequences of the loaded batch) as 0.
5. Use a for loop to:\
   I. Predict the next character for each character position in the input sequence. This is done simultaneously for all input sequences in the batch.\
   II. Accumulate the loss as the sum of the losses over all character positions.
6. Compute the loss gradients after iterating through all characters.
7. Perform an optimization step to update the model parameters (.step() can be called once the gradients have been computed using .backward()).
8. Compute the final loss for the batch (dividing by the number of characters in each sequence).
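
The steps above can be sketched on a tiny stand-in model, here with explicit device handling. As far as the cells shown here go, a `device` is selected at the top of the notebook, but the model, the states and the batches are created without being moved to it. The sketch below shows one way a single training step could run on the selected device. All sizes and the random data are assumptions for illustration, not the notebook's actual configuration.

```python
import torch
import torch.nn as nn

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Tiny stand-in layers and data (assumptions), just to show the device handling.
vocab_size, embed_dim, hidden_size, batch_size, seq_length = 20, 8, 16, 4, 5
embedding = nn.Embedding(vocab_size, embed_dim).to(device)
lstm = nn.LSTM(embed_dim, hidden_size, batch_first=True).to(device)
fc = nn.Linear(hidden_size, vocab_size).to(device)
params = list(embedding.parameters()) + list(lstm.parameters()) + list(fc.parameters())
optimizer = torch.optim.Adam(params, lr=0.001)
loss_fn = nn.CrossEntropyLoss()

seq_batch = torch.randint(0, vocab_size, (batch_size, seq_length)).to(device)
target_batch = torch.randint(0, vocab_size, (batch_size, seq_length)).to(device)

# The states must live on the same device as the model and the data.
hidden = torch.zeros(1, batch_size, hidden_size, device=device)
cell = torch.zeros(1, batch_size, hidden_size, device=device)

optimizer.zero_grad()                      # step 3: reset gradients
loss = 0                                   # step 4: initialise the loss
for c in range(seq_length):                # step 5: one character position at a time
    emb = embedding(seq_batch[:, c]).unsqueeze(1)
    out, (hidden, cell) = lstm(emb, (hidden, cell))
    pred = fc(out).reshape(out.size(0), -1)
    loss += loss_fn(pred, target_batch[:, c])
loss.backward()                            # step 6: compute gradients
optimizer.step()                           # step 7: update parameters
avg_loss = loss.item() / seq_length        # step 8: average loss per character
print(f'device: {device}, average loss per character: {avg_loss:.4f}')
```

In this notebook the equivalent change would presumably be calling `model.to(device)` once and moving `seq_batch`, `target_batch`, `hidden` and `cell` to the same device inside the loop.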

In [19]:
next(iter(seq_dataloader))[1]

tensor([[68, 57, 76,  ..., 75, 61,  1],
        [74, 61,  1,  ..., 64, 61,  1],
        [ 3, 25,  1,  ...,  1,  3,  1],
        ...,
        [65, 75, 76,  ..., 17, 20, 24],
        [ 1, 79, 64,  ..., 70,  1, 71],
        [10,  1, 75,  ..., 57, 70, 81]])

## 7. Text generating function

In [19]:
# to rewrite, to be used when model is trained, add training results saving
def sample(model, starting_str, len_generated_text=500, scale_factor=1.0):
    encoded_input = torch.tensor(
        [char2int_encoding[s] for s in starting_str]
    )
    encoded_input = torch.reshape(encoded_input, (1, -1))
    generated_str = starting_str

    model.eval()
    hidden, cell = model.init_hidden(1)
    for c in range(len(starting_str)-1):
        _, hidden, cell = model(
            encoded_input[:, c].view(1), hidden, cell
        )

    last_char = encoded_input[:, -1]
    for i in range(len_generated_text):
        logits, hidden, cell = model(
            last_char.view(1), hidden, cell
        )
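        # These steps must run inside the loop so that one new character
        # is sampled and appended on every iteration.
        logits = torch.squeeze(logits, 0)
        scaled_logits = logits * scale_factor
        m = Categorical(logits=scaled_logits)
        last_char = m.sample()
        generated_str += str(char_array[last_char])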

    return generated_str
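
To see what `scale_factor` does, the sketch below applies it to a made-up logits vector and prints the resulting probabilities. Values below 1 flatten the distribution (more random text), while values above 1 sharpen it (more predictable text). The three-element logits vector is an assumption for illustration only.

```python
import torch
from torch.distributions.categorical import Categorical

# Made-up logits for a 3-character vocabulary (assumption for illustration).
logits = torch.tensor([1.0, 1.0, 3.0])

probs_by_scale = {}
for scale_factor in (0.1, 1.0, 10.0):
    probs = torch.softmax(logits * scale_factor, dim=0)
    probs_by_scale[scale_factor] = probs
    print(f'scale_factor={scale_factor:>4}: probabilities = {probs.numpy().round(3)}')

# Sampling from the scaled logits, in the same way as inside sample():
torch.manual_seed(1)
m = Categorical(logits=logits * 10.0)
samples = m.sample((10,))
print(samples)
```

With a large scale factor almost all probability mass sits on the highest logit, so the samples are nearly always the same index. With a small one the three indices become almost equally likely.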