In [None]:
import torch
import random
import pandas as pd
import numpy as np
import os
from torch import nn

from sklearn.preprocessing import MinMaxScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.model_selection import train_test_split
import cv2

import json
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import DataLoader, TensorDataset

from sklearn.utils.class_weight import compute_class_weight

from sklearn.preprocessing import LabelEncoder, MinMaxScaler, OneHotEncoder, StandardScaler

# Local application/library imports
from utils import load_search_space
import optuna

from sklearn.metrics import (
    RocCurveDisplay, PrecisionRecallDisplay,
    ConfusionMatrixDisplay, roc_auc_score, average_precision_score
)

## DATASET

In [None]:
SEED = 64  # single seed value shared by every RNG seeded in this cell

# Set random seeds: PyTorch, Python's `random` module and NumPy's global RNG
# are all seeded from the same constant so a fresh-kernel run is repeatable.
torch.manual_seed(SEED)
random.seed(SEED)
np.random.seed(SEED)

In [None]:
# Dataset Info (regression example)
# Other dataset names listed for this notebook:
# adult_income_cleaned, framingham_cleaned, preprocessed_heloc, diabetes
# NOTE(review): the next two cells assign these same three names again, so on a
# top-to-bottom run this selection is overwritten — execute only the cell you need.
dataset_name = 'boston'        
dataset_subpath = 'Regression/boston'       
task_type = 'Regression'

In [None]:
# Dataset Info (multiclass example)
# Other dataset names listed for this notebook:
# adult_income_cleaned, framingham_cleaned, preprocessed_heloc, diabetes
# NOTE(review): this cell reassigns the names set by the previous cell and is itself
# overwritten by the following cell on a top-to-bottom run.
dataset_name = 'cmc'        
dataset_subpath = 'Multiclass/cmc'       
task_type = 'Multiclass'

In [None]:
# Dataset Info (binary example)
# Other dataset names listed for this notebook:
# adult_income_cleaned, framingham_cleaned, preprocessed_heloc, diabetes
# NOTE(review): this is the last of three consecutive cells assigning these names,
# so it is the selection in effect after a top-to-bottom run.
dataset_name = 'preprocessed_heloc'        
dataset_subpath = 'Binary/heloc'       
task_type = 'Binary'

In [None]:
# Load the selected dataset; the path is relative to the current working directory
# and is built from `dataset_subpath` / `dataset_name` chosen in the cells above.
df = pd.read_csv(f"./data/{dataset_subpath}/{dataset_name}.csv")

In [None]:
df.shape

In [None]:
df.head()

In [None]:
# Flag datasets with more than 20000 rows (the comparison already yields a bool).
reduce = len(df) > 20000

## LOAD AND PREPROCESS

In [None]:
def prepare_target_tensor(y, task):
    """
    Convert a target container into the tensor layout used for the given task.

    Args:
        y: targets as a pandas Series, a Python list, or anything
           `torch.as_tensor` accepts directly.
        task: task name, compared case-insensitively; one of
              "regression", "binary" or "multiclass".

    Returns:
        float32 tensor of shape [N, 1] for regression/binary,
        int64 tensor of shape [N] for multiclass.

    Raises:
        ValueError: if `task` is not one of the supported names.
    """
    kind = task.lower()

    # Normalise pandas / list inputs to a NumPy array first.
    if isinstance(y, pd.Series):
        values = y.to_numpy()
    elif isinstance(y, list):
        values = np.array(y)
    else:
        values = y

    if kind in ("regression", "binary"):
        # Column vector so it lines up with a single-unit model output.
        return torch.as_tensor(values, dtype=torch.float32).reshape(-1, 1)
    if kind == "multiclass":
        # Class indices stay 1-D and integer typed.
        return torch.as_tensor(values, dtype=torch.long)
    raise ValueError(f"Unsupported task type: {kind}")

In [None]:
import os, json
from typing import Optional, Tuple, Union
import cv2
import numpy as np
import pandas as pd
import torch
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, StandardScaler, MinMaxScaler, LabelEncoder
from sklearn.compose import ColumnTransformer
from sklearn.utils.class_weight import compute_class_weight

def _read_split_rgb(images_folder: str, split: str, problem_type: str) -> np.ndarray:
    """
    Load every image listed for one split and return them as a single RGB array.

    The file `<images_folder>/<split>/<problem_type>.csv` is read and its
    'images' column is taken as paths relative to `<images_folder>/<split>`.

    Returns:
        Array stacked along a new first axis ([N, H, W, 3]); `np.stack`
        requires every image to have the same shape.

    Raises:
        FileNotFoundError: if OpenCV cannot read one of the listed images.
    """
    split_dir = os.path.join(images_folder, split)
    manifest = pd.read_csv(os.path.join(split_dir, f"{problem_type}.csv"))
    paths = [os.path.join(split_dir, rel) for rel in manifest["images"].tolist()]

    frames = []
    for path in paths:
        bgr = cv2.imread(path, cv2.IMREAD_COLOR)
        if bgr is None:
            # cv2.imread signals failure by returning None rather than raising.
            raise FileNotFoundError(f"Could not read image: {path}")
        # OpenCV loads in BGR channel order; convert to RGB.
        frames.append(cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB))
    return np.stack(frames, axis=0)

def _pad_constant_right_bottom_batch(
    imgs_uint8: np.ndarray,
    target_size: Union[int, Tuple[int, int]],
    fill_rgb01: np.ndarray
) -> np.ndarray:
    """
    Constant pad (right/bottom) to target size with fill = TRAIN mean RGB in [0,1].
    Input:  imgs_uint8 [N,H,W,3] uint8
    Output: float32 [N,3,Ht,Wt] in [0,1]
    """
    if isinstance(target_size, int):
        tw, th = target_size, target_size
    else:
        tw, th = int(target_size[0]), int(target_size[1])

    N = imgs_uint8.shape[0]
    out = np.empty((N, 3, th, tw), dtype=np.float32)
    fill = fill_rgb01.reshape(1, 1, 3)  # (1,1,3) in [0,1]

    for i in range(N):
        im01 = imgs_uint8[i].astype(np.float32) / 255.0  # [H,W,3] in [0,1]
        h, w, _ = im01.shape
        if w > tw or h > th:
            raise ValueError(f"Image {w}x{h} larger than target {tw}x{th}. Increase target_size or resize upstream.")
        canvas = np.empty((th, tw, 3), dtype=np.float32)
        canvas[:] = fill
        canvas[:h, :w, :] = im01
        out[i] = np.transpose(canvas, (2, 0, 1))
    return out

def load_and_preprocess_data(
    df, dataset_name, images_folder,
    problem_type, task_type,
    seed: int = 42, batch_size: int = 32, device: str = 'cpu',
    pad_images: bool = False, target_size: Optional[Union[int, Tuple[int, int]]] = None,
):
    """
    Build image DataLoaders (train/val/test) plus tabular metadata for one dataset.

    Steps:
      1. Read `./configs/preprocess/<dataset_name>.json` (keys used:
         "categorical_cols", "numerical_cols", "encoding").
      2. Take the configured feature columns as X and the LAST column of `df` as y;
         optionally label-encode y when encoding["target"] == "label".
      3. Split 70/15/15 (stratified unless the task is regression).
      4. Compute balanced class weights for classification tasks.
      5. Fit the configured scalers / one-hot encoder on the training split only.
         Column groups without a configured transformer are passed through
         unchanged, and feature names are taken from the fitted transformer so
         they always match the transformed matrix.
      6. Read the split images via `_read_split_rgb`, scale them to [0, 1] and
         optionally pad them to `target_size` with the training-set mean colour.
      7. Wrap (image, target) pairs in TensorDatasets / DataLoaders.

    NOTE(review): targets come from the tabular split while images come from
    `<images_folder>/<split>/<problem_type>.csv`; the code assumes both are in
    the same row order — confirm against the image-generation step.

    Args:
        df: full tabular dataset; the target must be the last column.
        dataset_name: name of the preprocessing config file (without extension).
        images_folder: root folder containing `train/`, `val/` and `test/`.
        problem_type: base name of the per-split CSV listing the images.
        task_type: "regression", "binary" or "multiclass" (case-insensitive).
        seed: random_state for both train_test_split calls.
        batch_size: batch size of all three loaders.
        device: not used inside this function; kept for call compatibility.
        pad_images: if True, pad images to `target_size`.
        target_size: int or (width, height); required when `pad_images` is True.

    Returns:
        (train_loader, val_loader, test_loader, attributes, imgs_shape, le, class_weight)
        where `attributes` is the number of tabular feature columns after
        preprocessing, `imgs_shape` is (C, H, W), `le` is the fitted LabelEncoder
        or None, and `class_weight` is a tensor (scalar pos_weight for binary,
        per-class vector for multiclass) or None.

    Raises:
        ValueError: if `pad_images` is True without `target_size`, or if an image
            exceeds the target size (raised by the padding helper).
    """
    task_type = task_type.lower()
    is_regression = task_type == "regression"

    # ----- Config -----
    with open(f"./configs/preprocess/{dataset_name}.json") as f:
        config = json.load(f)
    categorical_cols = config["categorical_cols"]
    numerical_cols = config["numerical_cols"]
    encoding = config["encoding"]

    # ----- Features / target -----
    X = df[numerical_cols + categorical_cols].copy()
    y = df.iloc[:, -1].copy()

    le = None
    label_mapping = None
    if encoding.get("target") == "label":
        le = LabelEncoder()
        y = le.fit_transform(y)
        label_mapping = dict(zip(le.classes_, le.transform(le.classes_)))

    # ----- Splits (70/15/15) -----
    # Classification splits are stratified on the target; regression splits are not.
    X_train_raw, X_temp_raw, y_train, y_temp = train_test_split(
        X, y, test_size=0.3, random_state=seed,
        stratify=None if is_regression else y,
    )
    X_val_raw, X_test_raw, y_val, y_test = train_test_split(
        X_temp_raw, y_temp, test_size=0.5, random_state=seed,
        stratify=None if is_regression else y_temp,
    )

    # ----- Class weights (optional) -----
    class_weight = None
    if task_type in ["binary", "multiclass"]:
        classes_sorted = np.unique(y_train)  # np.unique already returns sorted values
        cw_vals = compute_class_weight(class_weight="balanced", classes=classes_sorted, y=y_train)
        if task_type == "binary":
            # Looks up labels 0 and 1 explicitly, so the binary target must be 0/1 encoded.
            wd = dict(zip(classes_sorted, cw_vals))
            pos_weight = wd[1] / wd[0]
            class_weight = torch.tensor(pos_weight, dtype=torch.float32)
            print(f"Binary pos_weight (for BCEWithLogitsLoss): {class_weight.item():.6f}")
        else:
            class_weight = torch.tensor(cw_vals, dtype=torch.float32)
            print(f"Multiclass class weights (for CrossEntropyLoss): {class_weight.tolist()}")

    # ----- ColumnTransformer (fit on TRAIN only) -----
    transformers = []
    if encoding.get("numerical_features") == "minmax":
        transformers.append(("num", MinMaxScaler(), numerical_cols))
    elif encoding.get("numerical_features") == "standard":
        transformers.append(("num", StandardScaler(), numerical_cols))
    if categorical_cols and encoding.get("categorical_features") == "onehot":
        transformers.append(("cat", OneHotEncoder(sparse_output=False, handle_unknown="ignore"), categorical_cols))

    if transformers:
        # remainder="passthrough": a column group without a configured transformer is
        # kept as-is (same as the no-transformer branch below) instead of being dropped.
        # verbose_feature_names_out=False keeps the plain column / one-hot names.
        preprocessor = ColumnTransformer(
            transformers=transformers,
            remainder="passthrough",
            verbose_feature_names_out=False,
        )
        X_train = preprocessor.fit_transform(X_train_raw)
        X_val   = preprocessor.transform(X_val_raw)
        X_test  = preprocessor.transform(X_test_raw)

        # Ask the fitted transformer for its output names so the column list
        # always has exactly as many entries as the transformed matrix.
        all_feature_names = list(preprocessor.get_feature_names_out())

        X_train_num = pd.DataFrame(X_train, columns=all_feature_names, index=X_train_raw.index)
        X_val_num   = pd.DataFrame(X_val,   columns=all_feature_names, index=X_val_raw.index)
        X_test_num  = pd.DataFrame(X_test,  columns=all_feature_names, index=X_test_raw.index)
    else:
        all_feature_names = numerical_cols + categorical_cols
        X_train_num = pd.DataFrame(X_train_raw, columns=all_feature_names, index=X_train_raw.index)
        X_val_num   = pd.DataFrame(X_val_raw,   columns=all_feature_names, index=X_val_raw.index)
        X_test_num  = pd.DataFrame(X_test_raw,  columns=all_feature_names, index=X_test_raw.index)

    print(f"Shapes — Train: {X_train_num.shape}, Val: {X_val_num.shape}, Test: {X_test_num.shape}")
    print(f"Numerical features: {len(numerical_cols)} — {numerical_cols}")
    print(f"Categorical features: {len(categorical_cols)} — {categorical_cols}")
    print(f"Total features: {X_train_num.shape[1]}")
    if label_mapping:
        print(f"Target label mapping: {label_mapping}")

    # ----- Images (uint8 RGB) -----
    X_train_img_u8 = _read_split_rgb(images_folder, "train", problem_type)
    X_val_img_u8   = _read_split_rgb(images_folder, "val",   problem_type)
    X_test_img_u8  = _read_split_rgb(images_folder, "test",  problem_type)

    # ----- Optional padding with TRAIN mean (no normalization) -----
    if pad_images:
        if target_size is None:
            raise ValueError("pad_images=True requires target_size (int or (W,H)).")
        if isinstance(target_size, int):
            tw = th = int(target_size)
        else:
            tw, th = int(target_size[0]), int(target_size[1])

        # Per-channel mean of the training images in [0, 1]; reused for val/test
        # so no statistics leak from those splits.
        train_mean_rgb01 = (X_train_img_u8.astype(np.float32) / 255.0).reshape(-1, 3).mean(axis=0).astype(np.float32)
        X_train_arr = _pad_constant_right_bottom_batch(X_train_img_u8, target_size, train_mean_rgb01)
        X_val_arr   = _pad_constant_right_bottom_batch(X_val_img_u8,   target_size, train_mean_rgb01)
        X_test_arr  = _pad_constant_right_bottom_batch(X_test_img_u8,  target_size, train_mean_rgb01)
        imgs_shape = (3, th, tw)
    else:
        # Scale to [0,1] and convert to NCHW.
        X_train_arr = (X_train_img_u8.astype(np.float32) / 255.0).transpose(0, 3, 1, 2)
        X_val_arr   = (X_val_img_u8.astype(np.float32)   / 255.0).transpose(0, 3, 1, 2)
        X_test_arr  = (X_test_img_u8.astype(np.float32)  / 255.0).transpose(0, 3, 1, 2)
        _, C, H, W = X_train_arr.shape
        imgs_shape = (C, H, W)

    # ----- Tensors & DataLoaders -----
    # Only images and targets go into the datasets; the tabular frames above are
    # used for the feature count and the printed summary.
    X_train_img_tensor = torch.from_numpy(X_train_arr)  # float32 [0,1], NCHW
    X_val_img_tensor   = torch.from_numpy(X_val_arr)
    X_test_img_tensor  = torch.from_numpy(X_test_arr)

    y_train_tensor = prepare_target_tensor(y_train, task_type)
    y_val_tensor   = prepare_target_tensor(y_val,   task_type)
    y_test_tensor  = prepare_target_tensor(y_test,  task_type)

    train_dataset = TensorDataset(X_train_img_tensor, y_train_tensor)
    val_dataset   = TensorDataset(X_val_img_tensor,   y_val_tensor)
    test_dataset  = TensorDataset(X_test_img_tensor,  y_test_tensor)

    # Pinned host memory only helps host->GPU copies; skip it when CUDA is absent.
    pin = torch.cuda.is_available()
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True,  pin_memory=pin)
    val_loader   = DataLoader(val_dataset,   batch_size=batch_size, shuffle=False, pin_memory=pin)
    test_loader  = DataLoader(test_dataset,  batch_size=batch_size, shuffle=False, pin_memory=pin)

    attributes = X_train_num.shape[1]
    print("Images shape (C,H,W):", imgs_shape)
    print("Attributes:", attributes)

    return train_loader, val_loader, test_loader, attributes, imgs_shape, le, class_weight


