## VS CODE

In [1]:
import os
import gc
import pandas as pd
import torch
import numpy as np
from sklearn.model_selection import StratifiedKFold
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from PIL import Image
import timm
from timm import create_model
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from tqdm import tqdm
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix, classification_report
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from torch.amp import autocast, GradScaler

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# === Configurations ===
# Every path below is derived from ROOT_DIR, an absolute, machine-specific location.
ROOT_DIR = Path("C:/Users/rsriram3/Documents/ind_study")
OUTPUT_DIR = ROOT_DIR / "data"
# CSV_PATH = OUTPUT_DIR / "full_augmented_images_metrics.csv"
# CSV loaded into `df` by the data-loading cell further down.
CSV_PATH = ROOT_DIR / "all_image_metrics.csv"
CHECKPOINT_DIR = OUTPUT_DIR / "best_checkpoints"
# Side effect: the checkpoint directory is created as soon as this cell runs.
CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True)
# IMAGE_ROOT = ROOT_DIR / "images"
NUM_CLASSES = 2  # default `num_classes` argument of ExtendedModel
IMAGE_SIZE = 224  # side length used by the module-level `transform`
BATCH_SIZE = 32  # DataLoader batch size in train_model
EPOCHS = 15  # maximum number of epochs per training run
PATIENCE = 3  # early-stopping patience
NUM_FOLDS = 5  # n_splits for StratifiedKFold
USE_MIXUP = True  # toggles mixup augmentation inside train_model
ALPHA_MIXUP = 0.4  # Beta(alpha, alpha) parameter used to draw the mixup weight
TEMPERATURE = 2.0  # NOTE(review): not referenced by any cell visible in this notebook
# Plain string ("cuda" or "cpu"); it is also passed to autocast(device_type=...).
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"


In [3]:
# === Transforms ===
# Resize to IMAGE_SIZE x IMAGE_SIZE, convert to a tensor, then normalize per channel
# with the mean/std values below (the commonly used ImageNet statistics).
# NOTE(review): the active SharedHeadDataset class builds its own transform without
# Normalize and does not read this object; only the commented-out dataset version did.
transform = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [4]:
# # === Dataset with optional aux features ===
# class SharedHeadDataset(Dataset):
#     def __init__(self, df, use_aux=True):
#         self.df = df.reset_index(drop=True)
#         self.use_aux = use_aux

#     def __len__(self):
#         return len(self.df)

#     def __getitem__(self, idx):
#         row = self.df.iloc[idx]
#         image_path = (ROOT_DIR / row['image']).resolve()
#         image = Image.open(image_path).convert("RGB")
#         image = transform(image)
#         label = torch.tensor(row['label'], dtype=torch.long)

#         if self.use_aux:
#             aux = torch.tensor([row['brightness'], row['edge_density'], row['entropy']], dtype=torch.float32)
#             return image, aux, label
#         else:
#             return image, label
        
# === Dataset ===
class SharedHeadDataset(torch.utils.data.Dataset):
    """Image dataset that pairs every image with three scalar auxiliary features.

    The CSV must provide the columns ``image`` (path relative to ``root_dir``),
    ``brightness``, ``edge_density``, ``entropy`` and ``label``.

    Args:
        csv_path: CSV file listing the samples.
        root_dir: Directory that the ``image`` column is resolved against.
        transform: Optional callable applied to the RGB PIL image. When omitted,
            the image is resized to ``IMAGE_SIZE`` x ``IMAGE_SIZE`` and converted
            with ``ToTensor`` (no normalization), which is the historical default.
    """

    def __init__(self, csv_path, root_dir, transform=None):
        self.data = pd.read_csv(csv_path)
        self.root_dir = Path(root_dir)
        if transform is None:
            transform = transforms.Compose([
                transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
                transforms.ToTensor(),
            ])
        self.transform = transform

    def __len__(self):
        """Return the number of rows (samples) in the CSV."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(image, aux, label)`` for the row at position ``idx``.

        ``aux`` is a float32 tensor ``[brightness, edge_density, entropy]`` and
        ``label`` is a 0-d long tensor.
        """
        row = self.data.iloc[idx]
        img_path = self.root_dir / row['image']
        # Ensure all images are RGB. convert() returns a new, fully loaded image,
        # so the file handle can be closed right away instead of lingering until GC.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert("RGB")
        image = self.transform(image)

        aux = torch.tensor([
            row['brightness'],
            row['edge_density'],
            row['entropy']
        ], dtype=torch.float32)

        label = torch.tensor(row['label'], dtype=torch.long)
        return image, aux, label

