<a href="https://colab.research.google.com/github/Gayeon6423/TAVE-Image_Captioning/blob/main/%5BBaseline%5D_MobilNet30_%2B_GRU.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Import

In [1]:
from psutil import virtual_memory
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)
ram_gb = virtual_memory().total / 1e9
print('Your runtime has {:.1f} gigabytes of available RAM\n'.format(ram_gb))
if ram_gb < 20:
  print('Not using a high-RAM runtime')
else:
  print('You are using a high-RAM runtime!')

Sat Jan 13 05:06:04 2024       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 535.104.05             Driver Version: 535.104.05   CUDA Version: 12.2     |
|-----------------------------------------+----------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |         Memory-Usage | GPU-Util  Compute M. |
|                                         |                      |               MIG M. |
|   0  Tesla T4                       Off | 00000000:00:04.0 Off |                    0 |
| N/A   35C    P8               9W /  70W |      0MiB / 15360MiB |      0%      Default |
|                                         |                      |                  N/A |
+-----------------------------------------+----------------------+----------------------+
                                                                    

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
import torch
import torch.nn as nn
import pandas as pd
import torchvision.transforms as transforms
import torchvision.models as models
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from tqdm import tqdm
import os
import numpy as np
import random
import warnings
warnings.filterwarnings(action='ignore')

## Hyperparameter Settings

In [4]:
CFG = {
    'IMG_SIZE':224,
    'EPOCHS':10,
    'LR':0.01,
    'BATCH_SIZE':64,
    'SEED':41
    }

## Fixed Random-Seed

In [5]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(CFG['SEED']) # Seed 고정

## Load Data

In [25]:
# Real Data
train_data = pd.read_csv('/content/drive/MyDrive/Tave_Project_MultiModal/Data/train.csv')[:70]
test_data = pd.read_csv('/content/drive/MyDrive/Tave_Project_MultiModal/Data/test.csv')[:30]

print('raw train data shape : ', train_data.shape)
print('raw test data shape : ', test_data.shape)

raw train data shape :  (70, 4)
raw test data shape :  (30, 2)


In [26]:
# def load_jpg_image(file_path):
#     image = Image.open(file_path)
#     width, height = image.size
#     print(f'이미지 너비:{width}, 이미지 높이:{height}')
#     return image

# jpg_path = '/content/drive/MyDrive/Tave_Project_MultiModal/Data/train/0a0nbjgefy.jpg'
# load_image = load_jpg_image(jpg_path)
# load_image

## Preprocess Dataset

- Custom Dataset

In [27]:
# CustomDataset 클래스 정의
class CustomDataset(Dataset):
    # 생성자 메서드: 데이터프레임과 변환 함수를 초기화
    def __init__(self, dataframe, transform=None):
        self.dataframe = dataframe  # 전달받은 데이터프레임 저장
        self.transform = transform  # 전달받은 이미지 변환 함수 저장

    # 데이터셋의 총 길이 반환 메서드
    def __len__(self):
        return len(self.dataframe)  # 데이터프레임의 길이 반환

    # 주어진 인덱스에 해당하는 데이터 반환 메서드
    def __getitem__(self, idx):
        # 인덱스에 해당하는 이미지 파일 경로 가져오기
        img_path = self.dataframe.iloc[idx]['img_path']
        # 이미지 열기 및 RGB로 변환
        img = Image.open(img_path).convert('RGB')
        # 이미지에 변환 함수가 지정되어 있으면 적용
        if self.transform:
            img = self.transform(img)
        # 'mos' 컬럼이 데이터프레임에 존재하면 해당 값을 가져오고, 아니면 0.0으로 설정
        mos = float(self.dataframe.iloc[idx]['mos']) if 'mos' in self.dataframe.columns else 0.0
        # 'comments' 컬럼이 데이터프레임에 존재하면 해당 값을 가져오고, 아니면 빈 문자열로 설정
        comment = self.dataframe.iloc[idx]['comments'] if 'comments' in self.dataframe.columns else ""
        # 이미지, mos 값, comment 반환
        return img, mos, comment

## Define Model

