In [None]:
# Author : shadril238
!find /content -maxdepth 3 -iname "sympy.py" -o -iname "sympy"

In [None]:
!pip -q uninstall -y sympy
!pip -q install -U sympy==1.13.1

import sympy
print("sympy version:", sympy.__version__)

In [None]:
# Imports and configs

import os, random, math, time, json
from dataclasses import dataclass
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import cv2
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Subset
from torch.amp import autocast, GradScaler

from torchvision import datasets

from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    classification_report, confusion_matrix, accuracy_score,
    f1_score, balanced_accuracy_score
)

import timm
from timm.data import resolve_data_config, create_transform
from timm.utils import ModelEmaV2

!pip install optuna captum
import optuna
from captum.attr import LayerGradCam, IntegratedGradients

device = "cuda" if torch.cuda.is_available() else "cpu"
print("Device:", device)

def seed_everything(seed: int = 42, deterministic: bool = True):
    """Seed every RNG used in this notebook (random, NumPy, PyTorch CPU/CUDA).

    Args:
        seed: Value fed to each random number generator.
        deterministic: When True (default) cuDNN is asked for deterministic
            kernels and autotuning is switched off, so repeated runs pick the
            same algorithms. Pass False to restore cuDNN autotuning
            (faster, but kernel selection may vary between runs).
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    # benchmark=True lets cuDNN time several kernels and pick one per input
    # shape; that choice is not reproducible, so it is tied to the flag.
    torch.backends.cudnn.deterministic = bool(deterministic)
    torch.backends.cudnn.benchmark = not deterministic

seed_everything(42)

@dataclass
class CFG:
    """Experiment configuration shared by the notebook cells.

    Raises:
        ValueError: if any split ratio is not positive or the three ratios do
            not sum to 1 (the split cell derives the val/test sizes from
            them, so an inconsistent set would silently give other sizes).
    """
    # Root folder with one sub-directory per class (read via ImageFolder).
    dataset_path: str = "/content/drive/MyDrive/ml-project2/dataset"
    # Worker processes for the image DataLoaders.
    num_workers: int = 2

    # Fractions of the dataset used for the stratified train/val/test split.
    train_ratio: float = 0.70
    val_ratio: float = 0.15
    test_ratio: float = 0.15

    batch_size: int = 32
    # Epoch budgets: short baseline run vs. final retraining runs.
    epochs_quick: int = 6
    epochs_full: int = 14
    # Early-stopping patience (epochs without validation improvement).
    patience: int = 5

    lr: float = 3e-4
    lr_head: float = 1e-3  # not referenced in the visible cells
    weight_decay: float = 1e-4
    label_smoothing: float = 0.05

    img_size: int = 224
    img_size_big: int = 384  # not referenced in the visible cells

    mixup: float = 0.2   # not referenced in the visible cells
    cutmix: float = 0.2  # not referenced in the visible cells

    # Max gradient norm used by train_model's gradient clipping.
    grad_clip: float = 1.0

    # Optuna trial budgets for the two tuning studies.
    n_trials_best_model: int = 12
    n_trials_custom_vit: int = 15

    def __post_init__(self):
        ratios = (self.train_ratio, self.val_ratio, self.test_ratio)
        if min(ratios) <= 0:
            raise ValueError(f"Split ratios must all be positive, got {ratios}")
        if not math.isclose(sum(ratios), 1.0, abs_tol=1e-6):
            raise ValueError(f"Split ratios must sum to 1.0, got {ratios} (sum={sum(ratios)})")

CFG = CFG()

In [None]:
# Mounts Google Drive
# Colab-only: mounts Google Drive so CFG.dataset_path becomes readable.
from google.colab import drive
drive.mount("/content/drive")

# Fail early with a clear message if the dataset folder is missing.
assert os.path.exists(CFG.dataset_path), f"Dataset path not found: {CFG.dataset_path}"
print("Dataset path OK:", CFG.dataset_path)


In [None]:
# Data Analysis

# File suffixes (compared case-insensitively) that count as images.
img_exts = (".png", ".jpg", ".jpeg", ".bmp", ".gif", ".webp")

def scan_dataset(root: str) -> pd.DataFrame:
    """Count image files in every class sub-directory of ``root``.

    Args:
        root: Directory whose immediate sub-directories are class folders.
            Non-directory entries directly under ``root`` are ignored.

    Returns:
        DataFrame with columns ``class`` and ``count``, sorted by ``count``
        descending (ties keep alphabetical class order). A root without any
        class folders yields an empty frame with those two columns.
    """
    rows = []
    for cls in sorted(os.listdir(root)):
        cls_dir = os.path.join(root, cls)
        if not os.path.isdir(cls_dir):
            continue
        n_images = sum(1 for f in os.listdir(cls_dir) if f.lower().endswith(img_exts))
        rows.append({"class": cls, "count": n_images})

    # Explicit columns keep sort_values("count") valid when rows is empty.
    stats = pd.DataFrame(rows, columns=["class", "count"])
    return stats.sort_values("count", ascending=False, kind="stable").reset_index(drop=True)

# Per-class image counts for the dataset root, shown as a table and totals.
df_stats = scan_dataset(CFG.dataset_path)
display(df_stats)
print("Total images:", int(df_stats["count"].sum()))
print("Num classes:", len(df_stats))

# Horizontal bar chart; figure height grows with the number of classes and the
# y-axis is inverted so the largest class (first row) is drawn at the top.
plt.figure(figsize=(12, max(6, len(df_stats)*0.25)))
plt.barh(df_stats["class"], df_stats["count"])
plt.gca().invert_yaxis()
plt.title("Class Distribution")
plt.xlabel("Images per class")
plt.tight_layout()
plt.show()

def show_samples_per_class(root: str, df: pd.DataFrame, k: int = 24, cols: int = 6):
    """Draw one randomly chosen image for each of the first ``k`` rows of ``df``.

    Args:
        root: Dataset directory containing one folder per class.
        df: Frame with a ``class`` column; rows are looked up by labels 0..k-1.
        k: Maximum number of classes to show (capped at ``len(df)``).
        cols: Number of columns in the image grid.

    Classes with no matching files, or whose chosen file cannot be decoded by
    OpenCV, are skipped without leaving a gap in the grid.
    """
    k = min(k, len(df))
    rows = math.ceil(k / cols)
    plt.figure(figsize=(cols*3, rows*3))

    n_drawn = 0
    for row_label in range(k):
        cls_name = df.loc[row_label, "class"]
        folder = os.path.join(root, cls_name)
        candidates = [name for name in os.listdir(folder) if name.lower().endswith(img_exts)]
        if candidates:
            bgr = cv2.imread(os.path.join(folder, random.choice(candidates)))
            if bgr is not None:
                # OpenCV loads BGR; matplotlib expects RGB.
                rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
                n_drawn += 1
                plt.subplot(rows, cols, n_drawn)
                plt.imshow(rgb)
                plt.title(cls_name, fontsize=9)
                plt.axis("off")

    plt.tight_layout()
    plt.show()

# One random sample from each of the 24 largest classes (df_stats is sorted by count).
show_samples_per_class(CFG.dataset_path, df_stats, k=24, cols=6)


In [None]:
# Detect corrupted images and size outliers.

def check_images(root: str, max_per_class: int = 3000) -> pd.DataFrame:
    """Decode images class by class to find unreadable files and size outliers.

    Args:
        root: Dataset directory containing one folder per class.
        max_per_class: Upper bound on files inspected in each class folder
            (the alphabetically first ones, so the subset is repeatable).

    Returns:
        DataFrame with columns ``class``, ``file`` and ``error``, one row per
        file that could not be opened/decoded (empty if none failed).

    Side effects:
        Prints the number of bad files and width/height min/median/max, and
        shows a width-vs-height scatter plot when at least one image decoded.
    """
    bad = []
    sizes = []

    for cls in sorted(os.listdir(root)):
        cls_dir = os.path.join(root, cls)
        if not os.path.isdir(cls_dir):
            continue
        files = sorted(f for f in os.listdir(cls_dir) if f.lower().endswith(img_exts))
        for f in files[:max_per_class]:
            p = os.path.join(cls_dir, f)
            try:
                # The context manager closes the underlying file handle even
                # for formats Pillow keeps open after loading.
                with Image.open(p) as img:
                    img.convert("RGB")  # forces a full decode to surface corruption
                    sizes.append(img.size)
            except Exception as e:
                bad.append({"class": cls, "file": f, "error": str(e)})

    df_bad = pd.DataFrame(bad, columns=["class", "file", "error"])
    print("Bad images found:", len(df_bad))

    if len(sizes) > 0:
        sizes = np.array(sizes)
        print("Width min, median, max:", int(sizes[:,0].min()), int(np.median(sizes[:,0])), int(sizes[:,0].max()))
        print("Height min, median, max:", int(sizes[:,1].min()), int(np.median(sizes[:,1])), int(sizes[:,1].max()))

        plt.figure(figsize=(6,5))
        plt.scatter(sizes[:,0], sizes[:,1], s=6, alpha=0.25)
        plt.title("Image size scatter (W vs H)")
        plt.xlabel("Width")
        plt.ylabel("Height")
        plt.tight_layout()
        plt.show()

    return df_bad

# Run the integrity scan and show up to 30 failing files, if any.
df_bad = check_images(CFG.dataset_path)
if len(df_bad) > 0:
    display(df_bad.head(30))


In [None]:
# Creates stratified train/val/test splits

# Index the dataset once (no transform) to obtain class names and per-sample labels.
base_ds = datasets.ImageFolder(CFG.dataset_path)
classes = base_ds.classes
num_classes = len(classes)
targets = np.array([t for _, t in base_ds.samples])
idx_all = np.arange(len(base_ds))

# Step 1: carve out the training indices; the remainder is split again below.
train_idx, tmp_idx = train_test_split(
    idx_all,
    test_size=(1.0 - CFG.train_ratio),
    stratify=targets,
    random_state=42
)

# Step 2: divide the remainder into val/test; val_rel is the share of the
# remainder that goes to validation.
tmp_targets = targets[tmp_idx]
val_rel = CFG.val_ratio / (CFG.val_ratio + CFG.test_ratio)

val_idx, test_idx = train_test_split(
    tmp_idx,
    test_size=(1.0 - val_rel),
    stratify=tmp_targets,
    random_state=42
)

print("Split sizes:", len(train_idx), len(val_idx), len(test_idx))
print("Num classes:", num_classes)

def split_counts(idxs):
    # Number of samples per class id among the given dataset indices.
    return np.bincount(targets[idxs], minlength=num_classes)

# Per-class sample counts in each split, for a quick stratification check.
df_split = pd.DataFrame({
    "class": classes,
    "train": split_counts(train_idx),
    "val": split_counts(val_idx),
    "test": split_counts(test_idx),
})
display(df_split.sort_values("train", ascending=False).head(20))

plt.figure(figsize=(12,4))
plt.plot(df_split["train"].values, label="train")
plt.plot(df_split["val"].values, label="val")
plt.plot(df_split["test"].values, label="test")
plt.title("Per-class counts across splits (class order = ImageFolder classes)")
plt.legend()
plt.grid(True, alpha=0.2)
plt.tight_layout()
plt.show()


In [None]:
# Builds model-specific transforms, previews augmentations and normalization.

def build_transforms(model_name: str, img_size: int, is_train: bool):
    """Build a timm preprocessing pipeline matching ``model_name``'s data config.

    Args:
        model_name: timm model identifier whose interpolation/mean/std/crop
            settings should be used.
        img_size: Square side length; overrides the model's default input size.
        is_train: True for the augmented training pipeline (RandAugment plus
            random erasing), False for the plain evaluation pipeline.

    Returns:
        ``(transform, data_cfg)`` where ``data_cfg`` is the resolved timm data
        config dict with ``input_size`` replaced by ``(3, img_size, img_size)``.
    """
    # Only the model's config metadata is read here, so the model is built
    # without pretrained weights to avoid a weight download/load per call.
    model = timm.create_model(model_name, pretrained=False)
    data_cfg = resolve_data_config({}, model=model)
    data_cfg["input_size"] = (3, img_size, img_size)

    # Arguments shared by the training and evaluation pipelines.
    tfm_kwargs = dict(
        input_size=data_cfg["input_size"],
        is_training=is_train,
        interpolation=data_cfg.get("interpolation", "bicubic"),
        mean=data_cfg.get("mean", (0.485, 0.456, 0.406)),
        std=data_cfg.get("std", (0.229, 0.224, 0.225)),
        crop_pct=data_cfg.get("crop_pct", 0.875),
    )
    if is_train:
        tfm_kwargs.update(
            auto_augment="rand-m7-mstd0.5-inc1",
            re_prob=0.10,
            re_mode="pixel",
            re_count=1,
        )

    tfm = create_transform(**tfm_kwargs)
    return tfm, data_cfg

def denorm_tensor(x, mean, std):
    """Undo channel-wise normalization and clip the result to [0, 1].

    Args:
        x: Normalized image tensor with channels first (C, H, W).
        mean: Per-channel means used for normalization.
        std: Per-channel standard deviations used for normalization.

    Returns:
        CPU tensor ``x * std + mean`` clamped to the displayable range.
    """
    shift = torch.tensor(mean).view(-1, 1, 1)
    scale = torch.tensor(std).view(-1, 1, 1)
    restored = x.cpu() * scale + shift
    return torch.clamp(restored, 0, 1)

# Build the training pipeline for one reference model to eyeball the augmentation.
preview_model = "tf_efficientnetv2_s"
train_tfm, cfg_t = build_transforms(preview_model, CFG.img_size, True)

# Pick a random sample from the whole dataset (not restricted to the train split).
preview_ds = datasets.ImageFolder(CFG.dataset_path, transform=train_tfm)
x, y = preview_ds[random.randint(0, len(preview_ds)-1)]

# De-normalize and move channels last (H, W, C) for matplotlib.
plt.figure(figsize=(4,4))
plt.imshow(denorm_tensor(x, cfg_t["mean"], cfg_t["std"]).permute(1,2,0))
plt.title(f"Augmented sample | class: {classes[y]}")
plt.axis("off")
plt.tight_layout()
plt.show()


In [None]:
# Creates DataLoaders

def make_loaders(model_name: str, img_size: int, batch_size: int):
    """Create train/val/test DataLoaders using the model-specific transforms.

    Args:
        model_name: timm model identifier passed to ``build_transforms``.
        img_size: Square input size for the transforms.
        batch_size: Batch size for all three loaders.

    Returns:
        ``(train_loader, val_loader, test_loader, data_cfg, train_subset,
        val_subset, test_subset)``. The train loader shuffles and drops the
        last incomplete batch; val/test loaders keep dataset order.
    """
    train_tfm, data_cfg = build_transforms(model_name, img_size, True)
    eval_tfm, _ = build_transforms(model_name, img_size, False)

    train_ds = datasets.ImageFolder(CFG.dataset_path, transform=train_tfm)
    # Validation and test use the same transform, so a single ImageFolder
    # (one directory scan) backs both subsets.
    eval_ds = datasets.ImageFolder(CFG.dataset_path, transform=eval_tfm)

    train_subset = Subset(train_ds, train_idx)
    val_subset = Subset(eval_ds, val_idx)
    test_subset = Subset(eval_ds, test_idx)

    common = dict(batch_size=batch_size, num_workers=CFG.num_workers, pin_memory=True)
    train_loader = DataLoader(train_subset, shuffle=True, drop_last=True, **common)
    val_loader = DataLoader(val_subset, shuffle=False, **common)
    test_loader = DataLoader(test_subset, shuffle=False, **common)

    return train_loader, val_loader, test_loader, data_cfg, train_subset, val_subset, test_subset


In [None]:
# Defines class-weighting, evaluation metrics, and EMA training loop.

def make_class_weight_tensor():
    """Return inverse-frequency class weights for the training split.

    Each weight is ``total / max(count, 1)`` for its class, and the vector is
    rescaled to have mean 1. The tensor lives on the global ``device``.
    """
    counts = np.bincount(targets[train_idx], minlength=num_classes).astype(np.float32)
    # Classes absent from the train split are treated as having one sample.
    safe_counts = np.maximum(counts, 1.0)
    inv_freq = counts.sum() / safe_counts
    normalized = inv_freq / inv_freq.mean()
    return torch.tensor(normalized, device=device)

@torch.no_grad()
def evaluate(model, loader, criterion):
    """Run ``model`` over ``loader`` in eval mode and compute summary metrics.

    Args:
        model: Classifier returning logits of shape (batch, classes).
        loader: Iterable of ``(inputs, labels)`` batches.
        criterion: Loss called as ``criterion(logits, labels)``.

    Returns:
        ``(metrics, y_true, y_pred)`` where ``metrics`` has keys ``loss``
        (sample-weighted mean), ``acc``, ``bal_acc`` and ``f1_macro``, and the
        two arrays hold the labels and argmax predictions in loader order.
    """
    model.eval()
    amp_device = "cuda" if device=="cuda" else "cpu"

    loss_sum, seen = 0.0, 0
    label_chunks, pred_chunks = [], []

    for inputs, labels in loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        with autocast(device_type=amp_device):
            logits = model(inputs)
            batch_loss = criterion(logits, labels)

        batch_n = inputs.size(0)
        loss_sum += batch_loss.item() * batch_n
        seen += batch_n

        label_chunks.append(labels.detach().cpu().numpy())
        pred_chunks.append(logits.argmax(1).detach().cpu().numpy())

    y_true = np.concatenate(label_chunks)
    y_pred = np.concatenate(pred_chunks)

    metrics = {
        "loss": loss_sum / max(1, seen),
        "acc": accuracy_score(y_true, y_pred),
        "bal_acc": balanced_accuracy_score(y_true, y_pred),
        "f1_macro": f1_score(y_true, y_pred, average="macro"),
    }
    return metrics, y_true, y_pred

def plot_history(hist, title):
    """Plot train/validation loss and accuracy curves side by side.

    Args:
        hist: List of per-epoch dicts with keys ``train_loss``, ``val_loss``,
            ``train_acc`` and ``val_acc``.
        title: Prefix for both subplot titles.
    """
    curves = pd.DataFrame(hist)
    plt.figure(figsize=(12,4))

    # (subplot position, columns to draw, title suffix)
    panels = (
        (1, ("train_loss", "val_loss"), " loss"),
        (2, ("train_acc", "val_acc"), " accuracy"),
    )
    for position, columns, suffix in panels:
        plt.subplot(1, 2, position)
        for column in columns:
            plt.plot(curves[column], label=column)
        plt.title(title + suffix)
        plt.legend()
        plt.grid(True, alpha=0.2)

    plt.tight_layout()
    plt.show()

def train_model(
    model,
    train_loader,
    val_loader,
    criterion,
    epochs: int,
    lr: float,
    weight_decay: float,
    patience: int,
    model_name: str
):
    """Train ``model`` with AMP, gradient clipping, cosine LR decay and an EMA copy.

    Validation and early stopping are driven by the EMA weights, and it is the
    EMA module (restored to its best validation accuracy) that is returned.

    Args:
        model: Network to optimise; moved to the global ``device``.
        train_loader: Batches of ``(inputs, labels)`` for training.
        val_loader: Batches used by ``evaluate`` after every epoch.
        criterion: Loss called as ``criterion(logits, labels)``.
        epochs: Maximum number of epochs (also the cosine schedule length).
        lr: AdamW learning rate.
        weight_decay: AdamW weight decay.
        patience: Stop after this many consecutive epochs without a new best
            validation accuracy.
        model_name: Tag used in the per-epoch log line.

    Returns:
        ``(ema_module, hist)`` where ``hist`` is a list of per-epoch dicts
        with keys ``epoch``, ``train_loss``, ``train_acc``, ``val_loss``,
        ``val_acc``, ``val_f1_macro`` and ``lr`` (the rate used in that epoch).
    """
    model = model.to(device)
    amp_device = "cuda" if device == "cuda" else "cpu"
    scaler = GradScaler(amp_device)
    ema = ModelEmaV2(model, decay=0.999)

    opt = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)
    sch = torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=max(1, epochs))

    best = -1.0
    best_state = None
    bad = 0
    hist = []

    for ep in range(1, epochs+1):
        model.train()
        tot, cor, tloss = 0, 0, 0.0

        for x, y in train_loader:
            x, y = x.to(device), y.to(device)
            opt.zero_grad(set_to_none=True)

            with autocast(device_type=amp_device):
                logits = model(x)
                loss = criterion(logits, y)

            scaler.scale(loss).backward()
            # Unscale before clipping so the norm threshold applies to true gradients.
            scaler.unscale_(opt)
            torch.nn.utils.clip_grad_norm_(model.parameters(), CFG.grad_clip)
            scaler.step(opt)
            scaler.update()

            ema.update(model)

            tloss += loss.item() * x.size(0)
            cor += (logits.argmax(1) == y).sum().item()
            tot += x.size(0)

        # Read the rate before stepping the scheduler so the log reflects the
        # value that was actually used during this epoch.
        epoch_lr = opt.param_groups[0]["lr"]
        sch.step()

        val_m, _, _ = evaluate(ema.module, val_loader, criterion)

        tr_acc = cor / max(1, tot)
        tr_loss = tloss / max(1, tot)

        hist.append({
            "epoch": ep,
            "train_loss": tr_loss,
            "train_acc": tr_acc,
            "val_loss": val_m["loss"],
            "val_acc": val_m["acc"],
            "val_f1_macro": val_m["f1_macro"],
            "lr": epoch_lr
        })

        print(f"[{model_name}] ep {ep:02d} tr_acc {tr_acc:.4f} val_acc {val_m['acc']:.4f} val_f1 {val_m['f1_macro']:.4f}")

        if val_m["acc"] > best:
            best = val_m["acc"]
            # copy=True guarantees an independent snapshot even when the model
            # already lives on the CPU (a plain .cpu() would alias the live
            # EMA tensors, which keep changing in later epochs).
            best_state = {
                k: v.detach().to("cpu", copy=True)
                for k, v in ema.module.state_dict().items()
            }
            bad = 0
        else:
            bad += 1
            if bad >= patience:
                print("Early stopping")
                break

    if best_state is not None:
        ema.module.load_state_dict(best_state)

    return ema.module, hist


In [None]:
# Extracts pretrained CNN embeddings for train, val, test splits.

@torch.no_grad()
def extract_embeddings(backbone_name: str, img_size: int, batch_size: int):
    """Compute frozen-backbone features for the train, val and test splits.

    Args:
        backbone_name: timm model identifier; created with ``num_classes=0``
            so the forward pass returns pooled features.
        img_size: Square input size for the evaluation transform.
        batch_size: Batch size used while extracting.

    Returns:
        ``((Xtr, ytr), (Xva, yva), (Xte, yte), cfg)``: float32 feature arrays
        with their labels per split (in split-index order, loaders do not
        shuffle), plus the data config returned by ``build_transforms``.
    """
    eval_tfm, cfg = build_transforms(backbone_name, img_size, False)
    ds = datasets.ImageFolder(CFG.dataset_path, transform=eval_tfm)

    def ordered_loader(indices):
        # Deterministic order so features line up with the split indices.
        return DataLoader(
            Subset(ds, indices),
            batch_size=batch_size,
            shuffle=False,
            num_workers=CFG.num_workers,
            pin_memory=True,
        )

    split_loaders = [ordered_loader(ix) for ix in (train_idx, val_idx, test_idx)]

    feat_model = timm.create_model(backbone_name, pretrained=True, num_classes=0).to(device).eval()
    print("Backbone:", backbone_name, "| feature_dim:", feat_model.num_features)

    amp_device = "cuda" if device=="cuda" else "cpu"

    def embed(loader):
        feature_chunks, label_chunks = [], []
        for images, labels in loader:
            images = images.to(device)
            with autocast(device_type=amp_device):
                batch_feats = feat_model(images)
            # Cast back to float32: autocast may yield reduced-precision outputs.
            feature_chunks.append(batch_feats.detach().cpu().float().numpy())
            label_chunks.append(labels.numpy())
        return np.concatenate(feature_chunks), np.concatenate(label_chunks)

    train_pair = embed(split_loaders[0])
    val_pair = embed(split_loaders[1])
    test_pair = embed(split_loaders[2])

    return train_pair, val_pair, test_pair, cfg

# CNN features for all three splits; ytr/yva/yte from this call are the label
# arrays reused by the later feature-based classifiers.
cnn_backbone = "tf_efficientnetv2_s"
(Xtr_cnn, ytr), (Xva_cnn, yva), (Xte_cnn, yte), cfg_cnn = extract_embeddings(cnn_backbone, CFG.img_size, batch_size=64)

print("CNN embeddings:", Xtr_cnn.shape, Xva_cnn.shape, Xte_cnn.shape)


In [None]:
# Extracts ViT embeddings and concatenates with CNN for hybrid features.

# ViT features for the same splits. Labels are discarded because the loaders
# do not shuffle, so rows are in the same order as the CNN features.
vit_backbone = "vit_base_patch16_224"
(Xtr_vit, _), (Xva_vit, _), (Xte_vit, _), cfg_vit = extract_embeddings(vit_backbone, CFG.img_size, batch_size=64)

print("ViT embeddings:", Xtr_vit.shape, Xva_vit.shape, Xte_vit.shape)

# Hybrid representation: CNN and ViT features joined along the feature axis.
Xtr_h = np.concatenate([Xtr_cnn, Xtr_vit], axis=1)
Xva_h = np.concatenate([Xva_cnn, Xva_vit], axis=1)
Xte_h = np.concatenate([Xte_cnn, Xte_vit], axis=1)

print("Hybrid embeddings:", Xtr_h.shape, Xva_h.shape, Xte_h.shape)


In [None]:
# Visualizes CNN and hybrid embeddings using t-SNE plots.

from sklearn.manifold import TSNE
from sklearn.preprocessing import StandardScaler

def tsne_plot(X, y, title, max_points=2000):
    """Scatter a 2-D t-SNE projection of a random subset of ``X``.

    Args:
        X: Feature matrix with one row per sample.
        y: Integer labels aligned with ``X``; used to colour the points.
        title: Figure title.
        max_points: Maximum number of rows sampled (without replacement).
    """
    sample_size = min(max_points, len(X))
    picked = np.random.choice(len(X), size=sample_size, replace=False)

    # Standardize features before t-SNE.
    scaled = StandardScaler().fit_transform(X[picked])
    projector = TSNE(n_components=2, perplexity=30, init="pca", learning_rate="auto", random_state=42)
    coords = projector.fit_transform(scaled)

    plt.figure(figsize=(7,6))
    plt.scatter(coords[:,0], coords[:,1], c=y[picked], s=6, alpha=0.6)
    plt.title(title)
    plt.tight_layout()
    plt.show()

# Each call draws its own random subset, so the two plots show different samples.
tsne_plot(Xtr_cnn, ytr, "t-SNE: CNN embeddings (train subset)")
tsne_plot(Xtr_h, ytr, "t-SNE: Hybrid embeddings (CNN + ViT) (train subset)")


In [None]:
# Trains MLP classifier on extracted CNN embeddings.

class MLPClassifier(nn.Module):
    """Two-hidden-layer MLP head for classifying pre-extracted feature vectors.

    Layout: ``in_dim -> hidden -> hidden // 2 -> num_classes``; each hidden
    layer is followed by BatchNorm1d, GELU and Dropout.
    """

    def __init__(self, in_dim: int, num_classes: int, hidden: int = 512, dropout: float = 0.3):
        super().__init__()
        half = hidden // 2

        stack = []
        for fan_in, fan_out in ((in_dim, hidden), (hidden, half)):
            stack += [
                nn.Linear(fan_in, fan_out),
                nn.BatchNorm1d(fan_out),
                nn.GELU(),
                nn.Dropout(dropout),
            ]
        stack.append(nn.Linear(half, num_classes))

        self.net = nn.Sequential(*stack)

    def forward(self, x):
        """Return class logits for a batch of feature vectors."""
        logits = self.net(x)
        return logits

class NumpyFeatureDataset(torch.utils.data.Dataset):
    """Dataset over in-memory feature/label arrays, converted to tensors once."""

    def __init__(self, X: np.ndarray, y: np.ndarray):
        # torch.tensor copies the data, so later edits to X / y do not leak in.
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        return self.y.shape[0]

    def __getitem__(self, i):
        features, label = self.X[i], self.y[i]
        return features, label

def train_mlp_on_features(Xtr, ytr, Xva, yva, Xte, yte, hidden=512, dropout=0.3, lr=1e-3, wd=1e-4, epochs=25, patience=6, name="MLP"):
    """Train an ``MLPClassifier`` on feature arrays and score it on a test set.

    Args:
        Xtr, ytr: Training features and integer labels.
        Xva, yva: Validation features/labels used for early stopping.
        Xte, yte: Features/labels on which the restored best model is scored.
        hidden: Width of the first hidden layer.
        dropout: Dropout probability of the MLP.
        lr: AdamW learning rate.
        wd: AdamW weight decay.
        epochs: Maximum number of epochs.
        patience: Stop after this many consecutive epochs without a new best
            validation accuracy.
        name: Tag used in the per-epoch log line.

    Returns:
        ``(model, hist, results)`` where ``hist`` is a list of per-epoch dicts
        (``epoch``, ``train_loss``, ``train_acc``, ``val_loss``, ``val_acc``)
        and ``results`` has keys ``test_acc`` and ``test_f1_macro``.
    """
    tr_ds = NumpyFeatureDataset(Xtr, ytr)
    va_ds = NumpyFeatureDataset(Xva, yva)
    te_ds = NumpyFeatureDataset(Xte, yte)

    # drop_last on the training loader keeps every batch at the full size.
    tr_loader = DataLoader(tr_ds, batch_size=256, shuffle=True, drop_last=True)
    va_loader = DataLoader(va_ds, batch_size=256, shuffle=False)
    te_loader = DataLoader(te_ds, batch_size=256, shuffle=False)

    model = MLPClassifier(Xtr.shape[1], num_classes, hidden=hidden, dropout=dropout).to(device)

    # class weighted loss (inverse frequency, rescaled to mean 1)
    class_counts = np.bincount(ytr, minlength=num_classes).astype(np.float32)
    w = (class_counts.sum() / np.maximum(class_counts, 1.0))
    w = w / w.mean()
    w = torch.tensor(w, device=device)

    criterion = nn.CrossEntropyLoss(weight=w, label_smoothing=0.0)
    opt = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=wd)

    best = -1.0
    best_state = None
    bad = 0
    hist = []

    for ep in range(1, epochs+1):
        model.train()
        tloss, cor, tot = 0.0, 0, 0
        for xb, yb in tr_loader:
            xb, yb = xb.to(device), yb.to(device)
            opt.zero_grad(set_to_none=True)
            logits = model(xb)
            loss = criterion(logits, yb)
            loss.backward()
            opt.step()

            tloss += loss.item() * xb.size(0)
            cor += (logits.argmax(1) == yb).sum().item()
            tot += xb.size(0)

        model.eval()
        with torch.no_grad():
            vloss, vcor, vtot = 0.0, 0, 0
            for xb, yb in va_loader:
                xb, yb = xb.to(device), yb.to(device)
                logits = model(xb)
                loss = criterion(logits, yb)
                vloss += loss.item() * xb.size(0)
                vcor += (logits.argmax(1) == yb).sum().item()
                vtot += xb.size(0)

        tr_acc = cor / max(1, tot)
        va_acc = vcor / max(1, vtot)
        hist.append({"epoch": ep, "train_loss": tloss/max(1,tot), "train_acc": tr_acc, "val_loss": vloss/max(1,vtot), "val_acc": va_acc})

        print(f"[{name}] ep {ep:02d} tr_acc {tr_acc:.4f} val_acc {va_acc:.4f}")

        if va_acc > best:
            best = va_acc
            # copy=True makes a real snapshot even on a CPU device, where a
            # plain .cpu() would return the live, still-training tensors.
            best_state = {
                k: v.detach().to("cpu", copy=True)
                for k, v in model.state_dict().items()
            }
            bad = 0
        else:
            bad += 1
            if bad >= patience:
                print("Early stopping")
                break

    # best_state stays None when no epoch ran (epochs <= 0); keep the model as is.
    if best_state is not None:
        model.load_state_dict(best_state)

    model.eval()
    all_p = []
    with torch.no_grad():
        for xb, _ in te_loader:
            xb = xb.to(device)
            logits = model(xb)
            all_p.append(logits.argmax(1).cpu().numpy())
    y_pred = np.concatenate(all_p)

    test_acc = accuracy_score(yte, y_pred)
    test_f1  = f1_score(yte, y_pred, average="macro")
    return model, hist, {"test_acc": test_acc, "test_f1_macro": test_f1}

# Baseline 1: MLP head trained on the frozen CNN features.
mlp_cnn, hist_mlp_cnn, res_mlp_cnn = train_mlp_on_features(
    Xtr_cnn, ytr, Xva_cnn, yva, Xte_cnn, yte,
    hidden=512, dropout=0.35, lr=1e-3, wd=1e-4,
    epochs=30, patience=6,
    name="MLP_CNNfeat"
)

plot_history(hist_mlp_cnn, "MLP on CNN features")
print("MLP_CNNfeat:", res_mlp_cnn)


In [None]:
# Fine-tunes end-to-end ViT baseline and evaluates performance.

# Baseline 2: fine-tune a pretrained ViT end to end at CFG.img_size.
vit_model_name = "vit_base_patch16_224"
train_loader, val_loader, test_loader, data_cfg, tr_sub, va_sub, te_sub = make_loaders(
    vit_model_name, CFG.img_size, CFG.batch_size
)

# Class-weighted cross-entropy with label smoothing.
w = make_class_weight_tensor()
criterion = nn.CrossEntropyLoss(weight=w, label_smoothing=CFG.label_smoothing)

vit_model = timm.create_model(vit_model_name, pretrained=True, num_classes=num_classes)

# train_model returns the EMA copy restored to its best validation accuracy.
vit_model, hist_vit = train_model(
    vit_model, train_loader, val_loader,
    criterion=criterion,
    epochs=CFG.epochs_quick,
    lr=CFG.lr,
    weight_decay=CFG.weight_decay,
    patience=CFG.patience,
    model_name="ViT_end2end"
)

plot_history(hist_vit, "ViT end to end (quick)")

test_m_vit, y_true_vit, y_pred_vit = evaluate(vit_model, test_loader, criterion)
print("ViT test:", test_m_vit)


In [None]:
# Trains MLP on hybrid CNN+ViT concatenated features.

# Baseline 3: MLP head trained on the concatenated CNN + ViT features.
mlp_hybrid, hist_mlp_hybrid, res_mlp_hybrid = train_mlp_on_features(
    Xtr_h, ytr, Xva_h, yva, Xte_h, yte,
    hidden=1024, dropout=0.35, lr=8e-4, wd=2e-4,
    epochs=30, patience=6,
    name="MLP_Hybridfeat"
)

plot_history(hist_mlp_hybrid, "MLP on Hybrid features (CNN + ViT)")
print("MLP_Hybridfeat:", res_mlp_hybrid)


In [None]:
# Compares three models
# Best validation accuracy reached by each of the three baselines.
best_val_mlp_cnn = max([x["val_acc"] for x in hist_mlp_cnn])
best_val_mlp_hyb = max([x["val_acc"] for x in hist_mlp_hybrid])
best_val_vit     = max([x["val_acc"] for x in hist_vit])

# Rank by validation accuracy; test accuracy is listed for reference only.
df_cmp = pd.DataFrame([
    {"model_type": "DL_on_CNN_features", "best_val_acc": best_val_mlp_cnn, "test_acc": res_mlp_cnn["test_acc"]},
    {"model_type": "Hybrid_DL_on_CNNplusViT_features", "best_val_acc": best_val_mlp_hyb, "test_acc": res_mlp_hybrid["test_acc"]},
    {"model_type": "ViT_end2end", "best_val_acc": best_val_vit, "test_acc": test_m_vit["acc"]},
]).sort_values("best_val_acc", ascending=False)

display(df_cmp)

# best_type selects which branch the tuning and retraining cells take.
best_type = df_cmp.iloc[0]["model_type"]
print("Best by validation:", best_type)


In [None]:
# Tunes best model hyperparameters using Optuna optimization trials.

def objective_best_model(trial: optuna.Trial):
    """Optuna objective for whichever baseline won the validation comparison.

    Depending on the global ``best_type`` this either tunes the feature-based
    MLP (hidden size, dropout, lr, weight decay) or the end-to-end ViT
    (lr, weight decay, label smoothing, batch size, image size).

    Args:
        trial: Optuna trial used to sample hyperparameters.

    Returns:
        Best validation accuracy reached during the short training run.
    """
    if best_type in ["DL_on_CNN_features", "Hybrid_DL_on_CNNplusViT_features"]:
        use_cnn_only = best_type == "DL_on_CNN_features"
        Xtr = Xtr_cnn if use_cnn_only else Xtr_h
        Xva = Xva_cnn if use_cnn_only else Xva_h

        hidden = trial.suggest_categorical("hidden", [256, 512, 768, 1024, 1536])
        dropout = trial.suggest_float("dropout", 0.1, 0.5)
        lr = trial.suggest_float("lr", 1e-4, 3e-3, log=True)
        wd = trial.suggest_float("weight_decay", 1e-6, 5e-3, log=True)

        # The validation arrays are also passed in the "test" slot so the
        # real test split is never touched while tuning.
        _, hist, _ = train_mlp_on_features(
            Xtr, ytr, Xva, yva, Xva, yva,
            hidden=hidden, dropout=dropout, lr=lr, wd=wd,
            epochs=18, patience=5,
            name="tune_mlp"
        )
        return max(h["val_acc"] for h in hist)

    model_name = vit_model_name
    lr = trial.suggest_float("lr", 1e-5, 8e-4, log=True)
    wd = trial.suggest_float("weight_decay", 1e-6, 5e-3, log=True)
    ls = trial.suggest_float("label_smoothing", 0.0, 0.15)

    bs = trial.suggest_categorical("batch_size", [16, 32])
    img_size = trial.suggest_categorical("img_size", [224, 384])

    train_loader, val_loader, _, _, _, _, _ = make_loaders(model_name, img_size, bs)

    w = make_class_weight_tensor()
    criterion = nn.CrossEntropyLoss(weight=w, label_smoothing=ls)

    # The ViT must be built for the sampled resolution; with its default
    # 224 input it would reject the 384-pixel batches from the loaders.
    model = timm.create_model(
        model_name, pretrained=True, num_classes=num_classes, img_size=img_size
    )

    _, hist = train_model(
        model, train_loader, val_loader,
        criterion=criterion,
        epochs=6,
        lr=lr,
        weight_decay=wd,
        patience=3,
        model_name="tune_vit"
    )
    # train_model logs validation accuracy under the "val_acc" key.
    return max(h["val_acc"] for h in hist)

# Maximize validation accuracy over CFG.n_trials_best_model trials.
# NOTE(review): no sampler seed is passed to create_study, so the sampled
# hyperparameters will presumably differ between runs — confirm if
# reproducibility matters.
study_best = optuna.create_study(direction="maximize")
study_best.optimize(objective_best_model, n_trials=CFG.n_trials_best_model)

print("Best tuned params for best model:", study_best.best_params)
print("Best acc:", study_best.best_value)
# Kept for the retraining cell.
best_params = study_best.best_params


In [None]:
# Retrains tuned best model fully

# Collects the headline test accuracy of the retrained best model.
final_results = {}

if best_type in ["DL_on_CNN_features", "Hybrid_DL_on_CNNplusViT_features"]:
    # Feature-based winner: retrain the MLP with the tuned hyperparameters
    # and a longer epoch budget, then score on the real test split.
    Xtr = Xtr_cnn if best_type == "DL_on_CNN_features" else Xtr_h
    Xva = Xva_cnn if best_type == "DL_on_CNN_features" else Xva_h
    Xte = Xte_cnn if best_type == "DL_on_CNN_features" else Xte_h

    final_mlp, hist_final_mlp, res_final_mlp = train_mlp_on_features(
        Xtr, ytr, Xva, yva, Xte, yte,
        hidden=best_params["hidden"],
        dropout=best_params["dropout"],
        lr=best_params["lr"],
        wd=best_params["weight_decay"],
        epochs=35, patience=7,
        name="FINAL_best_MLP"
    )
    plot_history(hist_final_mlp, "FINAL best MLP (tuned)")
    final_results["best_model_test_acc"] = res_final_mlp["test_acc"]
    print("FINAL best MLP:", res_final_mlp)

else:
    # End-to-end ViT winner: rebuild loaders and model at the tuned settings.
    img_size = best_params["img_size"]
    bs = best_params["batch_size"]
    lr = best_params["lr"]
    wd = best_params["weight_decay"]
    ls = best_params["label_smoothing"]

    train_loader, val_loader, test_loader, data_cfg, _, _, _ = make_loaders(vit_model_name, img_size, bs)
    w = make_class_weight_tensor()
    criterion = nn.CrossEntropyLoss(weight=w, label_smoothing=ls)

    # Build the ViT for the tuned resolution; the default 224 input would
    # not accept batches when img_size was tuned to 384.
    vit_final = timm.create_model(
        vit_model_name, pretrained=True, num_classes=num_classes, img_size=img_size
    )
    vit_final, hist_final_vit = train_model(
        vit_final, train_loader, val_loader,
        criterion=criterion,
        epochs=CFG.epochs_full,
        lr=lr,
        weight_decay=wd,
        patience=CFG.patience,
        model_name="FINAL_best_ViT"
    )
    plot_history(hist_final_vit, "FINAL best ViT (tuned)")

    test_m, yt, yp = evaluate(vit_final, test_loader, criterion)
    print("FINAL best ViT test:", test_m)
    final_results["best_model_test_acc"] = test_m["acc"]


In [None]:
# Tunes CustomViT architecture and training hyperparameters via Optuna.

import optuna
import torch.nn as nn
import timm

class CustomViT(nn.Module):
    """Pretrained timm backbone with a partially unfrozen tail and a custom MLP head.

    Args:
        backbone_name: timm model identifier (created with ``num_classes=0``).
        img_size: Input resolution the backbone is built for.
        num_classes: Number of output classes.
        head_hidden: Width of the hidden layer in the classifier head.
        dropout: Dropout probability inside the head.
        unfreeze_blocks: How many of the last backbone blocks stay trainable
            (clamped to between 1 and the number of blocks).
    """

    def __init__(
        self,
        backbone_name: str,
        img_size: int,
        num_classes: int,
        head_hidden: int,
        dropout: float,
        unfreeze_blocks: int
    ):
        super().__init__()

        # Build the backbone for the requested resolution; dynamic_img_size
        # additionally relaxes the input-size requirement in many timm ViTs.
        self.backbone = timm.create_model(
            backbone_name,
            pretrained=True,
            num_classes=0,
            img_size=img_size,
            dynamic_img_size=True
        )
        feat_dim = self.backbone.num_features

        # Start fully frozen, then selectively re-enable gradients.
        self.backbone.requires_grad_(False)

        if hasattr(self.backbone, "blocks"):
            total_blocks = len(self.backbone.blocks)
            n_trainable = max(1, min(int(unfreeze_blocks), total_blocks))
            for block in self.backbone.blocks[-n_trainable:]:
                block.requires_grad_(True)

        # The final norm layer (when present) is trained as well.
        if hasattr(self.backbone, "norm"):
            self.backbone.norm.requires_grad_(True)

        # Classifier head on top of the pooled backbone features.
        self.head = nn.Sequential(
            nn.Linear(feat_dim, head_hidden),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(head_hidden, num_classes)
        )

    def forward(self, x):
        """Return class logits for an image batch."""
        return self.head(self.backbone(x))

def objective_custom_vit(trial: optuna.Trial):
    """Optuna objective: short CustomViT training run, scored by validation accuracy.

    Samples optimisation settings (lr, weight decay, label smoothing), head
    architecture (hidden width, dropout), number of unfrozen blocks, image
    size and batch size, then trains for up to 7 epochs.

    Args:
        trial: Optuna trial used to sample hyperparameters.

    Returns:
        Highest validation accuracy logged by ``train_model``.
    """
    # Parameters are sampled in a fixed order.
    lr = trial.suggest_float("lr", 1e-5, 6e-4, log=True)
    wd = trial.suggest_float("weight_decay", 1e-6, 5e-3, log=True)
    ls = trial.suggest_float("label_smoothing", 0.0, 0.12)

    arch = {
        "head_hidden": trial.suggest_categorical("head_hidden", [256, 512, 768, 1024]),
        "dropout": trial.suggest_float("dropout", 0.1, 0.5),
        "unfreeze_blocks": trial.suggest_int("unfreeze_blocks", 2, 10),
    }

    img_size = trial.suggest_categorical("img_size", [224, 384])
    bs = trial.suggest_categorical("batch_size", [16, 32])

    backbone = vit_backbone  # e.g. "vit_base_patch16_224"

    # Only the train and validation loaders are needed while tuning.
    loaders = make_loaders(backbone, img_size, bs)
    train_loader, val_loader = loaders[0], loaders[1]

    class_weights = make_class_weight_tensor()
    criterion = nn.CrossEntropyLoss(weight=class_weights, label_smoothing=ls)

    # The model is built for the same resolution the loaders produce.
    model = CustomViT(
        backbone_name=backbone,
        img_size=img_size,
        num_classes=num_classes,
        **arch
    )

    _, hist = train_model(
        model, train_loader, val_loader,
        criterion=criterion,
        epochs=7,
        lr=lr,
        weight_decay=wd,
        patience=3,
        model_name="tune_custom_vit"
    )
    best_val_acc = max(epoch_log["val_acc"] for epoch_log in hist)
    return best_val_acc

# Maximize validation accuracy over CFG.n_trials_custom_vit trials.
study_custom = optuna.create_study(direction="maximize")
study_custom.optimize(objective_custom_vit, n_trials=CFG.n_trials_custom_vit)

print("Custom ViT best params:", study_custom.best_params)
print("Custom ViT best val acc:", study_custom.best_value)

# Kept for the final CustomViT training cell.
custom_params = study_custom.best_params


In [None]:
# Evaluate Metrics

# Rebuild loaders at the tuned image size / batch size.
backbone = vit_backbone
img_size = custom_params["img_size"]
bs = custom_params["batch_size"]

train_loader, val_loader, test_loader, data_cfg, _, _, _ = make_loaders(backbone, img_size, bs)

w = make_class_weight_tensor()
criterion = nn.CrossEntropyLoss(weight=w, label_smoothing=custom_params["label_smoothing"])

# Final CustomViT with the tuned architecture settings.
custom_vit_final = CustomViT(
    backbone_name=backbone,
    img_size=img_size,
    num_classes=num_classes,
    head_hidden=custom_params["head_hidden"],
    dropout=custom_params["dropout"],
    unfreeze_blocks=custom_params["unfreeze_blocks"]
)

# Full-length training run; the returned model is the EMA copy at its best
# validation accuracy.
custom_vit_final, hist_custom_final = train_model(
    custom_vit_final,
    train_loader, val_loader,
    criterion=criterion,
    epochs=CFG.epochs_full,
    lr=custom_params["lr"],
    weight_decay=custom_params["weight_decay"],
    patience=CFG.patience,
    model_name="FINAL_CustomViT"
)

plot_history(hist_custom_final, "FINAL Custom ViT (tuned)")

# NOTE(review): these metrics and the classification report are computed on
# train_loader, which shuffles, applies training augmentation and drops the
# last partial batch. test_loader is created above but not used here —
# confirm whether train-set metrics are what is intended.
train_m, y_true, y_pred = evaluate(custom_vit_final, train_loader, criterion)
print("Custom ViT:", train_m)
print(classification_report(y_true, y_pred, target_names=classes, digits=4))

@torch.no_grad()
def tta_eval(model, loader):
    """Score ``model`` with horizontal-flip test-time augmentation.

    Logits of each batch and of its mirror image (flipped along the last
    axis) are averaged before taking the argmax.

    Args:
        model: Classifier returning logits of shape (batch, classes).
        loader: Iterable of ``(inputs, labels)`` batches.

    Returns:
        ``(accuracy, macro_f1)`` over all samples in ``loader``.
    """
    model.eval()
    amp_device = "cuda" if device=="cuda" else "cpu"

    truths, guesses = [], []
    for images, labels in loader:
        images = images.to(device)
        with autocast(device_type=amp_device):
            plain_logits = model(images)
            mirrored_logits = model(torch.flip(images, dims=[3]))
            mean_logits = (plain_logits + mirrored_logits) / 2.0
        guesses.append(mean_logits.argmax(1).cpu().numpy())
        truths.append(labels.numpy())

    y_true = np.concatenate(truths)
    y_pred = np.concatenate(guesses)

    acc = accuracy_score(y_true, y_pred)
    macro_f1 = f1_score(y_true, y_pred, average="macro")
    return acc, macro_f1

# Flip-TTA metrics on the held-out test split: the printed label says "Test",
# and the training loader (shuffled, augmented, last batch dropped) would not
# give a test-set number.
tta_acc, tta_f1 = tta_eval(custom_vit_final, test_loader)
print("Custom ViT Test acc:", tta_acc)
print("Custom ViT macro F1:", tta_f1)

In [None]:
from sklearn.metrics import confusion_matrix
import numpy as np
import matplotlib.pyplot as plt

def plot_confusion_matrix(cm, class_names, title="Confusion Matrix", normalize=False, max_classes_to_show=None):
    """Draw a confusion matrix as a heatmap and show it.

    Args:
        cm: Array exposing `.astype`; rows are plotted as true labels and columns
            as predicted labels.
        class_names: Sequence of tick labels, one per class.
        title: Figure title.
        normalize: When True, each row is divided by its sum (plus 1e-8 to avoid
            division by zero) before plotting.
        max_classes_to_show: When set and smaller than the matrix size, only the
            top-left `max_classes_to_show` x `max_classes_to_show` corner is
            plotted. Cropping happens after normalisation, so row fractions still
            refer to the full row.
    """
    matrix = cm.astype(np.float32)
    labels = class_names

    if normalize:
        row_totals = matrix.sum(axis=1, keepdims=True)
        matrix = matrix / (row_totals + 1e-8)

    needs_crop = max_classes_to_show is not None and matrix.shape[0] > max_classes_to_show
    if needs_crop:
        matrix = matrix[:max_classes_to_show, :max_classes_to_show]
        labels = labels[:max_classes_to_show]

    positions = np.arange(len(labels))

    plt.figure(figsize=(10, 8))
    plt.imshow(matrix, interpolation="nearest")
    plt.title(title)
    plt.colorbar()
    plt.xticks(positions, labels, rotation=90, fontsize=8)
    plt.yticks(positions, labels, fontsize=8)
    plt.ylabel("True Label")
    plt.xlabel("Predicted Label")
    plt.tight_layout()
    plt.show()

@torch.no_grad()
def get_preds(model, loader):
    """Collect ground-truth labels and argmax predictions of `model` over `loader`.

    Args:
        model: Network called as `model(x)` and returning per-class logits.
        loader: Iterable yielding `(x, y)` batches; `y` must support `.numpy()`.

    Returns:
        Tuple `(y_true, y_pred)` of concatenated NumPy arrays.
    """
    amp_device = "cuda" if device == "cuda" else "cpu"
    model.eval()

    label_chunks, pred_chunks = [], []
    for inputs, labels in loader:
        inputs = inputs.to(device)
        with autocast(device_type=amp_device):
            scores = model(inputs)
        pred_chunks.append(scores.argmax(1).cpu().numpy())
        label_chunks.append(labels.numpy())

    return np.concatenate(label_chunks), np.concatenate(pred_chunks)

@torch.no_grad()
def tta_eval_preds(model, loader):
    """Like `get_preds`, but with flip test-time augmentation.

    The logits of each batch and of the same batch flipped along dim 3 are
    averaged, and the argmax of the average is taken as the prediction.

    Args:
        model: Network called as `model(x)` and returning per-class logits.
        loader: Iterable yielding `(x, y)` batches; `y` must support `.numpy()`.

    Returns:
        Tuple `(y_true, y_pred)` of concatenated NumPy arrays.
    """
    amp_device = "cuda" if device == "cuda" else "cpu"
    model.eval()

    label_chunks, pred_chunks = [], []
    for inputs, labels in loader:
        inputs = inputs.to(device)
        with autocast(device_type=amp_device):
            plain = model(inputs)
            mirrored = model(torch.flip(inputs, dims=[3]))
            averaged = (plain + mirrored) / 2.0
        pred_chunks.append(averaged.argmax(1).cpu().numpy())
        label_chunks.append(labels.numpy())

    return np.concatenate(label_chunks), np.concatenate(pred_chunks)

# Confusion matrices for the final Custom ViT, computed on train_loader:
# one with raw counts, one row-normalised.
ytr_true, ytr_pred = get_preds(custom_vit_final, train_loader)
_cm_label_ids = list(range(num_classes))
cm_train = confusion_matrix(ytr_true, ytr_pred, labels=_cm_label_ids)

for _cm_title, _cm_normalize in (
    ("Confusion Matrix", False),
    ("Confusion Matrix (normalized)", True),
):
    plot_confusion_matrix(cm_train, classes, title=_cm_title, normalize=_cm_normalize)

print("Confusion matrices generated")


In [None]:
# Generates Integrated Gradients heatmaps to explain CustomViT predictions.

custom_vit_final.eval()
ig = IntegratedGradients(custom_vit_final)

samples_to_show = 4
ig_test_n = len(test_loader.dataset)
idxs = np.random.choice(range(ig_test_n), size=min(samples_to_show, ig_test_n), replace=False)

# Size the grid by the number of samples actually drawn, so a test set smaller than
# samples_to_show does not leave empty rows (max(..., 1) keeps the figure height valid).
ig_rows = max(len(idxs), 1)

plt.figure(figsize=(12, 3 * ig_rows))

for i, idx in enumerate(idxs, start=1):
    x, y = test_loader.dataset[idx]
    x_in = x.unsqueeze(0).to(device)

    # Explain the class the model actually predicts for this image.
    with torch.no_grad():
        pred = custom_vit_final(x_in).argmax(1).item()

    # Attribution map: absolute IG values averaged over dim 1, then min-max scaled to [0, 1].
    attr = ig.attribute(x_in, target=pred, n_steps=16)
    attr = attr.abs().mean(dim=1).squeeze().detach().cpu().numpy()
    attr = (attr - attr.min()) / (attr.max() - attr.min() + 1e-8)

    # Undo the input normalisation for display; permute moves the first axis last
    # (assumes x is channel-first -- confirm against the dataset transform).
    img = denorm_tensor(x, data_cfg["mean"], data_cfg["std"]).permute(1,2,0).numpy()
    # OpenCV colour maps are BGR; convert to RGB and rescale to [0, 1] before blending.
    heat = cv2.applyColorMap((attr*255).astype(np.uint8), cv2.COLORMAP_JET)
    heat = cv2.cvtColor(heat, cv2.COLOR_BGR2RGB) / 255.0
    overlay = (0.55*img + 0.45*heat).clip(0,1)

    plt.subplot(ig_rows, 3, (i-1)*3 + 1)
    plt.imshow(img)
    plt.title(f"Raw | True: {classes[y]}")
    plt.axis("off")

    plt.subplot(ig_rows, 3, (i-1)*3 + 2)
    plt.imshow(attr, cmap="jet")
    plt.title("IG heatmap")
    plt.axis("off")

    plt.subplot(ig_rows, 3, (i-1)*3 + 3)
    plt.imshow(overlay)
    plt.title(f"Overlay | Pred: {classes[pred]}")
    plt.axis("off")

plt.tight_layout()
plt.show()


In [None]:
# XAI (LIME Setup)

!pip -q install lime scikit-image

import numpy as np
import matplotlib.pyplot as plt

from lime import lime_image
from skimage.segmentation import slic, mark_boundaries
from PIL import Image

# Non-training (is_train=False) transform for the current backbone / image size; the second
# value returned by build_transforms is discarded. lime_predict_proba applies this transform
# to every image LIME hands it.
lime_eval_tfm, _ = build_transforms(backbone, img_size, is_train=False)

def get_raw_test_sample(i: int):
    """Load the un-transformed RGB image behind position `i` of the test subset.

    Args:
        i: Position within `test_loader.dataset`, which must expose `.indices`
           and `.dataset.samples` (a list of `(path, label)` pairs).

    Returns:
        Tuple `(img_np, y, img_path)`: the image as a uint8 RGB array, the
        integer label and the path the image was read from.
    """
    subset = test_loader.dataset
    img_path, y = subset.dataset.samples[subset.indices[i]]
    # The context manager releases the file handle immediately instead of leaving it
    # to the garbage collector; convert() returns a fully loaded copy, so the pixel
    # data stays valid after the file is closed.
    with Image.open(img_path) as img:
        img_np = np.array(img.convert("RGB"), dtype=np.uint8)
    return img_np, int(y), img_path

@torch.no_grad()
def lime_predict_proba(images: List[np.ndarray]) -> np.ndarray:
    """Classifier callback for LIME: images in, class probabilities out.

    Args:
        images: Image arrays; each is cast to uint8, wrapped in a PIL image and
            passed through `lime_eval_tfm`.

    Returns:
        NumPy array of softmax probabilities from `custom_vit_final`, one row
        per input image.
    """
    tensors = [lime_eval_tfm(Image.fromarray(arr.astype(np.uint8))) for arr in images]
    batch = torch.stack(tensors, dim=0).to(device)

    custom_vit_final.eval()
    amp_device = "cuda" if device == "cuda" else "cpu"
    with autocast(device_type=amp_device):
        probs = torch.softmax(custom_vit_final(batch), dim=1)

    return probs.detach().cpu().numpy()

# Single LIME image explainer instance, reused by the explanation loop.
lime_explainer = lime_image.LimeImageExplainer()

def lime_segmenter(x):
    """Segment image `x` into superpixels with SLIC using this notebook's fixed settings."""
    slic_options = dict(n_segments=120, compactness=10, sigma=1)
    return slic(x, **slic_options)


