
# DINOv3 → Fastai Regression Head (Valence–Arousal)

This notebook shows how to **freeze a DINOv3 backbone** (from 🤗 Transformers) and train a **small regression head** in **fastai** for **valence–arousal (V-A)** prediction.

- Targets are expected in **FindingEmo** units: **V∈[-3,3]**, **A∈[0,6]**.  
- For training we map to a centered reference space: **v_ref = v/3**, **a_ref = (a−3)/3** → both in **[-1,1]**.  
- We compute **CCC** (Concordance Correlation Coefficient) per dimension, plus the mean CCC over both dimensions.  
- Backbone remains **frozen**; only the tiny head trains.

In [17]:
# --- Install latest libraries (uncomment if needed) ---
# %pip install -U torch torchvision torchaudio
# %pip install -U fastai transformers timm torchmetrics datasets
# %pip install -U accelerate
#
# If you're on Apple Silicon and want MPS acceleration, make sure your PyTorch build supports MPS.
# See: https://pytorch.org/get-started/locally/


# Import libraries

In [18]:
from __future__ import annotations

import os, math, random, shutil, json
from pathlib import Path
import numpy as np
import pandas as pd

import torch
from torch import nn
import torch.nn.functional as F

# Fastai
from fastai.vision.all import *

# Transformers
from transformers import AutoImageProcessor, AutoModel

# Metrics
from torchmetrics.functional.regression import concordance_corrcoef as ccc_fn

# Compute device: Apple MPS is tried first, then CUDA, with CPU as the fallback.
device = torch.device(
    "mps"
    if torch.backends.mps.is_available()
    else "cuda"
    if torch.cuda.is_available()
    else "cpu"
)

# Ask PyTorch for the "high" float32 matmul precision setting.
torch.set_float32_matmul_precision("high")
device


device(type='mps')

In [19]:
# Read variables from a .env file (if one is found) into the process environment.
from dotenv import load_dotenv

load_dotenv()

# Log in to the Hugging Face Hub only when HUGGINGFACE_TOKEN is present in the environment.
if os.environ.get("HUGGINGFACE_TOKEN") is not None:
    from huggingface_hub import login

    login(token=os.environ["HUGGINGFACE_TOKEN"])

# Config

In [None]:
# --- Backbone checkpoint and training hyperparameters ---
# Other DINOv3 checkpoints (vits16/vitsplus/vit7b16, etc.) can be substituted here.
MODEL_NAME = "facebook/dinov3-vitb16-pretrain-lvd1689m"
IMAGE_SIZE = 608
BATCH_SIZE = 32
NUM_WORKERS = 12
EPOCHS = 2
# Weight of the CCC term in the mixed loss:
#   loss = ALPHA_CCC * (1 - mean_ccc) + (1 - ALPHA_CCC) * MSE
ALPHA_CCC = 0.7

# --- Subsampling for faster experiments (applied to the training split only) ---
DATA_FRACTION = 0.2
SAMPLE_SEED = 2025

# --- Dataset locations ---
DATA_ROOT = Path("../data")  # FindingEmo dataset root
# Each CSV has the columns: image_path,valence,arousal
CSV_TRAIN = DATA_ROOT / "train_clean_full.csv"  # Training split
CSV_VALID = DATA_ROOT / "valid_clean_full.csv"  # Validation split
CSV_TEST = DATA_ROOT / "test_clean_full.csv"  # Test split


# Load DINOv3 Processor + Backbone

In [None]:
# Fetch the image processor and the backbone weights for the configured checkpoint.
processor = AutoImageProcessor.from_pretrained(MODEL_NAME)
backbone = AutoModel.from_pretrained(MODEL_NAME)

# The backbone is used as a fixed feature extractor: no gradients, eval mode,
# resident on the selected device.
backbone.requires_grad_(False)
backbone = backbone.to(device)
backbone.eval()

