### This notebook contains the code to train the models, and load and test them. Run all cells sequentially. No modifications are needed unless stated
### Install required packages

In [None]:
!pip install matplotlib pandas torch torchmetrics scikit-learn

### Import all libraries and models

In [7]:
# Matplotlib
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D
# Numpy
import numpy as np
# Pandas
import pandas as pd
# Torch
import torch
import torch.nn as nn
import json
from torch.utils.data import Dataset, DataLoader
from torchmetrics.classification import Accuracy
from models import ResNet50, ResNet50BiLSTMAttention, ResNet34BiLSTMAttention

import torch.optim as optim

import pickle
import random
from sklearn.model_selection import train_test_split
import os

#Implemented seeding 
def seed_functions(seed):
    """Seed every random number generator this notebook relies on.

    Covers Python's ``random`` module, NumPy's global generator and the
    PyTorch CPU/CUDA generators, and switches cuDNN to its deterministic,
    non-benchmarking mode.

    Args:
        seed (int): Value used to seed all generators.
    """
    # NOTE(review): exporting PYTHONHASHSEED here presumably only influences
    # child processes; the running interpreter chose its hash seed at startup.
    os.environ['PYTHONHASHSEED'] = str(seed)

    # All of these seeders take the seed as their only argument.
    seeders = (
        np.random.seed,
        random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    # Trade cuDNN auto-tuning for repeatable kernels.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# Single seed for the whole notebook; split_data() reads it again for the
# index shuffle and for the DataLoader generator.
SEED = 37
# Seed all RNGs once, immediately after the imports.
seed_functions(SEED)

### This cell contains a custom dataset class and a helper function that builds a dataset from .npy files (no changes needed)

In [8]:
# Custom class defined to store dataset
class SequenceDataset(Dataset):
    """In-memory dataset of sequences stored as individual ``.npy`` files.

    Every file is loaded eagerly in ``__init__`` and kept as a tensor, so
    ``__getitem__`` performs no disk I/O.
    """

    def __init__(self, npy_file_paths, labels):
        """
        Args:
            npy_file_paths (list of str): List of file paths for .npy files containing the sequences.
            labels (list): List of integer class labels, one per file path.

        Raises:
            ValueError: If the number of paths and the number of labels differ.
        """
        npy_file_paths = list(npy_file_paths)
        # Store labels as int64 directly (the dtype CrossEntropyLoss expects for
        # class indices) instead of round-tripping through float on each access.
        label_tensor = torch.tensor(labels, dtype=torch.long)

        # Fail early: a mismatch would otherwise surface as an IndexError deep
        # inside a DataLoader, or silently ignore the surplus labels.
        if len(label_tensor) != len(npy_file_paths):
            raise ValueError(
                f"Got {len(npy_file_paths)} file paths but {len(label_tensor)} labels"
            )

        # Load the sequences and labels
        self.data = [torch.tensor(np.load(file_path)) for file_path in npy_file_paths]
        self.labels = label_tensor

    def __len__(self):
        # Dataset contains as many samples as the number of npy files
        return len(self.data)

    def __getitem__(self, idx):
        # Return the sequence data and its corresponding (int64) label
        return self.data[idx], self.labels[idx]

# Helper function to create dataset
def create_dataset(path_to_dataset):
    """Build a SequenceDataset from a directory with one sub-directory per language.

    Each sub-directory of ``path_to_dataset`` (except ``.ipynb_checkpoints``)
    is one class; its integer label is the directory's position in the
    alphabetically sorted list of directory names.

    Only regular files ending in ``.npy`` are collected, and they are visited
    in sorted order. ``os.listdir`` returns entries in arbitrary order, so
    without sorting the sample order (and therefore any seeded split built on
    top of it) could differ between machines or runs.

    Args:
        path_to_dataset (str): Root directory of the dataset.

    Returns:
        tuple: ``(dataset, num_languages)`` where ``dataset`` is a
        SequenceDataset and ``num_languages`` is the number of class
        directories found.
    """
    languages = sorted(
        d for d in os.listdir(path_to_dataset)
        if d != '.ipynb_checkpoints' and os.path.isdir(os.path.join(path_to_dataset, d))
    )
    print(languages)
    num_languages = len(languages)

    npy_file_paths = []
    labels = []
    for label, lang_dir in enumerate(languages):
        lang_path = os.path.join(path_to_dataset, lang_dir)

        # Keep only real .npy files; stray entries (checkpoint folders, notes,
        # OS metadata files) would otherwise make np.load fail.
        full_paths = [
            os.path.join(lang_path, file_name)
            for file_name in sorted(os.listdir(lang_path))
            if file_name.endswith('.npy')
            and os.path.isfile(os.path.join(lang_path, file_name))
        ]

        npy_file_paths.extend(full_paths)
        labels.extend([label] * len(full_paths))

    dataset = SequenceDataset(npy_file_paths, labels)  # THIS IS THE FINAL DATASET
    return dataset, num_languages

### Set `path_to_dataset` to the path of your dataset (MODIFY HERE)

In [9]:
# Root folder of the dataset; create_dataset() treats each sub-directory as one language class.
path_to_dataset = 'data1' #Enter path to dataset here
# num_languages (the number of class folders found) sizes the models' output layer in later cells.
dataset, num_languages = create_dataset(path_to_dataset)

['arabic', 'chinese', 'english', 'hindi']


### Helper function to split dataset into train, valid and test dataloaders (no changes needed)

In [10]:
def split_data(dataset, batch_size=128, train_fraction=0.7, seed=None):
    """Split a dataset into seeded train / validation / test DataLoaders.

    The indices are shuffled once with a seeded RNG. The first
    ``train_fraction`` of them form the training set and the remainder is
    split in half between validation and test (the test set receives the
    extra sample when the remainder is odd).

    Args:
        dataset: Any sized, indexable dataset.
        batch_size (int): Training batch size. Validation and test loaders
            always serve their whole subset as a single batch.
        train_fraction (float): Fraction of samples used for training,
            strictly between 0 and 1.
        seed (int, optional): Seed for the shuffle and the DataLoader
            generator. Defaults to the notebook-level ``SEED``.

    Returns:
        tuple: ``(train_loader, valid_loader, test_loader)``.

    Raises:
        ValueError: If ``train_fraction`` is not strictly between 0 and 1.
    """
    if not 0.0 < train_fraction < 1.0:
        raise ValueError(f"train_fraction must be in (0, 1), got {train_fraction}")
    if seed is None:
        seed = SEED

    # A private RandomState yields the same permutation as
    # np.random.seed(seed); np.random.shuffle(...) but leaves the global
    # NumPy generator untouched.
    indices = np.arange(len(dataset))
    np.random.RandomState(seed).shuffle(indices)

    train_size = int(train_fraction * len(indices))
    valid_size = (len(indices) - train_size) // 2

    train_indices = indices[:train_size]
    valid_indices = indices[train_size:train_size + valid_size]
    test_indices = indices[train_size + valid_size:]

    train_data = torch.utils.data.Subset(dataset, train_indices)
    valid_data = torch.utils.data.Subset(dataset, valid_indices)
    test_data = torch.utils.data.Subset(dataset, test_indices)

    # Define a seed worker for DataLoader
    def seed_worker(worker_id):
        worker_seed = seed + worker_id
        np.random.seed(worker_seed)
        random.seed(worker_seed)

    # Define generators for DataLoader
    generator = torch.Generator().manual_seed(seed)

    # drop_last keeps every training batch at exactly batch_size samples.
    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True,
                              generator=generator, worker_init_fn=seed_worker, drop_last=True)
    # max(1, ...) keeps DataLoader from rejecting batch_size=0 when a split
    # is empty (very small datasets); such a loader simply yields no batches.
    valid_loader = DataLoader(valid_data, batch_size=max(1, len(valid_data)), shuffle=False,
                              generator=generator, worker_init_fn=seed_worker)
    test_loader = DataLoader(test_data, batch_size=max(1, len(test_data)), shuffle=False,
                             generator=generator, worker_init_fn=seed_worker)

    return train_loader, valid_loader, test_loader

