<a href="https://colab.research.google.com/github/statrav/image_captioning/blob/main/image_captioning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 이미지 캡셔닝 (Python 프로젝트 실습)

Python을 활용한 이미지 캡셔닝 프로젝트를 통해 파일 다운로드부터 파이썬 프로젝트 실행까지의 모든 단계를 다루겠습니다.

## 개요
이미지 캡셔닝은 주어진 이미지를 설명하는 문장을 생성하는 모델입니다. 예를 들어, 아래의 이미지를 모델에 입력하면 “A black dog sitting among leaves in a forest, surrounded by trees.(검은 개가 숲 속 나무들 사이에서 나뭇잎에 둘러싸여 앉아 있는 모습.)”이라는 캡션을 생성합니다.
<img src = "https://drive.google.com/uc?id=1PIFa73QF1LNQGp_6rjHWR5SAkA4CttVI" height = 200 width = 500>


In [None]:
!ls /content/jpg/dog.jpg

/content/jpg/dog.jpg



## 1. 경로 및 환경 설정

### 1.1 경로 설정
프로젝트 수행에 앞서, 경로와 환경 설정이 중요합니다. 먼저 `captioning`이라는 폴더를 생성하고, `cd` 명령어를 통해 해당 폴더로 이동합니다.

📌 **참고사항**
- `mkdir`: 폴더 생성
- `pwd`: 현재 작업 경로 확인
- `cd`: 작업 경로 이동

In [None]:
!mkdir captioning
!cd captioning

### 1.2 환경 설정
프로젝트를 위해 필요한 라이브러리를 설치합니다.

해당 프로젝트에서는 pytorch 프레임워크와 transformer 모델, 그리고 시각화를 위한 matplotlib 라이브러리를 설치합니다.

In [None]:
pip install torch torchvision transformers matplotlib jupyternotebook



## 2. 데이터셋 다운로드

캡셔닝 모델을 학습하기 위해 사용할 수 있는 데이터셋은 매우 많습니다. 우리는 그 중 **Microsoft COCO (이하 MS COCO)** 데이터셋을 활용해보고자 합니다.

MS COCO는 Object detection(물체 탐지), Segmentation(분류), Captioning에 주로 사용되는 데이터셋으로, 컴퓨터 비전 분야에서 넓은 폭으로 사용되고 있는 데이터셋입니다.

MS COCO 데이터셋 다운받기 위한 shell 스크립트 파일을 작성하겠습니다.

📌 **참고사항**

- `mkdir`: 폴더 생성
- `cd`: 작업 경로 이동
- `wget`: 인터넷 주소 접속
  - MS COCO dataset을 저장할 수 있는 인터넷 사이트로 접속하여, 파일을 다운로드 받습니다.
- `unzip`: 압축 해제

In [None]:
# COCO dataset directory
!mkdir -p /data/coco

# Download COCO Train2014 images and captions
!cd /data/coco
!wget http://images.cocodataset.org/zips/train2014.zip
!wget http://images.cocodataset.org/zips/val2014.zip
!wget http://images.cocodataset.org/annotations/annotations_trainval2014.zip

# Unzip the dataset
!unzip train2014.zip
!unzip val2014.zip
!unzip annotations_trainval2014.zip

모든 파일이 다운로드 되었다면, 반드시 데이터가 정상적으로 다운로드 되었는지 폴더 내 경로로 진입하여 확인하세요.

## 3.1. 환경 셋팅

모델을 학습시킬 데이터셋을 다운받았으니, 이제 학습할 모델을 지정해주겠습니다.

여기서는 **Transformer** 모델을 학습시킵니다.

📌 Transformer는 2017년 Google에서 발표된 이후로 딥러닝 전역에서 활발하게 사용되고 있는 모델로, 캡셔닝을 학습하기에도 유용한 모델입니다.

가장 먼저, 환경 셋팅을 진행하겠습니다.
프로젝트에 필요한 라이브러리를 불러오는 과정입니다.