# Probe the backbone once with an all-zero square image to discover the size of
# the feature vector the regression head will receive.
with torch.inference_mode():
    dummy = torch.zeros(1, 3, IMAGE_SIZE, IMAGE_SIZE, dtype=torch.float32)
    out = backbone(pixel_values=dummy.to(device))
    feat_dim = None
    if hasattr(out, "pooler_output") and (out.pooler_output is not None):
        # Preferred: the model's pooled output.
        feat_dim = out.pooler_output.shape[-1]
    elif hasattr(out, "last_hidden_state") and out.last_hidden_state.ndim == 3:
        # Token sequence (ViT-style): channels are the last axis.
        feat_dim = out.last_hidden_state.shape[-1]
    elif hasattr(out, "last_hidden_state") and out.last_hidden_state.ndim == 4:
        # Spatial feature map (ConvNeXt-like): channels are axis 1.
        feat_dim = out.last_hidden_state.shape[1]
    else:
        raise RuntimeError("Unable to determine DINOv3 feature dimension.")
feat_dim


In [22]:
# =======================
# Helpers: scaling & CCC
# =======================
def fe_to_ref(va: Tensor) -> Tensor:
    """Rescale FindingEmo (valence, arousal) values to the centered reference space.

    The last axis of `va` is read as (valence, arousal). Valence is divided by 3;
    arousal is shifted by -3 and then divided by 3, so V∈[-3,3] and A∈[0,6] both
    land in [-1,1]. All leading axes are preserved.
    """
    valence_ref = va.select(-1, 0).div(3.0)
    arousal_ref = va.select(-1, 1).sub(3.0).div(3.0)
    return torch.stack((valence_ref, arousal_ref), dim=-1)


def ref_to_fe(va_ref: Tensor) -> Tensor:
    """Convert reference-space (valence, arousal) values back to FindingEmo units.

    Inverse of `fe_to_ref`: along the last axis, valence is multiplied by 3 and
    arousal is multiplied by 3 then shifted by +3. All leading axes are preserved.
    """
    valence_fe = va_ref.select(-1, 0).mul(3.0)
    arousal_fe = va_ref.select(-1, 1).mul(3.0).add(3.0)
    return torch.stack((valence_fe, arousal_fe), dim=-1)


def ccc_mean(pred: Tensor, targ: Tensor) -> Tensor:
    """Return the mean of the CCC values computed between `pred` and `targ`.

    `ccc_fn` (torchmetrics' functional concordance_corrcoef) is expected to give
    one CCC per output column for (N, 2) inputs. Any NaN or ±inf entry is
    replaced by 0.0 before the mean is taken.
    """
    per_dim_ccc = torch.nan_to_num(
        ccc_fn(pred, targ), nan=0.0, posinf=0.0, neginf=0.0
    )
    return per_dim_ccc.mean()


class CCCMixedLoss(nn.Module):
    """Blend of a CCC term and an MSE term.

    loss = alpha * (1 - ccc_mean(pred, targ)) + (1 - alpha) * mse(pred, targ)

    Args:
        alpha: weight given to the CCC term; the MSE term receives `1 - alpha`.
    """

    def __init__(self, alpha: float = 0.7):
        super().__init__()
        self.alpha = alpha

    def forward(self, pred: Tensor, targ: Tensor) -> Tensor:
        """Compute the mixed loss for a batch of predictions and targets."""
        ccc_term = 1.0 - ccc_mean(pred, targ)
        mse_term = F.mse_loss(pred, targ)
        return self.alpha * ccc_term + (1.0 - self.alpha) * mse_term


In [23]:
# =======================
# Load FindingEmo dataset
# =======================

# Read the three splits from their CSV files.
df_train, df_valid, df_test = (
    pd.read_csv(csv_path) for csv_path in (CSV_TRAIN, CSV_VALID, CSV_TEST)
)

# Subsample the TRAIN split only (seeded, so repeatable); valid/test stay complete.
if DATA_FRACTION is not None and DATA_FRACTION < 1.0:
    df_train = df_train.sample(frac=DATA_FRACTION, random_state=SAMPLE_SEED)

