# Probing experiment

* logit mean, max, min, prob


In [1]:
import sys 
sys.path.append('/data3/KJE/code/WIL_DeepLearningProject_2/VLM_Hallu')
import argparse
import os
import random
from typing import List, Union, Optional, Dict, Tuple
import gc
os.environ["CUDA_VISIBLE_DEVICES"] = "4"

import numpy as np
import pandas as pd

import torch
from torch.utils.data import DataLoader
from sklearn.metrics import accuracy_score, roc_auc_score, precision_score, recall_score
from typing import Dict, Tuple, List

# import wandb

from transformers import AutoProcessor, LlavaForConditionalGeneration, set_seed  # noqa: F401

from src.model_zoo import get_model
from src.dataset_zoo import get_dataset
from src.misc import seed_all, _default_collate, save_scores
from src.old.probing_utils_copy import load_llava

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Dataset tag; run_generation_w_logits interpolates it into the output directory name.
DATASET = "VizWiz"
# Preprocessed answer CSVs read by load_dataset(): TRAIN_PATH when split == 'train',
# VAL_PATH for any other split value.
TRAIN_PATH = "/data3/KJE/code/WIL_DeepLearningProject_2/VLM_Hallu/data/preprocess/llava-1.5-7b-hf-vizwiz_train-llava_answers.csv"
VAL_PATH = "/data3/KJE/code/WIL_DeepLearningProject_2/VLM_Hallu/data/preprocess/llava-1.5-7b-hf-vizwiz_val-llava_answers.csv"
# NOTE(review): SPLIT is not referenced by the cells visible here (they use cur_split).
SPLIT = "train" # or "val"

# Worker-process count handed to the DataLoader built in load_dataset().
NUM_WORKERS = 16
 
# seed_all comes from src.misc; presumably it seeds the RNGs for reproducibility - verify there.
SEED = 1
seed_all(SEED)

In [3]:
from torch.utils.data import Dataset
# image path, question, 

class Vizwiz(Dataset):
    """Map-style dataset over a preprocessed VizWiz answers CSV.

    The CSV must contain the columns ``image_path``, ``question``,
    ``gold_answer`` and ``label``. Per the original author's note, label 0
    marks a correct answer (no hallucination) and label 1 an incorrect
    answer (hallucination).

    Args:
        data_path: Path of the CSV file, read with ``pandas.read_csv``.
        start_idx: Slice start applied to every column, i.e. rows before it
            are dropped. Useful for resuming an interrupted run.

    Each item is ``(row_id, image_path, question, gold_answer, hallu_label)``
    where ``row_id`` is the row's position in the *original* CSV, so ids stay
    comparable between a full run and a run resumed with ``start_idx > 0``.
    With the default ``start_idx=0`` it equals the dataset index.
    """

    def __init__(
        self,
        data_path,
        start_idx=0,
        ):
        # image path / question / gold_answer / model_answer / label
        data_cv = pd.read_csv(data_path)

        # Slice the positional ids with the very same slice as the columns so
        # that every kept row remembers where it lived in the CSV.
        self.row_ids = list(range(len(data_cv)))[start_idx:]
        self.image_paths = data_cv["image_path"].tolist()[start_idx:]
        self.questions = data_cv["question"].tolist()[start_idx:]
        self.gold_answers = data_cv["gold_answer"].tolist()[start_idx:]
        self.hallu_labels = data_cv["label"].tolist()[start_idx:]

    def __len__(self):
        return len(self.questions)

    def __getitem__(self, idx: int):
        return (
            self.row_ids[idx],
            self.image_paths[idx],
            self.questions[idx],
            self.gold_answers[idx],
            self.hallu_labels[idx],
        )
    
    
    
