In [None]:
# ==============================================================================
# EXPERIMENT EXECUTION SCRIPT
# ==============================================================================
# This script trains and evaluates Student models across multiple experimental
# scenarios (e.g., Threshold Sensitivity, FixMatch Baseline) defined previously.

import torch
import torch.nn as nn
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
import torchvision.transforms as T
import time
import json
import random
from pathlib import Path
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, recall_score, confusion_matrix

# --- Local Modules ---
import config
import models
import utils

# Ensure reproducibility: seed every RNG this notebook draws from.
# The augmentations defined in this notebook (RandomTimeShift, RandomGain) sample
# from Python's `random` module, so it is seeded alongside torch and numpy.
torch.manual_seed(config.SEED)
np.random.seed(config.SEED)
random.seed(config.SEED)

In [None]:
class ThesisHelper:
    """
    Track one training run: per-epoch history, best checkpoint, final artifacts.

    Every file written by this helper goes to ``<base_dir>/<run_name>/``.
    Student runs append ``params['EXPERIMENT_SUFFIX']`` (when present) to the
    run name so that several experiments can coexist side by side.

    Args:
        params: dict of run parameters. ``MODEL_NAME`` is always required;
            ``EPOCHS``, ``BATCH_SIZE``, ``LR`` and ``WEIGHT_DECAY`` are read when
            the summary row is logged. ``EXPERIMENT_SUFFIX`` is optional.
        class_names: list of class names; its length is logged as ``num_classes``.
        base_dir: directory under which the run folder is created.
        run_type: ``'student'`` selects the student naming scheme; any other
            value is used verbatim as the run-name prefix.
    """

    def __init__(self, params, class_names, base_dir, run_type='teacher'):
        self.params = params
        self.class_names = class_names

        # Optional experiment suffix keeps the folders of different runs apart.
        suffix = params.get('EXPERIMENT_SUFFIX', '')

        if run_type == 'student':
            # NOTE: there is intentionally no separator before MODEL_NAME here;
            # the final-evaluation cell of this notebook rebuilds this exact
            # folder name to locate 'best_model.pth'.
            self.run_name = f"student_trained_with_win_teachers{params['MODEL_NAME']}{suffix}"
        else:
            self.run_name = f"{run_type}_{params['MODEL_NAME']}"

        self.output_dir = Path(base_dir) / self.run_name
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.run_type = run_type

        self.history = []                # one metrics dict per logged epoch
        self.best_f1_macro = -1.0        # sentinel below any valid F1 score
        self.best_epoch_metrics = None   # metrics dict of the best epoch so far

        print(f"[INFO] ThesisHelper initialized for '{self.run_name}'. Output: {self.output_dir}")

    def log_epoch(self, model, metrics):
        """
        Record one epoch and checkpoint the model when macro-F1 improves.

        Args:
            model: module whose ``state_dict()`` is saved on improvement.
            metrics: dict with at least ``'f1m'`` and ``'epoch'``. The dict is
                stored by reference in ``history``.
        """
        self.history.append(metrics)
        current_f1_macro = metrics['f1m']

        # Strict '>' keeps the earliest epoch on ties; a NaN score never wins.
        if current_f1_macro > self.best_f1_macro:
            self.best_f1_macro = current_f1_macro
            self.best_epoch_metrics = metrics
            print(f"[INFO] New best F1-Macro: {self.best_f1_macro:.4f} (Epoch {metrics['epoch']}). Saving checkpoint...")
            self._save_checkpoint(model)

    def _save_checkpoint(self, model):
        """Write the model's state dict to ``best_model.pth`` in the run folder."""
        torch.save(model.state_dict(), self.output_dir / 'best_model.pth')

    @staticmethod
    def _json_default(obj):
        """
        Fallback serializer for ``json.dump``.

        Objects exposing ``tolist()`` (numpy scalars/arrays, torch tensors) are
        converted to plain Python values; anything else raises ``TypeError`` as
        ``json`` itself would.
        """
        if hasattr(obj, 'tolist'):
            return obj.tolist()
        raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")

    def finalize(self, total_duration_seconds):
        """
        Persist the training history and the best-epoch artifacts.

        Writes ``training_history.csv`` whenever at least one epoch was logged.
        The summary JSON, curves, LaTeX table and Excel row are produced only
        when a best epoch exists.

        Args:
            total_duration_seconds: wall-clock training time, stored in minutes.
        """
        if not self.history:
            print("[WARN] No history to finalize.")
            return

        # 1. Save History
        history_df = pd.DataFrame(self.history)
        history_df.to_csv(self.output_dir / 'training_history.csv', index=False)

        # No epoch ever beat the initial sentinel (e.g. every F1 was NaN):
        # there is nothing to summarize, so stop instead of failing on None.
        if self.best_epoch_metrics is None:
            print("[WARN] No best epoch recorded; skipping summary artifacts.")
            return

        # Shallow copy so that popping 'cm' does not alter the history entry.
        summary = self.best_epoch_metrics.copy()
        summary['total_duration_min'] = total_duration_seconds / 60
        cm = summary.pop('cm', None)

        with open(self.output_dir / 'summary.json', 'w') as f:
            json.dump(summary, f, indent=4, default=self._json_default)

        # 2. Generate Artifacts
        self._plot_curves(history_df)
        self._generate_latex_table(summary, cm)
        self._log_to_excel(summary, cm)

        print(f"[INFO] Experiment finalized. Artifacts saved.")

    def _plot_curves(self, df):
        """
        Save loss and metric curves to ``training_curves.png``.

        Args:
            df: history frame with columns ``epoch``, ``tr_loss``, ``loss``,
                ``tr_acc``, ``acc`` and ``f1m``.
        """
        best_epoch = self.best_epoch_metrics['epoch']
        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8), sharex=True)

        # Loss
        ax1.plot(df['epoch'], df['tr_loss'], 'o-', label='Training Loss')
        ax1.plot(df['epoch'], df['loss'], 'o-', label='Validation Loss')
        ax1.axvline(x=best_epoch, color='r', linestyle='--', label=f'Best Epoch ({best_epoch})')
        ax1.set_ylabel('Loss')
        ax1.legend()
        ax1.grid(True, linestyle='--', alpha=0.6)

        # Metrics
        ax2.plot(df['epoch'], df['tr_acc'], 'o-', label='Train Accuracy')
        ax2.plot(df['epoch'], df['acc'], 'o-', label='Val Accuracy')
        ax2.plot(df['epoch'], df['f1m'], 'o-', label='Val F1-Macro', linewidth=2)
        ax2.axvline(x=best_epoch, color='r', linestyle='--')
        ax2.set_xlabel('Epoch')
        ax2.set_ylabel('Metric')
        ax2.legend()
        ax2.grid(True, linestyle='--', alpha=0.6)

        # Operate on this figure explicitly rather than on pyplot's "current"
        # figure, so unrelated open figures are never saved or closed by mistake.
        fig.tight_layout()
        fig.savefig(self.output_dir / 'training_curves.png', dpi=300)
        plt.close(fig)

    def _generate_latex_table(self, summary, cm):
        """
        Write a LaTeX summary table to ``summary_table.tex``.

        Args:
            summary: dict with ``epoch``, ``total_duration_min``, ``f1m``, ``acc``.
            cm: accepted for signature symmetry with ``_log_to_excel``; unused.
        """
        caption_name = self.run_name.replace('_', ' ')
        # A bare underscore in text mode breaks LaTeX compilation; escape it.
        architecture = str(self.params['MODEL_NAME']).replace('_', r'\_')
        latex_str = f"""
\\begin{{table}}[h!]
\\centering
\\caption{{Training summary for experiment {caption_name}.}}
\\label{{tab:summary_{self.run_name}}}
\\begin{{tabular}}{{ll}}
\\hline
Parameter & Value \\\\
\\hline
Architecture & {architecture} \\\\
Best Epoch & {summary['epoch']} \\\\
Duration (min) & {summary['total_duration_min']:.2f} \\\\
\\hline
Validation F1 & {summary['f1m']:.4f} \\\\
Validation Acc & {summary['acc']:.4f} \\\\
\\hline
\\end{{tabular}}
\\end{{table}}
        """
        with open(self.output_dir / 'summary_table.tex', 'w') as f:
            f.write(latex_str)

    def _log_to_excel(self, summary, cm):
        """
        Append the best-epoch summary via ``utils.log_metrics_excel``.

        Args:
            summary: best-epoch metrics (``acc``, ``loss``, ``f1m``, ``f1w``, ``recm``).
            cm: confusion matrix popped from the summary, or ``None``.
        """
        metrics_to_log = {
            'carrier': config.CURRENT_CARRIER,
            'model_name': self.params['MODEL_NAME'],
            'run_tag': self.run_name,
            'num_classes': len(self.class_names),
            'acc': summary['acc'],
            'loss': summary['loss'],
            'f1m': summary['f1m'],
            'f1w': summary['f1w'],
            'recm': summary['recm'],
            'cm': cm,
            'epochs': self.params['EPOCHS'],
            'batch_size': self.params['BATCH_SIZE'],
            'lr': self.params['LR'],
            'weight_decay': self.params['WEIGHT_DECAY'],
            'notes': f"Student Exp: {self.run_name}"
        }
        utils.log_metrics_excel(config.METRICS_FILE, config.ARTIFACTS_DIR, self.class_names, metrics_to_log)