In [5]:
# === Collate function to unify batch ===
def collate_fun(batch):
    """Combine a list of ``(image, aux, label)`` samples into batched tensors.

    Images and auxiliary vectors are stacked along a new leading dimension;
    the labels are packed into a single tensor via ``torch.tensor``.
    """
    image_seq, aux_seq, label_seq = map(list, zip(*batch))
    batched_images = torch.stack(image_seq)
    batched_aux = torch.stack(aux_seq)
    batched_labels = torch.tensor(label_seq)
    return batched_images, batched_aux, batched_labels

In [6]:
# === Mixup ===
def mixup_data(x, aux, y, alpha=ALPHA_MIXUP):
    """Blend each sample of a batch with a randomly chosen partner (mixup).

    The weight ``lam`` is drawn from ``Beta(alpha, alpha)`` when ``alpha > 0``
    and is ``1`` otherwise. The same permutation and the same weight are applied
    to the images ``x`` and to the auxiliary features ``aux``.

    Returns:
        ``(mixed_x, mixed_aux, y_a, y_b, lam)`` where ``y_a`` are the original
        labels and ``y_b`` the labels of the permuted partners.
    """
    lam = np.random.beta(alpha, alpha) if alpha > 0 else 1
    n_samples = x.size(0)
    perm = torch.randperm(n_samples).to(x.device)

    partner_x = x[perm, :]
    partner_aux = aux[perm, :]
    mixed_x = lam * x + (1 - lam) * partner_x
    mixed_aux = lam * aux + (1 - lam) * partner_aux
    return mixed_x, mixed_aux, y, y[perm], lam

In [7]:
def mixup_criterion(criterion, pred, y_a, y_b, lam):
    """Return the ``lam``-weighted sum of ``criterion`` against both label sets.

    ``criterion(pred, y_a)`` is weighted by ``lam`` and ``criterion(pred, y_b)``
    by ``1 - lam``.
    """
    loss_first = criterion(pred, y_a)
    loss_second = criterion(pred, y_b)
    return lam * loss_first + (1 - lam) * loss_second

In [8]:
# # === Extended Model === OLDER VERSION -> WITH FEWER LAYERS
# class ExtendedModel(nn.Module):
#     def __init__(self, backbone, num_classes=NUM_CLASSES):
#         super().__init__()
#         self.backbone = create_model(backbone, pretrained=True, num_classes=0, global_pool="avg")
#         self.head = nn.Sequential(
#             # nn.Linear(backbone.num_features + 3, 512),
#             nn.Linear(backbone.num_features + 3, 256),
#             nn.ReLU(),
#             nn.Dropout(0.3),
#             # nn.Linear(512, num_classes)
#             nn.Linear(256, num_classes)
#         )
#         # self.backbone.reset_classifier(0)

#     def forward(self, x, aux):
#         features = self.backbone(x)
#         combined = torch.cat([features, aux], dim=1)
#         return self.head(combined)

In [None]:
# === Extended Model === NEWER VERSION -> WITH ADDED LAYERS
class ExtendedModel(nn.Module):
    """timm backbone combined with a small MLP over three auxiliary features.

    The backbone is created with ``num_classes=0`` and ``global_pool="avg"`` and
    its output is concatenated with a 16-dimensional embedding of the auxiliary
    vector before being passed to the classification head.

    Args:
        backbone: Model name passed to ``timm.create_model`` (pretrained weights
            are requested).
        num_classes: Number of output logits produced by the head.
    """

    def __init__(self, backbone, num_classes=NUM_CLASSES):
        super().__init__()
        self.backbone = create_model(backbone, pretrained=True, num_classes=0, global_pool="avg")
        # Embeds the 3 auxiliary scalars (brightness, edge density, entropy) into 16 dims.
        self.aux_head = nn.Sequential(
            nn.Linear(3, 32), # (3, 16)
            nn.ReLU(),
            nn.Linear(32, 16),
            nn.ReLU(), # (16, 16)
        )
        fused_dim = self.backbone.num_features + 16
        self.head = nn.Sequential(
            nn.LayerNorm(fused_dim), ## earlier - BatchNorm1d(self.backbone.num_features + 16),
            nn.Linear(fused_dim, 256), ## earlier - nn.Linear(self.backbone.num_features + 3, 256),
            nn.ReLU(),
            nn.Dropout(0.4), ## Earlier - 0.3
            nn.Linear(256, 64), ## Earlier - (512, 256)
            nn.ReLU(),
            nn.Dropout(0.4), ## Earlier - 0.3
            # Honour `num_classes`; this layer used to be hard-coded to 2 outputs.
            nn.Linear(64, num_classes) ## Earlier - (256, 2)
        )

    def forward(self, x, aux):
        """Return class logits for a batch of images ``x`` and auxiliary vectors ``aux``."""
        features = self.backbone(x)
        aux_embedding = self.aux_head(aux)
        combined = torch.cat([features, aux_embedding], dim=1)
        return self.head(combined)

