# VQA Lite - All-in-One Notebook

Questo notebook contiene tutte le sezioni principali del progetto VQA Lite: configurazione, utilità, modello, preparazione dataset (modalità leggera), training, valutazione e inferenza. È pensato per l’esecuzione locale su MacBook Pro M1 (8GB RAM).

Suggerimenti:
- Attiva un ambiente virtuale e installa i requisiti prima di eseguire.
- Mantieni batch piccoli e limiti bassi (`max_items`) nella fase di preparazione del dataset per non saturare la memoria.


In [None]:
# Installazione/Setup per Colab (auto)
import sys, subprocess, os, yaml, torch, random, numpy as np
from pathlib import Path

# Colab is detected by checking whether its package is already loaded in this kernel.
IN_COLAB = 'google.colab' in sys.modules
if IN_COLAB:
    # Install the notebook's dependencies into the interpreter that runs this kernel.
    pkgs = ['torch','torchvision','sentence-transformers','tqdm','PyYAML','Pillow','numpy']
    subprocess.check_call([sys.executable,'-m','pip','install','-q'] + pkgs)

# Report the interpreter that is actually executing this cell. Spawning
# `python -V` would instead describe whichever `python` is first on PATH
# (possibly a different environment, or none at all) and leaves a pipe open.
print('Python:', sys.version.split()[0])
print('Torch:', torch.__version__)
print('CUDA available:', torch.cuda.is_available())
# Older torch builds have no `torch.backends.mps`, hence the attribute check.
if hasattr(torch.backends,'mps'):
    print('MPS available:', torch.backends.mps.is_available(), 'built:', torch.backends.mps.is_built())


In [None]:
# Config (Colab-friendly): when no local config.yaml exists, define a minimal config in place
if Path('config.yaml').exists():
    # Decode explicitly as UTF-8 so accented answers such as "Sì" do not depend
    # on the platform's default text encoding.
    with open('config.yaml', 'r', encoding='utf-8') as f:
        cfg = yaml.safe_load(f)
else:
    cfg = {
        'device': 'auto',
        'model': {
            'question_dim': 384,
            'image_feature_dim': 256,
            'attention_hidden_dim': 128,
            'dropout': 0.3,
        },
        # Answer vocabulary: index 0/1 are yes/no, the remaining entries are the object names.
        'answers': ["Sì","No","aeroplano","automobile","uccello","gatto","cervo","cane","rana","cavallo","nave","camion"],
        # Category name -> list of integer class labels belonging to it.
        'categories': {'animale':[2,3,4,5,6,7],'veicolo':[0,1,8,9]},
        'training': {'batch_size': 16,'epochs': 3,'learning_rate': 1e-3,'weight_decay':1e-4,'val_split':0.1,'seed':42,'num_workers':0},
        'paths': {'model_save_path':'vqa_model_best.pth','train_dataset_path':'data/train_dataset.pkl','test_dataset_path':'data/test_dataset.pkl'},
        'embedding_model': 'all-MiniLM-L6-v2'
    }
# Bare expression: displayed as the cell output when run in a notebook.
cfg


In [None]:
# Google Drive (optional) - store/load data and checkpoints on Drive
try:
    from google.colab import drive  # type: ignore
except ImportError:
    # Only a missing package means "not running in Colab"; any other failure
    # is a real error and should surface instead of being silently swallowed.
    IN_COLAB = False
else:
    IN_COLAB = True

if IN_COLAB:
    import os
    from pathlib import Path
    drive.mount('/content/drive')
    base_dir = Path('/content/drive/MyDrive/VQA_lite')
    (base_dir / 'data').mkdir(parents=True, exist_ok=True)
    # Redirect checkpoint and dataset paths to the mounted Drive folder.
    cfg['paths']['model_save_path'] = str(base_dir / 'vqa_model_best.pth')
    cfg['paths']['train_dataset_path'] = str(base_dir / 'data/train_dataset.pkl')
    cfg['paths']['test_dataset_path'] = str(base_dir / 'data/test_dataset.pkl')
    # Save a copy of the updated config on Drive
    with open(base_dir / 'config_colab.yaml', 'w', encoding='utf-8') as f:
        yaml.safe_dump(cfg, f)
    print('Drive montato. Base dir:', base_dir)
