In [12]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset


import numpy as np
import time
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader, random_split
import requests

#For training the models with different layers and heads
from itertools import product

#For Document processing in Problems 3 and 4
#pip install python-docx 
from collections import Counter
from docx import Document
from torch.nn.utils.rnn import pad_sequence
import random
import math



device = torch.device("cuda")
print(f"Using device: {device}")

#Check the GPU name and number
'''
devNumber = torch.cuda.current_device()
devName = torch.cuda.get_device_name(devNumber)

print(f"Current device number is: {devNumber}")
print(f"GPU name is: {devName}")
'''

Using device: cuda


'\ndevNumber = torch.cuda.current_device()\ndevName = torch.cuda.get_device_name(devNumber)\n\nprint(f"Current device number is: {devNumber}")\nprint(f"GPU name is: {devName}")\n'

In [None]:
''' Problem 1
Inspired by the course example, train and validate a transformer model, 
for learning the above sequence. Use sequence lengths of 10, 20, and 30 for your training.
Feel free to adjust other network parameters. Report and compare training loss, validation accuracy, 
execution time for training, and computational and mode size 
complexities against RNN-based approaches (with and without cross-attention).
'''

text = """Next character prediction is a fundamental task in the field of natural language processing (NLP) that involves predicting the next character in a sequence of text based on the characters that precede it. This task is essential for various applications, including text auto-completion, spell checking, and even in the development of sophisticated AI models capable of generating human-like text.
At its core, next character prediction relies on statistical models or deep learning algorithms to analyze a given sequence of text and predict which character is most likely to follow. These predictions are based on patterns and relationships learned from large datasets of text during the training phase of the model.
One of the most popular approaches to next character prediction involves the use of Recurrent Neural Networks (RNNs), and more specifically, a variant called Long Short-Term Memory (LSTM) networks. RNNs are particularly well-suited for sequential data like text, as they can maintain information in 'memory' about previous characters to inform the prediction of the next character. LSTM networks enhance this capability by being able to remember long-term dependencies, making them even more effective for next character prediction tasks.
Training a model for next character prediction involves feeding it large amounts of text data, allowing it to learn the probability of each character's appearance following a sequence of characters. During this training process, the model adjusts its parameters to minimize the difference between its predictions and the actual outcomes, thus improving its predictive accuracy over time.
Once trained, the model can be used to predict the next character in a given piece of text by considering the sequence of characters that precede it. This can enhance user experience in text editing software, improve efficiency in coding environments with auto-completion features, and enable more natural interactions with AI-based chatbots and virtual assistants.
In summary, next character prediction plays a crucial role in enhancing the capabilities of various NLP applications, making text-based interactions more efficient, accurate, and human-like. Through the use of advanced machine learning models like RNNs and LSTMs, next character prediction continues to evolve, opening new possibilities for the future of text-based technology.”
"""

chars = sorted(list(set(text)))
ix_to_char = {i: ch for i, ch in enumerate(chars)}
char_to_ix = {ch: i for i, ch in enumerate(chars)}

def prepare_data(sequence_length):
    X = []
    y = []
    for i in range(len(text) - sequence_length):
        sequence = text[i:i + sequence_length]
        label = text[i + sequence_length]
        X.append([char_to_ix[char] for char in sequence])
        y.append(char_to_ix[label])
    return np.array(X), np.array(y)

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=500):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.pe = pe.unsqueeze(0)

    def forward(self, x):
        return x + self.pe[:, :x.size(1)].to(x.device)
    
class TransformerModel(nn.Module):
    def __init__(self, vocab_size, d_model, nhead, num_layers):
        super(TransformerModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)
        encoder_layers = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dim_feedforward=256, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_layers=num_layers)
        self.fc = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        x = self.embedding(x)
        x = self.positional_encoding(x)
        x = self.transformer_encoder(x)
        x = self.fc(x[:, -1, :])
        return x
    
def train_and_evaluate(model, X_train, y_train, X_val, y_val, epochs=50, learning_rate=0.005):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    start_time = time.time()

    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        output = model(X_train)
        loss = criterion(output, y_train)
        loss.backward()
        optimizer.step()

        
        model.eval()
        with torch.no_grad():
            val_output = model(X_val)
            val_loss = criterion(val_output, y_val)
            _, predicted = torch.max(val_output, 1)
            val_accuracy = (predicted == y_val).float().mean()

        if (epoch + 1) % 10 == 0:
            print(f'Epoch {epoch+1}, Loss: {loss.item():.4f}, Val Loss: {val_loss.item():.4f}, Val Acc: {val_accuracy.item():.4f}')
    
    execution_time = time.time() - start_time
    return loss.item(), val_accuracy.item(), execution_time, sum(p.numel() for p in model.parameters())

# Hyperparameters
sequence_lengths = [10, 20, 30]
d_model = 64  # Hidden size
num_layers = 2  # Transformer layers
nhead = 2  # Number of attention heads
epochs = 50
learning_rate = 0.005
results = []

for seq_length in sequence_lengths:
    print(f"\nTraining Transformer with sequence length: {seq_length}")

    X, y = prepare_data(seq_length)
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

    X_train = torch.tensor(X_train, dtype=torch.long)
    y_train = torch.tensor(y_train, dtype=torch.long)
    X_val = torch.tensor(X_val, dtype=torch.long)
    y_val = torch.tensor(y_val, dtype=torch.long)

    model = TransformerModel(len(chars), d_model, nhead, num_layers)
    loss, val_acc, exec_time, model_size = train_and_evaluate(model, X_train, y_train, X_val, y_val, epochs, learning_rate)

    results.append((seq_length, loss, val_acc, exec_time, model_size))
