```
pandas==2.2.2
torch==2.6.0+cu124
torchaudio==2.6.0+cu124
torchvision==0.21.0+cu124
pillow==11.2.1
transformers==4.53.2
numpy==2.0.2
tqdm==4.67.1
scikit-learn==1.6.1
peft==0.16.0
```
python==3.11.13

OS==Ubuntu 22.04.4 LTS

In [None]:
!unzip -qq scpc_data.zip

In [None]:

import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from transformers import CLIPProcessor, CLIPModel
import numpy as np
from tqdm.notebook import tqdm
import os
import logging
from sklearn.model_selection import train_test_split
import random
from peft import LoraConfig, TaskType, get_peft_model, prepare_model_for_kbit_training
from torch.amp import autocast, GradScaler
import warnings
warnings.filterwarnings('ignore')


In [None]:
# One seed shared by every random number generator used in this notebook.
seed = 41

# Seed Python's `random`, NumPy and PyTorch (CPU and all CUDA devices) in turn.
for _seed_rng in (
    random.seed,
    np.random.seed,
    torch.manual_seed,
    torch.cuda.manual_seed_all,
):
    _seed_rng(seed)

# The cuDNN flags below are currently disabled (commented out).
#torch.backends.cudnn.deterministic = True
#torch.backends.cudnn.benchmark = False

In [None]:
class Config:
    """Hyperparameters, file locations and LoRA settings for the CLIP VQA run."""

    def __init__(self):
        # Model weights and processor come from the same checkpoint name.
        checkpoint = "laion/CLIP-ViT-L-14-laion2B-s32B-b82K"
        self.model_name = checkpoint
        self.processor_name = checkpoint

        # Input / output files.
        self.train_csv_path = 'train_combined.csv'
        self.test_csv_path = 'test.csv'
        self.sample_submission_path = 'sample_submission.csv'
        self.output_submission_path = 'submission_pluto.csv'
        self.image_base_path = './'

        # Optimisation settings.
        self.batch_size = 64  # increased batch size
        self.num_epochs = 6
        self.learning_rate = 1e-4  # reduced learning rate
        self.weight_decay = 1.5e-2

        # Use the GPU when one is visible to PyTorch, otherwise the CPU.
        if torch.cuda.is_available():
            self.device = "cuda"
        else:
            self.device = "cpu"

        self.num_choices = 4
        self.validation_split = 0.15
        self.patience = 3  # early stopping
        self.save_best_model = True
        self.model_save_path = 'best_clip_vqa_model.pth'

        # LoRA settings.
        self.lora_r = 32  # rank
        self.lora_alpha = 64  # alpha
        self.lora_dropout = 0.1

        # Attention projections that receive LoRA adapters. Other candidates
        # ("k_proj", "fc1", "fc2", "visual_projection", "text_projection")
        # are currently not targeted.
        self.lora_target_modules = ["q_proj", "v_proj", "out_proj"]

In [None]:
# Build the run configuration and echo its key settings, one per line.
config = Config()
print(
    f"Using device: {config.device}",
    f"Model: {config.model_name}",
    f"Batch size: {config.batch_size}",
    f"Learning rate: {config.learning_rate}",
    sep="\n",
)