else:
    print('Colab non rilevato: salto mount Drive.')


In [None]:
# Utils (compatibili MPS/CPU/CUDA)
from torchvision import transforms

def get_device(cfg) -> str:
    """Return the torch device name to use: 'cuda', 'mps' or 'cpu'.

    The optional ``cfg['device']`` entry is honoured when it names a backend
    that is usable on this machine ('cpu' is always usable). The value
    'auto', a missing entry, an unknown value, or an unavailable backend all
    fall back to auto-detection in the order CUDA, MPS, CPU.

    Args:
        cfg: Configuration mapping (or None); only the 'device' key is read.

    Returns:
        One of 'cuda', 'mps', 'cpu'.
    """
    requested = str((cfg or {}).get('device', 'auto')).strip().lower()

    cuda_ok = torch.cuda.is_available()
    # `torch.backends.mps` does not exist on older torch builds.
    mps_backend = getattr(torch.backends, 'mps', None)
    mps_ok = mps_backend is not None and mps_backend.is_built() and mps_backend.is_available()

    if requested == 'cpu':
        return 'cpu'
    if requested == 'cuda' and cuda_ok:
        return 'cuda'
    if requested == 'mps' and mps_ok:
        return 'mps'

    # Auto-detection (also the fallback when the requested backend is unusable).
    if cuda_ok:
        return 'cuda'
    if mps_ok:
        return 'mps'
    return 'cpu'


def set_seed(seed: int) -> None:
    """Seed the global RNGs of Python's `random`, NumPy and PyTorch with `seed`."""
    # Same order as always: stdlib first, then NumPy, then torch.
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)


def get_image_transform() -> transforms.Compose:
    """Build the image preprocessing pipeline: tensor conversion, then per-channel normalisation.

    No resizing or cropping is applied, so images keep their original
    spatial size. The mean/std values are the per-channel statistics
    conventionally used with ImageNet-pretrained torchvision models.
    """
    channel_mean = [0.485, 0.456, 0.406]
    channel_std = [0.229, 0.224, 0.225]
    to_tensor = transforms.ToTensor()
    normalize = transforms.Normalize(mean=channel_mean, std=channel_std)
    return transforms.Compose([to_tensor, normalize])

# Resolve the compute device once and seed the RNGs (seed defaults to 42 when
# the training config does not provide one).
DEVICE = get_device(cfg)
set_seed(cfg['training'].get('seed', 42))
# Bare expression: displayed as the cell output when run in a notebook.
DEVICE


In [None]:
# Modello (ripreso da src/vqa_model.py)
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models