In [None]:
# --- Helper Classes (Data Augmentation, Dataset, Training Loop) ---

class RandomTimeShift(torch.nn.Module):
    """
    Circularly roll a 3-D tensor along its last axis by a random offset.

    The offset is a uniform draw from ``[-max_frac, max_frac]`` multiplied by
    the size of the last axis and truncated to an integer, so values pushed
    past one edge re-enter from the opposite edge.
    """

    def __init__(self, max_frac=0.1):
        super().__init__()
        self.max_frac = max_frac

    def forward(self, x):
        # The three-way unpack requires a 3-D input; only the last size is used.
        _first, _second, width = x.shape
        fraction = random.uniform(-self.max_frac, self.max_frac)
        offset = int(fraction * width)
        return torch.roll(x, shifts=offset, dims=-1)

class RandomGain(torch.nn.Module):
    """
    Multiply the input by a random gain and clamp the result to ``[0, 1]``.

    The gain is drawn uniformly between ``a`` and ``b`` on every call.
    """

    def __init__(self, a=0.95, b=1.05):
        super().__init__()
        self.a, self.b = a, b

    def forward(self, x):
        gain = random.uniform(self.a, self.b)
        scaled = x * gain
        return torch.clamp(scaled, 0, 1)

# Weak augmentation pipeline: a small circular shift followed by a small gain change.
weak_aug = T.Compose([RandomTimeShift(max_frac=0.08), RandomGain(a=0.95, b=1.05)])

