# DINOv3 + SAM + VO Evaluation (Accuracy-First)

This notebook evaluates DINOv3 embeddings against CLIP for egocentric Re-ID, adds optional SAM masking, explores dense feature heatmaps and basic VO (pose) baselines, and saves visual outputs for medium-term impact assessment before real-time tuning.

In [2]:
# 1) Environment Setup and Version Pinning
import sys, os, subprocess, json
from pathlib import Path

required = {
    "torch": "2.2",
    "torchvision": None,
    "timm": "1.0.20",
    "transformers": "4.56.0",
    "accelerate": None,
    "opencv-python": None,
    "scikit-learn": None,
    "matplotlib": None,
    "einops": None,
    "numpy": None,
}

print("Python:", sys.version)

# Print installed versions and device info
missing = []
versions = {}
for pkg, ver in required.items():
    try:
        mod = __import__(pkg.replace("-", "_"))
        v = getattr(mod, "__version__", "unknown")
        versions[pkg] = v
    except Exception:
        versions[pkg] = None
        missing.append(pkg)

print("Installed:", json.dumps(versions, indent=2))
if missing:
    print("Missing packages:", missing)
    print("Tip (zsh): pip install -U ", " ".join([f"{m}{'>='+required[m] if required[m] else ''}" for m in missing]))

# CUDA/MPS info
try:
    import torch
    print("Torch version:", torch.__version__)
    print("CUDA available:", torch.cuda.is_available())
    if torch.cuda.is_available():
        print("CUDA device:", torch.cuda.get_device_name(0))
    print("MPS available:", getattr(torch.backends, 'mps', None) and torch.backends.mps.is_available())
except Exception as e:
    print("Torch not importable:", e)

Python: 3.10.18 | packaged by conda-forge | (main, Jun  4 2025, 14:46:00) [Clang 18.1.8 ]
Installed: {
  "torch": "2.5.1",
  "torchvision": "0.20.1",
  "timm": "1.0.22",
  "transformers": "4.57.1",
  "accelerate": "1.6.0",
  "opencv-python": null,
  "scikit-learn": null,
  "matplotlib": "3.10.6",
  "einops": "0.6.1",
  "numpy": "1.26.4"
}
Missing packages: ['opencv-python', 'scikit-learn']
Tip (zsh): pip install -U  opencv-python scikit-learn
Torch version: 2.5.1
CUDA available: False
MPS available: False


In [3]:
# 2) GPU/Precision Configuration and Reproducibility
import random
import numpy as np

CFG = {
    "precision": "fp32",  # fp32|bf16|fp16
    "deterministic": True,
    "compile": False,
    "batch_size": 8,
    "img_size": 256,
}

SEED = 42
random.seed(SEED)
np.random.seed(SEED)
try:
    import torch
    torch.manual_seed(SEED)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(SEED)
    if CFG["deterministic"]:
        torch.backends.cudnn.benchmark = False
        torch.backends.cudnn.deterministic = True
    if CFG["precision"] in ("bf16", "fp16") and torch.cuda.is_available():
        dtype = torch.bfloat16 if CFG["precision"] == "bf16" else torch.float16
    else:
        dtype = torch.float32
    print("Using dtype:", dtype)
except Exception as e:
    print("Torch config skipped:", e)

Using dtype: torch.float32


In [4]:
# 3) Load Images and Video Frames
from typing import List, Tuple
import cv2

DATA_ROOT = Path("../data/examples").resolve()
OUTPUT_DIR = Path("../results/dino_eval_outputs").resolve()
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)


def load_images_from_dir(img_dir: Path, max_images: int = 64) -> List[np.ndarray]:
    imgs = []
    exts = {".jpg", ".jpeg", ".png", ".bmp"}
    for p in sorted(img_dir.glob("**/*")):
        if p.suffix.lower() in exts:
            bgr = cv2.imread(str(p))
            if bgr is None:
                continue
            rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
            imgs.append(rgb)
            if len(imgs) >= max_images:
                break
    return imgs


def sample_video_frames(video_path: Path, skip: int = 10, max_frames: int = 200) -> List[np.ndarray]:
    cap = cv2.VideoCapture(str(video_path))
    frames = []
    i = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        if i % skip == 0:
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(rgb)
            if len(frames) >= max_frames:
                break
        i += 1
    cap.release()
    return frames