class VQANet(nn.Module):
    """Question-guided spatial attention over ResNet-18 features, followed by an answer classifier.

    The image is encoded by a truncated ResNet-18, projected to
    `image_feature_dim` channels, and pooled with attention weights computed
    from the image features concatenated with the question embedding. The
    pooled image vector and the question embedding are then classified into
    `num_answers` logits.
    """

    def __init__(self, num_answers, question_dim, image_feature_dim, attention_hidden_dim, dropout: float = 0.3):
        """Create the backbone, projection, attention and classifier layers.

        Args:
            num_answers: Number of output classes.
            question_dim: Size of the question embedding vector.
            image_feature_dim: Channels of the projected image feature map.
            attention_hidden_dim: Hidden width of the attention branch and of the classifier.
            dropout: Dropout probability inside the classifier.
        """
        super().__init__()
        fused_dim = image_feature_dim + question_dim

        # ResNet-18 without its last two child modules, so the output stays a spatial map.
        resnet = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
        trunk_layers = list(resnet.children())[:-2]
        self.backbone = nn.Sequential(*trunk_layers)

        # Freeze the parameters of the sequential children whose index starts with 0, 1 or 4.
        frozen_prefixes = ("0", "1", "4")
        for param_name, param in self.backbone.named_parameters():
            if param_name.startswith(frozen_prefixes):
                param.requires_grad = False

        # 1x1 convolutions: channel projection (512 backbone channels in) and the attention scorer.
        self.proj = nn.Conv2d(512, image_feature_dim, kernel_size=1)
        self.attention_conv = nn.Conv2d(fused_dim, attention_hidden_dim, 1)
        self.attention_fc = nn.Conv2d(attention_hidden_dim, 1, 1)

        classifier_layers = [
            nn.Linear(fused_dim, attention_hidden_dim),
            nn.ReLU(),
            nn.Dropout(p=dropout),
            nn.Linear(attention_hidden_dim, num_answers),
        ]
        self.fc = nn.Sequential(*classifier_layers)

    def forward(self, image, question_emb, temperature: float = 1.0):
        """Return answer logits for a batch of images and question embeddings.

        Args:
            image: Image batch fed to the backbone.
            question_emb: Question embeddings, one row per image.
            temperature: Divisor applied to the attention scores before the
                softmax; values below 1e-6 are clamped to 1e-6.
        """
        feature_map = self.proj(self.backbone(image))
        batch, _, height, width = feature_map.shape

        # Tile the question vector over every spatial location of the feature map.
        tiled_question = question_emb.unsqueeze(-1).unsqueeze(-1).expand(batch, -1, height, width)
        fused = torch.cat([feature_map, tiled_question], dim=1)

        # One attention score per location, softmax-normalised over all locations.
        scores = self.attention_fc(torch.tanh(self.attention_conv(fused)))
        scores = scores.view(batch, -1) / max(temperature, 1e-6)
        attention = F.softmax(scores, dim=1).view(batch, 1, height, width)

        # Attention-weighted sum of the image features over the spatial axes.
        pooled = (attention * feature_map).sum(dim=[2, 3])
        return self.fc(torch.cat([pooled, question_emb], dim=1))


In [None]:
# Prepare Dataset (Colab-optimized, no images in PKL, compressed NPZ)
from sentence_transformers import SentenceTransformer
from torchvision.datasets import CIFAR10
from tqdm import tqdm
import numpy as np

# Local working directory for CIFAR-10 and the generated NPZ files.
DATA_DIR = 'data'
os.makedirs(DATA_DIR, exist_ok=True)

# Output archives produced by this cell.
TRAIN_NPZ = 'data/train_dataset_light.npz'
TEST_NPZ = 'data/test_dataset_light.npz'

# Settings read from the shared configuration.
EMBEDDING_MODEL_NAME = cfg['embedding_model']
ANSWER_VOCAB = cfg['answers']
CATEGORIES = cfg['categories']

# Class names in label order (list position == integer class label).
cifar10_classes = ["aeroplano","automobile","uccello","gatto","cervo","cane","rana","cavallo","nave","camion"]
# Answer string -> position in the answer vocabulary.
answer_to_idx = dict(zip(ANSWER_VOCAB, range(len(ANSWER_VOCAB))))

# Build QA triples but store only CIFAR indices, not images

def create_indexed_dataset(dataset, description, max_items=2000):
    """Generate question/answer rows that reference images by dataset position.

    For every visited image: a "yes" category question (only when the label
    belongs to a configured category), a "no" question about the opposite
    category, and an object-identification question.

    Args:
        dataset: Iterable of (image, label) pairs; the image itself is ignored.
        description: Text appended to the progress-bar label.
        max_items: Maximum number of images to visit; a falsy value means no limit.

    Returns:
        Three parallel lists: image positions, question strings, answer indices.
    """
    category_of = {
        label: category
        for category, labels in CATEGORIES.items()
        for label in labels
    }

    indices, questions, answers = [], [], []

    def add_row(image_pos, question, answer):
        # Append one QA row; `answer` is looked up in the answer vocabulary.
        indices.append(image_pos)
        questions.append(question)
        answers.append(answer_to_idx[answer])

    progress = tqdm(dataset, desc=f"Preparazione {description}")
    for position, (_, label) in enumerate(progress):
        class_name = cifar10_classes[label]
        category = category_of.get(label)

        if category:
            add_row(position, f"C'è un {category}?", "Sì")

        # Labels without a category fall through to 'animale' here.
        opposite = 'animale' if category != 'animale' else 'veicolo'
        add_row(position, f"C'è un {opposite}?", "No")

        add_row(position, "Che oggetto c'è nell'immagine?", class_name)

        # position + 1 is the number of images processed so far.
        if max_items and position + 1 >= max_items:
            break

    return indices, questions, answers