def viz_collate_fn(batch, image_size=336):
    """Collate ``Vizwiz`` items into parallel lists and load the images.

    Relies on ``Image`` (``from PIL import Image``) being available at module
    level when the function is called.

    Args:
        batch: Iterable of ``(idx, image_path, question, gold_answer,
            hallu_label)`` tuples.
        image_size: Side length in pixels of the black square placeholder
            substituted for an image that cannot be loaded. The default of
            336 is a multiple of the 28-pixel base used by ``fix_tiny_image``.

    Returns:
        Tuple ``(idxs, images, questions, gold_answers, labels, image_paths)``
        of equally long lists; ``images`` holds RGB PIL images and ``labels``
        holds ints.
    """
    import warnings

    idxs, images, questions, gold_answers, labels, image_paths = [], [], [], [], [], []

    for idx, image_path, question, gold_answer, hallu_label in batch:
        try:
            img = Image.open(image_path).convert("RGB")
        except Exception as err:
            # Best effort: keep the sample so batch positions stay aligned,
            # but make the substitution visible instead of silent.
            warnings.warn(
                f"viz_collate_fn: could not load image {image_path!r} ({err!r}); "
                "using a black placeholder"
            )
            img = Image.new("RGB", (image_size, image_size), (0, 0, 0))

        images.append(img)
        questions.append(question)
        gold_answers.append(gold_answer)
        labels.append(int(hallu_label))
        image_paths.append(image_path)
        idxs.append(idx)

    return (idxs, images, questions, gold_answers, labels, image_paths)

In [4]:
from transformers import LlavaForConditionalGeneration, Qwen2VLForConditionalGeneration, InstructBlipProcessor, InstructBlipForConditionalGeneration

def load_model(model_name):
    """Load a vision-language model together with its processor and tokenizer.

    The model family is chosen by substring match on ``model_name``, checked
    in this order: ``"llava"`` (llava-hf/llava-1.5-7b-hf), ``"qwen"``
    (Qwen/Qwen2-VL-7B-Instruct), ``"instructblip"``
    (Salesforce/instructblip-vicuna-7b).

    Args:
        model_name: Free-form model tag such as ``"llava1.5"`` or ``"qwen2"``.

    Returns:
        ``(model, processor, tok)`` where ``tok`` is ``processor.tokenizer``
        with ``padding_side`` set to ``"left"``.

    Raises:
        ValueError: If ``model_name`` matches none of the supported families.
            Callers unpack three values, so failing loudly here replaces the
            unpacking error they would otherwise hit on a ``None`` return.
    """
    cache_dir = '/data3/hg_weight/hg_weight'

    # Resolve the family first so a bad name fails before any heavy work.
    if "llava" in model_name:
        family = "llava"
        model_id = "llava-hf/llava-1.5-7b-hf"
        processor_cls, model_cls = AutoProcessor, LlavaForConditionalGeneration
    elif 'qwen' in model_name:
        family = "qwen"
        model_id = "Qwen/Qwen2-VL-7B-Instruct"
        processor_cls, model_cls = AutoProcessor, Qwen2VLForConditionalGeneration
    elif 'instructblip' in model_name:
        family = "instructblip"
        model_id = 'Salesforce/instructblip-vicuna-7b'
        processor_cls, model_cls = InstructBlipProcessor, InstructBlipForConditionalGeneration
    else:
        raise ValueError(
            "The model should be one of the following: llava1.5-7b, qwen2-vl-7b, instructblip "
            f"(got {model_name!r})"
        )

    if torch.cuda.is_available():
        cap_major = torch.cuda.get_device_capability(0)[0]  # compute capability of gpu 0
        dtype = torch.bfloat16 if cap_major >= 8 else torch.float16
        # Qwen2-VL is pinned to a single CUDA device; the others are auto-placed.
        device_map = "cuda" if family == "qwen" else "auto"
    else:
        # No GPU: full precision and default placement for every family
        # (the qwen branch no longer requests "cuda" when CUDA is absent).
        dtype = torch.float32
        device_map = None

    processor = processor_cls.from_pretrained(
        model_id,
        trust_remote_code=False,
        cache_dir=cache_dir,
        use_fast=False,
    )
    model = model_cls.from_pretrained(
        model_id,
        torch_dtype=dtype,
        low_cpu_mem_usage=True,
        device_map=device_map,
        cache_dir=cache_dir,
    )

    tok = processor.tokenizer
    # Batched prompts are padded on the left (needed for batched generation
    # with decoder-only models).
    tok.padding_side = "left"

    if family == "qwen":
        if tok.pad_token_id is None and tok.eos_token_id is not None:
            tok.pad_token = tok.eos_token
        model.generation_config.pad_token_id = tok.pad_token_id
        model.generation_config.do_sample = False

    return model, processor, tok