# Build the loaders once; every training and testing cell below reuses them.
train_loader, valid_loader, test_loader = split_data(dataset)

## Training a model

### Definition of the Trainer class (no changes needed)

In [None]:
class Trainer:
    """Train a classifier with Adam and cross-entropy, with early stopping and checkpoints.

    Files are written under ``<save_dir>/<ModelClassName>/``:
    ``config.json`` (hyper-parameters), ``model_epoch_<n>.pt`` (weights after
    every epoch), and ``best_model.pkl`` / ``best_model.pt`` (weights of the
    epoch with the lowest validation loss).

    Args:
        model (nn.Module): Network to train; moved to CUDA when available.
        train_loader, valid_loader, test_loader: Iterables yielding
            ``(inputs, targets)`` batches.
        num_classes (int): Number of target classes for the accuracy metric.
        num_epochs (int): Maximum number of epochs per ``train()`` call.
        patience (int): Epochs without validation-loss improvement before
            training stops early.
        save_dir (str): Root directory for checkpoints.
        lr (float): Adam learning rate.
        l2_lambda (float): Weight of the explicit L2 penalty added to the
            training loss.
    """

    def __init__(self, model, train_loader, valid_loader, test_loader, num_classes=2, num_epochs=10, patience=3, save_dir='checkpoints', lr=0.01, l2_lambda=0.001):
        self.device = torch.device("cuda" if (torch.cuda.is_available()) else "cpu")
        self.model = model.to(self.device)
        self.train_loader = train_loader
        self.valid_loader = valid_loader
        self.test_loader = test_loader
        self.num_classes = num_classes
        self.num_epochs = num_epochs
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr, betas=(0.9, 0.999), eps=1e-08)
        self.optimizer.zero_grad()
        self.accuracy_metric = Accuracy(task="multiclass", num_classes=num_classes).to(self.device)
        self.best_val_loss = float('inf')
        self.best_model_state = None
        self.best_epoch = 0
        self.l2_lambda = l2_lambda
        self._initialize_requires_grad()

        # Per-epoch history consumed by plot_losses() / plot_accuracies().
        self.train_losses = []
        self.val_losses = []
        self.train_accuracies = []
        self.val_accuracies = []

        model_name = model.__class__.__name__
        self.model_save_dir = os.path.join(save_dir, model_name)
        os.makedirs(self.model_save_dir, exist_ok=True)

        self.patience = patience
        self.early_stopping_counter = 0
        self._save_config() #save the configs of the model in config.json

    def _initialize_requires_grad(self):
        """Mark every model parameter as trainable."""
        for param in self.model.parameters():
            param.requires_grad = True

    def _snapshot_state(self):
        """Return a detached CPU copy of the model's current state dict.

        ``state_dict()`` hands back references to the live parameter tensors,
        so storing it directly would let later optimizer steps overwrite the
        "best" weights. CPU copies also keep the pickled checkpoint loadable
        on machines without CUDA.
        """
        live_state = self.model.state_dict()
        snapshot = type(live_state)(
            (name, tensor.detach().cpu().clone()) for name, tensor in live_state.items()
        )
        # Preserve the versioning metadata load_state_dict() consults, if any.
        if hasattr(live_state, '_metadata'):
            snapshot._metadata = live_state._metadata
        return snapshot

    def train(self, start_epoch=0):
        """Run the training loop, then restore the best weights and evaluate on the test set.

        Args:
            start_epoch (int): Offset added to epoch numbers, for resuming runs.

        Raises:
            ValueError: If ``train_loader`` yields no batches.
        """
        last_epoch = start_epoch + self.num_epochs
        for epoch in range(start_epoch, last_epoch):
            self.model.train()

            epoch_loss = 0.0
            epoch_accuracy = 0.0
            num_batches = 0

            for inputs, targets in self.train_loader:
                inputs, targets = inputs.float().to(self.device), targets.to(self.device)
                pred = self.model(inputs)
                loss = self.criterion(pred, targets)

                # L2 Regularization. The reported training loss includes this
                # penalty, whereas the validation loss does not.
                l2_norm = sum(p.pow(2).sum() for p in self.model.parameters())
                loss += self.l2_lambda * l2_norm

                # Per-batch accuracy: update, read, then reset the metric.
                self.accuracy_metric.update(pred, targets)
                accuracy = self.accuracy_metric.compute()
                self.accuracy_metric.reset()

                loss.backward()
                self.optimizer.step()
                self.optimizer.zero_grad()

                epoch_loss += loss.item()
                epoch_accuracy += accuracy.item()
                num_batches += 1

            if num_batches == 0:
                raise ValueError(
                    "train_loader produced no batches; the training set may be "
                    "smaller than the batch size while drop_last is enabled"
                )

            avg_loss = epoch_loss / num_batches
            avg_accuracy = epoch_accuracy / num_batches

            val_loss, val_accuracy = self.validate()
            self.train_losses.append(float(avg_loss))
            self.train_accuracies.append(float(avg_accuracy))
            self.val_losses.append(float(val_loss))
            self.val_accuracies.append(float(val_accuracy))

            # Save current model
            torch.save(self.model.state_dict(), os.path.join(self.model_save_dir, f'model_epoch_{epoch+1}.pt'))

            # Best model logic
            if val_loss < self.best_val_loss:
                self.best_val_loss = val_loss
                self.best_model_state = self._snapshot_state()
                self.best_epoch = epoch + 1
                self.early_stopping_counter = 0
                self._save_best_model()
                torch.save(self.best_model_state, os.path.join(self.model_save_dir, 'best_model.pt'))
            else:
                self.early_stopping_counter += 1

            # The denominator accounts for start_epoch so resumed runs read correctly.
            print(f'Epoch [{epoch+1}/{last_epoch}], Train Loss: {avg_loss:.4f}, Train Acc: {avg_accuracy:.4f}, '
                  f'Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.4f}, Patience Counter: {self.early_stopping_counter}')

            # Check early stopping
            if self.early_stopping_counter >= self.patience:
                print(f"Early stopping triggered at epoch {epoch+1}")
                break

        self._load_best_model()
        self.test()

    def _evaluate(self, loader):
        """Return ``(mean loss, accuracy)`` of the model over ``loader`` in eval mode.

        Inputs are cast to float exactly as in ``train()``. The loss is
        averaged per sample and accuracy is accumulated over all batches, so
        uneven batch sizes do not bias the result.

        Raises:
            ValueError: If ``loader`` yields no samples.
        """
        self.model.eval()
        self.accuracy_metric.reset()
        total_loss = 0.0
        total_samples = 0

        with torch.no_grad():
            for inputs, targets in loader:
                inputs, targets = inputs.float().to(self.device), targets.to(self.device)
                pred = self.model(inputs)
                batch_samples = targets.size(0)
                # criterion returns the batch mean; re-weight by batch size.
                total_loss += self.criterion(pred, targets).item() * batch_samples
                total_samples += batch_samples
                self.accuracy_metric.update(pred, targets)

        if total_samples == 0:
            raise ValueError("Cannot evaluate: the loader yielded no samples")

        accuracy = self.accuracy_metric.compute().item()
        self.accuracy_metric.reset()
        return total_loss / total_samples, accuracy

    def validate(self):
        """Evaluate on the validation loader and return ``(loss, accuracy)``."""
        return self._evaluate(self.valid_loader)

    def test(self):
        """Evaluate on the test loader, print the result and return ``(loss, accuracy)``."""
        avg_loss, avg_accuracy = self._evaluate(self.test_loader)
        print(f'Final Test Loss: {avg_loss:.4f}, Final Test Accuracy: {avg_accuracy:.4f}')
        return avg_loss, avg_accuracy

    def _save_best_model(self):
        """Pickle the best weights together with their epoch and validation loss."""
        with open(os.path.join(self.model_save_dir, "best_model.pkl"), "wb") as f:
            pickle.dump({
                "model_state": self.best_model_state,
                "epoch": self.best_epoch,
                "val_loss": self.best_val_loss
            }, f)

    def _load_best_model(self):
        """Restore the best weights recorded by this trainer, if any.

        The in-memory snapshot is used rather than ``best_model.pkl``: the
        checkpoint directory is shared by every run of the same model class,
        so the file on disk may belong to an earlier run when no epoch of the
        current run improved the validation loss.
        """
        if self.best_model_state is None:
            print("No best model was recorded; keeping the current weights")
            return
        self.model.load_state_dict(self.best_model_state)
        print(f"Best Model Achieved at Epoch: {self.best_epoch} with Validation Loss: {self.best_val_loss:.4f}")

    def _save_config(self):
        """Write the run's hyper-parameters to ``config.json`` in the checkpoint directory."""
        config = {
            "model_name": self.model.__class__.__name__,
            "num_epochs": self.num_epochs,
            "optimizer": "Adam",
            "lr": self.optimizer.defaults["lr"],
            "betas": self.optimizer.defaults["betas"],
            "eps": self.optimizer.defaults["eps"],
            "loss_function": "CrossEntropyLoss",
            "l2_lambda": self.l2_lambda,
            "num_classes": self.num_classes,
            "device": str(self.device),
            "patience": self.patience,
        }
        config_path = os.path.join(self.model_save_dir, "config.json")
        with open(config_path, "w") as f:
            json.dump(config, f, indent=4)

    def plot_losses(self):
        """Plot training and validation loss per completed epoch."""
        plt.figure(figsize=(8, 5))
        plt.plot(self.train_losses, label='Train Loss')
        plt.plot(self.val_losses, label='Validation Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.title('Loss vs. Epoch')
        plt.legend()
        plt.grid(True)
        plt.show()

    def plot_accuracies(self):
        """Plot training and validation accuracy per completed epoch."""
        plt.figure(figsize=(8, 5))
        plt.plot(self.train_accuracies, label='Train Accuracy')
        plt.plot(self.val_accuracies, label='Validation Accuracy')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.title('Accuracy vs. Epoch')
        plt.legend()
        plt.grid(True)
        plt.show()



## Trainer code for 9 languages. To see our final results, head to the section containing trainer code for 4 languages.

### Training of ResNet34BiLSTMAttention (no changes needed)

In [None]:
# Output layer sized to the number of language folders found by create_dataset().
model = ResNet34BiLSTMAttention(classes=num_languages)
# Up to 50 epochs; stops early after 10 epochs without a lower validation loss.
# Checkpoints are written under checkpoints/<model class name>/.
trainer = Trainer(model, train_loader, valid_loader, test_loader, num_classes = num_languages, num_epochs=50, patience=10)
# train() also reloads the best checkpoint and reports test metrics when it finishes.
trainer.train()
trainer.plot_losses()
trainer.plot_accuracies()



Mode: train Shape: torch.Size([128, 1, 128, 241])
ResNet Layer conv1 Output Shape: torch.Size([128, 64, 64, 121])
ResNet Layer bn1 Output Shape: torch.Size([128, 64, 64, 121])
ResNet Layer relu Output Shape: torch.Size([128, 64, 64, 121])
ResNet Layer maxpool Output Shape: torch.Size([128, 64, 32, 61])
ResNet Layer layer1 Output Shape: torch.Size([128, 64, 32, 61])
ResNet Layer layer2 Output Shape: torch.Size([128, 128, 16, 31])
ResNet Layer layer3 Output Shape: torch.Size([128, 256, 8, 16])
ResNet Layer layer4 Output Shape: torch.Size([128, 512, 4, 8])
ResNet Layer avgpool Output Shape: torch.Size([128, 512, 8, 8])
train: tensor([[ 0.0468,  0.1238, -0.0036,  0.0526],
        [ 0.0331,  0.1282,  0.0684,  0.0881],
        [-0.0314,  0.0298, -0.0126,  0.0210],
        [ 0.0421, -0.0674, -0.0530,  0.0943],
        [-0.0299, -0.0163, -0.0441,  0.0210],
        [ 0.0374,  0.0300, -0.0180,  0.0573],
        [ 0.0540,  0.0949,  0.0278,  0.0715],
        [-0.0776,  0.0757,  0.0145,  0.0236],
 

### Training of ResNet50BiLSTMAttention (no changes needed)

In [None]:
# Output layer sized to the number of language folders found by create_dataset().
model = ResNet50BiLSTMAttention(classes=num_languages)
# Up to 50 epochs; stops early after 10 epochs without a lower validation loss.
# Checkpoints are written under checkpoints/<model class name>/.
trainer = Trainer(model, train_loader, valid_loader, test_loader, num_classes = num_languages, num_epochs=50, patience=10)
# train() also reloads the best checkpoint and reports test metrics when it finishes.
trainer.train()
trainer.plot_losses()
trainer.plot_accuracies()

### Training of ResNet50 (no changes needed)

In [None]:
# Output layer sized to the number of language folders found by create_dataset().
model = ResNet50(classes=num_languages)
# Up to 50 epochs; stops early after 10 epochs without a lower validation loss.
# Checkpoints are written under checkpoints/<model class name>/.
trainer = Trainer(model, train_loader, valid_loader, test_loader, num_classes = num_languages, num_epochs=50, patience=10)
# train() also reloads the best checkpoint and reports test metrics when it finishes.
trainer.train()
trainer.plot_losses()
trainer.plot_accuracies()

## Loading and testing a model

### Helper function to load and test a model (no changes needed)

In [None]:
def load_best_model_and_test(model_dir, model, test_loader, num_classes):
    """Load the weights stored in ``<model_dir>/best_model.pkl`` and evaluate them.

    Args:
        model_dir (str): Directory containing ``best_model.pkl``, a pickled
            dict with keys ``model_state``, ``epoch`` and ``val_loss``.
        model (nn.Module): Freshly constructed network with a matching
            architecture; moved to CUDA when available.
        test_loader: Iterable yielding ``(inputs, targets)`` batches.
        num_classes (int): Number of classes for the accuracy metric.

    Returns:
        tuple: ``(avg_loss, test_accuracy)`` as Python floats; both are also
        printed.

    Raises:
        ValueError: If ``test_loader`` yields no samples.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    # Load model checkpoint
    # NOTE(security): pickle.load can execute arbitrary code; only point this
    # at checkpoints produced by a trusted training run.
    with open(os.path.join(model_dir, "best_model.pkl"), "rb") as f:
        saved_data = pickle.load(f)
    model.load_state_dict(saved_data["model_state"])
    print(f"Best Model Achieved at Epoch: {saved_data['epoch']} with Validation Loss: {saved_data['val_loss']:.4f}")
    model.eval()

    # Setup accuracy metric
    accuracy_metric = Accuracy(task="multiclass", num_classes=num_classes).to(device)
    criterion = torch.nn.CrossEntropyLoss()

    total_loss = 0.0
    total_samples = 0

    with torch.no_grad():
        for inputs, targets in test_loader:
            # Cast to float as the training loop does, so data stored in
            # another dtype does not trip a dtype mismatch in the model.
            inputs, targets = inputs.float().to(device), targets.to(device)
            outputs = model(inputs)
            batch_samples = targets.size(0)
            # criterion returns the batch mean; re-weight by batch size so
            # uneven batches do not bias the average.
            total_loss += criterion(outputs, targets).item() * batch_samples
            total_samples += batch_samples
            accuracy_metric.update(outputs, targets)

    if total_samples == 0:
        raise ValueError("Cannot evaluate: test_loader yielded no samples")

    avg_loss = total_loss / total_samples
    test_accuracy = accuracy_metric.compute().item()
    print(f"Test Loss: {avg_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")
    return avg_loss, test_accuracy

### Load and test ResNet34BiLSTMAttention

In [None]:
# NOTE(review): Trainer saves to checkpoints/<ModelClassName>; this path carries an
# extra suffix, so presumably the folder was renamed after training — confirm it exists.
model_dir = "checkpoints/ResNet34BiLSTMAttentionlr0001"
model = ResNet34BiLSTMAttention(classes=num_languages)
# The metric's class count must match the model's output size, so reuse num_languages.
load_best_model_and_test(model_dir, model, test_loader, num_classes=num_languages)

### Load and test ResNet50BiLSTMAttention

In [None]:
# NOTE(review): Trainer saves to checkpoints/<ModelClassName>; this path carries an
# extra suffix, so presumably the folder was renamed after training — confirm it exists.
model_dir = "checkpoints/ResNet50BiLSTMAttentionlr0001"
model = ResNet50BiLSTMAttention(classes=num_languages)
# The metric's class count must match the model's output size, so reuse num_languages.
load_best_model_and_test(model_dir, model, test_loader, num_classes=num_languages)

### Load and test ResNet50

In [None]:
# NOTE(review): Trainer saves to checkpoints/<ModelClassName>; this path carries an
# extra suffix, so presumably the folder was renamed after training — confirm it exists.
model_dir = "checkpoints/ResNet500001"
model = ResNet50(classes=num_languages)
# The metric's class count must match the model's output size, so reuse num_languages.
load_best_model_and_test(model_dir, model, test_loader, num_classes=num_languages)