In [None]:
class VQADataset(Dataset):
    """Multiple-choice VQA samples encoded for CLIP.

    Each item pairs one image with four prompts of the form
    "Question: <question> Answer: <choice>", one per choice column A-D.

    Args:
        df: DataFrame with columns "ID", "img_path", "Question", "A", "B",
            "C", "D" and, when ``is_train`` is true, "answer" holding one of
            "A"-"D".
        processor: Callable used to encode both the image (``images=``) and
            the prompts (``text=``); expected to be a CLIP processor.
        image_base_path: Directory that "img_path" values are joined onto.
        is_train: When true, each item also carries an integer "labels" tensor.
    """

    # Columns holding the answer options, in label order.
    CHOICE_COLUMNS = ("A", "B", "C", "D")
    # Maps the answer letter stored in the CSV to its class index.
    LABEL_TO_INDEX = {'A': 0, 'B': 1, 'C': 2, 'D': 3}

    def __init__(self, df, processor, image_base_path, is_train=True):
        self.df = df.reset_index(drop=True)
        self.processor = processor
        self.image_base_path = image_base_path
        self.is_train = is_train
        # Derived from the choice columns instead of a module-level `config`,
        # so the dataset can be constructed without that global existing.
        self.num_choices = len(self.CHOICE_COLUMNS)

    def __len__(self):
        return len(self.df)

    @staticmethod
    def _load_image(image_path):
        """Return the image at ``image_path`` as RGB, or a black placeholder on failure."""
        try:
            # The context manager closes the file handle as soon as the
            # converted copy has been produced.
            with Image.open(image_path) as img:
                return img.convert("RGB")
        except Exception as e:
            # Best-effort by design: a missing or corrupt image must not abort
            # the whole run, but the failure is reported with its cause.
            print(f'error: Unable to open image {image_path}: {e}')
            return Image.new('RGB', (224, 224), color='black')

    def __getitem__(self, idx):
        """Encode one sample.

        Returns:
            dict with 'pixel_values' (processor output with its leading
            dimension squeezed away), 'input_ids' and 'attention_mask'
            (one row per choice, padded to the longest prompt of this sample),
            'id', and 'labels' when ``is_train`` is true.

        Raises:
            KeyError: If ``is_train`` is true and "answer" is not one of "A"-"D".
        """
        row = self.df.iloc[idx]
        image = self._load_image(os.path.join(self.image_base_path, row["img_path"]))
        question = row["Question"]

        # One prompt per answer option, combining the question and the choice.
        texts = [
            f"Question: {question} Answer: {row[column]}"
            for column in self.CHOICE_COLUMNS
        ]

        # Image encoding.
        image_inputs = self.processor(
            images=image,
            return_tensors="pt",
            do_rescale=True,
            do_normalize=True
        )

        # Text encoding.
        text_inputs = self.processor(
            text=texts,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=77
        )

        result = {
            'pixel_values': image_inputs.pixel_values.squeeze(0),
            'input_ids': text_inputs.input_ids,
            'attention_mask': text_inputs.attention_mask,
            'id': row["ID"]
        }

        if self.is_train:
            result['labels'] = torch.tensor(
                self.LABEL_TO_INDEX[row["answer"]], dtype=torch.long
            )

        return result

In [None]:
def collate_fn(batch):
    """Merge per-sample dicts into one batch dict.

    Text tensors of each sample are right-padded with zeros to the widest
    ``input_ids`` in the batch and then concatenated along dim 0, so the text
    tensors have ``len(batch) * rows_per_sample`` rows. Returns ``None`` for
    an empty batch.
    """
    if not batch:
        return None

    images = torch.stack([sample['pixel_values'] for sample in batch])

    # Width every sample's text tensors must reach, and how far each falls short.
    widest = max([0] + [sample['input_ids'].shape[1] for sample in batch])
    gaps = [widest - sample['input_ids'].shape[1] for sample in batch]

    def _right_pad(tensor, gap):
        # Only touch tensors that are actually shorter than the widest one.
        if gap > 0:
            return torch.nn.functional.pad(tensor, (0, gap), value=0)
        return tensor

    # Resulting shape: (batch_size * 4, widest)
    token_rows = torch.cat(
        [_right_pad(sample['input_ids'], gap) for sample, gap in zip(batch, gaps)],
        dim=0,
    )
    mask_rows = torch.cat(
        [_right_pad(sample['attention_mask'], gap) for sample, gap in zip(batch, gaps)],
        dim=0,
    )

    collated = {
        'pixel_values': images,
        'input_ids': token_rows,
        'attention_mask': mask_rows,
        'ids': [sample['id'] for sample in batch],
    }

    # Labels are only present for training samples.
    if 'labels' in batch[0]:
        collated['labels'] = torch.stack([sample['labels'] for sample in batch])

    return collated

