In [5]:
# prompt: cannot create extraction directory: /content/drive/MyDrive/Colab\ Notebooks/dataset
#            No such file or directory

import os

# Create the directory if it doesn't exist
directory_path = '/content/drive/MyDrive/Colab Notebooks/dataset/archive.zip'
if not os.path.exists(directory_path):
    os.makedirs(directory_path)
    print(f"Directory '{directory_path}' created successfully.")
else:
    print(f"Directory '{directory_path}' already exists.")


Directory '/content/drive/MyDrive/Colab Notebooks/dataset/archive.zip' already exists.


In [None]:

import os
# Check if the directory exists
dataset_dir = "/content/drive/MyDrive/Colab Notebooks/dataset/archive.zip"
!unzip '/content/drive/MyDrive/Colab Notebooks/dataset/archive.zip' -d '/content/drive/MyDrive/Colab Notebooks/dataset/'

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  inflating: /content/drive/MyDrive/Colab Notebooks/dataset/resized_train_cropped/resized_train_cropped/24064_left.jpeg  
  inflating: /content/drive/MyDrive/Colab Notebooks/dataset/resized_train_cropped/resized_train_cropped/24064_right.jpeg  
  inflating: /content/drive/MyDrive/Colab Notebooks/dataset/resized_train_cropped/resized_train_cropped/24073_left.jpeg  
  inflating: /content/drive/MyDrive/Colab Notebooks/dataset/resized_train_cropped/resized_train_cropped/24073_right.jpeg  
  inflating: /content/drive/MyDrive/Colab Notebooks/dataset/resized_train_cropped/resized_train_cropped/24074_left.jpeg  
  inflating: /content/drive/MyDrive/Colab Notebooks/dataset/resized_train_cropped/resized_train_cropped/24074_right.jpeg  
  inflating: /content/drive/MyDrive/Colab Notebooks/dataset/resized_train_cropped/resized_train_cropped/24075_left.jpeg  
  inflating: /content/drive/MyDrive/Colab Notebooks/dataset/resized_train_crop

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import cohen_kappa_score
from torchvision import models, transforms
from torch.utils.data import DataLoader, Dataset
import torch
import torch.nn as nn
import torch.optim as optim
from pytorch_grad_cam import GradCAM
from pytorch_grad_cam.utils.image import show_cam_on_image
from PIL import Image

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# === DATASET PREPARATION ===
class DRDataset(Dataset):
    def __init__(self, csv_file, root_dir, transform=None):
        self.data = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_name = os.path.join(self.root_dir, self.data.iloc[idx, 0])
        image = Image.open(img_name).convert("RGB")
        label = self.data.iloc[idx, 1]

        if self.transform:
            image = self.transform(image)

        return image, label

# Transforms
transform = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
}

# Load datasets
dataset_train = DRDataset(csv_file='train_labels.csv', root_dir='train_images', transform=transform['train'])
dataset_val = DRDataset(csv_file='val_labels.csv', root_dir='val_images', transform=transform['val'])

def load_model(model_name):
    if model_name == "ResNet18":
        model = models.resnet18(pretrained=True)
    elif model_name == "ResNet34":
        model = models.resnet34(pretrained=True)
    elif model_name == "VGG":
        model = models.vgg16(pretrained=True)
    elif model_name == "EfficientNet":
        model = models.efficientnet_b0(pretrained=True)
    else:
        raise ValueError("Unsupported model name")

    num_features = model.fc.in_features if hasattr(model, 'fc') else model.classifier[-1].in_features
    if hasattr(model, 'fc'):
        model.fc = nn.Linear(num_features, 5)
    else:
        model.classifier[-1] = nn.Linear(num_features, 5)

    return model

# Select model
model_name = "ResNet18"  # Change this to "ResNet34", "VGG", or "EfficientNet" as needed
model = load_model(model_name)
model = model.to(device)