print("\nTransformer Model Results:")
for result in results:
    print(f"Seq Len: {result[0]} | Loss: {result[1]:.4f}, Val Acc: {result[2]:.4f}, Time: {result[3]:.2f}s, Model Size: {result[4]}")


Training Transformer with sequence length: 10
Epoch 10, Loss: 2.6078, Val Loss: 2.6409, Val Acc: 0.2479
Epoch 20, Loss: 2.2218, Val Loss: 2.4223, Val Acc: 0.2962
Epoch 30, Loss: 1.8996, Val Loss: 2.2842, Val Acc: 0.3592
Epoch 40, Loss: 1.5907, Val Loss: 2.2432, Val Acc: 0.4076
Epoch 50, Loss: 1.3435, Val Loss: 2.2657, Val Acc: 0.4139

Training Transformer with sequence length: 20
Epoch 10, Loss: 2.7003, Val Loss: 2.7528, Val Acc: 0.2384
Epoch 20, Loss: 2.3301, Val Loss: 2.4853, Val Acc: 0.2679
Epoch 30, Loss: 2.0653, Val Loss: 2.3774, Val Acc: 0.3291
Epoch 40, Loss: 1.7330, Val Loss: 2.3854, Val Acc: 0.3523
Epoch 50, Loss: 1.4281, Val Loss: 2.4298, Val Acc: 0.3755

Training Transformer with sequence length: 30
Epoch 10, Loss: 2.6721, Val Loss: 2.6258, Val Acc: 0.2352
Epoch 20, Loss: 2.3578, Val Loss: 2.4907, Val Acc: 0.2627
Epoch 30, Loss: 2.1256, Val Loss: 2.3626, Val Acc: 0.3030
Epoch 40, Loss: 1.8398, Val Loss: 2.3488, Val Acc: 0.3581
Epoch 50, Loss: 1.5579, Val Loss: 2.3525, Val A

In [None]:
''' Problem 1 RNN-based approach with cross-attention
Note: The RNN-based approach without cross-attention is not implemented here, that is included in hw3
'''

# Sample text for training
text = """Next character prediction is a fundamental task in the field of natural language processing (NLP) that involves predicting the next character in a sequence of text based on the characters that precede it. This task is essential for various applications, including text auto-completion, spell checking, and even in the development of sophisticated AI models capable of generating human-like text.
At its core, next character prediction relies on statistical models or deep learning algorithms to analyze a given sequence of text and predict which character is most likely to follow. These predictions are based on patterns and relationships learned from large datasets of text during the training phase of the model.
One of the most popular approaches to next character prediction involves the use of Recurrent Neural Networks (RNNs), and more specifically, a variant called Long Short-Term Memory (LSTM) networks. RNNs are particularly well-suited for sequential data like text, as they can maintain information in 'memory' about previous characters to inform the prediction of the next character. LSTM networks enhance this capability by being able to remember long-term dependencies, making them even more effective for next character prediction tasks.
Training a model for next character prediction involves feeding it large amounts of text data, allowing it to learn the probability of each character's appearance following a sequence of characters. During this training process, the model adjusts its parameters to minimize the difference between its predictions and the actual outcomes, thus improving its predictive accuracy over time.
Once trained, the model can be used to predict the next character in a given piece of text by considering the sequence of characters that precede it. This can enhance user experience in text editing software, improve efficiency in coding environments with auto-completion features, and enable more natural interactions with AI-based chatbots and virtual assistants.
In summary, next character prediction plays a crucial role in enhancing the capabilities of various NLP applications, making text-based interactions more efficient, accurate, and human-like. Through the use of advanced machine learning models like RNNs and LSTMs, next character prediction continues to evolve, opening new possibilities for the future of text-based technology."""

chars = sorted(list(set(text)))
ix_to_char = {i: ch for i, ch in enumerate(chars)}
char_to_ix = {ch: i for i, ch in enumerate(chars)}
chars = sorted(list(set(text)))

#Preparing the dataset
def prepare_data(sequence_length):
    X = []
    y = []
    for i in range(len(text) - sequence_length):
        sequence = text[i:i + sequence_length]
        label = text[i + sequence_length]
        X.append([char_to_ix[char] for char in sequence])
        y.append(char_to_ix[label])
    return np.array(X), np.array(y)

#Models with attention
class CharModelWithAttention(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, model_type="LSTM"):
        super(CharModelWithAttention, self).__init__()
        self.hidden_size = hidden_size
        self.model_type = model_type
        
        # Embedding layer
        self.embedding = nn.Embedding(input_size, hidden_size)
        
        # RNN layer
        if model_type == "LSTM":
            self.rnn = nn.LSTM(hidden_size, hidden_size, batch_first=True, bidirectional=True)
        else:  # GRU
            self.rnn = nn.GRU(hidden_size, hidden_size, batch_first=True, bidirectional=True)
        
        # Attention mechanism
        self.attention = nn.Sequential(
            nn.Linear(hidden_size * 2, hidden_size),
            nn.Tanh(),
            nn.Linear(hidden_size, 1, bias=False))
        
        # Output layer
        self.fc = nn.Linear(hidden_size * 2, output_size)
    
    def forward(self, x):
        # Embedding
        embedded = self.embedding(x)
        
        # RNN layer
        if self.model_type == "LSTM":
            outputs, (hidden, cell) = self.rnn(embedded)
        else:  # GRU
            outputs, hidden = self.rnn(embedded)
        
        # Attention mechanism
        attention_weights = torch.softmax(
            self.attention(outputs).squeeze(-1), dim=1)
        context_vector = torch.bmm(attention_weights.unsqueeze(1), outputs).squeeze(1)
        
        # Final prediction
        output = self.fc(context_vector)
        return output


