In [None]:
import os
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torchvision import models
from utils import DRDataset
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from imblearn.under_sampling import RandomUnderSampler
import optuna

# Constants
DESC = 'resnet_tune'
DATA_PATH = '../data/preproc_train_imgs.pth'
LABEL_CSV = '../data/trainLabels.csv'
RESULT_DIR = f'../results/{DESC}'
os.makedirs(RESULT_DIR, exist_ok=True)
BATCH_SIZE = 32
NUM_CLASSES = 5
NUM_EPOCHS_TUNE = 50

# Load and split data
labels = pd.read_csv(LABEL_CSV)
labels_train, labels_test = train_test_split(labels, test_size=0.3, stratify=labels['level'], random_state=42)
labels_val, labels_test = train_test_split(labels_test, test_size=2/3, stratify=labels_test['level'], random_state=42)

def undersample_df(df):
    rus = RandomUnderSampler(random_state=42)
    X_res, y_res = rus.fit_resample(df[['image']], df['level'])
    return pd.DataFrame({'image': X_res['image'], 'level': y_res})

DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def objective(trial):
    # --- Hyperparameters ---
    use_undersampling = trial.suggest_categorical('undersample', [True, False])
    optimizer_name = trial.suggest_categorical('optimizer', ['Adam', 'SGD'])
    lr = trial.suggest_float('lr', 1e-5, 1e-3, log=True)
    dropout1 = trial.suggest_float('dropout1', 0.3, 0.7)
    dropout2 = trial.suggest_float('dropout2', 0.2, 0.5)
    hidden1 = trial.suggest_int('hidden1', 512, 2048, step=256)
    hidden2 = trial.suggest_int('hidden2', 128, 1024, step=128)
    num_fc_layers = trial.suggest_int('num_fc_layers', 1, 3)
    hidden3 = trial.suggest_int('hidden3', 64, 512, step=64) if num_fc_layers == 3 else None
    weighted_loss = trial.suggest_categorical('weighted_loss', [True, False])
    unfreeze_layer4 = trial.suggest_categorical('unfreeze_layer4', [True, False])

    # Dataset
    train_labels_bal = undersample_df(labels_train) if use_undersampling else labels_train

    train_data = DRDataset(train_labels_bal, DATA_PATH, preproc=models.ResNet50_Weights.DEFAULT.transforms())
    val_data = DRDataset(labels_val, DATA_PATH, preproc=models.ResNet50_Weights.DEFAULT.transforms())

    train_loader = torch.utils.data.DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = torch.utils.data.DataLoader(val_data, batch_size=BATCH_SIZE, shuffle=False)
    train_N = len(train_loader.dataset)
    val_N = len(val_loader.dataset)

    # Model
    model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
    model.requires_grad_(False)
    if unfreeze_layer4:
        for param in model.layer4.parameters():
            param.requires_grad = True

    fc_layers = [
        nn.Linear(model.fc.in_features, hidden1),
        nn.ReLU(),
        nn.Dropout(p=dropout1)
    ]
    if num_fc_layers >= 2:
        fc_layers.extend([
            nn.Linear(hidden1, hidden2),
            nn.ReLU(),
            nn.Dropout(p=dropout2)
        ])
    if num_fc_layers == 3:
        fc_layers.extend([
            nn.Linear(hidden2, hidden3),
            nn.ReLU(),
            nn.Dropout(p=0.25),
            nn.Linear(hidden3, NUM_CLASSES)
        ])
    else:
        fc_layers.append(nn.Linear(hidden2 if num_fc_layers == 2 else hidden1, NUM_CLASSES))

    model.fc = nn.Sequential(*fc_layers)
    model.to(DEVICE)

    # Loss
    if weighted_loss:
        weights = compute_class_weight('balanced', classes=np.arange(NUM_CLASSES), y=train_labels_bal['level'])
        class_weights = torch.tensor(weights, dtype=torch.float32).to(DEVICE)
        loss_fn = nn.CrossEntropyLoss(weight=class_weights)
    else:
        loss_fn = nn.CrossEntropyLoss()

    # Optimizer
    optimizer = torch.optim.Adam(model.parameters(), lr=lr) if optimizer_name == 'Adam' else \
                torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9)

    # Training loop with pruning
    for epoch in range(NUM_EPOCHS_TUNE):
        model.train()
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = loss_fn(outputs, labels)
            loss.backward()
            optimizer.step()

        # Validation step per epoch
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
                outputs = model(inputs)
                preds = outputs.argmax(dim=1)
                correct += (preds == labels).sum().item()
                total += labels.size(0)
        val_acc = correct / total

        # Report to Optuna and check for pruning
        trial.report(val_acc, epoch)
        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()

    return val_acc

# Optuna study with pruning
pruner = optuna.pruners.MedianPruner(n_warmup_steps=3)
study = optuna.create_study(direction='maximize', pruner=pruner)
study.optimize(objective, n_trials=30)

# Save best result
print('Best trial:', study.best_trial.params)
with open(os.path.join(RESULT_DIR, 'best_params.txt'), 'w') as f:
    f.write(str(study.best_trial.params))

[I 2025-04-11 12:37:06,598] A new study created in memory with name: no-name-f6f9c83a-6ad8-4881-b470-8aef5825304d
[I 2025-04-11 12:39:49,047] Trial 0 finished with value: 0.23690205011389523 and parameters: {'undersample': True, 'optimizer': 'SGD', 'lr': 5.4439884808594094e-05, 'dropout1': 0.5985329872832813, 'dropout2': 0.4411090099148564, 'hidden1': 1792, 'hidden2': 640, 'num_fc_layers': 1, 'weighted_loss': True, 'unfreeze_layer4': True}. Best is trial 0 with value: 0.23690205011389523.
