In [None]:
# Mount Google Drive inside the Colab VM at /content/drive so that files stored
# on Drive become readable through the local filesystem.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Copy the dataset archive from the mounted Drive folder to /content.
!cp /content/drive/MyDrive/LOOCV_2models_VIT/loo_temp_Anthisnes_Chateau_de_Xhos_Camera_1_HIT.zip /content
# Extract the archive into /content; unzip's stdout (the per-file listing) is discarded.
!unzip /content/loo_temp_Anthisnes_Chateau_de_Xhos_Camera_1_HIT.zip -d /content > /dev/null

In [None]:
import os
import random
import shutil

# Seed used to make the hold-out choice repeatable.
RANDOM_SEED = 42

# Candidate gîtes for the validation hold-out. The order of this list matters:
# random.choice() picks by index, so reordering changes which gîte is selected.
chosen_gites = [
    'Pont_de_Bousval_Photos_2022_PHOTO',
    'Modave_Camera_3_toiture_PHOTO',
    'Bornival_PHOTO_2023CAM04',
    'Pont_de_Bousval_Photos_2023_PHOTO_WK6HDBOUSVAL',
    'Pont_de_Bousval_Photos_2023_PHOTO_2022CAM12',
    'Pont_de_Bousval_Photos_2023_PHOTO_2023CAM06',
    'Bornival_PHOTO_2023CAM03',
    'Chaumont_Gistoux_Camera_2',
    'Chaumont_Gistoux_Camera_1',
    'Pont_de_Bousval_Photos_2023_PHOTO_2023CAM05',
    # Deliberately excluded from the candidates (presumably because it is the
    # gîte this dataset folder is named after -- confirm):
    #'Anthisnes_Chateau_de_Xhos_Camera_1_HIT',
    'Jenneret_Camera_1_PHOTO',
    'Modave_Camera_plancher_PHOTO'
]

# Directory layout: <base>/{train,val}/{background,bats}
base_path = "/content/loo_temp_Anthisnes_Chateau_de_Xhos_Camera_1_HIT"
train_dir, val_dir = (
    os.path.join(base_path, split_name) for split_name in ("train", "val")
)
background_dir, bats_dir = (
    os.path.join(train_dir, class_name) for class_name in ("background", "bats")
)
val_background_dir, val_bats_dir = (
    os.path.join(val_dir, class_name) for class_name in ("background", "bats")
)

# Seed the global RNG immediately before drawing so the pick is reproducible.
random.seed(RANDOM_SEED)
held_out_gite = random.choice(chosen_gites)
print(f"📦 Holding out gîte for validation: {held_out_gite}")
print(f"🧪 Reproducible with seed: {RANDOM_SEED}")

# Make sure both validation class folders exist (no error if already present).
for val_class_dir in (val_background_dir, val_bats_dir):
    os.makedirs(val_class_dir, exist_ok=True)

# Function to move matching files
def move_files_by_gite(source_dir, target_dir, gite_name):
    """Move every entry of ``source_dir`` whose name contains ``gite_name``.

    The directory is listed once, entries are handled in sorted name order,
    and each match is moved to ``target_dir`` under the same name.

    Args:
        source_dir: Directory to scan.
        target_dir: Directory receiving the matching entries.
        gite_name: Substring that an entry name must contain to be moved.

    Returns:
        The number of entries that were moved.
    """
    matching = [entry for entry in sorted(os.listdir(source_dir)) if gite_name in entry]
    for entry in matching:
        shutil.move(os.path.join(source_dir, entry), os.path.join(target_dir, entry))
    return len(matching)

# Relocate the held-out gîte's images from train/ to val/, one class at a time
# (background first, then bats), keeping the per-class counts for the summary.
bkg_moved, bats_moved = (
    move_files_by_gite(class_src, class_dst, held_out_gite)
    for class_src, class_dst in (
        (background_dir, val_background_dir),
        (bats_dir, val_bats_dir),
    )
)

print(f"✅ Moved {bkg_moved} background images and {bats_moved} bat images to validation set.")


In [None]:
# bat_finetune_mnv3_t4_with_test.py
"""
Optimized for Colab T4 GPU, with train/val/test splits and final classification report.
Uses the new torch.amp API to avoid deprecation warnings.
"""

from pathlib import Path
import os
import time
import random
import json