# Split sizes plus a two-row preview of the training frame.
len(df_train), len(df_valid), len(df_test), df_train.head(2)


(1299,
 2808,
 2797,
                                                                        image_path  \
 7331  ../data/Run_2/Surprised people hugging/e58a56c484418c2f01cd957d7e31e301.jpg   
 4491                ../data/Run_2/Repelled thirty-something protest/453583704.jpg   
 
       valence  arousal  
 7331        1        2  
 4491       -1        4  )

In [None]:
# =======================
# Fastai transform: apply HF processor per item
# =======================
class HFProcessorTransform(Transform):
    """fastai item transform: `PILImage` -> processor-normalized `TensorImage`.

    Args:
        processor: Hugging Face image processor, called with `images`, `size`
            and `return_tensors="pt"`.
        target_size: output size in PIL order, i.e. `(width, height)`.
    """

    def __init__(self, processor, target_size=(800, 608)):
        self.processor = processor
        self.target_size = target_size

    def encodes(self, img: PILImage):
        width, height = self.target_size
        # Direct bilinear resize to (width, height). The aspect ratio is not
        # preserved and no padding or cropping is applied at this step.
        resized = img.resize(self.target_size, resample=Image.Resampling.BILINEAR)
        # The processor is handed the same size, so any resize it performs
        # targets the shape the image already has.
        batch = self.processor(
            images=np.array(resized),
            size={"height": height, "width": width},
            return_tensors="pt",
        )
        # Drop the batch axis the processor adds for a single image.
        return TensorImage(batch.pixel_values[0])


# Target getter for the DataBlock: FindingEmo units in, reference space [-1,1] out.
def get_y_ref(row):
    """Return the row's (valence, arousal) as a float32 tensor mapped by `fe_to_ref`."""
    va_fe = torch.tensor((row["valence"], row["arousal"]), dtype=torch.float32)
    return fe_to_ref(va_fe)

In [None]:
# =======================
# DataBlock + DataLoaders
# =======================

# Input resolution fed to the backbone; both sides are divisible by 16.
# HFProcessorTransform takes its size in PIL order, i.e. (width, height), so the
# images are landscape: 800 px wide by 608 px high -> tensors of shape [3, 608, 800].
# (The effective resolution is unchanged; only the H/W names now match their meaning.)
H, W = 608, 800


def df_to_dls(df_train, df_valid, bs=BATCH_SIZE, num_workers=NUM_WORKERS):
    """Build fastai DataLoaders that train on `df_train` and validate on `df_valid`.

    Args:
        df_train: frame with `image_path`, `valence`, `arousal` columns (training rows).
        df_valid: frame with the same columns (validation rows).
        bs: batch size.
        num_workers: number of DataLoader worker processes.

    Returns:
        The `DataLoaders` object produced by the DataBlock.
    """
    # `DataBlock.dataloaders` has no `valid_df` argument, and a DataBlock without
    # a splitter falls back to a random split of its source. To validate on
    # `df_valid`, both frames are stacked and the split is driven by an explicit
    # boolean column.
    df_all = pd.concat(
        [df_train.assign(is_valid=False), df_valid.assign(is_valid=True)],
        ignore_index=True,  # ColSplitter works with positional indices
    )
    dblock = DataBlock(
        # A bare TransformBlock is used instead of ImageBlock: ImageBlock appends
        # IntToFloatTensor, which divides every TensorImage by 255. The HF
        # processor has already rescaled and normalized the pixels, so that extra
        # division would corrupt the inputs during training (and would not be
        # applied by `predict_image`, giving a train/inference mismatch).
        blocks=(TransformBlock(type_tfms=PILImage.create), RegressionBlock(n_out=2)),
        get_x=ColReader("image_path"),
        get_y=get_y_ref,
        splitter=ColSplitter("is_valid"),
        item_tfms=[HFProcessorTransform(processor, target_size=(W, H))],
    )
    return dblock.dataloaders(df_all, bs=bs, num_workers=num_workers)


