<a href="https://colab.research.google.com/github/wickedWOLF123/DRP/blob/main/WordModellingTransformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#Imports for this project

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import os
import time
import requests


In [None]:
# Download the Shakespeare text file from googleapis
import requests

url = 'https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt'
# The timeout keeps the cell from hanging forever on a stalled connection, and
# raise_for_status() stops an HTTP error page from silently becoming the corpus.
response = requests.get(url, timeout=30)
response.raise_for_status()
text = response.text

print(f'Length of text: {len(text)} characters')
print(text[:250])

# Every character has to be mapped to an integer index, so collect the unique
# characters; sorting makes the index assignment deterministic.
# These unique characters make up our vocabulary.
vocabulary = sorted(set(text))
vocab_size = len(vocabulary)
print(f'{len(vocabulary)} unique characters')

Length of text: 1115394 characters
First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You are all resolved rather to die than to famish?

All:
Resolved. resolved.

First Citizen:
First, you know Caius Marcius is chief enemy to the people.

65 unique characters


In [None]:
# Build the two lookup tables between characters and their integer indices
# (an index is the character's position in the sorted vocabulary).
index_to_letter = dict(enumerate(vocabulary))
letter_to_index = {letter: index for index, letter in index_to_letter.items()}

It might be worth randomly choosing the length of each character input sequence, e.g. somewhere between 10 and 100 characters.

In [None]:
# We chop the 1,000,000+ character input into fixed-size pieces so that it is
# easier to handle and can be processed in batches; sequence_length is the
# size the sequences are padded or truncated to.
sequence_length = 100
# Encode the whole text as a single int64 array of vocabulary indices.
encoded_text = np.fromiter(
    (letter_to_index[ch] for ch in text),
    dtype=np.int64,
    count=len(text),
)

In [None]:
# Sanity check: the encoded array holds one index per character of the text
encoded_text.shape[0]

1115394

To get sequences of random length that are, on average, 50 characters long, I recommend doing something like the following. One caveat is that a target sequence may sometimes overlap with an input the model has already seen, but then again that text is part of what the model sees during training anyway.

In [None]:
import random