- Base Model



```
class BaseModel(nn.Module):
    # 생성자 메서드: 어휘 크기, 임베딩 차원, 히든 차원 초기화
    def __init__(self, vocab_size, embed_dim=256, hidden_dim=512):
        super(BaseModel, self).__init__()

        # ResNet50을 이용한 이미지 특성 추출
        self.cnn_backbone = models.resnet50(pretrained=True)
        # 마지막 완전 연결 레이어를 제거하여 특성을 얻음
        modules = list(self.cnn_backbone.children())[:-1]
        self.cnn = nn.Sequential(*modules)
        # Image quality assessment head
        self.regression_head = nn.Linear(2048, 1)  # ResNet50 last layer has 2048 features
        # Captioning head
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        # 이미지 특성과 캡션 임베딩을 입력으로 받음
        self.lstm = nn.LSTM(embed_dim + 2048, hidden_dim)
        self.fc = nn.Linear(hidden_dim, vocab_size)

    # 순전파 메서드 정의
    def forward(self, images, captions=None):
        # CNN
        features = self.cnn(images)
        # 주어진 특성을 2D 텐서 -> 1D 텐서(하나의 벡터)로 변환
        features_flat = features.view(features.size(0), -1)
        # Image quality regression
        mos = self.regression_head(features_flat)

        # LSTM captioning
        if captions is not None:
            embeddings = self.embedding(captions)
            # 이미지 특성과 캡션 임베딩을 각 단어에 대해 연결
            combined = torch.cat([features_flat.unsqueeze(1).repeat(1, embeddings.size(1), 1), embeddings], dim=2)
            # LSTM
            lstm_out, _ = self.lstm(combined)
            # OUTPUT
            outputs = self.fc(lstm_out)
            return mos, outputs
        else:
            return mos, None
```



- Alpha Modeling

In [28]:
class BaseModel(nn.Module):
    # 생성자 메서드: 어휘 크기, 임베딩 차원, 히든 차원 초기화
    def __init__(self, vocab_size, embed_dim=256, hidden_dim=512):
        super(BaseModel, self).__init__()

        # 이미지 특성 추출
        # self.cnn_backbone = models.resnet50(pretrained=True) # ResNet50
        self.cnn_backbone = models.mobilenet_v3_large(pretrained=True) # Mobilenet_v3
        # 마지막 완전 연결 레이어를 제거하여 특성을 얻음
        modules = list(self.cnn_backbone.children())[:-1]
        self.cnn = nn.Sequential(*modules)

        # Image quality assessment head
        # self.regression_head = nn.Linear(2048, 1) # ResNet50:2048 features
        self.regression_head = nn.Linear(960, 1) # Moblienet: 960 features
        # Captioning head
        self.embedding = nn.Embedding(vocab_size, embed_dim)

        # 이미지 특성과 캡션 임베딩을 입력으로 받음
        # self.lstm = nn.LSTM(embed_dim + 2048, hidden_dim) # ResNet50 + LSTM
        # self.gru = nn.GRU(embed_dim + 2048, hidden_dim) # ResNet50 + GRU
        # self.lstm = nn.LSTM(embed_dim + 960, hidden_dim) # Mobilenet_v3 + LSTM
        self.gru = nn.GRU(embed_dim + 960, hidden_dim) # Mobilenet_v3 + GRU
        self.fc = nn.Linear(hidden_dim, vocab_size)

    # 순전파 메서드 정의
    def forward(self, images, captions=None):
        # CNN
        features = self.cnn(images)
        # 주어진 특성을 2D 텐서 -> 1D 텐서(하나의 벡터)로 변환
        features_flat = features.view(features.size(0), -1)
        # Image quality regression
        mos = self.regression_head(features_flat)

        # Image captioning
        if captions is not None:
            embeddings = self.embedding(captions)
            # 이미지 특성과 캡션 임베딩을 각 단어에 대해 연결
            combined = torch.cat([features_flat.unsqueeze(1).repeat(1, embeddings.size(1), 1), embeddings], dim=2)

            # Caption Layer
            # lstm_out, _ = self.lstm(combined) # Caption : LSTM
            gru_out, _ = self.gru(combined) # Caption : GRU
            # OUTPUT
            # outputs = self.fc(lstm_out) # Caption : LSTM
            outputs = self.fc(gru_out) # Caption : GRU
            return mos, outputs
        else:
            return mos, None