# Demo load
images = load_images_from_dir(DATA_ROOT, max_images=32)
print(f"Loaded {len(images)} images from {DATA_ROOT}")

Loaded 0 images from /Users/riddhiman.rana/Desktop/Coding/Orion/orion-research/data/examples


In [5]:
# 4) DINOv3 Backbones via Transformers and timm
from orion.managers.model_manager import ModelManager

mm = ModelManager.get_instance()

# Prefer using our backend wrapper for consistency across codebase
try:
    dino = mm.dino
    print("DINO backend ready via ModelManager")
except Exception as e:
    print("Falling back: direct Transformers/timm loading failed:", e)
    dino = None

# Also expose CLIP for comparison (label verification baseline)
try:
    clip = mm.clip
    print("CLIP backend ready via ModelManager")
except Exception as e:
    print("CLIP unavailable:", e)
    clip = None

Transformers load failed (You are trying to access a gated repo.
Make sure to have access to it at https://huggingface.co/facebook/dinov3-vitb16-pretrain-lvd1689m.
403 Client Error. (Request ID: Root=1-691543ab-30a49c832414567178f0870a;11e2e9e2-ba9c-4f0f-97e8-dd8455351b8f)

Cannot access gated repo for url https://huggingface.co/facebook/dinov3-vitb16-pretrain-lvd1689m/resolve/main/preprocessor_config.json.
Access to model facebook/dinov3-vitb16-pretrain-lvd1689m is restricted and you are not in the authorized list. Visit https://huggingface.co/facebook/dinov3-vitb16-pretrain-lvd1689m to ask for access.); falling back to timm…
Failed to load DINO: Could not load DINO. Install either 'transformers>=4.56.0' or 'timm>=1.0.20'.
Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll sti

Falling back: direct Transformers/timm loading failed: Could not load DINO. Install either 'transformers>=4.56.0' or 'timm>=1.0.20'.


`torch_dtype` is deprecated! Use `dtype` instead!


CLIP backend ready via ModelManager


In [None]:
# 5) Preprocessing Transforms (LVD-1689M vs SAT-493M)
import torch
from torchvision import transforms

mean_lvd = (0.485, 0.456, 0.406)
std_lvd  = (0.229, 0.224, 0.225)

mean_sat = (0.430, 0.411, 0.296)
std_sat  = (0.213, 0.156, 0.143)

def make_transform(img_size: int = 256, sat: bool = False):
    m = mean_sat if sat else mean_lvd
    s = std_sat if sat else std_lvd
    return transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize((img_size, img_size), antialias=True),
        transforms.ConvertImageDtype(dtype=torch.float32),
        transforms.Normalize(mean=m, std=s),
    ])

# Smoke test
if images:
    t = make_transform(CFG["img_size"], sat=False)
    test_tensor = t(images[0])
    assert test_tensor.shape[1] == CFG["img_size"], "Transform resize failed"
    print("Transform OK:", tuple(test_tensor.shape))
else:
    print("No images loaded yet; skipping transform test")

In [None]:
# 6) Dense Feature Extraction and Pooling
import numpy as np
from typing import Dict


def l2n(v: np.ndarray, eps: float = 1e-8) -> np.ndarray:
    n = np.linalg.norm(v) + eps
    return (v / n).astype(np.float32)


def encode_images(embedder, imgs: list) -> np.ndarray:
    vecs = []
    for im in imgs:
        try:
            v = embedder.encode_image(im)
            vecs.append(v)
        except Exception as e:
            # Fallback: skip image
            print("Embedding failed:", e)
    if not vecs:
        return np.zeros((0, 1), dtype=np.float32)
    return np.stack(vecs, axis=0)


clip_vecs = encode_images(clip, images) if clip else None
dino_vecs = encode_images(dino, images) if dino else None

print(
    "Shapes:",
    {"clip": None if clip_vecs is None else clip_vecs.shape,
     "dino": None if dino_vecs is None else dino_vecs.shape}
)

In [None]:
# 7) Cosine Similarity Heatmaps and Token Matching
import matplotlib.pyplot as plt