# Candidate cut points: every index into the encoded text
idxs = list(range(len(encoded_text)))
# Keep a random 1/50th of the indices and sort them; neighbouring cut points
# are then about 50 characters apart on average.
np.sort(random.sample(idxs, len(idxs) // 50))

array([     38,      51,     119, ..., 1115291, 1115354, 1115385])

One issue with constructing the dataset in this format is that it makes the dataset around 16 times larger than it was before, which may require more RAM than we have. I tested it: going from a sequence length of 50 up to 90 requires around 80 GB of RAM.

In [None]:
# Slice the encoded text into overlapping windows of `seq` characters.
# The input window is encoded_text[i : i + seq] and its target is the same
# window shifted one character to the right, encoded_text[i + 1 : i + seq + 1].
input_sequences = []
output_sequences = []

# The outer loop allows several window lengths; currently only 50 is generated.
for seq in range(50, 51):
    print(f'Sequence Length: {seq}')
    for i in range(len(encoded_text) - seq):
        # .tolist() yields plain Python ints instead of one NumPy scalar object
        # per character, which is far lighter when building millions of windows.
        input_sequences.append(encoded_text[i:i + seq].tolist())
        output_sequences.append(encoded_text[i + 1:i + seq + 1].tolist())


Sequence Length: 50


In [None]:
# Never pad beyond the longest sequence actually present: index 0 is a real
# vocabulary character, so padded positions would be trained on as if they
# were genuine text, and they would also inflate memory use.
max_len = min(sequence_length, max((len(seq) for seq in input_sequences), default=0))

# Truncate to max_len, then right-pad shorter sequences with index 0.
# NOTE: if sequences of different lengths are ever generated, the shorter ones
# are still padded with index 0; excluding those positions from the loss would
# need a dedicated padding index.
input_sequences_padded = [seq[:max_len] + [0] * (max_len - len(seq)) for seq in input_sequences]
output_sequences_padded = [seq[:max_len] + [0] * (max_len - len(seq)) for seq in output_sequences]

# Convert lists to numpy arrays for faster processing
inputs_array = np.array(input_sequences_padded, dtype=np.int64)
outputs_array = np.array(output_sequences_padded, dtype=np.int64)

# Wrap the int64 arrays as long tensors; from_numpy shares the underlying
# memory instead of making another full copy of the dataset.
inputs_tensor = torch.from_numpy(inputs_array)
outputs_tensor = torch.from_numpy(outputs_array)

print(f'Sequences = {len(input_sequences)}')


Sequences = 1115344


In [None]:
# PARAMETERS
# Hyperparameters shared by the data loader, the model and the training loop.

BATCH_SIZE = 64  # sequences per batch served by the DataLoader
NUM_EPOCHS = 10  # number of passes over the dataloader in the training loop
EMBED_SIZE = 512  # embedding width; passed as d_model to the encoder layers
HIDDEN_SIZE = 2048  # passed as dim_feedforward to each encoder layer
NUM_LAYERS = 6  # number of stacked encoder layers
NUM_HEADS = 8  # attention heads per encoder layer (nhead)
DROPOUT = 0.1  # dropout rate passed to the encoder layers
LEARNING_RATE = 0.001  # learning rate given to the Adam optimizer


In [None]:
# PyTorch dataset exposing the (input, target) sequence pairs as long tensors

class shakespeareDataset(torch.utils.data.Dataset):
    """Paired input/target index sequences for next-character prediction.

    Args:
        input: Input sequences as a tensor, NumPy array or nested list of ints.
        output: Target sequences, aligned with ``input`` along dimension 0.

    Note:
        Data that is already an int64 tensor is stored as-is (no copy), so the
        dataset shares memory with the tensors it was built from.
    """

    def __init__(self, input, output):
        # as_tensor reuses an existing long tensor instead of copy-constructing
        # it; torch.tensor() on a tensor duplicates the data and emits a
        # UserWarning.
        self.input = torch.as_tensor(input, dtype=torch.long)
        self.output = torch.as_tensor(output, dtype=torch.long)

    def __len__(self):
        """Return the number of sequence pairs."""
        return len(self.input)

    def __getitem__(self, idx):
        """Return the ``(input, target)`` pair stored at ``idx``."""
        return self.input[idx], self.output[idx]

# Wrap the tensors in a dataset and serve shuffled, full-size batches
# (drop_last discards a final batch that is smaller than BATCH_SIZE).
dataset = shakespeareDataset(inputs_tensor, outputs_tensor)
dataloader = torch.utils.data.DataLoader(
    dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    drop_last=True,
)

  self.input = torch.tensor(input, dtype=torch.long)
  self.output = torch.tensor(output, dtype=torch.long)


In [None]:
class TransformerModel(nn.Module):
    """Character-level language model built on ``nn.TransformerEncoder``.

    Token embeddings plus fixed sinusoidal positional encodings are passed
    through a stack of encoder layers under a causal mask, and a linear layer
    projects every position to vocabulary logits.

    Args:
        vocab_size: Number of distinct tokens (size of the output logits).
        embed_size: Embedding width, used as ``d_model`` of the encoder layers.
        hidden_size: Feed-forward width (``dim_feedforward``) of each layer.
        num_layers: Number of stacked encoder layers.
        num_heads: Attention heads per layer.
        sequence_length: Longest input the positional encoding table covers.
        dropout: Dropout rate of the encoder layers.
    """

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers, num_heads, sequence_length, dropout=0.1):
        super().__init__()
        self.embed_size = embed_size
        self.sequence_length = sequence_length

        # Token embedding
        self.token_embedding = nn.Embedding(vocab_size, embed_size)
        # Non-persistent buffer: it follows the module across devices with
        # .to()/.cuda() but stays out of the state_dict, so checkpoint keys
        # are the same as when this was a plain attribute.
        self.register_buffer(
            "positional_encoding",
            self.create_positional_encoding(sequence_length, embed_size),
            persistent=False,
        )
        # Transformer encoder layers
        encoder_layer = nn.TransformerEncoderLayer(d_model=embed_size, nhead=num_heads, dim_feedforward=hidden_size, dropout=dropout)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        self.fc_out = nn.Linear(embed_size, vocab_size)

    def create_positional_encoding(self, sequence_length, embed_size):
        """Return sinusoidal encodings of shape ``(1, sequence_length, embed_size)``.

        Even columns hold sines and odd columns hold cosines of the position
        scaled by geometrically decreasing frequencies.
        """
        pe = torch.zeros(sequence_length, embed_size)
        position = torch.arange(0, sequence_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, embed_size, 2).float() * (-np.log(10000.0) / embed_size))

        pe[:, 0::2] = torch.sin(position * div_term)
        if embed_size % 2 == 1:
            # With an odd embed_size there is one fewer cosine column than
            # sine columns, so the last frequency is dropped.
            pe[:, 1::2] = torch.cos(position * div_term[:-1])
        else:
            pe[:, 1::2] = torch.cos(position * div_term)

        # Leading dimension of 1 lets the table broadcast over the batch.
        return pe.unsqueeze(0)

    def forward(self, x):
        """Map token indices ``(batch, seq_len)`` to logits ``(batch, seq_len, vocab_size)``.

        Position ``t`` of the output depends only on input positions ``<= t``.
        """
        _, seq_len = x.size()
        x = self.token_embedding(x) * np.sqrt(self.embed_size)
        x = x + self.positional_encoding[:, :seq_len, :]

        # Causal mask: -inf above the diagonal blocks attention to later
        # positions. Without it every position could read the very token it is
        # trained to predict, which makes the training loss meaningless for
        # left-to-right generation.
        causal_mask = torch.triu(
            torch.full((seq_len, seq_len), float("-inf"), device=x.device, dtype=x.dtype),
            diagonal=1,
        )

        # The encoder layers were built with the default (seq, batch, embed) layout.
        x = x.permute(1, 0, 2)
        x = self.transformer_encoder(x, mask=causal_mask)
        x = x.permute(1, 0, 2)
        return self.fc_out(x)