In [None]:
# Run LIME explanations on images

samples_to_show = 4
test_n = len(test_loader.dataset)
idxs = np.random.choice(range(test_n), size=min(samples_to_show, test_n), replace=False)

# One row per sampled image: raw image | LIME regions | top-5 probabilities.
plt.figure(figsize=(14, 4 * len(idxs)))

for row_i, idx in enumerate(idxs, start=1):
    img_np, y_true, img_path = get_raw_test_sample(idx)

    # Model prediction and confidence for the unperturbed image.
    probs = lime_predict_proba([img_np])[0]
    y_pred = int(np.argmax(probs))
    conf = float(probs[y_pred])

    explanation = lime_explainer.explain_instance(
        image=img_np,
        classifier_fn=lime_predict_proba,
        top_labels=3,
        hide_color=0,
        num_samples=700,
        segmentation_fn=lime_segmenter
    )

    # Keep only the superpixels that support the predicted class.
    temp, mask = explanation.get_image_and_mask(
        label=y_pred,
        positive_only=True,
        num_features=12,
        hide_rest=False
    )

    plt.subplot(len(idxs), 3, (row_i-1)*3 + 1)
    plt.imshow(img_np)
    plt.title(f"Raw\nTrue: {classes[y_true]}\nPred: {classes[y_pred]} ({conf:.2f})")
    plt.axis("off")

    plt.subplot(len(idxs), 3, (row_i-1)*3 + 2)
    plt.imshow(mark_boundaries(temp / 255.0, mask))
    plt.title("LIME (positive regions)")
    plt.axis("off")

    topk = 5
    top_idx = np.argsort(probs)[::-1][:topk]
    top_probs = probs[top_idx]
    top_names = [classes[i] for i in top_idx]

    # Reverse so the most probable class ends up at the top of the horizontal bar chart.
    plt.subplot(len(idxs), 3, (row_i-1)*3 + 3)
    plt.barh(top_names[::-1], top_probs[::-1])
    plt.title("Top predictions")
    plt.xlabel("Probability")

