## Step 1: Set Up Environment

In [None]:
# Mount Google Drive for data and checkpoints
from google.colab import drive
drive.mount('/content/drive')

# Clone latest repository with fix
!rm -rf tri-objective-robust-xai-medimg
!git clone https://github.com/viraj1011JAIN/tri-objective-robust-xai-medimg.git
%cd tri-objective-robust-xai-medimg

# Pull latest changes (with the fix)
!git pull origin main

print("✅ Repository cloned with fix")

In [None]:
# Check GPU
import torch

# Report the framework version and whether a CUDA device is visible.
has_cuda = torch.cuda.is_available()
print(f"PyTorch: {torch.__version__}")
print(f"CUDA available: {has_cuda}")

# Stop here when no GPU is attached; the message tells the user how to enable one.
if not has_cuda:
    raise RuntimeError("❌ No GPU! Go to Runtime > Change runtime type > GPU")

# Describe device 0: its name and total memory in GiB.
print(f"GPU: {torch.cuda.get_device_name(0)}")
bytes_per_gib = 1024**3
mem_gb = torch.cuda.get_device_properties(0).total_memory / bytes_per_gib
print(f"Memory: {mem_gb:.1f} GB")

# Warn (but do not abort) when the card has less than 16 GiB.
if mem_gb < 16:
    print("⚠️ WARNING: GPU memory is low. Consider using A100.")

In [None]:
# Install dependencies
!pip install -r requirements.txt --quiet
!pip install pillow==10.1.0 --quiet
print("✅ Dependencies installed")

## Step 2: Verify Fix

In [None]:
# Verify the PGD normalization fix
from src.losses.tri_objective import TriObjectiveConfig, TRADESLoss

# Instantiate the default configuration and show the clipping bounds it carries.
config = TriObjectiveConfig()
print("✅ TriObjectiveConfig loaded")
for bound_name in ("clip_min", "clip_max"):
    print(f"   {bound_name}: {getattr(config, bound_name)}")

# Bounds of exactly [0, 1] are treated as "fix missing"; any other pair passes.
fix_missing = config.clip_min == 0.0 and config.clip_max == 1.0
if fix_missing:
    raise ValueError("❌ FIX NOT APPLIED! clip_min/max should be [-2.12, 2.64]")
print("✅ PGD normalization fix verified!")

## Step 3: Set Up Data

In [None]:
# Check if ISIC data exists on Google Drive
import os
from pathlib import Path

# Candidate dataset directories, checked in the order listed.
DATA_PATHS = [
    "/content/drive/MyDrive/data/processed/isic2018",
    "/content/drive/MyDrive/ISIC2018",
    "/content/data/isic2018",
]

# First candidate that exists on disk, or None when none of them do.
DATA_ROOT = next(
    (candidate for candidate in DATA_PATHS if os.path.exists(candidate)),
    None,
)

if not DATA_ROOT:
    # Nothing found: tell the user where the data is expected to live.
    print("❌ ISIC data not found. Please upload to Google Drive.")
    print("Expected locations:")
    print("\n".join(f"  - {candidate}" for candidate in DATA_PATHS))
else:
    print(f"✅ Found ISIC data at: {DATA_ROOT}")
    # Preview the directory, truncated to five entries when it holds more.
    contents = os.listdir(DATA_ROOT)
    if len(contents) > 5:
        contents_line = f"   Contents: {contents[:5]}..."
    else:
        contents_line = f"   Contents: {contents}"
    print(contents_line)

## Step 4: Train Tri-Objective Model

In [None]:
# Settings shared by the training, evaluation and checkpoint-copy cells.
TRAINING_CONFIG = dict(
    data_root=DATA_ROOT,
    checkpoint_dir="/content/drive/MyDrive/checkpoints/tri_objective_fixed",
    max_epochs=60,
    batch_size=32,
    learning_rate=1e-4,
    weight_decay=1e-4,
    lambda_rob=0.3,
    lambda_expl=0.0,  # Disable explanation loss (no CAVs)
    trades_beta=6.0,
    pgd_epsilon=8/255,
    pgd_steps=7,
    seeds=[42, 123, 456],
)