# Build and persist the "light" train/test QA datasets: download CIFAR-10,
# generate the QA rows, embed the questions, and write compressed archives.
print("🔄 Caricamento CIFAR-10 e modello embedding (CPU)...")
train_set = CIFAR10(root=DATA_DIR, train=True, download=True)
test_set = CIFAR10(root=DATA_DIR, train=False, download=True)
# The sentence-embedding model is placed on the CPU explicitly.
embedding_model = SentenceTransformer(EMBEDDING_MODEL_NAME, device='cpu')

# Limits tuned for Colab Free RAM (max_items caps the number of images visited per split)
train_indices, train_questions, train_answers = create_indexed_dataset(train_set, "Train (light)", max_items=4000)
test_indices, test_questions, test_answers = create_indexed_dataset(test_set, "Test (light)", max_items=1000)

print("⚙️  Calcolo embedding (float16) in batch...")
# Encode every question to a NumPy array in batches of 256, without L2 normalisation.
train_emb = embedding_model.encode(train_questions, convert_to_numpy=True, show_progress_bar=True, batch_size=256, normalize_embeddings=False)
test_emb = embedding_model.encode(test_questions, convert_to_numpy=True, show_progress_bar=True, batch_size=256, normalize_embeddings=False)

# Store compact dtypes: float16 embeddings, int32 image positions, int16 answer ids.
train_emb = train_emb.astype(np.float16, copy=False)
test_emb = test_emb.astype(np.float16, copy=False)
train_indices = np.asarray(train_indices, dtype=np.int32)
test_indices = np.asarray(test_indices, dtype=np.int32)
train_answers = np.asarray(train_answers, dtype=np.int16)
test_answers = np.asarray(test_answers, dtype=np.int16)

print("💾 Salvataggio compresso (.npz)...")
# Archive keys: 'indices' (image positions), 'emb' (question embeddings), 'y' (answer ids).
np.savez_compressed(TRAIN_NPZ, indices=train_indices, emb=train_emb, y=train_answers)
np.savez_compressed(TEST_NPZ, indices=test_indices, emb=test_emb, y=test_answers)
print("✅ Dataset light salvato in NPZ compressi.")


In [None]:
# Training (light, Colab-optimized loading)
from torch.utils.data import DataLoader, random_split
from torchvision.datasets import CIFAR10

class VQADatasetNPZ(torch.utils.data.Dataset):
    """Dataset of (image, question embedding, answer id) triples backed by an NPZ archive.

    The archive holds only image positions, question embeddings and answer
    ids; the images themselves are fetched from CIFAR-10 on access.
    """

    def __init__(self, npz_path: str, cifar_split: str = 'train', data_root: str = 'data'):
        """Load the archive arrays into memory and open the matching CIFAR-10 split.

        Args:
            npz_path: Path to an archive with the keys 'indices', 'emb' and 'y'.
            cifar_split: 'train' selects the training split; any other value the test split.
            data_root: Directory where CIFAR-10 is stored (downloaded if missing).
        """
        # np.load returns a lazily-read NpzFile that keeps the archive open.
        # Indexing it materialises each array, so the file can be closed as
        # soon as the three arrays have been read.
        with np.load(npz_path) as data:
            self.indices = data['indices']
            self.emb = data['emb']  # float16
            self.y = data['y']
        self.transform = get_image_transform()
        self.cifar = CIFAR10(root=data_root, train=(cifar_split=='train'), download=True)

    def __len__(self):
        """Return the number of QA rows."""
        return len(self.y)

    def __getitem__(self, i):
        """Return (image tensor, float32 question embedding tensor, answer id as int) for row `i`."""
        img_idx = int(self.indices[i])
        img, _ = self.cifar[img_idx]
        image = self.transform(img).float()
        # Embeddings are stored as float16; the model consumes float32.
        q = torch.from_numpy(self.emb[i].astype(np.float32, copy=False))
        y = int(self.y[i])
        return image, q, y