class LabeledSpectro(Dataset):
    """
    Dataset pairing image files with labels.

    Args:
        files: sequence of image paths, loaded with ``utils.load_png_gray``.
        labels: sequence of labels aligned index-by-index with ``files``.
        transform: optional callable applied to each loaded image.
    """

    def __init__(self, files, labels, transform=None):
        self.files = files
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.files)

    def __getitem__(self, i):
        """
        Return ``(image, label)`` for index ``i``.

        Best-effort loading: if reading or transforming the file fails, a
        warning naming the file is printed and an all-zero tensor of shape
        ``(1, config.IMG_SIZE[0], config.IMG_SIZE[1])`` is returned with the
        sample's real label, so a single bad file does not abort an epoch.
        """
        path = self.files[i]
        label = self.labels[i]
        try:
            x = utils.load_png_gray(path)
            if self.transform is not None:
                x = self.transform(x)
            return x, label
        except Exception as e:
            # The sample is NOT skipped: it is replaced by a blank image. Say so,
            # and name the file so the broken input can actually be found.
            print(f"[WARN] Could not load '{path}' ({e}); substituting an all-zero image.")
            return torch.zeros(1, config.IMG_SIZE[0], config.IMG_SIZE[1]), label

def maybe_resize_for_resnet(x, should_resize):
    """
    Bilinearly resize ``x`` to 224x224 when ``should_resize`` is truthy.

    When ``should_resize`` is falsy the input object is returned untouched.
    """
    if not should_resize:
        return x
    resize_kwargs = dict(size=(224, 224), mode="bilinear", align_corners=False)
    return torch.nn.functional.interpolate(x, **resize_kwargs)

