# Model
- ResNet50 + GPT2 이용
- Google Colab 사용 (GPU 환경)

### 드라이브 마운트

In [None]:
# Mount Google Drive at /content/drive so later cells can use paths under it.
from google.colab import drive
drive.mount('/content/drive')

### import

In [None]:
# Colab runtimes are ephemeral, so the package must be installed again in every new session.
!pip install transformers

In [None]:
import os
import glob
import pandas as pd
import shutil

import torch
import torch.optim as optim
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

import torchvision.models as models # 이미지
from torchvision import transforms
from PIL import Image

from transformers import GPT2Tokenizer, GPT2Model # 텍스트

from tqdm.auto import tqdm

### 데이터 로드 
방법 1. 로컬에서 추출한 이미지 폴더를 구글 드라이브에 업로드한 후 압축 풀기
 - 대용량 train 원본 이미지 데이터의 용량을 줄일 수 있음  
  
방법 2. 전체 train 이미지 폴더에서 바로 추출

In [None]:
## Method 1

# Change into the folder where the archives will be extracted
%cd /content/drive/MyDrive/Colab Notebooks/zerobase/DL_project/data/image

# Extract the train and test image archives into the current folder (-qq: quiet output)
!unzip -qq "/content/drive/MyDrive/Colab Notebooks/data/zip file/train_10000.zip"
!unzip -qq "/content/drive/MyDrive/Colab Notebooks/data/zip file/test_2000.zip"

In [None]:
## Method 2
# Build the train/test subsets directly from the full train image folder.
text_train = pd.read_csv('./data/train.csv')

# Rows whose answer is 'yes' or 'no'; computed once and reused for every
# selection below instead of re-filtering the table each time.
yes_no_mask = text_train.answer.isin(['yes', 'no'])

# Image ids ordered by how many yes/no rows reference them (most frequent first).
image_counts = text_train[yes_no_mask].image_id.value_counts()
train_path_10000 = image_counts[:10000].index.to_list()
test_path_2000 = image_counts[10000:12000].index.to_list()


# Copy the selected images out of the full train folder into one folder per subset.
src = '/content/drive/MyDrive/Colab Notebooks/data/image/train/'
subset_folders = (
    ('/content/drive/MyDrive/Colab Notebooks/data/image/train_10000/', train_path_10000),
    ('/content/drive/MyDrive/Colab Notebooks/data/image/test_2000/', test_path_2000),
)

for dst, image_ids in subset_folders:
    # exist_ok=True makes re-running the cell safe when the folder already exists.
    os.makedirs(dst, exist_ok=True)

    for image in tqdm(image_ids):
        file_name = image + '.jpg'
        shutil.copyfile(os.path.join(src, file_name), os.path.join(dst, file_name))


# Write the matching yes/no rows of each subset to its own csv file.
train_10000_text = text_train[yes_no_mask & text_train.image_id.isin(train_path_10000)].reset_index(drop=True)
train_10000_text.to_csv('./data/train_10000.csv', index=False)

test_2000_text = text_train[yes_no_mask & text_train.image_id.isin(test_path_2000)].reset_index(drop=True)
test_2000_text.to_csv('./data/test_2000.csv', index=False)

In [None]:
# Count the .jpg files found in the train and test image folders.
path_tr = '/content/drive/MyDrive/Colab Notebooks/data/image/train_10000'
path_te = '/content/drive/MyDrive/Colab Notebooks/data/image/test_2000'

# One glob per folder, unpacked in (train, test) order.
images_tr, images_te = (
    glob.glob(f'{folder}/*.jpg') for folder in (path_tr, path_te)
)

len(images_tr), len(images_te)

### 데이터셋 생성 클래스

In [None]:
class VQADataset(Dataset):
    """Dataset that pairs each table row with its image and tokenized text.

    Args:
        df: Table indexed positionally via ``.iloc``; each row provides
            ``image_id`` and ``question``, plus ``answer`` when ``is_test``
            is False.
        tokenizer: Object exposing ``encode_plus``; used for both the
            question and the answer.
        transform: Callable applied to the RGB image after it is opened.
        img_path: Directory holding the ``<image_id>.jpg`` files.
        is_test: When True, samples are returned without an ``answer`` entry.
    """

    def __init__(self, df, tokenizer, transform, img_path, is_test=False):
        self.df, self.tokenizer, self.transform = df, tokenizer, transform
        self.img_path, self.is_test = img_path, is_test

    def __len__(self):
        """Return the number of rows in the underlying table."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return the sample at position ``idx`` as a dict.

        The dict holds ``image`` and ``question`` (token ids), and also
        ``answer`` (token ids) when the dataset is not in test mode. Text is
        padded/truncated to 32 tokens.
        """
        record = self.df.iloc[idx]

        # Image: open <img_path>/<image_id>.jpg, force RGB, apply the transform.
        file_path = os.path.join(self.img_path, record['image_id'] + '.jpg')
        pixels = self.transform(Image.open(file_path).convert('RGB'))

        # Question: fixed-length token ids.
        encoded_question = self.tokenizer.encode_plus(
            record['question'],
            truncation=True,
            add_special_tokens=True,
            max_length=32,
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt',
        )
        # squeeze() removes the size-1 axes (the batch axis added by return_tensors='pt').
        question_ids = encoded_question['input_ids'].squeeze()

        if self.is_test:
            return {'image': pixels, 'question': question_ids}

        # Answer: tokenized to the same fixed length as the question.
        encoded_answer = self.tokenizer.encode_plus(
            record['answer'],
            max_length=32,
            padding='max_length',
            truncation=True,
            return_tensors='pt',
        )
        return {
            'image': pixels.squeeze(),
            'question': question_ids,
            'answer': encoded_answer['input_ids'].squeeze(),
        }