train_loader = DataLoader(dataset_train, batch_size=32, shuffle=True)
val_loader = DataLoader(dataset_val, batch_size=32, shuffle=False)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# === TRAINING LOOP ===
def train_model(model, criterion, optimizer, train_loader, val_loader, num_epochs=10):
    train_losses, val_losses = [], []
    best_kappa = -1

    for epoch in range(num_epochs):
        model.train()
        train_loss = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        train_loss /= len(train_loader)
        train_losses.append(train_loss)

        # Validation
        model.eval()
        val_loss = 0
        all_preds, all_labels = [], []

        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)

                val_loss += loss.item()
                preds = torch.argmax(outputs, dim=1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

        val_loss /= len(val_loader)
        val_losses.append(val_loss)

        # Cohen Kappa
        kappa = cohen_kappa_score(all_preds, all_labels, weights='quadratic')
        print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Kappa: {kappa:.4f}")

        # Save best model
        if kappa > best_kappa:
            best_kappa = kappa
            torch.save(model.state_dict(), f'best_model_{model_name}.pth')

    return train_losses, val_losses

train_losses, val_losses = train_model(model, criterion, optimizer, train_loader, val_loader, num_epochs=10)

# === FINE-TUNE ON DEEPDRiD DATASET ===
# Load DeepDRiD dataset
dataset_deepdrid_train = DRDataset(csv_file='/content/drive/MyDrive/Colab Notebooks/dataset/521153S-3005-final-project/DeepDRiD/train.csv', root_dir='/content/drive/MyDrive/Colab Notebooks/dataset/521153S-3005-final-project/DeepDRiD/train', transform=transform['train'])
dataset_deepdrid_val = DRDataset(csv_file='/content/drive/MyDrive/Colab Notebooks/dataset/521153S-3005-final-project/DeepDRiD/val.csv', root_dir='/content/drive/MyDrive/Colab Notebooks/dataset/521153S-3005-final-project/DeepDRiD/val', transform=transform['val'])

train_loader_deepdrid = DataLoader(dataset_deepdrid_train, batch_size=32, shuffle=True)
val_loader_deepdrid = DataLoader(dataset_deepdrid_val, batch_size=32, shuffle=False)

# Load pretrained model from Kaggle DR Resized training
model.load_state_dict(torch.load(f'best_model_{model_name}.pth'))

# Fine-tune on DeepDRiD
def fine_tune_on_deepdrid(model, criterion, optimizer, train_loader, val_loader, num_epochs=10):
    train_losses, val_losses = [], []
    best_kappa = -1

    for epoch in range(num_epochs):
        model.train()
        train_loss = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        train_loss /= len(train_loader)
        train_losses.append(train_loss)

        # Validation
        model.eval()
        val_loss = 0
        all_preds, all_labels = [], []

        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)

                val_loss += loss.item()
                preds = torch.argmax(outputs, dim=1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

        val_loss /= len(val_loader)
        val_losses.append(val_loss)

        # Cohen Kappa
        kappa = cohen_kappa_score(all_preds, all_labels, weights='quadratic')
        print(f"Epoch {epoch + 1}/{num_epochs} (DeepDRiD), Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Kappa: {kappa:.4f}")

        # Save best model
        if kappa > best_kappa:
            best_kappa = kappa
            torch.save(model.state_dict(), f'best_model_deepdrid_{model_name}.pth')

    return train_losses, val_losses

train_losses_deepdrid, val_losses_deepdrid = fine_tune_on_deepdrid(model, criterion, optimizer, train_loader_deepdrid, val_loader_deepdrid, num_epochs=10)

# === VISUALIZATIONS ===
plt.figure()
plt.plot(train_losses_deepdrid, label='DeepDRiD Training Loss')
plt.plot(val_losses_deepdrid, label='DeepDRiD Validation Loss')
plt.legend()
plt.title('Losses over Epochs (DeepDRiD)')
plt.show()

# === TEST SUBMISSION ===
def create_submission_file(model, test_csv, test_images_dir, output_csv):
    model.eval()
    test_data = pd.read_csv(test_csv)
    predictions = []

    transform_test = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

    with torch.no_grad():
        for idx, row in test_data.iterrows():
            img_path = os.path.join(test_images_dir, row['ID'])
            image = Image.open(img_path).convert("RGB")
            input_tensor = transform_test(image).unsqueeze(0).to(device)
            output = model(input_tensor)
            pred = torch.argmax(output, dim=1).item()
            predictions.append(pred)

    test_data['Prediction'] = predictions
    test_data.to_csv(output_csv, index=False)
    print(f"Submission file saved to {output_csv}")

# Example usage:
create_submission_file(model, test_csv='/content/drive/MyDrive/Colab Notebooks/dataset/521153S-3005-final-project/DeepDRiD/test.csv', test_images_dir='/content/drive/MyDrive/Colab Notebooks/dataset/521153S-3005-final-project/DeepDRiD/test', output_csv='/content/drive/MyDrive/Colab Notebooks/dataset/submission_hybrid.csv')






In [None]:
# === GRADCAM ===
def apply_gradcam(model, img_path, target_layer):
    transform_test = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

    image = Image.open(img_path).convert("RGB")
    input_tensor = transform_test(image).unsqueeze(0).to(device)

    cam = GradCAM(model=model, target_layers=target_layer)
    grayscale_cam = cam(input_tensor=input_tensor, target_category=None)
    grayscale_cam = grayscale_cam[0, :]

    visualization = show_cam_on_image(np.array(image) / 255.0, grayscale_cam, use_rgb=True)
    plt.imshow(visualization)
    plt.show()

In [None]:
example_image, _ = train_dataset[0]

In [None]:
# Example usage:
model.load_state_dict(torch.load(f'best_model_deepdrid_{model_name}.pth'))
apply_gradcam(model, 'example_image.jpg', target_layer=[model.layer4[-1]])