# Load the light training set and carve out a validation split (capped at 20%, at least 1 row).
train_ds_full = VQADatasetNPZ('data/train_dataset_light.npz', cifar_split='train', data_root='data')
val_split = min(0.2, cfg['training'].get('val_split', 0.1))
val_size = max(1, int(len(train_ds_full)*val_split))
train_size = len(train_ds_full)-val_size
train_ds, val_ds = random_split(train_ds_full, [train_size, val_size])

# Larger batches when a CUDA GPU is present, small ones otherwise.
bs = 64 if torch.cuda.is_available() else 8
# Take the worker count from the config instead of hard-coding 2. Worker
# processes started with the "spawn" method (the default on macOS and Windows)
# have to re-import the dataset class, which fails for a class defined in a
# notebook cell; the config's value of 0 keeps loading in the main process.
num_workers = int(cfg['training'].get('num_workers', 0))
train_loader = DataLoader(train_ds, batch_size=bs, shuffle=True, num_workers=num_workers, pin_memory=torch.cuda.is_available())
val_loader = DataLoader(val_ds, batch_size=bs, shuffle=False, num_workers=num_workers, pin_memory=torch.cuda.is_available())

# Build the model from the config and move it to the selected device.
m = cfg['model']
num_answers = len(cfg['answers'])
model = VQANet(num_answers, m['question_dim'], m['image_feature_dim'], m['attention_hidden_dim'], dropout=m.get('dropout',0.3)).to(DEVICE)

criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=cfg['training']['learning_rate'], weight_decay=cfg['training'].get('weight_decay',0.0))

# Mixed precision is attempted on CUDA and MPS only.
use_autocast = torch.cuda.is_available() or (DEVICE=='mps')

def evaluate(model, loader):
    """Return the top-1 accuracy (in percent) of `model` over `loader`.

    Args:
        model: Module called as ``model(images, q)`` and returning class logits.
        loader: Iterable of ``(images, q, y)`` batches.

    Returns:
        Accuracy as a float in [0, 100]; 0.0 when the loader yields no samples.
    """
    model.eval()
    total, correct = 0, 0
    with torch.no_grad():
        for images, q, y in loader:
            images, q = images.to(DEVICE), q.to(DEVICE)
            # Labels usually arrive already collated into a tensor; as_tensor
            # reuses it instead of copying (torch.tensor on a tensor copies
            # and emits a UserWarning) and still accepts plain sequences.
            y = torch.as_tensor(y).to(DEVICE)
            if use_autocast and DEVICE!='cpu':
                with torch.autocast(device_type='cuda' if DEVICE=='cuda' else 'mps', dtype=torch.float16):
                    out = model(images, q)
            else:
                out = model(images, q)
            pred = out.argmax(1)
            total += y.size(0)
            correct += (pred==y).sum().item()
    # max(1, total) avoids a division by zero on an empty loader.
    return 100.0*correct/max(1,total)

best_acc = 0.0
# NOTE(review): cfg['training']['epochs'] is not consulted here; this light run
# does a single pass over the data — confirm that is intended.
num_epochs = 1

# float16 autocast can underflow small gradients, so the loss is scaled on
# CUDA. With enabled=False every scaler call below is a pass-through, which
# leaves the CPU and MPS paths unchanged.
scaler_enabled = use_autocast and DEVICE == 'cuda'
if hasattr(torch.amp, 'GradScaler'):
    scaler = torch.amp.GradScaler('cuda', enabled=scaler_enabled)
else:
    # Older torch releases only expose the scaler under torch.cuda.amp.
    scaler = torch.cuda.amp.GradScaler(enabled=scaler_enabled)