dls = df_to_dls(df_train, df_valid)
# Draw a single batch (each `one_batch()` call spins up the loader again).
xb, yb = dls.one_batch()
xb.shape, yb.shape


In [None]:
# =======================
# Model: Frozen DINOv3 + tiny MLP head
# =======================
class DinoV3Regressor(nn.Module):
    """Frozen backbone feature extractor followed by a small trainable 2-output head.

    Args:
        backbone: module called as `backbone(pixel_values=x)`; its output must
            expose `pooler_output` and/or `last_hidden_state`.
        feat_dim: size of the feature vector passed to the head.
        hidden: width of the optional hidden layer. `None` or a non-positive
            value gives a single linear layer.
        p: dropout probability applied after the LayerNorm.
    """

    def __init__(
        self,
        backbone: nn.Module,
        feat_dim: int,
        hidden: int | None = None,
        p: float = 0.0,
    ):
        super().__init__()
        self.backbone = backbone
        # Layer order (and therefore state_dict keys) matches the original head.
        layers = [nn.LayerNorm(feat_dim), nn.Dropout(p)]
        if hidden and hidden > 0:
            layers += [nn.Linear(feat_dim, hidden), nn.GELU(), nn.Linear(hidden, 2)]
        else:
            layers.append(nn.Linear(feat_dim, 2))
        self.head = nn.Sequential(*layers)
        # Ensure the backbone is frozen: no gradients and eval-mode behavior.
        self.backbone.requires_grad_(False)
        self.backbone.eval()

    def train(self, mode: bool = True):
        """Set the head's train/eval mode while always keeping the backbone in eval.

        Training loops call `model.train()` at the start of every epoch; without
        this override that call would also switch the frozen backbone into
        training mode.
        """
        super().train(mode)
        self.backbone.eval()
        return self

    def forward(self, x: Tensor) -> Tensor:
        """Map processor-normalized pixel values [B,3,H,W] to predictions [B,2]."""
        # The backbone is frozen, so no autograd graph is needed for it.
        with torch.no_grad():
            out = self.backbone(pixel_values=x)
        if hasattr(out, "pooler_output") and (out.pooler_output is not None):
            feats = out.pooler_output
        elif hasattr(out, "last_hidden_state") and out.last_hidden_state.ndim == 3:
            # Token sequence (ViT): take the first (CLS) token.
            feats = out.last_hidden_state[:, 0, :]
        else:
            # [B,C,H,W] feature map -> global average pool over the spatial axes.
            feats = out.last_hidden_state.mean(dim=(-1, -2))
        return self.head(feats)


# Build the regressor (512-unit hidden layer, dropout 0.1) and move it to the selected device.
model = DinoV3Regressor(backbone=backbone, feat_dim=feat_dim, hidden=512, p=0.1)
model = model.to(device)
model


In [None]:
# =======================
# Fastai metrics & loss
# =======================
def ccc_v(inp, targ):
    """CCC between predicted and target valence (column 0)."""
    return ccc_fn(inp[:, 0], targ[:, 0])


def ccc_a(inp, targ):
    """CCC between predicted and target arousal (column 1)."""
    return ccc_fn(inp[:, 1], targ[:, 1])


def ccc_avg(inp, targ):
    """Mean of the valence CCC and the arousal CCC."""
    return (ccc_v(inp, targ) + ccc_a(inp, targ)) / 2


loss_func = CCCMixedLoss(alpha=ALPHA_CCC)

# CCC depends on means, variances and covariance of the WHOLE evaluation set, so
# it cannot be obtained by averaging per-batch CCC values (which is what fastai
# does with a plain function metric). AccumMetric stores all predictions and
# targets for the epoch and calls the function once on the concatenated tensors.
# `flatten=False` keeps the (N, 2) layout the functions index into.
# The list order (ccc_v, ccc_a, ccc_avg, mse) is unchanged.
metrics = [
    AccumMetric(ccc_v, flatten=False),
    AccumMetric(ccc_a, flatten=False),
    AccumMetric(ccc_avg, flatten=False),
    mse,  # fastai's MSE metric (a mean, so batch averaging is fine)
]