class EarlyStopping:
    """
    Patience-based early stopping on a scalar metric.

    Args:
        patience: number of consecutive non-improving steps that triggers a stop.
        min_delta: margin by which the metric must beat the best value to count
            as an improvement.
        mode: ``'max'`` treats larger values as better; any other value treats
            smaller values as better.
        restore_best: when true, a CPU copy of the model's state dict is kept
            at every improvement so ``restore`` can load it back.
    """

    def __init__(self, patience, min_delta, mode='max', restore_best=True):
        self.patience = patience
        self.min_delta = min_delta
        self.mode = mode
        self.restore_best = restore_best
        if mode == 'max':
            self.best = -float('inf')
        else:
            self.best = float('inf')
        self.wait = 0
        self.best_state = None

    def step(self, metric, model):
        """Update with a new metric value; return True when training should stop."""
        if self.mode == 'max':
            improved = metric > self.best + self.min_delta
        else:
            improved = metric < self.best - self.min_delta

        if not improved:
            self.wait += 1
            return self.wait >= self.patience

        self.best = metric
        self.wait = 0
        if self.restore_best:
            # Detached CPU clones, so later training steps cannot alter the snapshot.
            snapshot = {}
            for key, tensor in model.state_dict().items():
                snapshot[key] = tensor.detach().cpu().clone()
            self.best_state = snapshot
        return False

    def restore(self, model):
        """Load the stored best state into ``model``, if one was captured."""
        if not self.restore_best or self.best_state is None:
            return
        model.load_state_dict(self.best_state)

def evaluate(model, loader, criterion, params):
    """
    Run ``model`` over ``loader`` in eval mode and compute validation metrics.

    Args:
        model: classifier returning logits with one column per class.
        loader: iterable of ``(inputs, targets)`` batches.
        criterion: loss callable taking ``(logits, targets)``.
        params: dict; ``RESIZE_224`` (default False) controls the 224x224 resize.

    Returns:
        dict with ``loss`` (sample-weighted mean), ``acc``, ``f1m``, ``f1w``,
        ``recm`` and ``cm`` (confusion matrix).

    Raises:
        ValueError: if the loader yields no samples.
    """
    device = torch.device(config.DEVICE)
    model.eval()
    total_loss, n_seen = 0.0, 0
    num_outputs = None
    preds, gts = [], []
    with torch.no_grad():
        for xb, yb in loader:
            xb = maybe_resize_for_resnet(xb, params.get('RESIZE_224', False))
            xb, yb = xb.to(device), yb.to(device)
            logits = model(xb)
            loss = criterion(logits, yb)
            batch_size = xb.size(0)
            total_loss += loss.item() * batch_size
            n_seen += batch_size
            num_outputs = logits.shape[1]
            # argmax over raw logits: softmax is monotonic, so it is not needed.
            preds.append(logits.argmax(1).cpu())
            gts.append(yb.cpu())

    if n_seen == 0:
        raise ValueError("evaluate() received a loader that yielded no samples.")

    # Normalise by the samples actually processed, not len(loader.dataset), so
    # the mean stays correct with samplers or drop_last.
    va_loss = total_loss / n_seen
    y_pred = torch.cat(preds).numpy()
    y_true = torch.cat(gts).numpy()

    return {
        'loss': va_loss,
        'acc': accuracy_score(y_true, y_pred),
        'f1m': f1_score(y_true, y_pred, average='macro', zero_division=0),
        'f1w': f1_score(y_true, y_pred, average='weighted', zero_division=0),
        'recm': recall_score(y_true, y_pred, average='macro', zero_division=0),
        # Fix the label set to every model output so the matrix keeps one
        # row/column per class even when a class is absent from this pass.
        'cm': confusion_matrix(y_true, y_pred, labels=list(range(num_outputs)))
    }