In [10]:
# === Evaluation ===
def evaluate_model(model, dataloader):
    """Run ``model`` over ``dataloader`` in eval mode and report metrics.

    Prints a scikit-learn classification report and returns
    ``(accuracy, f1, confusion_matrix, predictions, labels)``. The model is left
    in eval mode afterwards.
    """
    model.eval()
    pred_buffer = []
    label_buffer = []

    with torch.no_grad():
        for batch_images, batch_aux, batch_labels in dataloader:
            batch_images = batch_images.to(DEVICE)
            batch_aux = batch_aux.to(DEVICE)
            batch_labels = batch_labels.to(DEVICE)

            logits = model(batch_images, batch_aux)
            # torch.max over dim 1 returns (values, indices); keep the indices.
            batch_preds = torch.max(logits, 1)[1]

            pred_buffer.extend(batch_preds.cpu().numpy())
            label_buffer.extend(batch_labels.cpu().numpy())

    accuracy = accuracy_score(label_buffer, pred_buffer)
    f1_value = f1_score(label_buffer, pred_buffer)
    conf_mat = confusion_matrix(label_buffer, pred_buffer)

    print("\n[ Classification Report: ]")
    print(classification_report(label_buffer, pred_buffer, digits=4))

    return accuracy, f1_value, conf_mat, pred_buffer, label_buffer

In [11]:
def plot_metrics(train_accs, val_accs, model_name, save_path):
    """Save a train-vs-validation accuracy curve as ``<model_name>_accuracy.png``.

    ``save_path`` is a directory (``pathlib.Path``); it is created if missing.
    The figure is closed after saving, so nothing is displayed inline.
    """
    save_path.mkdir(parents=True, exist_ok=True)

    fig, ax = plt.subplots(figsize=(8, 5))
    for series, legend_label in ((train_accs, "Train Acc"), (val_accs, "Val Acc")):
        ax.plot(series, label=legend_label)
    ax.set_title(f"Accuracy vs Epoch - {model_name}")
    ax.set_xlabel("Epoch")
    ax.set_ylabel("Accuracy")
    ax.grid(True)
    ax.legend()

    fig.savefig(save_path / f"{model_name}_accuracy.png")
    plt.close(fig)

In [12]:
# === Plot Confusion Matrix ===
def plot_confusion_matrix(cm, model_name):
    """Display ``cm`` as an annotated heatmap titled with ``model_name``."""
    plt.figure(figsize=(6, 5))
    heatmap_ax = sns.heatmap(cm, annot=True, fmt="d", cmap="Blues")
    heatmap_ax.set_xlabel("Predicted")
    heatmap_ax.set_ylabel("True")
    heatmap_ax.set_title(f"Confusion Matrix: {model_name}")
    plt.show()

In [13]:
## NEWER VERSION OF EARLY STOPPING
class EarlyStopping:
    """Track validation loss and signal when it has stopped improving.

    An epoch counts as an improvement when the negated loss exceeds the best
    negated loss so far by more than ``delta`` (the first call always counts).
    Every improvement checkpoints the model; after ``patience`` consecutive
    non-improving calls ``early_stop`` becomes True and is never reset.
    """

    def __init__(self, patience=3, delta=0.001, verbose=True):
        self.patience, self.delta, self.verbose = patience, delta, verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = float('inf')

    def __call__(self, val_loss, model, path):
        """Record one epoch's ``val_loss``; save ``model`` to ``path`` on improvement."""
        score = -val_loss
        improved = self.best_score is None or score > self.best_score + self.delta

        if improved:
            self.best_score = score
            self.save_checkpoint(val_loss, model, path)
            self.counter = 0
            return

        self.counter += 1
        if self.verbose:
            print(f"EarlyStopping: {self.counter}/{self.patience}")
        if self.counter >= self.patience:
            self.early_stop = True

    def save_checkpoint(self, val_loss, model, path):
        """Write the model's ``state_dict`` to ``path`` and remember ``val_loss``."""
        torch.save(model.state_dict(), path)
        self.val_loss_min = val_loss

## OLDER VERSION OF EARLY STOPPING

# class EarlyStopping:
#     def __init__(self, patience=5, verbose=False):
#         self.patience = patience
#         self.verbose = verbose
#         self.counter = 0
#         self.best_score = None
#         self.early_stop = False

#     def __call__(self, val_acc):
#         if self.best_score is None or val_acc > self.best_score:
#             self.best_score = val_acc
#             self.counter = 0
#         else:
#             self.counter += 1
#             if self.verbose:
#                 print(f"EarlyStopping counter: {self.counter} / {self.patience}")
#             if self.counter >= self.patience:
#                 self.early_stop = True

