### Milestone 2

<br>Author : Nadine Mohamed (20162200)
<br>Date : 12/12/2024

In [None]:
import pickle
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
import torchvision.transforms as transforms
from PIL import Image
from torchvision import models
import matplotlib.pyplot as plt
import csv


# User configuration: edit these values before running the notebook.
# Paths to the pickled datasets. The later cells read an "images" entry from
# both files and a "labels" entry from the training file.
path_train_data = "./data/train_data.pkl"
path_test_data = "./data/test_data.pkl"

# ResNet50 hyperparameters.
# dropout_rate, lr and weight_decay match the default arguments of
# finalResNetModel.__init__ (presumably the result of a hyperparameter
# search -- TODO confirm provenance).
dropout_rate = 0.6533917677589358
lr = 0.00013562523589705295
weight_decay = 1.4871085437648996e-05
# Number of training epochs and samples per DataLoader batch.
epochs = 10
batch_size = 64

In [2]:
class RetinalDataset(Dataset):
    """
    Dataset wrapping an array of single-channel retinal images.

    Each item is replicated to three channels, converted to an 8-bit PIL
    image and passed through ``transform`` (or ``transforms.ToTensor()`` when
    no transform is given).

    args:
        images : np.ndarray
            Image array indexed along its first axis. Pixel values are
            presumably scaled to [0, 1], since they are multiplied by 255
            before the uint8 conversion -- TODO confirm against the data files.
        labels : sequence or None
            One label per image. When None, items are returned without labels.
        transform : callable or None
            Applied to the PIL image of each item.

    raises:
        ValueError
            If ``labels`` is given and its length differs from ``images``.
    """
    def __init__(self, images, labels=None, transform=None):
        # Fail fast on mismatched inputs instead of surfacing an IndexError
        # (or silently mis-pairing samples) in the middle of an epoch.
        if labels is not None and len(labels) != len(images):
            raise ValueError(
                f"images and labels must have the same length, "
                f"got {len(images)} and {len(labels)}"
            )
        self.images = images.astype(np.float32)
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """Returns the number of images in the dataset."""
        return len(self.images)

    def __getitem__(self, idx):
        """
        Returns the transformed image at ``idx``, together with its label
        when the dataset was built with labels.
        """
        img = self.images[idx]
        # Replicate the single channel three times (channels-last) so the
        # image can be fed to a network expecting RGB input.
        img = np.stack([img, img, img], axis=-1)
        # Clip before casting: values outside [0, 255] would otherwise
        # overflow the uint8 conversion and corrupt the pixel intensities.
        img = np.clip(img * 255, 0, 255).astype(np.uint8)
        img = Image.fromarray(img)
        if self.transform:
            img = self.transform(img)
        else:
            img = transforms.ToTensor()(img)

        if self.labels is not None:
            label = self.labels[idx]
            return img, label
        else:
            return img

