In [1]:
!pip install git+https://github.com/huggingface/transformers accelerate qwen-vl-utils bitsandbytes

Collecting git+https://github.com/huggingface/transformers
  Cloning https://github.com/huggingface/transformers to /tmp/pip-req-build-uwb929k5
  Running command git clone --filter=blob:none --quiet https://github.com/huggingface/transformers /tmp/pip-req-build-uwb929k5
  Resolved https://github.com/huggingface/transformers to commit e2122c4bcb74d942bb93c11dcb55aafc4c7fdf23
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting qwen-vl-utils
  Downloading qwen_vl_utils-0.0.14-py3-none-any.whl.metadata (9.0 kB)
Collecting bitsandbytes
  Downloading bitsandbytes-0.48.1-py3-none-manylinux_2_24_x86_64.whl.metadata (10 kB)
Collecting huggingface-hub==1.0.0.rc5 (from transformers==5.0.0.dev0)
  Downloading huggingface_hub-1.0.0rc5-py3-none-any.whl.metadata (14 kB)
Collecting tokenizers<=0.23.0,>=0.22.0 (from transformers==5.0.0.dev0)
  Downloading tokenizers-0.22.1-c

In [2]:
pip install -U scikit-learn

Collecting scikit-learn
  Downloading scikit_learn-1.7.2-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (11 kB)