In [None]:
# =======================
# Create Learner & train
# =======================
# Assemble the Learner. The optimizer factory (fastai's Adam with weight decay
# 1e-2) is supplied up front instead of being assigned afterwards.
learn = Learner(
    dls,
    model,
    loss_func=loss_func,
    opt_func=partial(Adam, wd=1e-2),
    metrics=metrics,
    cbs=[CSVLogger()],
).to_fp32()

# Run the LR finder and keep the "minimum" suggestion as the training learning rate.
lr_min, lr_steep = learn.lr_find(suggest_funcs=(minimum, steep))
print("Suggested LRs:", lr_min, lr_steep)
lr = float(lr_min)
lr

In [None]:
# Optional second LR-finder pass for small subsets: the iteration count is
# capped at the number of training batches (and never exceeds 100).
n_lr_iters = min(100, len(dls.train))
lr_min2, lr_steep2 = learn.lr_find(
    suggest_funcs=(minimum, steep), num_it=n_lr_iters
)
print("Subset-safe Suggested LRs:", lr_min2, lr_steep2)
# This overrides the learning rate chosen in the previous cell.
lr = float(lr_min2)
lr


In [None]:
# Train the head with the one-cycle schedule, using `lr` as the peak learning rate.
learn.fit_one_cycle(n_epoch=EPOCHS, lr_max=lr)

# Evaluate on Test Set

In [None]:
# Evaluate on the test split. This requires df_test to contain the target
# columns, because the loader is built with labels.
test_dl = dls.test_dl(df_test, with_labels=True)
test_metrics = learn.validate(dl=test_dl)

# `validate` returns the loss followed by the metrics in the order they were
# registered on the Learner.
names = ["test_loss", "test_ccc_v", "test_ccc_a", "test_ccc_avg", "test_mse"]
# A metric that produced no value is reported as NaN, not 0.0: zero is a
# legitimate CCC/MSE value and would make a missing result look like a real one.
safe_metrics = [float(m) if m is not None else float("nan") for m in test_metrics]
dict(zip(names, safe_metrics))


## Save the head weights (state_dict) and, optionally, export the full Learner


In [None]:
# Checkpoint directory (created if it does not exist yet).
SAVE_DIR = Path("./checkpoints")
SAVE_DIR.mkdir(exist_ok=True)

# Only the head's weights are written; the backbone is not part of this file.
head_path = SAVE_DIR / "dinov3_head.pth"
head_state = model.head.state_dict()
torch.save(head_state, head_path)

# Optionally export the whole fastai Learner
# learn.export(SAVE_DIR/"learner.pkl")

head_path, head_path.exists()


In [None]:
# =======================
# Inference helper (returns V,A in FE units)
# =======================
@torch.inference_mode()
def predict_image(img_path: str | Path):
    """Predict (valence, arousal) in FindingEmo units for one image file.

    Args:
        img_path: path of the image to score.

    Returns:
        A CPU tensor of shape (2,) holding (valence, arousal) after `ref_to_fe`.
    """
    # Inference must run with dropout disabled. The model's previous mode is
    # restored afterwards so calling this between training runs has no side effect.
    was_training = model.training
    model.eval()
    try:
        img = PILImage.create(img_path)
        x = HFProcessorTransform(processor)(img)  # [3,H,W] tensor
        x = x.unsqueeze(0).to(device)  # add the batch axis
        out_ref = model(x)  # reference space
        out_fe = ref_to_fe(out_ref.cpu())
        return out_fe.squeeze(0)
    finally:
        model.train(was_training)


# Quick smoke test
sample_path = df_test.iloc[0]["image_path"]
predict_image(sample_path)