def train_and_evaluate(model, X_train, y_train, X_val, y_val, epochs, learning_rate):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    
    model_size = sum(p.numel() for p in model.parameters())
    start_time = time.time()
    
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        output = model(X_train)
        loss = criterion(output, y_train)
        loss.backward()
        optimizer.step()
        
        model.eval()
        with torch.no_grad():
            val_output = model(X_val)
            val_loss = criterion(val_output, y_val)
            _, predicted = torch.max(val_output, 1)
            val_accuracy = (predicted == y_val).float().mean()
            
        if (epoch+1) % 10 == 0:
            print(f'Epoch {epoch+1}, Loss: {loss.item():.4f}, Val Loss: {val_loss.item():.4f}, Val Acc: {val_accuracy.item():.4f}')
    
    execution_time = time.time() - start_time
    

    return loss.item(), val_loss.item(), val_accuracy.item(), execution_time, model_size

#Hyperparameters
sequence_lengths = [10, 20, 30]
model_types = ["LSTM", "GRU"]  
hidden_size = 128
epochs = 50
learning_rate = 0.005
results = []

# Updated training loop with correct unpacking
for model_type in model_types:
    print(f"\nTraining {model_type} with Attention:")
    for seq_length in sequence_lengths:
        X, y = prepare_data(seq_length)
        X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
        
        X_train = torch.tensor(X_train, dtype=torch.long)
        y_train = torch.tensor(y_train, dtype=torch.long)
        X_val = torch.tensor(X_val, dtype=torch.long)
        y_val = torch.tensor(y_val, dtype=torch.long)
        
        model = CharModelWithAttention(len(chars), hidden_size, len(chars), model_type)
        
        print(f"\nSequence Length: {seq_length}")
        # Now unpacking 5 values
        train_loss, val_loss, val_acc, exec_time, model_size = train_and_evaluate(
            model, X_train, y_train, X_val, y_val, epochs, learning_rate)
        
        results.append({
            'Model': f"{model_type}(atnn)",
            'Seq Length': seq_length,
            'Train Loss': train_loss,
            'Val Loss': val_loss,
            'Val Acc': val_acc,
            'Time (s)': exec_time,
            'Params': model_size
        })
print("\n=== Final Results ===")
for result in results:
    print(f"{result['Model']} | "
          f"Seq Len: {result['Seq Length']} | "
          f"Loss: {result['Train Loss']:.4f}, "
          f"Val Acc: {result['Val Acc']:.4f}, "
          f"Time: {result['Time (s)']:.2f}s, "
          f"Model Size: {result['Params']}")


Training LSTM with Attention:

Sequence Length: 10
Epoch 10, Loss: 2.8982, Val Loss: 2.8402, Val Acc: 0.2143
Epoch 20, Loss: 2.2973, Val Loss: 2.4100, Val Acc: 0.3445
Epoch 30, Loss: 1.6861, Val Loss: 2.1457, Val Acc: 0.4433
Epoch 40, Loss: 1.1752, Val Loss: 2.1592, Val Acc: 0.4496
Epoch 50, Loss: 0.7455, Val Loss: 2.2501, Val Acc: 0.4727

Sequence Length: 20
Epoch 10, Loss: 3.0276, Val Loss: 3.0044, Val Acc: 0.1498
Epoch 20, Loss: 2.9367, Val Loss: 2.9719, Val Acc: 0.1519
Epoch 30, Loss: 2.6366, Val Loss: 2.7488, Val Acc: 0.2468
Epoch 40, Loss: 2.0242, Val Loss: 2.3528, Val Acc: 0.3544
Epoch 50, Loss: 1.3841, Val Loss: 2.2644, Val Acc: 0.3776

Sequence Length: 30
Epoch 10, Loss: 3.0337, Val Loss: 3.0376, Val Acc: 0.1419
Epoch 20, Loss: 2.9395, Val Loss: 2.9800, Val Acc: 0.1674
Epoch 30, Loss: 2.7184, Val Loss: 2.8556, Val Acc: 0.2331
Epoch 40, Loss: 2.3832, Val Loss: 2.6535, Val Acc: 0.2712
Epoch 50, Loss: 1.9128, Val Loss: 2.6126, Val Acc: 0.2500

Training GRU with Attention:

Seque

In [None]:
'''Problem 2
Similar to homework 3, build a transformer model for the tiny Shakespeare dataset; the data loader code is already provided. 
Use 2 transformer layers with 2 heads. Train the models for the sequence of 20 and 30, report and compare training loss, validation accuracy, 
execution time for training, and computational and model size complexities, and compare it against RNN-based models.
'''