또한 위에서 다운받은 데이터에 대한 경로를 지정함으로써, 적절하게 데이터를 불러올 수 있도록 하겠습니다.

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
import math
from tqdm.notebook import trange, tqdm
import random

import torch
import torch.nn as nn
from torch import optim
from torch.utils.data import DataLoader
import torch.nn.functional as F
from torch.distributions import Categorical

import torchvision.datasets as datasets
import torchvision.transforms as transforms
import torchvision

from transformers import AutoTokenizer
os.environ["TOKENIZERS_PARALLELISM"] = "false"

torch.backends.cuda.matmul.allow_tf32 = True

# Define the root directory of the dataset
data_set_root='./data/coco'
train_set ='train2014'
validation_set ='val2014'

train_image_path = os.path.join(data_set_root, train_set)
train_ann_file = '{}/annotations/captions_{}.json'.format(data_set_root, train_set)

val_image_path = os.path.join(data_set_root, validation_set)
val_ann_file = '{}/annotations/captions_{}.json'.format(data_set_root, validation_set)

## 3.2. 모델 정의

모델을 학습하기 전에, 모델을 정의해 줄 필요가 있습니다.
학습에 사용될 인코더-디코더 모델을 정의하도록 하겠습니다.

아래에서 사용된 모델과 함수들에 대해 간단한 설명을 돕습니다.


---


💡 SampleCaption:

- sample 리스트에서 무작위로 하나의 요소를 선택하여 반환하는 역할을 합니다.

💡 TokenDrop

- 토큰 시퀀스에서 무작위로 일부 토큰을 특정 확률에 따라 공백 토큰으로 대체하는 데이터 증강 클래스입니다.

- 모델이 특정 정보를 의도적으로 제거하고 학습할 수 있게 합니다.

💡 extract_patches

- 입력 이미지 텐서를 작은 패치(조각)로 분할하는 함수입니다.

💡 SinusoidalPosEmb

- 입력 토큰의 위치에 따른 사인 및 코사인 함수 기반의 포지셔널 임베딩을 생성합니다.

- 시퀀스 모델에서 순서 정보를 전달하기 위해 사용됩니다.

💡 AttentionBlock

- 다중 헤드 어텐션 레이어를 정의하는 클래스입니다.

💡 TransformerBlock

- 인코더와 디코더 블록에서 공통적으로 사용되는 트랜스포머 블록을 정의합니다.

- 각 블록은 어텐션 레이어와 MLP(다층 퍼셉트론)로 구성되며, 순차적으로 레이어 정규화와 잔차 연결이 포함됩니다.

💡 Decoder

- 전체 디코더를 구성하며, 입력 토큰 시퀀스와 인코더 출력값을 받아 출력 시퀀스를 생성합니다.

- 임베딩 층과 포지셔널 임베딩, 여러 층의 트랜스포머 블록을 사용하여, 입력 시퀀스의 단어들을 예측하는 디코더 역할을 합니다.

💡 VisionEncoder

- 이미지를 입력으로 받아 이를 처리하고 시퀀스 형태로 변환하여 임베딩 벡터로 변환합니다.

- 이미지의 패치를 추출한 후 선형 변환을 거쳐 인코딩을 수행하고, 포지셔널 임베딩과 트랜스포머 블록을 적용하여 이미지 특징을 추출하는 역할을 합니다.

💡 VisionEncoderDecoder

- 전체 인코더-디코더 모델을 정의합니다.

- VisionEncoder와 Decoder를 각각 인코더와 디코더로 사용하여 이미지 시퀀스를 인코딩한 후, 디코더를 통해 최종 예측값을 생성합니다.


---




In [None]:
class SampleCaption(nn.Module):
    def __call__(self, sample):
        rand_index = random.randint(0, len(sample) - 1)
        return sample[rand_index]

tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")

