In [None]:
# Mount Google Drive into the Colab filesystem under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

import os
# Root folder of the assignment data on the mounted Drive; the Train and
# Validation directories used by later cells are resolved relative to it.
data_dir = '/content/drive/My Drive/Ferritico_assignment'
# Last expression of the cell: the folder listing is shown as the cell output.
os.listdir(data_dir)

In [None]:
import torch
# Seed PyTorch's global random number generator so repeated runs start identically.
torch.manual_seed(0)
# Use the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Dataset

In [None]:
import torch
import os
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

class TensorFolderDataset(Dataset):
    """Dataset of pre-saved tensors stored as ``root_dir/{class_name}/{file}.pt``.

    Every visible sub-directory of ``root_dir`` is one class; class indices
    follow the alphabetical order of the sub-directory names. Each ``.pt``
    file inside a class directory is one sample and is read lazily with
    ``torch.load`` when the dataset is indexed.
    """

    def __init__(self, root_dir):
        """
        Args:
            root_dir (str): Path to the main dataset folder.
        """
        self.root_dir = root_dir
        self.samples = []  # List of (tensor_path, class_idx)

        # Only visible sub-directories are classes. A stray file (README,
        # .DS_Store) or a hidden folder such as .ipynb_checkpoints would
        # otherwise receive a class index, shifting the labels or crashing
        # the per-class listing below.
        self.classes = sorted(
            entry for entry in os.listdir(root_dir)
            if not entry.startswith(".") and os.path.isdir(os.path.join(root_dir, entry))
        )
        self.class_to_idx = {cls: idx for idx, cls in enumerate(self.classes)}

        for cls in self.classes:
            class_dir = os.path.join(root_dir, cls)
            # os.listdir returns entries in arbitrary order; sorting keeps the
            # sample order (and therefore unshuffled evaluation) reproducible.
            for fname in sorted(os.listdir(class_dir)):
                if fname.endswith(".pt"):
                    self.samples.append((os.path.join(class_dir, fname), self.class_to_idx[cls]))

    def __len__(self):
        """Return the number of ``.pt`` samples found across all classes."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Load sample ``idx`` from disk.

        Returns:
            tuple: ``(tensor, class_idx)`` where ``tensor`` is whatever object
            was saved in the ``.pt`` file and ``class_idx`` is its integer label.
        """
        tensor_path, class_idx = self.samples[idx]
        # Always materialise on the CPU: a tensor saved from a GPU would
        # otherwise be restored onto CUDA inside DataLoader worker processes.
        tensor = torch.load(tensor_path, map_location="cpu")

        return tensor, class_idx


In [None]:
# Resolve the two split folders underneath the assignment directory.
train_data_dir, val_data_dir = (
    os.path.join(data_dir, split_name) for split_name in ('Train', 'Validation')
)

# One dataset per phase, keyed by the phase name used in the training loop.
image_datasets = dict(
    train=TensorFolderDataset(train_data_dir),
    val=TensorFolderDataset(val_data_dir),
)

# Batches of 32 fetched by two worker processes; only the training split is shuffled.
dataloaders = {
    phase: DataLoader(dataset, batch_size=32, shuffle=(phase == 'train'), num_workers=2)
    for phase, dataset in image_datasets.items()
}

# class_names = image_datasets['train'].classes

# ConvNeXt-small, optimizers and scheduler

In [None]:
import torch
import torch.nn as nn
from torchvision.models import convnext_small, ConvNeXt_Small_Weights

def initialize_convnext(num_classes=3, train_last_conv=False):
    """
    Load an ImageNet-pretrained ConvNeXt-Small and adapt it for fine-tuning.

    All pretrained parameters are frozen, the classifier is replaced by a new
    ``Flatten -> Linear`` head with ``num_classes`` outputs, and the new head
    is left trainable.

    Args:
        num_classes (int): number of outputs of the new classification head.
        train_last_conv: if True, also train the last convolutional block
            (``model.features[-1]``).

    Returns:
        The modified model.

    Raises:
        ValueError: if ``num_classes`` is smaller than 1.
    """
    if num_classes < 1:
        raise ValueError(f"num_classes must be at least 1, got {num_classes}")

    model = convnext_small(weights=ConvNeXt_Small_Weights.IMAGENET1K_V1)

    # Freeze all layers by default
    for param in model.parameters():
        param.requires_grad = False

    # Optionally unfreeze last convolutional block
    if train_last_conv:
        for param in model.features[-1].parameters():
            param.requires_grad = True

    # Take the feature width from the stock head rather than hard-coding 768,
    # so the function keeps working if the backbone variant is swapped.
    in_features = model.classifier[-1].in_features

    # NOTE(review): torchvision's stock ConvNeXt head is
    # LayerNorm2d -> Flatten -> Linear; this replacement drops the pretrained
    # LayerNorm. The two-module layout is kept so checkpoints saved by earlier
    # runs (keys `classifier.1.*`) still load -- confirm whether the omission
    # is intentional.
    model.classifier = nn.Sequential(
        nn.Flatten(1),
        nn.Linear(in_features, num_classes)
    )

    # Freshly created layers are trainable by default; set it explicitly so
    # the head stays trainable regardless of the freezing logic above.
    for param in model.classifier.parameters():
        param.requires_grad = True

    return model

