<a href="https://colab.research.google.com/github/myllanes/Introduction-to-Deep-Learning/blob/main/HW5_4_French_To_English.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# Michael Yllanes
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [12]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np
import random
import ast
import math
from sklearn.model_selection import train_test_split

# Install python-docx for reading .docx files
!pip install python-docx

# Path to the .docx file in Google Drive
file_path = '/content/drive/My Drive/Dataset_English_to_French.docx'

# Load the .docx file
from docx import Document
doc = Document(file_path)

# Extract text
text = []
for paragraph in doc.paragraphs:
    text.append(paragraph.text)

# Combine into single string
text = '\n'.join(text)

# Extract the list from the text
start_index = text.find('[')  # Find the start of the list
end_index = text.rfind(']') + 1  # Find the end of the list
list_content = text[start_index:end_index]  # Extract the list content

# Safely evaluate the list content
dataset = ast.literal_eval(list_content)

# Device configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Special tokens
SOS_token = 0
EOS_token = 1
PAD_token = 2  # padding token

# Create character mappings
all_chars = set(''.join([word for pair in dataset for word in pair]))
char_to_index = {"SOS": SOS_token, "EOS": EOS_token, "PAD": PAD_token,
                **{char: i+3 for i, char in enumerate(sorted(list(all_chars)))}}
index_to_char = {i: char for char, i in char_to_index.items()}
vocab_size = len(char_to_index)

# Find maximum sequence length in the dataset
max_length = max(len(word) for pair in dataset for word in pair) + 1  # +1 for EOS token

# Dataset class with proper padding
class TranslationDataset(Dataset):
    def __init__(self, dataset, char_to_index, max_length):
        self.dataset = dataset
        self.char_to_index = char_to_index
        self.max_length = max_length

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        # Swap input (French) and target (English)
        target_word, input_word = self.dataset[idx]  # Reverse the order

        # Convert to indices and add EOS
        input_indices = [self.char_to_index[char] for char in input_word] + [EOS_token]
        target_indices = [self.char_to_index[char] for char in target_word] + [EOS_token]

        # Pad sequences to max_length
        input_padded = input_indices + [PAD_token] * (self.max_length - len(input_indices))
        target_padded = target_indices + [PAD_token] * (self.max_length - len(target_indices))

        return (
            torch.tensor(input_padded[:self.max_length], dtype=torch.long),
            torch.tensor(target_padded[:self.max_length], dtype=torch.long)
        )

# Custom collate function to handle padding
def collate_fn(batch):
    src_batch, tgt_batch = zip(*batch)
    src_batch = torch.stack(src_batch)
    tgt_batch = torch.stack(tgt_batch)
    return src_batch, tgt_batch

# Positional Encoding
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return x + self.pe[:, :x.size(1)]

# Transformer Model
class TransformerModel(nn.Module):
    def __init__(self, vocab_size, d_model=256, nhead=2, num_layers=1, dropout=0.01): # Number of Heads and Layers
        super(TransformerModel, self).__init__()
        self.d_model = d_model
        self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=PAD_token)
        self.pos_encoder = PositionalEncoding(d_model)

        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_layers,
            num_decoder_layers=num_layers,
            dropout=dropout,
            batch_first=True
        )

        self.fc_out = nn.Linear(d_model, vocab_size)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None):
        # Create source and target masks
        src_key_padding_mask = (src == PAD_token)
        tgt_key_padding_mask = (tgt == PAD_token)

        # Embedding and positional encoding
        src = self.embedding(src) * math.sqrt(self.d_model)
        src = self.pos_encoder(src)

        tgt = self.embedding(tgt) * math.sqrt(self.d_model)
        tgt = self.pos_encoder(tgt)

        # Create attention masks
        if tgt_mask is None:
            tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt.size(1)).to(device)

        # Transformer
        output = self.transformer(
            src, tgt,
            tgt_mask=tgt_mask,
            src_key_padding_mask=src_key_padding_mask,
            tgt_key_padding_mask=tgt_key_padding_mask
        )
        output = self.fc_out(output)

        return output

def calculate_accuracy(model, dataloader):
    model.eval()
    total = 0
    correct = 0

    with torch.no_grad():
        for src, tgt in dataloader:
            src, tgt = src.to(device), tgt.to(device)
            tgt_input = tgt[:, :-1]
            tgt_output = tgt[:, 1:]

            output = model(src, tgt_input)
            preds = output.argmax(dim=-1)

            # Mask for non-padding tokens
            mask = (tgt_output != PAD_token)
            total += mask.sum().item()
            correct += (preds == tgt_output)[mask].sum().item()

    return correct / total if total > 0 else 0