# Step 1: Download the dataset
url = "https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt"
response = requests.get(url)
text = response.text  # This is the entire text data

# Step 2: Prepare the dataset
def process_data(sequence_length):
    chars = sorted(list(set(text)))
    char_to_int = {ch: i for i, ch in enumerate(chars)}
    int_to_char = {i: ch for i, ch in enumerate(chars)}

    encoded_text = [char_to_int[ch] for ch in text]

    sequences, targets = [], []
    for i in range(len(encoded_text) - sequence_length):
        seq = encoded_text[i:i + sequence_length]
        target = encoded_text[i + sequence_length]
        sequences.append(seq)
        targets.append(target)

    return torch.tensor(sequences, dtype=torch.long), torch.tensor(targets, dtype=torch.long), char_to_int, int_to_char

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=500):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        return x + self.pe[:, :x.size(1)].to(x.device)

class TransformerModel(nn.Module):
    def __init__(self, vocab_size, d_model=128, nhead=4, num_layers=2):
        super(TransformerModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)
        encoder_layers = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dim_feedforward=256, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_layers=num_layers)
        self.fc = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        x = self.embedding(x)
        x = self.positional_encoding(x)
        x = self.transformer_encoder(x)
        x = self.fc(x[:, -1, :]) 
        return x

def train_and_evaluate(model, train_loader, val_loader, epochs=50, learning_rate=0.005):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    start_time = time.time()

    for epoch in range(epochs):
        model.train()
        train_loss = 0
        
        for batch_X, batch_y in train_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            optimizer.zero_grad()
            output = model(batch_X)
            loss = criterion(output, batch_y)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()

        # Validation
        model.eval()
        val_loss, correct, total = 0, 0, 0
        with torch.no_grad():
            for batch_X, batch_y in val_loader:
                batch_X, batch_y = batch_X.to(device), batch_y.to(device)
                val_output = model(batch_X)
                val_loss += criterion(val_output, batch_y).item()
                _, predicted = torch.max(val_output, 1)
                correct += (predicted == batch_y).sum().item()
                total += batch_y.size(0)
        
        val_accuracy = correct / total

        if (epoch + 1) % 2 == 0:
            print(f'Epoch {epoch+1}, Train Loss: {train_loss / len(train_loader):.4f}, Val Loss: {val_loss / len(val_loader):.4f}, Val Acc: {val_accuracy:.4f}')
    
    execution_time = time.time() - start_time
    return train_loss / len(train_loader), val_accuracy, execution_time, sum(p.numel() for p in model.parameters())

# Hyperparameters
sequence_lengths = [20, 30]
d_model = 64
num_layers = [1, 2, 4]
nhead = [2, 4]
epochs = 10
learning_rate = 0.005
batch_size = 64
results = []

# Loop through combinations of layers and heads
for num_layers, nhead in product(num_layers, nhead):
    print(f"\nTransformer: Layers= {num_layers} Heads= {nhead}")

    for seq_length in sequence_lengths:
        print(f"\nTraining with sequence length: {seq_length}")

        X, y, char_to_int, int_to_char = process_data(seq_length)
        X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

        train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
        val_dataset = torch.utils.data.TensorDataset(X_val, y_val)

        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    
        model = TransformerModel(len(char_to_int), d_model, nhead, num_layers)
        loss, val_acc, exec_time, model_size = train_and_evaluate(model, train_loader, val_loader, epochs, learning_rate)

        # Store results
        results.append({
            "Seq Len": seq_length,
            "Layers": num_layers,
            "Heads": nhead,
            "Loss": loss,
            "Val Acc": val_acc,
            "Time": exec_time,
            "Model Size": model_size
        })

print("\nTransformer Results:")
for r in results:
    print(f"Seq Len: {r['Seq Len']} | Layers: {r['Layers']} | Heads: {r['Heads']} | "
          f"Loss: {r['Loss']:.4f} | Val Acc: {r['Val Acc']:.4f} | Time: {r['Time']:.2f}s | "
          f"Model Size: {r['Model Size']}")



Transformer: Layers= 1 Heads= 2

Training with sequence length: 20
Epoch 2, Train Loss: 2.2079, Val Loss: 2.1213, Val Acc: 0.3681
Epoch 4, Train Loss: 2.1963, Val Loss: 2.1012, Val Acc: 0.3743
Epoch 6, Train Loss: 2.2008, Val Loss: 2.1027, Val Acc: 0.3743
Epoch 8, Train Loss: 2.1991, Val Loss: 2.0979, Val Acc: 0.3701
Epoch 10, Train Loss: 2.1660, Val Loss: 2.0915, Val Acc: 0.3698

Training with sequence length: 30
Epoch 2, Train Loss: 2.2174, Val Loss: 2.1377, Val Acc: 0.3639
Epoch 4, Train Loss: 2.1907, Val Loss: 2.1044, Val Acc: 0.3639
Epoch 6, Train Loss: 2.2001, Val Loss: 2.1187, Val Acc: 0.3663
Epoch 8, Train Loss: 2.1870, Val Loss: 2.1082, Val Acc: 0.3674
Epoch 10, Train Loss: 2.2420, Val Loss: 2.1766, Val Acc: 0.3456

Transformer: Layers= 1 Heads= 4