def train_student(params, train_loader, val_loader, num_classes, class_names):
    """
    Train a student model and return it with its best checkpoint loaded.

    Each epoch runs one pass of SGD with gradient clipping, advances a cosine
    LR schedule, evaluates on ``val_loader`` and logs the epoch through a
    ``ThesisHelper`` (which checkpoints on macro-F1 improvement). Training
    stops early when validation macro-F1 stalls for ``params['PATIENCE']`` epochs.

    Args:
        params: dict with ``MODEL_NAME``, ``LR``, ``MOMENTUM``, ``WEIGHT_DECAY``,
            ``EPOCHS``, ``PATIENCE``; optional ``USE_PRETRAIN`` (default True),
            ``RESIZE_224`` (default False) and ``EXPERIMENT_SUFFIX``.
        train_loader: iterable of ``(inputs, targets)`` training batches.
        val_loader: loader passed to ``evaluate`` after every epoch.
        num_classes: number of output classes for ``models.make_model``.
        class_names: class names forwarded to ``ThesisHelper``.

    Returns:
        dict with ``'model'`` (best weights reloaded when the checkpoint file
        exists) and ``'helper'`` (the ``ThesisHelper`` used for the run).

    Raises:
        ValueError: if ``train_loader`` yields no samples in an epoch.
    """
    device = torch.device(config.DEVICE)
    helper = ThesisHelper(params, class_names, base_dir=config.ARTIFACTS_DIR, run_type='student')

    model = models.make_model(
        params['MODEL_NAME'],
        num_classes,
        params.get('USE_PRETRAIN', True)
    ).to(device)

    opt = torch.optim.SGD(model.parameters(), lr=params['LR'], momentum=params['MOMENTUM'], weight_decay=params['WEIGHT_DECAY'])
    crit = nn.CrossEntropyLoss()
    sched = torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=params['EPOCHS'], eta_min=params['LR'] * config.ETA_MIN_FACTOR)

    # restore_best=False: the best weights are recovered from the helper's
    # on-disk checkpoint below, so no in-memory snapshot is kept.
    es = EarlyStopping(patience=params['PATIENCE'], min_delta=config.EARLY_STOPPING_CONFIG['min_delta'], restore_best=False)
    t0 = time.time()

    for ep in range(1, params['EPOCHS'] + 1):
        model.train()
        tr_loss, n = 0.0, 0
        tr_preds, tr_gts = [], []
        # Learning rate in effect during this epoch. It must be read before
        # sched.step(), which advances the schedule to the next epoch's rate.
        epoch_lr = sched.get_last_lr()[0]

        for xb, yb in train_loader:
            xb = maybe_resize_for_resnet(xb, params.get('RESIZE_224', False))
            xb, yb = xb.to(device, non_blocking=True), yb.to(device, non_blocking=True)

            opt.zero_grad(set_to_none=True)
            logits = model(xb)
            loss = crit(logits, yb)
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), max_norm=config.CLIP_MAX_NORM)
            opt.step()

            tr_loss += loss.item() * xb.size(0)
            n += xb.size(0)
            # argmax over raw logits: softmax is monotonic, so it is not needed.
            tr_preds.append(logits.argmax(1).cpu())
            tr_gts.append(yb.cpu())

        if n == 0:
            raise ValueError("train_loader yielded no samples; cannot train.")

        tr_loss /= n
        sched.step()
        # Training accuracy is measured on the predictions made during the
        # update pass (train mode), not in a separate evaluation pass.
        tr_acc = accuracy_score(torch.cat(tr_gts).numpy(), torch.cat(tr_preds).numpy())
        val_metrics = evaluate(model, val_loader, crit, params)
        monitor_val = val_metrics['f1m']

        log_entry = {'epoch': ep, 'tr_loss': tr_loss, 'tr_acc': tr_acc, **val_metrics, 'lr': epoch_lr}
        helper.log_epoch(model, log_entry)

        print(f"[{params['MODEL_NAME']}] Ep {ep:03d} | TrLoss: {tr_loss:.4f} | ValLoss: {val_metrics['loss']:.4f} | F1: {monitor_val:.4f}")

        if es.step(monitor_val, model):
            print(f"[INFO] Early stopping at epoch {ep}.")
            break

    dur = time.time() - t0
    helper.finalize(dur)

    # Reload best
    best_model_path = helper.output_dir / 'best_model.pth'
    if best_model_path.exists():
        # map_location keeps the reload on the training device regardless of
        # where the tensors lived when the checkpoint was written.
        model.load_state_dict(torch.load(best_model_path, map_location=device))

    return {'model': model, 'helper': helper}