# Lay out the whole grid once, after every subplot exists, instead of on each iteration.
plt.tight_layout()
plt.show()


In [None]:
import os, json, time
import numpy as np
import pandas as pd
import torch
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, classification_report

# Each execution of this cell writes into its own timestamped folder under save_root.
save_root = "/content/drive/MyDrive/ml-project2/snake_research_runs"
run_name = time.strftime("run_%Y%m%d_%H%M%S")
save_dir = os.path.join(save_root, run_name)
# exist_ok=True: re-running within the same second must not fail.
os.makedirs(save_dir, exist_ok=True)

print("Saving to:", save_dir)

def _json_default(value):
    """Fallback for json.dump: unwrap NumPy scalars and arrays into plain Python values."""
    if isinstance(value, np.generic):
        return value.item()
    if isinstance(value, np.ndarray):
        return value.tolist()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")

def save_json(obj, path):
    """Write `obj` to `path` as indented JSON.

    NumPy scalars and arrays nested inside `obj` (common in metric dicts) are
    converted to plain Python values instead of aborting the dump.

    Args:
        obj: JSON-serialisable object, optionally containing NumPy values.
        path: Destination file; overwritten if it exists.

    Raises:
        TypeError: If `obj` contains a value that is neither JSON-serialisable
            nor a NumPy scalar/array.
    """
    with open(path, "w") as f:
        json.dump(obj, f, indent=2, default=_json_default)