Training with sequence length: 20
Epoch 2, Train Loss: 2.0976, Val Loss: 1.9998, Val Acc: 0.4039
Epoch 4, Train Loss: 2.0901, Val Loss: 2.0179, Val Acc: 0.3989
Epoch 6, Train Loss: 2.0972, Val Loss: 2.0335, Val Acc: 0.3910
Epoch 8,

In [None]:
'''Problem 2 sequence length to 50. Perform the training and report the accuracy and model complexity results.'''
# Hyperparameters
sequence_lengths = [50]
d_model = 64
num_layers_list = [2]
nhead_list = [4]
epochs = 10
learning_rate = 0.005
batch_size = 64
results = []

# Loop through combinations of layers and heads
for num_layers, nhead in product(num_layers_list, nhead_list):
    print(f"\nTransformer: Layers= {num_layers} Heads= {nhead}")

    for seq_length in sequence_lengths:
        print(f"\nTraining with sequence length: {seq_length}")

       
        X, y, char_to_int, int_to_char = process_data(seq_length)
        X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

        train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
        val_dataset = torch.utils.data.TensorDataset(X_val, y_val)

        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)


        model = TransformerModel(len(char_to_int), d_model, nhead, num_layers)
        loss, val_acc, exec_time, model_size = train_and_evaluate(model, train_loader, val_loader, epochs, learning_rate)

        # Store results
        results.append({
            "Seq Len": seq_length,
            "Layers": num_layers,
            "Heads": nhead,
            "Loss": loss,
            "Val Acc": val_acc,
            "Time": exec_time,
            "Model Size": model_size
        })

print("\nTransformer Results:")
for r in results:
    print(f"Seq Len: {r['Seq Len']} | Layers: {r['Layers']} | Heads: {r['Heads']} | "
          f"Loss: {r['Loss']:.4f} | Val Acc: {r['Val Acc']:.4f} | Time: {r['Time']:.2f}s | "
          f"Model Size: {r['Model Size']}")


Transformer: Layers= 2 Heads= 4

Training with sequence length: 50
Epoch 2, Train Loss: 2.1691, Val Loss: 2.1255, Val Acc: 0.3702
Epoch 4, Train Loss: 2.2234, Val Loss: 2.1411, Val Acc: 0.3671
Epoch 6, Train Loss: 2.2552, Val Loss: 2.1149, Val Acc: 0.3749
Epoch 8, Train Loss: 2.3502, Val Loss: 2.3210, Val Acc: 0.3278
Epoch 10, Train Loss: 2.3451, Val Loss: 2.2238, Val Acc: 0.3395

Transformer Results:
Seq Len: 50 | Layers: 2 | Heads: 4 | Loss: 2.3451 | Val Acc: 0.3395 | Time: 920.85s | Model Size: 108353


In [None]:
'''Problem 3: Translate English to French using the transformer model.
Train the model on the entire dataset and evaluate it on the entire dataset. 
Report training loss, validation loss, and validation accuracy. 
Explore transformer architecture with 1, 2, and 4 layers, with 2 and 4 heads (8 different combinations). 
Also, try some qualitative validation
'''

# Load dataset from .docx file
def load_english_french_pairs(docx_path):
    doc = Document(docx_path)
    text = "\n".join([p.text for p in doc.paragraphs])
    english_to_french = []
    
    for line in text.split("\n"):
        if '", "' in line:
            en, fr = line.split('", "')
            en = en.replace('("', '').strip()
            fr = fr.replace('")', '').strip()
            english_to_french.append((en, fr))
            
    return english_to_french

class Vocabulary:
    def __init__(self):
        self.word2index = {"<PAD>": 0, "<SOS>": 1, "<EOS>": 2, "<UNK>": 3}
        self.index2word = {0: "<PAD>", 1: "<SOS>", 2: "<EOS>", 3: "<UNK>"}
    
    def add_sentence(self, sentence):
        for word in sentence.split():
            self.add_word(word)

    def add_word(self, word):
        if word not in self.word2index:
            index = len(self.word2index)
            self.word2index[word] = index
            self.index2word[index] = word

    def sentence_to_indices(self, sentence):
        return [self.word2index.get(word, self.word2index["<UNK>"]) for word in sentence.split()] + [self.word2index["<EOS>"]]


class TranslationDataset(Dataset):
    def __init__(self, pairs, english_vocab, french_vocab):
        self.pairs = pairs
        self.english_vocab = english_vocab
        self.french_vocab = french_vocab

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        en_sentence, fr_sentence = self.pairs[idx]
        en_indices = self.english_vocab.sentence_to_indices(en_sentence)
        fr_indices = self.french_vocab.sentence_to_indices(fr_sentence)
        
        return torch.tensor(en_indices), torch.tensor(fr_indices)

# Collate function for padding
def collate_fn(batch):
    en_batch = [item[0] for item in batch]
    fr_batch = [item[1] for item in batch]

    en_batch = nn.utils.rnn.pad_sequence(en_batch, batch_first=True, padding_value=0)  # <PAD> is 0
    fr_batch = nn.utils.rnn.pad_sequence(fr_batch, batch_first=True, padding_value=0)  # <PAD> is 0

    return en_batch, fr_batch