### 모델 클래스

In [None]:
class VQAModel(nn.Module):
    """ResNet-50 + GPT-2 model that scores vocabulary tokens per question position.

    The image is encoded once by ResNet-50, the question by GPT-2; the image
    vector is repeated along the sequence axis, concatenated with the GPT-2
    hidden states, and projected to vocabulary logits by a linear layer.

    Args:
        vocab_size: Size of the tokenizer vocabulary (including any added
            special tokens); used to resize the GPT-2 embeddings and as the
            classifier output width.
    """

    def __init__(self, vocab_size):
        super().__init__()
        self.vocab_size = vocab_size

        # `pretrained=True` is deprecated in torchvision >= 0.13 in favour of
        # the `weights` argument; IMAGENET1K_V1 is the checkpoint that
        # `pretrained=True` used to load. Older torchvision releases without
        # the weights enum keep using the legacy flag.
        resnet_weights = getattr(models, 'ResNet50_Weights', None)
        if resnet_weights is not None:
            self.resnet = models.resnet50(weights=resnet_weights.IMAGENET1K_V1)
        else:
            self.resnet = models.resnet50(pretrained=True)

        self.gpt2 = GPT2Model.from_pretrained('gpt2')
        self.gpt2.resize_token_embeddings(vocab_size)  # account for the added [PAD] token

        # Read the image feature width from the ResNet head instead of
        # hard-coding 1000, then add the GPT-2 hidden size.
        image_feature_size = self.resnet.fc.out_features
        combined_features_size = image_feature_size + self.gpt2.config.hidden_size
        self.classifier = nn.Linear(combined_features_size, vocab_size)

    def forward(self, images, question):
        """Return vocabulary logits of shape [batch, sequence, vocab_size].

        Args:
            images: Image batch fed to the ResNet.
            question: Token-id batch fed to GPT-2, [batch, sequence].
        """
        image_features = self.resnet(images)
        image_features = image_features.view(image_features.size(0), -1)  # [batch, image_features]

        outputs = self.gpt2(question)
        output_features = outputs.last_hidden_state  # [batch, sequence, hidden]

        # Repeat the single image vector at every sequence position.
        image_features = image_features.unsqueeze(1).expand(-1, output_features.size(1), -1)  # [batch, sequence, image_features]

        combined = torch.cat([image_features, output_features], dim=-1)  # [batch, sequence, image_features + hidden]
        output = self.classifier(combined)  # [batch, sequence, vocab_size]
        return output

### 데이터 로더

In [None]:
# Load the train/test question tables
train_df = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/zerobase/DL_project/data/train_10000.csv')
test_df = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/zerobase/DL_project/data/test_2000.csv')

# Folders holding the images referenced by each table
train_img_path = '/content/drive/MyDrive/Colab Notebooks/zerobase/DL_project/data/image/train_10000'
test_img_path = '/content/drive/MyDrive/Colab Notebooks/zerobase/DL_project/data/image/test_2000'

# Tokenizer: a [PAD] token is registered as the pad token,
# so the vocabulary size is measured after adding it.
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
tokenizer.add_special_tokens(dict(pad_token='[PAD]'))
vocab_size = len(tokenizer)

# Image preprocessing. Normalizing keeps the inputs in a similar value range,
# which helps gradient computation during backpropagation.
preprocessing_steps = [
    transforms.Resize((224, 224)),
    # Convert the image to a tensor with pixel values scaled to 0~1
    transforms.ToTensor(),
    # Per channel [R, G, B]: subtract the mean, then divide by the std
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
transform = transforms.Compose(preprocessing_steps)

# Training dataset and its shuffled loader
train_dataset = VQADataset(
    df=train_df,
    tokenizer=tokenizer,
    transform=transform,
    img_path=train_img_path,
    is_test=False,
)
train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)

### 학습과 추론