In [None]:
# --- EXPERIMENT CONFIGURATION ---
# Mapping experiment folders (from step 07) to descriptive run names
# One entry per experiment: (pseudo-label source folder, run name, description).
RUN_CONFIG = [
    {"source_folder": source_folder, "run_name": run_name, "desc": desc}
    for source_folder, run_name, desc in (
        ("PSEUDO_EXP_3.2_FixMatch95", "EXP_3.2_FixMatch", "Baseline SOTA Automatic (0.95)"),
        ("PSEUDO_EXP_3.4_Auto80", "EXP_3.4_NoHuman", "Human Ablation (Auto 0.80)"),
        ("PSEUDO_EXP_3.3_Umb70", "EXP_3.3_Thresh70", "Sensitivity Analysis (0.70)"),
        ("PSEUDO_EXP_3.3_Umb90", "EXP_3.3_Thresh90", "Sensitivity Analysis (0.90)"),
    )
]

# Paths and class index (classes are the sub-folders of TRAIN_VAL_DIR, sorted by name)
BASE_EXP_DIR = config.ARTIFACTS_DIR / "ABLATION_EXPERIMENTS"
CLASS_NAMES = sorted(entry.name for entry in config.TRAIN_VAL_DIR.iterdir() if entry.is_dir())
CLS2IDX = dict(zip(CLASS_NAMES, range(len(CLASS_NAMES))))
NUM_CLASSES = len(CLASS_NAMES)

# 1. LOAD ORIGINAL DATASET (Train/Val Fixed Split)
# The split is seeded with config.SEED so the validation set can match the one
# used in previous stages.
# NOTE(review): glob() order is filesystem-dependent and the split depends on
# it; the files are deliberately left unsorted here because sorting would only
# be safe if the earlier notebooks build their lists the same way - confirm.
all_original_files, all_original_labels = [], []
for class_name in CLASS_NAMES:
    class_pngs = list((config.TRAIN_VAL_DIR / class_name).glob("*.png"))
    all_original_files += class_pngs
    all_original_labels += [CLS2IDX[class_name]] * len(class_pngs)

original_train_files, val_files, original_train_labels, val_labels = train_test_split(
    all_original_files,
    all_original_labels,
    test_size=0.2,
    stratify=all_original_labels,
    random_state=config.SEED,
)

# Validation samples are never augmented.
val_ds = LabeledSpectro(files=val_files, labels=val_labels, transform=None)
print(f"[DATA] Fixed Validation Set: {len(val_ds)} samples")

