## 과제 목표
1. LiMBeR 모델 Architecture의 정성적 이해
2. 학습된 CLIP과 LLM을 활용하여 대규모 시각-언어 멀티모달 LLM을 구축
3. 시각-언어 멀티모달 LLM 학습
3. 학습된 시각-언어 멀티모달 LLM을 활용한 Image Captioning Task

## 참고 자료
 - Paper : [LiMBeR](https://arxiv.org/abs/2209.15162)
 - Code : [LiMBeR github](https://github.com/jmerullo/limber), [Hugging Face Transformers](https://huggingface.co/docs/transformers/index)
 - Dataset : [CC3M](https://github.com/google-research-datasets/conceptual-captions)  


## 0. 환경 세팅


함께 포함된 limber.zip 파일을 압축 해제하여 해당 limber 폴더를 sub3 폴더 안에 넣어줍니다.  

디렉토리 구조는 다음과 같습니다.
- 디렉토리 구조  
      |-- project/  
      |   |-- sub1/  
      |   |-- sub2/   
      |   |-- sub3/  
      |   |   |-- limber/  
      |   |   |-- limber.ipynb   
      |   |   |-- vqa.py

Colab에서 진행하는 경우, 아래 코드 주석을 해제하고 셀을 실행시켜 구글 드라이브 마운트를 합니다.

In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

Colab에서 진행하는 경우, 구글 드라이브에 project 디렉토리를 업로드합니다.   
이후, 아래 코드 주석을 해제하고 셀을 실행시켜 현재 작업중인 디렉토리를 이동합니다.

In [None]:
# cd "/content/drive/MyDrive/project/sub3/"

필요한 pakage들을 설치합니다.  

      pip install transformers
      pip install https://github.com/openai/CLIP/archive/master.zip
      pip install torchtyping
      pip install einops
      pip install pyhelpers

Colab에서 진행하는 경우, 아래 코드 주석을 해제하고 셀을 실행시켜 필요한 pakage들을 설치할 수 있습니다.

In [None]:
# !pip install -qq transformers
# !pip install -qq https://github.com/openai/CLIP/archive/master.zip
# !pip install -qq torchtyping
# !pip install -qq einops
# !pip install -qq pyhelpers

seed를 고정해줍니다.

In [None]:
import torch
import numpy as np
import random

random_seed = 1111
torch.manual_seed(random_seed)
torch.cuda.manual_seed(random_seed)
torch.cuda.manual_seed_all(random_seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
np.random.seed(random_seed)
random.seed(random_seed)

## 1. Image Captioning Data 전처리  

**1-1. CC3M(Conceptual Captions) Dataset 설명**    

이번 실습에서는 [CC3M](https://github.com/google-research-datasets/conceptual-captions) 데이터셋을 훈련 및 추론에 사용할 것입니다. CC3M은 이미지 캡셔닝 시스템의 훈련 및 평가를 위해 설계된 300만 개 이상의 (이미지 URL, 캡션) 쌍을 포함하는 데이터셋입니다.

다음은 CC3M 데이터셋의 예시입니다.

이미지:  

![image.png](https://thumb7.shutterstock.com/display_pic_with_logo/1350382/147082805/stock-vector-white-swan-on-the-blue-background-147082805.jpg)  
(그림 출처: https://thumb7.shutterstock.com/display_pic_with_logo/1350382/147082805/stock-vector-white-swan-on-the-blue-background-147082805.jpg)

캡션:
white swan on the blue background

이번 실습은 저희가 제공하는 소량의 CC3M 데이터셋을 사용합니다. 더 많은 데이터로 실습을 진행하고자 하는 분들은 아래 1-2 지침을 따라 데이터셋을 다운로드하고 디렉토리 구조를 만든 후 진행하실 수 있습니다.

**1-2. CC3M 데이터셋 다운로드**

- 다운로드 링크 : https://huggingface.co/datasets/zatepyakin/cc3m_min256_max512/tree/main

- 다운로드 예시

      mkdir cc3m && cd cc3m
      mkdir training && cd training

      wget https://huggingface.co/datasets/zatepyakin/cc3m_min256_max512/resolve/main/00000.tar
      tar -xvf "00000.tar"

      mkdir images
      mv *.jpg images

      mkdir image_data
      mv *.json image_data

      rm *.txt

- 디렉토리 구조  
      |-- cc3m  
      |   |-- train  
      |   |   |-- image_data  
      |   |   |   |-- 000000000.json    
      |   |   |   |-- 000000001.json   
      |   |   |   |-- ...  
      |   |   |-- images  
      |   |   |   |-- 000000000.jpg    
      |   |   |   |-- 000000001.jpg   
      |   |   |   |-- ...

**1-3. Tokenizer 정의**  
텍스트 데이터를 Language Model의 입력으로 사용하기 위해서는 토큰 형태로 변환해야 합니다. 🤗 Transformers 라이브러리는 이를 위한 일련의 규칙에 따라 텍스트를 토큰 시퀀스로 변환하는 토크나이저를 제공합니다.  

Transformers 라이브러리에서 제공하는 tokenizer를 사용하여 tokenizer를 정의합니다.

In [None]:
import warnings
warnings.filterwarnings(action='ignore')
from transformers import GPT2TokenizerFast

# transformers 라이브러리에서 제공하는 pretrained tokenizer를 로드합니다.
tokenizer = GPT2TokenizerFast.from_pretrained("gpt2")
# pad token id를 eos token id로 지정합니다.
tokenizer.pad_token_id = tokenizer.eos_token_id
# 모델이 padding을 오른쪽에 적용하도록 합니다.
tokenizer.padding_side = "right"
# 모델의 입력에 대한 최대 길이(토큰 수로 측정)를 지정합니다.
tokenizer.model_max_length = 2048
# '<|image|>' 문자열을 클래스 토큰으로 설정하고 tokenizer에 추가합니다.
tokenizer.add_special_tokens(
            {'cls_token': '<|image|>'}
        )

정의한 tokenizer의 encode 함수를 사용하여 텍스트를 토크나이저에 전달해보겠습니다. 아래 결과를 보면, 일련의 규칙에 따라 텍스트가 토큰 시퀀스 인덱스로 변환된 것을 볼 수 있습니다.

In [None]:
encoded_input = tokenizer.encode("Do not meddle in the affairs of wizards, for they are subtle and quick to anger.")
print(encoded_input)

토큰 시퀀스 인덱스는 tokenizer의 decode 함수를 통해 원래의 text로 반환됩니다.

In [None]:
print(tokenizer.decode(encoded_input))

원래의 text가 나오는 것을 확인할 수 있습니다.  
이제 tokenizer에 저장되어있는 vocabulary를 불러와서 출력해보도록 하겠습니다.  
표현 가능한 vocabulary는 우리가 위에서 tokenizer에 추가한 '<|image|>' 토큰을 포함하여 총 50258개 입니다.

In [None]:
vocab = tokenizer.get_vocab()

print(list(vocab.items())[:10])
print(f"len(vocab) : {len(vocab)}")

Vocab의 가장 마지막에 있는 토큰이 우리가 추가한 '<|image|>' 토큰임을 알 수 있습니다.

In [None]:
print(vocab['<|image|>'])

**1-4. ImageCaptionDataset class 정의**

CC3M 데이터와 tokenizer가 준비되었으니, 이제 데이터를 모델이 예상하는 입력 형식으로 전처리해주는 custom dataset class를 정의합니다.  

ImageCaptionDataset 클래스를 구현하면서 Req. 1-1을 풀어봅니다. 구현에 필요한 부분은 다음과 같습니다. 해당 """Write your code""" 부분을 구현합니다.

- ImageCaptionDataset 클래스
  - **Req. 1-1**: 데이터 전처리 : ImageCaptionDataset 내 \_\_getitem\_\_() 메소드 구현

In [None]:
from torch.utils.data import Dataset
from tqdm import tqdm
from pathlib import Path
from torchtyping import TensorType
from typing import Tuple
import os
from PIL import Image
from pyhelpers.store import load_json

class ImageCaptionDataset(Dataset):
    def __init__(
        self, data_dir, tokenizer=None, transforms=None, seq_len=2048
    ):
        self.data_dir = Path(data_dir)
        self.tokenizer = tokenizer
        self.transforms = transforms
        self.seq_len = seq_len

        paths = []
        img_data_dir = self.data_dir / "image_data"
        for p in tqdm(Path(img_data_dir).glob(f"*.json"), desc=f"loading dataset paths from {str(img_data_dir)}"):
          paths.append(p)
        self.paths = sorted(paths)
        print("LENGTH OF DATA", len(self.paths))

    def __len__(self):
        return len(self.paths)

    ############################################################################
    # Req 1-1: 데이터 전처리 : ImageCaptionDataset 내 __getitem__() 메소드 구현       #
    ############################################################################
    def __getitem__(
        self, idx
    ) -> Tuple[TensorType["b", "c", "h", "w"], TensorType["b", "s"]]:

        img_data = load_json(self.paths[idx])
        img_path = os.path.join(self.data_dir, "images", img_data['key']+".jpg")

        ################################################################################
        # TODO: img_path를 텐서 형태의 이미지로, caption을 transformers library의 tokenizer를 #
        # 통해 텐서 형태의 토큰으로 변경                                                      #
        ################################################################################
        # *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****
        # 1) PIL library의 Image.open() 함수를 사용해서 img_path로부터 RGB 형식으로 변환된
        #    PIL.Image 이미지 객체 얻기
        # 2) PIL.Image 이미지 객체를 사용자 정의된 self.transforms를 이용하여 전처리하여
        #    [channel, height, width] 형태의 텐서로 변환
        # 3) Transformers library의 tokenizer에서 정의된 encode() 함수로 caption을
        #    텐서 형태의 token으로 변환

        img = """Write your code"""
        img_tensor = """Write your code"""

        caption = img_data["caption"]
        token_tensor = self.tokenizer.encode(
            """Write your code""",
            return_tensors="pt",
            max_length=self.seq_len,
            padding="longest",
            truncation=True,
        ).squeeze()

        # *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****
        ################################################################################
        #                                 END OF YOUR CODE                             #
        ################################################################################

        return {'images':img_tensor, 'captions':token_tensor}

# 2. LiMBeR 구현

LiMBeR(Linearly Mapping Between Representation spaces) 모델의 기본 접근 방식은 사전 훈련된 이미지 인코더의 hidden size $h_I$를 언어 모델의 input space로 투영하기 위해 linear layer P를 훈련시키는 것입니다.   

**2-1. Image Encoder**  
이미지 인코더로부터 이미지를 나타내는 $h_I$ 차원의 이미지 표현을 추출합니다.

**2-2. Projection Layer**  
Projection $P$는 이미지 표현을 $e_L$ * $k$ 시퀀스의 소프트 프롬프트로 투영합니다. 이를 이미지 프롬프트라고 지칭합니다.

**2-3. Text Decoder (Large Language Model)**  
이미지 프롬프트를 사용하여 언어 모델을 prompting함으로써, 언어 모델은 이미지를 설명하는 캡션을 생성할 수 있습니다.    

Projection Layer의 양쪽에 있는 이미지 인코더와 언어 모델을 동결하여 학습된 파라미터가 프로젝션 레이어에만 존재하도록 합니다.

본 실습에서는 이미지 인코더로 **CLIP RN50x16**를 사용하며 **$k$=144, $h_I$=3072, $e_L$=2048** 로 설정합니다.  
또한 논문에서는 언어 모델로 60억 개의 파라미터를 가진 decoder-only GPT-J 모델을 사용하였으나, Colab의 자원 제한으로 인해 본 실습에서는 13억 개의 파라미터를 가진 **decoder-only GPT-Neo** 모델을 사용합니다.

ImagePrefix 클래스와 LimberGPTJ 클래스의 함수들을 하나씩 구현하면서 Req. 1-2 부터 Req. 1-5를 풀어봅니다. 구현에 필요한 부분은 다음과 같습니다.
- ImagePrefix 클래스  
  - **Req. 1-2**: 이미지를 입력 받아 소프트 프롬프트를 반환하는 ImagePrefix 클래스 구현
- LimberGPTJ 클래스  
  - **Req. 1-3**: LimberGPTJ 내 setmultimodal 함수 구현
  - **Req. 1-4**: LimberGPTJ 내 make_input_embeddings 함수 구현
  - **Req. 1-5**: LimberGPTJ 내 build_labels_for_training 함수 구현

아래 셀부터 하나씩 따라가면서 Req. 1-2 부터 Req. 1-5 에서 구현이 필요한 부분을 읽고, 다시 ImagePrefix 클래스 또는 LimberGPTJ 클래스로 돌아와서 해당 """Write your code""" 부분을 구현해봅니다.  


In [None]:
from functools import partial
from typing import Union
import torch.nn as nn
import clip
from einops import rearrange

############################################################################
# Req 1-2: 이미지를 입력 받아 소프트 프롬프트를 반환하는 ImagePrefix 클래스 구현         #
############################################################################
class ImagePrefix(nn.Module):

    """
    이미지 배치를 입력으로 받아 언어 모델의 word embedding과 동일한 h_I 차원의 소프트 프롬프트를 반환합니다.

    :param encoder_out_dim: 이미지 인코더의 출력 차원 (=h_I)
    :param lm_out_dim: 언어 모델의 입력 및 출력 차원 (=e_L)
    :param out_seq_len: 소프트 프롬프트의 시퀀스 길이 (=k)
    :param image_embed_dropout_prob: 소프트 프롬프트에 적용할 dropout probability
    :param device: 모델을 실행 할 장치
    """

    ################################################################################
    # TODO: 텐서 형태 이미지를 입력받아 CLIP의 image encoder를 통해 이미지 표현을 얻고          #
    # projection layer를 통해 소프트 프롬프트로 투영하는 ImagePrefix 클래스 구현              #
    ################################################################################
    # *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****
    # 1) __init__() 함수에서 pretrained CLIP image encoder 가져오기
    # 2) __init__() 함수에서 projection layer 정의
    # 3) __forward__() 함수에서 pretrained CLIP image encoder를 사용하여 image로부터 이미지 표현 얻기
    # 4) __forward__() 함수에서 projection layer를 사용하여 이미지 표현을 soft prompts로 투영

    def __init__(
        self,
        encoder_out_dim, lm_out_dim, out_seq_len, image_embed_dropout_prob, device=None,
    ):
        super().__init__()

        # pretrained CLIP image encoder 가져오기
        self.encoder = """Write your code"""
        self.encoder.attnpool = Lambda(
            partial(rearrange, pattern="b d h w -> b (h w) d")
        )
        # 이미지 인코더의 출력 차원 (=h_I)
        self.encoder_out_dim = encoder_out_dim

        # 언어 모델의 입력 및 출력 차원 (=e_L)
        self.lm_out_dim = lm_out_dim

        # 소프트 프롬프트의 시퀀스 길이 (=k)
        self.out_seq_len = out_seq_len

        # 소프트 프롬프트의 차원
        proj_out_dim = self.lm_out_dim

        # Projection layer 정의
        self.proj = """Write your code"""

        self.dropout = nn.Dropout(image_embed_dropout_prob)

    def forward(
        self, x: TensorType["b", "c", "h", "w"]
    ) -> TensorType["b", "seq", "out_dim"]:

        image_feats = """Write your code"""
        soft_prompts = """Write your code"""

        # 소프트 프롬프트에 dropout 적용
        soft_prompts = self.dropout(soft_prompts)

        return soft_prompts

    # *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****
    ################################################################################
    #                                 END OF YOUR CODE                             #
    ################################################################################

In [None]:
from transformers import GPTNeoForCausalLM
from typing import List, Optional
from torchtyping import TensorType
from transformers.file_utils import ModelOutput
from torchvision import transforms as T

class LimberGPTNeo(GPTNeoForCausalLM):

    ############################################################################
    # Req 1-3: LimberGPTJ 내 setmultimodal 함수 구현                              #
    ############################################################################
    def setup_multimodal(self, tokenizer, max_seq_len, encoder_out_dim, lm_out_dim, out_seq_len, image_embed_dropout_prob, freeze_lm, freeze_img_encoder):

        """
        :param tokenizer: tokenizer
        :param max_seq_len: 언어 모델의 입력에 대한 최대 길이(토큰 수로 측정)
        :param encoder_out_dim: 이미지 인코더의 출력 차원 (=h_I)
        :param lm_out_dim: 언어 모델의 입력 및 출력 차원 (=e_L)
        :param out_seq_len: 소프트 프롬프트의 시퀀스 길이 (=k)
        :param image_embed_dropout_prob: 소프트 프롬프트에 적용할 dropout probability
        :param freeze_lm: 언어 모델을 freeze할지 여부
        :param freeze_img_encoder: 이미지 인코더를 freeze할지 여부
        """
        ################################################################################
        # TODO: Transformers의 GPTNeoForCausalLM 클래스를 상속받는 LimberGPTJ 클래스에서       #
        # ImagePrefix 클래스의 인스턴스를 생성한다. Image encoder와 Language model은 freeze시켜  #
        # Image Encoder(freeze) – Projection Layer – Language Model(freeze) 구조를 만든다.#
        ################################################################################
        # *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****
        # 1) 언어 모델 freeze
        # 2) ImagePrefix 클래스의 인스턴스 생성
        # 3) 이미지 인코더 freeze

        self.seq_len = max_seq_len
        self.tokenizer = tokenizer

        self.image_token = self.tokenizer.cls_token_id
        self.eos_token = self.tokenizer.eos_token_id
        self.resize_token_embeddings(len(self.tokenizer))

        self.word_embedding = self.transformer.wte

        # 언어 모델 freeze
        if freeze_lm:
            """Write your code"""

        # ImagePrefix 클래스의 인스턴스 생성
        self.image_prefix = """Write your code"""

        # 이미지 프롬프트의 시퀀스 길이 지정
        self.image_prompt_seq_len = self.image_prefix.out_seq_len
        inp_rez = self.image_prefix.encoder.input_resolution

        self.transforms = T.Compose(
        [
            T.Resize(inp_rez, interpolation=T.InterpolationMode.BICUBIC),
            T.CenterCrop(inp_rez),
            lambda image: image.convert("RGB"),
            T.ToTensor(),
            add_batch_dim,
            T.Normalize(
                (0.48145466, 0.4578275, 0.40821073),
                (0.26862954, 0.26130258, 0.27577711),
            ),
        ]
        )

        # 이미지 인코더 freeze
        if freeze_img_encoder:
            """Write your code"""

        # *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****
        ################################################################################
        #                                 END OF YOUR CODE                             #
        ################################################################################

    ############################################################################
    # Req 1-4: LimberGPTJ 내 make_input_embeddings 함수 구현                     #
    ############################################################################
    def make_input_embeddings(self,
        images: TensorType["b", "c", "h", "w"] = None,
        captions: Optional[TensorType["b", "seq"]] = None,
        image_embeddings: TensorType["b", "s", "d"] = None,):

        """
        :param images: 이미지
        :param captions: 캡션
        :param image_embeddings: 이미지 프롬프트 (inference 시 사용)
        """

        ################################################################################
        # TODO: ImagePrefix 클래스의 인스턴스를 이용하여 입력 이미지로부터 이미지 프롬프트를 얻고,      #
        # 캡션으로부터 캡션의 임베딩을 얻어 훈련 시 LLM에 인풋으로 들어갈 임베딩 얻기                   #
        ################################################################################
        # *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****
        # 1) 이미지 프롬프트 추출
        # 2) 캡션 임베딩 추출
        # 3) 훈련 시 LLM에 인풋으로 들어갈 임베딩 만들기

        if image_embeddings is None:

        """Write your code"""

        # *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****
        ################################################################################
        #                                 END OF YOUR CODE                             #
        ################################################################################

        return image_embeddings, caption_embeddings, input_embeddings

    ############################################################################
    # Req 1-5: LimberGPTJ 내 build_labels_for_training 함수 구현                 #
    ############################################################################
    def build_labels_for_training(self,
        image_embeddings: TensorType["b", "s", "d"] = None,
        captions: Optional[TensorType["b", "seq"]] = None,):

        """
        :param image_embeddings: 이미지 프롬프트
        :param captions: 캡션
        """

        ################################################################################
        # TODO: 이미지와 캡션으로부터 언어 모델을 훈련하기 위한 라벨을 만든다.                        #
        # 이미지 프롬프트와, text에서 첫 eos 토큰 이후 반복되는 eos 토큰에 대해서는 loss가            #
        # 계산되지 않아야 하기 때문에 해당하는 label을 -100으로 설정해주어 masking 해준다.            #
        ################################################################################
        # *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****
        # 1) 이미지 프롬프트의 length 만큼의 label을 -100으로 설정한다.
        # 2) 이미지 프롬프트의 label과 caption을 concat하여 언어 모델을 훈련하기 위한 라벨을 만든다.
        # 3) text에서 첫 eos 토큰 이후 반복되는 eos 토큰에 대해서 label을 -100으로 설정한다.

        """Write your code"""

        # *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****
        ################################################################################
        #                                 END OF YOUR CODE                             #
        ################################################################################

        return labels


    def preprocess_inputs(self, img_path, text_prompt):

        return preprocess_inputs(
            self,
            img_path=img_path,
            text_prompt=text_prompt,
        )

    def top_k_sampling(self, logits, top_k, temperature):

        return top_k_sampling(
            self,
            logits=logits,
            top_k=top_k,
            temperature=temperature,
        )

    def top_p_sampling(self, logits, top_p, temperature):

        return top_p_sampling(
            self,
            logits=logits,
            top_p=top_p,
            temperature=temperature,
        )

    @torch.no_grad()
    def generate(
        self,
        embeddings: TensorType["b", "s", "d"],
        max_steps: int = 100,
        top_k: int = 0,
        top_p: float = 0.9,
        decode: bool = True,
        temperature: float = 0.7,

    ):

        return generate(
            self,
            embeddings=embeddings,
            max_steps=max_steps,
            top_k=top_k,
            top_p=top_p,
            decode=decode,
            temperature=temperature,
        )


    def forward(
        self,
        images: TensorType["b", "c", "h", "w"] = None,
        captions: Optional[TensorType["b", "seq"]] = None,
        output_hidden_states: bool = True,
        image_embeddings: TensorType["b", "s", "d"] = None,
        attention_mask=None,
    ) -> ModelOutput:

        # 훈련 시 LLM에 인풋으로 들어갈 임베딩을 얻는다.
        image_embeddings, caption_embeddings, input_embeddings = self.make_input_embeddings(images,captions,image_embeddings)

        # 이미지와 캡션으로부터 언어 모델을 훈련하기 위한 라벨을 만든다.
        labels = self.build_labels_for_training(image_embeddings, captions)

        # Language Model 추론
        lm_outputs = super().forward(
            inputs_embeds=input_embeddings,
            labels=labels,
            output_hidden_states=output_hidden_states,
            attention_mask=attention_mask,
            use_cache=False,
            return_dict=True,
        )
        return lm_outputs

하이퍼파라미터를 설정해줍니다.

In [None]:
# Dataset parameters
train_dataset_dir='./limber/cc3m/train'

# Model parameters
max_seq_len=2048 # 언어 모델의 입력에 대한 최대 길이(토큰 수로 측정)
encoder_out_dim=3072 # 이미지 인코더의 출력 차원 (=h_I)
lm_out_dim=2048 # 언어 모델의 입력 및 출력 차원 (=e_L)
out_seq_len=144 # 이미지 프롬프트 시퀀스 길이 (=k)
image_embed_dropout_prob=0.1 # 소프트 프롬프트에 적용할 dropout probability

# Training parameters
weight_decay=0.0
train_batch_size=16
min_lr=0.0
lr=8.0e-4
image_enc_lr=2e-06
gradient_accumulation_steps=8
train_steps=100
freeze_lm=True
freeze_img_encoder=True
save='./limber/checkpoints'
exp_name='train'

학습 및 추론에 필요한 유틸리티 클래스와 유틸리티 함수를 정의합니다.

In [None]:
import requests
from io import BytesIO
import  PIL.Image as PilImage
from dataclasses import dataclass
from typing import Union, Any, Callable
from transformers.utils import PaddingStrategy
from collections.abc import Mapping
from collections import defaultdict

# 유틸리티 클래스
class Lambda(torch.nn.Module):
    def __init__(self, fn: Callable):
        super().__init__()
        assert hasattr(fn, "__call__")
        self.fn = fn

    def forward(self, x):
        return self.fn(x)

@dataclass
class ImageCaptionDataCollator:
    tokenizer: GPT2TokenizerFast
    padding: Union[bool, str, PaddingStrategy] = True
    max_length: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None
    return_tensors: str = "pt"

    def __call__(self, features):
        batch = {}
        batch['input_ids']=[f["captions"] for f in features]
        batch = self.tokenizer.pad(
            batch,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors=self.return_tensors,
        )
        batch['captions']=batch['input_ids']
        batch['images'] = torch.stack([i['images'] for i in features]).float()
        batch['attention_mask'] = torch.stack([torch.cat( (torch.ones(out_seq_len), a) ) for a in batch['attention_mask']]).float()
        del batch['input_ids']
        return batch

# 유틸리티 함수
def add_batch_dim(t):
  return t.unsqueeze(0)

def get_params_for_weight_decay_optimization(module):
    weight_decay_params = {"params": []}
    no_weight_decay_params = {"params": [], "weight_decay": 0.0}
    blacklist_modules = (torch.nn.LayerNorm, torch.nn.Embedding)

    for module_ in module.modules():
        if isinstance(module_, blacklist_modules) or (
            weight_decay == 0.0
        ):
            no_weight_decay_params["params"].extend(
                [
                    p
                    for p in list(module_._parameters.values())
                    if (p is not None) and p.requires_grad
                ]
            )
        else:
            for n, p in list(module_._parameters.items()):
                if p is not None and p.requires_grad:
                    if n != "bias":
                        weight_decay_params["params"].append(p)
                    else:
                        no_weight_decay_params["params"].append(p)
    param_dict = {
        pn: p
        for pn, p in module.named_parameters()
        if p is not None and p.requires_grad
    }
    assert len(no_weight_decay_params["params"]) + len(
        weight_decay_params["params"]
    ) == len(
        param_dict.keys()
    ), "Number of params in both groups != total number of trainable params"
    if weight_decay == 0.0:
        # only return a single param group if no weight decay is being used anyway
        return [no_weight_decay_params]
    return [weight_decay_params, no_weight_decay_params]

def configure_param_groups(model):
    if image_enc_lr is not None:
        image_enc_params = get_params_for_weight_decay_optimization(
            model.image_prefix.encoder
        )
        for pdict in image_enc_params:
            pdict["lr"] = image_enc_lr
        image_proj_params = get_params_for_weight_decay_optimization(
            model.image_prefix.proj
        )

        lm_params = get_params_for_weight_decay_optimization(model.transformer)
        lm_params +=get_params_for_weight_decay_optimization(model.lm_head)

        class_params = []
        if hasattr(model, "class_head") and model.class_head is not None:
            class_params = get_params_for_weight_decay_optimization(
                model.class_head
            )
        all_params = []
        for p in image_enc_params + lm_params + image_proj_params + class_params:
            if p["params"]:
                all_params.append(p)
    else:
        all_params = get_params_for_weight_decay_optimization(model)

    d = defaultdict(dict)
    for param_group in all_params:
        lr = param_group.get("lr", None)
        wd = param_group.get("weight_decay", None)
        key = f"lr_{lr}_wd_{wd}"
        if d[key].get("params") is None:
            d[key]["params"] = []
        d[key]["params"].extend(param_group["params"])
        if lr is not None:
            d[key]["lr"] = lr
        if wd is not None:
            d[key]["weight_decay"] = wd
    all_params = list(d.values())
    n_params = sum([len(d["params"]) for d in all_params])
    param_dict = {
        pn: p for pn, p in model.named_parameters() if p is not None and p.requires_grad
    }
    assert n_params == len(
        param_dict
    ), f"Some parameters are missing from param groups ({n_params} | {len(param_dict)})"
    return all_params

def _prepare_input(data: Union[torch.Tensor, Any]) -> Union[torch.Tensor, Any]:
    if isinstance(data, Mapping):
        return type(data)({k: _prepare_input(v) for k, v in data.items()})
    elif isinstance(data, (tuple, list)):
        return type(data)(_prepare_input(v) for v in data)
    elif isinstance(data, torch.Tensor):
        kwargs = dict(device='cuda')
        return data.to(**kwargs)
    return data

### **[Req. 1-1]** 데이터 전처리 : ImageCaptionDataset 내 \_\_getitem\_\_() 메소드 구현
본 실습은 이미지 경로를 텐서 형태의 이미지로, 캡션을 텐서 형태의 토큰으로 변환해주는 ImageCaptionDataset 클래스의 \_\_getitem\_\_() 메소드를 구현합니다.  

In [None]:
from transformers.modeling_utils import no_init_weights

with no_init_weights():
    model = LimberGPTNeo.from_pretrained('EleutherAI/gpt-neo-1.3B')

model.setup_multimodal(tokenizer, max_seq_len, encoder_out_dim, lm_out_dim, out_seq_len, image_embed_dropout_prob, freeze_lm, freeze_img_encoder)

tokenizer.deprecation_warnings["Asking-to-pad-a-fast-tokenizer"] = True
transforms = model.transforms

In [None]:
from torch.utils.data import DataLoader
# dataset
CC3M = ImageCaptionDataset(
        train_dataset_dir, tokenizer, transforms
    )

CC3M_dataloader = DataLoader(
    CC3M,
    batch_size=1,
    collate_fn=ImageCaptionDataCollator(tokenizer)
)
test_inputs = next(iter(CC3M_dataloader))

test_images = test_inputs["images"]
test_captions = test_inputs["captions"]
print(f"test_images.shape : {test_images.shape}")
print(f"test_captions.shape : {test_captions.shape}")
print(f"test_captions : {test_captions}")

### **[Req. 1-2]** 이미지를 입력 받아 이미지 프롬프트를 반환하는 ImagePrefix 클래스 구현

In [None]:
from torch.cuda.amp import autocast

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model.to(device)
test_images = test_images.to(device)
test_captions = test_captions.to(device)

with autocast(dtype=torch.float16):
  test_image_representations=model.image_prefix.encoder(test_images)
  test_image_prompts=model.image_prefix(test_images)

print(f"test_image_representations.shape : {test_image_representations.shape}")
print(f"test_image_prompts.shape : {test_image_prompts.shape}")



### **[Req. 1-3]** LimberGPTJ 내 setmultimodal 함수 구현




### **[Req. 1-4]** LimberGPTJ 내 make_input_embeddings 함수 구현

In [None]:
with autocast(dtype=torch.float16):
  test_image_prompts, test_caption_embeddings, test_input_embeddings =model.make_input_embeddings(test_images, test_captions)

print(f"test_image_prompts.shape : {test_image_prompts.shape}")
print(f"test_caption_embeddings.shape : {test_caption_embeddings.shape}")
print(f"test_input_embeddings.shape : {test_input_embeddings.shape}")

### **[Req. 1-5]** LimberGPTJ 내 build_labels_for_training 함수 구현

In [None]:
with autocast(dtype=torch.float16):
  test_labels = model.build_labels_for_training(test_image_prompts,test_captions)

print(f"test_labels[0] : {test_labels[0]}")
print(f"test_labels.shape : {test_labels.shape}")

## 3. LimberGPTJ 학습 스크립트

In [None]:
from torch.optim import AdamW
from torch.utils.data import DataLoader, RandomSampler
from torch.cuda.amp import autocast
import time

start_time = time.time()

save_dir = os.path.join(save, exp_name)
os.makedirs(save_dir, exist_ok=True)

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model.to(device)

# Dataset 정의
train_dataset = ImageCaptionDataset(
        train_dataset_dir, tokenizer, transforms
    )

# Dataloader 정의
generator = torch.Generator()
seed = 2024
generator.manual_seed(seed)

train_sampler = RandomSampler(train_dataset, generator=generator)
train_dataloader = DataLoader(
    train_dataset,
    batch_size=train_batch_size,
    sampler=train_sampler,
    collate_fn=ImageCaptionDataCollator(tokenizer),
    drop_last=True,
    num_workers=1,
    pin_memory=True,
)

# Optimizer 정의
trainable_parameters = configure_param_groups(model)
optimizer = AdamW(
        trainable_parameters,
        lr,
        betas=(0.9, 0.95),
        weight_decay=weight_decay,
    )
scheduler = WarmupDecayLR(optimizer, lr_decay_iters)

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model.to(device)

# Training loop
print("Start Training!")
model.train()
model.gradient_checkpointing_enable()
scaler = torch.cuda.amp.GradScaler()

max_steps = train_steps
total_train_batch_size = train_batch_size * gradient_accumulation_steps
steps_in_epoch = len(train_dataloader)
num_update_steps_per_epoch = steps_in_epoch // gradient_accumulation_steps
num_update_steps_per_epoch = max(num_update_steps_per_epoch, 1)
num_train_epochs = max_steps // num_update_steps_per_epoch + int(
    max_steps % num_update_steps_per_epoch > 0
)
num_train_samples = max_steps * total_train_batch_size

print("***** Running training *****")
print(f"  Num examples = {len(train_dataloader.dataset)}")
print(f"  Num Epochs = {num_train_epochs}")
print(f"  Total train batch size = {total_train_batch_size}")
print(f"  Gradient Accumulation steps = {gradient_accumulation_steps}")
print(f"  Total optimization steps = {max_steps}")

global_step=0
tr_loss = torch.tensor(0.0).to(device)
model.zero_grad()

for epoch in range(num_train_epochs):
  for step, inputs in enumerate(train_dataloader):
    inputs = _prepare_input(inputs)
    with autocast(dtype=torch.float16):
        outputs = model(**inputs)
        # Loss 정의
        loss = outputs["loss"] if isinstance(outputs, dict) else outputs[0]
    loss = loss / gradient_accumulation_steps
    # Backpropagation
    scaler.scale(loss).backward()
    tr_loss_step = loss.detach()
    tr_loss += tr_loss_step

    if (step + 1) % gradient_accumulation_steps == 0:
      scaler.unscale_(optimizer)
      nn.utils.clip_grad_norm_(
          model.parameters(),
          1.0,
      )
      # Optimizer step
      scaler.step(optimizer)
      scaler.update()

      model.zero_grad()
      global_step += 1

      # 현재 step number에 해당하는 가중치(weights)와 편향(bias)를 저장합니다.
      if global_step % 10 == 0 :

        # 모델의 state_dict을 가져옵니다.
        state_dict = model.state_dict()
        # 특정 layer(projection layer)의 가중치(weights)와 편향(bias)를 저장합니다.
        selected_layer_params = {key: value for key, value in state_dict.items() if 'image_prefix.proj' in key}
        save_path = f"{save_dir}/step_{global_step}.ckpt"
        torch.save(selected_layer_params, save_path)
        print("Model saved in ", save_path)

      tr_loss_scaler = round(tr_loss.mean().item(), 4)
      tr_loss -= tr_loss
      print(f"'loss': {tr_loss_scaler}, 'step': {global_step}")

print(f"\n\nTraining completed in {time.time() - start_time:.5f} sec.")

# 4. 시각-언어 멀티모달 LLM을 활용하여 이미지 캡션 생성

LimberGPTJ 클래스의 함수들을 하나씩 구현하면서 Req. 1-6부터 Req. 1-8을 풀어봅니다. 구현에 필요한 부분은 다음과 같습니다.

- LimberGPTJ 클래스  
  - **Req. 1-6**: LimberGPTJ 내 preprocess_inputs 함수 구현
  - **Req. 1-7**: LimberGPTJ 내 top_k_sampling 함수 구현
  - **Req. 1-8**: LimberGPTJ 내 top_p_sampling 함수 구현  
아래 셀부터 하나씩 따라가면서 Req. 1-6 와 Req. 1-8 에서 구현이 필요한 부분을 읽고, 다시 LimberGPTJ 클래스의 함수 파트로 돌아와서 해당 """Write your code""" 부분을 구현해봅니다.

In [None]:
import torch.nn.functional as F
from torchtyping import TensorType
from typing import Union, List

############################################################################
# Req 1-6: LimberGPTJ 내 preprocess_inputs 함수 구현                          #
############################################################################
def preprocess_inputs(model, img_path, text_prompt):
    """
    :param img_path: 이미지 경로
    :param text_prompt: 고정된 initial text prompt
    """
    ################################################################################
    # TODO: 이미지와 고정된 initial text prompt를 언어 모델의 인풋으로 들어갈 임베딩으로 변환     #
    ################################################################################
    # *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****
    # 1) img_path를 텐서 형태의 이미지로, 고정된 initial text prompt를 텐서 형태의 토큰으로 변환
    # 2) 이미지로부터 이미지 프롬프트를 얻고, 토큰으로부터 text prompt의 임베딩을 얻어 추론 시 언어 모델에
    #    인풋으로 들어갈 임베딩 얻기

    """Write your code"""

    # *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****
    ################################################################################
    #                                 END OF YOUR CODE                             #
    ################################################################################
    return input_embedding

############################################################################
# Req 1-7: LimberGPTJ 내 top_k_sampling 함수 구현                             #
############################################################################
def top_k_sampling(model,logits, top_k, temperature):
    """
    :param logits: 모델이 예측한 다음 토큰의 probability (vocabulary에 있는 50258 토큰의 logit 값)
    :param top_k: 다음 토큰 선택시, 상위 k개의 토큰을 후보로 고려
    :param temperature: temperature scaling 값 (예측한 다음 토큰들의 확률 분포를 변형시킴.
                        0에 가까울 수록 확률 분포가 날카로워지며, 1에 가까울 수록 확률 분포가 평평해짐.)
    """
    ################################################################################
    # TODO: 언어 모델이 예측한 다음 단어에 대한 logits(모든 토큰의 logit 값)로부터              #
    # Top-K sampling 방식을 사용하여 다음 토큰을 샘플링                                    #
    ################################################################################
    # *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****
    # 1) torch.topk 함수를 사용하여 logits로부터 상위 k개 토큰의 logit과 index를 추출
    # 2) torch.full_like 함수를 사용하여 -infinite 값으로 채워진 logits 크기의 텐서 생성
    # 3) 1에서 구한 상위 k개 토큰의 logit을 2에서 생성한 텐서의 해당 index에 assign
    # 4) torch.nn.functional의 softmax 함수를 사용하여 다음 토큰의 probability 계산
    # 5) torch.multinomial 함수를 사용하여 probability로부터 한 개 토큰을 샘플링 실행 결과 예시

    """Write your code"""

    # *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****
    ################################################################################
    #                                 END OF YOUR CODE                             #
    ################################################################################
    return next_token

############################################################################
# Req 1-8: LimberGPTJ 내 top_p_sampling 함수 구현                             #
############################################################################
def top_p_sampling(model,logits, top_p, temperature):
    """
    :param logits: 모델이 예측한 다음 토큰의 probability (vocabulary에 있는 50258 토큰의 logit 값)
    :param top_p: 다음 토큰 선택시, 누적 확률이 p 이상이 되는 최소한의 토큰 집합을 후보로 고려
    :param temperature: temperature scaling 값 (예측한 다음 토큰들의 확률 분포를 변형시킴.
                        0에 가까울 수록 확률 분포가 날카로워지며, 1에 가까울 수록 확률 분포가 평평해짐.)
    """
    ################################################################################
    # TODO: 언어 모델이 예측한 다음 단어에 대한 logits(모든 토큰의 logit 값)로부터              #
    # Top-P sampling 방식을 사용하여 다음 토큰을 샘플링                                    #
    ################################################################################
    # *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****
    # 1) torch.sort 함수를 사용하여 logits를 내림차순으로 정렬
    # 2) torch.cumsum 함수를 사용하여 logits의 누적 분포 계산
    # 3) 누적 분포를 바탕으로 해당 인덱스의 토큰이 후보에서 제외하는지 여부를 담은 텐서 생성
    #    i. 누적 확률이 p 이상이 되는 index에 True 값 할당
    #    ii. 누적 확률이 p 이상이 되는 최소한의 토큰 집합이므로, 위에서 할당한 값들을 오른쪽으로 한 칸 씩 이동
    #    iii. 확률이 가장 높은 0번째 index는 항상 False 값 할당
    # 4) 후보에서 제외되는 토큰의 logit은 -infinite로 할당
    # 5) torch.nn.functional의 softmax 함수를 사용하여 다음 토큰의 probability 계산
    # 6) torch.multinomial 함수를 사용하여 probability로부터 한 개 토큰을 샘플링

    """Write your code"""

    # *****END OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****
    ################################################################################
    #                                 END OF YOUR CODE                             #
    ################################################################################
    return next_token

@torch.no_grad()
def generate(
    model: "GPTNeo",
    embeddings: TensorType["b", "s", "d"],
    max_steps: int = 100,
    top_k: int = 0,
    top_p: float = 0.9,
    decode: bool = True,
    temperature: float = 0.7,

):

    """
    embedding에 대한 캡션을 생성합니다.

    :param model: 캡션 생성에 쓰일 모델
    :param embeddings: 캡션을 생성할 임베딩
    :param max_steps: 캡션을 생성할 때 최대 step 수
    :param top_k: Top k 샘플링에 사용할 값, 0이면 샘플링이 사용되지 않음
    :param top_p: Top p 샘플링에 사용할 값, 0이면 샘플링이 사용되지 않음
    :param decode: 토큰을 decode하여 텍스트 형태로 반환할지 또는 토큰을 반환할지 여부
    :param temperature: temperature scaling 값 (예측한 다음 토큰들의 확률 분포를 변형시킴.
                        0에 가까울 수록 확률 분포가 날카로워지며, 1에 가까울 수록 확률 분포가 평평해짐.)
    """

    eos_token = model.eos_token
    b, s, _ = embeddings.shape
    past_key_values = None

    # 이미지 토큰으로 output을 초기화
    out = torch.zeros((b, s), dtype=torch.long).to(model.device) + model.image_token

    # sampling
    for i in range(max_steps):
        if i == 0:
            # 초기 input
            outputs = super(type(model),model).forward(
                inputs_embeds=embeddings,
                use_cache=True,
                past_key_values=past_key_values,
            )
        else:
            # 과거 key와 value를 캐싱하여 마지막 토큰만 사용
            outputs = super(type(model),model).forward(
                input_ids=out[:, -1:], use_cache=True, past_key_values=past_key_values
            )

        logits = outputs.logits[:, -1, :].float()
        past_key_values = outputs.past_key_values

        if top_k > 0:
          # Top K sampling
          next_token = model.top_k_sampling(logits,top_k,temperature)
        if top_p > 0:
          # Nucleus sampling
          next_token = model.top_p_sampling(logits,top_p,temperature)

        out = torch.cat((out, next_token), dim=-1)

        if eos_token is not None and (next_token == eos_token).all():
            break

    if decode:
        captions = []
        for b in out:
          eos_index = (b == eos_token).nonzero()
          if eos_index.any():
            b[eos_index[0] :] = eos_token
          b = b.tolist()
          b=[i for i in b if (not i == model.image_token) and (not i == eos_token)]
          # 토큰을 decode하여 텍스트 형태로 변환
          caption = model.tokenizer.decode(b)
          captions.append(caption)
        out = captions

    return out

**Test Image:**  
![test_image.png](https://ak2.picdn.net/shutterstock/videos/18488392/thumb/9.jpg)  
(그림 출처: https://ak2.picdn.net/shutterstock/videos/18488392/thumb/9.jpg)

In [None]:
import time

# 제공된 미리 학습된 모델 가중치를 로드합니다.
limber_proj_path = "./limber/checkpoints/gpt_neo_pretrained_weights.ckpt"
proj_ckpt = torch.load(limber_proj_path)
proj_ckpt = {key.split("image_prefix.proj.")[1]: value for key, value in proj_ckpt.items()}
model.image_prefix.proj.load_state_dict(proj_ckpt)
model = model.cuda().half()

start_time = time.time()
img_path = "./limber/cc3m/image_captioning.jpg"
model.eval()
print("Start inference...")

### **[Req. 1-6]** LimberGPTJ 내 preprocess_inputs 함수 구현

In [None]:
embeddings = model.preprocess_inputs(img_path,'A picture of')
print(f"embeddings.shape: {embeddings.shape}")

### **[Req. 1-7]** LimberGPTJ 내 top_k_sampling 함수 구현

In [None]:
caption = model.generate(embeddings=embeddings,top_k=3,temperature=0.7)
print("\nOutput: ")
print(caption[0].strip())

### **[Req. 1-8]** LimberGPTJ 내 top_p_sampling 함수 구현

In [None]:
caption = model.generate(embeddings=embeddings,top_p=0.9,temperature=0.7)
print("\nOutput: ")
print(caption[0].strip())