class TransformerModel(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, nhead, num_layers, max_seq_length=100):
        super().__init__()
        self.src_embedding = nn.Embedding(src_vocab_size, d_model)
        self.tgt_embedding = nn.Embedding(tgt_vocab_size, d_model)
        
        # Positional encoding
        self.positional_encoding = nn.Parameter(torch.zeros(max_seq_length, d_model))
        nn.init.normal_(self.positional_encoding, mean=0, std=0.02)
        
        # Transformer
        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_layers,
            num_decoder_layers=num_layers
        )
        
        self.fc_out = nn.Linear(d_model, tgt_vocab_size)
        
    def forward(self, src, tgt):

        src_embedded = self.src_embedding(src) + self.positional_encoding[:src.size(1), :]
        tgt_embedded = self.tgt_embedding(tgt) + self.positional_encoding[:tgt.size(1), :]
        
        
        src_embedded = src_embedded.transpose(0, 1)
        tgt_embedded = tgt_embedded.transpose(0, 1)
        
        
        src_mask = self.transformer.generate_square_subsequent_mask(src.size(1)).to(src.device)
        tgt_mask = self.transformer.generate_square_subsequent_mask(tgt.size(1)).to(tgt.device)
        
        # Transformer forward pass
        output = self.transformer(
            src_embedded, 
            tgt_embedded, 
            src_mask=src_mask, 
            tgt_mask=tgt_mask
        )
        
        output = self.fc_out(output)
        return output

def train_and_evaluate(model, dataloader, epochs, learning_rate):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss(ignore_index=0)  # Ignore padding tokens

    start_time = time.time()
    for epoch in range(1, epochs + 1):
        model.train()
        total_train_loss = 0
        for src, tgt in dataloader:
            src, tgt = src.to(device), tgt.to(device)
            
            # Prepare target input (shifted right) and output (shifted left)
            tgt_input = tgt[:, :-1]  # Remove last token
            tgt_output = tgt[:, 1:]   # Remove first token (SOS)
            
            optimizer.zero_grad()
            
            # Forward pass
            output = model(src, tgt_input)  # Shape: (seq_len, batch_size, vocab_size)
            
            # Reshape for loss calculation
            output = output.permute(1, 2, 0)  # (batch_size, vocab_size, seq_len)
            
            # Loss
            loss = criterion(output, tgt_output)
            loss.backward()
            optimizer.step()
            total_train_loss += loss.item()

        avg_train_loss = total_train_loss / len(dataloader)
        if epoch % 10 == 0 or epoch == epochs:
            print(f"Epoch {epoch}/{epochs} - Train Loss: {avg_train_loss:.4f}")

    exec_time = time.time() - start_time

   
    model.eval()
    total_val_loss, correct, total = 0, 0, 0
    with torch.no_grad():
        for src, tgt in dataloader:
            src, tgt = src.to(device), tgt.to(device)
            tgt_input = tgt[:, :-1]
            tgt_output = tgt[:, 1:]
            
            output = model(src, tgt_input)
            output = output.permute(1, 2, 0)  
            
            
            loss = criterion(output, tgt_output)
            total_val_loss += loss.item()
            
            
            preds = output.argmax(dim=1)  
            mask = tgt_output != 0  
            correct += (preds[mask] == tgt_output[mask]).sum().item()
            total += mask.sum().item()

    avg_val_loss = total_val_loss / len(dataloader)
    val_acc = correct / total if total > 0 else 0
    model_size = sum(p.numel() for p in model.parameters())
    return avg_train_loss, avg_val_loss, val_acc, exec_time, model_size


# Load Dataset
dataset = load_english_french_pairs("Dataset - English to French.docx")
    
# Build vocabularies
english_vocab = Vocabulary()
french_vocab = Vocabulary()
for en, fr in dataset:
    english_vocab.add_sentence(en)
    french_vocab.add_sentence(fr)
    
# Dataset and dataloader
train_dataset = TranslationDataset(dataset, english_vocab, french_vocab)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
    
# Hyperparameters
d_model = 128
num_layers = [1, 2, 4]
nhead = [2,4]
epochs = 50
learning_rate = 0.001
results = []
    
# Loop through combinations of layers and heads
for num_layers, nhead in product(num_layers, nhead):
    print(f"\nTransformer: Layers = {num_layers} Heads = {nhead}")
        
    model = TransformerModel(
        len(english_vocab.word2index), 
        len(french_vocab.word2index), 
        d_model, 
        nhead, 
        num_layers
    )
        
    train_loss, val_loss, val_acc, exec_time, model_size = train_and_evaluate(
        model, train_loader, epochs, learning_rate
    )
        
    results.append({
        "Layers": num_layers,
        "Heads": nhead,
        "Train Loss": train_loss,
        "Val Loss": val_loss,
        "Val Acc": val_acc,
        "Time": exec_time,
        "Model Size": model_size
    })
    
# Print final results
print("\nTransformer Results:")
for r in results:
    print(f"Layers: {r['Layers']} | Heads: {r['Heads']} | "
    f"Train Loss: {r['Train Loss']:.4f} | Val Loss: {r['Val Loss']:.4f} | "
    f"Val Acc: {r['Val Acc']:.4f} | Time: {r['Time']:.2f}s | Model Size: {r['Model Size']}")


Transformer: Layers = 1 Heads = 2
Epoch 10/50 - Train Loss: 2.0547
Epoch 20/50 - Train Loss: 0.5401
Epoch 30/50 - Train Loss: 0.1808
Epoch 40/50 - Train Loss: 0.0784
Epoch 50/50 - Train Loss: 0.0621