def save_df(df, path):
    """Write DataFrame `df` to `path` as CSV, leaving out the index column."""
    df.to_csv(path_or_buf=path, index=False)

def save_text(text, path):
    """Write the string `text` to `path`, replacing any existing content."""
    with open(path, mode="w") as handle:
        handle.write(text)

def save_fig(name):
    """Save the current matplotlib figure as `<name>.png` in `save_dir` (200 dpi, tight bounding box)."""
    target = os.path.join(save_dir, f"{name}.png")
    plt.savefig(target, dpi=200, bbox_inches="tight")

def safe_torch_save(state_dict, name):
    """Serialise `state_dict` with torch.save into the file `name` inside `save_dir`."""
    target = os.path.join(save_dir, name)
    torch.save(state_dict, target)

# Persist the split indices so the exact train/val/test partition can be restored later.
for _split_file, _split_indices in (
    ("train_idx.npy", train_idx),
    ("val_idx.npy", val_idx),
    ("test_idx.npy", test_idx),
):
    np.save(os.path.join(save_dir, _split_file), np.array(_split_indices))

# Store the class-name <-> index mapping in both directions.
_class_to_idx = base_ds.class_to_idx
label_map = {
    "class_to_idx": _class_to_idx,
    "idx_to_class": {index: name for name, index in _class_to_idx.items()},
}
save_json(label_map, os.path.join(save_dir, "label_map.json"))