In [5]:
def load_dataset(split, batch_size, start_idx=0):
    """Build the non-shuffled DataLoader for one split.

    ``split == 'train'`` selects ``TRAIN_PATH``; every other value selects
    ``VAL_PATH``. ``start_idx`` is forwarded to ``Vizwiz``.
    """
    csv_path = VAL_PATH
    if split == 'train':
        csv_path = TRAIN_PATH

    return DataLoader(
        Vizwiz(csv_path, start_idx=start_idx),
        batch_size=batch_size,
        shuffle=False,
        num_workers=NUM_WORKERS,
        collate_fn=viz_collate_fn,
    )

In [6]:
# load prompt
def build_prompt(tokenizer, question, model_type) -> str:
    """Build the text prompt for one question.

    * ``"llava"`` / ``"qwen"`` in ``model_type``: a single user turn (image
      placeholder + question with the one-word instruction appended) rendered
      through ``tokenizer.apply_chat_template``. If the tokenizer has no such
      method, or the call raises, the plain ``"<image>\\n<question>\\n"``
      prompt is returned instead (without the one-word instruction).
    * ``"instructblip"`` in ``model_type``: the question with the one-word
      instruction appended.
    * anything else: ``None``.

    NOTE(review): the instruction is concatenated without a separating space
    ("...?Answer in one word."); kept as is so prompts match earlier runs.
    """
    one_word = 'Answer in one word.'

    if ("llava" in model_type) or ("qwen" in model_type):
        user_turn = {
            "role": "user",
            "content": [{"type": "image"}, {"type": "text", "text": question + one_word}],
        }
        if hasattr(tokenizer, "apply_chat_template"):
            try:
                return tokenizer.apply_chat_template(
                    [user_turn],
                    add_generation_prompt=True,
                    tokenize=False
                )
            except Exception:
                # Fall through to the plain prompt below.
                pass
        return "<image>\n" + question.strip() + "\n"

    if "instructblip" in model_type:
        return question + one_word

    return None

In [7]:
import math
from PIL import Image

def fix_tiny_image(img, base=28, round_to_multiple=True):
    """Return an RGB PIL image whose sides are large enough for the model.

    Non-PIL input is passed through ``Image.fromarray`` first. If the shorter
    side is below ``base``, both sides are multiplied by the smallest integer
    factor that lifts it to at least ``base``. When ``round_to_multiple`` is
    true, each side is then rounded up to the next multiple of ``base``. All
    resizing uses bicubic interpolation.
    """
    if isinstance(img, Image.Image):
        img = img.convert("RGB")
    else:
        img = Image.fromarray(img).convert("RGB")

    width, height = img.size

    # Step 1: integer upscale of images that are too small.
    if min(width, height) < base:
        factor = math.ceil(base / min(width, height))
        img = img.resize((width * factor, height * factor), Image.BICUBIC)

    if not round_to_multiple:
        return img

    # Step 2: snap both sides up to a multiple of `base`.
    target = (
        int(math.ceil(img.width / base) * base),
        int(math.ceil(img.height / base) * base),
    )
    if target != img.size:
        img = img.resize(target, Image.BICUBIC)

    return img


def ensure_images_ok(images):
    """Return a new list with every image normalised by ``fix_tiny_image``.

    Entries given as ``str``/``bytes`` are treated as paths and opened as RGB
    images first; everything else is handed to ``fix_tiny_image`` unchanged.
    """
    def _as_image(item):
        # A str/bytes entry is a file path rather than an image object.
        if isinstance(item, (str, bytes)):
            return Image.open(item).convert("RGB")
        return item

    return [
        fix_tiny_image(_as_image(item), base=28, round_to_multiple=True)
        for item in images
    ]

In [None]:
# generation -> logit mean, max, min, prob 저장
import json
from tqdm import tqdm
from PIL import Image
from datetime import datetime
import torch.nn.functional as F

IS_TEST = False
OUTPUT_ROOT = "/data3/KJE/code/WIL_DeepLearningProject_2/VLM_Hallu/output"