import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Subset
from torchvision import datasets, transforms as T
from torchvision.models import mobilenet_v3_small, MobileNet_V3_Small_Weights
from sklearn.metrics import precision_score, recall_score, f1_score, classification_report
from tqdm import tqdm

# ---------- User Configuration ----------
# Data
DATA_ROOT          = Path("/content/loo_temp_Anthisnes_Chateau_de_Xhos_Camera_1_HIT")
DATA_FRACTION      = 1.0        # fraction of each split to use (0 < fraction ≤ 1)
OUTPUT_DIR         = Path("outputs")
# Optimisation
EPOCHS             = 25
BATCH_SIZE         = 64
LEARNING_RATE      = 1e-4
FREEZE_RATIO       = 0.6       # fraction of backbone layers to freeze
ENABLE_AMP         = True      # automatic mixed precision
# Data loading
NUM_WORKERS        = min(os.cpu_count() or 1, 4)   # at most 4, at least 1
PIN_MEMORY         = True
PERSISTENT_WORKERS = True
PREFETCH_FACTOR    = 2
# Randomness
SEED               = 42
# ----------------------------------------

# reproducibility: seed Python, NumPy and PyTorch (CPU, then all CUDA devices)
for _seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
    _seed_fn(SEED)

# speed: let cuDNN benchmark and pick convolution algorithms.
# NOTE(review): the PyTorch reproducibility notes say this autotuning can select
# different algorithms between runs, so results may not be bit-identical even
# with the seeds above -- confirm this trade-off is intended.
torch.backends.cudnn.benchmark = True

# prepare output dir
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# device: first choice is CUDA, otherwise fall back to the CPU
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {DEVICE}, num_workers={NUM_WORKERS}")

# ImageNet normalization
IMG_MEAN = [0.485, 0.456, 0.406]
IMG_STD  = [0.229, 0.224, 0.225]

# transforms
def _resize_and_normalize():
    """Build the deterministic steps shared by both pipelines.

    Returns fresh transform objects: resize to 96x96, convert to a tensor,
    then normalise with IMG_MEAN / IMG_STD.
    """
    return [
        T.Resize((96, 96)),
        T.ToTensor(),
        T.Normalize(IMG_MEAN, IMG_STD),
    ]

# Training adds random augmentation (flip, rotation up to 15 degrees, colour
# jitter) in front of the shared steps; val/test use the shared steps only.
_train_augmentations = [
    T.RandomHorizontalFlip(),
    T.RandomRotation(15),
    T.ColorJitter(0.2, 0.2, 0.2),
]
train_tf = T.Compose(_train_augmentations + _resize_and_normalize())
test_tf = T.Compose(_resize_and_normalize())

# datasets: one ImageFolder per split directory under DATA_ROOT
train_ds, val_ds, test_ds = (
    datasets.ImageFolder(DATA_ROOT / split_name, transform=split_tf)
    for split_name, split_tf in (
        ("train", train_tf),
        ("val", test_tf),
        ("test", test_tf),
    )
)

# optionally subsample
def maybe_subsample(ds, frac):
    """Optionally reduce ``ds`` to a random subset.

    Args:
        ds: A sized dataset (anything supporting ``len``).
        frac: Fraction of items to keep.

    Returns:
        ``ds`` itself when ``frac`` is not strictly between 0 and 1; otherwise
        a ``Subset`` holding ``max(1, int(len(ds) * frac))`` indices drawn
        without replacement from the global ``random`` generator.
    """
    if not (0 < frac < 1.0):
        return ds
    total = len(ds)
    n_keep = max(1, int(total * frac))
    chosen = random.sample(range(total), n_keep)
    return Subset(ds, chosen)

# Apply the optional subsampling to every split. The order train -> val -> test
# is kept because each call may draw from the global `random` generator.
train_ds, val_ds, test_ds = (
    maybe_subsample(split_ds, DATA_FRACTION)
    for split_ds in (train_ds, val_ds, test_ds)
)

print(f"▶ train: {len(train_ds)} imgs, val: {len(val_ds)} imgs, test: {len(test_ds)} imgs")