def evaluate(model, dataloader, criterion):
    model.eval()
    total_loss = 0
    total_batches = 0

    with torch.no_grad():
        for src, tgt in dataloader:
            src, tgt = src.to(device), tgt.to(device)
            tgt_input = tgt[:, :-1]
            tgt_output = tgt[:, 1:]

            output = model(src, tgt_input)
            loss = criterion(output.reshape(-1, output.size(-1)), tgt_output.reshape(-1))

            total_loss += loss.item()
            total_batches += 1

    return total_loss / total_batches if total_batches > 0 else 0

# Train
def train_model(model, train_dataloader, val_dataloader, n_epochs, learning_rate=0.001):
    criterion = nn.CrossEntropyLoss(ignore_index=PAD_token)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    best_val_loss = float('inf')

    for epoch in range(n_epochs):
        model.train()
        train_loss = 0
        train_batches = 0

        for src, tgt in train_dataloader:
            src, tgt = src.to(device), tgt.to(device)

            # Prepare input and output
            tgt_input = tgt[:, :-1]
            tgt_output = tgt[:, 1:]

            # Forward pass
            optimizer.zero_grad()
            output = model(src, tgt_input)

            # Calculate loss (ignoring padding tokens)
            loss = criterion(output.reshape(-1, output.size(-1)), tgt_output.reshape(-1))

            # Backward pass
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            train_batches += 1

        # Calculate metrics
        avg_train_loss = train_loss / train_batches
        avg_val_loss = evaluate(model, val_dataloader, criterion)
        val_accuracy = calculate_accuracy(model, val_dataloader)

        # Print progress
        print(f'Epoch {epoch+1}/{n_epochs}:')
        print(f'  Training Loss: {avg_train_loss:.4f}')
        print(f'  Validation Loss: {avg_val_loss:.4f}')
        print(f'  Validation Accuracy: {val_accuracy:.4f}')

        # Save best model
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            torch.save(model.state_dict(), 'best_model.pth')
            print('  (Model saved)')


#Examples
def show_examples(model, dataloader, index_to_char, n_examples):
    model.eval()
    example_count = 0

    print(f"\nShowing {n_examples} translation examples (French → English):")

    with torch.no_grad():
        for src, tgt in dataloader:
            if example_count >= n_examples:
                break

            src, tgt = src.to(device), tgt.to(device)
            tgt_input = tgt[:, :-1]
            output = model(src, tgt_input)
            preds = output.argmax(dim=-1)

            def indices_to_str(indices):
                return ''.join([index_to_char.get(idx.item(), '?')
                              for idx in indices
                              if idx not in (SOS_token, EOS_token, PAD_token)])

            for i in range(src.size(0)):
                if example_count >= n_examples:
                    break

                print(f"\nExample {example_count + 1}: French Input: {indices_to_str(src[i])}, English Target: {indices_to_str(tgt[i])}, English Prediction: {indices_to_str(preds[i])}")

                example_count += 1

# Split dataset (now French → English)
train_data, val_data = train_test_split(dataset, test_size=0.2, random_state=42)

# Create dataloaders (input=French, target=English)
train_dataset = TranslationDataset(train_data, char_to_index, max_length)
val_dataset = TranslationDataset(val_data, char_to_index, max_length)

train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)

# Initialize model (same architecture)
model = TransformerModel(vocab_size).to(device)

# Train the model (French → English)
print("Starting training (French → English)...")
train_model(model, train_dataloader, val_dataloader, n_epochs=600) # Number of Epochs

# Show examples (French → English)
show_examples(model, val_dataloader, index_to_char, n_examples=5) # NUmber of examples

Starting training (French → English)...
Epoch 1/600:
  Training Loss: 3.4111
  Validation Loss: 2.6986
  Validation Accuracy: 0.2871
  (Model saved)
Epoch 2/600:
  Training Loss: 2.5809
  Validation Loss: 2.4985
  Validation Accuracy: 0.3118
  (Model saved)
Epoch 3/600:
  Training Loss: 2.3567
  Validation Loss: 2.4153
  Validation Accuracy: 0.3194
  (Model saved)
Epoch 4/600:
  Training Loss: 2.2411
  Validation Loss: 2.3684
  Validation Accuracy: 0.3232
  (Model saved)
Epoch 5/600:
  Training Loss: 2.1567
  Validation Loss: 2.3509
  Validation Accuracy: 0.3194
  (Model saved)
Epoch 6/600:
  Training Loss: 2.1006
  Validation Loss: 2.3306
  Validation Accuracy: 0.3213
  (Model saved)
Epoch 7/600:
  Training Loss: 2.0460
  Validation Loss: 2.3464
  Validation Accuracy: 0.3213
Epoch 8/600:
  Training Loss: 1.9876
  Validation Loss: 2.3401
  Validation Accuracy: 0.3289
Epoch 9/600:
  Training Loss: 1.9336
  Validation Loss: 2.3550
  Validation Accuracy: 0.3270
Epoch 10/600:
  Training Lo