def run_generation_w_logits(model, proc, tok, joint_loader, model_type, split):
    """Generate short answers and store first-token logit statistics as JSON.

    For every batch of ``joint_loader`` the model generates up to 5 new
    tokens. From the scores of the first generated step the function records,
    per sample, the mean / min / max over the vocabulary and the
    log-probability of the token that was actually emitted.

    Args:
        model: Model exposing ``device``, ``generate`` and ``generation_config``.
        proc: Processor called with ``images=`` and ``text=`` to build inputs.
        tok: Tokenizer used for prompt building and decoding.
        joint_loader: Iterable of batches shaped like ``viz_collate_fn``'s
            return value.
        model_type: Model tag; selects the prompt format and is part of the
            output directory name.
        split: Split tag, part of the output file name.

    Side effects:
        Writes ``OUTPUT_ROOT/<model_type>_<DATASET>/logits_<split>_<MMDD_HHMM>.json``
        containing all records gathered so far, refreshed after every batch.
        When the module-level ``IS_TEST`` flag is true, debug values are
        printed and only the first batch is processed.
    """
    device = model.device

    cur_time = datetime.now().strftime("%m%d_%H%M")
    save_dir = os.path.join(OUTPUT_ROOT, f"{model_type}_{DATASET}")
    os.makedirs(save_dir, exist_ok=True)
    save_path = os.path.join(save_dir, f"logits_{split}_{cur_time}.json")
    # The checkpoint is written next to the target and then swapped in, so an
    # interrupted run never leaves a half-written JSON file behind.
    tmp_path = save_path + ".tmp"

    all_results = []

    with torch.no_grad():
        for batch in tqdm(joint_loader):
            idxs, images, questions, gold_answers, labels, image_paths = batch
            prompts = [build_prompt(tok, q, model_type) for q in questions]
            images = ensure_images_ok(images)
            inputs = proc(
                images=images,
                text=prompts,
                padding=True,
                return_tensors="pt"
            ).to(device)

            outputs = model.generate(
                **inputs,
                use_cache=True,
                max_new_tokens=5,
                return_dict_in_generate=True,
                output_scores=True,
            )

            sequences = outputs.sequences
            scores = outputs.scores
            # Scores may arrive in the model's half-precision dtype depending
            # on the transformers version; reduce in float32 for accuracy.
            logits_0 = scores[0].float()
            logprobs_0 = F.log_softmax(logits_0, dim=-1)   # (B, V)

            batch_size = sequences.size(0)

            # One score tensor per generated step, so the generated part of
            # each sequence is its last len(scores) positions.
            start_pos = sequences.size(1) - len(scores)
            first_token_ids = sequences[:, start_pos]

            logit_means = logits_0.mean(dim=-1)            # (B,)
            logit_mins = logits_0.min(dim=-1).values       # (B,)
            logit_maxs = logits_0.max(dim=-1).values       # (B,)

            gather_ids = first_token_ids.unsqueeze(1)      # (B,1)
            first_logprobs = torch.gather(logprobs_0, 1, gather_ids).squeeze(1)  # (B,)

            model_answers = [
                tok.decode(sequences[i, start_pos:], skip_special_tokens=True)
                for i in range(batch_size)
            ]

            if IS_TEST:
                print(model.generation_config)
                # Inspect sample 3 when the batch has one, else the last sample.
                i = min(3, batch_size - 1)
                tid = sequences[:, start_pos][i].item()        # id of the chosen first token
                lp = logprobs_0[i, tid].item()                 # log-prob of that token
                first_decoded = tok.decode(tid, skip_special_tokens=True)
                print("stored logprob:", first_logprobs[i].item(), "| recomputed:", lp)

                print("mean/min/max:",
                    logits_0[i].mean().item(),
                    logits_0[i].min().item(),
                    logits_0[i].max().item())

                print("saved mean/min/max:",
                    float(logit_means[i].item()),
                    float(logit_mins[i].item()),
                    float(logit_maxs[i].item())
                )

                print("decoded:", model_answers[i])
                print("first_token:", first_decoded, "first token id:", tid)

            # One host transfer per statistic instead of one per sample.
            per_sample = zip(
                idxs, questions, image_paths, gold_answers, model_answers, labels,
                logit_means.tolist(), logit_mins.tolist(), logit_maxs.tolist(),
                first_logprobs.tolist(),
            )
            for (idx, question, image_path, gold_answer, model_answer, label,
                 l_mean, l_min, l_max, l_prob) in per_sample:
                all_results.append({
                    "idx":             int(idx),
                    "question":        question,
                    "image_path":      image_path,
                    "gold_answer":     gold_answer,
                    "model_answer":    model_answer,
                    "hallu_label":     label,
                    "logit_mean":      float(l_mean),
                    "logit_min":       float(l_min),
                    "logit_max":       float(l_max),
                    "logprob":         float(l_prob),
                })

            # Checkpoint everything gathered so far after each batch.
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(all_results, f, ensure_ascii=False, indent=4)
            os.replace(tmp_path, save_path)

            if IS_TEST:
                break

        print(f"Saved {len(all_results)} records to {save_path}")
        