In [None]:
# === Training ===
def train_model(model_name, train_csv, val_csv, root_dir, save_path):
    """Train an ``ExtendedModel`` on one train/validation split.

    Args:
        model_name: Model name passed to ``ExtendedModel`` (a timm identifier).
        train_csv: CSV file of training samples (see ``SharedHeadDataset``).
        val_csv: CSV file of validation samples.
        root_dir: Directory that the image paths in both CSVs are relative to.
        save_path: File the last-epoch ``state_dict`` is written to.

    Returns:
        ``(model, train_accs, val_accs, train_losses, val_losses)``; each list has
        one entry per completed epoch. If early stopping fired, ``model`` carries
        the weights of the best (lowest validation loss) checkpoint; otherwise it
        carries the last-epoch weights.
    """
    train_ds = SharedHeadDataset(train_csv, root_dir)
    val_ds = SharedHeadDataset(val_csv, root_dir)
    train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fun)
    val_loader = DataLoader(val_ds, batch_size=BATCH_SIZE, collate_fn=collate_fun)

    model = ExtendedModel(model_name).to(DEVICE)
    # optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
    optimizer = torch.optim.AdamW(model.parameters(), lr=3e-5, weight_decay=5e-3) ## try 2e-3 next
    scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=3, T_mult=1)
    scaler = GradScaler()

    # Use the shared PATIENCE constant rather than a second hard-coded copy of it.
    early_stopper = EarlyStopping(patience=PATIENCE, delta=0.001, verbose=True)
    # Best-so-far checkpoint; the path depends only on model_name, so it is
    # overwritten by every call for the same model (e.g. each fold).
    early_stop_path = CHECKPOINT_DIR / f"{model_name}_earlystop_best.pth"

    train_accs, val_accs, train_losses, val_losses = [], [], [], []

    for epoch in range(EPOCHS):
        model.train()
        correct, total = 0, 0
        total_train_loss = 0.0
        progress = tqdm(train_loader, desc=f"Epoch {epoch+1}/{EPOCHS}")

        for images, aux, labels in progress:
            images, aux, labels = images.to(DEVICE), aux.to(DEVICE), labels.to(DEVICE)
            optimizer.zero_grad()

            if USE_MIXUP:
                images, aux_features, targets_a, targets_b, lam = mixup_data(images, aux, labels, ALPHA_MIXUP)
                mixup_mode = True
            else:
                aux_features = aux
                mixup_mode = False

            with autocast(device_type=DEVICE):
                outputs = model(images, aux_features)
                if mixup_mode:
                    loss = mixup_criterion(F.cross_entropy, outputs, targets_a, targets_b, lam)
                else:
                    loss = F.cross_entropy(outputs, labels, label_smoothing=0.1)  ## before - no label_smoothing

            total_train_loss += loss.item()
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            # With mixup enabled the inputs are blended, yet accuracy is still
            # scored against the original (un-mixed) labels, so it is only a
            # rough progress indicator and can sit below validation accuracy.
            preds = torch.argmax(outputs, dim=1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)
            progress.set_postfix(loss=loss.item(), acc=correct / total)

        train_acc = correct / total
        train_loss = total_train_loss / len(train_loader)

        # === Validation loss ===
        # Mean of per-batch losses (a smaller final batch is weighted like a full one).
        model.eval()
        total_val_loss = 0.0
        with torch.no_grad():
            for images, aux, labels in val_loader:
                images, aux, labels = images.to(DEVICE), aux.to(DEVICE), labels.to(DEVICE)
                outputs = model(images, aux)
                total_val_loss += F.cross_entropy(outputs, labels).item()
        val_loss = total_val_loss / len(val_loader)

        # Second pass over the validation set: prints the classification report.
        val_acc, _, _, _, _ = evaluate_model(model, val_loader)

        # Record metrics
        train_accs.append(train_acc)
        val_accs.append(val_acc)
        train_losses.append(train_loss)
        val_losses.append(val_loss)

        # Early stopping check
        early_stopper(val_loss, model, early_stop_path)
        if early_stopper.early_stop:
            print("Early stopping triggered.")
            break

        # Advance the schedule by exactly one epoch. Passing `epoch` here set the
        # scheduler back to the epoch that had just finished, so the cosine cycle
        # ran one epoch late (the first two epochs both used the base LR).
        scheduler.step()
        gc.collect()

    # Save last state and reload best if early stopped
    torch.save(model.state_dict(), save_path)
    if early_stopper.early_stop:
        # map_location keeps the reload working on the device currently in use.
        model.load_state_dict(torch.load(early_stop_path, map_location=DEVICE))

    return model, train_accs, val_accs, train_losses, val_losses


### PREVIOUS TRAINING PIPELINE

In [15]:
# # === Model Training Function - Early stopping===

# ## avoid if using newer pipeline

# def train_model(model_name, train_csv, val_csv, root_dir, model_save_path):
#     train_df = pd.read_csv(train_csv)
#     val_df = pd.read_csv(val_csv)