In [None]:
class CLIPVQAModel(nn.Module):
    """CLIP-based multiple-choice VQA scorer.

    Scores every answer prompt against its image with temperature-scaled
    cosine similarity between CLIP image and text embeddings.

    Args:
        clip_model_name: Checkpoint name or path passed to
            ``CLIPModel.from_pretrained``.
        num_choices: Number of answer prompts per image.
        lora_config: PEFT config used to wrap the CLIP model with LoRA
            adapters. ``None`` leaves the plain CLIP model unwrapped.
    """

    def __init__(self, clip_model_name, num_choices=4, lora_config=None):
        super().__init__()
        # Quantised loading (load_in_4bit / load_in_8bit, device_map and
        # prepare_model_for_kbit_training) is not used here.
        self.clip_model = CLIPModel.from_pretrained(
            clip_model_name,
            use_safetensors=False,
        )
        self.num_choices = num_choices

        # Apply LoRA only when a config is supplied; passing the default
        # `None` straight to get_peft_model would fail.
        if lora_config is not None:
            self.clip_model = get_peft_model(self.clip_model, lora_config)

            print("LoRA 모델 정보:")
            self.clip_model.print_trainable_parameters()

    def forward(self, pixel_values, input_ids, attention_mask):
        """Return similarity logits of shape (batch_size, num_choices).

        Args:
            pixel_values: Image batch; its first dimension is the batch size.
            input_ids: Token ids with ``batch_size * num_choices`` rows, the
                rows of one sample being contiguous.
            attention_mask: Mask matching ``input_ids``.
        """
        batch_size = pixel_values.shape[0]

        # Image and text embeddings from the CLIP towers.
        image_features = self.clip_model.get_image_features(pixel_values)
        text_features = self.clip_model.get_text_features(
            input_ids=input_ids,
            attention_mask=attention_mask
        )

        # L2-normalise so the dot product below is a cosine similarity.
        image_features = image_features / image_features.norm(p=2, dim=-1, keepdim=True)
        text_features = text_features / text_features.norm(p=2, dim=-1, keepdim=True)

        # Regroup text rows as (batch_size, num_choices, embedding_dim).
        text_features = text_features.view(batch_size, self.num_choices, -1)

        # (batch_size, 1, dim) x (batch_size, dim, num_choices)
        #   -> (batch_size, 1, num_choices) -> (batch_size, num_choices)
        logits = torch.bmm(
            image_features.unsqueeze(1),
            text_features.transpose(1, 2)
        ).squeeze(1)

        # Temperature scaling with CLIP's logit scale.
        logit_scale = self.clip_model.logit_scale.exp()
        return logit_scale * logits

In [None]:
def create_lora_config(config):
    """Build the PEFT ``LoraConfig`` from the LoRA fields of ``config``.

    Reads ``lora_r``, ``lora_alpha``, ``lora_target_modules`` and
    ``lora_dropout``; bias terms are left untouched and the task type is
    feature extraction.
    """
    return LoraConfig(
        task_type=TaskType.FEATURE_EXTRACTION,
        target_modules=config.lora_target_modules,
        r=config.lora_r,
        lora_alpha=config.lora_alpha,
        lora_dropout=config.lora_dropout,
        bias="none",
    )

In [None]:
# Load the test-set CSV from the path stored in the run configuration.
test_df = pd.read_csv(config.test_csv_path)

In [None]:
# Load the CLIP processor for the checkpoint named in config.processor_name.
processor = CLIPProcessor.from_pretrained(config.processor_name)

In [None]:
# Wrap the test rows in a dataset; is_train=False means items carry no labels.
test_dataset = VQADataset(test_df, processor, config.image_base_path, is_train=False)

In [None]:
# Test batches are produced in dataset order (no shuffling) and merged by the
# custom collate function.
test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=config.batch_size,
    collate_fn=collate_fn,
    shuffle=False,
    num_workers=2,
)

# Build the LoRA-wrapped CLIP model and move it to the configured device.
lora_config = create_lora_config(config)
model = CLIPVQAModel(
    clip_model_name=config.model_name,
    num_choices=config.num_choices,
    lora_config=lora_config,
).to(config.device)

In [None]:
# Load the best checkpoint when one is enabled and present on disk.
if config.save_best_model and os.path.exists(config.model_save_path):
    # map_location lets a checkpoint saved on a GPU be restored on whatever
    # device this run uses (including CPU-only machines).
    model.load_state_dict(
        torch.load(config.model_save_path, map_location=config.device)
    )
else:
    # Make the fallback visible instead of silently predicting with the
    # model exactly as it was constructed.
    print(
        f"Warning: no checkpoint loaded from {config.model_save_path}; "
        "running inference with the model as constructed."
    )

# Test-set inference.
print("\n" + "=" * 60)
print("Starting Inference...")
print("=" * 60)

model.eval()
predictions = []
ids = []
# Class index -> answer letter; built once rather than on every batch.
label_map = {0: "A", 1: "B", 2: "C", 3: "D"}

with torch.no_grad():
    for batch in tqdm(test_loader, desc="Testing"):
        # collate_fn returns None for an empty batch.
        if batch is None:
            continue

        pixel_values = batch['pixel_values'].to(config.device)
        input_ids = batch['input_ids'].to(config.device)
        attention_mask = batch['attention_mask'].to(config.device)

        logits = model(pixel_values, input_ids, attention_mask)
        # Highest-scoring choice per sample, as plain Python ints.
        pred_indices = logits.argmax(dim=1).tolist()

        predictions.extend(label_map[idx] for idx in pred_indices)
        ids.extend(batch['ids'])

# Write the submission file.
submission_df = pd.DataFrame({'ID': ids, 'answer': predictions})
submission_df.to_csv(config.output_submission_path, index=False)

print(f"✅ Submission saved to {config.output_submission_path}")