def cosine_sim_matrix(X: np.ndarray) -> np.ndarray:
    if X is None or len(X) == 0:
        return np.zeros((0, 0), dtype=np.float32)
    Xn = X / (np.linalg.norm(X, axis=1, keepdims=True) + 1e-8)
    return Xn @ Xn.T


if dino_vecs is not None and len(dino_vecs) >= 2:
    S = cosine_sim_matrix(dino_vecs)
    plt.figure(figsize=(5, 4))
    plt.imshow(S, vmin=-1, vmax=1, cmap="coolwarm")
    plt.colorbar()
    plt.title("DINO global cosine similarity")
    plt.tight_layout()
    plt.savefig(OUTPUT_DIR / "dino_global_similarity.png", dpi=150)
    plt.show()
else:
    print("Not enough DINO vectors for similarity plot")

In [None]:
# 8) Segmentation with SAM (point/box prompts) — optional
try:
    from transformers import SamModel, SamProcessor
    _sam_available = True
    print("Transformers SAM available")
except Exception:
    _sam_available = False
    print("SAM not available; skipping segmentation")

sam_model = None
sam_processor = None

if _sam_available:
    try:
        sam_processor = SamProcessor.from_pretrained("facebook/sam-vit-huge")
        sam_model = SamModel.from_pretrained("facebook/sam-vit-huge")
        if torch.cuda.is_available():
            sam_model = sam_model.to("cuda")
        elif getattr(torch.backends, "mps", None) and torch.backends.mps.is_available():
            sam_model = sam_model.to("mps")
        else:
            sam_model = sam_model.to("cpu")
        print("SAM model loaded")
    except Exception as e:
        print("Failed to load SAM:", e)
        _sam_available = False


def run_sam_masks(image_rgb: np.ndarray, points=None, boxes=None):
    if not _sam_available or sam_model is None or sam_processor is None:
        return []
    inputs = sam_processor(images=image_rgb, input_points=points, input_boxes=boxes, return_tensors="pt")
    inputs = {k: v.to(sam_model.device) for k, v in inputs.items()}
    with torch.inference_mode():
        outputs = sam_model(**inputs)
    # Basic thresholding
    masks = outputs.pred_masks.squeeze(0).detach().cpu().numpy()  # [N, H, W]
    bin_masks = [(m > 0.0).astype(np.uint8) for m in masks]
    return bin_masks

In [None]:
# 9) Mask-Guided Feature Pooling and Object Descriptors

def masked_mean_descriptor(embedder, image_rgb: np.ndarray, mask: np.ndarray) -> np.ndarray:
    # Fallback to global if token features aren't exposed by backend; use crop
    ys, xs = np.where(mask > 0)
    if len(xs) == 0 or len(ys) == 0:
        return embedder.encode_image(image_rgb)
    x0, x1 = int(xs.min()), int(xs.max())
    y0, y1 = int(ys.min()), int(ys.max())
    crop = image_rgb[y0:y1+1, x0:x1+1]
    return embedder.encode_image(crop)

obj_descs = []
if dino is not None and images:
    if _sam_available:
        ms = run_sam_masks(images[0])
        print(f"SAM masks: {len(ms)}")
        for m in ms[:5]:
            obj_descs.append(masked_mean_descriptor(dino, images[0], m))
    else:
        # Create a simple box mask as placeholder
        h, w, _ = images[0].shape
        bx = int(0.2 * w), int(0.2 * h), int(0.6 * w), int(0.6 * h)
        m = np.zeros((h, w), dtype=np.uint8)
        m[bx[1]:bx[3], bx[0]:bx[2]] = 1
        obj_descs.append(masked_mean_descriptor(dino, images[0], m))

print("Object descriptors:", None if not obj_descs else np.stack(obj_descs).shape)

In [None]:
# 10) Frame-to-Frame Matching and Pose Estimation (VO baseline)
from typing import Optional

def sample_keypoints(image_rgb: np.ndarray, n: int = 200) -> np.ndarray:
    # Use a simple grid sampler as placeholder
    h, w, _ = image_rgb.shape
    ys = np.linspace(h*0.1, h*0.9, int(np.sqrt(n))).astype(int)
    xs = np.linspace(w*0.1, w*0.9, int(np.sqrt(n))).astype(int)
    pts = np.array([(x, y) for y in ys for x in xs], dtype=np.float32)
    return pts[:n]


