## Import

In [1]:
import os
import pandas as pd

import torch
import torch.optim as optim
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import timm

import torchvision.models as models # 이미지
from torchvision import transforms
from PIL import Image

from transformers import GPT2Tokenizer, GPT2Model # 텍스트

from tqdm.auto import tqdm

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
import timm
model_names = timm.list_models()
print(model_names)

['bat_resnext26ts', 'beit_base_patch16_224', 'beit_base_patch16_384', 'beit_large_patch16_224', 'beit_large_patch16_384', 'beit_large_patch16_512', 'beitv2_base_patch16_224', 'beitv2_large_patch16_224', 'botnet26t_256', 'botnet50ts_256', 'caformer_b36', 'caformer_m36', 'caformer_s18', 'caformer_s36', 'cait_m36_384', 'cait_m48_448', 'cait_s24_224', 'cait_s24_384', 'cait_s36_384', 'cait_xs24_384', 'cait_xxs24_224', 'cait_xxs24_384', 'cait_xxs36_224', 'cait_xxs36_384', 'coat_lite_medium', 'coat_lite_medium_384', 'coat_lite_mini', 'coat_lite_small', 'coat_lite_tiny', 'coat_mini', 'coat_small', 'coat_tiny', 'coatnet_0_224', 'coatnet_0_rw_224', 'coatnet_1_224', 'coatnet_1_rw_224', 'coatnet_2_224', 'coatnet_2_rw_224', 'coatnet_3_224', 'coatnet_3_rw_224', 'coatnet_4_224', 'coatnet_5_224', 'coatnet_bn_0_rw_224', 'coatnet_nano_cc_224', 'coatnet_nano_rw_224', 'coatnet_pico_rw_224', 'coatnet_rmlp_0_rw_224', 'coatnet_rmlp_1_rw2_224', 'coatnet_rmlp_1_rw_224', 'coatnet_rmlp_2_rw_224', 'coatnet_rmlp_2

## Dataset

In [3]:
class VQADataset(Dataset):
    def __init__(self, df, tokenizer, transform, img_path, is_test=False):
        self.df = df
        self.tokenizer = tokenizer
        self.transform = transform
        self.img_path = img_path
        self.is_test = is_test

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]

        img_name = os.path.join(self.img_path, row['image_id'] + '.jpg') # 이미지
        image = Image.open(img_name).convert('RGB')
        image = self.transform(image)

        question = row['question'] # 질문
        question = self.tokenizer.encode_plus(
            question,
            truncation=True,
            add_special_tokens=True,
            max_length=32,
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt',
        )

        if not self.is_test:
            answer = row['answer'] # 답변
            answer = self.tokenizer.encode_plus(
                answer,
                max_length=32,
                padding='max_length',
                truncation=True,
                return_tensors='pt')
            return {
                'image': image.squeeze(),
                'question': question['input_ids'].squeeze(),
                'answer': answer['input_ids'].squeeze()
            }
        else:
            return {
                'image': image,
                'question': question['input_ids'].squeeze(),
            }

## Model

In [4]:
class VQAGenerator(nn.Module):
    def __init__(self, vocab_size):
        super(VQAGenerator, self).__init__()
        self.vocab_size = vocab_size

        self.convnextv2_large = timm.create_model('convnextv2_large', pretrained=True)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))  # 추가된 부분
        self.gpt2 = GPT2Model.from_pretrained('gpt2')
        self.gpt2.resize_token_embeddings(vocab_size)

        self.embed = nn.Embedding(vocab_size, self.gpt2.config.hidden_size)
        self.position_embed = nn.Embedding(32, self.gpt2.config.hidden_size)  # 가정: 최대 문장 길이가 32

        combined_features_size = self.convnextv2_large.num_features + self.gpt2.config.hidden_size
        self.lm_head = nn.Linear(combined_features_size, vocab_size, bias=False)

    def forward(self, images, inputs):
        image_features = self.convnextv2_large.forward_features(images)
        image_features = self.avgpool(image_features)  # 추가된 부분
        image_features = image_features.view(image_features.size(0), -1)

        position_ids = torch.arange(inputs.size(1), dtype=torch.long, device=inputs.device)
        position_ids = position_ids.unsqueeze(0).expand(inputs.size())

        inputs_embeds = self.embed(inputs)
        position_embeds = self.position_embed(position_ids)

        hidden_states = inputs_embeds + position_embeds

        outputs = self.gpt2(inputs_embeds=hidden_states, return_dict=True)
        output_features = outputs.last_hidden_state

        image_features = image_features.unsqueeze(1).expand(-1, output_features.size(1), -1)

        combined = torch.cat([image_features, output_features], dim=-1)
        logits = self.lm_head(combined)
        return logits


