# Inference and Metrics for Curiosity Heatmaps

This notebook loads trained models, runs inference on the validation set, and reports:
- Pearson correlation
- SSIM
- MSE
- Spearman rank correlation
- NDCG@K (configurable)

Two models are evaluated:
- BLIP-CuriosityNet (vision-only, bbox-target supervision)
- VQA-CuriosityNet (vision+language, hybrid-target supervision)



In [1]:
# Install packages (quiet)
%pip install -q numpy scipy scikit-image matplotlib tqdm transformers torch torchvision pillow


Note: you may need to restart the kernel to use updated packages.


In [3]:
# Imports and utilities
import json
from pathlib import Path
from typing import Dict, List, Tuple

import numpy as np
import cv2
from PIL import Image

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

from tqdm import tqdm
from scipy.stats import pearsonr, spearmanr
from skimage.metrics import structural_similarity as ssim

from transformers import BlipProcessor, BlipModel

import warnings
# Silence every Python warning for the rest of the session to keep cell output short.
warnings.filterwarnings('ignore')

# Run on the GPU when PyTorch can see one, otherwise on the CPU.
DEVICE = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print('Using device:', DEVICE)

# Robust JSON loader

def load_json_any_encoding(path: str) -> Dict:
    """Parse a JSON file whose text encoding is not known in advance.

    The file is read once as bytes; each candidate codec is then tried in a
    fixed order and the first one that both decodes and parses wins.

    Args:
        path: Location of the JSON file.

    Returns:
        The parsed JSON document.

    Raises:
        ValueError: If no candidate codec yields parseable JSON.
    """
    payload = Path(path).read_bytes()
    candidates = (
        'utf-8', 'utf-8-sig', 'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be', 'cp1252', 'latin1',
    )
    for codec in candidates:
        try:
            parsed = json.loads(payload.decode(codec))
        except Exception:
            # Wrong codec (or not JSON under this codec): move on to the next one.
            continue
        else:
            return parsed
    raise ValueError(f'Failed to decode JSON: {path}')


def load_all_annotations() -> List[Dict]:
    """Collect one flat record per bounding box from the five domain folders.

    Looks for ``Domain_<i>_Images/annotations.json`` (i = 1..5) relative to the
    current working directory. Folders without an annotation file, documents
    without an ``'annotations'`` or ``'images'`` key, and entries whose image
    file does not exist are skipped. Any exception raised while processing a
    domain is printed and the rest of that domain is abandoned; records
    appended before the failure are kept.

    Returns:
        A list of dicts with keys ``image_path``, ``image_name``, ``domain_id``
        (0-based), ``domain_name``, ``question``, ``question_type``,
        ``curiosity_score`` and ``bbox`` (``xtl``/``ytl``/``xbr``/``ybr``).
    """
    records: List[Dict] = []
    for domain_idx in range(5):
        domain_dir = f'Domain_{domain_idx + 1}_Images'
        domain_root = Path(domain_dir)
        ann_path = domain_root / 'annotations.json'
        if not ann_path.exists():
            continue
        try:
            data = load_json_any_encoding(str(ann_path))
            # The per-image list may live under either key; 'annotations' wins.
            if 'annotations' in data:
                key = 'annotations'
            elif 'images' in data:
                key = 'images'
            else:
                continue
            for img_entry in data[key]:
                img_name = img_entry['name']
                img_path = domain_root / img_name
                if not img_path.exists():
                    continue
                for box in img_entry.get('annotations', []):
                    attrs = box.get('attributes', {})
                    record = {
                        'image_path': str(img_path),
                        'image_name': img_name,
                        'domain_id': domain_idx,
                        'domain_name': domain_dir,
                        # `or` maps None / empty values onto safe defaults.
                        'question': (attrs.get('question', '') or '').strip(),
                        'question_type': attrs.get('question_type', 'why'),
                        'curiosity_score': float(attrs.get('curiosity_score', 0) or 0.0),
                        'bbox': {
                            'xtl': box.get('xtl', 0),
                            'ytl': box.get('ytl', 0),
                            'xbr': box.get('xbr', 0),
                            'ybr': box.get('ybr', 0)
                        }
                    }
                    records.append(record)
        except Exception as e:
            print(f'⚠ Error loading {domain_dir}: {e}')
            continue
    return records

# Metrics

def normalize01(arr: np.ndarray) -> np.ndarray:
    """Min-max rescale an array to [0, 1] as float32.

    An array whose value range is below 1e-8 (effectively constant) maps to
    all zeros instead of dividing by a near-zero span.
    """
    lo = float(arr.min())
    hi = float(arr.max())
    span = hi - lo
    if span < 1e-8:
        return np.zeros_like(arr, dtype=np.float32)
    rescaled = (arr - lo) / span
    return rescaled.astype(np.float32)


def metric_pearson(pred: np.ndarray, tgt: np.ndarray) -> float:
    """Pearson correlation between two maps, computed over flattened values.

    Returns 0.0 when either map is (near-)constant, where the coefficient is
    undefined.
    """
    flat_pred = pred.flatten()
    flat_tgt = tgt.flatten()
    for values in (flat_pred, flat_tgt):
        if np.std(values) < 1e-8:
            return 0.0
    coef, _pvalue = pearsonr(flat_pred, flat_tgt)
    return float(coef)