class TokenDrop(nn.Module):

    def __init__(self, prob=0.1, blank_token=1, eos_token=102):
        self.prob = prob
        self.eos_token = eos_token
        self.blank_token = blank_token

    def __call__(self, sample):
        mask = torch.bernoulli(self.prob * torch.ones_like(sample)).long()
        can_drop = (~(sample == self.eos_token)).long()
        mask = mask * can_drop
        mask[:, 0] = torch.zeros_like(mask[:, 0]).long()
        replace_with = (self.blank_token * torch.ones_like(sample)).long()
        sample_out = (1 - mask) * sample + mask * replace_with
        return sample_out

def extract_patches(image_tensor, patch_size=16):
    bs, c, h, w = image_tensor.size()
    unfold = torch.nn.Unfold(kernel_size=patch_size, stride=patch_size)
    unfolded = unfold(image_tensor)
    unfolded = unfolded.transpose(1, 2).reshape(bs, -1, c * patch_size * patch_size)
    return unfolded

class SinusoidalPosEmb(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.dim = dim

    def forward(self, x):
        device = x.device
        half_dim = self.dim // 2
        emb = math.log(10000) / (half_dim - 1)
        emb = torch.exp(torch.arange(half_dim, device=device) * -emb)
        emb = x[:, None] * emb[None, :]
        emb = torch.cat((emb.sin(), emb.cos()), dim=-1)
        return emb

class AttentionBlock(nn.Module):
    def __init__(self, hidden_size=128, num_heads=4, masking=True):
        super(AttentionBlock, self).__init__()
        self.masking = masking
        self.multihead_attn = nn.MultiheadAttention(hidden_size, num_heads=num_heads, batch_first=True, dropout=0.0)

    def forward(self, x_in, kv_in, key_mask=None):
        if self.masking:
            bs, l, h = x_in.shape
            mask = torch.triu(torch.ones(l, l, device=x_in.device), 1).bool()
        else:
            mask = None
        return self.multihead_attn(x_in, kv_in, kv_in, attn_mask=mask, key_padding_mask=key_mask)[0]

class TransformerBlock(nn.Module):
    def __init__(self, hidden_size=128, num_heads=4, decoder=False, masking=True):
        super(TransformerBlock, self).__init__()
        self.decoder = decoder
        self.norm1 = nn.LayerNorm(hidden_size)
        self.attn1 = AttentionBlock(hidden_size=hidden_size, num_heads=num_heads, masking=masking)
        if self.decoder:
            self.norm2 = nn.LayerNorm(hidden_size)
            self.attn2 = AttentionBlock(hidden_size=hidden_size,
                                        num_heads=num_heads, masking=False)
        self.norm_mlp = nn.LayerNorm(hidden_size)
        self.mlp = nn.Sequential(nn.Linear(hidden_size, hidden_size * 4), nn.ELU(), nn.Linear(hidden_size * 4, hidden_size))

    def forward(self, x, input_key_mask=None, cross_key_mask=None, kv_cross=None):
        x = self.attn1(x, x, key_mask=input_key_mask) + x
        x = self.norm1(x)
        if self.decoder:
            x = self.attn2(x, kv_cross, key_mask=cross_key_mask) + x
            x = self.norm2(x)
        x = self.mlp(x) + x
        return self.norm_mlp(x)

class Decoder(nn.Module):
    def __init__(self, num_emb, hidden_size=128, num_layers=3, num_heads=4):
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(num_emb, hidden_size)
        self.embedding.weight.data = 0.001 * self.embedding.weight.data
        self.pos_emb = SinusoidalPosEmb(hidden_size)
        self.blocks = nn.ModuleList([TransformerBlock(hidden_size, num_heads, decoder=True) for _ in range(num_layers)])
        self.fc_out = nn.Linear(hidden_size, num_emb)

    def forward(self, input_seq, encoder_output, input_padding_mask=None,
                encoder_padding_mask=None):
        input_embs = self.embedding(input_seq)
        bs, l, h = input_embs.shape
        seq_indx = torch.arange(l, device=input_seq.device)
        pos_emb = self.pos_emb(seq_indx).reshape(1, l, h).expand(bs, l, h)
        embs = input_embs + pos_emb
        for block in self.blocks:
            embs = block(embs, input_key_mask=input_padding_mask, cross_key_mask=encoder_padding_mask, kv_cross=encoder_output)
        return self.fc_out(embs)

class VisionEncoder(nn.Module):
    def __init__(self, image_size, channels_in, patch_size=16, hidden_size=128,
                 num_layers=3, num_heads=4):
        super(VisionEncoder, self).__init__()

        self.patch_size = patch_size
        self.fc_in = nn.Linear(channels_in * patch_size * patch_size, hidden_size)

        seq_length = (image_size // patch_size) ** 2
        self.pos_embedding = nn.Parameter(torch.empty(1, seq_length, hidden_size).normal_(std=0.02))
        self.blocks = nn.ModuleList([TransformerBlock(hidden_size, num_heads, decoder=False, masking=False) for _ in range(num_layers)])
    def forward(self, image):
        bs = image.shape[0]
        patch_seq = extract_patches(image, patch_size=self.patch_size)
        patch_emb = self.fc_in(patch_seq)
        embs = patch_emb + self.pos_embedding
        for block in self.blocks:
            embs = block(embs)
        return embs

class VisionEncoderDecoder(nn.Module):
    def __init__(self, image_size, channels_in, num_emb, patch_size=16,
                 hidden_size=128, num_layers=(3, 3), num_heads=4):
        super(VisionEncoderDecoder, self).__init__()

        self.encoder = VisionEncoder(image_size=image_size, channels_in=channels_in, patch_size=patch_size, hidden_size=hidden_size, num_layers=num_layers[0], num_heads=num_heads)

        self.decoder = Decoder(num_emb=num_emb, hidden_size=hidden_size, num_layers=num_layers[1], num_heads=num_heads)

    def forward(self, input_image, target_seq, padding_mask):
        bool_padding_mask = padding_mask == 0
        encoded_seq = self.encoder(image=input_image)
        decoded_seq = self.decoder(input_seq=target_seq,
                                   encoder_output=encoded_seq,
                                   input_padding_mask=bool_padding_mask)
        return decoded_seq

## 3. 모델 학습

이제 본격적으로 모델 학습을 수행하겠습니다.

모델 학습에 필요한 아래의 파라미터(변수)들을 우선 지정합니다.


---


🧱 Parameters
- Optimizer: 모델의 가중치를 업데이트하여 손실을 최소화하는 알고리즘
- Image size: 입력 이미지의 가로와 세로 크기, 보통 픽셀 단위
- Epoch: 전체 데이터셋을 한 번 모두 학습하는 과정
- Batch size: 한 번의 학습 단계에서 처리하는 데이터 샘플 개수
- Hidden size: 트랜스포머 등 모델의 은닉층에서 사용되는 벡터의 차원
- Num layers: 모델 내부에 있는 레이어(층)의 개수
- Num head: 다중 헤드 어텐션에서 병렬적으로 사용하는 어텐션 헤드의 개수
- Patch size: 이미지를 나눌 작은 패치(조각)의 크기


---


이후, 지정한 epoch 수 만큼 학습을 진행하도록 하겠습니다.

In [None]:

# Define the learning rate for the optimizer
learning_rate = 1e-4

# Image size
image_size = 128

# Define the number of epochs for training
nepochs = 3

# Define the batch size for mini-batch gradient descent
batch_size = 128

# GPU
device = torch.device(1 if torch.cuda.is_available() else 'cpu')

# Embedding Size
hidden_size = 192

# Number of Transformer blocks for the (Encoder, Decoder)
num_layers = (6, 6)

# MultiheadAttention Heads
num_heads = 8

# Size of the patches
patch_size = 8

# Create model
caption_model = VisionEncoderDecoder(image_size=image_size, channels_in=test_images.shape[1], num_emb=tokenizer.vocab_size, patch_size=patch_size, num_layers=num_layers,hidden_size=hidden_size, num_heads=num_heads).to(device)

# Initialize the optimizer with above parameters
optimizer = optim.Adam(caption_model.parameters(), lr=learning_rate)
scaler = torch.cuda.amp.GradScaler()

# Define the loss function
loss_fn = nn.CrossEntropyLoss(reduction="none")

td = TokenDrop(0.5)

# Initialize the training loss logger
training_loss_logger = []

# Transforms
train_transform = transforms.Compose([transforms.Resize(image_size),
                                      transforms.RandomCrop(image_size),
                                      transforms.ToTensor(),
                                      transforms.Normalize(mean=[0.485, 0.456, 0.406],std=[0.229, 0.224, 0.225]),
                                      transforms.RandomErasing(p=0.5)])

transform = transforms.Compose([transforms.Resize(image_size),
                                transforms.CenterCrop(image_size),
                                transforms.ToTensor(),
                                transforms.Normalize(mean=[0.485, 0.456, 0.406],std=[0.229, 0.224, 0.225])])

train_dataset = datasets.CocoCaptions(root=train_image_path,
                                      annFile=train_ann_file,
                                      transform=train_transform,
                                      target_transform=SampleCaption())

val_dataset = datasets.CocoCaptions(root=val_image_path,
                                    annFile=val_ann_file,
                                    transform=transform,
                                    target_transform=SampleCaption())

# Data Load
data_loader_train = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=8)
data_loader_val = DataLoader(val_dataset, batch_size=batch_size, shuffle=True, num_workers=8)

dataiter = next(iter(data_loader_val))
test_images, test_captions = dataiter

# Iterate over epochs
for epoch in trange(0, nepochs, leave=False, desc="Epoch"):
    # Set the model in training mode
    caption_model.train()
    steps = 0
    # Iterate over the training data loader
    for images, captions in tqdm(data_loader_train, desc="Training", leave=False):
        images = images.to(device)

        # Tokenize and pre-process the captions
        tokens = tokenizer(captions, padding=True, truncation=True, return_tensors="pt")
        token_ids = tokens['input_ids'].to(device)
        padding_mask = tokens['attention_mask'].to(device)
        bs = token_ids.shape[0]

        # Shift the input sequence to create the target sequence
        target_ids = torch.cat((token_ids[:, 1:],
                                torch.zeros(bs, 1, device=device).long()), 1)
        tokens_in = td(token_ids)
        with torch.cuda.amp.autocast():
            # Forward pass
            pred = caption_model(images, tokens_in, padding_mask=padding_mask)

        # Compute the loss
        loss = (loss_fn(pred.transpose(1, 2), target_ids) * padding_mask).mean()

        # Backpropagation
        optimizer.zero_grad()
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # Log the training loss
        training_loss_logger.append(loss.item())

## 4. 모델 실행

모델이 잘 학습되었다면, 실제로 모델을 통해 이미지의 캡션을 생성해보도록 하겠습니다.

임의의 한 파일을 통해, 실제로 캡션을 확인해볼까요?

여기서는 validation 데이터셋의 이미지로 확인해보도록 하겠습니다.


---


🎨 Image

<img src = "https://drive.google.com/uc?id=177zLaXPriug3w7MHtUcufFN5rEpbYB8A" height = 200 width = 300>


---


위의 이미지에 대해 "a shoe rack with some shoes and a dog sleeping on them" 등과 같은 caption이 생성되는 것을 확인할 수 있습니다.

이상으로 captioning project를 모두 마치도록 하겠습니다.

In [None]:

# Create a dataloader itterable object
dataiter = next(iter(data_loader_val))
# Sample from the itterable object
test_images, test_captions = dataiter

# Choose an index within the batch
index = 0
test_image = test_images[index].unsqueeze(0)

# Lets visualise an entire batch of images!
plt.figure(figsize = (3,3))
out = torchvision.utils.make_grid(test_image, 1, normalize=True)
_ = plt.imshow(out.numpy().transpose((1, 2, 0)))
print(test_captions[index])