def match_points(p1: np.ndarray, p2: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    # Identity correspondence for placeholder grid
    n = min(len(p1), len(p2))
    return p1[:n], p2[:n]


def estimate_pose(p1: np.ndarray, p2: np.ndarray, K: Optional[np.ndarray] = None):
    if K is None:
        # Focal ~ 1000 px default placeholder, principal center
        K = np.array([[1000.0, 0, 640.0], [0, 1000.0, 360.0], [0, 0, 1]])
    E, mask = cv2.findEssentialMat(p1, p2, K, method=cv2.RANSAC, prob=0.999, threshold=1.0)
    if E is None:
        return None, None, None
    _, R, t, mask2 = cv2.recoverPose(E, p1, p2, K)
    return R, t.squeeze(), (mask, mask2)

if len(images) >= 2:
    pts1 = sample_keypoints(images[0])
    pts2 = sample_keypoints(images[1])
    m1, m2 = match_points(pts1, pts2)
    R, t, m = estimate_pose(m1, m2)
    print("Pose R:\n", R)
    print("Pose t:", t)
else:
    print("Need at least 2 frames for VO baseline")

In [None]:
# 11) Minimal Trajectory Integration and Visualization
from mpl_toolkits.mplot3d import Axes3D  # noqa: F401

def integrate_trajectory(R_list, t_list):
    # Simple chaining in camera coordinates
    poses = []
    T = np.eye(4)
    poses.append(T.copy())
    for R, t in zip(R_list, t_list):
        Ti = np.eye(4)
        Ti[:3, :3] = R
        Ti[:3, 3] = t
        T = T @ Ti
        poses.append(T.copy())
    return poses

# Placeholder if we estimated only one relative pose
R_list = []
t_list = []
if len(images) >= 3:
    for i in range(2):
        p1 = sample_keypoints(images[i])
        p2 = sample_keypoints(images[i+1])
        m1, m2 = match_points(p1, p2)
        R, t, _ = estimate_pose(m1, m2)
        if R is not None:
            R_list.append(R)
            t_list.append(t)

poses = integrate_trajectory(R_list, t_list)

# Plot 2D and 3D
xyz = np.array([P[:3, 3] for P in poses]) if poses else np.zeros((0, 3))
plt.figure(figsize=(5,4))
if len(xyz):
    plt.plot(xyz[:,0], xyz[:,2], marker='o')
    plt.title("Trajectory XZ")
    plt.xlabel("X")
    plt.ylabel("Z")
    plt.tight_layout()
    plt.savefig(OUTPUT_DIR/"trajectory_xz.png", dpi=150)
    plt.show()
else:
    print("No trajectory to plot")

In [None]:
# 12) Optional: Depth Prediction via DINOv3 Depther Head (skip if unavailable)
try:
    import torch
    REPO_DIR = "."  # requires local dinov3 repo; leave as placeholder
    # depther = torch.hub.load(REPO_DIR, 'dinov3_vit7b16_dd', source='local', weights='PATH', backbone_weights='PATH')
    print("Depther example requires local weights; skipping by default")
except Exception as e:
    print("Depther not available:", e)

In [None]:
# 13) k-NN Probe on Small Labeled Subset (proxy for representation)
try:
    from sklearn.neighbors import KNeighborsClassifier
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score
    _sk_ok = True
except Exception:
    _sk_ok = False

# Minimal synthetic labels: alternate labels for demo if no dataset
if dino_vecs is not None and len(dino_vecs) >= 10 and _sk_ok:
    X = dino_vecs
    y = np.array([(i % 2) for i in range(len(X))])
    Xtr, Xte, ytr, yte = train_test_split(X, y, test_size=0.3, random_state=SEED, stratify=y)
    clf = KNeighborsClassifier(n_neighbors=5, metric='cosine')
    clf.fit(Xtr, ytr)
    pred = clf.predict(Xte)
    acc = accuracy_score(yte, pred)
    print(f"k-NN proxy (binary alt labels) accuracy: {acc:.3f}")
else:
    print("Skipping k-NN probe (need sklearn and >=10 embeddings)")

In [None]:
# 14) Performance Profiling and Memory Tracking
import time

def profile_embedder(embedder, imgs, n_warmup=2):
    if embedder is None or not imgs:
        return None
    # Warmup
    for _ in range(n_warmup):
        _ = embedder.encode_image(imgs[0])
    t0 = time.time()
    for im in imgs:
        _ = embedder.encode_image(im)
    t1 = time.time()
    dt = t1 - t0
    ips = len(imgs) / max(dt, 1e-6)
    mem = None
    try:
        import torch
        if torch.cuda.is_available():
            mem = torch.cuda.max_memory_allocated() / 1024**3
            torch.cuda.reset_peak_memory_stats()
    except Exception:
        pass
    return {"images": len(imgs), "sec": dt, "img_per_sec": ips, "gpu_peak_gb": mem}

clip_prof = profile_embedder(clip, images[:16]) if clip else None
dino_prof = profile_embedder(dino, images[:16]) if dino else None
print("Profiles:", {"clip": clip_prof, "dino": dino_prof})

In [None]:
# 15) Medium-term Metrics: Mask Stability and Trajectory Drift

def mask_iou(m1: np.ndarray, m2: np.ndarray) -> float:
    inter = np.logical_and(m1>0, m2>0).sum()
    union = np.logical_or(m1>0, m2>0).sum() + 1e-6
    return float(inter/union)

# Placeholder demo: if we created a box mask, compare the same mask to itself
if 'm' in globals():
    print("Mask IoU self:", mask_iou(m, m))

# Trajectory drift: simple cumulative distance as drift proxy
if len(poses) >= 2:
    dists = np.linalg.norm(np.diff(np.array([P[:3,3] for P in poses]), axis=0), axis=1)
    drift = dists.sum()
    print(f"Trajectory drift (proxy): {drift:.4f}")
else:
    print("No trajectory drift metric available")

In [None]:
# 16) Ablations: Backbone and Resolution Sweeps (stub)
ABLATION_MODELS = [
    "facebook/dinov3-convnext-tiny-pretrain-lvd1689m",
    "facebook/dinov3-convnext-base-pretrain-lvd1689m",
    "facebook/dinov3-vitb16-pretrain-lvd1689m",
]
ABLATION_SIZES = [224, 256]

print("Define sweeps here; use orion.backends.dino_backend.DINOEmbedder per model/size")

In [None]:
# 17) Save Visual Artifacts and Logs
import platform
meta = {
    "config": CFG,
    "seed": SEED,
    "python": sys.version,
    "platform": platform.platform(),
    "torch": versions.get("torch"),
    "timm": versions.get("timm"),
    "transformers": versions.get("transformers"),
}
with open(OUTPUT_DIR/"run_meta.json", "w") as f:
    json.dump(meta, f, indent=2)
print("Saved metadata to", OUTPUT_DIR/"run_meta.json")

In [None]:
# 18) Unit Tests: Determinism, Shapes, Reproducibility (lightweight)
# These are sanity checks; integrate into tests/ as needed.

def assert_between(x, a, b):
    assert a <= x <= b, f"Value {x} not in [{a},{b}]"

# Cosine range check
if dino_vecs is not None and len(dino_vecs) >= 2:
    S = cosine_sim_matrix(dino_vecs)
    assert np.isfinite(S).all()
    assert_between(float(S.min()), -1.01, 1.01)
    assert_between(float(S.max()), -1.01, 1.01)
    print("Determinism/light checks passed for cosine sim")
else:
    print("Skip checks (insufficient embeddings)")

In [None]:
# 19) Config Switch: Full-Accuracy vs Realtime Mode

def set_mode(mode: str = "accuracy"):
    if mode == "accuracy":
        CFG.update({
            "precision": "fp32",
            "deterministic": True,
            "batch_size": 4,
            "img_size": 256,
        })
    elif mode == "realtime":
        CFG.update({
            "precision": "bf16",
            "deterministic": False,
            "batch_size": 16,
            "img_size": 224,
        })
    else:
        raise ValueError("mode must be 'accuracy' or 'realtime'")
    print("Mode set:", mode, CFG)

set_mode("accuracy")