In [None]:
# --- EXPERIMENT EXECUTION LOOP ---
# For each configured experiment: load its pseudo-labels, merge them with the
# original training split, and train a ResNet50 student on the fixed validation set.
for run in RUN_CONFIG:
    print(f"\n{'='*60}")
    print(f"TESTING: {run['run_name']}")
    print(f"Source: {run['source_folder']}")
    print(f"Goal: {run['desc']}")
    print(f"{'='*60}")

    # A. Load Pseudo-Labels
    pseudo_path = BASE_EXP_DIR / run['source_folder']
    if not pseudo_path.exists():
        print(f"[ERROR] Path not found: {pseudo_path}. Please run notebook 07 first.")
        continue

    # Sorted traversal makes the sample order independent of the filesystem.
    pseudo_files, pseudo_labels = [], []
    for class_path in sorted(pseudo_path.glob('*')):
        if class_path.is_dir() and class_path.name in CLS2IDX:
            for file_path in sorted(class_path.glob("*.png")):
                pseudo_files.append(file_path)
                pseudo_labels.append(CLS2IDX[class_path.name])

    print(f"   > Loaded Pseudo-Labels: {len(pseudo_files)}")
    if not pseudo_files:
        # Training still proceeds, but the run no longer tests its stated strategy.
        print(f"[WARN] No pseudo-labels found under {pseudo_path}; training on the original train split only.")

    # Re-seed every RNG so that each experiment starts from the same random
    # state: its result must not depend on which experiments ran (or were
    # skipped) before it in this loop.
    torch.manual_seed(config.SEED)
    np.random.seed(config.SEED)
    random.seed(config.SEED)

    # B. Combine Datasets
    aug_files = original_train_files + pseudo_files
    aug_labels = original_train_labels + pseudo_labels
    train_ds = LabeledSpectro(aug_files, aug_labels, transform=weak_aug)

    batch_size = config.TRAIN_PARAMS['resnet50']['BATCH_SIZE']
    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, num_workers=config.NUM_WORKERS)
    val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False, num_workers=config.NUM_WORKERS)

    # C. Configure Training
    # Copy so the per-run keys below never leak into config.TRAIN_PARAMS.
    current_params = config.TRAIN_PARAMS['resnet50'].copy()
    current_params['MODEL_NAME'] = 'resnet50'
    # Important: Suffix ensures folder uniqueness
    current_params['EXPERIMENT_SUFFIX'] = f"_{run['run_name']}"

    # D. Train
    print(f"   > Training ResNet50 Student...")
    train_student(current_params, train_loader, val_loader, NUM_CLASSES, CLASS_NAMES)

    print(f"[DONE] {run['run_name']} Finished.")

print("\n--- ALL EXPERIMENTS COMPLETED ---")

In [None]:
# ==============================================================================
# FINAL EVALUATION ON OFFICIAL TEST SET
# ==============================================================================
# This step assesses all experimental models against the strictly isolated Test Set.

print(f"{'='*60}")
print("FINAL THESIS EVALUATION (Official Test Set)")
print(f"{'='*60}")

# 1. Prepare Test Set
TEST_DIR = config.TEST_DIR
if not TEST_DIR.exists():
    raise FileNotFoundError(f"[CRITICAL] Test directory not found at {TEST_DIR}")

print(f"[DATA] Loading Test Set from: {TEST_DIR}")

test_files, test_labels = [], []
for class_name in CLASS_NAMES:
    c_path = TEST_DIR / class_name
    if not c_path.exists():
        # A missing class silently changes what the macro-averaged scores
        # cover, so make the omission visible.
        print(f"[WARN] Test folder missing for class '{class_name}': {c_path}")
        continue
    files = list(c_path.glob("*.png"))
    test_files.extend(files)
    test_labels.extend([CLS2IDX[class_name]] * len(files))

print(f"[DATA] Total Test Images: {len(test_files)}")

# Fail here with a clear message rather than later inside the evaluation pass.
if not test_files:
    raise FileNotFoundError(f"[CRITICAL] No test images (*.png) found under {TEST_DIR}")