#     train_dataset = SharedHeadDataset(train_df, use_aux=True)
#     val_dataset = SharedHeadDataset(val_df, use_aux=True)

#     train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
#     val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn)

#     base_model = timm.create_model(model_name, pretrained=True, num_classes=0)
#     model = ExtendedModel(base_model, num_classes=NUM_CLASSES).to(DEVICE)
#     optimizer = optim.AdamW(model.parameters(), lr=3e-4)
#     scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS)
#     criterion = nn.CrossEntropyLoss()

#     early_stopper = EarlyStopping(patience=PATIENCE, verbose=True)

#     train_accs, val_accs = [], []
#     best_val_acc = 0.0

#     print("Epoch\tTrain Acc\tVal Acc\tVal F1")
#     for epoch in range(EPOCHS):
#         model.train()
#         correct, total = 0, 0
#         for images, aux, labels in tqdm(train_loader):
#             images, aux, labels = images.to(DEVICE), aux.to(DEVICE), labels.to(DEVICE)
#             optimizer.zero_grad()
#             mixed_x, mixed_aux, y_a, y_b, lam = mixup_data(images, aux, labels)
#             outputs = model(mixed_x, mixed_aux)
#             loss = mixup_criterion(criterion, outputs, y_a, y_b, lam)
#             loss.backward()
#             optimizer.step()
#             _, preds = torch.max(outputs, 1)
#             total += labels.size(0)
#             correct += (lam * preds.eq(y_a).sum().item() + (1 - lam) * preds.eq(y_b).sum().item())

#         train_acc = correct / total
#         val_acc, val_f1, val_cm = evaluate_model(model, val_loader)
#         train_accs.append(train_acc)
#         val_accs.append(val_acc)
#         scheduler.step()

#         print(f"{epoch+1}\t{train_acc:.4f}\t{val_acc:.4f}\t{val_f1:.4f}")

#         if val_acc > best_val_acc:
#             best_val_acc = val_acc
#             torch.save(model.state_dict(), model_save_path)

#         early_stopper(val_acc)
#         if early_stopper.early_stop:
#             print("Early stopping triggered.")
#             break

#     print("Best Val Accuracy:", best_val_acc)
#     return model, train_accs, val_accs

In [None]:
# model_configs = [
#     ("swin_small_patch4_window7_224.ms_in1k", "swin_model.pth"),
#     ("coatnet_1_rw_224.sw_in1k", "coatnet_model.pth"),
#     ("convnext_small.fb_in1k", "convnext_model.pth"),
#     ("tiny_vit_5m_224.dist_in22k_ft_in1k", "tiny_vit_model.pth"),
#     ("edgenext_xx_small.in1k", "edgenext_xx_model.pth")
# ]

In [23]:
# (model name, checkpoint file name) pairs. Later cells pick one entry by index:
# element 0 is passed to ExtendedModel / timm, element 1 is used to name output
# folders and fold CSVs. Names must not carry stray whitespace, because they are
# also embedded in file names.
model_configs = [
    ("swin_small_patch4_window7_224.ms_in1k", "swin_model.pth"), ## Swin-Small Transformer
    ("coatnet_1_rw_224.sw_in1k", "coatnet_model.pth"), ## CoatNet-1
    ("convnext_small.fb_in1k", "convnext_model.pth"), ## ConvNext-Small
    ("tiny_vit_5m_224.dist_in22k_ft_in1k", "tiny_vit_model.pth"), ## Tiny-ViT-5M
    ("edgenext_xx_small.in1k", "edgenext_xx_model.pth"), ## EdgeNext-XX-Small

    ("mobileone_s0.apple_in1k", "mobileone_model.pth"), ## MobileOne-S0
    ("lcnet_050.ra2_in1k", "lcnet_model.pth"), ## LCNet-050 (trailing space removed from the name)
    ("tinynet_a.in1k", "tinynetA_model.pth"), ## TinyNet-A
    ("ghostnetv2_100.in1k", "ghostnet_model.pth"), ## GhostNetV2-100
    ("mobilevitv2_050.cvnets_in1k", "mobilevitv2_model.pth"), ## MobileViT-V2-050

]

In [17]:
# Load the metrics CSV into the global `df` and list every row that has a
# missing value in at least one column (inspection only; nothing is dropped here).
df = pd.read_csv(CSV_PATH)
rows_with_na = df[df.isnull().any(axis=1)]
print(rows_with_na)

                                                   image  label  brightness  \
14222  ShanghaiTech Data\SHHA\images\0060_trivialaug2...      1         NaN   

       edge_density  entropy  
14222           NaN      NaN  