for epoch in range(num_epochs):
    model.train()
    for images, q, y in train_loader:
        images, q = images.to(DEVICE), q.to(DEVICE)
        # Labels are usually already a collated tensor; as_tensor avoids the
        # copy and UserWarning that torch.tensor(tensor) produces.
        y = torch.as_tensor(y).to(DEVICE)
        optimizer.zero_grad()
        if use_autocast and DEVICE!='cpu':
            with torch.autocast(device_type='cuda' if DEVICE=='cuda' else 'mps', dtype=torch.float16):
                out = model(images, q)
                loss = criterion(out, y)
        else:
            out = model(images, q)
            loss = criterion(out, y)
        scaler.scale(loss).backward()
        # Gradients must be unscaled before clipping so the 1.0 threshold
        # applies to their true magnitude.
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        scaler.step(optimizer)
        scaler.update()
    acc = evaluate(model, val_loader)
    # NOTE(review): only the best score is tracked; no checkpoint is written to
    # cfg['paths']['model_save_path'] — confirm whether that is intended.
    best_acc = max(best_acc, acc)
    print(f"Epoch {epoch + 1}/{num_epochs} - val acc: {acc:.2f}%")

print(f"Best acc: {best_acc:.2f}%")


In [None]:
# Evaluate (light, NPZ-based)
from torch.utils.data import DataLoader

# Larger batches when a CUDA GPU is present, small ones otherwise.
bs = 64 if torch.cuda.is_available() else 8
test_ds = VQADatasetNPZ('data/test_dataset_light.npz', cifar_split='test', data_root='data')
# Take the worker count from the config instead of hard-coding 2. Worker
# processes started with the "spawn" method (the default on macOS and Windows)
# cannot re-import a dataset class defined in a notebook cell.
test_loader = DataLoader(test_ds, batch_size=bs, shuffle=False, num_workers=int(cfg['training'].get('num_workers', 0)), pin_memory=torch.cuda.is_available())

def eval_loader(model, loader):
    """Return the top-1 accuracy (in percent) of `model` over `loader`, in full precision.

    Args:
        model: Module called as ``model(images, q)`` and returning class logits.
        loader: Iterable of ``(images, q, y)`` batches.

    Returns:
        Accuracy as a float in [0, 100]; 0.0 when the loader yields no samples.
    """
    model.eval()
    total, correct = 0, 0
    with torch.no_grad():
        for images, q, y in loader:
            images, q = images.to(DEVICE), q.to(DEVICE)
            # Labels usually arrive already collated into a tensor; as_tensor
            # reuses it instead of copying (torch.tensor on a tensor copies
            # and emits a UserWarning) and still accepts plain sequences.
            y = torch.as_tensor(y).to(DEVICE)
            out = model(images, q)
            pred = out.argmax(1)
            total += y.size(0)
            correct += (pred==y).sum().item()
    # max(1, total) avoids a division by zero on an empty loader.
    return 100.0*correct/max(1,total)

# Score the in-memory model on the light test loader and report accuracy in percent.
acc_test = eval_loader(model, test_loader)
print(f"Test acc (light): {acc_test:.2f}%")


In [None]:
# Normalise names: if a "_v2" variable exists, rebind it to the unsuffixed
# name and drop the suffixed one.
if 'train_data_v2' in globals():
    globals()['train_data'] = globals().pop('train_data_v2')
if 'test_data_v2' in globals():
    globals()['test_data'] = globals().pop('test_data_v2')



In [None]:
# Inference (Colab: carica una tua immagine opzionale)
from PIL import Image

answers_vocab = cfg['answers']

# In Colab you can upload a file first: from google.colab import files; files.upload()
img_path = 'data/test_image.jpg'  # Change the path to use your own image
question = "C'è un cane?"

tr = get_image_transform()
# Open the file in a context manager so its handle is released right away;
# convert() returns an independent in-memory image that outlives the block.
with Image.open(img_path) as raw_img:
    img = raw_img.convert('RGB')
# Add the batch dimension expected by the model.
img_t = tr(img).unsqueeze(0).to(DEVICE)

from sentence_transformers import SentenceTransformer
emb_model = SentenceTransformer(cfg['embedding_model'], device='cpu')
q = emb_model.encode(question, convert_to_tensor=True)
# A single question may come back as a 1-D vector; make it a batch of one.
if q.dim()==1:
    q = q.unsqueeze(0)
q = q.to(DEVICE)

model.eval()
with torch.no_grad():
    out = model(img_t, q)
    # Index of the highest-scoring answer for the single input.
    pred = out.argmax(1).item()

print('Domanda:', question)
print('Risposta:', answers_vocab[pred])