def metric_spearman(pred: np.ndarray, tgt: np.ndarray) -> float:
    """Spearman rank correlation between two maps, computed over flattened values.

    Returns 0.0 when either map is (near-)constant, matching `metric_pearson`.
    The guard must use `or`: rank correlation is undefined as soon as one
    input is constant, and calling `spearmanr` in that case only produces a
    warning plus NaN. A NaN result from any other cause is also mapped to 0.0.
    """
    p = pred.flatten(); t = tgt.flatten()
    if np.std(p) < 1e-8 or np.std(t) < 1e-8:
        return 0.0
    r, _ = spearmanr(p, t)
    return float(0.0 if np.isnan(r) else r)


def metric_ssim(pred: np.ndarray, tgt: np.ndarray) -> float:
    """Structural similarity between two maps, called with ``data_range=1.0``.

    Best-effort: any exception raised while computing the score is swallowed
    and reported as 0.0.
    """
    try:
        score = float(ssim(pred, tgt, data_range=1.0))
    except Exception:
        score = 0.0
    return score


def metric_mse(pred: np.ndarray, tgt: np.ndarray) -> float:
    """Mean squared element-wise difference between two maps."""
    residual = pred - tgt
    return float(np.mean(np.square(residual)))


def ndcg_at_k(pred: np.ndarray, tgt: np.ndarray, k: int = 20) -> float:
    """NDCG@k, treating every map cell as an item ranked by its predicted value.

    Cells are ordered by descending `pred`; the gain of a cell is
    ``2 ** tgt - 1`` and the discount at rank i (1-based) is ``log2(i + 1)``.
    The result is DCG divided by the ideal DCG (cells ordered by descending
    `tgt`).

    Args:
        pred: Predicted map (any shape; flattened internally).
        tgt: Target relevance map with the same number of elements as `pred`.
        k: Number of top-ranked cells to score. Clamped to
           ``[0, number of cells]`` so an oversized or negative `k` cannot
           cause a shape mismatch between gains and discounts.

    Returns:
        NDCG in [0, 1] for non-negative targets; 0.0 when the ideal DCG is zero
        or no cells are scored.
    """
    p = pred.flatten(); t = tgt.flatten()
    k_eff = max(0, min(int(k), p.size))
    if k_eff == 0:
        return 0.0
    discounts = np.log2(np.arange(2, k_eff + 2))
    rel = t[np.argsort(-p)][:k_eff]
    ideal = t[np.argsort(-t)][:k_eff]
    dcg = float(np.sum((2 ** rel - 1) / discounts))
    idcg = float(np.sum((2 ** ideal - 1) / discounts))
    return float(0.0 if idcg == 0.0 else dcg / idcg)

print('Utilities ready')


Using device: cpu
Utilities ready


In [None]:
# Dataset definitions: BLIP (bbox target) and VQA (hybrid target)

class DatasetBLIP(Dataset):
    """Per-bbox samples for the vision-only model: image tensor plus a bbox target grid.

    `patch_size` is the number of grid cells per side (targets are
    ``patch_size x patch_size``), not a size in pixels; each cell covers
    ``image_size / patch_size`` pixels of the resized image.
    """

    def __init__(self, annotations: List[Dict], image_size: int = 224, patch_size: int = 14,
                 gaussian_sigma: float = 1.0):
        self.ann = annotations
        self.image_size = image_size
        self.patch_size = patch_size
        self.gaussian_sigma = gaussian_sigma

    def __len__(self):
        return len(self.ann)

    def _bbox_to_patch(self, bbox: Dict, orig_w: int, orig_h: int) -> Tuple[int, int, int, int]:
        """Map a bbox in original-image pixels to inclusive grid-cell indices.

        Returns ``(px1, py1, px2, py2)``, each clamped to ``[0, patch_size - 1]``.
        """
        scale_x = self.image_size / float(orig_w)
        scale_y = self.image_size / float(orig_h)
        corner_scales = (('xtl', scale_x), ('ytl', scale_y), ('xbr', scale_x), ('ybr', scale_y))
        # Corner coordinates in the resized image, truncated to whole pixels.
        resized = [int(bbox[key] * scale) for key, scale in corner_scales]
        cell_px = self.image_size / float(self.patch_size)
        last_cell = self.patch_size - 1
        px1, py1, px2, py2 = (max(0, min(int(coord / cell_px), last_cell)) for coord in resized)
        return px1, py1, px2, py2

    def __getitem__(self, idx: int) -> Dict:
        """Load sample `idx`: resized image in [0, 1], target grid, bbox mask and metadata."""
        record = self.ann[idx]
        rgb = Image.open(record['image_path']).convert('RGB')
        orig_w, orig_h = rgb.size
        side = self.image_size
        resized = rgb.resize((side, side), Image.BILINEAR)
        # HWC uint8 -> CHW float in [0, 1].
        image_tensor = torch.from_numpy(np.array(resized)).permute(2, 0, 1).float() / 255.0

        px1, py1, px2, py2 = self._bbox_to_patch(record['bbox'], orig_w, orig_h)
        # The score is divided by 5 before being written into the target.
        score01 = float(record.get('curiosity_score', 0.0)) / 5.0

        grid = self.patch_size
        rows = slice(py1, py2 + 1)
        cols = slice(px1, px2 + 1)
        mask = torch.zeros(grid, grid, dtype=torch.float32)
        mask[rows, cols] = 1.0
        target = torch.zeros(grid, grid, dtype=torch.float32)
        target[rows, cols] = score01
        if mask.sum() > 0:
            # Soften the hard box edges; the kernel size is derived from sigma.
            blurred = cv2.GaussianBlur(target.numpy(), (0, 0), self.gaussian_sigma)
            target = torch.from_numpy(blurred.astype(np.float32))

        return {
            'image': image_tensor,
            'domain_id': torch.tensor(record['domain_id'], dtype=torch.long),
            'target_map': target,
            'patch_mask': mask,
            'image_path': record['image_path'],
            'bbox_original': record['bbox'],
        }