In [18]:
# === Load & Clean CSV ===
# Drop rows missing any of the five columns the dataset class reads, then renumber.
# The shape is printed before and after so the number of removed rows is visible.
# NOTE(review): this rebinds the global `df` in place, so the raw frame is gone
# after this cell and re-running it alone no longer shows the original shape.
print(df.shape)
df = df.dropna(subset=['image', 'label', 'brightness', 'edge_density', 'entropy']).reset_index(drop=True)
print(df.shape)

(18775, 5)
(18774, 5)


In [19]:
# === Cross-validation ===
# One shared splitter for every fold loop below; shuffle with a fixed
# random_state so repeated calls to skf.split(...) yield the same folds.
# NOTE(review): rows are split individually. The CSV appears to contain augmented
# copies (e.g. an image name containing "trivialaug2"); if copies of one source
# image land in both train and validation folds, validation scores are inflated.
# Confirm, and consider a group-aware split keyed on the source image.
skf = StratifiedKFold(n_splits=NUM_FOLDS, shuffle=True, random_state=42)

## tiny_vit model

In [None]:
# Select the model_configs entry to train (index 3 is the tiny_vit entry).
model_id = 3
model_name = model_configs[model_id][0]
# `history` is the checkpoint file name including its ".pth" suffix.
history = model_configs[model_id][1]
# Output folder for fold weights, named after the checkpoint file without extension.
save_path = ROOT_DIR / "models" / history.split('.')[0]
save_path.mkdir(parents=True, exist_ok=True)
# NOTE(review): absolute path repeated here instead of being built from ROOT_DIR.
plot_save_path = Path("C:/Users/rsriram3/Documents/ind_study/test-IIM") / "figures" / "shared_head_figures" / history.split('.')[0]
plot_save_path.mkdir(parents=True, exist_ok=True)

In [21]:
# Create folds directory
# The per-fold train/validation CSVs written by the fold loops are stored here.
FOLDS_DIR = OUTPUT_DIR / "folds"
FOLDS_DIR.mkdir(parents=True, exist_ok=True)

In [None]:
# Stratified K-fold run for the model chosen in the selection cell above.
# Per fold: write the train/val rows to CSV, train via train_model, save the plot.
# `model`, `train_accs`, ... are rebound every iteration, so only the last
# fold's objects remain in the kernel afterwards.
for fold, (train_idx, val_idx) in enumerate(skf.split(df, df['label'])):
    print(f"\n Fold {fold + 1}")

    # Split and save
    train_df = df.iloc[train_idx].reset_index(drop=True)
    val_df = df.iloc[val_idx].reset_index(drop=True)

    # File names embed `history`, which still carries its ".pth" suffix.
    train_fold_path = FOLDS_DIR / f"{history}_train_fold_{fold+1}.csv"
    val_fold_path = FOLDS_DIR / f"{history}_val_fold_{fold+1}.csv"
    train_df.to_csv(train_fold_path, index=False)
    val_df.to_csv(val_fold_path, index=False)

    # Train and save model
    model_save_path = save_path / f"{model_name}_fold{fold+1}.pth"
    model, train_accs, val_accs, train_losses, val_losses = train_model(
        model_name, train_fold_path, val_fold_path, ROOT_DIR, model_save_path
    )

    # Plot
    plot_metrics(train_accs, val_accs, f"{model_name}_fold{fold+1}", plot_save_path)

# for fold, (train_idx, val_idx) in enumerate(skf.split(df, df['label'])):
#     print(f"\n Fold {fold + 1} ")
#     train_df = df.iloc[train_idx]
#     val_df = df.iloc[val_idx]

#     train_fold_path = FOLDS_DIR / f"{history}_train_fold_{fold+1}.csv"
#     val_fold_path = FOLDS_DIR / f"{history}_val_fold_{fold+1}.csv"
#     train_df.to_csv(train_fold_path, index=False)
#     val_df.to_csv(val_fold_path, index=False)

#     model_save_path = save_path / f"{model_name}_fold{fold+1}.pth"
#     model, train_accs, val_accs = train_model(model_name, train_fold_path, val_fold_path, ROOT_DIR, model_save_path)
#     plot_metrics(train_accs, val_accs, f"{model_name}_fold{fold+1}", plot_save_path)


## mobileone_s0 model

In [24]:
# Same selection steps as the tiny_vit cell, now for model_configs index 5.
# This rebinds model_name / history / save_path / plot_save_path for later cells.
model_id = 5 # ## mobileone_s0
model_name = model_configs[model_id][0]
# `history` is the checkpoint file name including its ".pth" suffix.
history = model_configs[model_id][1]
save_path = ROOT_DIR / "models" / history.split('.')[0]
save_path.mkdir(parents=True, exist_ok=True)
# NOTE(review): absolute path repeated here instead of being built from ROOT_DIR.
plot_save_path = Path("C:/Users/rsriram3/Documents/ind_study/test-IIM") / "figures" / "shared_head_figures" / history.split('.')[0]
plot_save_path.mkdir(parents=True, exist_ok=True)