## DataLoader

In [5]:
# 데이터 불러오기
train_df = pd.read_csv(r'C:\multi\open (1)\train.csv')
test_df = pd.read_csv(r'C:\multi\open (1)\test.csv')
sample_submission = pd.read_csv(r'open (1)\sample_submission.csv')
train_img_path = r'C:\multi\open (1)\image\train'
test_img_path = r'C:\multi\open (1)\image\test'

#train_df= train_df[:128]

# dataset & dataloader
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
tokenizer.add_special_tokens({'pad_token': '[PAD]'})
vocab_size = len(tokenizer)

transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

train_dataset = VQADataset(train_df, tokenizer, transform, train_img_path, is_test=False)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)

## Train & Inference

In [6]:
def train(model, loader, optimizer, criterion):
    model.train()
    total_loss = 0

    for data in tqdm(loader, total=len(loader)):
        images = data['image'].to(device)
        inputs = data['question'].to(device)
        targets = data['answer'].to(device)

        optimizer.zero_grad()

        outputs = model(images, inputs)

        loss = criterion(outputs.view(-1, outputs.size(-1)), targets.view(-1))
        total_loss += loss.item()

        loss.backward()
        optimizer.step()

    avg_loss = total_loss / len(loader)
    return avg_loss

In [7]:
def inference(model, loader):
    model.eval()
    preds = []
    with torch.no_grad():
        for data in tqdm(loader, total=len(loader)):
            images = data['image'].to(device)
            question = data['question'].to(device)

            outputs = model(images, question) # [batch, sequence, vocab]

            _, pred = torch.max(outputs, dim=2) # values, indices = _, pred
            preds.extend(pred.cpu().numpy())

    return preds

In [8]:
torch.cuda.is_available()

True

## Run!

In [9]:
#
torch.cuda.empty_cache()

In [10]:
# device
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"current device is {device}")

# Model
model = VQAGenerator(vocab_size).to(device)

# Criterion and Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=5e-5)

# Initial loss
best_loss = float('inf')

# Training loop
for epoch in range(100):
    avg_loss = train(model, train_loader, optimizer, criterion)
    print(f"Epoch: {epoch+1}, Loss: {avg_loss:.4f}")

    # Save best model
    if avg_loss < best_loss:
        best_loss = avg_loss
        torch.save(model.state_dict(), 'best_model.pt')


current device is cuda


 17%|█▋        | 3898/22471 [21:13<1:41:35,  3.05it/s]

## Post-Processing

In [12]:
# Dataset & DataLoader
test_dataset = VQADataset(test_df, tokenizer, transform, test_img_path, is_test=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# inference
preds = inference(model, test_loader)

no_pad_output = []
for pred in preds:
    output = pred[pred != 50257] # [PAD] token 제외
    no_pad_output.append(tokenizer.decode(output).strip()) # 토큰 id -> 토큰

100%|██████████| 633/633 [02:49<00:00,  3.74it/s]


## Submission

In [13]:
sample_submission['answer'] = no_pad_output
sample_submission.to_csv('submission.csv', index=False)

In [21]:
solution = pd.read_csv('solution.csv')

FileNotFoundError: [Errno 2] No such file or directory: 'solution.csv'