In [29]:
# 단어 사전 생성
all_comments = ' '.join(train_data['comments']).split()
vocab = set(all_comments) # 중복 제거
vocab = ['<PAD>', '<SOS>', '<EOS>'] + list(vocab)
word2idx = {word: idx for idx, word in enumerate(vocab)}
idx2word = {idx: word for word, idx in word2idx.items()}

# 데이터셋 및 DataLoader 생성
transform = transforms.Compose([
    transforms.Resize((CFG['IMG_SIZE'], CFG['IMG_SIZE'])),
    transforms.ToTensor()])

train_dataset = CustomDataset(train_data, transform)
train_loader = DataLoader(train_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=True)

# 모델, 손실함수, 옵티마이저
model = BaseModel(len(vocab)).cuda()
# 화질 점수 평가 산식
criterion1 = nn.MSELoss()
# 캡셔닝 평가 산식
criterion2 = nn.CrossEntropyLoss(ignore_index=word2idx['<PAD>'])
optimizer = torch.optim.Adam(model.parameters(), lr=CFG['LR'])

In [30]:
# 학습
model.train() # 모델을 학습 모드로 설정

# 에폭 반복(모델 10번 학습)
for epoch in range(CFG['EPOCHS']):
    total_loss = 0 # 에폭별 총 손실 초기화
    loop = tqdm(train_loader, leave=True) # 학습 진행바 확인

    # 미니배치 반복(전체 데이터 64개로 나누어서 학습)
    for imgs, mos, comments in loop:
        # 이미지와 MOS값을 GPU로 이동(훈련 및 추론이 GPU에서 진행)
        imgs, mos = imgs.float().cuda(), mos.float().cuda()

        # Batch Preprocessing
        comments_tensor = torch.zeros((len(comments), len(max(comments, key=len)))).long().cuda()
        for i, comment in enumerate(comments):
            # 시작 토큰과 끝 토큰 추가하여 토큰화
            tokenized = ['<SOS>'] + comment.split() + ['<EOS>']
            comments_tensor[i, :len(tokenized)] = torch.tensor([word2idx[word] for word in tokenized])

        # Forward & Loss
        predicted_mos, predicted_comments = model(imgs, comments_tensor)
        # 이미지 품질 손실
        loss1 = criterion1(predicted_mos.squeeze(1), mos)
        # 캡션 생성 손실
        loss2 = criterion2(predicted_comments.view(-1, len(vocab)), comments_tensor.view(-1))
        # 전체 손실
        loss = loss1 + loss2

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # 총 손실 누적
        total_loss += loss.item()
        loop.set_description(f"Epoch {epoch + 1}")
        loop.set_postfix(loss=loss.item()) # 현재 미니배치의 손실 출력

    # 에폭 종료 후 평균 손실 출력
    print(f"Epoch {epoch + 1} finished with average loss: {total_loss / len(train_loader):.4f}")

Epoch 1: 100%|██████████| 2/2 [00:01<00:00,  1.62it/s, loss=12.4]


Epoch 1 finished with average loss: 26.2382


Epoch 2: 100%|██████████| 2/2 [00:01<00:00,  1.67it/s, loss=5.08]


Epoch 2 finished with average loss: 14.6108


Epoch 3: 100%|██████████| 2/2 [00:01<00:00,  1.89it/s, loss=15.6]


Epoch 3 finished with average loss: 14.9701


Epoch 4: 100%|██████████| 2/2 [00:01<00:00,  1.93it/s, loss=4.52]


Epoch 4 finished with average loss: 8.8979


Epoch 5: 100%|██████████| 2/2 [00:01<00:00,  1.93it/s, loss=10.3]


Epoch 5 finished with average loss: 8.2552


Epoch 6: 100%|██████████| 2/2 [00:01<00:00,  1.91it/s, loss=6.07]