Transformer: Layers = 1 Heads = 4
Epoch 10/50 - Train Loss: 2.1038
Epoch 20/50 - Train Loss: 0.5284
Epoch 30/50 - Train Loss: 0.1486
Epoch 40/50 - Train Loss: 0.0762
Epoch 50/50 - Train Loss: 0.0455

Transformer: Layers = 2 Heads = 2
Epoch 10/50 - Train Loss: 2.0876
Epoch 20/50 - Train Loss: 0.6119
Epoch 30/50 - Train Loss: 0.1619
Epoch 40/50 - Train Loss: 0.0707
Epoch 50/50 - Train Loss: 0.0422

Transformer: Layers = 2 Heads = 4
Epoch 10/50 - Train Loss: 1.9621
Epoch 20/50 - Train Loss: 0.5133
Epoch 30/50 - Train Loss: 0.1330
Epoch 40/50 - Train Loss: 0.0608
Epoch 50/50 - Train Loss: 0.0392

Transformer: Layers = 4 Heads = 2
Epoch 10/50 - Train Loss: 3.5992
Epoch 20/50 - Train Loss: 1.4473
Epoch 30/50 - Train Loss: 0.5524
Epoch 40/50 - Train Loss: 0.1933
Epoch 50/50 - Train Loss: 0.0688


In [None]:
'''Problem 4: Translate French to English using the transformer model.
Train the model on the entire dataset and evaluate it on the entire dataset. 
Report training loss, validation loss, and validation accuracy. 
Explore transformer architecture with 1, 2, and 4 layers, with 2 and 4 heads (8 different combinations). 
Also, try some qualitative validation
'''

# Load dataset from .docx file
def load_english_french_pairs(docx_path):
    doc = Document(docx_path)
    text = "\n".join([p.text for p in doc.paragraphs])
    english_to_french = []
    
    for line in text.split("\n"):
        if '", "' in line:
            en, fr = line.split('", "')
            en = en.replace('("', '').strip()
            fr = fr.replace('")', '').strip()
            english_to_french.append((fr, en))
            
    return english_to_french

class Vocabulary:
    def __init__(self):
        self.word2index = {"<PAD>": 0, "<SOS>": 1, "<EOS>": 2, "<UNK>": 3}
        self.index2word = {0: "<PAD>", 1: "<SOS>", 2: "<EOS>", 3: "<UNK>"}
    
    def add_sentence(self, sentence):
        for word in sentence.split():
            self.add_word(word)

    def add_word(self, word):
        if word not in self.word2index:
            index = len(self.word2index)
            self.word2index[word] = index
            self.index2word[index] = word

    def sentence_to_indices(self, sentence):
        return [self.word2index.get(word, self.word2index["<UNK>"]) for word in sentence.split()] + [self.word2index["<EOS>"]]


class TranslationDataset(Dataset):
    def __init__(self, pairs, english_vocab, french_vocab):
        self.pairs = pairs
        self.english_vocab = english_vocab
        self.french_vocab = french_vocab

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        en_sentence, fr_sentence = self.pairs[idx]
        en_indices = self.english_vocab.sentence_to_indices(en_sentence)
        fr_indices = self.french_vocab.sentence_to_indices(fr_sentence)
        
        return torch.tensor(en_indices), torch.tensor(fr_indices)

# Collate function for padding
def collate_fn(batch):
    en_batch = [item[0] for item in batch]
    fr_batch = [item[1] for item in batch]

    en_batch = nn.utils.rnn.pad_sequence(en_batch, batch_first=True, padding_value=0)  # <PAD> is 0
    fr_batch = nn.utils.rnn.pad_sequence(fr_batch, batch_first=True, padding_value=0)  # <PAD> is 0

    return en_batch, fr_batch

class TransformerModel(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, nhead, num_layers, max_seq_length=100):
        super().__init__()
        self.src_embedding = nn.Embedding(src_vocab_size, d_model)
        self.tgt_embedding = nn.Embedding(tgt_vocab_size, d_model)
        
        # Positional encoding
        self.positional_encoding = nn.Parameter(torch.zeros(max_seq_length, d_model))
        nn.init.normal_(self.positional_encoding, mean=0, std=0.02)
        
        # Transformer
        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_layers,
            num_decoder_layers=num_layers
        )
        
        self.fc_out = nn.Linear(d_model, tgt_vocab_size)
        
    def forward(self, src, tgt):

        src_embedded = self.src_embedding(src) + self.positional_encoding[:src.size(1), :]
        tgt_embedded = self.tgt_embedding(tgt) + self.positional_encoding[:tgt.size(1), :]
        
        
        src_embedded = src_embedded.transpose(0, 1)
        tgt_embedded = tgt_embedded.transpose(0, 1)
        
        
        src_mask = self.transformer.generate_square_subsequent_mask(src.size(1)).to(src.device)
        tgt_mask = self.transformer.generate_square_subsequent_mask(tgt.size(1)).to(tgt.device)
        
        # Transformer forward pass
        output = self.transformer(
            src_embedded, 
            tgt_embedded, 
            src_mask=src_mask, 
            tgt_mask=tgt_mask
        )
        
        output = self.fc_out(output)
        return output