In [None]:
class finalResNetModel:
    """
    A final model class for training and evaluating a ResNet50 model on the retinal dataset.
    Adapts the style of simpleNN but uses PyTorch and a pretrained ResNet50 architecture.

    The pretrained network's final fully connected layer is replaced by a
    Dropout + Linear head with ``num_classes`` outputs. Training uses Adam and
    cross-entropy with label smoothing (0.1).
    """

    def __init__(self, num_classes=4, dropout_rate=0.6533917677589358, lr=0.00013562523589705295,
                 weight_decay=1.4871085437648996e-05, seed=42, device=None):
        """
        Initializes the ResNet50 model with given hyperparameters.

        args:
            num_classes : int
                Number of output classes.
            dropout_rate : float
                Dropout rate for the final layer.
            lr : float
                Learning rate.
            weight_decay : float
                Weight decay (L2 regularization).
            seed : int
                Random seed.
            device : torch.device
                Device to run computations on (e.g. torch.device('cuda')).
                Defaults to CUDA when available, otherwise CPU.
        """
        np.random.seed(seed)
        torch.manual_seed(seed)

        self.device = device if device is not None else torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        # Load pretrained ResNet50 and swap its classifier head for a
        # Dropout + Linear layer sized for this task.
        self.model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V2)
        in_ftrs = self.model.fc.in_features
        self.model.fc = nn.Sequential(
            nn.Dropout(dropout_rate),
            nn.Linear(in_ftrs, num_classes)
        )
        self.model = self.model.to(self.device)

        self.criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr, weight_decay=weight_decay)

    def train_one_epoch(self, train_loader):
        """
        Trains the model for one epoch on the training data.

        args:
            train_loader : iterable
                Yields (images, labels) batches.

        returns:
            (avg_loss, accuracy) : tuple of float
                Sample-weighted mean loss and fraction of correct predictions.
        """
        self.model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        for X_batch, y_batch in train_loader:
            X_batch = X_batch.to(self.device)
            y_batch = y_batch.to(self.device)

            self.optimizer.zero_grad()
            outputs = self.model(X_batch)
            loss = self.criterion(outputs, y_batch)
            loss.backward()
            self.optimizer.step()

            # The criterion returns a batch mean; weight it by the batch size
            # so a smaller last batch does not skew the epoch average.
            running_loss += loss.item() * X_batch.size(0)
            _, preds = torch.max(outputs, 1)
            total += y_batch.size(0)
            correct += (preds == y_batch).sum().item()

        avg_loss = running_loss / total
        accuracy = correct / total
        return avg_loss, accuracy

    def evaluate(self, data_loader):
        """
        Evaluates the model on given data_loader.

        args:
            data_loader : iterable
                Yields (images, labels) batches.

        returns:
            (avg_loss, accuracy) : tuple of float
                Sample-weighted mean loss and fraction of correct predictions.
        """
        self.model.eval()
        running_loss = 0.0
        correct = 0
        total = 0
        with torch.no_grad():
            for X_batch, y_batch in data_loader:
                X_batch = X_batch.to(self.device)
                y_batch = y_batch.to(self.device)
                outputs = self.model(X_batch)
                loss = self.criterion(outputs, y_batch)

                running_loss += loss.item() * X_batch.size(0)
                _, preds = torch.max(outputs, 1)
                total += y_batch.size(0)
                correct += (preds == y_batch).sum().item()

        avg_loss = running_loss / total
        accuracy = correct / total
        return avg_loss, accuracy

    def predict(self, data_loader):
        """
        Predicts class labels for the given data_loader.

        args:
            data_loader : iterable
                Yields either image batches or (images, labels) batches;
                labels, when present, are ignored.

        returns:
            np.ndarray
                Predicted class index for every sample, in loader order.
        """
        self.model.eval()
        all_preds = []
        with torch.no_grad():
            for batch in data_loader:
                # A labeled loader yields (images, labels); keep only the
                # images so the same loader works for evaluate() and predict().
                X_batch = batch[0] if isinstance(batch, (list, tuple)) else batch
                X_batch = X_batch.to(self.device)
                outputs = self.model(X_batch)
                _, preds = torch.max(outputs, 1)
                all_preds.extend(preds.cpu().numpy())
        return np.array(all_preds)

    def fit(self, train_loader, val_loader, epochs=10, patience=10,
            checkpoint_path='final_pretrained_model.pth'):
        """
        Trains the model using training and validation data loaders.

        The weights with the lowest validation loss seen so far are written to
        ``checkpoint_path``. Training stops early once the validation loss has
        not improved for ``patience`` consecutive epochs.

        args:
            train_loader : DataLoader
                DataLoader for training data
            val_loader : DataLoader
                DataLoader for validation data
            epochs : int
                Number of epochs.
            patience : int
                Early stopping patience.
            checkpoint_path : str
                File the best weights are saved to. When changed from the
                default, pass the same path to ``load_best``.
        """
        best_val_loss = float('inf')
        epochs_no_improve = 0

        for epoch in range(epochs):
            train_loss, train_acc = self.train_one_epoch(train_loader)
            val_loss, val_acc = self.evaluate(val_loader)

            # Log the first epoch and every fifth epoch afterwards.
            if (epoch + 1) % 5 == 0 or epoch == 0:
                print(
                    f"Epoch {epoch+1}/{epochs} "
                    f"- Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, "
                    f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}"
                )

            if val_loss < best_val_loss:
                best_val_loss = val_loss
                epochs_no_improve = 0
                torch.save(self.model.state_dict(), checkpoint_path)
            else:
                epochs_no_improve += 1

            if epochs_no_improve >= patience:
                print("early stopping")
                break

    def load_best(self, path='final_pretrained_model.pth'):
        """
        Loads the best model weights saved during training.

        args:
            path : str
                Checkpoint file written by ``fit``.
        """
        # map_location lets a checkpoint saved on GPU be restored on a
        # CPU-only machine (and vice versa) instead of failing to deserialize.
        self.model.load_state_dict(torch.load(path, map_location=self.device))