# Training histories to export: notebook variable name -> CSV file name.
# Only histories that exist in this session are written, so skipped experiments are fine.
_history_files = {
    "hist_mlp_cnn": "hist_mlp_cnn.csv",
    "hist_mlp_hybrid": "hist_mlp_hybrid.csv",
    "hist_vit": "hist_vit.csv",
    "hist_final_mlp": "hist_final_mlp.csv",
    "hist_final_vit": "hist_final_vit.csv",
    "hist_custom_final": "hist_custom_vit.csv",
}
# At cell top level locals() is the same namespace as globals(), so a single
# globals() lookup covers every case.
for _hist_var, _hist_csv in _history_files.items():
    if _hist_var in globals():
        save_df(pd.DataFrame(globals()[_hist_var]), os.path.join(save_dir, _hist_csv))

# Model weights to export: (notebook variable, checkpoint file name).
# A model is saved only if its variable exists in this session.
_checkpoint_files = (
    # Feature MLPs
    ("mlp_cnn", "mlp_cnnfeat.pth"),
    ("mlp_hybrid", "mlp_hybridfeat.pth"),
    # ViT baseline
    ("vit_model", "vit_end2end_quick.pth"),
    ("vit_final", "vit_final_tuned.pth"),
    # Custom ViT final
    ("custom_vit_final", "custom_vit_final.pth"),
)
for _model_var, _ckpt_name in _checkpoint_files:
    if _model_var in globals():
        safe_torch_save(globals()[_model_var].state_dict(), _ckpt_name)