In [None]:
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau


# Define loss function, optimizer, and learning rate scheduler
def optimizer_setup(model, lr=0.001, weight_decay=1e-5, scheduler_patience=5):
    """
    Build the optimizer, loss function and learning-rate scheduler for ``model``.

    Args:
        model: network whose trainable parameters should be optimised.
        lr (float): initial Adam learning rate.
        weight_decay (float): L2 penalty passed to Adam for regularization.
        scheduler_patience (int): ``patience`` passed to ``ReduceLROnPlateau``.

    Returns:
        tuple: ``(optimizer, criterion, scheduler)`` -- an Adam optimizer, a
        cross-entropy loss and a ``ReduceLROnPlateau`` scheduler in 'min' mode
        (it expects to be stepped with a quantity that should decrease).

    Raises:
        ValueError: raised by the optimizer if ``model`` has no trainable
            parameters.
    """
    # Hand only the trainable parameters to Adam: frozen backbone weights never
    # receive gradients, so registering them just adds bookkeeping overhead.
    trainable_params = [param for param in model.parameters() if param.requires_grad]

    optimizer = optim.Adam(trainable_params, lr=lr, weight_decay=weight_decay)
    criterion = nn.CrossEntropyLoss()
    scheduler = ReduceLROnPlateau(optimizer, 'min', patience=scheduler_patience)

    return optimizer, criterion, scheduler

# Training function

In [None]:
import torch.nn as nn
import torch.optim as optim
import copy
from torchvision import transforms

# # Training function
def train_model_convnext(model, criterion, optimizer, scheduler, num_epochs=60, data_augment=False,
                         early_stopping_patience=20):
    """
    Fine-tune ``model`` and restore the weights of its best validation epoch.

    Each epoch runs a 'train' phase followed by a 'val' phase over the
    notebook-level ``dataloaders`` dict; batches are moved to the notebook-level
    ``device`` (the model itself is not moved here). After every validation
    phase the scheduler is stepped with the validation loss. When validation
    accuracy improves, the weights are snapshotted in memory and written to
    'best_model.pt' in the working directory.

    Args:
        model: network to train.
        criterion: loss called as ``criterion(outputs, labels)``.
        optimizer: optimizer stepped once per training batch.
        scheduler: scheduler stepped with the validation loss once per epoch.
        num_epochs (int): maximum number of epochs.
        data_augment (bool): if True, apply a random horizontal flip and a
            random rotation of up to 15 degrees to every training sample.
        early_stopping_patience (int): stop after this many consecutive epochs
            without a validation-accuracy improvement.

    Returns:
        tuple: ``(model, best_acc)`` -- the model with its best weights loaded
        and the best validation accuracy as a Python float.

    Raises:
        ValueError: if a phase's dataloader yields no samples.
    """
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    early_stopping_counter = 0

    # for data augmentation
    transform = transforms.Compose([
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomRotation(degrees=15)])

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        for phase in ['train', 'val']:
            is_train = phase == 'train'
            model.train(is_train)  # training mode for 'train', eval mode for 'val'

            running_loss = 0.0
            running_corrects = 0
            num_samples = 0

            for inputs, labels in dataloaders[phase]:

                if data_augment and is_train:
                    # torchvision draws one set of random parameters per call,
                    # so transforming the whole batch at once would give every
                    # image the same flip and rotation. Augment per sample.
                    inputs = torch.stack([transform(sample) for sample in inputs])

                inputs = inputs.to(device)
                labels = labels.to(device)

                if is_train:
                    optimizer.zero_grad()

                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)
                    preds = outputs.argmax(dim=1)
                    loss = criterion(outputs, labels)

                    if is_train:
                        loss.backward()
                        optimizer.step()

                batch_size = inputs.size(0)
                # The loss is a batch mean; weight it by the batch size so the
                # epoch figure is a true per-sample average.
                running_loss += loss.item() * batch_size
                running_corrects += (preds == labels).sum().item()
                num_samples += batch_size

            if num_samples == 0:
                raise ValueError(f"dataloaders['{phase}'] yielded no samples")

            # Normalise by the samples actually seen, and keep the metrics as
            # plain Python floats (no tensors leaking out of the loop).
            epoch_loss = running_loss / num_samples
            epoch_acc = running_corrects / num_samples

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            if phase == 'val':
                if epoch_acc > best_acc:
                    best_acc = epoch_acc
                    best_model_wts = copy.deepcopy(model.state_dict())
                    early_stopping_counter = 0  # Reset counter on improvement
                    torch.save(model.state_dict(), 'best_model.pt')  # Save checkpoint
                else:
                    early_stopping_counter += 1

                scheduler.step(epoch_loss)  # Adjust learning rate based on validation loss

        if early_stopping_counter >= early_stopping_patience:
            print("Early stopping triggered!")
            break

    print(f'Best val Acc: {best_acc:.4f}')

    # Load best model weights
    model.load_state_dict(best_model_wts)
    return model, best_acc