# loaders: identical settings for all splits; only the training loader shuffles
_loader_kwargs = dict(
    batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS,
    pin_memory=PIN_MEMORY,
    persistent_workers=PERSISTENT_WORKERS,
    prefetch_factor=PREFETCH_FACTOR,
)
train_loader = DataLoader(train_ds, shuffle=True, **_loader_kwargs)
val_loader = DataLoader(val_ds, shuffle=False, **_loader_kwargs)
test_loader = DataLoader(test_ds, shuffle=False, **_loader_kwargs)

# model setup: pretrained MobileNetV3-Small with its last classifier layer
# replaced by a fresh linear layer sized to the number of dataset classes
weights = MobileNet_V3_Small_Weights.IMAGENET1K_V1
model = mobilenet_v3_small(weights=weights)
# A Subset hides `.classes`, so look through it to the wrapped dataset.
_train_base_ds = train_ds.dataset if isinstance(train_ds, Subset) else train_ds
num_classes = len(_train_base_ds.classes)
in_f = model.classifier[3].in_features
model.classifier[3] = nn.Linear(in_f, num_classes)
model = model.to(DEVICE)

# freeze backbone: disable gradients for the first FREEZE_RATIO share of the
# non-classifier parameters, in named_parameters() order
backbone = [
    param for name, param in model.named_parameters()
    if not name.startswith("classifier")
]
num_freeze = int(len(backbone) * FREEZE_RATIO)
for frozen_param in backbone[:num_freeze]:
    frozen_param.requires_grad_(False)

# optimizer / scheduler / loss / AMP scaler
# Only parameters that still require gradients are handed to the optimizer.
_trainable_params = [param for param in model.parameters() if param.requires_grad]
optimizer = torch.optim.AdamW(_trainable_params, lr=LEARNING_RATE, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS)
criterion = nn.CrossEntropyLoss()

from torch import amp
scaler = amp.GradScaler() if ENABLE_AMP else None

# metrics store: best validation F1 so far plus one list per tracked metric
best_val_f1 = 0.0
history = {
    metric_name: []
    for metric_name in (
        "train_loss", "train_acc",
        "val_loss", "val_acc",
        "val_prec", "val_rec", "val_f1",
    )
}

def compute_acc(logits, y):
    """Return the batch accuracy as a Python float.

    A row counts as correct when the arg-max of ``logits`` along dim 1 equals
    the corresponding entry of ``y``; the result is the mean of those hits.
    """
    predicted = logits.argmax(dim=1)
    hits = predicted.eq(y)
    return hits.float().mean().item()

# training loop
# Per epoch: optimise over train_loader, evaluate on val_loader, append the
# metrics to `history`, and keep a TorchScript checkpoint of the model with the
# highest validation F1 at OUTPUT_DIR / "best_model.pt".
ckpt = OUTPUT_DIR / "best_model.pt"
# False until this run writes its first checkpoint. Without this flag, a run
# whose validation F1 never rises above the initial best_val_f1 would leave no
# checkpoint at `ckpt` (or a stale one from an earlier run in the same
# OUTPUT_DIR), and anything loading that file afterwards would fail or
# evaluate the wrong model.
ckpt_saved_this_run = False