In [None]:
# Hyperparameters (these repeat the values set in the PARAMETERS cell)
NUM_HEADS = 8
DROPOUT = 0.1
LEARNING_RATE = 0.001

# Train on the GPU when one is available, otherwise on the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Build the model and move it onto the chosen device
model = TransformerModel(
    vocab_size=vocab_size,
    embed_size=EMBED_SIZE,
    hidden_size=HIDDEN_SIZE,
    num_layers=NUM_LAYERS,
    num_heads=NUM_HEADS,
    sequence_length=sequence_length,
    dropout=DROPOUT,
).to(device)

# Cross-entropy over the vocabulary logits, optimized with Adam
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)


In [None]:
# Training loop: one optimizer step per batch, with a progress line every 100 batches
model.train(mode=True)
for epoch in range(NUM_EPOCHS):
    total_loss = 0
    # Batches accumulated since the last progress line. The first window of an
    # epoch covers batches 0..100 (101 batches), so averages must divide by
    # the real count rather than a fixed 100.
    batches_since_log = 0
    start_time = time.time()

    for batch, (inp, target) in enumerate(dataloader):
        inp = inp.to(device)
        target = target.to(device)

        optimizer.zero_grad()

        # Forward pass
        output = model(inp)

        # Flatten (batch, seq, vocab) logits and (batch, seq) targets for the
        # loss; reshape also copes with non-contiguous tensors, unlike view.
        output = output.reshape(-1, vocab_size)
        target = target.reshape(-1)

        # Compute loss
        loss = criterion(output, target)

        # Backpropagation
        loss.backward()

        # Gradient clipping (optional but helps with training stability)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        # Optimizer step
        optimizer.step()

        total_loss += loss.item()
        batches_since_log += 1

        if batch % 100 == 0 and batch > 0:
            avg_loss = total_loss / batches_since_log
            elapsed = time.time() - start_time
            print(f'Epoch: {epoch+1}/{NUM_EPOCHS}, Batch: {batch}/{len(dataloader)}, Loss: {avg_loss:.4f}, Time/Batch: {elapsed/batches_since_log:.2f}s')
            # Start a fresh window
            total_loss = 0
            batches_since_log = 0
            start_time = time.time()