In [None]:
# Train
# NOTE(security): pickle.load can execute arbitrary code while deserializing;
# only point path_train_data at a file from a trusted source.
with open(path_train_data, "rb") as f:
    train_data = pickle.load(f)
train_images = np.array(train_data["images"])
train_labels = np.array(train_data["labels"])

# split into train/val (80/20, stratified so both splits keep the class balance)
X_train_np, X_val_np, y_train_np, y_val_np = train_test_split(
    train_images, train_labels, test_size=0.2, random_state=42, stratify=train_labels
)

# Normalization statistics of ImageNet, matching the pretrained ResNet50 weights.
imagenet_mean = [0.485, 0.456, 0.406]
imagenet_std = [0.229, 0.224, 0.225]

# Training pipeline: resize + random augmentation + normalization.
train_transform = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.RandomHorizontalFlip(),
        transforms.RandomVerticalFlip(),
        transforms.RandomRotation(15),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),
        transforms.ToTensor(),
        transforms.Normalize(mean=imagenet_mean, std=imagenet_std),
    ]
)

# Validation pipeline: deterministic, no augmentation.
val_transform = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=imagenet_mean, std=imagenet_std),
    ]
)

train_dataset = RetinalDataset(X_train_np, y_train_np, transform=train_transform)
val_dataset = RetinalDataset(X_val_np, y_val_np, transform=val_transform)

# Use the hyperparameters from the configuration cell so that editing them
# there actually changes the run (they were previously repeated as literals).
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

model = finalResNetModel(dropout_rate=dropout_rate, lr=lr, weight_decay=weight_decay)
# NOTE(review): patience is counted in non-improving epochs and the first epoch
# always improves on the initial infinite loss, so with epochs <= patience
# early stopping cannot trigger -- confirm this is intended.
model.fit(train_loader, val_loader, epochs=epochs, patience=10)
model.load_best()

# Evaluate on test set or predict on unlabeled data as needed.

Downloading: "https://download.pytorch.org/models/resnet50-11ad3fa6.pth" to C:\Users\ns99a/.cache\torch\hub\checkpoints\resnet50-11ad3fa6.pth
100%|██████████| 97.8M/97.8M [00:04<00:00, 22.2MB/s]


In [None]:
# Prediction on test (unlabeled) data
# NOTE(security): pickle.load can execute arbitrary code while deserializing;
# only point path_test_data at a file from a trusted source.
with open(path_test_data, 'rb') as f:
    test_data = pickle.load(f)
X_test = np.array(test_data['images'])

# Reuse the deterministic validation preprocessing; no augmentation at test time.
test_dataset = RetinalDataset(X_test, labels=None, transform=val_transform)
# batch_size comes from the configuration cell instead of a repeated literal;
# shuffle=False keeps predictions aligned with the order of the test images.
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

test_preds = model.predict(test_loader)

# Write one row per test image; IDs are 1-based positions in the test set.
with open('submission.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(["ID", "Class"])
    writer.writerows(enumerate(test_preds, start=1))

print("Submission saved to 'submission.csv'.")

Submission saved to 'submission.csv'.


---

#### Sources/references:

1. IFT6390 Course material 
2. Torchvision Documentation : https://pytorch.org/vision/main/models.html
3. PyTorch Tutorials (torchvision tutorial) : https://pytorch.org/tutorials/intermediate/torchvision_tutorial.html
4. AI tools (GitHub Copilot, ChatGPT, Gemini)
These AI tools were used during the coding process, primarily to generate docstrings, refine the code structure, and offer inline suggestions within the IDE.