# Echo every setting, one "key: value" line each.
print("Training Configuration:")
for setting_name, setting_value in TRAINING_CONFIG.items():
    print(f"  {setting_name}: {setting_value}")

In [None]:
import sys
import logging
from datetime import datetime
from pathlib import Path

import torch
import torch.nn as nn
from torch.optim import AdamW
from torch.optim.lr_scheduler import CosineAnnealingLR
from torch.utils.data import DataLoader

# Project imports
from src.datasets.isic import ISICDataset
from src.datasets.transforms import get_isic_transforms
from src.models.build import build_model
from src.training.tri_objective_trainer import TriObjectiveTrainer, TriObjectiveConfig
from src.utils.reproducibility import set_global_seed

# Setup logging.
# force=True replaces any handlers already attached to the root logger.
# Without it, basicConfig() silently does nothing when handlers exist (which a
# notebook runtime such as Colab may have installed already), and the INFO
# messages emitted during training would not be shown.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    force=True,
)
logger = logging.getLogger(__name__)

def train_tri_objective_seed(seed: int, config: dict):
    """Train tri-objective model for a single seed.

    Builds the train/val data loaders, a ResNet-50 model, an AdamW optimizer
    with a cosine-annealing schedule, and a ``TriObjectiveTrainer``, then runs
    ``trainer.fit()``.

    Args:
        seed: Random seed passed to ``set_global_seed``; also used to name the
            per-seed checkpoint directory (``seed_<seed>``).
        config: Training settings. Keys read here: ``data_root``,
            ``checkpoint_dir``, ``max_epochs``, ``batch_size``,
            ``learning_rate``, ``weight_decay``, ``lambda_rob``,
            ``lambda_expl``, ``trades_beta``, ``pgd_epsilon``, ``pgd_steps``.

    Returns:
        The value returned by ``TriObjectiveTrainer.fit()``.

    Raises:
        ValueError: If ``config['data_root']`` is ``None`` or empty, i.e. the
            dataset location was never resolved.
    """
    # Fail fast with an actionable message instead of handing an unset root
    # to the dataset class after seeding and logging have already started.
    data_root = config['data_root']
    if not data_root:
        raise ValueError(
            f"config['data_root'] is not set (got {data_root!r}); "
            "locate the ISIC dataset before training."
        )

    logger.info(f"\n{'='*60}")
    logger.info(f"Training Tri-Objective Model - Seed {seed}")
    logger.info(f"{'='*60}")

    # Set seed before any dataset, loader or model is created.
    set_global_seed(seed)

    # Device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    logger.info(f"Device: {device}")

    # Create data loaders
    logger.info("Loading datasets...")
    train_transforms = get_isic_transforms('train', image_size=224)
    val_transforms = get_isic_transforms('val', image_size=224)

    train_dataset = ISICDataset(
        root=data_root,
        split='train',
        transforms=train_transforms
    )
    val_dataset = ISICDataset(
        root=data_root,
        split='val',
        transforms=val_transforms
    )

    # Only the training loader shuffles; validation keeps dataset order.
    train_loader = DataLoader(
        train_dataset,
        batch_size=config['batch_size'],
        shuffle=True,
        num_workers=4,
        pin_memory=True
    )
    val_loader = DataLoader(
        val_dataset,
        batch_size=config['batch_size'],
        shuffle=False,
        num_workers=4,
        pin_memory=True
    )

    logger.info(f"Train samples: {len(train_dataset)}")
    logger.info(f"Val samples: {len(val_dataset)}")

    # Build model
    logger.info("Building ResNet-50 model...")
    model = build_model(
        name='resnet50',
        num_classes=7,
        pretrained=True
    )
    model = model.to(device)

    # Optimizer and scheduler
    optimizer = AdamW(
        model.parameters(),
        lr=config['learning_rate'],
        weight_decay=config['weight_decay']
    )
    # One cosine cycle spanning the full run, decaying to eta_min.
    scheduler = CosineAnnealingLR(
        optimizer,
        T_max=config['max_epochs'],
        eta_min=1e-6
    )

    # Trainer config
    trainer_config = TriObjectiveConfig(
        max_epochs=config['max_epochs'],
        learning_rate=config['learning_rate'],
        weight_decay=config['weight_decay'],
        early_stopping_patience=15,
        lambda_rob=config['lambda_rob'],
        lambda_expl=config['lambda_expl'],
        trades_beta=config['trades_beta'],
        pgd_epsilon=config['pgd_epsilon'],
        pgd_num_steps=config['pgd_steps'],
        generate_heatmaps=False,
        batch_size=config['batch_size'],
        device=str(device),
    )

    # Checkpoint directory for this seed
    checkpoint_dir = Path(config['checkpoint_dir']) / f"seed_{seed}"
    checkpoint_dir.mkdir(parents=True, exist_ok=True)

    # Create trainer
    trainer = TriObjectiveTrainer(
        model=model,
        optimizer=optimizer,
        train_loader=train_loader,
        config=trainer_config,
        val_loader=val_loader,
        scheduler=scheduler,
        device=str(device),
    )
    # The per-seed directory is assigned on the trainer after construction.
    trainer.checkpoint_dir = checkpoint_dir

    # Train
    logger.info(f"Starting training for {config['max_epochs']} epochs...")
    start_time = datetime.now()
    history = trainer.fit()
    end_time = datetime.now()

    logger.info(f"Training completed in: {end_time - start_time}")
    logger.info(f"Best validation loss: {trainer.best_metric:.4f}")
    logger.info(f"Checkpoint saved to: {checkpoint_dir}")

    return history