# Hybrid helpers

def _spectral_residual_saliency(image_path: str, small_size: int = 256, blur_sigma: int = 3) -> np.ndarray:
    img = cv2.imread(image_path)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY).astype(np.float32)
    h, w = gray.shape
    g_small = cv2.resize(gray, (small_size, small_size))
    F = np.fft.fft2(g_small)
    A = np.abs(F); P = np.angle(F)
    logA = np.log(A + 1e-8); logA_sm = cv2.blur(logA, (3, 3))
    SR = logA - logA_sm
    recon = np.fft.ifft2(np.exp(SR + 1j * P))
    S = np.abs(recon).astype(np.float32)
    S = cv2.GaussianBlur(S, (0, 0), blur_sigma)
    S = cv2.resize(S, (w, h))
    return normalize01(S)


def _gaussian_heatmap(width: int, height: int, anns: List[Dict], sigma_divisor: float = 4.0,
                      score_exp: float = 1.5, post_blur: int = 1) -> np.ndarray:
    H = np.zeros((height, width), dtype=np.float32)
    for ann in anns:
        xtl, ytl = float(ann['xtl']), float(ann['ytl'])
        xbr, ybr = float(ann['xbr']), float(ann['ybr'])
        score = float(ann['attributes'].get('curiosity_score', 0.0))
        cx, cy = (xtl + xbr) / 2.0, (ytl + ybr) / 2.0
        sx, sy = max((xbr - xtl)/sigma_divisor, 1e-3), max((ybr - ytl)/sigma_divisor, 1e-3)
        amp = (score / 5.0) ** score_exp
        x0 = int(max(0, cx - 3*sx)); x1 = int(min(width,  cx + 3*sx))
        y0 = int(max(0, cy - 3*sy)); y1 = int(min(height, cy + 3*sy))
        yy, xx = np.ogrid[y0:y1, x0:x1]
        g = amp * np.exp(-(((xx - cx)**2)/(2*sx**2) + ((yy - cy)**2)/(2*sy**2)))
        H[y0:y1, x0:x1] = np.maximum(H[y0:y1, x0:x1], g)
    if H.max() > 0:
        H = normalize01(cv2.GaussianBlur(H, (0,0), post_blur))
    return H


def _soft_box_mask(width: int, height: int, anns: List[Dict], soft_sigma: int = 8) -> np.ndarray:
    M = np.zeros((height, width), dtype=np.float32)
    for ann in anns:
        xtl = int(ann['xtl']); ytl = int(ann['ytl'])
        xbr = int(ann['xbr']); ybr = int(ann['ybr'])
        M[ytl:ybr, xtl:xbr] = 1.0
    M = cv2.GaussianBlur(M, (0,0), soft_sigma)
    return normalize01(M) if M.max() > 0 else M


def build_hybrid_heatmap(image_path: str, width: int, height: int, bbox: Dict, score: float,
                         alpha: float=0.8, beta: float=0.2, boost: float=2.0,
                         lambda_out: float=0.15, sal_small: int=256) -> np.ndarray:
    """Blend a bbox Gaussian with box-weighted image saliency into one target map.

    ``C = alpha * G + beta * (W * S)`` where G is the bbox Gaussian heatmap,
    S the spectral-residual saliency of the image and
    ``W = lambda_out + (boost - lambda_out) * M`` re-weights saliency with a
    soft box mask M (weight `boost` inside the box, `lambda_out` far outside).
    The result is min-max normalised.

    Args:
        image_path: Image file used for the saliency term.
        width, height: Size of the output map; the saliency map must have the
            same shape — NOTE(review): it is taken from the file as read by
            OpenCV, confirm it always matches the caller's width/height.
        bbox: Dict with 'xtl', 'ytl', 'xbr', 'ybr' in image pixel coordinates.
        score: Curiosity score of the box (passed to the Gaussian heatmap).
        alpha, beta: Blend weights of the Gaussian and saliency terms.
        boost, lambda_out: Saliency weights inside / outside the box.
        sal_small: Working resolution of the saliency computation.

    Returns:
        float32 array of shape (height, width) with values in [0, 1].
    """
    # Coerce once with float() so the Gaussian and the mask see the same values.
    xtl, ytl = float(bbox['xtl']), float(bbox['ytl'])
    xbr, ybr = float(bbox['xbr']), float(bbox['ybr'])
    anns = [{'xtl': xtl, 'ytl': ytl, 'xbr': xbr, 'ybr': ybr,
             'attributes': {'curiosity_score': float(score)}}]
    G = _gaussian_heatmap(width, height, anns)
    S = _spectral_residual_saliency(image_path, small_size=sal_small)
    # Build soft mask directly from the provided bbox (robust against collation issues).
    # Corners are clamped at 0: a negative value would otherwise be read by NumPy as an
    # index from the far edge and mark the wrong region.
    M = np.zeros((height, width), dtype=np.float32)
    x0 = max(0, int(xtl)); y0 = max(0, int(ytl))
    x1 = max(0, int(xbr)); y1 = max(0, int(ybr))
    M[y0:y1, x0:x1] = 1.0
    M = cv2.GaussianBlur(M, (0, 0), 8)
    M = normalize01(M) if M.max() > 0 else M
    W = lambda_out + (boost - lambda_out) * M
    S_att = W * S
    C = alpha * G + beta * S_att
    return normalize01(C)