def train_and_evaluate(model, dataloader, epochs, learning_rate):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss(ignore_index=0)  # Ignore padding tokens

    start_time = time.time()
    for epoch in range(1, epochs + 1):
        model.train()
        total_train_loss = 0
        for src, tgt in dataloader:
            src, tgt = src.to(device), tgt.to(device)
            
            # Prepare target input (shifted right) and output (shifted left)
            tgt_input = tgt[:, :-1]  # Remove last token
            tgt_output = tgt[:, 1:]   # Remove first token (SOS)
            
            optimizer.zero_grad()
            
            # Forward pass
            output = model(src, tgt_input)  # Shape: (seq_len, batch_size, vocab_size)
            
            # Reshape for loss calculation
            output = output.permute(1, 2, 0)  # (batch_size, vocab_size, seq_len)
            
            # Loss
            loss = criterion(output, tgt_output)
            loss.backward()
            optimizer.step()
            total_train_loss += loss.item()

        avg_train_loss = total_train_loss / len(dataloader)
        if epoch % 10 == 0 or epoch == epochs:
            print(f"Epoch {epoch}/{epochs} - Train Loss: {avg_train_loss:.4f}")

    exec_time = time.time() - start_time

   
    model.eval()
    total_val_loss, correct, total = 0, 0, 0
    with torch.no_grad():
        for src, tgt in dataloader:
            src, tgt = src.to(device), tgt.to(device)
            tgt_input = tgt[:, :-1]
            tgt_output = tgt[:, 1:]
            
            output = model(src, tgt_input)
            output = output.permute(1, 2, 0)  
            
            
            loss = criterion(output, tgt_output)
            total_val_loss += loss.item()
            
            
            preds = output.argmax(dim=1)  
            mask = tgt_output != 0  
            correct += (preds[mask] == tgt_output[mask]).sum().item()
            total += mask.sum().item()

    avg_val_loss = total_val_loss / len(dataloader)
    val_acc = correct / total if total > 0 else 0
    model_size = sum(p.numel() for p in model.parameters())
    return avg_train_loss, avg_val_loss, val_acc, exec_time, model_size


# Load Dataset
dataset = load_english_french_pairs("Dataset - English to French.docx")
    
# Build vocabularies
english_vocab = Vocabulary()
french_vocab = Vocabulary()
for en, fr in dataset:
    english_vocab.add_sentence(en)
    french_vocab.add_sentence(fr)
    
# Dataset and dataloader
train_dataset = TranslationDataset(dataset, english_vocab, french_vocab)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
    
# Hyperparameters
d_model = 128
num_layers = [1, 2, 4]
nhead = [2,4]
epochs = 50
learning_rate = 0.001
results = []
    
# Loop through combinations of layers and heads
for num_layers, nhead in product(num_layers, nhead):
    print(f"\nTransformer: Layers = {num_layers} Heads = {nhead}")
        
    model = TransformerModel(
        len(english_vocab.word2index), 
        len(french_vocab.word2index), 
        d_model, 
        nhead, 
        num_layers
    )
        
    train_loss, val_loss, val_acc, exec_time, model_size = train_and_evaluate(
        model, train_loader, epochs, learning_rate
    )
        
    results.append({
        "Layers": num_layers,
        "Heads": nhead,
        "Train Loss": train_loss,
        "Val Loss": val_loss,
        "Val Acc": val_acc,
        "Time": exec_time,
        "Model Size": model_size
    })
    
# Print final results
print("\nTransformer Results:")
for r in results:
    print(f"Layers: {r['Layers']} | Heads: {r['Heads']} | "
    f"Train Loss: {r['Train Loss']:.4f} | Val Loss: {r['Val Loss']:.4f} | "
    f"Val Acc: {r['Val Acc']:.4f} | Time: {r['Time']:.2f}s | Model Size: {r['Model Size']}")


Transformer: Layers = 1 Heads = 2




Epoch 10/50 - Train Loss: 1.9695
Epoch 20/50 - Train Loss: 0.5288
Epoch 30/50 - Train Loss: 0.1666
Epoch 40/50 - Train Loss: 0.0863
Epoch 50/50 - Train Loss: 0.0538

Transformer: Layers = 1 Heads = 4
Epoch 10/50 - Train Loss: 1.9096
Epoch 20/50 - Train Loss: 0.4174
Epoch 30/50 - Train Loss: 0.1369
Epoch 40/50 - Train Loss: 0.0642
Epoch 50/50 - Train Loss: 0.0400

Transformer: Layers = 2 Heads = 2
Epoch 10/50 - Train Loss: 2.1179
Epoch 20/50 - Train Loss: 0.6094
Epoch 30/50 - Train Loss: 0.1621
Epoch 40/50 - Train Loss: 0.0633
Epoch 50/50 - Train Loss: 0.0419

Transformer: Layers = 2 Heads = 4
Epoch 10/50 - Train Loss: 2.1064
Epoch 20/50 - Train Loss: 0.5979
Epoch 30/50 - Train Loss: 0.1479
Epoch 40/50 - Train Loss: 0.0593
Epoch 50/50 - Train Loss: 0.0368

Transformer: Layers = 4 Heads = 2
Epoch 10/50 - Train Loss: 4.0151
Epoch 20/50 - Train Loss: 1.8736
Epoch 30/50 - Train Loss: 0.9127
Epoch 40/50 - Train Loss: 0.3924
Epoch 50/50 - Train Loss: 0.1508

Transformer: Layers = 4 Heads = 4


'n_examples = 10\nshow_examples ='