print("✅ Training function defined")

In [None]:
# Train all seeds
import traceback

all_histories = {}
banner = '=' * 60

for seed in TRAINING_CONFIG['seeds']:
    try:
        history = train_tri_objective_seed(seed, TRAINING_CONFIG)
    except Exception as err:
        # A failed seed is reported with its traceback and the loop moves on,
        # so one bad run does not discard the others.
        print(f"\n❌ Seed {seed} failed: {err}")
        traceback.print_exc()
    else:
        all_histories[seed] = history
        print(f"\n✅ Seed {seed} completed successfully!")

# Summary: only seeds whose training returned normally are listed.
print(f"\n{banner}")
print("TRAINING COMPLETE")
print(banner)
print(f"Successful seeds: {list(all_histories)}")

## Step 5: Evaluate Models

In [None]:
# Quick evaluation of trained models
from src.attacks.pgd import pgd_attack
import torch.nn.functional as F

def evaluate_model(checkpoint_path, test_loader, device):
    """Evaluate model on clean and adversarial examples.

    Args:
        checkpoint_path: Path to a checkpoint file containing a
            ``model_state_dict`` entry.
        test_loader: Iterable of batches; each batch is indexed with
            ``'image'`` and ``'label'``.
        device: Device the model and every batch are moved to.

    Returns:
        Tuple ``(clean_acc, robust_acc)``: accuracy in percent on the
        unmodified images and on their PGD-perturbed counterparts.

    Raises:
        ValueError: If ``test_loader`` yields no samples, so no accuracy can
            be computed.
    """
    # Load model
    model = build_model('resnet50', num_classes=7, pretrained=False)
    # NOTE(security): weights_only=False allows arbitrary objects to be
    # unpickled from the file; only load checkpoints from a trusted source.
    checkpoint = torch.load(checkpoint_path, map_location=device, weights_only=False)
    model.load_state_dict(checkpoint['model_state_dict'])
    model = model.to(device)
    model.eval()

    correct_clean = 0
    correct_robust = 0
    total = 0

    for batch in test_loader:
        images = batch['image'].to(device)
        labels = batch['label'].to(device)

        # Clean accuracy
        with torch.no_grad():
            outputs = model(images)
            _, preds = outputs.max(1)
            correct_clean += (preds == labels).sum().item()

        # Robust accuracy (PGD-20). The attack runs outside no_grad; only the
        # forward pass on the resulting adversarial images is gradient-free.
        images_adv = pgd_attack(
            model, images, labels,
            epsilon=8/255,
            num_steps=20,
            step_size=2/255,
            clip_min=-2.12,  # ImageNet normalized
            clip_max=2.64
        )
        with torch.no_grad():
            outputs_adv = model(images_adv)
            _, preds_adv = outputs_adv.max(1)
            correct_robust += (preds_adv == labels).sum().item()

        total += labels.size(0)

    # An empty loader would otherwise surface as a bare ZeroDivisionError.
    if total == 0:
        raise ValueError("test_loader yielded no samples; cannot compute accuracy")

    clean_acc = correct_clean / total * 100
    robust_acc = correct_robust / total * 100

    return clean_acc, robust_acc