class DatasetVQA(Dataset):
    """Per-bbox samples for the vision+language model.

    Each item carries the resized image, the tokenised fixed question, a
    hybrid (Gaussian + saliency) target resized to the grid, and a bbox mask.
    `patch_size` is the number of grid cells per side, not a size in pixels.
    """

    def __init__(self, annotations: List[Dict], processor: BlipProcessor,
                 image_size: int = 224, patch_size: int = 14,
                 question_text: str = 'Which parts of the image are likely to provoke human curiosity?'):
        self.ann = annotations
        self.processor = processor
        self.image_size = image_size
        self.patch_size = patch_size
        self.question_text = question_text

    def __len__(self):
        return len(self.ann)

    def _bbox_to_patch(self, bbox: Dict, orig_w: int, orig_h: int) -> Tuple[int, int, int, int]:
        """Map a bbox in original-image pixels to inclusive grid-cell indices.

        Returns ``(px1, py1, px2, py2)``, each clamped to ``[0, patch_size - 1]``.
        """
        scale_x = self.image_size / float(orig_w)
        scale_y = self.image_size / float(orig_h)
        corner_scales = (('xtl', scale_x), ('ytl', scale_y), ('xbr', scale_x), ('ybr', scale_y))
        # Corner coordinates in the resized image, truncated to whole pixels.
        resized = [int(bbox[key] * scale) for key, scale in corner_scales]
        cell_px = self.image_size / float(self.patch_size)
        last_cell = self.patch_size - 1
        px1, py1, px2, py2 = (max(0, min(int(coord / cell_px), last_cell)) for coord in resized)
        return px1, py1, px2, py2

    def __getitem__(self, idx: int) -> Dict:
        """Load sample `idx`: image, question tokens, hybrid target grid, bbox mask, metadata."""
        record = self.ann[idx]
        rgb = Image.open(record['image_path']).convert('RGB')
        orig_w, orig_h = rgb.size
        side = self.image_size
        resized = rgb.resize((side, side), Image.BILINEAR)
        # HWC uint8 -> CHW float in [0, 1].
        image_tensor = torch.from_numpy(np.array(resized)).permute(2, 0, 1).float() / 255.0

        # Hybrid target: built at the original resolution, then resized to the grid.
        grid = self.patch_size
        hybrid_full = build_hybrid_heatmap(record['image_path'], orig_w, orig_h, record['bbox'],
                                           record.get('curiosity_score', 0.0))
        hybrid_grid = cv2.resize(hybrid_full, (grid, grid), interpolation=cv2.INTER_LINEAR)
        target = torch.from_numpy(hybrid_grid.astype(np.float32))

        # Patch mask from bbox (inclusive cell range).
        px1, py1, px2, py2 = self._bbox_to_patch(record['bbox'], orig_w, orig_h)
        mask = torch.zeros(grid, grid, dtype=torch.float32)
        mask[slice(py1, py2 + 1), slice(px1, px2 + 1)] = 1.0

        # Same question for every sample, padded/truncated to 32 tokens.
        text_inputs = self.processor.tokenizer(
            self.question_text, padding='max_length', truncation=True, max_length=32, return_tensors='pt')

        sample = {
            'image': image_tensor,
            'input_ids': text_inputs.input_ids.squeeze(0),
            'attention_mask': text_inputs.attention_mask.squeeze(0),
            'target_map': target,
            'patch_mask': mask,
            'image_path': record['image_path'],
            'bbox_original': record['bbox'],
            'question_type': record.get('question_type', 'why')
        }
        return sample

print(' Datasets ready')


Datasets ready


In [None]:
# Model definitions

class BLIPCuriosityNet(nn.Module):
    """Vision-only curiosity model: BLIP vision encoder + domain embedding + per-patch head.

    Args:
        num_domains: Number of rows in the domain embedding table.
        hidden_dim: Width of the vision features fed to the head.
        freeze_vision: If True, vision-encoder parameters get ``requires_grad = False``.
    """

    def __init__(self, num_domains: int = 5, hidden_dim: int = 768, freeze_vision: bool = True):
        super().__init__()
        self.blip = BlipModel.from_pretrained('Salesforce/blip-image-captioning-base')
        self.vision = self.blip.vision_model
        if freeze_vision:
            for p in self.vision.parameters():
                p.requires_grad = False
        self.domain_embedding = nn.Embedding(num_domains, 128)
        self.domain_proj = nn.Linear(128, hidden_dim)
        # Match training name to load weights correctly
        self.curiosity_head = nn.Sequential(
            nn.LayerNorm(hidden_dim),
            nn.Linear(hidden_dim, 512),
            nn.GELU(),
            nn.Dropout(0.1),
            nn.Linear(512, 1)
        )

    def forward(self, images: torch.Tensor, domain_ids: torch.Tensor) -> torch.Tensor:
        """Score every image patch.

        Args:
            images: Batch passed to the vision encoder as ``pixel_values``.
            domain_ids: One integer domain index per image.

        Returns:
            Tensor of shape [B, S, S], where S*S is the number of patch tokens
            the encoder produced (14 x 14 for 196 tokens). The grid side is
            derived from the token count instead of being fixed at 14, so
            other input resolutions also work.

        Raises:
            ValueError: If the number of patch tokens is not a perfect square.
        """
        out = self.vision(pixel_values=images)
        # Drop the first token (presumably the CLS token) and keep the patch tokens: [B, N, D].
        feats = out.last_hidden_state[:, 1:, :]
        # One domain vector per image, broadcast over all of its patches.
        d = self.domain_proj(self.domain_embedding(domain_ids)).unsqueeze(1)
        feats = feats + d
        scores = self.curiosity_head(feats).squeeze(-1)  # [B, N]
        n_patches = scores.size(1)
        side = int(round(n_patches ** 0.5))
        if side * side != n_patches:
            raise ValueError(f'Cannot arrange {n_patches} patch tokens into a square grid')
        B = images.size(0)
        return scores.view(B, side, side)