## MODEL ARCHITECTURES

In [None]:
def find_divisors(n):
    """
    Return all positive divisors of `n` in ascending order.

    Only candidates up to int(n ** 0.5) are tried; each hit contributes the
    candidate itself and its cofactor (added once when the two coincide).
    """
    pairs = [(d, n // d) for d in range(1, int(n**0.5) + 1) if n % d == 0]

    found = []
    for low, high in pairs:
        found.append(low)
        if low != high:  # perfect-square root would otherwise appear twice
            found.append(high)
    return sorted(found)

### Vision Transformer

In [None]:
from models.vit_pytorch.vit import ViT

In [None]:
class ViTMLP(nn.Module):
    """
    Vision Transformer encoder followed by an MLP prediction head.

    Args:
        imgs_shape: forwarded unchanged to `ViT(image_size=...)`.
        params: hyper-parameter dict. Keys read here: "patch_size", "dim",
            "depth", "heads", "mlp_dim" (multiplier of "dim" for the
            transformer MLP width), "dropout", "emb_dropout" and
            "mlp_hidden_dims" (iterable of multipliers of "dim", one per
            hidden layer of the head).
        task: "regression", "binary" or anything else (treated as multiclass);
            compared case-insensitively, like the CNN model in this notebook.
        num_classes: number of outputs for multiclass tasks.

    Raises:
        ValueError: if the task is multiclass and `num_classes` is None.

    NOTE(review): the head's first layer takes params["dim"] inputs, i.e. it
    assumes the ViT returns a vector of that size — confirm against the ViT class.
    """

    def __init__(self, imgs_shape, params, task, num_classes=None):
        super().__init__()

        # Normalise once so 'Regression' / 'Binary' select the single-output head too.
        self.task = str(task).lower()

        # Vision Transformer branch
        self.vit = ViT(
            image_size=imgs_shape,
            patch_size=params["patch_size"],
            dim=params["dim"],
            depth=params["depth"],
            heads=params["heads"],
            mlp_dim=params["mlp_dim"]*params["dim"],
            dropout=params["dropout"],
            emb_dropout=params["emb_dropout"]
        )

        # MLP branch: Linear + ReLU per configured hidden width.
        mlp_layers = []
        input_dim = params["dim"]
        for hidden_dim in params["mlp_hidden_dims"]:
            width = int(params["dim"]*hidden_dim)
            mlp_layers.append(nn.Linear(input_dim, width))
            mlp_layers.append(nn.ReLU())
            input_dim = width

        # Determine output layer
        if self.task in ('regression', 'binary'):
            output_dim = 1
        else:
            if num_classes is None:
                raise ValueError("num_classes must be provided for multiclass tasks.")
            output_dim = num_classes
        mlp_layers.append(nn.Linear(input_dim, output_dim))
        self.mlp = nn.Sequential(*mlp_layers)

        # Change identity to something else if needed
        self.activation = nn.Identity()

    def forward(self, vit_input):
        """Encode the input with the ViT, apply the head, then the output activation."""
        x = self.vit(vit_input)
        x = self.mlp(x)
        return self.activation(x)


### CNN

In [None]:
import math
import torch
import torch.nn as nn

# Activation lookup helper (used by the CNN head below)
def get_act(name: str):
    """
    Map an activation name to its `nn.Module` class (not an instance).

    "relu" (any casing) selects `nn.ReLU`; every other value selects `nn.GELU`.
    """
    if str(name).lower() == "relu":
        return nn.ReLU
    return nn.GELU

# ---------------- Stem ----------------
class UnifiedStem(nn.Module):
    """
    Input stem with two variants:

    - 3x3 conv, stride 1: keeps the spatial size, so it is usable on tiny inputs.
    - 7x7 conv, stride 2, optionally followed by a 3x3/stride-2 max-pool.

    The 7x7 variant is built only when `stem_type == "7x7"` AND
    max(H, W) >= 64; in every other case the 3x3 variant is used.
    Both variants apply BatchNorm and ReLU after the convolution.
    """

    def __init__(self, C, stem_width, stem_type="3x3", use_maxpool=True, H=None, W=None):
        super().__init__()
        longest_side = max(H or 0, W or 0)  # missing dimensions count as 0
        use_large_stem = stem_type == "7x7" and longest_side >= 64

        if use_large_stem:
            conv = nn.Conv2d(C, stem_width, kernel_size=7, stride=2, padding=3, bias=False)
            # Identity keeps the layer count fixed when pooling is disabled.
            tail = [nn.MaxPool2d(kernel_size=3, stride=2, padding=1) if use_maxpool else nn.Identity()]
        else:
            conv = nn.Conv2d(C, stem_width, kernel_size=3, stride=1, padding=1, bias=False)
            tail = []

        self.net = nn.Sequential(
            conv,
            nn.BatchNorm2d(stem_width),
            nn.ReLU(inplace=True),
            *tail,
        )

    def forward(self, x):
        """Apply the stem layers to `x`."""
        return self.net(x)

# --------------- Basic Block ---------------
class BasicBlock(nn.Module):
    """
    Two 3x3 conv + BatchNorm layers with a residual (skip) connection.

    When the stride is not 1 or the channel count changes, the skip path is a
    1x1 conv + BatchNorm projection (`self.down`); otherwise `self.down` is
    None and the input is added back unchanged.
    """
    expansion = 1

    def __init__(self, in_ch, out_ch, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(in_ch, out_ch, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1   = nn.BatchNorm2d(out_ch)
        self.relu  = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_ch, out_ch, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2   = nn.BatchNorm2d(out_ch)

        # A projection is needed whenever the main path changes the tensor shape.
        needs_projection = stride != 1 or in_ch != out_ch
        if needs_projection:
            self.down = nn.Sequential(
                nn.Conv2d(in_ch, out_ch, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_ch),
            )
        else:
            self.down = None

    def forward(self, x):
        """Return relu(main_path(x) + skip(x))."""
        main = self.conv1(x)
        main = self.relu(self.bn1(main))
        main = self.bn2(self.conv2(main))

        skip = x if self.down is None else self.down(x)
        return self.relu(main + skip)

# --------------- ResNet Backbone (features only) ---------------
class ResNetBackboneAnySize(nn.Module):
    """
    ResNet-style feature extractor built from BasicBlocks.

    A UnifiedStem is followed by one stage per entry of
    params["blocks_per_stage"]; global average pooling (AdaptiveAvgPool2d(1))
    and a flatten turn the result into a feature vector of size `feat_dim`.
    There is no classifier layer.

    Args:
        params: dict with keys "in_channels", "stem_type", "use_maxpool",
            "stem_width", "blocks_per_stage", "base_width", "width_mul".
            "blocks_per_stage" may be a sequence of ints or its JSON string
            form (e.g. "[2,2,2,2]"); each entry is clamped to at least 1.
        imgs_shape: (C, H, W) of the input images.

    Stage i has int(base_width * width_mul) * 2**i channels, so any number of
    stages is supported. Every stage after the first starts with a stride-2
    block while the tracked spatial size is at least 4x4.
    """

    def __init__(self, params, imgs_shape):
        super().__init__()
        C, H, W = imgs_shape
        assert params["in_channels"] == C, "in_channels must match imgs_shape[0]"

        # knobs
        stem_type     = params["stem_type"]
        use_maxpool   = params["use_maxpool"]
        stem_width    = params["stem_width"]
        base_width    = params["base_width"]
        width_mul     = params["width_mul"]

        # Accept the JSON-string form as well as a real sequence; taking len()
        # of the raw string would count characters, not stages.
        blocks_ps = params["blocks_per_stage"]
        if isinstance(blocks_ps, str):
            blocks_ps = json.loads(blocks_ps)
        blocks_ps = [max(1, int(x)) for x in blocks_ps]
        n_stages = len(blocks_ps)

        # stem
        self.stem = UnifiedStem(C, stem_width, stem_type=stem_type, use_maxpool=use_maxpool, H=H, W=W)

        # stage widths: B, 2B, 4B, 8B, ... (doubling per stage, no fixed stage limit)
        B = int(base_width * width_mul)
        all_out = [B * (2 ** i) for i in range(n_stages)]

        # approximate current spatial size after stem (mirrors UnifiedStem's
        # rule for when the down-sampling 7x7 variant is used)
        curH, curW = H, W
        if stem_type == "7x7" and max(H, W) >= 64:
            curH = max(1, curH // 2)
            curW = max(1, curW // 2)
            if use_maxpool:
                curH = max(1, curH // 2)
                curW = max(1, curW // 2)

        in_planes = stem_width
        layers = []

        def can_downsample(h, w):
            # halving is allowed only while both sides are still >= 4
            return (h >= 4 and w >= 4)

        for si in range(n_stages):
            out_planes = all_out[si]
            n_blocks   = blocks_ps[si]
            # first stage never down-samples; later ones do when the map is large enough
            stride = 2 if (si > 0 and can_downsample(curH, curW)) else 1
            layers.append(BasicBlock(in_planes, out_planes, stride=stride))
            in_planes = out_planes
            if stride == 2:
                curH = max(1, curH // 2)
                curW = max(1, curW // 2)
            for _ in range(n_blocks - 1):
                layers.append(BasicBlock(in_planes, out_planes, stride=1))

        self.features = nn.Sequential(*layers)
        self.pool = nn.AdaptiveAvgPool2d(1)
        self.flat = nn.Flatten()

        # infer feature dim by pushing one all-zero image through the network
        # NOTE(review): the module is in its default (training) mode here, so this
        # pass also goes through the BatchNorm layers in training mode — confirm
        # this is acceptable for very small inputs.
        with torch.no_grad():
            dummy = torch.zeros(1, C, max(1, H), max(1, W))
            x = self.stem(dummy)
            x = self.features(x)
            x = self.pool(x)
            x = self.flat(x)
            self.feat_dim = x.shape[1]

    def forward(self, x):
        """Return the pooled, flattened features of shape (B, feat_dim)."""
        x = self.stem(x)
        x = self.features(x)
        x = self.pool(x)
        x = self.flat(x)
        return x  # (B, feat_dim)

# CNN

class CNN(nn.Module):
    """
    ResNet-style backbone followed by an MLP head (same layout as ViTMLP:
    encoder + MLP head).

    Args:
        imgs_shape: (C, H, W), forwarded to ResNetBackboneAnySize.
        params: hyper-parameter dict; besides the backbone keys it provides
            "mlp_hidden_dims" (multipliers of the backbone feature size, one
            per hidden layer) and optionally "activation" (default "relu").
        task: task name, lower-cased on construction; "regression" and
            "binary" give a single output, anything else gives `num_classes`.
        num_classes: output size for multiclass tasks.
        device: accepted for call compatibility; not used in this class.
    """

    def __init__(self, imgs_shape, params, task, num_classes=None, device="cuda"):
        super().__init__()
        self.task = task.lower()
        activation_cls = get_act(params.get("activation", "relu"))

        # Backbone (params must include the cnn knobs it reads)
        self.backbone = ResNetBackboneAnySize(params, imgs_shape)
        feat_dim = self.backbone.feat_dim

        # Hidden widths are expressed relative to the backbone feature size.
        head_layers = []
        prev_width = feat_dim
        for multiplier in params["mlp_hidden_dims"]:
            width = int(feat_dim*multiplier)
            head_layers += [nn.Linear(prev_width, width), activation_cls()]
            prev_width = width

        if self.task in ("regression", "binary"):
            n_outputs = 1
        else:
            n_outputs = num_classes
        head_layers.append(nn.Linear(prev_width, n_outputs))
        self.head = nn.Sequential(*head_layers)

        self.activation = nn.Identity()

    def forward(self, img_input):
        """Backbone features -> MLP head -> output activation."""
        features = self.backbone(img_input)
        return self.activation(self.head(features))


### ResNet50

In [None]:
from models.resnet50 import ResNet50

In [None]:
class resnet50(nn.Module):
    """
    Thin wrapper around the project's `ResNet50` model.

    The wrapped network is created with `weights=None` and receives
    `task_type` / `num_classes` unchanged; its output is passed through
    `self.activation`, which is an identity by default.
    """

    def __init__(self, task_type, num_classes=None):
        super().__init__()

        self.resnet = ResNet50(
            task_type=task_type,
            num_classes=num_classes,
            weights=None,
        )

        # Change identity to something else if needed
        self.activation = nn.Identity()

    def forward(self, resnet_input):
        """Run the wrapped ResNet50 and apply the output activation."""
        return self.activation(self.resnet(resnet_input))

## COMPILE AND FIT

In [None]:
import gc
import copy

from models.utils import get_loss_fn, calculate_metrics, calculate_metrics_from_numpy, get_class_weighted_loss_fn

In [None]:
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from torch.optim.lr_scheduler import OneCycleLR
import matplotlib.pyplot as plt
import time
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import os

def compile_and_fit(model, train_loader, val_loader, test_loader, dataset_name, 
                    model_name, image_name, trial_name=None, task='regression', epochs=200, max_lr=1, 
                    div_factor=10, final_div_factor=1, device='cuda', weight_decay=1e-2, pct_start=0.3, save_model=False, class_weights=None, save_dir=None, study=None, patch=None, verbose=False):
    """
    Train `model` with AdamW + OneCycleLR, keep the weights with the lowest
    validation loss, and report metrics for all three splits.

    Args:
        model: network taking an image batch and returning predictions.
        train_loader, val_loader, test_loader: loaders yielding (images, targets).
        dataset_name, study, patch: accepted for call compatibility; not used here.
        model_name, image_name, trial_name: used to build the output folder
            `<save_dir>/<model_name>/<image_name>/best_model/<trial_name>`.
        task: 'regression', 'binary' or 'multiclass'; selects the loss, the
            tracked metric names and how predictions are stacked.
        epochs: number of training epochs.
        max_lr, div_factor, final_div_factor, pct_start: OneCycleLR settings;
            the optimizer starts at max_lr / div_factor.
        device: device the model and batches are moved to.
        weight_decay: AdamW weight decay.
        save_model: if True, write plots, metrics and weights to disk.
        class_weights: if not None, passed to `get_class_weighted_loss_fn`;
            otherwise `get_loss_fn(task)` is used.
        save_dir: root output directory; required when `save_model` is True.
        verbose: print a summary at the end.

    Returns:
        dict with train/val/test loss, min_lr, max_lr, total_time,
        average_epoch_time and the task-specific metrics returned by
        `calculate_metrics`, prefixed with 'train_', 'val_' and 'test_'.

    Raises:
        ValueError: if `save_model` is True but `save_dir` is None (checked
            before training so no compute is wasted).
    """
    if save_model and save_dir is None:
        raise ValueError("save_model=True requires save_dir.")

    model = model.to(device)

    # `is not None` instead of `!= None`: class_weights may be a tensor, and
    # identity is the intended check.
    if class_weights is not None:
        loss_fn = get_class_weighted_loss_fn(task, class_weights)
    else:
        loss_fn = get_loss_fn(task)

    # Compute min_lr from max_lr and div_factor
    min_lr = max_lr / div_factor

    optimizer = optim.AdamW(model.parameters(), lr=min_lr, weight_decay=weight_decay)

    # The scheduler is stepped once per batch, hence epochs * batches-per-epoch.
    total_steps = epochs * len(train_loader)
    scheduler = OneCycleLR(optimizer, max_lr=max_lr, div_factor=div_factor, final_div_factor=final_div_factor, total_steps=total_steps, pct_start=pct_start, anneal_strategy="cos")

    best_val_loss = float('inf')
    best_model = None
    best_epoch = 0

    history = {'train_loss': [], 'val_loss': [], 'learning_rate': [], 'epoch_time': []}

    if task == 'regression':
        history.update({'train_mse': [],  'val_mse': [], 'train_mae': [],  'val_mae': [], 'train_rmse': [], 'val_rmse': [], 'train_r2': [], 'val_r2': []})
    elif task in ['binary', 'multiclass']:
        history.update({'train_accuracy': [], 'val_accuracy': [], 'train_precision': [], 'val_precision': [], 'train_recall': [], 'val_recall': [], 'train_f1': [], 'val_f1': []})

    def _stack(targets, preds):
        # Per-sample outputs were collected with list.extend: multiclass keeps
        # one row of scores per sample, other tasks flatten to 1-D arrays.
        if task == 'multiclass':
            return targets, np.vstack(preds)
        return np.concatenate(targets), np.concatenate(preds)

    start_time = time.time()

    for epoch in range(epochs):
        epoch_start_time = time.time()

        # ---- training pass ----
        model.train()
        train_loss = 0.0
        train_preds = []
        train_targets = []

        for img_data, targets in train_loader:
            img_data, targets = img_data.to(device, non_blocking=True), targets.to(device, non_blocking=True)

            optimizer.zero_grad()
            outputs = model(img_data)
            loss = loss_fn(outputs, targets)
            loss.backward()
            optimizer.step()
            scheduler.step()

            train_loss += loss.item()
            train_preds.extend(outputs.detach().cpu().numpy())
            train_targets.extend(targets.cpu().numpy())

        train_loss /= len(train_loader)
        y_train_true, y_train_pred = _stack(train_targets, train_preds)
        train_metrics = calculate_metrics_from_numpy(y_train_true, y_train_pred, task)

        # ---- validation pass ----
        model.eval()
        val_loss = 0.0
        val_preds = []
        val_targets = []
        with torch.no_grad():
            for img_data, targets in val_loader:
                img_data, targets = img_data.to(device, non_blocking=True), targets.to(device, non_blocking=True)
                outputs = model(img_data)
                loss = loss_fn(outputs, targets)

                val_loss += loss.item()
                val_preds.extend(outputs.cpu().numpy())
                val_targets.extend(targets.cpu().numpy())

        val_loss /= len(val_loader)
        y_val_true, y_val_pred = _stack(val_targets, val_preds)
        val_metrics = calculate_metrics_from_numpy(y_val_true, y_val_pred, task)

        # get_last_lr() returns one value per parameter group; the optimizer
        # above has a single group, so record that scalar.
        current_lr = scheduler.get_last_lr()[0]

        epoch_time = time.time() - epoch_start_time

        history['train_loss'].append(train_loss)
        history['val_loss'].append(val_loss)
        history['learning_rate'].append(current_lr)
        history['epoch_time'].append(epoch_time)

        for k, v in train_metrics.items():
            history[f'train_{k}'].append(v)
        for k, v in val_metrics.items():
            history[f'val_{k}'].append(v)

        # Snapshot the weights whenever the validation loss improves.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model = copy.deepcopy(model.state_dict())
            best_epoch = epoch + 1

    total_time = time.time() - start_time

    if best_model is None:
        # No epoch improved on +inf (e.g. the validation loss was NaN throughout):
        # fall back to the final weights instead of calling load_state_dict(None).
        best_model = copy.deepcopy(model.state_dict())
        best_epoch = epochs
    model.load_state_dict(best_model)

    # Recompute metrics using the best model
    train_metrics, y_true_train, y_pred_train, y_prob_train = calculate_metrics(model, train_loader, device, class_weights, task)
    val_metrics, y_true_val, y_pred_val, y_prob_val  = calculate_metrics(model, val_loader, device, class_weights, task)
    test_metrics, y_true_test, y_pred_test, y_prob_test = calculate_metrics(model, test_loader, device, class_weights, task)

    # Store recomputed metrics
    metrics = {
        'train_loss': train_metrics['loss'],
        'val_loss': val_metrics['loss'],
        'test_loss': test_metrics['loss'],
        'min_lr': min_lr,
        'max_lr': max_lr,
        'total_time': total_time,
        'average_epoch_time': sum(history['epoch_time']) / len(history['epoch_time'])
    }

    # Add task-specific metrics
    for prefix, split_metrics in (('train', train_metrics), ('val', val_metrics), ('test', test_metrics)):
        for k, v in split_metrics.items():
            if k != 'loss':
                metrics[f'{prefix}_{k}'] = v

    if verbose:
        print(f"\nTraining completed in {total_time:.2f} seconds")
        print(f"Best model found at epoch {best_epoch}/{epochs}")
        print(f"Best Train Loss: {metrics['train_loss']:.4f}, Best Val Loss: {metrics['val_loss']:.4f}")
        print(metrics)

    if save_model:
        # Same layout for every model type.
        save_path = os.path.join(save_dir, f"{model_name}/{image_name}/best_model/{trial_name}")
        os.makedirs(save_path, exist_ok=True)

        plot_metric(history['train_loss'], history['val_loss'], 'Loss', save_path)
        if task == 'regression':
            plot_metric(history['train_mse'], history['val_mse'], 'MSE', save_path)
            plot_metric(history['train_rmse'], history['val_rmse'], 'RMSE', save_path)
        else:
            plot_metric(history['train_accuracy'], history['val_accuracy'], 'Accuracy', save_path)
            plot_metric(history['train_f1'], history['val_f1'], 'F1', save_path)

        plot_learning_rate(history['learning_rate'], save_path)

        # Save metrics
        with open(f'{save_path}/best_model_metrics.txt', 'w') as f:
            for key, value in metrics.items():
                f.write(f'{key}: {value}\n')

        # Save model (state dict of the best epoch)
        torch.save(best_model, f"{save_path}/best_model.pth")
        print(f"Best model saved to {save_path}/best_model.pth")

        # Additional plots for classification
        if task in ["binary"]:
            plot_extra("Train", y_true_train, y_pred_train, y_prob_train, save_path)
            plot_extra("Validation", y_true_val, y_pred_val, y_prob_val, save_path)
            plot_extra("Test", y_true_test, y_pred_test, y_prob_test, save_path)

    # Drop the local reference and release cached GPU memory between trials.
    del model
    torch.cuda.empty_cache()
    gc.collect()

    return metrics


def plot_extra(split_name, y_true, y_pred, y_prob, save_path):
    """
    Save ROC, precision-recall and confusion-matrix plots for one data split.

    Args:
        split_name: label used in plot titles; its lower-cased form prefixes
            the output file names.
        y_true: true labels (flattened with `.ravel()`).
        y_pred: predicted labels (flattened with `.ravel()`).
        y_prob: scores passed to the ROC / PR utilities as given.
        save_path: existing directory the PNG files are written to.

    Files written: `<split>_roc_curve.png`, `<split>_pr_curve.png`,
    `<split>_confusion_matrix_normalized.png`, `<split>_confusion_matrix_counts.png`.
    """
    y_true = y_true.ravel()
    y_pred = y_pred.ravel()
    prefix = split_name.lower()

    # ROC Curve
    RocCurveDisplay.from_predictions(y_true, y_prob)
    auc_score = roc_auc_score(y_true, y_prob)
    plt.plot([0, 1], [0, 1], linestyle='--', color='gray', label='Random')
    plt.title(f"{split_name} ROC Curve (AUC = {auc_score:.2f})")
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.legend()
    plt.grid(True)
    plt.savefig(os.path.join(save_path, f"{prefix}_roc_curve.png"))
    plt.close("all")

    # Precision-Recall Curve
    PrecisionRecallDisplay.from_predictions(y_true, y_prob)
    avg_prec = average_precision_score(y_true, y_prob)
    plt.title(f"{split_name} PR Curve (AP = {avg_prec:.2f})")
    plt.xlabel("Recall")
    plt.ylabel("Precision")
    plt.grid(True)
    plt.savefig(os.path.join(save_path, f"{prefix}_pr_curve.png"))
    plt.close("all")

    # Confusion matrices. `from_predictions` already draws the figure, so the
    # colour map is passed to it directly instead of calling `.plot()` again,
    # which would render every matrix a second time on an extra figure.
    matrix_variants = (
        ('true', "Normalized", "confusion_matrix_normalized"),
        (None, "Counts", "confusion_matrix_counts"),
    )
    for normalize, title_suffix, file_suffix in matrix_variants:
        ConfusionMatrixDisplay.from_predictions(y_true, y_pred, normalize=normalize, cmap='Blues')
        plt.title(f"{split_name} Confusion Matrix ({title_suffix})")
        plt.grid(False)
        plt.xlabel("Predicted Label")
        plt.ylabel("True Label")
        plt.savefig(os.path.join(save_path, f"{prefix}_{file_suffix}.png"))
        plt.close("all")


def plot_metric(train_metric, val_metric, metric_name, save_path):
    """
    Plot a training and a validation curve on one figure and save it as
    `<save_path>/<metric_name lower-cased>_plot.png`.
    """
    fig, ax = plt.subplots()
    ax.plot(train_metric, label=f'Train {metric_name}')
    ax.plot(val_metric, label=f'Validation {metric_name}')
    ax.set_xlabel('Epoch')
    ax.set_ylabel(metric_name)
    ax.legend()
    ax.set_title(f'{metric_name} vs. Epoch')

    out_file = f"{save_path}/{metric_name.lower()}_plot.png"
    fig.savefig(out_file)
    plt.close("all")  # release every open figure

def plot_learning_rate(learning_rates, save_path):
    """
    Plot the recorded learning rates (one entry per epoch) and save the
    figure as `<save_path>/learning_rate_plot.png`.
    """
    fig, ax = plt.subplots()
    ax.plot(learning_rates)
    ax.set_xlabel('Epoch')
    ax.set_ylabel('Learning Rate')
    ax.set_title('Learning Rate vs. Epoch')

    out_file = f"{save_path}/learning_rate_plot.png"
    fig.savefig(out_file)
    plt.close("all")  # release every open figure

# EXPERIMENTS

## Vision Transformer

In [None]:
# Experiment settings for the Vision Transformer run.
model_name = "vit"
save_dir = os.path.join("logs", task_type, dataset_name)
epochs = 50
n_trials = 100
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'

# The batch size comes from the per-dataset preprocessing config.
with open(f"./configs/preprocess/{dataset_name}.json") as f:
    config = json.load(f)
batch_size = config["batch_size"]

# Multiclass: one output per distinct value of the last column; otherwise one output.
num_classes = df.iloc[:, -1].nunique() if task_type.lower() == 'multiclass' else 1

In [None]:
# Experiment settings for the CNN run (reassigns the names set by the ViT cell above).
model_name = "CNN"
save_dir = os.path.join("logs", task_type, dataset_name)
epochs = 50
n_trials = 100
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'

# The batch size comes from the per-dataset preprocessing config.
with open(f"./configs/preprocess/{dataset_name}.json") as f:
    config = json.load(f)
batch_size = config["batch_size"]

# Multiclass: one output per distinct value of the last column; otherwise one output.
num_classes = df.iloc[:, -1].nunique() if task_type.lower() == 'multiclass' else 1

In [None]:
# ------------------ small helpers ------------------
def _as_list(x):
    return json.loads(x) if isinstance(x, str) else list(x)

def _align_blocks(blocks, n_stages):
    blocks = list(blocks)
    if len(blocks) < n_stages:
        blocks = blocks + [blocks[-1]] * (n_stages - len(blocks))  # pad with last
    elif len(blocks) > n_stages:
        blocks = blocks[:n_stages]                                 # truncate
    return [max(1, int(b)) for b in blocks]

def _can_stride2(h, w):
    # allow stride-2 only if AFTER halving we still have >= 2x2
    return (h // 2) >= 2 and (w // 2) >= 2

def _est_min_size_safe(H, W, stem_type, use_maxpool, n_stages):
    h, w = int(H), int(W)

    # stem downsampling only for large inputs with 7x7
    if stem_type == "7x7" and max(H, W) >= 64:
        h //= 2; w //= 2
        if use_maxpool:
            h //= 2; w //= 2

    # per-stage: at most one stride-2 at stage start, only if safe
    for si in range(n_stages):
        if si > 0 and _can_stride2(h, w):
            h //= 2; w //= 2

    return h, w


def _count_params(m):
    return sum(p.numel() for p in m.parameters())

In [None]:
def objective(trial, model_name, image_name, task_type, 
              train_loader, val_loader, test_loader,
              divisors, imgs_shape, num_classes=None,
              device='cuda', save_dir=None, class_weight=None, epochs=100, path_vit=None):
    """Optuna objective: sample a ViT or CNN configuration, train it, return the validation score.

    Args:
        trial: Optuna trial used to sample hyperparameters.
        model_name: ``"vit"`` builds a ``ViTMLP``; any other value builds a ``CNN``.
            Also selects ``configs/optuna_search/<model_name>.json``.
        image_name: Image-type label; forwarded to ``compile_and_fit`` and used
            in the log directory.
        task_type: ``"regression"``, ``"binary"`` or ``"multiclass"`` (case-insensitive).
        train_loader, val_loader, test_loader: Forwarded to ``compile_and_fit``.
        divisors: Candidate patch sizes for the ViT (ignored by the CNN branch).
        imgs_shape: Image shape, unpacked as ``(C, H, W)`` in the CNN branch;
            ``imgs_shape[1]`` is passed to ``ViTMLP``.
        num_classes: Forwarded to the model constructor.
        device: Unused here; kept for call compatibility.
        save_dir: Root directory; the trial log is appended to
            ``<save_dir>/<model_name>/<image_name>/optuna/optuna_trials_log.txt``.
        class_weight: Forwarded to ``compile_and_fit`` as ``class_weights``.
        epochs: Training epochs for this trial.
        path_vit: Directory holding the reference ViT's ``best_params.json``
            (required for the CNN branch, which caps its size against it).

    Returns:
        ``val_rmse`` (regression), ``val_roc_auc`` (binary) or ``val_accuracy``
        (multiclass) taken from the metrics returned by ``compile_and_fit``.

    Raises:
        ValueError: Unsupported task, missing ``save_dir``, or missing
            ``path_vit`` in the CNN branch. Raised before any training.
        optuna.TrialPruned: The sampled configuration is invalid or too large.
    """
    task = task_type.lower()

    # task -> (key in the metrics dict, label used in the log line).
    # Checked up front so a bad argument fails before a model is trained.
    score_spec = {
        "regression": ("val_rmse", "VAL-RMSE"),
        "binary": ("val_roc_auc", "VAL-AUC"),
        "multiclass": ("val_accuracy", "VAL-Accuracy"),
    }
    if task not in score_spec:
        raise ValueError(f"Unsupported task type: {task_type}")
    if save_dir is None:
        raise ValueError("save_dir is required: the trial log is written beneath it")

    if model_name == "vit":
        params = load_search_space(model_name, trial)
        params["patch_size"] = trial.suggest_categorical("patch_size", divisors)
        # The head layout is stored as a JSON string; decode it to a list.
        params["mlp_hidden_dims"] = json.loads(params["mlp_hidden_dims"])

        # Prune invalid attention shapes early: dim must split evenly across heads.
        if params["dim"] % params["heads"] != 0:
            raise optuna.TrialPruned("dim is not divisible by heads")

        model = ViTMLP(imgs_shape[1], params, task, num_classes)
    else:
        # --- CNN branch (ResNet-style) ---
        if path_vit is None:
            raise ValueError("path_vit is required for non-ViT models (reference parameter count)")

        params = load_search_space(model_name, trial)

        with open(f"{path_vit}/best_params.json", "r") as f:
            best_vit = json.load(f)
        num_params_vit = float(best_vit["total_params"])

        # Both list-valued choices are stored as JSON strings.
        params["mlp_hidden_dims"] = json.loads(params["mlp_hidden_dims"])
        params["blocks_per_stage"] = json.loads(params["blocks_per_stage"])

        stem_type = params["stem_type"]
        use_maxpool = params["use_maxpool"]
        n_stages = len(params["blocks_per_stage"])

        _, H, W = imgs_shape  # imgs_shape = (C,H,W)

        # Forbid the 7x7 stem on tiny images.
        if stem_type == "7x7" and max(H, W) < 64:
            raise optuna.TrialPruned("7x7 stem on small images (<64)")

        # Forbid the 3x3 stem on very large images (too fine, too heavy).
        if stem_type == "3x3" and max(H, W) >= 224:
            raise optuna.TrialPruned("3x3 stem on very large images (>=224)")

        # Worst-case collapse check (avoid < 2x2).
        minH, minW = _est_min_size_safe(H, W, stem_type, use_maxpool, n_stages)
        if min(minH, minW) < 2:
            raise optuna.TrialPruned(f"downsampling collapses spatial to {minH}x{minW} < 2x2")

        model = CNN(imgs_shape, params, task, num_classes)

        # Capacity match: prune CNNs larger than the reference ViT plus tolerance.
        num_params_cnn = _count_params(model)
        tol = 0.25  # allow up to +25%
        if num_params_cnn > (1.0 + tol) * num_params_vit:
            raise optuna.TrialPruned(
                f"params {num_params_cnn} > {int((1.0+tol)*100)}% of {int(num_params_vit)}"
            )

    # Optimiser/scheduler ranges; entries [1] and [2] are used as low/high bounds.
    with open(f"configs/optuna_search/{model_name}.json", "r") as f:
        fit_space = json.load(f)[model_name]["fit"]

    fit_params = {
        "max_lr": trial.suggest_float("max_lr", fit_space["max_lr"][1], fit_space["max_lr"][2], log=True),
        "div_factor": trial.suggest_int("div_factor", fit_space["div_factor"][1], fit_space["div_factor"][2]),
        "final_div_factor": trial.suggest_int("final_div_factor", fit_space["final_div_factor"][1], fit_space["final_div_factor"][2]),
        "weight_decay": trial.suggest_float("weight_decay", fit_space["weight_decay"][1], fit_space["weight_decay"][2], log=True),
        "pct_start": trial.suggest_float("pct_start", fit_space["pct_start"][1], fit_space["pct_start"][2]),
    }

    metrics = compile_and_fit(
        model,
        train_loader, val_loader, test_loader,
        dataset_name=dataset_name,  # NOTE: read from the notebook-level variable, not an argument
        model_name=f"trial_{trial.number}",
        image_name=image_name,
        task=task,
        epochs=epochs,
        save_model=False,
        class_weights=class_weight,
        **fit_params,
    )

    log_dir = os.path.join(save_dir, model_name, image_name, "optuna")
    os.makedirs(log_dir, exist_ok=True)

    metric_key, label = score_spec[task]
    score = metrics[metric_key]
    with open(f"{log_dir}/optuna_trials_log.txt", "a") as f:
        f.write(f"Trial {trial.number} - {label}: {score:.4f}, Params: {params}\n")
        f.write("=" * 60 + "\n")

    return score


In [None]:
# === benchmark_eval.py (vision-only; no frozen branches) ======================
from numbers import Number
import os
import json
import numpy as np
import optuna
import torch
from torch import nn

# ----------------------------------------------------------------------------- 
# NEW: optional calflops (robust fallback if not installed)
# -----------------------------------------------------------------------------
try:
    from calflops import calculate_flops as _calflops_calc
    _HAVE_CALFLOPS = True
except Exception:
    _HAVE_CALFLOPS = False

# ----------------------------------------------------------------------------- 
# Config (adjust as needed)
# -----------------------------------------------------------------------------
TOP_K = 5                         # how many of the best Optuna trials get a full single-pass run
SINGLE_PASS_SEED = 0              # seed for one-time eval of top-K
FINAL_SEEDS = [0, 1, 2, 3, 4]     # seeds for the final winner
FULL_EPOCHS = 100                 # epochs for every full run (single pass and multi-seed)

# ----------------------------------------------------------------------------- 
# Helpers
# -----------------------------------------------------------------------------
def _count_params(model: nn.Module, trainable_only: bool = False) -> int:
    if trainable_only:
        return sum(p.numel() for p in model.parameters() if p.requires_grad)
    return sum(p.numel() for p in model.parameters())

def _ensure_dir(p: str):
    os.makedirs(p, exist_ok=True)
    return p

def _is_minimize_study(study):
    try:
        return study.direction == optuna.study.StudyDirection.MINIMIZE
    except Exception:
        try:
            return study.directions[0] == optuna.study.StudyDirection.MINIMIZE
        except Exception:
            return True  # fallback

def primary_val_key_for_task(task_type: str):
    """Map a task type to ``(validation metric key, lower_is_better)``.

    Unknown task types fall back to ``("val_loss", True)``. Matching is
    case-insensitive.
    """
    by_task = {
        "regression": ("val_rmse", True),       # lower is better
        "binary": ("val_roc_auc", False),       # higher is better
        "multiclass": ("val_accuracy", False),  # higher is better
    }
    return by_task.get(task_type.lower(), ("val_loss", True))

def _sort_trials(trials, minimize: bool):
    return sorted(trials, key=lambda t: t.value, reverse=not minimize)

def _metric_or_default(m: dict, key: str, minimize: bool):
    if key in m and isinstance(m[key], (Number, np.floating, np.integer)):
        return float(m[key])
    # worst side if missing
    return (np.inf if minimize else -np.inf)

def _parse_if_json_list(x):
    if isinstance(x, str):
        try:
            return json.loads(x)
        except Exception:
            pass
    return x

# ----------------------------------------------------------------------------- 
# NEW: small helpers for FLOPs/MACs
# -----------------------------------------------------------------------------
def _humanize(n: float, unit: str = "") -> str:
    try:
        n = float(n)
        for u in ["", "K", "M", "G", "T", "P"]:
            if abs(n) < 1000.0:
                return f"{n:.3f}{u}{unit}"
            n /= 1000.0
        return f"{n:.3f}E{unit}"
    except Exception:
        return str(n)

def _try_compute_flops(model: nn.Module, imgs_shape, batch_size: int = 1):
    """Best-effort FLOPs/MACs estimate for ``model`` via calflops.

    Returns ``None`` when calflops is not installed or when the computation
    raises; otherwise a dict with the keys ``flops``, ``macs`` and
    ``params_from_calflops`` (floats) plus ``flops_str`` and ``macs_str``
    (formatted by ``_humanize``).
    """
    if not _HAVE_CALFLOPS:
        return None
    try:
        # imgs_shape is expected to end in (C, H, W); anything else falls back
        # to a 3x224x224 input.
        shape_known = isinstance(imgs_shape, (list, tuple)) and len(imgs_shape) >= 3
        channels, height, width = tuple(imgs_shape[-3:]) if shape_known else (3, 224, 224)

        flops, macs, params_cf = _calflops_calc(
            model=model,
            input_shape=(batch_size, int(channels), int(height), int(width)),
            output_as_string=False
        )
        # Numeric results are stored as floats for consistency.
        return {
            "flops": float(flops),
            "macs": float(macs),
            "params_from_calflops": float(params_cf),
            "flops_str": _humanize(flops),
            "macs_str": _humanize(macs),
        }
    except Exception:
        return None

# ----------------------------------------------------------------------------- 
# Build + train + return metrics (with param counts) for a trial
# -----------------------------------------------------------------------------
def evaluate_best_model(
    best_trial,
    train_loader, val_loader, test_loader,
    dataset_name, image_name, task_type,
    save_dir, imgs_shape, trial_name,
    class_weight=None, num_classes=None, epochs=10,
    model_name=None
):
    """
    Rebuild the model described by an Optuna trial, train it, and return its metrics.

    Vision-only evaluation (no frozen branches):
      - model_name == "vit": build ViTMLP from the trial's architecture params.
      - otherwise: build CNN from the trial's architecture params.
    Before training, the trial's full parameter set plus parameter counts (and
    FLOPs/MACs when available) is written to
    ``<save_dir>/<model_name>/<image_name>/best_model/<trial_name>/best_params.json``.

    Args:
        best_trial: Optuna trial; its ``params``, ``number`` and ``value`` are used.
        train_loader, val_loader, test_loader: Forwarded to ``compile_and_fit``.
        dataset_name: Forwarded to ``compile_and_fit``.
        image_name: Forwarded to ``compile_and_fit``; also part of the output path.
        task_type: Task label; lower-cased before use.
        save_dir: Root output directory.
        imgs_shape: Image shape; ``imgs_shape[1]`` goes to ViTMLP, the whole
            value goes to CNN and to the FLOPs estimate.
        trial_name: Sub-directory / run name for this evaluation.
        class_weight: Forwarded to ``compile_and_fit`` as ``class_weights``.
        num_classes: Forwarded to the model constructor.
        epochs: Number of training epochs.
        model_name: Which architecture to build. When omitted, the
            notebook-level ``model_name`` variable is used (the behaviour older
            call sites relied on).

    Returns:
        The metrics dict from ``compile_and_fit`` extended with
        ``total_params``, ``trainable_params`` and, when available, ``flops``,
        ``macs``, ``flops_str`` and ``macs_str``.

    Raises:
        ValueError: ``model_name`` was neither passed nor defined at notebook level.
    """
    if model_name is None:
        # Backward-compatible fallback for callers that do not pass model_name.
        model_name = globals().get("model_name")
        if model_name is None:
            raise ValueError("model_name must be passed or defined as a notebook-level variable")

    task = task_type.lower()
    best_params = best_trial.params

    print(f"\nBest Trial: {best_trial.number}")
    print(f"  Best Score: {best_trial.value:.4f}")
    print("  Best Hyperparameters:")
    for k, v in best_params.items():
        print(f"    {k}: {v}")

    # ---------------- Build model from trial params ----------------
    is_vit = model_name == "vit"
    if is_vit:
        arch_keys = ("patch_size", "dim", "depth", "heads", "mlp_dim",
                     "mlp_hidden_dims", "dropout", "emb_dropout")
    else:
        arch_keys = ("in_channels", "activation", "stem_type", "use_maxpool", "stem_width",
                     "n_stages", "blocks_per_stage", "base_width", "width_mul", "mlp_hidden_dims")

    # Keep only the architecture keys; optimiser settings are handled below.
    architecture_params = {k: v for k, v in best_params.items() if k in arch_keys}

    # List-valued choices may be stored as JSON strings; decode them.
    for list_key in ("mlp_hidden_dims", "blocks_per_stage"):
        if isinstance(architecture_params.get(list_key), str):
            architecture_params[list_key] = json.loads(architecture_params[list_key])

    if is_vit:
        patch = architecture_params["patch_size"]
        model = ViTMLP(imgs_shape[1], architecture_params, task, num_classes)
    else:
        patch = None
        model = CNN(imgs_shape, architecture_params, task, num_classes)

    # Optimiser / scheduler settings are identical for both architectures.
    fit_params = {
        key: best_params[key]
        for key in ("max_lr", "div_factor", "final_div_factor", "weight_decay", "pct_start")
    }

    # ---------------- Count params ----------------
    total_params = _count_params(model, trainable_only=False)
    trainable_params = _count_params(model, trainable_only=True)
    print(f"  Params: total={total_params:,}  trainable={trainable_params:,}")

    # ---------------- FLOPs/MACs ----------------
    flops_info = _try_compute_flops(model, imgs_shape, batch_size=1)
    if flops_info is not None:
        print(f"  FLOPs: {flops_info['flops_str']}  MACs: {flops_info['macs_str']}")

    base_dir = _ensure_dir(os.path.join(save_dir, f"{model_name}/{image_name}/best_model/{trial_name}"))
    # Save full best params + counts (+ FLOPs) so the exact tuned config is kept
    best_params_with_counts = dict(best_params)
    best_params_with_counts["total_params"] = int(total_params)
    best_params_with_counts["trainable_params"] = int(trainable_params)
    if flops_info is not None:
        best_params_with_counts["flops"] = flops_info["flops"]
        best_params_with_counts["macs"] = flops_info["macs"]
        best_params_with_counts["flops_str"] = flops_info["flops_str"]
        best_params_with_counts["macs_str"] = flops_info["macs_str"]
    with open(os.path.join(base_dir, "best_params.json"), "w") as f:
        json.dump(best_params_with_counts, f, indent=4)

    # ---------------- Train & evaluate ----------------
    metrics = compile_and_fit(
        model,
        train_loader, val_loader, test_loader,
        dataset_name=dataset_name,
        image_name=image_name,
        model_name=model_name,
        trial_name=trial_name,
        task=task,
        max_lr=fit_params["max_lr"],
        div_factor=fit_params["div_factor"],
        final_div_factor=fit_params["final_div_factor"],
        weight_decay=fit_params["weight_decay"],
        pct_start=fit_params["pct_start"],
        epochs=epochs,
        save_model=True,
        class_weights=class_weight,
        save_dir=save_dir,
        patch=(patch if patch is not None else "")
    )

    # Augment metrics with parameter counts + FLOPs
    metrics["total_params"] = int(total_params)
    metrics["trainable_params"] = int(trainable_params)
    if flops_info is not None:
        metrics["flops"] = flops_info["flops"]
        metrics["macs"] = flops_info["macs"]
        metrics["flops_str"] = flops_info["flops_str"]
        metrics["macs_str"] = flops_info["macs_str"]
    return metrics

# ----------------------------------------------------------------------------- 
# Top-K → single-pass → winner → multi-seed driver
# -----------------------------------------------------------------------------
def run_topk_and_multiseed(
    study, model_name, dataset_name, name, task_type, save_dir,
    imgs_shape, num_classes, class_weight,
    train_loader, val_loader, test_loader
):
    """
    Re-train the best trials of a finished study and report the winner over several seeds.

    Steps:
      1. Take the ``TOP_K`` best completed trials of ``study``.
      2. Train each once for ``FULL_EPOCHS`` epochs with ``SINGLE_PASS_SEED``.
      3. Pick the winner by the task's primary validation metric.
      4. Re-train the winner once per seed in ``FINAL_SEEDS`` and aggregate every
         numeric metric as mean and sample standard deviation.
      5. Write ``winner_multi_seed_summary.txt`` into the winner's directory.

    Same signature as the hybrid driver, but vision-only (no frozen components).
    ``model_name`` is forwarded to ``evaluate_best_model`` so the architecture
    that is built always matches the one used for paths and naming here.

    Returns:
        dict with ``winner_trial_number``, ``winner_trial_name``,
        ``primary_metric``, ``aggregates``, ``total_params``,
        ``trainable_params``, ``flops``, ``macs`` and ``summary_path``.

    Raises:
        RuntimeError: The study has no completed trials.
        TypeError: ``evaluate_best_model`` returned something other than a dict.
    """
    minimize = _is_minimize_study(study)
    completed = [t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE]
    if not completed:
        raise RuntimeError("No completed trials in the study.")

    top_trials = _sort_trials(completed, minimize)[:TOP_K]
    primary_key, primary_minimize = primary_val_key_for_task(task_type)
    maximize_primary = not primary_minimize

    print(f"\nEvaluating top-{len(top_trials)} trials once at {FULL_EPOCHS} epochs (seed={SINGLE_PASS_SEED})...\n")

    # Single-pass over top-K
    single_pass_results = []  # list of (trial, trial_name, metrics_dict)
    for trial in top_trials:
        if model_name == "vit":
            best_patch = trial.params.get("patch_size", None)
            trial_name = f"trial_{trial.number}_patch{best_patch}"
            header = f"(Trial {trial.number}, ValObjective: {trial.value:.4f}, patch_size={best_patch})"
        else:
            trial_name = f"trial_{trial.number}"
            header = f"(Trial {trial.number}, ValObjective: {trial.value:.4f})"

        print(f"→ Single-pass full run {header}")
        set_model_seed(SINGLE_PASS_SEED)

        metrics = evaluate_best_model(
            best_trial=trial,
            train_loader=train_loader, val_loader=val_loader, test_loader=test_loader,
            dataset_name=dataset_name, image_name=name, task_type=task_type,
            save_dir=save_dir, imgs_shape=imgs_shape,
            trial_name=trial_name, class_weight=class_weight, num_classes=num_classes,
            epochs=FULL_EPOCHS,
            model_name=model_name
        )
        if not isinstance(metrics, dict):
            raise TypeError(f"evaluate_best_model must return dict, got: {type(metrics)}")

        # brief printout (metric + params)
        if primary_key in metrics:
            print(f"   {primary_key}={float(metrics[primary_key]):.6f}")
        tp = metrics.get("total_params"); trp = metrics.get("trainable_params")
        # Both counts must be present: the ',' format spec fails on None.
        if tp is not None and trp is not None:
            print(f"   params: total={tp:,}, trainable={trp:,}")
        # echo FLOPs if available
        if "flops_str" in metrics and "macs_str" in metrics:
            print(f"   flops={metrics['flops_str']}, macs={metrics['macs_str']}")
        single_pass_results.append((trial, trial_name, metrics))

    # Winner by primary metric
    if maximize_primary:
        winner_tuple = max(single_pass_results, key=lambda x: _metric_or_default(x[2], primary_key, primary_minimize))
    else:
        winner_tuple = min(single_pass_results, key=lambda x: _metric_or_default(x[2], primary_key, primary_minimize))

    best_trial, best_trial_name, best_single_metrics = winner_tuple
    best_primary_val = _metric_or_default(best_single_metrics, primary_key, primary_minimize)
    print(f"\nWinner after single-pass: Trial {best_trial.number} ({best_trial_name}) "
          f"by {primary_key}={best_primary_val:.6f}")

    # Multi-seed evaluation of winner
    print(f"\nRe-running winner with seeds {FINAL_SEEDS} at {FULL_EPOCHS} epochs...\n")
    winner_save_path = _ensure_dir(os.path.join(save_dir, f"{model_name}/{name}/best_model/{best_trial_name}"))

    per_seed_metrics = []
    numeric_keys = None

    for s in FINAL_SEEDS:
        set_model_seed(s)
        m = evaluate_best_model(
            best_trial=best_trial,
            train_loader=train_loader, val_loader=val_loader, test_loader=test_loader,
            dataset_name=dataset_name, image_name=name, task_type=task_type,
            save_dir=save_dir, imgs_shape=imgs_shape,
            trial_name=f"{best_trial_name}_seed{s}", class_weight=class_weight, num_classes=num_classes,
            epochs=FULL_EPOCHS,
            model_name=model_name,
        )
        if not isinstance(m, dict):
            raise TypeError(f"evaluate_best_model must return dict, got: {type(m)}")

        # The numeric keys of the first seed's metrics define what gets aggregated.
        if numeric_keys is None:
            numeric_keys = [k for k, v in m.items() if isinstance(v, (Number, np.floating, np.integer))]
        per_seed_metrics.append(m)

        # quick line
        pk_val = _metric_or_default(m, primary_key, primary_minimize)
        extras = []
        for k in ["test_loss", "test_accuracy", "test_roc_auc", "test_rmse", "val_loss"]:
            if k in m and isinstance(m[k], (Number, np.floating, np.integer)):
                extras.append(f"{k}={float(m[k]):.6f}")
        print(f"   Seed {s}: {primary_key}={pk_val:.6f}" + (", " + ", ".join(extras) if extras else ""))

    # Aggregate across seeds
    aggregates = {}
    for k in numeric_keys or []:
        vals = [float(m[k]) for m in per_seed_metrics if k in m]
        if not vals:
            continue
        mean_k = float(np.mean(vals))
        # Sample standard deviation (ddof=1); defined as 0.0 for a single value.
        std_k = float(np.std(vals, ddof=1)) if len(vals) > 1 else 0.0
        aggregates[k] = {"mean": mean_k, "std": std_k}

    # Param counts (same across seeds) & FLOPs (take from single-pass winner)
    winner_total_params = best_single_metrics.get("total_params")
    winner_train_params = best_single_metrics.get("trainable_params")
    winner_flops_str = best_single_metrics.get("flops_str")
    winner_macs_str  = best_single_metrics.get("macs_str")
    winner_flops_num = best_single_metrics.get("flops")
    winner_macs_num  = best_single_metrics.get("macs")

    # Save summary
    out_file = os.path.join(winner_save_path, "winner_multi_seed_summary.txt")
    with open(out_file, "w", encoding="utf-8") as f:
        f.write("# Final winner multi-seed evaluation (vision-only)\n")
        f.write(f"trial_number: {best_trial.number}\n")
        if model_name == "vit":
            f.write(f"patch_size: {best_trial.params.get('patch_size', None)}\n")
        f.write(f"primary_metric: {primary_key}\n")
        f.write(f"seeds: {FINAL_SEEDS}\n")
        # FLOPs/MACs summary (pretty + numeric if available)
        if winner_flops_str is not None and winner_macs_str is not None:
            f.write("compute:\n")
            f.write(f"  flops: {winner_flops_str}\n")
            f.write(f"  macs: {winner_macs_str}\n")
            if winner_flops_num is not None and winner_macs_num is not None:
                f.write(f"  flops_num: {winner_flops_num}\n")
                f.write(f"  macs_num: {winner_macs_num}\n")
        f.write("per_seed_metrics:\n")
        for s, m in zip(FINAL_SEEDS, per_seed_metrics):
            f.write(f"  - seed: {s}\n")
            for k in (numeric_keys or []):
                if k in m:
                    f.write(f"      {k}: {float(m[k]):.6f}\n")
        f.write("aggregates:\n")
        for k, mm in aggregates.items():
            f.write(f"  {k}:\n")
            f.write(f"    mean: {mm['mean']:.6f}\n")
            f.write(f"    std: {mm['std']:.6f}\n")

    # Console summary
    if primary_key in aggregates:
        print(f"\nWinner aggregated {primary_key}: {aggregates[primary_key]['mean']:.6f} "
              f"± {aggregates[primary_key]['std']:.6f}")
    elif "val_loss" in aggregates:
        print(f"\nWinner aggregated val_loss: {aggregates['val_loss']['mean']:.6f} "
              f"± {aggregates['val_loss']['std']:.6f}")
    if winner_total_params is not None and winner_train_params is not None:
        print(f"Model params: total={winner_total_params:,}, trainable={winner_train_params:,}")
    # echo compute again for clarity
    if winner_flops_str and winner_macs_str:
        print(f"Compute: FLOPs={winner_flops_str}, MACs={winner_macs_str}")
    print(f"Saved multi-seed summary to: {out_file}")

    return {
        "winner_trial_number": best_trial.number,
        "winner_trial_name": best_trial_name,
        "primary_metric": primary_key,
        "aggregates": aggregates,
        "total_params": winner_total_params,
        "trainable_params": winner_train_params,
        "flops": winner_flops_num,
        "macs": winner_macs_num,
        "summary_path": out_file,
    }
# ========================== end benchmark_eval.py =============================


In [None]:
import random
import numpy as np
import torch

def set_model_seed(seed: int):
    """Seed the Python, NumPy and PyTorch random generators with ``seed``.

    Also sets the cuDNN flags ``deterministic=True`` and ``benchmark=False``
    for reproducibility.
    """
    seeders = (
        random.seed,                 # Python built-in RNG
        np.random.seed,              # NumPy RNG
        torch.manual_seed,           # Torch RNG
        torch.cuda.manual_seed,      # Torch CUDA RNG
        torch.cuda.manual_seed_all,  # Torch CUDA RNG, multi-GPU case
    )
    for seeder in seeders:
        seeder(seed)

    # For reproducibility
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True


In [None]:
def filter_patch_sizes_infer_size(
    divisors: list[int],
    max_tokens: int = 196
) -> list[int]:
    """
    Keep the patch sizes that produce a manageable number of tokens.

    The image side length S is taken to be ``max(divisors)``. A candidate p
    is kept when all of the following hold:
      - p is positive
      - p != S (a patch covering the whole image is excluded)
      - (S // p) ** 2 <= max_tokens

    Returns the kept values sorted ascending without duplicates; an empty
    input yields an empty list.
    """
    if not divisors:
        return []
    side = max(divisors)
    kept = {
        patch
        for patch in divisors
        if 0 < patch != side and (side // patch) ** 2 <= max_tokens
    }
    return sorted(kept)


In [None]:
from torch.utils.data import DataLoader, Subset
import torch
import numpy as np

def reduce_dataloader(train_loader, fraction=0.25, stratify=True, seed=42):
    """
    Build a shuffled DataLoader over roughly ``fraction`` of ``train_loader``'s dataset.

    When ``stratify`` is set and the dataset exposes at least two ``tensors``
    (as a TensorDataset does), the last tensor is used as labels for a
    stratified pick. If that pick fails for any reason, or is not attempted,
    a seeded random subset is drawn instead. Batch size, worker, pin-memory
    and drop-last settings are copied from the original loader.
    """
    assert 0 < fraction <= 1.0
    dataset = train_loader.dataset
    total = len(dataset)
    keep_count = max(1, int(round(total * fraction)))
    positions = np.arange(total)

    # Stratified pick when labels are available (last tensor is y).
    chosen = None
    if stratify and hasattr(dataset, "tensors") and len(dataset.tensors) >= 2:
        labels = dataset.tensors[-1].cpu().numpy().ravel()
        try:
            from sklearn.model_selection import StratifiedShuffleSplit
            splitter = StratifiedShuffleSplit(n_splits=1, train_size=fraction, random_state=seed)
            kept, _ = next(splitter.split(positions, labels))
            chosen = positions[kept]
        except Exception:
            chosen = None  # handled by the random fallback below

    # Fallback: reproducible random subset.
    if chosen is None:
        generator = torch.Generator().manual_seed(seed)
        chosen = torch.randperm(total, generator=generator)[:keep_count].tolist()

    # Settings carried over from the original loader, with their defaults.
    inherited = {
        option: getattr(train_loader, option, default)
        for option, default in (
            ("num_workers", 0),
            ("pin_memory", False),
            ("drop_last", False),
            ("persistent_workers", False),
        )
    }
    return DataLoader(
        Subset(dataset, chosen),
        batch_size=train_loader.batch_size,
        shuffle=True,  # shuffle within the subset
        **inherited,
    )

### EXPERIMENT: TINTO

In [None]:
# Name of this experiment's image set; used in the image folder path below.
name = "TINTO_blur"

# Problem-type label: "regression" for regression tasks, "supervised" otherwise.
problem_type = "regression" if task_type.lower() == "regression" else "supervised"

# Folder where the images will be saved
images_folder = f"SyntheticImages/{task_type}/{dataset_name}/{name}"

In [None]:
# Unpack the seven values returned by load_and_preprocess_data for this
# experiment's image folder.
(
    train_loader,
    val_loader,
    test_loader,
    attributes,
    imgs_shape,
    label_encoder,
    class_weight,
) = load_and_preprocess_data(
    df,
    dataset_name,
    images_folder,
    problem_type,
    task_type,
    seed=SEED,
    batch_size=batch_size,
    device=device,
)

In [None]:
# Determine possible patch sizes for the Vision Transformer by finding divisors of the image width
# NOTE(review): assumes imgs_shape[1] is the image width — confirm against load_and_preprocess_data.
divisors = find_divisors(imgs_shape[1])
# Bare expression: the return value is shown as the cell output, not stored.
filter_patch_sizes_infer_size(divisors)

In [None]:
# Replace `divisors` with the output of filter_patch_sizes_infer_size; this is
# the value later passed to objective() as `divisors`.
# The assignment overwrites its own input, so re-running this cell filters the
# already-filtered value.
divisors = filter_patch_sizes_infer_size(divisors)

In [None]:
# Display the filtered candidates currently stored in `divisors`.
divisors

In [None]:
# Path passed to objective() as `path_vit` in the tuning cell.
# NOTE(review): the trial/patch suffix is hard-coded — presumably the best
# trial of an earlier ViT search for this dataset; confirm the folder exists.
path_vit = f"./logs/{task_type}/{dataset_name}/vit/{name}/best_model/trial_84_patch4"

In [None]:
# Build the tuning loader once, outside the objective lambda, so the training
# subsample is not recomputed on every trial (only applied when `reduce` is set).
tuning_train_loader = reduce_dataloader(train_loader) if reduce else train_loader

# Regression studies minimise the objective value; every other task maximises it.
study = optuna.create_study(
    direction="minimize" if task_type.lower() == "regression" else "maximize"
)
study.optimize(
    lambda trial: objective(
        trial=trial,
        model_name=model_name,
        image_name=name,
        task_type=task_type,
        num_classes=num_classes,
        train_loader=tuning_train_loader,
        val_loader=val_loader,
        test_loader=test_loader,
        divisors=divisors,
        imgs_shape=imgs_shape,
        device=device,
        save_dir=save_dir,
        # NOTE(review): the class_weight returned by load_and_preprocess_data
        # is not forwarded; confirm that None is intended.
        class_weight=None,
        epochs=epochs,
        path_vit=path_vit,
    ),
    n_trials=n_trials,
)

In [None]:
# Post-tuning step: pass the study and the original train/val/test loaders
# (not the reduced one) to run_topk_and_multiseed and print what it returns.
result = run_topk_and_multiseed(
    study=study,
    model_name=model_name,
    dataset_name=dataset_name,
    name=name,
    task_type=task_type,
    save_dir=save_dir,
    imgs_shape=imgs_shape,
    num_classes=num_classes,
    class_weight=None,
    train_loader=train_loader,
    val_loader=val_loader,
    test_loader=test_loader,
)
print(result)

### EXPERIMENT: IGTD

In [None]:
# Problem type handed to load_and_preprocess_data: "regression" for regression
# tasks, "supervised" for every other task type.
problem_type = "regression" if task_type.lower() == "regression" else "supervised"

# Name of this experiment; it is part of the image folder below.
name = "IGTD"

# Folder where the images will be saved
images_folder = f"SyntheticImages/{task_type}/{dataset_name}/{name}"

In [None]:
# Unpack the seven values returned by load_and_preprocess_data for this
# experiment's image folder.
# NOTE(review): pad_images/target_size are forwarded as-is — presumably they
# pad the images up to target_size; confirm in load_and_preprocess_data.
(
    train_loader,
    val_loader,
    test_loader,
    attributes,
    imgs_shape,
    label_encoder,
    class_weight,
) = load_and_preprocess_data(
    df,
    dataset_name,
    images_folder,
    problem_type,
    task_type,
    seed=SEED,
    batch_size=batch_size,
    device=device,
    pad_images=True,
    target_size=8,
)

In [None]:
# Determine possible patch sizes for the Vision Transformer by finding divisors of the image width
# NOTE(review): assumes imgs_shape[1] is the image width — confirm against load_and_preprocess_data.
divisors = find_divisors(imgs_shape[1])
# Bare expression: the return value is shown as the cell output, not stored.
filter_patch_sizes_infer_size(divisors)

In [None]:
# Replace `divisors` with the output of filter_patch_sizes_infer_size; this is
# the value later passed to objective() as `divisors`.
# The assignment overwrites its own input, so re-running this cell filters the
# already-filtered value.
divisors = filter_patch_sizes_infer_size(divisors)

In [None]:
# Path passed to objective() as `path_vit` in the tuning cell.
# NOTE(review): the trial/patch suffix is hard-coded — presumably the best
# trial of an earlier ViT search for this dataset; confirm the folder exists.
path_vit = f"./logs/{task_type}/{dataset_name}/vit/{name}/best_model/trial_64_patch4"

In [None]:
# Build the tuning loader once, outside the objective lambda, so the training
# subsample is not recomputed on every trial (only applied when `reduce` is set).
tuning_train_loader = reduce_dataloader(train_loader) if reduce else train_loader

# Regression studies minimise the objective value; every other task maximises it.
study = optuna.create_study(
    direction="minimize" if task_type.lower() == "regression" else "maximize"
)
study.optimize(
    lambda trial: objective(
        trial=trial,
        model_name=model_name,
        image_name=name,
        task_type=task_type,
        num_classes=num_classes,
        train_loader=tuning_train_loader,
        val_loader=val_loader,
        test_loader=test_loader,
        divisors=divisors,
        imgs_shape=imgs_shape,
        device=device,
        save_dir=save_dir,
        # NOTE(review): the class_weight returned by load_and_preprocess_data
        # is not forwarded; confirm that None is intended.
        class_weight=None,
        epochs=epochs,
        path_vit=path_vit,
    ),
    n_trials=n_trials,
)

In [None]:
# Post-tuning step: pass the study and the original train/val/test loaders
# (not the reduced one) to run_topk_and_multiseed and print what it returns.
result = run_topk_and_multiseed(
    study=study,
    model_name=model_name,
    dataset_name=dataset_name,
    name=name,
    task_type=task_type,
    save_dir=save_dir,
    imgs_shape=imgs_shape,
    num_classes=num_classes,
    class_weight=None,
    train_loader=train_loader,
    val_loader=val_loader,
    test_loader=test_loader,
)
print(result)

### EXPERIMENT: REFINED

In [None]:
# Problem type handed to load_and_preprocess_data: "regression" for regression
# tasks, "supervised" for every other task type.
problem_type = "regression" if task_type.lower() == "regression" else "supervised"

# Name of this experiment; it is part of the image folder below.
name = "REFINED"

# Folder where the images will be saved
images_folder = f"SyntheticImages/{task_type}/{dataset_name}/{name}"

In [None]:
# Unpack the seven values returned by load_and_preprocess_data for this
# experiment's image folder.
# NOTE(review): pad_images/target_size are forwarded as-is — presumably they
# pad the images up to target_size; confirm in load_and_preprocess_data.
(
    train_loader,
    val_loader,
    test_loader,
    attributes,
    imgs_shape,
    label_encoder,
    class_weight,
) = load_and_preprocess_data(
    df,
    dataset_name,
    images_folder,
    problem_type,
    task_type,
    seed=SEED,
    batch_size=batch_size,
    device=device,
    pad_images=True,
    target_size=8,
)

In [None]:
# Determine possible patch sizes for the Vision Transformer by finding divisors of the image width
# NOTE(review): assumes imgs_shape[1] is the image width — confirm against load_and_preprocess_data.
divisors = find_divisors(imgs_shape[1])
# Bare expression: the return value is shown as the cell output, not stored.
filter_patch_sizes_infer_size(divisors)

In [None]:
# Replace `divisors` with the output of filter_patch_sizes_infer_size; this is
# the value later passed to objective() as `divisors`.
# The assignment overwrites its own input, so re-running this cell filters the
# already-filtered value.
divisors = filter_patch_sizes_infer_size(divisors)

In [None]:
# Path passed to objective() as `path_vit` in the tuning cell.
# NOTE(review): the trial/patch suffix is hard-coded — presumably the best
# trial of an earlier ViT search for this dataset; confirm the folder exists.
path_vit = f"./logs/{task_type}/{dataset_name}/vit/{name}/best_model/trial_49_patch1"

In [None]:
# Build the tuning loader once, outside the objective lambda, so the training
# subsample is not recomputed on every trial (only applied when `reduce` is set).
tuning_train_loader = reduce_dataloader(train_loader) if reduce else train_loader

# Regression studies minimise the objective value; every other task maximises it.
study = optuna.create_study(
    direction="minimize" if task_type.lower() == "regression" else "maximize"
)
study.optimize(
    lambda trial: objective(
        trial=trial,
        model_name=model_name,
        image_name=name,
        task_type=task_type,
        num_classes=num_classes,
        train_loader=tuning_train_loader,
        val_loader=val_loader,
        test_loader=test_loader,
        divisors=divisors,
        imgs_shape=imgs_shape,
        device=device,
        save_dir=save_dir,
        # NOTE(review): the class_weight returned by load_and_preprocess_data
        # is not forwarded; confirm that None is intended.
        class_weight=None,
        epochs=epochs,
        path_vit=path_vit,
    ),
    n_trials=n_trials,
)

In [None]:
# Post-tuning step: pass the study and the original train/val/test loaders
# (not the reduced one) to run_topk_and_multiseed and print what it returns.
result = run_topk_and_multiseed(
    study=study,
    model_name=model_name,
    dataset_name=dataset_name,
    name=name,
    task_type=task_type,
    save_dir=save_dir,
    imgs_shape=imgs_shape,
    num_classes=num_classes,
    class_weight=None,
    train_loader=train_loader,
    val_loader=val_loader,
    test_loader=test_loader,
)
print(result)

### EXPERIMENT: BarGraph

In [None]:
# Problem type handed to load_and_preprocess_data: "regression" for regression
# tasks, "supervised" for every other task type.
problem_type = "regression" if task_type.lower() == "regression" else "supervised"

# Name of this experiment; it is part of the image folder below.
name = "BarGraph"

# Folder where the images will be saved
images_folder = f"SyntheticImages/{task_type}/{dataset_name}/{name}"

In [None]:
# Unpack the seven values returned by load_and_preprocess_data for this
# experiment's image folder.
# NOTE(review): pad_images/target_size are forwarded as-is — presumably they
# pad the images up to target_size; confirm in load_and_preprocess_data.
(
    train_loader,
    val_loader,
    test_loader,
    attributes,
    imgs_shape,
    label_encoder,
    class_weight,
) = load_and_preprocess_data(
    df,
    dataset_name,
    images_folder,
    problem_type,
    task_type,
    seed=SEED,
    batch_size=batch_size,
    device=device,
    pad_images=True,
    target_size=40,
)

In [None]:
# Determine possible patch sizes for the Vision Transformer by finding divisors of the image width
# NOTE(review): assumes imgs_shape[1] is the image width — confirm against load_and_preprocess_data.
divisors = find_divisors(imgs_shape[1])
# Bare expression: the return value is shown as the cell output, not stored.
filter_patch_sizes_infer_size(divisors)

In [None]:
# Replace `divisors` with the output of filter_patch_sizes_infer_size; this is
# the value later passed to objective() as `divisors`.
# The assignment overwrites its own input, so re-running this cell filters the
# already-filtered value.
divisors = filter_patch_sizes_infer_size(divisors)

In [None]:
# Path passed to objective() as `path_vit` in the tuning cell.
# NOTE(review): the trial/patch suffix is hard-coded — presumably the best
# trial of an earlier ViT search for this dataset; confirm the folder exists.
path_vit = f"./logs/{task_type}/{dataset_name}/vit/{name}/best_model/trial_61_patch10"

In [None]:
# Build the tuning loader once, outside the objective lambda, so the training
# subsample is not recomputed on every trial (only applied when `reduce` is set).
tuning_train_loader = reduce_dataloader(train_loader) if reduce else train_loader

# Regression studies minimise the objective value; every other task maximises it.
study = optuna.create_study(
    direction="minimize" if task_type.lower() == "regression" else "maximize"
)
study.optimize(
    lambda trial: objective(
        trial=trial,
        model_name=model_name,
        image_name=name,
        task_type=task_type,
        num_classes=num_classes,
        train_loader=tuning_train_loader,
        val_loader=val_loader,
        test_loader=test_loader,
        divisors=divisors,
        imgs_shape=imgs_shape,
        device=device,
        save_dir=save_dir,
        # NOTE(review): the class_weight returned by load_and_preprocess_data
        # is not forwarded; confirm that None is intended.
        class_weight=None,
        epochs=epochs,
        path_vit=path_vit,
    ),
    n_trials=n_trials,
)

In [None]:
# Post-tuning step: pass the study and the original train/val/test loaders
# (not the reduced one) to run_topk_and_multiseed and print what it returns.
result = run_topk_and_multiseed(
    study=study,
    model_name=model_name,
    dataset_name=dataset_name,
    name=name,
    task_type=task_type,
    save_dir=save_dir,
    imgs_shape=imgs_shape,
    num_classes=num_classes,
    class_weight=None,
    train_loader=train_loader,
    val_loader=val_loader,
    test_loader=test_loader,
)
print(result)

### EXPERIMENT: DistanceMatrix

In [None]:
# Problem type handed to load_and_preprocess_data: "regression" for regression
# tasks, "supervised" for every other task type.
problem_type = "regression" if task_type.lower() == "regression" else "supervised"

# Name of this experiment; it is part of the image folder below.
name = "DistanceMatrix"

# Folder where the images will be saved
images_folder = f"SyntheticImages/{task_type}/{dataset_name}/{name}"

In [None]:
# Unpack the seven values returned by load_and_preprocess_data for this
# experiment's image folder.
# NOTE(review): pad_images/target_size are forwarded as-is — presumably they
# pad the images up to target_size; confirm in load_and_preprocess_data.
(
    train_loader,
    val_loader,
    test_loader,
    attributes,
    imgs_shape,
    label_encoder,
    class_weight,
) = load_and_preprocess_data(
    df,
    dataset_name,
    images_folder,
    problem_type,
    task_type,
    seed=SEED,
    batch_size=batch_size,
    device=device,
    pad_images=True,
    target_size=40,
)

In [None]:
# Determine possible patch sizes for the Vision Transformer by finding divisors of the image width
# NOTE(review): assumes imgs_shape[1] is the image width — confirm against load_and_preprocess_data.
divisors = find_divisors(imgs_shape[1])
# Bare expression: the return value is shown as the cell output, not stored.
filter_patch_sizes_infer_size(divisors)

In [None]:
# Replace `divisors` with the output of filter_patch_sizes_infer_size; this is
# the value later passed to objective() as `divisors`.
# The assignment overwrites its own input, so re-running this cell filters the
# already-filtered value.
divisors = filter_patch_sizes_infer_size(divisors)

In [None]:
# Path passed to objective() as `path_vit` in the tuning cell.
# NOTE(review): the trial/patch suffix is hard-coded — presumably the best
# trial of an earlier ViT search for this dataset; confirm the folder exists.
path_vit = f"./logs/{task_type}/{dataset_name}/vit/{name}/best_model/trial_92_patch20"

In [None]:
# Build the tuning loader once, outside the objective lambda, so the training
# subsample is not recomputed on every trial (only applied when `reduce` is set).
tuning_train_loader = reduce_dataloader(train_loader) if reduce else train_loader

# Regression studies minimise the objective value; every other task maximises it.
study = optuna.create_study(
    direction="minimize" if task_type.lower() == "regression" else "maximize"
)
study.optimize(
    lambda trial: objective(
        trial=trial,
        model_name=model_name,
        image_name=name,
        task_type=task_type,
        num_classes=num_classes,
        train_loader=tuning_train_loader,
        val_loader=val_loader,
        test_loader=test_loader,
        divisors=divisors,
        imgs_shape=imgs_shape,
        device=device,
        save_dir=save_dir,
        # NOTE(review): the class_weight returned by load_and_preprocess_data
        # is not forwarded; confirm that None is intended.
        class_weight=None,
        epochs=epochs,
        path_vit=path_vit,
    ),
    n_trials=n_trials,
)

In [None]:
# Post-tuning step: pass the study and the original train/val/test loaders
# (not the reduced one) to run_topk_and_multiseed and print what it returns.
result = run_topk_and_multiseed(
    study=study,
    model_name=model_name,
    dataset_name=dataset_name,
    name=name,
    task_type=task_type,
    save_dir=save_dir,
    imgs_shape=imgs_shape,
    num_classes=num_classes,
    class_weight=None,
    train_loader=train_loader,
    val_loader=val_loader,
    test_loader=test_loader,
)
print(result)

### EXPERIMENT: Combination

In [None]:
# Problem type handed to load_and_preprocess_data: "regression" for regression
# tasks, "supervised" for every other task type.
problem_type = "regression" if task_type.lower() == "regression" else "supervised"

# Name of this experiment; it is part of the image folder below.
name = "Combination"

# Folder where the images will be saved
images_folder = f"SyntheticImages/{task_type}/{dataset_name}/{name}"

In [None]:
# Unpack the seven values returned by load_and_preprocess_data for this
# experiment's image folder.
# NOTE(review): pad_images/target_size are forwarded as-is — presumably they
# pad the images up to target_size; confirm in load_and_preprocess_data.
(
    train_loader,
    val_loader,
    test_loader,
    attributes,
    imgs_shape,
    label_encoder,
    class_weight,
) = load_and_preprocess_data(
    df,
    dataset_name,
    images_folder,
    problem_type,
    task_type,
    seed=SEED,
    batch_size=batch_size,
    device=device,
    pad_images=True,
    target_size=40,
)

In [None]:
# Determine possible patch sizes for the Vision Transformer by finding divisors of the image width
# NOTE(review): assumes imgs_shape[1] is the image width — confirm against load_and_preprocess_data.
divisors = find_divisors(imgs_shape[1])
# Bare expression: the return value is shown as the cell output, not stored.
filter_patch_sizes_infer_size(divisors)

In [None]:
# Replace `divisors` with the output of filter_patch_sizes_infer_size; this is
# the value later passed to objective() as `divisors`.
# The assignment overwrites its own input, so re-running this cell filters the
# already-filtered value.
divisors = filter_patch_sizes_infer_size(divisors)

In [None]:
# Path passed to objective() as `path_vit` in the tuning cell.
# NOTE(review): the trial/patch suffix is hard-coded — presumably the best
# trial of an earlier ViT search for this dataset; confirm the folder exists.
path_vit = f"./logs/{task_type}/{dataset_name}/vit/{name}/best_model/trial_91_patch20"

In [None]:
# Build the tuning loader once, outside the objective lambda, so the training
# subsample is not recomputed on every trial (only applied when `reduce` is set).
tuning_train_loader = reduce_dataloader(train_loader) if reduce else train_loader

# Regression studies minimise the objective value; every other task maximises it.
study = optuna.create_study(
    direction="minimize" if task_type.lower() == "regression" else "maximize"
)
study.optimize(
    lambda trial: objective(
        trial=trial,
        model_name=model_name,
        image_name=name,
        task_type=task_type,
        num_classes=num_classes,
        train_loader=tuning_train_loader,
        val_loader=val_loader,
        test_loader=test_loader,
        divisors=divisors,
        imgs_shape=imgs_shape,
        device=device,
        save_dir=save_dir,
        # NOTE(review): the class_weight returned by load_and_preprocess_data
        # is not forwarded; confirm that None is intended.
        class_weight=None,
        epochs=epochs,
        path_vit=path_vit,
    ),
    n_trials=n_trials,
)

In [None]:
# Post-tuning step: pass the study and the original train/val/test loaders
# (not the reduced one) to run_topk_and_multiseed and print what it returns.
result = run_topk_and_multiseed(
    study=study,
    model_name=model_name,
    dataset_name=dataset_name,
    name=name,
    task_type=task_type,
    save_dir=save_dir,
    imgs_shape=imgs_shape,
    num_classes=num_classes,
    class_weight=None,
    train_loader=train_loader,
    val_loader=val_loader,
    test_loader=test_loader,
)
print(result)

### EXPERIMENT: SuperTML

In [None]:
# Problem type handed to load_and_preprocess_data: "regression" for regression
# tasks, "supervised" for every other task type.
problem_type = "regression" if task_type.lower() == "regression" else "supervised"

# Name of this experiment; it is part of the image folder below.
name = "SuperTML"

# Folder where the images will be saved
images_folder = f"SyntheticImages/{task_type}/{dataset_name}/{name}"

In [None]:
# Unpack the seven values returned by load_and_preprocess_data for this
# experiment's image folder.
(
    train_loader,
    val_loader,
    test_loader,
    attributes,
    imgs_shape,
    label_encoder,
    class_weight,
) = load_and_preprocess_data(
    df,
    dataset_name,
    images_folder,
    problem_type,
    task_type,
    seed=SEED,
    batch_size=batch_size,
    device=device,
)

In [None]:
# Determine possible patch sizes for the Vision Transformer by finding divisors of the image width
# NOTE(review): assumes imgs_shape[1] is the image width — confirm against load_and_preprocess_data.
divisors = find_divisors(imgs_shape[1])
# Bare expression: the return value is shown as the cell output, not stored.
filter_patch_sizes_infer_size(divisors)

In [None]:
# Replace `divisors` with the output of filter_patch_sizes_infer_size; this is
# the value later passed to objective() as `divisors`.
# The assignment overwrites its own input, so re-running this cell filters the
# already-filtered value.
divisors = filter_patch_sizes_infer_size(divisors)

In [None]:
# Path passed to objective() as `path_vit` in the tuning cell.
# NOTE(review): the trial/patch suffix is hard-coded — presumably the best
# trial of an earlier ViT search for this dataset; confirm the folder exists.
path_vit = f"./logs/{task_type}/{dataset_name}/vit/{name}/best_model/trial_61_patch32"

In [None]:
# Build the tuning loader once, outside the objective lambda, so the training
# subsample is not recomputed on every trial (only applied when `reduce` is set).
tuning_train_loader = reduce_dataloader(train_loader) if reduce else train_loader

# Regression studies minimise the objective value; every other task maximises it.
study = optuna.create_study(
    direction="minimize" if task_type.lower() == "regression" else "maximize"
)
study.optimize(
    lambda trial: objective(
        trial=trial,
        model_name=model_name,
        image_name=name,
        task_type=task_type,
        num_classes=num_classes,
        train_loader=tuning_train_loader,
        val_loader=val_loader,
        test_loader=test_loader,
        divisors=divisors,
        imgs_shape=imgs_shape,
        device=device,
        save_dir=save_dir,
        # NOTE(review): the class_weight returned by load_and_preprocess_data
        # is not forwarded; confirm that None is intended.
        class_weight=None,
        epochs=epochs,
        path_vit=path_vit,
    ),
    n_trials=n_trials,
)

In [None]:
# Post-tuning step: pass the study and the original train/val/test loaders
# (not the reduced one) to run_topk_and_multiseed and print what it returns.
result = run_topk_and_multiseed(
    study=study,
    model_name=model_name,
    dataset_name=dataset_name,
    name=name,
    task_type=task_type,
    save_dir=save_dir,
    imgs_shape=imgs_shape,
    num_classes=num_classes,
    class_weight=None,
    train_loader=train_loader,
    val_loader=val_loader,
    test_loader=test_loader,
)
print(result)

### EXPERIMENT: FeatureWrap

In [None]:
# Problem type handed to load_and_preprocess_data: "regression" for regression
# tasks, "supervised" for every other task type.
problem_type = "regression" if task_type.lower() == "regression" else "supervised"

# Name of this experiment; it is part of the image folder below.
name = "FeatureWrap"

# Folder where the images will be saved
images_folder = f"SyntheticImages/{task_type}/{dataset_name}/{name}"

In [None]:
# Unpack the seven values returned by load_and_preprocess_data for this
# experiment's image folder.
# NOTE(review): pad_images/target_size are forwarded as-is — presumably they
# pad the images up to target_size; confirm in load_and_preprocess_data.
(
    train_loader,
    val_loader,
    test_loader,
    attributes,
    imgs_shape,
    label_encoder,
    class_weight,
) = load_and_preprocess_data(
    df,
    dataset_name,
    images_folder,
    problem_type,
    task_type,
    seed=SEED,
    batch_size=batch_size,
    device=device,
    pad_images=True,
    target_size=8,
)

In [None]:
# Determine possible patch sizes for the Vision Transformer by finding divisors of the image width
# NOTE(review): assumes imgs_shape[1] is the image width — confirm against load_and_preprocess_data.
divisors = find_divisors(imgs_shape[1])
# Bare expression: the return value is shown as the cell output, not stored.
filter_patch_sizes_infer_size(divisors)

In [None]:
# Replace `divisors` with the output of filter_patch_sizes_infer_size; this is
# the value later passed to objective() as `divisors`.
# The assignment overwrites its own input, so re-running this cell filters the
# already-filtered value.
divisors = filter_patch_sizes_infer_size(divisors)

In [None]:
# Path passed to objective() as `path_vit` in the tuning cell.
# NOTE(review): the trial/patch suffix is hard-coded — presumably the best
# trial of an earlier ViT search for this dataset; confirm the folder exists.
path_vit = f"./logs/{task_type}/{dataset_name}/vit/{name}/best_model/trial_74_patch4"

In [None]:
# Build the tuning loader once, outside the objective lambda, so the training
# subsample is not recomputed on every trial (only applied when `reduce` is set).
tuning_train_loader = reduce_dataloader(train_loader) if reduce else train_loader

# Regression studies minimise the objective value; every other task maximises it.
study = optuna.create_study(
    direction="minimize" if task_type.lower() == "regression" else "maximize"
)
study.optimize(
    lambda trial: objective(
        trial=trial,
        model_name=model_name,
        image_name=name,
        task_type=task_type,
        num_classes=num_classes,
        train_loader=tuning_train_loader,
        val_loader=val_loader,
        test_loader=test_loader,
        divisors=divisors,
        imgs_shape=imgs_shape,
        device=device,
        save_dir=save_dir,
        # NOTE(review): the class_weight returned by load_and_preprocess_data
        # is not forwarded; confirm that None is intended.
        class_weight=None,
        epochs=epochs,
        path_vit=path_vit,
    ),
    n_trials=n_trials,
)

In [None]:
# Post-tuning step: pass the study and the original train/val/test loaders
# (not the reduced one) to run_topk_and_multiseed and print what it returns.
result = run_topk_and_multiseed(
    study=study,
    model_name=model_name,
    dataset_name=dataset_name,
    name=name,
    task_type=task_type,
    save_dir=save_dir,
    imgs_shape=imgs_shape,
    num_classes=num_classes,
    class_weight=None,
    train_loader=train_loader,
    val_loader=val_loader,
    test_loader=test_loader,
)
print(result)

### EXPERIMENT: BIE

In [None]:
# Problem type handed to load_and_preprocess_data: "regression" for regression
# tasks, "supervised" for every other task type.
problem_type = "regression" if task_type.lower() == "regression" else "supervised"

# Name of this experiment; it is part of the image folder below.
name = "BIE"

# Folder where the images will be saved
images_folder = f"SyntheticImages/{task_type}/{dataset_name}/{name}"

In [None]:
# Unpack the seven values returned by load_and_preprocess_data for this
# experiment's image folder.
(
    train_loader,
    val_loader,
    test_loader,
    attributes,
    imgs_shape,
    label_encoder,
    class_weight,
) = load_and_preprocess_data(
    df,
    dataset_name,
    images_folder,
    problem_type,
    task_type,
    seed=SEED,
    batch_size=batch_size,
    device=device,
)

In [None]:
# Determine possible patch sizes for the Vision Transformer by finding divisors of the image width
# NOTE(review): assumes imgs_shape[1] is the image width — confirm against load_and_preprocess_data.
divisors = find_divisors(imgs_shape[1])
# Bare expression: the return value is shown as the cell output, not stored.
filter_patch_sizes_infer_size(divisors)

In [None]:
# Replace `divisors` with the output of filter_patch_sizes_infer_size; this is
# the value later passed to objective() as `divisors`.
# The assignment overwrites its own input, so re-running this cell filters the
# already-filtered value.
divisors = filter_patch_sizes_infer_size(divisors)

In [None]:
# Path passed to objective() as `path_vit` in the tuning cell.
# NOTE(review): the trial/patch suffix is hard-coded — presumably the best
# trial of an earlier ViT search for this dataset; confirm the folder exists.
path_vit = f"./logs/{task_type}/{dataset_name}/vit/{name}/best_model/trial_69_patch16"

In [None]:
# Build the tuning loader once, outside the objective lambda, so the training
# subsample is not recomputed on every trial (only applied when `reduce` is set).
tuning_train_loader = reduce_dataloader(train_loader) if reduce else train_loader

# Regression studies minimise the objective value; every other task maximises it.
study = optuna.create_study(
    direction="minimize" if task_type.lower() == "regression" else "maximize"
)
study.optimize(
    lambda trial: objective(
        trial=trial,
        model_name=model_name,
        image_name=name,
        task_type=task_type,
        num_classes=num_classes,
        train_loader=tuning_train_loader,
        val_loader=val_loader,
        test_loader=test_loader,
        divisors=divisors,
        imgs_shape=imgs_shape,
        device=device,
        save_dir=save_dir,
        # NOTE(review): the class_weight returned by load_and_preprocess_data
        # is not forwarded; confirm that None is intended.
        class_weight=None,
        epochs=epochs,
        path_vit=path_vit,
    ),
    n_trials=n_trials,
)

In [None]:
# Post-tuning step: pass the study and the original train/val/test loaders
# (not the reduced one) to run_topk_and_multiseed and print what it returns.
result = run_topk_and_multiseed(
    study=study,
    model_name=model_name,
    dataset_name=dataset_name,
    name=name,
    task_type=task_type,
    save_dir=save_dir,
    imgs_shape=imgs_shape,
    num_classes=num_classes,
    class_weight=None,
    train_loader=train_loader,
    val_loader=val_loader,
    test_loader=test_loader,
)
print(result)