for epoch in range(1, EPOCHS + 1):
    t0 = time.time()

    # --- train ---
    model.train()
    running_loss = 0.0; running_acc = 0.0; n_train = 0
    for X, y in tqdm(train_loader, desc=f"Epoch {epoch}/{EPOCHS} [Train]"):
        X, y = X.to(DEVICE, non_blocking=True), y.to(DEVICE, non_blocking=True)
        optimizer.zero_grad()
        if ENABLE_AMP:
            # Forward pass and loss under autocast; the scaler handles the
            # backward pass and the optimizer step.
            with amp.autocast(device_type=DEVICE.type):
                logits = model(X)
                loss   = criterion(logits, y)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
        else:
            logits = model(X)
            loss   = criterion(logits, y)
            loss.backward()
            optimizer.step()

        # Weight batch means by batch size so the epoch averages stay exact
        # even when the last batch is smaller.
        bs = y.size(0)
        running_loss += loss.item() * bs
        running_acc  += compute_acc(logits, y) * bs
        n_train      += bs

    # --- validate ---
    model.eval()
    v_loss = 0.0; v_acc = 0.0; n_val = 0
    all_preds, all_targs = [], []
    with torch.no_grad():
        for X, y in tqdm(val_loader, desc=f"Epoch {epoch}/{EPOCHS} [Val]"):
            X, y = X.to(DEVICE, non_blocking=True), y.to(DEVICE, non_blocking=True)
            if ENABLE_AMP:
                with amp.autocast(device_type=DEVICE.type):
                    logits = model(X)
            else:
                logits = model(X)

            # The loss is evaluated outside autocast, so cast the (possibly
            # reduced-precision) logits to float32 first.
            loss  = criterion(logits.float(), y)
            preds = logits.argmax(1)

            v_loss += loss.item() * y.size(0)
            v_acc  += (preds == y).sum().item()
            n_val  += y.size(0)
            all_preds.extend(preds.cpu().tolist())
            all_targs.extend(y.cpu().tolist())

    tr_loss, tr_acc = running_loss / n_train, running_acc / n_train
    val_loss, val_acc = v_loss / n_val, v_acc / n_val
    # average='binary' scores the class labelled 1. zero_division=0 keeps the
    # score at 0.0 when a metric is undefined (e.g. no positive predictions)
    # without emitting a warning every epoch.
    prec = precision_score(all_targs, all_preds, average='binary', zero_division=0)
    rec  = recall_score(all_targs, all_preds, average='binary', zero_division=0)
    f1   = f1_score(all_targs, all_preds, average='binary', zero_division=0)

    history["train_loss"].append(tr_loss)
    history["train_acc"].append(tr_acc)
    history["val_loss"].append(val_loss)
    history["val_acc"].append(val_acc)
    history["val_prec"].append(prec)
    history["val_rec"].append(rec)
    history["val_f1"].append(f1)

    print(
        f"[{epoch:02d}/{EPOCHS}] "
        f"Train loss={tr_loss:.4f}, acc={tr_acc*100:.2f}% | "
        f"Val loss={val_loss:.4f}, acc={val_acc*100:.2f}% | "
        f"P={prec*100:.2f}% R={rec*100:.2f}% F1={f1*100:.2f}% | "
        f"{time.time()-t0:.1f}s"
    )

    # save best: on a strict F1 improvement, and unconditionally the first
    # time so that this run always produces its own checkpoint
    if f1 > best_val_f1 or not ckpt_saved_this_run:
        best_val_f1 = f1
        # The model is scripted from a CPU copy of the weights (it is in eval
        # mode here), then moved back to DEVICE to continue training.
        torch.jit.script(model.cpu()).save(str(ckpt))
        model.to(DEVICE)
        ckpt_saved_this_run = True
        print(f" ↳ New best model saved ({f1*100:.2f}% F1)")

    scheduler.step()

# save final state + history
torch.save(model.state_dict(), OUTPUT_DIR / "last_model.pth")
with open(OUTPUT_DIR / "history.json", "w") as f:
    json.dump(history, f, indent=2)

print(f"\nTraining done. Best Val F1: {best_val_f1*100:.2f}%")
print(f"Outputs in {OUTPUT_DIR}/")

# ======= Final evaluation on TEST set =======
# Reload the TorchScript checkpoint from OUTPUT_DIR / "best_model.pt", predict
# over test_loader, then print and save a per-class classification report.
print("\nEvaluating on TEST set:")
model = torch.jit.load(OUTPUT_DIR / "best_model.pt").to(DEVICE)
model.eval()

all_preds, all_targs = [], []
with torch.no_grad():
    for X, y in tqdm(test_loader, desc="Test Eval"):
        X = X.to(DEVICE, non_blocking=True)
        logits = model(X)
        preds = logits.argmax(1).cpu().tolist()
        all_preds.extend(preds)
        # Targets never leave the CPU; only the inputs are moved to DEVICE.
        all_targs.extend(y.tolist())

# A Subset hides `.classes`, so look through it to the wrapped dataset.
class_names = train_ds.dataset if isinstance(train_ds, Subset) else train_ds
class_names = class_names.classes
# Pass the full label set explicitly. Otherwise classification_report infers
# the labels from the values present in targets/predictions and raises when
# that count differs from len(target_names), e.g. if one class is absent.
label_ids = list(range(len(class_names)))
report = classification_report(
    all_targs, all_preds,
    labels=label_ids, target_names=class_names,
    digits=4, zero_division=0,
)
print(report)

with open(OUTPUT_DIR / "classification_report.txt", "w") as f:
    f.write(report)

print(f"Test report written to {OUTPUT_DIR}/classification_report.txt")