# Search over parameters

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
import pandas as pd

def run_search_experiments():
    """
    Grid-search learning rate, data augmentation and last-block fine-tuning.

    For every combination a fresh model is built with ``initialize_convnext``,
    trained with ``train_model_convnext`` for up to 60 epochs, and its best
    validation accuracy recorded. The weights of the best model seen so far are
    saved to the assignment folder on Drive.

    Returns:
        tuple: ``(results_df, best_params_df)`` -- one row per experiment, and a
        one-row frame describing the best configuration (an empty frame if no
        run achieved an accuracy above 0).
    """
    # Parameters to test
    learning_rates = [0.001, 0.0001]
    augmentation = [True, False]
    conv_training = [True, False]
    model_save_path = '/content/drive/My Drive/Ferritico_assignment/convnext_best_model.pt'

    abs_best_acc = 0
    best_params = None

    results = []

    for lr in learning_rates:
        for aug in augmentation:
            for train_conv in conv_training:
                print(f"\nStarting experiment with:")
                print(f"Learning rate: {lr}")
                print(f"Augmentation: {aug}")
                print(f"Training last conv: {train_conv}")

                # Initialize model
                model = initialize_convnext(
                    num_classes=3,
                    train_last_conv=train_conv
                )
                model = model.to(device)

                optimizer, criterion, scheduler = optimizer_setup(model, lr=lr)

                # Train model
                model, best_acc = train_model_convnext(
                    model=model,
                    criterion=criterion,
                    optimizer=optimizer,
                    scheduler=scheduler,
                    num_epochs=60,
                    data_augment=aug
                )
                # The trainer may hand back a 0-dim tensor; store a plain float
                # so the result tables hold numbers rather than tensor objects.
                best_acc = float(best_acc)

                if best_acc > abs_best_acc:
                    abs_best_acc = best_acc
                    torch.save(model.state_dict(), model_save_path)
                    best_params = {
                        'lr': lr,
                        'aug': aug,
                        'train_conv': train_conv,
                        'best_acc': best_acc
                    }

                # Store results
                results.append({
                    'learning_rate': lr,
                    'data_augmentation': aug,
                    'train_last_conv': train_conv,
                    'best_accuracy': best_acc,
                })

                # Clear GPU memory. The optimizer and scheduler also reference
                # the parameters, so drop them before emptying the cache.
                del model, optimizer, scheduler
                torch.cuda.empty_cache()

    # Save and display results
    results_df = pd.DataFrame(results)
    # A dict of scalars cannot be passed to DataFrame directly (pandas demands
    # an index); wrapping it in a list yields the intended one-row frame.
    best_params_df = pd.DataFrame([best_params]) if best_params is not None else pd.DataFrame()

    return results_df, best_params_df


if __name__ == "__main__":
    # Run the full grid search, then print both summary tables as plain text.
    results_df, best_params_df = run_search_experiments()
    print("\nExperiment Results:", results_df.to_string(), sep="\n")

    print("Best params", best_params_df.to_string(), sep="\n")

In [None]:
# model = initialize_convnext(num_classes=num_classes, train_last_conv=train_last_conv)
# Rebuild the architecture, then overwrite its weights with the best
# checkpoint written by the parameter search.
model = initialize_convnext()
model.load_state_dict(
    torch.load(
        '/content/drive/My Drive/Ferritico_assignment/convnext_best_model.pt',
        map_location=device,
    )
)

# Move to the target device and switch to evaluation mode in one chain
# (both calls return the module itself).
model = model.to(device).eval()

correct, total = 0, 0

# Gradients are not needed for scoring.
with torch.no_grad():
    for inputs, labels in dataloaders['val']:
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = model(inputs)

        # Index of the largest logit in each row is the predicted class.
        predicted = torch.max(outputs, 1).indices
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

accuracy = correct / total

print(f"Validation accuracy: {accuracy}")