Downloading scikit_learn-1.7.2-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.whl (9.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m9.7/9.7 MB[0m [31m66.5 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: scikit-learn
  Attempting uninstall: scikit-learn
    Found existing installation: scikit-learn 1.2.2
    Uninstalling scikit-learn-1.2.2:
      Successfully uninstalled scikit-learn-1.2.2
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
category-encoders 2.7.0 requires scikit-learn<1.6.0,>=1.0.0, but you have scikit-learn 1.7.2 which is incompatible.
cesium 0.12.4 requires numpy<3.0,>=2.0, but you have numpy 1.26.4 which is incompatible.
sklearn-compat 0.1.3 r

In [4]:
# ================== CONFIG ==================
INPUT_DIR     = "/kaggle/input/qwen-csv/Qwen_test"  # folder that contains the .csv files
Y_TRUE_COL    = "gt_label"                            # ground-truth column; change if needed
Y_PRED_COL    = "pred_label"                          # prediction column; change if needed
OUTPUT_DIR    = "/kaggle/working/qwen_test"                     # output folder
FILE_PATTERN  = "*.csv"                               # glob pattern of the files to process
# ============================================

import os
import glob
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import (
    accuracy_score,
    precision_recall_fscore_support,
    confusion_matrix,
    ConfusionMatrixDisplay,
)

os.makedirs(OUTPUT_DIR, exist_ok=True)

def _norm(s: pd.Series) -> pd.Series:
    """Trim + uppercase stringhe; lascia int/float inalterati."""
    if s.dtype == object:
        return s.astype(str).str.strip().replace({"": np.nan}).str.upper()
    return s

def evaluate_one(csv_path: str, out_dir: str, y_true_col: str, y_pred_col: str):
    """Score one predictions CSV and write its metrics table and confusion matrix.

    The two label columns are normalized with ``_norm``; rows where either
    label is NaN are discarded (their count is printed). Accuracy, the
    micro/macro/weighted precision/recall/F1 and the per-class
    precision/recall/F1/support are stored in long format in
    ``<stem>_metrics.csv``; the confusion matrix is rendered to
    ``<stem>_confusion_matrix.png``. Both files go to ``out_dir`` and
    ``<stem>`` is the input file name without extension.

    Args:
        csv_path: path of the CSV to evaluate.
        out_dir: folder receiving the two output files.
        y_true_col: name of the ground-truth column.
        y_pred_col: name of the prediction column.

    Raises:
        ValueError: if either column is missing from the CSV.
    """
    file_name = os.path.basename(csv_path)
    frame = pd.read_csv(csv_path)

    if not (y_true_col in frame.columns and y_pred_col in frame.columns):
        raise ValueError(
            f"Colonne non trovate in {file_name}. "
            f"Disponibili: {list(frame.columns)}"
        )

    truth = _norm(frame[y_true_col])
    guess = _norm(frame[y_pred_col])

    # Keep only the rows where both labels are present.
    usable = truth.notna() & guess.notna()
    n_dropped = int((~usable).sum())
    if n_dropped > 0:
        print(f"[{file_name}] Righe scartate per NaN: {n_dropped}")
    truth, guess = truth[usable], guess[usable]

    # Every label seen in either column, sorted.
    labels = sorted(pd.Index(truth.unique()).union(guess.unique()).tolist())

    # Summary rows: accuracy first, then precision/recall/F1 per averaging mode.
    records = [
        {"scope": "summary", "class": "", "metric": "accuracy",
         "value": accuracy_score(truth, guess)},
    ]
    for avg in ("micro", "macro", "weighted"):
        prec, rec, f1, _ = precision_recall_fscore_support(
            truth, guess, average=avg, zero_division=0
        )
        for metric_name, metric_value in (("precision", prec), ("recall", rec), ("f1", f1)):
            records.append(
                {"scope": "summary", "class": "",
                 "metric": f"{metric_name}_{avg}", "value": metric_value}
            )

    # Per-class arrays, aligned with `labels`.
    per_class = precision_recall_fscore_support(
        truth, guess, labels=labels, average=None, zero_division=0
    )

    cm = confusion_matrix(truth, guess, labels=labels)

    # Per-class rows: four metrics for each label, in label order.
    for pos, lab in enumerate(labels):
        for metric_name, values in zip(("precision", "recall", "f1", "support"), per_class):
            records.append(
                {"scope": "per-class", "class": str(lab),
                 "metric": metric_name, "value": values[pos]}
            )
    metrics_df = pd.DataFrame(records)

    # Output names derived from the input file name.
    stem = os.path.splitext(file_name)[0]
    metrics_csv_path = os.path.join(out_dir, f"{stem}_metrics.csv")
    cm_png_path = os.path.join(out_dir, f"{stem}_confusion_matrix.png")

    metrics_df.to_csv(metrics_csv_path, index=False)

    # Render and save the confusion matrix, then release the figure.
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)
    disp.plot(values_format="d")
    plt.title("Confusion Matrix")
    cm_figure = disp.figure_
    cm_figure.tight_layout()
    cm_figure.savefig(cm_png_path, dpi=200)
    plt.close(cm_figure)

    print(f"[OK] {stem}: metrics -> {metrics_csv_path}")
    print(f"[OK] {stem}: confusion matrix -> {cm_png_path}")

# ---- Evaluate every CSV matching FILE_PATTERN inside INPUT_DIR ----
csv_files = sorted(glob.glob(os.path.join(INPUT_DIR, FILE_PATTERN)))
if csv_files:
    print(f"Trovati {len(csv_files)} file CSV. Avvio calcolo metriche...")
    for csvf in csv_files:
        # A failure on one file is reported and the loop moves on to the next.
        try:
            evaluate_one(csvf, OUTPUT_DIR, Y_TRUE_COL, Y_PRED_COL)
        except Exception as e:
            print(f"[ERRORE] {os.path.basename(csvf)}: {e}")
    print("Completato.")
else:
    print(f"Nessun file trovato in {INPUT_DIR} con pattern {FILE_PATTERN}")


Trovati 9 file CSV. Avvio calcolo metriche...
[ERRORE] dynamic_checkpoint_qwen.csv: At least one label specified must be in y_true
[ERRORE] dynamic_checkpoint_qwen_blurred.csv: At least one label specified must be in y_true
[ERRORE] dynamic_checkpoint_qwen_shuffled.csv: At least one label specified must be in y_true
[ERRORE] dynamic_new_dataset_Qwen.csv: At least one label specified must be in y_true
[ERRORE] dynamic_test_Qwen_finetuned_2.csv: At least one label specified must be in y_true
[ERRORE] static_0_checkpoint_qwen.csv: At least one label specified must be in y_true
[ERRORE] static_0_checkpoint_qwen_blurred.csv: At least one label specified must be in y_true
[ERRORE] static_0_new_dataset_Qwen.csv: At least one label specified must be in y_true
[ERRORE] static_test_Qwen_finetuned_2.csv: At least one label specified must be in y_true
Completato.


In [None]:
!zip -r /kaggle/working/results_idefics.zip /kaggle/working/idefics_test


In [11]:
import os
from datasets import load_from_disk
from PIL import Image

# Dataset location
dataset_path = "/kaggle/input/new-correct-dataset-2/kaggle/working/new_correct_dataset_2"

# Output folders
output_dir = "/kaggle/working/test_images_2"
fake_dir = os.path.join(output_dir, "fake_images")
real_dir = os.path.join(output_dir, "real_images")

os.makedirs(fake_dir, exist_ok=True)
os.makedirs(real_dir, exist_ok=True)

# Maximum number of images saved per class
max_per_class = 4

# Load the dataset
dataset = load_from_disk(dataset_path)

# Per-class counters of saved images
fake_count = 0
real_count = 0

for sample in dataset:
    label = sample['label']
    img_path = sample["image_k"]  # "image_k" is the value handed to Image.open below

    # Save only while the quota of the sample's class has not been reached
    # (label 1 is stored as fake, label 0 as real).
    if label == 1 and fake_count < max_per_class:
        print("Sono qui fake")
        save_path = os.path.join(fake_dir, f"fake_{fake_count+1}.png")
        # The context manager closes the source file as soon as the copy is written.
        with Image.open(img_path) as source_image:
            image = source_image.convert("RGB")
            image.save(save_path)
        fake_count += 1

    elif label == 0 and real_count < max_per_class:
        print("Sono qui real")
        save_path = os.path.join(real_dir, f"real_{real_count+1}.png")
        with Image.open(img_path) as source_image:
            image = source_image.convert("RGB")
            image.save(save_path)
        real_count += 1

    # Stop as soon as both classes are complete
    if fake_count >= max_per_class and real_count >= max_per_class:
        break

print(f"Salvate {fake_count} immagini FAKE in {fake_dir}")
print(f"Salvate {real_count} immagini REAL in {real_dir}")
print(f"Tutte le immagini salvate in: {output_dir}")


Sono qui real
Sono qui fake
Sono qui fake
Sono qui real
Sono qui real
Sono qui real
Sono qui fake
Sono qui fake
Salvate 4 immagini FAKE in /kaggle/working/test_images_2/fake_images
Salvate 4 immagini REAL in /kaggle/working/test_images_2/real_images
Tutte le immagini salvate in: /kaggle/working/test_images_2


In [18]:
!zip -r /kaggle/working/attn_insights_2.zip /kaggle/working/attn_insights


  adding: kaggle/working/attn_insights/ (stored 0%)
  adding: kaggle/working/attn_insights/sample703_dynamic_attn_tokens.csv (deflated 56%)
  adding: kaggle/working/attn_insights/sample98_dynamic_attn_prompt.png (deflated 14%)
  adding: kaggle/working/attn_insights/sample25_dynamic_attn_alltokens.png (deflated 16%)
  adding: kaggle/working/attn_insights/sample703_dynamic_attn_prompt.png (deflated 14%)
  adding: kaggle/working/attn_insights/sample931_dynamic_attn_alltokens.png

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


 (deflated 17%)
  adding: kaggle/working/attn_insights/sample324_dynamic_attn_alltokens.png (deflated 16%)
  adding: kaggle/working/attn_insights/sample347_dynamic_attn_prompt.png (deflated 14%)
  adding: kaggle/working/attn_insights/sample790_dynamic_attn_tokens.csv (deflated 55%)
  adding: kaggle/working/attn_insights/sample703_dynamic_attn_alltokens.png (deflated 16%)
  adding: kaggle/working/attn_insights/sample931_dynamic_attn_tokens.csv (deflated 55%)
  adding: kaggle/working/attn_insights/sample931_dynamic_attn_prompt.png (deflated 15%)
  adding: kaggle/working/attn_insights/sample25_dynamic_attn_prompt.png (deflated 14%)
  adding: kaggle/working/attn_insights/sample98_dynamic_attn_tokens.csv (deflated 58%)
  adding: kaggle/working/attn_insights/sample25_dynamic_attn_tokens.csv (deflated 57%)
  adding: kaggle/working/attn_insights/sample790_dynamic_attn_alltokens.png (deflated 18%)
  adding: kaggle/working/attn_insights/sample790_dynamic_attn_prompt.png (deflated 14%)
  adding: 

In [21]:
from datasets import Dataset

# Location of the saved dataset (the JSON variant below is kept for reference only)
# json_path = "/kaggle/input/final-test-set/final_dataset.json"
dataset_path = '/kaggle/input/blurred-dataset/blurred_dataset'

# Load the Hugging Face dataset from disk
# hf_dataset = Dataset.from_json(json_path)
hf_dataset = Dataset.load_from_disk(dataset_path)

# Show the dataset summary (features and number of rows)
print(hf_dataset)


Dataset({
    features: ['img_id', 'image', 'width', 'height', 'label', 'x_t', 'image_k', 'image_k_modified'],
    num_rows: 1101
})


In [22]:
print(hf_dataset['image_k'][0])

/kaggle/input/images-test/images_test/02a5f39ca584e0d6.jpg


In [5]:
from collections import Counter

# Count how many examples carry each value of the "label" field.
counter = Counter(example["label"] for example in hf_dataset)
print(counter)


Counter({0: 555, 1: 546})


In [4]:
from transformers import pipeline

# Load the zero-shot classifier
classifier = pipeline("zero-shot-classification", model="facebook/bart-large-mnli")

import re

# Candidate labels; extract_label_from_response returns an index into this list
LABELS = ["real", "fake"]

def extract_label_from_response(response: str) -> int:
    """Map a free-text model answer to an index into ``LABELS``.

    The answer is scored against ``LABELS`` by the zero-shot ``classifier``;
    the position in ``LABELS`` of the first label listed in the classifier
    result is returned.
    """
    ranked_labels = classifier(response, LABELS)["labels"]
    top_label = ranked_labels[0]
    return LABELS.index(top_label)

config.json: 0.00B [00:00, ?B/s]

model.safetensors:   0%|          | 0.00/1.63G [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

Device set to use cuda:0


In [5]:
!pip install -U peft


Collecting peft
  Downloading peft-0.17.1-py3-none-any.whl.metadata (14 kB)
Downloading peft-0.17.1-py3-none-any.whl (504 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m504.9/504.9 kB[0m [31m11.4 MB/s[0m eta [36m0:00:00[0m00:01[0m
[?25hInstalling collected packages: peft
  Attempting uninstall: peft
    Found existing installation: peft 0.15.2
    Uninstalling peft-0.15.2:
      Successfully uninstalled peft-0.15.2
Successfully installed peft-0.17.1


In [10]:
# Scratch cell: prints the two halves of a dynamic prompt for visual inspection.
# NOTE(review): the first string ends with ":." and carries an f-prefix although it has
# no placeholders; confirm whether the trailing "." is intended.
dynamic_prompt_2 = f"You are an expert in image forensics. First, carefully analyze the following description of the image, focusing on details that might reveal whether it is generated or real:."
# NOTE(review): "finale" looks like a typo for "final" (QwenVLTester.build_prompt uses
# "final judgment"); left untouched here in case it mirrors a prompt used elsewhere.
dynamic_prompt_3 = "\nAfter your analysis, state your finale judgment: 'REAL' or 'FAKE'."
                                # variant without the leading newline: "After your analysis, state your finale judgment: 'REAL' or 'FAKE'."
print(dynamic_prompt_2+dynamic_prompt_3)

You are an expert in image forensics. First, carefully analyze the following description of the image, focusing on details that might reveal whether it is generated or real:.
After your analysis, state your finale judgment: 'REAL' or 'FAKE'.


In [13]:
from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor, BitsAndBytesConfig, GenerationConfig, LogitsProcessorList, MinLengthLogitsProcessor
from PIL import Image
import torch
import os

os.environ["CUDA_LAUNCH_BLOCKING"] = "1"


# Few-shot examples: each entry has a textual "context", the expected "label"
# and a short "motivation". Some texts mix English and Italian.
FEW_SHOT_EXAMPLES = [
    # Real examples (authentic photographs)
    {
        "context": "A street photo taken at midday with perfectly natural shadows and realistic reflections on wet pavement.",
        "label": "[Real]",
        "motivation": "Consistent lighting and natural shadow falloff; reflections sono coerenti con le superfici."
    },
    {
        "context": "An outdoor portrait under golden hour light, showing morbidi gradienti di colore nel cielo e dettagli realistici nella pelle.",
        "label": "[Real]",
        "motivation": "Transizioni tonali naturali e texture dettagliate senza artefatti di sintesi."
    },
    # Fake examples (AI-generated)
    {
        "context": "A foggy forest scene with a faint glow halo around certain trees and pixel-level repeating patterns on the foliage.",
        "label": "[Fake]",
        "motivation": "The halo suggests blending artifacts; repeating patches betray AI-generation."
    },
    {
        "context": "An indoor scene where the textures on the walls appear overly smooth and uniform, and shadows lack realistic variation.",
        "label": "[Fake]",
        "motivation": "Uniform texture and flat shadows are typical AI generation artifacts."
    }
]

# Static question templates; selected by index from a prompt type such as "static_0".
STATIC_PROMPTS = [
    "Is the image real or fake? Answer with 'REAL' or 'FAKE'.",
    "Analyze and classify: [Real] / [Fake].",
    "Based on lighting, texture, and edges, decide: [Real] / [Fake]."
]

# Identifier of the base checkpoint passed to from_pretrained for both model and processor.
MODEL = "Qwen/Qwen2.5-VL-3B-Instruct"

# --- keep THIS BLOCK BEFORE class QwenVLTester(...) ---

import torch
import numpy as np
import matplotlib.pyplot as plt
import os
from typing import Dict, List, Tuple

# Folder where the attention CSV/PNG artefacts are written; created up front.
ATTN_OUTDIR = "/kaggle/working/attn_insights"
os.makedirs(ATTN_OUTDIR, exist_ok=True)

def _tokens_from_ids(tokenizer, input_ids: List[int]) -> List[str]:
    toks = tokenizer.convert_ids_to_tokens(input_ids)
    return [t.replace("Ġ"," ").replace("▁"," ") for t in toks]

def _mask_prompt_tokens(tokenizer, toks: List[str]) -> List[bool]:
    specials = set(tokenizer.all_special_tokens)
    keep = []
    for t in toks:
        t_stripped = t.strip()
        if not t_stripped or t_stripped in specials:
            keep.append(False); continue
        if "<image>" in t_stripped or "<img>" in t_stripped or "<vision>" in t_stripped:
            keep.append(False); continue
        keep.append(True)
    return keep

def _aggregate_attention(attentions: List[torch.Tensor], layer_reduce: str = "mean") -> torch.Tensor:
    """
    attentions: lista di per-layer (batch, n_heads, seq, seq) oppure None.
    Ritorna (batch, seq, seq) mediando sulle heads e combinando i layer non-None.
    """
    valid_layers = [a for a in attentions if a is not None]
    if len(valid_layers) == 0:
        raise ValueError("Nessun layer di attention disponibile (tutti None). "
                         "Assicurati che output_attentions=True e use_cache=False nel forward.")
    # media su heads layer-per-layer
    per_layer = [a.mean(dim=1) for a in valid_layers]  # (batch, seq, seq)
    if layer_reduce == "mean":
        A = torch.stack(per_layer, dim=0).mean(dim=0)  # media su layer validi
    elif layer_reduce == "sum":
        A = torch.stack(per_layer, dim=0).sum(dim=0)   # somma su layer validi
    else:
        A = per_layer[-1]  # ultimo layer valido
    return A  # (batch, seq, seq)


def _normalize(v: np.ndarray) -> np.ndarray:
    v = np.asarray(v, dtype=float); s = v.sum()
    return v / s if s > 0 else v

class AttentionInspectorMixin:
    """Attention-inspection helpers for a vision-language tester class.

    The methods read ``self.processor`` and ``self.model``; neither is defined
    here, so the class mixing this in must provide both.
    """

    def forward_with_attn(self, image_path: str, prompt: str):
        """Run one forward pass (no generation) with attention outputs requested.

        The image is opened, converted to RGB and resized to 224x224; the prompt
        is wrapped in a single user turn (image placeholder + text) through the
        processor's chat template.

        Returns:
            ``(inputs, out)``: the processor output moved to the model device
            and the model output obtained with ``output_attentions=True``.
        """
        img = Image.open(image_path).convert("RGB").resize((224, 224), Image.BILINEAR)
        chat = self.processor.apply_chat_template(
            [{"role": "user", "content": [{"type": "image", "image": None}, {"type": "text", "text": prompt}]}],
            tokenize=False, add_generation_prompt=True
        )
        inputs = self.processor(text=[chat], images=[img], padding=True, return_tensors="pt").to(self.model.device)
        # Inference only: no gradients are tracked.
        with torch.no_grad():
            out = self.model(**inputs, output_attentions=True, return_dict=True)
        return inputs, out

    def analyze_prompt_attention(self, image_path: str, prompt: str, file_stem: str,
                                 layer_reduce: str = "mean", save_with_image_tokens: bool = False) -> Dict[str, str]:
        """Rank prompt tokens by the attention they receive from the last input position.

        Writes into ``ATTN_OUTDIR``:
          - ``<file_stem>_attn_tokens.csv``: kept prompt tokens sorted by
            decreasing score, with rank, input index, score and cumulative score;
          - ``<file_stem>_attn_prompt.png``: heatmap over the kept prompt tokens;
          - ``<file_stem>_attn_alltokens.png``: heatmap over every input token,
            only when ``save_with_image_tokens`` is True.

        Args:
            image_path: path of the image to feed to the model.
            prompt: text prompt for the user turn.
            file_stem: prefix of the output file names.
            layer_reduce: layer combination mode passed to ``_aggregate_attention``.
            save_with_image_tokens: also save the all-token heatmap.

        Returns:
            Dict with keys "csv", "png_prompt" and "png_alltokens" (the last is
            an empty string when the all-token heatmap is not saved).
        """
        inputs, out = self.forward_with_attn(image_path, prompt)
        assert out.attentions is not None, "output_attentions non fornito dal modello."
        input_ids = inputs["input_ids"][0].tolist()
        toks = _tokens_from_ids(self.processor.tokenizer, input_ids)
        keep_mask = _mask_prompt_tokens(self.processor.tokenizer, toks)
        # First batch element of the aggregated attention, as a float numpy array.
        A = _aggregate_attention(out.attentions, layer_reduce=layer_reduce)[0].float().cpu().numpy()
        # Query row = last position of the input sequence.
        q_idx = inputs["input_ids"].shape[-1] - 1
        # Normalized over ALL input tokens, so the prompt-only scores below
        # need not sum to 1.
        attn_to_all = _normalize(A[q_idx])

        idx_prompt = [i for i, k in enumerate(keep_mask) if k]
        toks_prompt = [toks[i] for i in idx_prompt]
        scores_prompt = attn_to_all[idx_prompt]
        # Descending order of score.
        order = np.argsort(scores_prompt)[::-1]
        toks_sorted = [toks_prompt[i] for i in order]
        idx_sorted  = [idx_prompt[i]  for i in order]
        scr_sorted  = [float(scores_prompt[i]) for i in order]
        cum_sorted  = list(np.cumsum(scr_sorted))

        # CSV ranking
        csv_path = os.path.join(ATTN_OUTDIR, f"{file_stem}_attn_tokens.csv")
        import csv
        with open(csv_path, "w", newline="", encoding="utf-8") as f:
            w = csv.writer(f)
            w.writerow(["rank","token","input_index","score","cum_score"])
            for r,(tk,ii,ss,cc) in enumerate(zip(toks_sorted, idx_sorted, scr_sorted, cum_sorted), start=1):
                w.writerow([r, tk, ii, f"{ss:.6f}", f"{cc:.6f}"])

        # Prompt heatmap: one row, tokens in their original order (not ranked).
        fig = plt.figure(figsize=(max(8, len(toks_prompt)*0.25), 2.5))
        plt.imshow(scores_prompt[np.newaxis, :], aspect="auto")
        plt.yticks([0], ["attention score"])
        plt.xticks(range(len(toks_prompt)), toks_prompt, rotation=80, ha="right")
        plt.title("Attention to prompt tokens (last prompt token as query)")
        plt.colorbar(); plt.tight_layout()
        png_prompt = os.path.join(ATTN_OUTDIR, f"{file_stem}_attn_prompt.png")
        fig.savefig(png_prompt, dpi=220); plt.close(fig)

        # Optional heatmap over every input token (special and image tokens included).
        png_full = ""
        if save_with_image_tokens:
            fig2 = plt.figure(figsize=(max(8, len(toks)*0.22), 2.8))
            plt.imshow(attn_to_all[np.newaxis, :], aspect="auto")
            plt.yticks([0], ["attention score"])
            plt.xticks(range(len(toks)), toks, rotation=80, ha="right")
            plt.title("Attention to ALL input tokens (incl. image markers)")
            plt.colorbar(); plt.tight_layout()
            png_full = os.path.join(ATTN_OUTDIR, f"{file_stem}_attn_alltokens.png")
            fig2.savefig(png_full, dpi=220); plt.close(fig2)

        return {"csv": csv_path, "png_prompt": png_prompt, "png_alltokens": png_full}


class QwenVLTester(AttentionInspectorMixin):
    """Real/fake image tester built on a 4-bit base model plus LoRA adapter.

    The base checkpoint ``MODEL`` is loaded quantized and wrapped with the
    adapter stored at ``self.model_id``. ``test`` builds a prompt, generates an
    answer for the example image and maps the answer to a numeric label.
    """

    def __init__(self):
        super().__init__()
        # Folder containing the fine-tuned (LoRA) adapter weights.
        self.model_id = "/kaggle/input/finetuned-model-2/finetuned_model"

        # 4-bit quantization settings for the base model.
        bnb_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_compute_dtype=torch.float16
        )

        from peft import PeftModel

        # Base model with "eager" attention (flagged as important by the original
        # author; presumably required to obtain attention weights — TODO confirm).
        base_model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
            MODEL,
            quantization_config=bnb_config,
            device_map="auto",
            attn_implementation="eager",
        )

        # Apply the fine-tuned LoRA weights on top of the base model.
        self.model = PeftModel.from_pretrained(base_model, self.model_id)

        self.model.config.output_attentions = True
        self.model.eval()

        self.processor = AutoProcessor.from_pretrained(
            MODEL,
            image_processor_kwargs={"size": (224, 224)}
        )

        # Light sampling configuration used by `generate`.
        self.gen_config = GenerationConfig(
            do_sample=True,
            temperature=0.7,
            top_p=0.9,
            max_new_tokens=100
        )

    def build_prompt(self, prompt_type: str = "dynamic", x_t: str = "") -> str:
        """Build the text prompt for one example.

        With ``prompt_type == "dynamic"`` and a non-empty ``x_t`` the prompt
        embeds the stripped description between the [TECH_START] / [TECH_END]
        tags. Otherwise a static template from ``STATIC_PROMPTS`` is used: the
        index is the integer after the last "_" in ``prompt_type`` (e.g.
        "static_1"), or 0 when there is no "_".

        The few-shot text that used to be assembled here was never inserted into
        either returned prompt, so it is no longer built.

        Raises:
            ValueError: if the text after the last "_" is not an integer.
            IndexError: if that integer is outside ``STATIC_PROMPTS``.
        """
        if prompt_type == "dynamic" and x_t:
            # Dynamic context supplied by the caller.
            return (
                "You are an expert in image forensics. "
                "First, carefully analyze the following description of the image, focusing on details that might reveal whether it is generated or real:\n"
                "[TECH_START]\n"
                f"{x_t.strip()}\n"
                "[TECH_END]\n"
                "After your analysis, state your final judgment: 'REAL' or 'FAKE'."
            )

        # Static prompt built from one of the templates.
        template = STATIC_PROMPTS[int(prompt_type.split('_')[-1]) if '_' in prompt_type else 0]
        return (
            f"Question: {template}\nAnswer:"  # forces an explicit choice
        )

    def generate(self, image_path: str, prompt: str) -> str:
        """Generate the model's answer for one image and prompt.

        Returns:
            The decoded newly generated text (prompt tokens removed), stripped.
        """
        # 1) Load the image; the source file is closed once the resized copy exists.
        with Image.open(image_path) as source_image:
            img = source_image.convert("RGB").resize((224, 224), Image.BILINEAR)

        # 2) Apply the chat template (single user turn: image placeholder + text).
        chat = self.processor.apply_chat_template(
            [{"role": "user", "content": [{"type": "image", "image": None}, {"type": "text", "text": prompt}]}],
            tokenize=False,
            add_generation_prompt=True
        )

        # 3) Tokenization + image preprocessing.
        inputs = self.processor(
            text=[chat],
            images=[img],
            padding=True,
            return_tensors="pt"
        ).to(self.model.device)

        # 4) Generation with the sampling settings of self.gen_config.
        out = self.model.generate(
            **inputs,
            generation_config=self.gen_config,
            return_dict_in_generate=True,
            use_cache=False,
            output_scores=True
        )

        # 5) Decode only the tokens produced after the input sequence.
        seq = out.sequences[:, inputs["input_ids"].shape[-1]:]
        answer = self.processor.batch_decode(
            seq,
            skip_special_tokens=True,
            clean_up_tokenization_spaces=True
        )
        return answer[0].strip()

    def extract_label(self, response: str) -> int:
        """Map the textual answer to a numeric label via ``extract_label_from_response``."""
        return extract_label_from_response(response)

    def test(self, example: dict, prompt_type: str = "dynamic"):
        """Run the full pipeline on one example.

        Args:
            example: mapping with the image path under "image_k" and, optionally,
                a description under "x_t".
            prompt_type: prompt mode forwarded to ``build_prompt``.

        Returns:
            ``(label, response)``: the numeric label and the raw generated text.
        """
        torch.cuda.empty_cache()
        prompt = self.build_prompt(prompt_type=prompt_type, x_t=example.get("x_t", ""))
        response = self.generate(example["image_k"], prompt)
        label = self.extract_label(response)
        return label, response

# Module-level tester instance (built once) and a thin wrapper function around it
_tester_instance = QwenVLTester()

def test_qwen_vl(example, prompt_type="dynamic"):
    """Run ``_tester_instance.test`` on ``example`` and return its result."""
    return _tester_instance.test(example, prompt_type)


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

In [14]:
# ================== CONFIG ==================
WINDOW_LAST_N = 30    # size of the trailing token window used for mass_last_n
MAX_SAMPLES   = 2     # how many examples to analyze (adjust freely)
OUT_DIR       = "/kaggle/working/attn_insights_new_prompt"  # output folder for the CSV reports
# ============================================

import os
import math
import csv
import numpy as np
import pandas as pd
os.makedirs(OUT_DIR, exist_ok=True)

def _char_to_token_idx_via_greedy(toks: list[str], char_pos: int) -> int:
    """
    Grezza ma efficace: costruisce cumulata delle lunghezze dei token grezzi
    (così come convert_ids_to_tokens li restituisce) e trova il primo token
    la cui cumulata supera char_pos.
    """
    cum = []
    s = ""
    for t in toks:
        s += t
        cum.append(len(s))
    for i, c in enumerate(cum):
        if c >= char_pos:
            return i
    return len(toks) - 1

def _segment_token_span_from_prompt_text(prompt: str, toks_all: list[str]) -> tuple[int, int] | None:
    # 1) prova con i nuovi tag
    start_char = prompt.find("[TECH_START]")
    end_char   = prompt.find("[TECH_END]")
    if start_char != -1 and end_char != -1 and end_char > start_char:
        # il contenuto tecnico è tra i tag, escludendo i tag stessi
        start_char = start_char + len("[TECH_START]")
        # end_char rimane l'inizio di [TECH_END]
        start_tok = _char_to_token_idx_via_greedy(toks_all, start_char)
        end_tok   = _char_to_token_idx_via_greedy(toks_all, end_char)
        return (start_tok, max(start_tok, end_tok-1))

    # 2) fallback legacy: vecchi marcatori (se mai tornassero)
    start_str = "Image Description and Technical Analysis:"
    end_str   = "Question:"
    start_char = prompt.find(start_str)
    end_char   = prompt.find(end_str)
    if start_char != -1 and end_char != -1 and end_char > start_char:
        start_tok = _char_to_token_idx_via_greedy(toks_all, start_char + len(start_str))
        end_tok   = _char_to_token_idx_via_greedy(toks_all, end_char)
        return (start_tok, max(start_tok, end_tok-1))

    # 3) fallback minimal: prova a trovare direttamente x_t (se lo passi alla funzione o lo hai disponibile)
    return None

def _attention_share_last_n(scores_prompt: np.ndarray, n_last: int = 30) -> dict:
    n = len(scores_prompt)
    n_tail = min(n_last, n)
    mass_last_n = float(scores_prompt[-n_tail:].sum()) if n > 0 else float("nan")
    return {"mass_last_n": mass_last_n, "n_prompt_tokens": n, "n_tail": n_tail}

def _attention_share_technical(attn_to_all: np.ndarray,
                               idx_prompt: list[int],
                               tech_span_all_tokens: tuple[int, int] | None) -> float | float:
    """
    Calcola la quota di attenzione riservata ai token del SEGMENTO TECNICO,
    limitandosi ai soli token del PROMPT (esclude special/immagine).
    - attn_to_all: vettore attenzione normalizzato (len = seq_len totale)
    - idx_prompt: indici dei token di prompt “tenuti” (solo testo prompt)
    - tech_span_all_tokens: (start,end) sugli indici dell'intera sequenza (prima del filtro)
    """
    if tech_span_all_tokens is None:
        return float("nan")
    start_all, end_all = tech_span_all_tokens
    end_all = max(start_all, end_all)
    # prendi gli indici di prompt che cadono nel range tecnico in spazi token globali
    tech_prompt_positions = [j for j in idx_prompt if start_all <= j <= end_all]
    if len(tech_prompt_positions) == 0:
        return float("nan")
    return float(attn_to_all[tech_prompt_positions].sum())

def analyze_subset_with_technical(dataset,
                                  tester,
                                  idxs,
                                  prompt_mode="dynamic",
                                  window_last_n: int = 30) -> pd.DataFrame:
    """Compute attention shares for a subset of examples (forward pass only).

    For every index in ``idxs`` the prompt is built with
    ``tester.build_prompt``, a forward pass with attentions is run, and the
    attention row of the last input position (layers/heads averaged,
    normalized over all tokens) is summarized as:

      - ``mass_last_n``: mass on the last ``window_last_n`` kept prompt tokens;
      - ``mass_technical``: mass on the technical segment, computed only when
        ``prompt_mode`` starts with "dynamic" (NaN otherwise).

    Returns:
        DataFrame with one row per example and the columns idx, prompt_mode,
        mass_last_n, n_prompt_tokens, n_tail, mass_technical.
    """
    records = []
    for sample_idx in idxs:
        example = dataset[sample_idx]
        prompt_text = tester.build_prompt(prompt_type=prompt_mode, x_t=example.get("x_t",""))

        # Forward pass only; nothing is generated.
        model_inputs, model_out = tester.forward_with_attn(example["image_k"], prompt_text)

        tokenizer = tester.processor.tokenizer
        tokens = _tokens_from_ids(tokenizer, model_inputs["input_ids"][0].tolist())
        prompt_flags = _mask_prompt_tokens(tokenizer, tokens)

        # Aggregate heads/layers and take the row of the last input position as query.
        attn_matrix = _aggregate_attention(model_out.attentions, "mean")[0].float().cpu().numpy()
        last_position = model_inputs["input_ids"].shape[-1] - 1
        attn_row = _normalize(attn_matrix[last_position])

        kept_positions = [pos for pos, keep in enumerate(prompt_flags) if keep]
        tail_stats = _attention_share_last_n(attn_row[kept_positions], n_last=window_last_n)

        # The technical segment exists only in dynamic prompts.
        technical_mass = float("nan")
        if prompt_mode.startswith("dynamic"):
            span = _segment_token_span_from_prompt_text(prompt_text, tokens)
            technical_mass = _attention_share_technical(attn_row, kept_positions, span)

        records.append({
            "idx": sample_idx,
            "prompt_mode": prompt_mode,
            "mass_last_n": tail_stats["mass_last_n"],
            "n_prompt_tokens": tail_stats["n_prompt_tokens"],
            "n_tail": tail_stats["n_tail"],
            "mass_technical": technical_mass
        })
    return pd.DataFrame(records)

# ----- RUN: dynamic + static prompts on the first N examples -----
N = min(MAX_SAMPLES, len(hf_dataset))
idxs = list(range(N))

df_dyn = analyze_subset_with_technical(hf_dataset, _tester_instance, idxs, prompt_mode="dynamic",  window_last_n=WINDOW_LAST_N)
df_sta = analyze_subset_with_technical(hf_dataset, _tester_instance, idxs, prompt_mode="static_0", window_last_n=WINDOW_LAST_N)

df_all = pd.concat([df_dyn, df_sta], ignore_index=True)

# save the per-example details
per_example_csv = os.path.join(OUT_DIR, "attention_shares_with_technical.csv")
df_all.to_csv(per_example_csv, index=False)
print("[OK] Per-example ->", per_example_csv)

# summary statistics per prompt mode
summary = (df_all.groupby("prompt_mode")[["mass_last_n","mass_technical"]]
                  .agg(["mean","std","median","min","max"]))
summary_csv = os.path.join(OUT_DIR, "attention_shares_with_technical_summary.csv")
summary.to_csv(summary_csv)
print("[OK] Summary ->", summary_csv)


[OK] Per-example -> /kaggle/working/attn_insights_new_prompt/attention_shares_with_technical.csv
[OK] Summary -> /kaggle/working/attn_insights_new_prompt/attention_shares_with_technical_summary.csv


  has_large_values = (abs_vals > 1e6).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()


Unnamed: 0_level_0,mass_last_n,mass_last_n,mass_last_n,mass_last_n,mass_last_n,mass_technical,mass_technical,mass_technical,mass_technical,mass_technical
Unnamed: 0_level_1,mean,std,median,min,max,mean,std,median,min,max
prompt_mode,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2,Unnamed: 7_level_2,Unnamed: 8_level_2,Unnamed: 9_level_2,Unnamed: 10_level_2
dynamic,0.44441,0.002185,0.44441,0.442865,0.445955,0.108772,0.002588,0.108772,0.106942,0.110602
static_0,0.563659,0.002646,0.563659,0.561788,0.56553,,,,,


In [15]:

import os
from typing import List, Dict

def generate_heatmaps_for_indices(
    dataset,
    tester,
    indices,
    prompt_mode="dynamic",
    out_dir="/kaggle/working/attn_insights",
    save_with_image_tokens=True
):
    """Run the tester's prompt-attention analysis on selected dataset examples.

    Args:
        dataset: Indexable collection supporting ``len()`` and integer lookup;
            each item must offer ``.get("x_t", "")`` and an ``"image_k"`` key.
        tester: Object exposing ``build_prompt(prompt_type=..., x_t=...)`` and
            ``analyze_prompt_attention(...)``; the latter must return a mapping
            with the keys ``png_prompt``, ``csv`` and ``png_alltokens``.
        indices: Iterable of positions; each is cast to a Python ``int`` and
            positions outside ``[0, len(dataset))`` are silently skipped.
        prompt_mode: Forwarded to ``build_prompt`` and used in the file stem.
        out_dir: Directory created if missing. NOTE: it is only created here
            and is not forwarded to ``tester``, which decides where files go.
        save_with_image_tokens: Forwarded unchanged to the tester.

    Returns:
        A list with one entry per processed index: the mapping returned by the
        tester, extended in place with ``idx`` and ``prompt_mode``.
    """
    import os
    os.makedirs(out_dir, exist_ok=True)

    # Normalise to plain Python ints and keep only in-range positions.
    n_items = len(dataset)
    positions = []
    for raw in indices:
        pos = int(raw)
        if 0 <= pos < n_items:
            positions.append(pos)

    collected = []
    for pos in positions:
        example = dataset[pos]
        prompt = tester.build_prompt(prompt_type=prompt_mode, x_t=example.get("x_t", ""))
        stem = f"sample{pos}_{prompt_mode}"
        paths = tester.analyze_prompt_attention(
            image_path=example["image_k"],
            prompt=prompt,
            file_stem=stem,
            layer_reduce="mean",
            save_with_image_tokens=save_with_image_tokens,
        )
        paths.update({"idx": pos, "prompt_mode": prompt_mode})
        collected.append(paths)

        # Progress line; the all-tokens image is mentioned only when one was produced.
        message = f"[OK] {stem} -> {paths['png_prompt']} | {paths['csv']}"
        if paths['png_alltokens']:
            message += f" | {paths['png_alltokens']}"
        print(message)
    return collected



In [16]:
import pandas as pd
import numpy as np

def pick_examples_by_outcome(
    preds_csv: str,
    k_per_bin: int = 2,
    seed: int = 0,
    idx_col: str | None = "idx",          # se None o non presente, usa df.index
    gt_col: str = "gt_label",
    pred_col: str = "pred_label"
):
    """
    Restituisce indici per TP/TN/FP/FN.
    Se idx_col non esiste, usa l'indice del DataFrame (df.index) come idx.
    """
    rng = np.random.default_rng(seed)
    df = pd.read_csv(preds_csv)

    # determina colonna idx
    if idx_col is None or idx_col not in df.columns:
        df["__idx__"] = df.index
        idx_col = "__idx__"

    # normalizza stringhe
    def _norm(v):
        if isinstance(v, str): return v.strip().upper()
        return v

    if gt_col not in df.columns or pred_col not in df.columns:
        raise ValueError(f"Mancano colonne {gt_col}/{pred_col} nel CSV. Colonne disponibili: {list(df.columns)}")

    df["gt_n"]   = df[gt_col].map(_norm)
    df["pred_n"] = df[pred_col].map(_norm)

    # mappa a 0/1 (gestisce stringhe/booleani)
    def _to01(v):
        if v in ("REAL", 1, "1", "TRUE", True):  return 1
        if v in ("FAKE", 0, "0", "FALSE", False): return 0
        return np.nan

    df["gt01"]   = df["gt_n"].map(_to01)
    df["pred01"] = df["pred_n"].map(_to01)
    df = df.dropna(subset=["gt01","pred01"])

    df["ok"] = (df["gt01"] == df["pred01"]).astype(int)

    TP = df[(df["gt01"]==1) & (df["pred01"]==1)]
    TN = df[(df["gt01"]==0) & (df["pred01"]==0)]
    FP = df[(df["gt01"]==0) & (df["pred01"]==1)]
    FN = df[(df["gt01"]==1) & (df["pred01"]==0)]

    def pick(dfbin):
        if len(dfbin) == 0: return []
        idxs = dfbin[idx_col].tolist()
        if len(idxs) <= k_per_bin: return idxs
        return list(rng.choice(idxs, size=k_per_bin, replace=False))

    return {"TP": pick(TP), "TN": pick(TN), "FP": pick(FP), "FN": pick(FN)}


In [17]:
# Pick one example per confusion-matrix bin from the saved predictions.
bins = pick_examples_by_outcome(
    preds_csv="/kaggle/input/dynamic-finetune/dynamic_test_Qwen_finetuned_2.csv",
    k_per_bin=1,
    idx_col=None,  # <-- use the CSV row index as the example index
    gt_col="gt_label",
    pred_col="pred_label",
)

# Render the attention heatmaps of the picked examples, bin by bin.
for label in bins:
    idxs = bins[label]
    print(label, idxs)
    generate_heatmaps_for_indices(
        dataset=hf_dataset,
        tester=_tester_instance,
        indices=idxs,
        prompt_mode="dynamic",
        save_with_image_tokens=True,
    )


TP [937]
[OK] sample937_dynamic -> /kaggle/working/attn_insights/sample937_dynamic_attn_prompt.png | /kaggle/working/attn_insights/sample937_dynamic_attn_tokens.csv | /kaggle/working/attn_insights/sample937_dynamic_attn_alltokens.png
TN [661]
[OK] sample661_dynamic -> /kaggle/working/attn_insights/sample661_dynamic_attn_prompt.png | /kaggle/working/attn_insights/sample661_dynamic_attn_tokens.csv | /kaggle/working/attn_insights/sample661_dynamic_attn_alltokens.png
FP [539]
[OK] sample539_dynamic -> /kaggle/working/attn_insights/sample539_dynamic_attn_prompt.png | /kaggle/working/attn_insights/sample539_dynamic_attn_tokens.csv | /kaggle/working/attn_insights/sample539_dynamic_attn_alltokens.png
FN [170]
[OK] sample170_dynamic -> /kaggle/working/attn_insights/sample170_dynamic_attn_prompt.png | /kaggle/working/attn_insights/sample170_dynamic_attn_tokens.csv | /kaggle/working/attn_insights/sample170_dynamic_attn_alltokens.png


In [None]:
from datasets import load_from_disk
from tqdm import tqdm
import pandas as pd

DATASET_PATH = "/kaggle/input/test-dataset-kaggle/test_dataset_kaggle"
OUTPUT_PATH = "./results.csv"

def run_test(n_examples=10, prompt_type="static_0"):
    """Smoke-test the model on the first examples of the dataset, printing each result.

    Reads the module-level ``hf_dataset`` (the dataset is not reloaded from
    ``DATASET_PATH``) and calls ``test_qwen_vl`` once per example.

    Args:
        n_examples: Number of leading examples to run. Clamped to the dataset
            length so that short datasets do not raise ``IndexError``.
        prompt_type: Prompt type passed to ``test_qwen_vl``.

    Returns:
        None. The ground truth, predicted label and raw response of every
        example are printed.
    """
    n_to_run = min(n_examples, len(hf_dataset))
    for i in range(n_to_run):
        example = hf_dataset[i]
        print("Ground_Truth:\n", example['label'])
        label, response = test_qwen_vl(example, prompt_type)
        print("Output label:\n", label)
        print("Output response:\n", response)

#run_test()


In [None]:
import os
import pandas as pd
from tqdm.auto import tqdm
from datasets import load_from_disk

# DATASET_PATH   = "/kaggle/input/test-dataset-kaggle/test_dataset_kaggle"
CHECKPOINT_CSV = "/kaggle/working/dynamic_test_Qwen_finetuned_2.csv"
PROMPT_TYPE    = "dynamic"
CHECKPOINT_EVERY = 30  # write the checkpoint CSV every 30 processed samples
RESULT_COLUMNS = ["img_id", "gt_label", "pred_label", "response"]


def _results_frame(records):
    """Build the results table from the accumulated row dicts.

    An empty run still yields the expected columns so the accuracy step below
    can index them.
    """
    if not records:
        return pd.DataFrame(columns=RESULT_COLUMNS)
    return pd.DataFrame(records)


# 1) The dataset is the `hf_dataset` object already loaded by an earlier cell.
#dataset = load_from_disk(DATASET_PATH)

# 2) If a checkpoint CSV already exists, reload it and skip the samples it contains.
#    Rows are kept as a list of dicts and turned into a DataFrame only when
#    writing, instead of growing a DataFrame with pd.concat on every sample.
if os.path.exists(CHECKPOINT_CSV):
    records = pd.read_csv(CHECKPOINT_CSV, dtype={"img_id": str}).to_dict("records")
else:
    records = []

# A CSV round trip does not preserve the Python type of img_id (digit-only
# string ids would come back as integers), so ids are compared as strings.
processed_ids = {str(rec["img_id"]) for rec in records}

# 3) Evaluation loop with a tqdm progress bar and a checkpoint every N samples.
try:
    for sample in tqdm(hf_dataset, desc=f"Eval {PROMPT_TYPE}", dynamic_ncols=True):
        img_id = sample["img_id"]
        if str(img_id) in processed_ids:
            continue

        try:
            pred_label, resp = test_qwen_vl(sample, prompt_type=PROMPT_TYPE)
        except Exception as e:
            # Best effort: record the failure as pred_label == -1 and keep going;
            # such rows are excluded from the accuracy below.
            print("Errore nella generazione")
            pred_label, resp = -1, f"[ERROR] {e}"

        records.append({
            "img_id":     img_id,
            "gt_label":   sample["label"],
            "pred_label": pred_label,
            "response":   resp
        })
        processed_ids.add(str(img_id))

        # Periodic checkpoint.
        if len(processed_ids) % CHECKPOINT_EVERY == 0:
            _results_frame(records).to_csv(CHECKPOINT_CSV, index=False)
finally:
    # 4) Always write the final CSV, including when the loop is interrupted,
    #    so results gathered since the last checkpoint are not lost.
    df = _results_frame(records)
    df.to_csv(CHECKPOINT_CSV, index=False)
    print("✅ Checkpoint salvato in:", CHECKPOINT_CSV)

# 5) Accuracy over the valid predictions only (error rows carry pred_label == -1).
valid = df["pred_label"] != -1
acc = (df.loc[valid, "gt_label"] == df.loc[valid, "pred_label"]).mean()
print(f"Accuracy {PROMPT_TYPE}: {acc:.4f}")