In [25]:
# Stratified K-fold run for the currently selected model; this cell is a
# verbatim copy of the tiny_vit fold loop and differs only through the globals
# set by the preceding selection cell.
# NOTE(review): the recorded output below shows 1.0000 validation accuracy from
# the first epoch while training accuracy stays near 0.83. Check for train/val
# leakage (e.g. augmented copies of one image in both folds) before trusting it.
for fold, (train_idx, val_idx) in enumerate(skf.split(df, df['label'])):
    print(f"\n Fold {fold + 1}")

    # Split and save
    train_df = df.iloc[train_idx].reset_index(drop=True)
    val_df = df.iloc[val_idx].reset_index(drop=True)

    # File names embed `history`, which still carries its ".pth" suffix.
    train_fold_path = FOLDS_DIR / f"{history}_train_fold_{fold+1}.csv"
    val_fold_path = FOLDS_DIR / f"{history}_val_fold_{fold+1}.csv"
    train_df.to_csv(train_fold_path, index=False)
    val_df.to_csv(val_fold_path, index=False)

    # Train and save model
    model_save_path = save_path / f"{model_name}_fold{fold+1}.pth"
    model, train_accs, val_accs, train_losses, val_losses = train_model(
        model_name, train_fold_path, val_fold_path, ROOT_DIR, model_save_path
    )

    # Plot
    plot_metrics(train_accs, val_accs, f"{model_name}_fold{fold+1}", plot_save_path)

# for fold, (train_idx, val_idx) in enumerate(skf.split(df, df['label'])):
#     print(f"\n Fold {fold + 1} ")
#     train_df = df.iloc[train_idx]
#     val_df = df.iloc[val_idx]

#     train_fold_path = FOLDS_DIR / f"{history}_train_fold_{fold+1}.csv"
#     val_fold_path = FOLDS_DIR / f"{history}_val_fold_{fold+1}.csv"
#     train_df.to_csv(train_fold_path, index=False)
#     val_df.to_csv(val_fold_path, index=False)

#     model_save_path = save_path / f"{model_name}_fold{fold+1}.pth"
#     model, train_accs, val_accs = train_model(model_name, train_fold_path, val_fold_path, ROOT_DIR, model_save_path)
#     plot_metrics(train_accs, val_accs, f"{model_name}_fold{fold+1}", plot_save_path)



 Fold 1


To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development
Epoch 1/15: 100%|██████████| 470/470 [05:42<00:00,  1.37it/s, acc=0.837, loss=0.263] 



[ Classification Report: ]
              precision    recall  f1-score   support

           0     1.0000    1.0000    1.0000      2797
           1     1.0000    1.0000    1.0000       958

    accuracy                         1.0000      3755
   macro avg     1.0000    1.0000    1.0000      3755
weighted avg     1.0000    1.0000    1.0000      3755



Epoch 2/15: 100%|██████████| 470/470 [04:19<00:00,  1.81it/s, acc=0.818, loss=0.115]  



[ Classification Report: ]
              precision    recall  f1-score   support

           0     1.0000    1.0000    1.0000      2797
           1     1.0000    1.0000    1.0000       958

    accuracy                         1.0000      3755
   macro avg     1.0000    1.0000    1.0000      3755
weighted avg     1.0000    1.0000    1.0000      3755

EarlyStopping: 1/3


Epoch 3/15: 100%|██████████| 470/470 [04:19<00:00,  1.81it/s, acc=0.831, loss=0.0202] 



[ Classification Report: ]
              precision    recall  f1-score   support

           0     1.0000    1.0000    1.0000      2797
           1     1.0000    1.0000    1.0000       958

    accuracy                         1.0000      3755
   macro avg     1.0000    1.0000    1.0000      3755
weighted avg     1.0000    1.0000    1.0000      3755

EarlyStopping: 2/3


Epoch 4/15: 100%|██████████| 470/470 [04:21<00:00,  1.80it/s, acc=0.824, loss=0.0818] 



[ Classification Report: ]
              precision    recall  f1-score   support

           0     1.0000    1.0000    1.0000      2797
           1     1.0000    1.0000    1.0000       958

    accuracy                         1.0000      3755
   macro avg     1.0000    1.0000    1.0000      3755
weighted avg     1.0000    1.0000    1.0000      3755

EarlyStopping: 3/3
Early stopping triggered.

 Fold 2


Epoch 1/15:  24%|██▎       | 111/470 [01:02<03:23,  1.77it/s, acc=0.787, loss=0.288] 


KeyboardInterrupt: 

## TESTING CODE SNIPPET

In [None]:
from PIL import Image
import torch
import torchvision.transforms as transforms
from pathlib import Path
import pandas as pd