class VQACuriosityNet(nn.Module):
    """Vision+language curiosity model: image patches cross-attend to question tokens.

    Patch features act as queries over the text features; the attended text
    is added to the patch features, layer-normalised and decoded to one score
    per patch.
    """

    def __init__(self, hidden_dim: int = 768, patch_size: int = 14, freeze_vision: bool = True, freeze_text: bool = True):
        super().__init__()
        self.patch_size = patch_size
        self.blip = BlipModel.from_pretrained('Salesforce/blip-image-captioning-base')
        self.vision = self.blip.vision_model
        self.text = self.blip.text_model
        # Optionally stop gradients for either pretrained encoder.
        for encoder, frozen in ((self.vision, freeze_vision), (self.text, freeze_text)):
            if frozen:
                for param in encoder.parameters():
                    param.requires_grad = False
        self.q_proj = nn.Linear(hidden_dim, hidden_dim)
        self.k_proj = nn.Linear(hidden_dim, hidden_dim)
        self.v_proj = nn.Linear(hidden_dim, hidden_dim)
        # Attention logits are divided by sqrt(hidden_dim).
        self.scale = hidden_dim ** 0.5
        # Attribute names match the training code so checkpoint keys line up.
        self.fusion_ln = nn.LayerNorm(hidden_dim)
        decoder_layers = [
            nn.LayerNorm(hidden_dim),
            nn.Linear(hidden_dim, hidden_dim),
            nn.GELU(),
            nn.Dropout(0.1),
            nn.Linear(hidden_dim, 1),
        ]
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, images: torch.Tensor, input_ids: torch.Tensor, attention_mask: torch.Tensor) -> torch.Tensor:
        """Return per-patch scores of shape [B, patch_size, patch_size].

        `attention_mask` is forwarded to the text encoder only; the
        cross-attention softmax below runs over all text positions.
        """
        vision_out = self.vision(pixel_values=images)
        patches = vision_out.last_hidden_state[:, 1:, :]  # first token dropped -> [B,N,D]
        text_out = self.text(input_ids=input_ids, attention_mask=attention_mask)
        tokens = text_out.last_hidden_state  # [B,L,D]

        queries = self.q_proj(patches)
        keys = self.k_proj(tokens)
        values = self.v_proj(tokens)
        logits = torch.matmul(queries, keys.transpose(1, 2)) / self.scale
        weights = torch.softmax(logits, dim=-1)
        attended_text = torch.matmul(weights, values)

        fused = self.fusion_ln(patches + attended_text)
        scores = self.decoder(fused).squeeze(-1)  # [B,N]
        batch = images.size(0)
        return scores.view(batch, self.patch_size, self.patch_size)

print(' Models ready')


Models ready


In [6]:
# Build datasets and loaders (validation split only)
from sklearn.model_selection import train_test_split

# One record per bounding box (not per image), gathered from all domain folders.
all_annotations = load_all_annotations()
# Fixed-seed 80/20 split; only the 20% validation part is used in this notebook.
# NOTE(review): presumably this must reproduce the split used at training time — confirm.
# NOTE(review): records are split per bbox, so two boxes of the same image can fall on
# opposite sides of the split.
train_ann, val_ann = train_test_split(all_annotations, test_size=0.2, random_state=42)

# Only the processor's tokenizer is used by DatasetVQA.
processor = BlipProcessor.from_pretrained('Salesforce/blip-image-captioning-base')

# Same validation records for both models; the two datasets differ in how the
# target map is built (blurred bbox grid vs. hybrid Gaussian + saliency map).
val_blip = DatasetBLIP(val_ann, image_size=224, patch_size=14)
val_vqa  = DatasetVQA(val_ann, processor=processor, image_size=224, patch_size=14)

# shuffle=False keeps the sample order fixed across runs.
loader_blip = DataLoader(val_blip, batch_size=8, shuffle=False, num_workers=0)
loader_vqa  = DataLoader(val_vqa,  batch_size=6, shuffle=False, num_workers=0)

print('Val sizes -> BLIP:', len(val_blip), '| VQA:', len(val_vqa))


Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.


Val sizes -> BLIP: 44 | VQA: 44


In [None]:
# Load weights
blip_ckpt = 'blip_curiosity_net_best.pth'
vqa_ckpt  = 'vqa_curiosity_best.pth'


def _report_key_mismatch(label, load_result, max_shown=5):
    """Print checkpoint/model key mismatches that `strict=False` would otherwise hide.

    `load_result` is the value returned by `load_state_dict`; it exposes
    `missing_keys` (model parameters absent from the checkpoint, which keep
    their current initialisation) and `unexpected_keys` (checkpoint entries
    the model has no slot for).
    """
    for kind, keys in (('missing', load_result.missing_keys),
                       ('unexpected', load_result.unexpected_keys)):
        if keys:
            preview = ', '.join(keys[:max_shown])
            suffix = ', ...' if len(keys) > max_shown else ''
            print(f' {label}: {len(keys)} {kind} key(s) when loading checkpoint: {preview}{suffix}')