# Test samples are never augmented and are evaluated in a fixed order.
test_ds = LabeledSpectro(test_files, test_labels, transform=None)
test_loader = DataLoader(test_ds, batch_size=32, shuffle=False, num_workers=config.NUM_WORKERS)

# 2. Evaluation Loop
results_thesis = []
device = torch.device(config.DEVICE)

def evaluate_final(model, loader, resize_224=None):
    """
    Score ``model`` on ``loader`` and return accuracy, macro-F1 and macro-recall.

    Inputs are preprocessed with the same resize rule used during training and
    validation (``maybe_resize_for_resnet``), so the test pass sees images at
    the resolution the checkpoint was selected on.

    Args:
        model: classifier returning logits with one column per class.
        loader: iterable of ``(inputs, targets)`` batches.
        resize_224: whether to resize inputs to 224x224. When None (default),
            the value is read from ``config.TRAIN_PARAMS['resnet50']`` under the
            ``RESIZE_224`` key, defaulting to False as the training code does.

    Returns:
        dict with ``acc``, ``f1m`` and ``recm``.

    Uses the module-level ``device`` defined in this cell.
    """
    if resize_224 is None:
        resize_224 = config.TRAIN_PARAMS['resnet50'].get('RESIZE_224', False)

    model.eval()
    preds, gts = [], []
    with torch.no_grad():
        for xb, yb in loader:
            xb = maybe_resize_for_resnet(xb, resize_224)
            xb = xb.to(device)
            logits = model(xb)
            # argmax over raw logits: softmax is monotonic, so it is not needed.
            preds.append(logits.argmax(1).cpu())
            gts.append(yb.cpu())
    y_pred = torch.cat(preds).numpy()
    y_true = torch.cat(gts).numpy()
    return {
        'acc': accuracy_score(y_true, y_pred),
        'f1m': f1_score(y_true, y_pred, average='macro', zero_division=0),
        'recm': recall_score(y_true, y_pred, average='macro', zero_division=0)
    }

# Evaluate the best checkpoint of every experiment on the test loader.
for run in RUN_CONFIG:
    print(f"\nEvaluating Experiment: {run['run_name']}...")

    # Folder name mirrors the student run name produced by ThesisHelper.
    model_path = (
        config.ARTIFACTS_DIR
        / f"student_trained_with_win_teachersresnet50_{run['run_name']}"
        / "best_model.pth"
    )
    if not model_path.exists():
        print(f"[ERROR] Model not found at {model_path}")
        continue

    model = models.make_model('resnet50', len(CLASS_NAMES), resnet_use_pretrain=False)

    # Prefer weights_only loading; fall back when a TypeError is raised
    # (e.g. a torch.load that does not accept the keyword).
    try:
        weights = torch.load(model_path, map_location=device, weights_only=True)
        model.load_state_dict(weights)
    except TypeError:
        weights = torch.load(model_path, map_location=device)
        model.load_state_dict(weights)

    model.to(device)

    metrics = evaluate_final(model, test_loader)
    print(f"   -> Test F1-Macro: {metrics['f1m']:.4f}")

    result_row = {
        "Experiment": run['run_name'],
        "Strategy": run['desc'],
        "Test_F1_Macro": metrics['f1m'],
        "Test_Accuracy": metrics['acc'],
        "Test_Recall": metrics['recm'],
    }
    results_thesis.append(result_row)

# 3. Save Final CSV
# One row per evaluated experiment; experiments without a checkpoint are absent.
output_csv = config.ARTIFACTS_DIR / "FINAL_COMPARATIVE_TABLE_OFFICIAL_TEST.csv"
df_final = pd.DataFrame(data=results_thesis)

print("\n=== FINAL THESIS RESULTS ===")
print(df_final)

df_final.to_csv(path_or_buf=output_csv, index=False)
print(f"\n[DONE] Results saved to: {output_csv}")