# === Load model ===
# NOTE(review): `model_name` is whatever an earlier cell last assigned; it must be
# the architecture the tiny_vit checkpoint below was trained with, otherwise
# load_state_dict fails. The checkpoint path is a hard-coded Colab location,
# unlike the ROOT_DIR-based paths used by the training cells.
model = ExtendedModel(model_name).to(DEVICE)
# map_location lets a checkpoint saved on a GPU machine load on a CPU-only one.
model.load_state_dict(torch.load("/content/drive/MyDrive/Research/Independent study/models/checkpoints/tiny_vit_model_fold1.pth", map_location=DEVICE))
model.eval()

# === Define transforms (must match training) ===
# Resize + ToTensor only, i.e. the same steps as SharedHeadDataset's default
# transform. This rebinds the global `transform` defined near the top of the
# notebook (that earlier one also applied Normalize).
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor()
])

# === Compute auxiliary features ===
def compute_image_features(image_path):
    """Compute ``(brightness, edge_density, entropy)`` for the image at ``image_path``.

    The image is averaged over its last axis when it has three dimensions.
    Brightness is the mean gray value divided by 255, edge density is the mean
    Sobel response, and the entropy value comes from ``scipy.stats.entropy``
    applied to the 256-bin histogram counts (offset by 1e-8).

    NOTE(review): whether this matches how the training CSV columns were
    produced cannot be verified from this notebook; confirm before relying on it.
    """
    from skimage import io, filters, exposure
    from scipy.stats import entropy as scipy_entropy
    import numpy as np

    pixels = io.imread(image_path)
    if pixels.ndim == 3:
        gray = pixels.mean(axis=2)
    else:
        gray = pixels

    sobel_response = filters.sobel(gray)
    edge_density = sobel_response.mean()
    brightness = gray.mean() / 255.0
    histogram_counts = exposure.histogram(gray, nbins=256)[0]
    ent = scipy_entropy(histogram_counts + 1e-8)
    return brightness, edge_density, ent

# === Prediction function ===
def predict_image(image_path, model):
    """Classify one image file with ``model`` and print the predicted label.

    The image goes through the global ``transform``; its auxiliary features come
    from ``compute_image_features``. Returns the predicted class index
    (0 is printed as "Vehicle", anything else as "Crowd").
    """
    rgb_image = Image.open(image_path).convert("RGB")
    image_tensor = transform(rgb_image).unsqueeze(0).to(DEVICE)

    bright, edges, ent_value = compute_image_features(image_path)
    aux_tensor = torch.tensor([[bright, edges, ent_value]], dtype=torch.float32).to(DEVICE)

    with torch.no_grad():
        logits = model(image_tensor, aux_tensor)
        pred = torch.argmax(logits, dim=1).item()

    if pred == 0:
        label_str = "Vehicle (0)"
    else:
        label_str = "Crowd (1)"
    print(f"Predicted Label: {pred} → {label_str}")
    return pred

# === Call prediction ===
# Single-image smoke test. NOTE(review): hard-coded Colab path; as the last
# expression of the cell, the returned class index is also shown as cell output.
test_img_path = "/content/drive/MyDrive/Research/Independent study/ShanghaiTech Data/SHHA/images/0001.jpg"
predict_image(test_img_path, model)


### TESTING FOLD - EXTERNAL VALIDATION SET

In [None]:
# Path to external test CSV (not used in fold splitting)
external_test_csv = ROOT_DIR / "data/final_test_set.csv"  # <-- update this path accordingly

# Train once per fold on that fold's training rows and validate every run on the
# same external CSV; the fold's own validation indices are discarded (`_`).
# NOTE(review): train_model uses its validation CSV for early stopping, so the
# external set influences which weights are kept and is not a fully held-out test.
# NOTE(review): file names follow the same pattern as the cross-validation cells,
# so with unchanged `history` / `model_name` / `save_path` this overwrites their
# train-fold CSVs and fold checkpoints.
# Create folds using only the training dataset (df)
for fold, (train_idx, _) in enumerate(skf.split(df, df['label'])):
    print(f"\n Fold {fold + 1}")

    # Split and save training data (validation is from external_test_csv)
    train_df = df.iloc[train_idx].reset_index(drop=True)

    train_fold_path = FOLDS_DIR / f"{history}_train_fold_{fold+1}.csv"
    val_fold_path = external_test_csv  # Use the same external test set each time

    train_df.to_csv(train_fold_path, index=False)

    # Train and save model using external test set for validation
    model_save_path = save_path / f"{model_name}_fold{fold+1}.pth"
    model, train_accs, val_accs, train_losses, val_losses = train_model(
        model_name, train_fold_path, val_fold_path, ROOT_DIR, model_save_path
    )

    # Plot
    plot_metrics(train_accs, val_accs, f"{model_name}_fold{fold+1}", plot_save_path)