print("✅ Evaluation function defined")

In [None]:
# Evaluate all trained models
from pathlib import Path

# Same device selection as the training function, rather than hard-coding CUDA.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
checkpoint_dir = Path(TRAINING_CONFIG['checkpoint_dir'])

# RQ1 targets (percent) that the seed-averaged accuracies are compared against.
TARGET_CLEAN_ACC = 75
TARGET_ROBUST_ACC = 35

# Create test loader
test_transforms = get_isic_transforms('test', image_size=224)
test_dataset = ISICDataset(
    root=TRAINING_CONFIG['data_root'],
    split='test',
    transforms=test_transforms
)
test_loader = DataLoader(
    test_dataset,
    batch_size=32,
    shuffle=False,
    num_workers=4
)

print("\n" + "="*60)
print("EVALUATION RESULTS")
print("="*60)

# One (seed, clean_acc, robust_acc) tuple per seed whose best checkpoint exists.
results = []
for seed in TRAINING_CONFIG['seeds']:
    ckpt_path = checkpoint_dir / f"seed_{seed}" / "best.pt"
    if ckpt_path.exists():
        clean_acc, robust_acc = evaluate_model(ckpt_path, test_loader, device)
        results.append((seed, clean_acc, robust_acc))
        print(f"Seed {seed}: Clean={clean_acc:.1f}%, Robust={robust_acc:.1f}%")
    else:
        print(f"Seed {seed}: Checkpoint not found at {ckpt_path}")

# Averages are taken over the evaluated seeds only; skipped when there are none.
if results:
    avg_clean = sum(r[1] for r in results) / len(results)
    avg_robust = sum(r[2] for r in results) / len(results)
    print("\n" + "-"*60)
    print(f"Average: Clean={avg_clean:.1f}%, Robust={avg_robust:.1f}%")
    print("-"*60)

    # Check if meeting targets
    if avg_clean >= TARGET_CLEAN_ACC and avg_robust >= TARGET_ROBUST_ACC:
        print("✅ Model meets RQ1 targets!")
    else:
        print("⚠️ Model below targets. May need hyperparameter tuning.")

## Step 6: Copy Checkpoints to Standard Location

In [None]:
# Copy checkpoints to standard project location for evaluation pipeline
import shutil

src_dir = Path(TRAINING_CONFIG['checkpoint_dir'])
dst_dir = Path('/content/tri-objective-robust-xai-medimg/checkpoints/tri_objective')

if src_dir.exists():
    dst_dir.mkdir(parents=True, exist_ok=True)
    # Track what was actually copied so the final message cannot claim
    # success when no seed had a best.pt to copy.
    copied_seeds = []
    for seed in TRAINING_CONFIG['seeds']:
        src = src_dir / f"seed_{seed}" / "best.pt"
        dst = dst_dir / f"seed_{seed}" / "best.pt"
        if src.exists():
            dst.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy(src, dst)
            copied_seeds.append(seed)
            print(f"Copied seed {seed} checkpoint")
        else:
            print(f"⚠️ Seed {seed}: no checkpoint at {src}")
    if copied_seeds:
        print(f"\n✅ Checkpoints copied to {dst_dir}")
    else:
        print(f"\n❌ No checkpoints found under {src_dir}; nothing was copied")
else:
    print("❌ Source checkpoint directory not found")

## Done!

The tri-objective models have been trained with the PGD normalization fix.

### Next Steps:
1. Download checkpoints from Google Drive (`MyDrive/checkpoints/tri_objective_fixed/`)
2. Copy to local project: `checkpoints/tri_objective/`
3. Run full RQ1 evaluation locally