In [None]:
def train(model, loader, optimizer, criterion):
    """Train ``model`` for one pass over ``loader``.

    Relies on the notebook-level ``device`` and ``tokenizer`` objects.

    Args:
        model: Called as ``model(images, question)``; its output is treated
            as logits of shape [batch, sequence, vocab].
        loader: Iterable of batches (dicts with ``image``, ``question`` and
            ``answer`` tensors) that also supports ``len()``.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss called with flattened logits and flattened answers.

    Returns:
        tuple: ``(avg_loss, accuracy)`` as Python floats. ``avg_loss`` is the
        mean batch loss; ``accuracy`` is the fraction of non-padding answer
        tokens predicted correctly. Each is 0.0 when there was nothing to
        average (empty loader / no non-padding tokens).
    """
    model.train()
    total_loss = 0.0
    # Plain Python counters: keeps the running metrics off the device and
    # makes the returned accuracy an ordinary float.
    correct_predictions = 0
    total_predictions = 0

    p_itr = 1000  # print the running accuracy every p_itr iterations

    for itr, data in enumerate(tqdm(loader, total=len(loader)), start=1):
        images = data['image'].to(device)
        question = data['question'].to(device)
        answer = data['answer'].to(device)

        optimizer.zero_grad()

        outputs = model(images, question)

        # outputs: [batch, sequence, vocab], answer: [batch, sequence]
        # -> flatten both so every sequence position is one classification.
        loss = criterion(outputs.view(-1, outputs.size(-1)), answer.view(-1))
        total_loss += loss.item()

        # Accuracy: torch.max(input, dim) returns (max values, max indices);
        # the indices along the vocab axis are the predicted token ids.
        _, preds = torch.max(outputs, dim=2)
        mask = (answer != tokenizer.pad_token_id)  # ignore padding tokens

        correct_predictions += torch.sum(preds[mask] == answer[mask]).item()
        total_predictions += mask.sum().item()

        loss.backward()
        optimizer.step()

        if itr % p_itr == 0:
            running_accuracy = correct_predictions / total_predictions if total_predictions else 0.0
            print('Iteration {} -> Accuracy: {:.4f}'.format(itr, running_accuracy))

    num_batches = len(loader)
    avg_loss = total_loss / num_batches if num_batches else 0.0
    accuracy = correct_predictions / total_predictions if total_predictions else 0.0

    return avg_loss, accuracy

In [None]:
def inference(model, loader):
    """Predict token ids for every sample produced by ``loader``.

    Relies on the notebook-level ``device`` object. The model is switched to
    eval mode and run without gradient tracking.

    Args:
        model: Called as ``model(images, question)``; its output is treated
            as logits of shape [batch, sequence, vocab].
        loader: Iterable of batches (dicts with ``image`` and ``question``
            tensors) that also supports ``len()``.

    Returns:
        list: One NumPy array of predicted token ids per sample.
    """
    model.eval()
    collected = []
    with torch.no_grad():
        for batch in tqdm(loader, total=len(loader)):
            logits = model(
                batch['image'].to(device),
                batch['question'].to(device),
            )  # [batch, sequence, vocab]

            # Index of the highest-scoring vocabulary entry at each position.
            token_ids = torch.max(logits, dim=2).indices
            collected.extend(token_ids.cpu().numpy())

    return collected

### 전체 코드 실행(학습)
- lr : learning rate (1e-3, 1e-4, 1e-5) 변경
- batch_size : batch size (8, 16, 32, 64) 변경 → 데이터 로더에서 수행

In [None]:
# Device: use the GPU when one is available, otherwise the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"current device is {device}")

# Model, moved onto the selected device
model = VQAModel(vocab_size)
model = model.to(device)

# Loss: nn.CrossEntropyLoss already applies log-softmax internally,
# so it must be given the raw model outputs rather than log-probabilities.
criterion = nn.CrossEntropyLoss()

# Optimizer
optimizer = optim.AdamW(params=model.parameters(), lr=1e-4)

# Training loop: one call to train() per epoch, followed by a summary line
num_epochs = 10
for epoch in range(num_epochs):
    avg_loss, accuracy = train(model, train_loader, optimizer, criterion)
    print(f"Epoch: {epoch+1}, Loss: {avg_loss:.4f}, Accuracy: {accuracy:.4f}")
    print('----------------------------------------------------------------')

### test 데이터 answer 생성 후 비교

In [None]:
# Dataset & DataLoader for the test split (no answers, original row order kept)
test_dataset = VQADataset(test_df, tokenizer, transform, test_img_path, is_test=True)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)

# Inference: one array of predicted token ids per test row
preds = inference(model, test_loader)

# Drop [PAD] predictions before decoding token ids back to text. The pad id is
# read from the tokenizer instead of hard-coding 50257, so it stays correct if
# the tokenizer setup changes.
pad_token_id = tokenizer.pad_token_id
no_pad_output = [
    tokenizer.decode(pred[pred != pad_token_id]).strip()
    for pred in preds
]

# Store the decoded predictions next to the test rows and save them
test_df['predict_answer'] = no_pad_output
test_df.to_csv('test_result.csv', index=False)

In [None]:
# Reload the saved predictions; the bare name on the last line displays the table as the cell output.
solution = pd.read_csv('test_result.csv')
solution