# NOTE(security): torch.load unpickles the file, which can execute arbitrary code.
# Only load checkpoints from a trusted source.
# Without a checkpoint the model keeps its freshly initialised head, so any metrics
# computed afterwards do not describe a trained model.
model_blip = BLIPCuriosityNet().to(DEVICE)
if Path(blip_ckpt).exists():
    sd = torch.load(blip_ckpt, map_location=DEVICE)
    # Accept both a wrapped checkpoint dict and a bare state dict.
    state = sd.get('model_state_dict', sd)
    _report_key_mismatch('BLIP', model_blip.load_state_dict(state, strict=False))
    print(' Loaded BLIP weights')
else:
    print(' BLIP checkpoint not found:', blip_ckpt)

model_vqa = VQACuriosityNet().to(DEVICE)
if Path(vqa_ckpt).exists():
    sd = torch.load(vqa_ckpt, map_location=DEVICE)
    state = sd.get('model_state_dict', sd)
    _report_key_mismatch('VQA', model_vqa.load_state_dict(state, strict=False))
    print(' Loaded VQA weights')
else:
    print(' VQA checkpoint not found:', vqa_ckpt)


`BlipModel` is going to be deprecated in future release, please use `BlipForConditionalGeneration`, `BlipForQuestionAnswering` or `BlipForImageTextRetrieval` depending on your usecase.
Some weights of BlipModel were not initialized from the model checkpoint at Salesforce/blip-image-captioning-base and are newly initialized: ['logit_scale', 'text_model.embeddings.LayerNorm.bias', 'text_model.embeddings.LayerNorm.weight', 'text_model.embeddings.position_embeddings.weight', 'text_model.embeddings.word_embeddings.weight', 'text_model.encoder.layer.0.attention.output.LayerNorm.bias', 'text_model.encoder.layer.0.attention.output.LayerNorm.weight', 'text_model.encoder.layer.0.attention.output.dense.bias', 'text_model.encoder.layer.0.attention.output.dense.weight', 'text_model.encoder.layer.0.attention.self.key.bias', 'text_model.encoder.layer.0.attention.self.key.weight', 'text_model.encoder.layer.0.attention.self.query.bias', 'text_model.encoder.layer.0.attention.self.query.weight', 'text_mo

Loaded BLIP weights


`BlipModel` is going to be deprecated in future release, please use `BlipForConditionalGeneration`, `BlipForQuestionAnswering` or `BlipForImageTextRetrieval` depending on your usecase.
Some weights of BlipModel were not initialized from the model checkpoint at Salesforce/blip-image-captioning-base and are newly initialized: ['logit_scale', 'text_model.embeddings.LayerNorm.bias', 'text_model.embeddings.LayerNorm.weight', 'text_model.embeddings.position_embeddings.weight', 'text_model.embeddings.word_embeddings.weight', 'text_model.encoder.layer.0.attention.output.LayerNorm.bias', 'text_model.encoder.layer.0.attention.output.LayerNorm.weight', 'text_model.encoder.layer.0.attention.output.dense.bias', 'text_model.encoder.layer.0.attention.output.dense.weight', 'text_model.encoder.layer.0.attention.self.key.bias', 'text_model.encoder.layer.0.attention.self.key.weight', 'text_model.encoder.layer.0.attention.self.query.bias', 'text_model.encoder.layer.0.attention.self.query.weight', 'text_mo

Loaded VQA weights


In [8]:
# Inference helpers

def evaluate_loader_heatmaps(pred_maps: List[np.ndarray], tgt_maps: List[np.ndarray], ndcg_ks: List[int]):
    """Score predicted maps against targets, pair by pair.

    Both maps of a pair are min-max normalised independently before any
    metric is computed.

    Args:
        pred_maps: Predicted maps, one per sample.
        tgt_maps: Target maps, paired with `pred_maps` by position.
        ndcg_ks: Cut-offs for which NDCG@k is reported.

    Returns:
        ``(means, per_sample)``: `per_sample` maps each metric name
        ('pearson', 'spearman', 'ssim', 'mse', 'ndcg@<k>') to the list of
        per-sample values; `means` maps the same names to their averages.
    """
    per_sample = {'pearson': [], 'spearman': [], 'ssim': [], 'mse': []}
    per_sample.update({f'ndcg@{k}': [] for k in ndcg_ks})

    for raw_pred, raw_tgt in zip(pred_maps, tgt_maps):
        p = normalize01(raw_pred)
        t = normalize01(raw_tgt)
        per_sample['pearson'].append(metric_pearson(p, t))
        per_sample['spearman'].append(metric_spearman(p, t))
        per_sample['ssim'].append(metric_ssim(p, t))
        per_sample['mse'].append(metric_mse(p, t))
        for k in ndcg_ks:
            per_sample[f'ndcg@{k}'].append(ndcg_at_k(p, t, k=k))

    means = {name: float(np.mean(values)) for name, values in per_sample.items()}
    return means, per_sample


def run_inference_blip(model: nn.Module, loader: DataLoader, ndcg_ks: List[int]):
    """Run the vision-only model over `loader` and score its maps.

    Puts the model in eval mode, predicts without gradients from each
    batch's 'image' and 'domain_id', and compares against 'target_map'.

    Returns:
        Whatever `evaluate_loader_heatmaps` returns for the collected maps.
    """
    model.eval()
    pred_maps, tgt_maps = [], []
    with torch.no_grad():
        for batch in tqdm(loader, desc='BLIP inference'):
            batch_pred = model(batch['image'].to(DEVICE), batch['domain_id'].to(DEVICE))
            batch_tgt = batch['target_map'].numpy()
            # Split each batch array into one map per sample.
            pred_maps += list(batch_pred.cpu().numpy())
            tgt_maps += list(batch_tgt)
    return evaluate_loader_heatmaps(pred_maps, tgt_maps, ndcg_ks)


def run_inference_vqa(model: nn.Module, loader: DataLoader, ndcg_ks: List[int]):
    """Run the vision+language model over `loader` and score its maps.

    Puts the model in eval mode, predicts without gradients from each
    batch's 'image', 'input_ids' and 'attention_mask', and compares against
    'target_map'.

    Returns:
        Whatever `evaluate_loader_heatmaps` returns for the collected maps.
    """
    model.eval()
    pred_maps, tgt_maps = [], []
    with torch.no_grad():
        for batch in tqdm(loader, desc='VQA inference'):
            batch_pred = model(
                batch['image'].to(DEVICE),
                batch['input_ids'].to(DEVICE),
                batch['attention_mask'].to(DEVICE),
            )
            batch_tgt = batch['target_map'].numpy()
            # Split each batch array into one map per sample.
            pred_maps += list(batch_pred.cpu().numpy())
            tgt_maps += list(batch_tgt)
    return evaluate_loader_heatmaps(pred_maps, tgt_maps, ndcg_ks)

print('Inference helpers ready')


Inference helpers ready


In [9]:
# Run inference and gather metrics

# Cut-offs k for which NDCG@k is reported.
NDCG_KS = [5, 10, 20]

# Model name -> dict of mean metrics.
summary = {}

# Each run_inference_* call returns (mean metrics, per-sample metric lists).
# NOTE: the two models are scored against different targets — loader_blip yields the
# blurred bbox grid, loader_vqa the hybrid Gaussian + saliency map — so their numbers
# are not measured against the same ground truth.
blip_mean, blip_all = run_inference_blip(model_blip, loader_blip, NDCG_KS)
summary['BLIP-CuriosityNet'] = blip_mean
print('BLIP mean metrics:', blip_mean)

vqa_mean, vqa_all = run_inference_vqa(model_vqa, loader_vqa, NDCG_KS)
summary['VQA-CuriosityNet'] = vqa_mean
print('VQA mean metrics:', vqa_mean)

# Save the summary plus all per-sample values as JSON under inference_outputs/.
outdir = Path('inference_outputs'); outdir.mkdir(exist_ok=True)
with open(outdir / 'metrics.json', 'w') as f:
    json.dump({'summary': summary, 'ndcg_ks': NDCG_KS, 'blip_all': blip_all, 'vqa_all': vqa_all}, f, indent=2)
print('Saved metrics to', outdir / 'metrics.json')


BLIP inference: 100%|██████████| 6/6 [00:15<00:00,  2.66s/it]


BLIP mean metrics: {'pearson': 0.4224326319315217, 'spearman': 0.37843533846782956, 'ssim': 0.33373851862700976, 'mse': 0.18075483969666742, 'ndcg@5': 0.53326053906122, 'ndcg@10': 0.5447952676519591, 'ndcg@20': 0.5602379274741165}


VQA inference: 100%|██████████| 8/8 [00:24<00:00,  3.08s/it]

VQA mean metrics: {'pearson': 0.43619992617856373, 'spearman': 0.532549357821294, 'ssim': 0.20362672629086506, 'mse': 0.22387673180889, 'ndcg@5': 0.40532851549812654, 'ndcg@10': 0.46368258540464624, 'ndcg@20': 0.53055352894218}
Saved metrics to inference_outputs\metrics.json





In [10]:
# Visualize predicted overlays for specific images (both models)
from matplotlib import pyplot as plt
from matplotlib import cm

def save_overlay(img_np: np.ndarray, heatmap_up: np.ndarray, save_path: Path, alpha: float = 0.6):
    """Blend a 'hot'-coloured heatmap over an RGB image and write it to disk.

    Args:
        img_np: RGB image array with values in 0..255.
        heatmap_up: Heatmap with the same height and width as `img_np`; it is
            min-max normalised before colouring.
        save_path: Destination file.
        alpha: Weight of the heatmap in the blend (the image gets ``1 - alpha``).
    """
    hm = normalize01(heatmap_up)
    # `matplotlib.cm.get_cmap` was removed in Matplotlib 3.9; `plt.get_cmap` is the
    # lookup that works on both old and new releases.
    hot = plt.get_cmap('hot')
    # The colormap returns RGBA in [0, 1]; keep RGB and scale to 0..255.
    hm_rgb = (hot(hm)[..., :3] * 255).astype(np.uint8)
    blend = np.clip(alpha * hm_rgb + (1 - alpha) * img_np, 0, 255).astype(np.uint8)
    plt.imsave(str(save_path), blend)

names = ['img_061.png', 'img_024.png', 'img_060.png', 'img_166.png' , 'img_154.png']
out_vis = Path('inference_outputs/vis'); out_vis.mkdir(parents=True, exist_ok=True)

prompt = 'Which parts of the image are likely to provoke human curiosity?'

# The prompt is the same for every image, so tokenize it once instead of per iteration.
token = processor.tokenizer(prompt, padding='max_length', truncation=True, max_length=32, return_tensors='pt')
inp_ids = token.input_ids.to(DEVICE)
attn = token.attention_mask.to(DEVICE)

# Do not rely on an earlier cell having switched the models to eval mode:
# with dropout active the saved heatmaps would differ from run to run.
model_blip.eval()
model_vqa.eval()

for name in names:
    # Find annotation entry (first match, case-insensitive on the file name)
    ann = next((a for a in all_annotations if a['image_name'].lower() == name.lower()), None)
    if ann is None:
        print(f'⚠ Not found in annotations: {name}')
        continue
    stem = name.replace(".png", "")

    img_path = ann['image_path']
    img = Image.open(img_path).convert('RGB')
    W, H = img.size
    img_np = np.array(img)

    # Prepare image tensor (same preprocessing as the dataset classes)
    img_res = img.resize((224, 224), Image.BILINEAR)
    img_t = torch.from_numpy(np.array(img_res)).permute(2, 0, 1).float() / 255.0
    img_t = img_t.unsqueeze(0).to(DEVICE)

    # BLIP prediction, upsampled from the patch grid to the original image size
    with torch.no_grad():
        d = torch.tensor([ann.get('domain_id', 0)], dtype=torch.long, device=DEVICE)
        pred_blip = model_blip(img_t, d).cpu().numpy()[0]
    pred_blip_up = cv2.resize(pred_blip, (W, H), interpolation=cv2.INTER_LINEAR)
    save_overlay(img_np, pred_blip_up, out_vis / f'BLIP_{stem}_overlay.png')

    # VQA prediction
    with torch.no_grad():
        pred_vqa = model_vqa(img_t, inp_ids, attn).cpu().numpy()[0]
    pred_vqa_up = cv2.resize(pred_vqa, (W, H), interpolation=cv2.INTER_LINEAR)
    save_overlay(img_np, pred_vqa_up, out_vis / f'VQA_{stem}_overlay.png')

    print(f'Saved overlays for {name} →', out_vis)


Saved overlays for img_061.png → inference_outputs\vis
Saved overlays for img_024.png → inference_outputs\vis
Saved overlays for img_060.png → inference_outputs\vis
Saved overlays for img_166.png → inference_outputs\vis
Saved overlays for img_154.png → inference_outputs\vis


In [11]:
# Save raw 14×14 heatmaps (BLIP & VQA) for selected images
from matplotlib import pyplot as plt

names = ['img_061.png', 'img_024.png', 'img_060.png', 'img_166.png' , 'img_154.png']
out14 = Path('inference_outputs/heatmaps14'); out14.mkdir(parents=True, exist_ok=True)

# The prompt is the same for every image, so tokenize it once instead of per iteration.
prompt = 'Which parts of the image are likely to provoke human curiosity?'
token = processor.tokenizer(prompt,
                            padding='max_length', truncation=True, max_length=32, return_tensors='pt')
inp_ids = token.input_ids.to(DEVICE)
attn = token.attention_mask.to(DEVICE)

# Do not rely on an earlier cell having switched the models to eval mode:
# with dropout active the exported arrays would differ from run to run.
model_blip.eval()
model_vqa.eval()

for name in names:
    ann = next((a for a in all_annotations if a['image_name'].lower() == name.lower()), None)
    if ann is None:
        print(f'⚠ Not found in annotations: {name}')
        continue
    stem = name.replace(".png", "")

    # Prepare image tensor (same preprocessing as the dataset classes)
    img = Image.open(ann['image_path']).convert('RGB')
    img_res = img.resize((224, 224), Image.BILINEAR)
    img_t = torch.from_numpy(np.array(img_res)).permute(2, 0, 1).float() / 255.0
    img_t = img_t.unsqueeze(0).to(DEVICE)

    # BLIP 14×14: raw scores as .npy and .csv, normalised scores as a 'hot' PNG
    with torch.no_grad():
        d = torch.tensor([ann.get('domain_id', 0)], dtype=torch.long, device=DEVICE)
        blip_14 = model_blip(img_t, d).cpu().numpy()[0]
    np.save(out14 / f'BLIP_{stem}_14.npy', blip_14)
    np.savetxt(out14 / f'BLIP_{stem}_14.csv', blip_14, delimiter=',', fmt='%.6f')
    plt.imsave(out14 / f'BLIP_{stem}_14_hot.png', normalize01(blip_14), cmap='hot')

    # VQA 14×14
    with torch.no_grad():
        vqa_14 = model_vqa(img_t, inp_ids, attn).cpu().numpy()[0]
    np.save(out14 / f'VQA_{stem}_14.npy', vqa_14)
    np.savetxt(out14 / f'VQA_{stem}_14.csv', vqa_14, delimiter=',', fmt='%.6f')
    plt.imsave(out14 / f'VQA_{stem}_14_hot.png', normalize01(vqa_14), cmap='hot')

    print(f'Saved 14×14 arrays and images for {name} →', out14)


Saved 14×14 arrays and images for img_061.png → inference_outputs\heatmaps14
Saved 14×14 arrays and images for img_024.png → inference_outputs\heatmaps14
Saved 14×14 arrays and images for img_060.png → inference_outputs\heatmaps14
Saved 14×14 arrays and images for img_166.png → inference_outputs\heatmaps14
Saved 14×14 arrays and images for img_154.png → inference_outputs\heatmaps14