In [9]:
# run generation llava - val
# model_type = "llava1.5"
# cur_batch = 5
# cur_split = "val"

# model, proc, tok = load_model(model_type)
# joint_loader = load_dataset(cur_split, cur_batch)

# run_generation_w_logits(model, proc, tok, joint_loader, model_type, cur_split)

In [10]:
# # run generation instructblip - train
# model_type = "instructblip"
# cur_batch = 15
# cur_split = "train"

# model, proc, tok = load_model(model_type)
# joint_loader = load_dataset(cur_split, cur_batch)

# run_generation_w_logits(model, proc, tok, joint_loader, model_type, cur_split)

In [None]:
# Run generation for Qwen2-VL ("qwen2" hits the "qwen" branch of load_model) on the
# train split, skipping the first 11270 CSV rows via start_idx.
model_type = "qwen2"
cur_batch = 5
cur_split = "train"
start_idx = 11270

model, proc, tok = load_model(model_type)
joint_loader = load_dataset(cur_split, cur_batch, start_idx)

run_generation_w_logits(model, proc, tok, joint_loader, model_type, cur_split)

Loading checkpoint shards: 100%|██████████| 5/5 [00:14<00:00,  2.87s/it]
100%|██████████| 745/745 [3:13:38<00:00, 15.59s/it]  

Saved 3721 records to /data3/KJE/code/WIL_DeepLearningProject_2/VLM_Hallu/output/qwen2_VizWiz/logits_train_0918_1358.json





: 

In [None]:
# Load the saved logit statistics and prepare train / eval labels for the SGD probe.
from sklearn.linear_model import SGDClassifier
from sklearn.preprocessing import LabelEncoder  # used by the non-numeric label branch below

SGD_TRAIN_PATH = "/data3/KJE/code/WIL_DeepLearningProject_2/VLM_Hallu/output/llava1.5_VizWiz/logits_0916_2251.json"

with open(SGD_TRAIN_PATH, "r", encoding="utf-8") as f:
    train_data = json.load(f)
# NOTE(review): SGD_VAL_PATH is not defined in the cells visible here - define it
# before running. The training file is JSON while this one is read as CSV; confirm
# the format of the evaluation file.
val_data = pd.read_csv(SGD_VAL_PATH)

df_train = pd.DataFrame(train_data)
# Built from the frame loaded just above.
df_eval  = pd.DataFrame(val_data)

# Single features to use
FEATURES = ["logit_mean", "logit_min", "logit_max", "logprob"]
TARGET   = "hallu_label"

# Numeric labels are used as-is; any other dtype is mapped to integers by a LabelEncoder.
y_train_raw = df_train[TARGET].values
if np.issubdtype(df_train[TARGET].dtype, np.number):
    le = None
    y_train = y_train_raw.astype(int)
else:
    le = LabelEncoder()
    y_train = le.fit_transform(y_train_raw)

# Evaluation labels (the same encoder is applied)
if le is None:
    y_eval = df_eval[TARGET].astype(int).values
else:
    # Caution: this raises if the eval labels contain a class that was absent from the training set
    y_eval = le.transform(df_eval[TARGET].values)