Epoch 6 finished with average loss: 5.9714


Epoch 7: 100%|██████████| 2/2 [00:01<00:00,  1.86it/s, loss=3.52]


Epoch 7 finished with average loss: 4.0235


Epoch 8: 100%|██████████| 2/2 [00:01<00:00,  1.91it/s, loss=2.68]


Epoch 8 finished with average loss: 3.2117


Epoch 9: 100%|██████████| 2/2 [00:01<00:00,  1.74it/s, loss=2.35]


Epoch 9 finished with average loss: 2.7107


Epoch 10: 100%|██████████| 2/2 [00:01<00:00,  1.75it/s, loss=1.95]

Epoch 10 finished with average loss: 2.1017





## Inference & Submit

In [31]:
# 버전 명 입력
version_name = 'Mobilnet_v3+GRU_700_300'

In [32]:
# 테스트 데이터셋 및 변환 함수를 사용하여 CustomDataset 생성
test_dataset = CustomDataset(test_data, transform)
# DataLoader를 사용하여 테스트 데이터를 미니배치로 로드
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
# 모델을 평가 모드로 설정
model.eval()
# 예측된 MOS 점수 및 캡션을 저장할 리스트 초기화
predicted_mos_list = []
predicted_comments_list = []

In [33]:
# greedy_decode 함수 정의
def greedy_decode(model, image, max_length=50):
    image = image.unsqueeze(0).cuda()
    # 모델을 통해 이미지 전달하여 MOS 및 은닉 상태 계산
    mos, _ = model(image)
    output_sentence = []

    # 시작 토큰 설정
    current_token = torch.tensor([word2idx['<SOS>']]).cuda()
    hidden = None
    features = model.cnn(image).view(image.size(0), -1)

    for _ in range(max_length):
        # 현재 토큰을 임베딩하고 모델에 전달하여 다음 토큰 예측
        embeddings = model.embedding(current_token).unsqueeze(0)
        combined = torch.cat([features.unsqueeze(1), embeddings], dim=2)
        # out, hidden = model.lstm(combined, hidden) # Captioning model : LSTM
        out, hidden = model.gru(combined, hidden) # Captioning model : GRU
        output = model.fc(out.squeeze(0))
        _, current_token = torch.max(output, dim=1)

        # <EOS> 토큰에 도달하면 반복 중지
        if current_token.item() == word2idx['<EOS>']:
            break
        # <SOS> 또는 <PAD> 토큰은 생성한 캡션에 추가하지 않음
        if current_token.item() not in [word2idx['<SOS>'], word2idx['<PAD>']]:
            output_sentence.append(idx2word[current_token.item()])

    return mos.item(), ' '.join(output_sentence)

In [40]:
# 추론 과정
with torch.no_grad():
    for imgs, _, _ in tqdm(test_loader):
        for img in imgs:
            img = img.float().cuda()
            # 각 이미지에 대해 greedy decode 함수를 사용하여 MOS 및 캡션 예측
            mos, caption = greedy_decode(model, img)
            # 예측 결과를 리스트에 추가
            predicted_mos_list.append(mos)
            predicted_comments_list.append(caption)

100%|██████████| 1/1 [00:01<00:00,  1.23s/it]


In [41]:
predicted_comments_list

['too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too',
 'too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too',
 'too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too',
 'too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too',
 'too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too too to

In [24]:
# 결과 저장
result_df = pd.DataFrame({
    'img_name': test_data['img_name'],
    'mos': predicted_mos_list,
    'comments': predicted_comments_list  # 캡션 부분은 위에서 생성한 것을 사용
})

# 예측 결과에 NaN이 있다면, 제출 시 오류가 발생하므로 후처리 진행 (sample_submission.csv과 동일하게)
result_df['comments'] = result_df['comments'].fillna('Nice Image.')
result_df.to_csv(f'/content/drive/MyDrive/Tave_Project_MultiModal/Data/submit/submit_{version_name}.csv', index=False)

print("Inference completed and results saved to submit.csv.")

Inference completed and results saved to submit.csv.