# Run summary written to run_artifacts.json. Optional entries fall back to None
# (or are left out) when the corresponding variable was never created in this session.
_session = globals()

artifacts = {
    "classes": classes,
    "cfg": vars(CFG),
    "best_model_type": _session.get("best_type"),
    "best_model_tuned_params": _session.get("best_params"),
    "custom_vit_params": _session.get("custom_params"),
}

if "test_m" in _session:
    # Missing metric keys are recorded as -1.
    artifacts["custom_vit_test"] = {
        out_key: float(test_m.get(metric_key, -1))
        for out_key, metric_key in (("acc", "acc"), ("macro_f1", "f1_macro"), ("loss", "loss"))
    }
if "tta_acc" in _session:
    artifacts["custom_vit_test_tta"] = dict(acc=float(tta_acc), macro_f1=float(tta_f1))

save_json(artifacts, os.path.join(save_dir, "run_artifacts.json"))

# Export the test-split classification report and confusion matrix when test
# predictions (yt_true / yt_pred) exist in this session.
if "yt_true" in globals() and "yt_pred" in globals():
    # Pin the label set to every known class: the report then always lines up with
    # target_names, and the matrix is always len(classes) x len(classes), even when
    # a class is absent from the test data.
    # Assumes labels are the integers 0..len(classes)-1, as classes[...] indexing elsewhere implies.
    report_labels = list(range(len(classes)))

    rep = classification_report(yt_true, yt_pred, labels=report_labels, target_names=classes, digits=4)
    save_text(rep, os.path.join(save_dir, "classification_report_test.txt"))

    cm = confusion_matrix(yt_true, yt_pred, labels=report_labels)
    np.save(os.path.join(save_dir, "confusion_matrix_test.npy"), cm)

    plt.figure(figsize=(10,8))
    plt.imshow(cm)
    plt.title("Confusion Matrix (Test)")
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.colorbar()
    plt.tight_layout()
    save_fig("confusion_matrix_test")
    # Close the figure so it is neither displayed nor kept in memory.
    plt.close()

def _save_run_array(file_name, array):
    """Store `array` with np.save as `file_name` inside the run folder."""
    np.save(os.path.join(save_dir, file_name), array)

# Cached arrays are exported per family; the presence of the train array decides
# whether the matching val/test arrays are written as well.
if "Xtr_cnn" in globals():
    _save_run_array("Xtr_cnn.npy", Xtr_cnn)
    _save_run_array("Xva_cnn.npy", Xva_cnn)
    _save_run_array("Xte_cnn.npy", Xte_cnn)
if "Xtr_h" in globals():
    _save_run_array("Xtr_hybrid.npy", Xtr_h)
    _save_run_array("Xva_hybrid.npy", Xva_h)
    _save_run_array("Xte_hybrid.npy", Xte_h)

print("Saved everything into:", save_dir)