Epoch: 1/10, Batch: 100/4356, Loss: 1.3820, Time/Batch: 0.22s
Epoch: 1/10, Batch: 200/4356, Loss: 1.1578, Time/Batch: 0.21s
Epoch: 1/10, Batch: 300/4356, Loss: 1.0723, Time/Batch: 0.21s
Epoch: 1/10, Batch: 400/4356, Loss: 1.0370, Time/Batch: 0.21s
Epoch: 1/10, Batch: 500/4356, Loss: 1.0212, Time/Batch: 0.21s
Epoch: 1/10, Batch: 600/4356, Loss: 1.0123, Time/Batch: 0.21s
Epoch: 1/10, Batch: 700/4356, Loss: 1.0037, Time/Batch: 0.21s
Epoch: 1/10, Batch: 800/4356, Loss: 0.9970, Time/Batch: 0.21s
Epoch: 1/10, Batch: 900/4356, Loss: 0.9916, Time/Batch: 0.21s
Epoch: 1/10, Batch: 1000/4356, Loss: 0.9861, Time/Batch: 0.21s
Epoch: 1/10, Batch: 1100/4356, Loss: 0.9810, Time/Batch: 0.21s
Epoch: 1/10, Batch: 1200/4356, Loss: 0.9793, Time/Batch: 0.21s
Epoch: 1/10, Batch: 1300/4356, Loss: 0.9764, Time/Batch: 0.21s
Epoch: 1/10, Batch: 1400/4356, Loss: 0.9727, Time/Batch: 0.21s
Epoch: 1/10, Batch: 1500/4356, Loss: 0.9712, Time/Batch: 0.21s
Epoch: 1/10, Batch: 1600/4356, Loss: 0.9694, Time/Batch: 0.21s
E

In [None]:
def generate_text(model, start_text, generate_length=100):
    """Sample ``generate_length`` characters from ``model`` to continue ``start_text``.

    Args:
        model: Module mapping a ``(1, seq_len)`` tensor of character indices to
            ``(1, seq_len, vocab_size)`` logits.
        start_text: Prompt to continue. Characters missing from
            ``letter_to_index`` are encoded as index 0.
        generate_length: Number of characters to sample.

    Returns:
        ``start_text`` followed by the sampled characters.

    Raises:
        ValueError: If ``start_text`` is empty while characters are requested;
            the model needs at least one position to predict from.
    """
    if not start_text and generate_length > 0:
        raise ValueError("start_text must contain at least one character")

    input_indices = [letter_to_index.get(c, 0) for c in start_text]
    input_tensor = torch.tensor([input_indices], dtype=torch.long).to(device)
    pieces = [start_text]

    # Remember the current mode so that sampling in the middle of training
    # does not leave dropout switched off afterwards.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for _ in range(generate_length):
                # Keep only the most recent sequence_length tokens as context
                input_seq = input_tensor[:, -sequence_length:]

                # Logits for the last time step, shape (1, vocab_size)
                logits = model(input_seq)[:, -1, :]
                probabilities = F.softmax(logits, dim=-1)

                # Sample the next token from the predicted distribution
                next_token = torch.multinomial(probabilities, num_samples=1)
                pieces.append(index_to_letter[next_token.item()])

                # Extend the trimmed context, so the tensor never grows beyond
                # sequence_length + 1 tokens.
                input_tensor = torch.cat([input_seq, next_token], dim=1)
    finally:
        model.train(was_training)

    return "".join(pieces)


In [None]:
# Sample 200 characters that continue the prompt and print the result
start_text = "Romeo: "
generated_text = generate_text(model, start_text=start_text, generate_length=200)
print(generated_text)
