#### 필요 라이브러리

In [None]:
# 데이터 처리
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import os
import json
import itertools
import cv2
from torch.utils.data import DataLoader

# ViT
from timm import create_model, list_models
from types import SimpleNamespace

# GPT
from transformers import GPT2LMHeadModel, GPT2TokenizerFast, get_linear_schedule_with_warmup

# 데이터 증강
import albumentations as A
from albumentations.pytorch import ToTensorV2 # 데이터 -> PyTorch 텐서로 변환
from PIL import Image # 이미지 처리
from pathlib import Path # 파일 경로 관리
from sklearn.model_selection import train_test_split
from torch.cuda.amp import GradScaler, autocast
from tqdm.auto import tqdm # 진행 상태 표시
import gc

# 최종 예측
import subprocess
import matplotlib.pyplot as plt
from IPython.display import display, Video

### Fine-tuning the Model

#### Loading the Data

In [None]:
# Load the pretrained GPT-2 fast tokenizer and reuse its EOS token as the padding token.
tokenizer = GPT2TokenizerFast.from_pretrained('gpt2')
tokenizer.pad_token = tokenizer.eos_token
# Bare expression: in a notebook cell this displays the pad token that was just set.
tokenizer.pad_token

In [None]:
### Locate the data
# Root folder of the dataset on the mounted Google Drive.
base_path = '/content/drive/MyDrive/DATA_최종'
# Frame images and label files live in sibling sub-folders of the root.
images_path = os.path.join(base_path, 'Images')
labels_path = os.path.join(base_path, 'Labels')

In [None]:
### Collect the frame paths of every video
import re


def _natural_key(name):
    """Sort key that orders embedded numbers numerically (frame_2 before frame_10)."""
    # re.split with a capturing group alternates text / digit runs, so the
    # resulting lists always compare str-with-str and int-with-int.
    return [int(part) if part.isdigit() else part for part in re.split(r'(\d+)', name)]


# Every sub-folder of images_path is one action class. os.listdir returns
# entries in arbitrary order, so sort for reproducible runs and ignore stray files.
action_classes = sorted(
    entry for entry in os.listdir(images_path)
    if os.path.isdir(os.path.join(images_path, entry))
)
print(f"고냥이의 행동 {len(action_classes)}개 있어요 ฅ^._.^ฅ")

# For each action folder: take the video folders inside it, and for each video
# collect the paths of its frames. At most `max_videos_per_action` non-empty
# videos are kept per action.
frames = {}
videos = {}
max_videos_per_action = 100

for action in action_classes:
    videos[action] = []
    frames[action] = []

    images_path_behav = os.path.join(images_path, action)  # folder of this action
    video_names = sorted(os.listdir(images_path_behav))  # video folder names

    count = 0
    for video in video_names:
        if count >= max_videos_per_action:
            break
        frames_path = os.path.join(images_path_behav, video)  # folder of this video
        if not os.path.isdir(frames_path):
            # A stray file next to the video folders would make os.listdir fail.
            continue
        # Natural sort keeps numbered frames in numeric order, which is
        # presumably their temporal order -- TODO confirm against the naming scheme.
        frame_names = sorted(os.listdir(frames_path), key=_natural_key)

        if not frame_names:
            print(f"경고: {frames_path} 폴더가 비어 있습니다. 건너뜁니다.")
            continue

        one_video_frames = [os.path.join(frames_path, fname) for fname in frame_names]
        frames[action].append(one_video_frames)
        videos[action].append(video)
        count += 1

In [None]:
### Read the caption text of every video into `labels`
# `labels` must stay index-aligned with the flattened `videos` / `frames` lists
# that are later combined into one DataFrame. A video whose label file cannot be
# read is therefore dropped from `videos` and `frames` as well, instead of only
# being skipped here.
labels = []
for action in videos:  # per action
    kept_videos = []
    kept_frames = []
    for video, video_frames in zip(videos[action], frames[action]):  # per video
        label_path = os.path.join(labels_path, action, video) + '.json'  # label file of this video
        try:
            with open(label_path, 'r', encoding='utf-8') as f:
                json_file = json.load(f)
            # Caption = fixed subject + owner situation + action, taken from the metadata.
            caption = "고양이가 " + json_file['metadata']['owner']['situation'] + " " + json_file['metadata']['action']
        except (OSError, ValueError, KeyError, TypeError) as e:
            # Missing/unreadable file, invalid JSON or an unexpected schema:
            # report it and leave this video out of the dataset.
            print(f"{label_path} 에서 오류 발생: {e}")
            continue
        labels.append(caption)
        kept_videos.append(video)
        kept_frames.append(video_frames)
    # Re-assigning existing keys does not change the dict size, so this is
    # safe while iterating over `videos`.
    videos[action] = kept_videos
    frames[action] = kept_frames


In [None]:
!pip install deepl

In [None]:
import deepl

# Prefer a key supplied through the DEEPL_AUTH_KEY environment variable so the
# secret does not have to be stored in the notebook; the in-file placeholder is
# kept as the fallback so existing usage keeps working.
AUTH_KEY = os.environ.get("DEEPL_AUTH_KEY", "##########")
translator = deepl.Translator(AUTH_KEY)

# Translate the Korean captions to US English in batches of `batch_size`,
# appending the results in the same order as `labels`.
batch_size = 10
translated_labels = []

for i in range(0, len(labels), batch_size):
    batch = labels[i:i+batch_size]
    translated_results = translator.translate_text(batch, source_lang="KO", target_lang="EN-US")
    translated_labels.extend(result.text for result in translated_results)

In [None]:
# Flatten the per-action lists into one list each, keeping the dict iteration order.
all_videos = [name for per_action in videos.values() for name in per_action]
all_frames = [paths for per_action in frames.values() for paths in per_action]

In [None]:
### Combine everything into a DataFrame
# One row per video: its name, the list of its frame paths, and its translated caption.
# The three lists must have the same length for the constructor to succeed.
kitties = pd.DataFrame({'video_names' : all_videos, 'videos' : all_frames, 'caption' : translated_labels})
kitties.head(2)

In [None]:
### Sanity check that a frame loads correctly
# Frame index 13 of the video in row 5 is a hard-coded sample for a visual check.
image_path = kitties['videos'][5][13]
image = cv2.imread(image_path)
# Convert from BGR to RGB channel order before plotting.
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
plt.imshow(image)

#### Dataset, DataLoader

In [None]:
class Dataset:
    """Map-style dataset yielding one captioned video per index.

    Args:
        df: DataFrame with a ``'videos'`` column (list of frame image paths per
            row) and a ``'caption'`` column (caption text per row).
        tokenizer: Callable tokenizer; it is called with ``truncation=True`` and
            ``return_tensors="pt"`` and must return a mapping with ``'input_ids'``.
        transform: Callable invoked as ``transform(image=...)`` that returns a
            mapping with an ``"image"`` entry (albumentations-style).
    """

    def __init__(self, df, tokenizer, transform):
        self.df = df
        self.tokenizer = tokenizer
        self.transform = transform

    def __len__(self):
        """Return the number of videos (rows of the DataFrame)."""
        return len(self.df)

    def _load_frame(self, frame_path):
        """Read one frame from disk, convert it to RGB and apply the transform.

        Raises:
            OSError: If OpenCV cannot read the image at ``frame_path``.
        """
        image = cv2.imread(frame_path)
        if image is None:
            # cv2.imread signals failure by returning None; fail with the
            # offending path instead of an opaque cvtColor error.
            raise OSError(f"Could not read frame image: {frame_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = np.asarray(image, dtype=np.uint8)
        # The transform returns a mapping; only its "image" entry is needed.
        return self.transform(image=image)["image"]

    def __getitem__(self, idx):
        """Return ``(frames_tensor, input_ids, labels)`` for the video at ``idx``.

        ``frames_tensor`` stacks the transformed frames as float32 along a new
        first dimension. ``input_ids`` are the token ids of the caption with
        ``<|endoftext|>`` appended. ``labels`` are ``input_ids`` shifted left by
        one position; the final position keeps the last token id.
        """
        sample = self.df.iloc[idx, :]
        frame_paths = sample['videos']
        caption = sample['caption']

        # Load and transform every frame of the video.
        frames = [self._load_frame(frame_path) for frame_path in frame_paths]
        # torch.stack allocates a new tensor, so no extra clone is needed.
        frames_tensor = torch.stack(
            [torch.as_tensor(frame, dtype=torch.float32) for frame in frames]
        )

        # Tokenize the caption, terminated by the end-of-text marker.
        caption = f"{caption}<|endoftext|>"
        input_ids = self.tokenizer(
            caption,
            truncation=True,
            return_tensors="pt")['input_ids'].squeeze(0)  # drop the batch dimension
        labels = input_ids.clone()
        labels[:-1] = input_ids[1:]
        return frames_tensor, input_ids, labels

In [None]:
# Hold out 10% of the videos for validation. No random_state is passed, so the
# split differs between runs.
train_kitties, val_kitties = train_test_split(kitties, test_size=0.1)

In [None]:
# Wrap both splits in Dataset objects; the names are rebound from DataFrames to Datasets.
# NOTE(review): `transform` is not defined in any cell visible here -- it
# presumably comes from a cell that is not part of this excerpt; confirm.
train_kitties = Dataset(train_kitties, tokenizer, transform)
val_kitties = Dataset(val_kitties, tokenizer, transform)

In [None]:
# Frame count of every video, used to decide the common clip length for
# batching: the mean frame count is taken as the padding length.
frames_num = [len(kitties.loc[row, 'videos']) for row in range(len(kitties))]

video_padding = np.mean(frames_num)

In [None]:
def collate_fn(batch):
    """Collate ``(frames, input_ids, labels)`` samples into padded batch tensors.

    Every clip is cut or zero-padded along its first dimension to
    ``int(video_padding)`` frames (``video_padding`` is a module-level value).
    Token id sequences are right-padded with 0 and label sequences with -100,
    the value the loss is meant to ignore.

    Returns:
        Tuple ``(videos, input_ids, labels)`` of stacked/padded tensors.
    """
    clips, token_seqs, target_seqs = zip(*batch)
    target_len = int(video_padding)

    def _fit_length(clip):
        # Longer clips are truncated; shorter (or equal) ones get zero frames appended.
        n_frames = clip.shape[0]
        if n_frames > target_len:
            return clip[:target_len]
        filler = torch.zeros((target_len - n_frames, *clip.shape[1:]))
        return torch.cat([clip, filler], dim=0)

    video_batch = torch.stack([_fit_length(clip) for clip in clips])

    pad = torch.nn.utils.rnn.pad_sequence
    padded_ids = pad(token_seqs, batch_first=True)
    padded_targets = pad(target_seqs, batch_first=True, padding_value=-100)

    return video_batch, padded_ids, padded_targets

In [None]:
### DataLoader configuration
# Settings shared by both loaders; only the training loader shuffles.
_loader_kwargs = dict(batch_size=4, num_workers=1, collate_fn=collate_fn)

train_dataloader = DataLoader(train_kitties, shuffle=True, **_loader_kwargs)

val_dataloader = DataLoader(val_kitties, shuffle=False, **_loader_kwargs)

### Model Structures

#### Model Structure - GPT

In [None]:
class GPT2Attention(nn.Module):
    """Multi-head self-attention with a lower-triangular (causal) mask.

    Reads ``embed_dim``, ``num_heads``, ``seq_len``, ``attention_dropout`` and
    ``residual_dropout`` from ``config``. ``seq_len`` fixes the size of the
    registered ``mask`` buffer, i.e. the longest sequence that can be attended.
    """

    def __init__(self, config):
        super().__init__()
        self.embed_dim = config.embed_dim
        self.n_heads = config.num_heads
        assert self.embed_dim % self.n_heads == 0, 'embedding dimension by be divisible by number of heads'
        self.head_size = self.embed_dim // self.n_heads
        self.seq_len = config.seq_len

        # One projection produces queries, keys and values side by side.
        self.c_attn = nn.Linear(self.embed_dim, self.head_size * self.n_heads * 3, bias=True)
        self.scale = self.head_size ** -0.5

        # Ones on and below the diagonal: position i may look at positions <= i.
        causal = torch.ones(1, 1, self.seq_len, self.seq_len).tril()
        self.register_buffer('mask', causal)

        self.c_proj = nn.Linear(self.embed_dim, self.embed_dim, bias=True)

        self.attn_dropout = nn.Dropout(config.attention_dropout)
        self.resid_dropout = nn.Dropout(config.residual_dropout)

    def _split_heads(self, tensor, batch, steps):
        """Reshape (batch, steps, embed_dim) to (batch, n_heads, steps, head_size)."""
        return tensor.view(batch, steps, self.n_heads, self.head_size).transpose(1, 2)

    def forward(self, x):
        """Apply causal self-attention to ``x`` of shape (batch, steps, embed_dim)."""
        batch, steps, width = x.shape

        query, key, value = self.c_attn(x).chunk(3, dim=-1)
        query = self._split_heads(query, batch, steps)
        key = self._split_heads(key, batch, steps)
        value = self._split_heads(value, batch, steps)

        scores = (query @ key.transpose(-2, -1)) * self.scale
        # Positions the mask marks with 0 are set to -inf before the softmax.
        hidden = self.mask[:, :, :steps, :steps] == 0
        scores = scores.masked_fill(hidden, float('-inf'))
        weights = self.attn_dropout(F.softmax(scores, dim=-1))

        context = weights @ value  # batch x n_heads x steps x head_size
        context = context.transpose(1, 2).contiguous().view(batch, steps, width)

        return self.resid_dropout(self.c_proj(context))

In [None]:
class GPT2CrossAttention(nn.Module):
    """Multi-head attention whose queries and keys/values come from different inputs.

    Reads ``embed_dim``, ``num_heads``, ``seq_len``, ``attention_dropout`` and
    ``residual_dropout`` from ``config``. No mask is applied. All linear layers
    are re-initialised with N(0, 0.02) weights and zero biases.
    """

    def __init__(self, config):
        super().__init__()
        self.embed_dim = config.embed_dim
        self.n_heads = config.num_heads
        assert self.embed_dim % self.n_heads == 0, 'embedding dimension by be divisible by number of heads'
        self.head_size = self.embed_dim // self.n_heads
        self.seq_len = config.seq_len

        width = self.embed_dim
        self.q = nn.Linear(width, width)
        self.k = nn.Linear(width, width)
        self.v = nn.Linear(width, width)
        self.scale = self.head_size ** -0.5

        self.c_proj = nn.Linear(width, width, bias=True)

        self.attn_dropout = nn.Dropout(config.attention_dropout)
        self.resid_dropout = nn.Dropout(config.residual_dropout)

        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Initialise linear layers: normal(0, 0.02) weights, zero biases."""
        if not isinstance(module, nn.Linear):
            return
        torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
        if module.bias is not None:
            torch.nn.init.zeros_(module.bias)

    def _split_heads(self, tensor, batch):
        """Reshape (batch, length, embed_dim) to (batch, n_heads, length, head_size)."""
        return tensor.view(batch, tensor.size(1), self.n_heads, self.head_size).transpose(1, 2)

    def forward(self, q, k, v):
        """Attend from ``q`` over ``k``/``v``; the output has the shape of ``q``."""
        batch, steps, width = q.shape

        queries = self._split_heads(self.q(q), batch)
        keys = self._split_heads(self.k(k), batch)
        values = self._split_heads(self.v(v), batch)

        scores = (queries @ keys.transpose(-2, -1)) * self.scale
        weights = self.attn_dropout(F.softmax(scores, dim=-1))

        context = weights @ values  # batch x n_heads x steps x head_size
        context = context.transpose(1, 2).contiguous().view(batch, steps, width)

        return self.resid_dropout(self.c_proj(context))

In [None]:
class GPT2MLP(nn.Module):
    """Position-wise feed-forward block: expand, GELU, project back, dropout.

    The hidden width is ``config.embed_dim * config.mlp_ratio``; the dropout
    probability is ``config.mlp_dropout``.
    """

    def __init__(self, config):
        super().__init__()
        self.embed_dim = config.embed_dim
        self.mlp_ratio = config.mlp_ratio
        self.mlp_dropout = config.mlp_dropout

        hidden_dim = self.embed_dim * self.mlp_ratio
        self.c_fc = nn.Linear(self.embed_dim, hidden_dim)
        self.c_proj = nn.Linear(hidden_dim, self.embed_dim)
        self.act = nn.GELU()
        self.dropout = nn.Dropout(self.mlp_dropout)

    def forward(self, x):
        """Return the feed-forward transformation of ``x`` (last dim = embed_dim)."""
        hidden = self.act(self.c_fc(x))
        return self.dropout(self.c_proj(hidden))

In [None]:
class GPT2Block(nn.Module):
    """Decoder block: self-attention, cross-attention over the encoder output, MLP.

    Each of the three sub-layers is preceded by its own LayerNorm and wrapped
    in a residual connection: ``ln_1`` feeds self-attention, ``ln_2`` feeds
    cross-attention and ``ln_3`` feeds the MLP.
    """

    def __init__(self, config):
        super().__init__()
        self.embed_dim = config.embed_dim
        width = self.embed_dim
        self.ln_1 = nn.LayerNorm(width)
        self.attn = GPT2Attention(config)
        self.ln_2 = nn.LayerNorm(width)
        self.mlp = GPT2MLP(config)
        self.ln_3 = nn.LayerNorm(width)
        self.cross_attn = GPT2CrossAttention(config)

    def forward(self, x, enc_out):
        """Run the block on decoder states ``x`` using ``enc_out`` as keys and values."""
        self_attended = self.attn(self.ln_1(x))
        x = x + self_attended

        query = self.ln_2(x)
        x = x + self.cross_attn(query, enc_out, enc_out)

        return x + self.mlp(self.ln_3(x))

#### Model Structure - ViT & GPT

In [None]:
class VisionGPT2Model(nn.Module):
    """Video captioning model: ViT encoder pieces plus a GPT-2 style decoder.

    The encoder reuses the patch embedding, class token, positional embedding
    and the first ``config.depth`` transformer blocks of a pretrained timm
    ``vit_base_patch16_224``. The decoder is a stack of ``config.depth``
    ``GPT2Block`` layers, each cross-attending to the encoder states. The token
    embedding and the LM head share one weight matrix.

    ``config`` must provide ``vocab_size``, ``embed_dim``, ``seq_len``,
    ``depth`` and ``emb_dropout`` (plus whatever ``GPT2Block`` reads).
    """

    def __init__(self, config):
        super().__init__()

        self.config = config

        # Pretrained ViT backbone; only the pieces below are kept as submodules.
        vit = create_model('vit_base_patch16_224', pretrained=True, num_classes=0)
        self.patch_embed = vit.patch_embed  # splits an image into patches and embeds each patch
        self.cls_token = vit.cls_token
        self.pos_embed = vit.pos_embed
        self.pos_drop = nn.Dropout(p=0.)

        self.blocks = nn.ModuleList([vit.blocks[i] for i in range(config.depth)])

        self.transformer = nn.ModuleDict(dict(
            wte = nn.Embedding(config.vocab_size,config.embed_dim),
            wpe = nn.Embedding(config.seq_len,config.embed_dim),
            drop = nn.Dropout(config.emb_dropout),
            h = nn.ModuleList([GPT2Block(config) for _ in range(config.depth)]),
            ln_f = nn.LayerNorm(config.embed_dim)
        ))
        self.lm_head = nn.Linear(config.embed_dim,config.vocab_size,bias=False)
        # Weight tying: the token embedding reuses the LM head matrix.
        self.transformer.wte.weight = self.lm_head.weight

    def _pos_embed(self, x):
        """Prepend the class token to patch embeddings ``x`` and add positional embeddings."""
        x = torch.cat((self.cls_token.expand(x.shape[0], -1, -1), x), dim=1)
        x = x + self.pos_embed
        return self.pos_drop(x)

    def _pretrained_gpt_block_layers(self):
        """Return the per-block decoder layers whose weights come from GPT-2.

        ``ln_3`` and ``cross_attn`` are deliberately not listed: they are not
        loaded from the GPT-2 checkpoint in ``from_pretrained``.
        """
        layers = []
        for i in range(self.config.depth):
            block = self.transformer.h[i]
            layers.extend([block.ln_1, block.ln_2, block.attn, block.mlp])
        return layers

    @staticmethod
    def _set_requires_grad(layers, flag):
        """Set ``requires_grad`` on bare parameters and on all parameters of modules."""
        for layer in layers:
            if isinstance(layer, nn.Parameter):
                layer.requires_grad = flag
            else:
                for p in layer.parameters():
                    p.requires_grad = flag

    def pretrained_layers_trainable(self, trainable=False):
        """Freeze or unfreeze every layer that carries pretrained weights.

        Covers the ViT pieces, the token/position embeddings, the final
        LayerNorm, the LM head and the GPT-2 derived layers of each decoder
        block. Prints the number of parameters that are frozen afterwards.
        """
        layers = [
            self.cls_token, self.patch_embed, self.pos_embed, self.blocks,
            self.transformer.wte, self.transformer.wpe,
            self.transformer.ln_f, self.lm_head
        ]
        layers.extend(self._pretrained_gpt_block_layers())
        self._set_requires_grad(layers, trainable)

        total_frozen_params = sum(p.numel() for p in self.parameters() if not p.requires_grad)
        print(f'{total_frozen_params=}')

    def unfreeze_gpt_layers(self,):
        """Make the GPT-2 derived layers of every decoder block trainable."""
        self._set_requires_grad(self._pretrained_gpt_block_layers(), True)

    @classmethod
    def from_pretrained(cls, config):
        """Build the model and copy the Hugging Face ``gpt2`` weights into the decoder.

        Keys matching ``ignore_matches`` (ViT pieces, cross-attention, ``ln_3``
        and the causal mask buffer) are left untouched. The four weight kinds in
        ``transposed`` are stored transposed in the Hugging Face checkpoint
        relative to ``nn.Linear`` and are transposed while copying.

        Returns:
            The initialised model.
        """
        model = cls(config)
        sd = model.state_dict()
        ignore_matches = ['blocks.','cross_attn.','ln_3','cls_token','pos_embed','patch_embed.','.attn.mask']

        gpt2_small = GPT2LMHeadModel.from_pretrained('gpt2')
        sd_hf = gpt2_small.state_dict()
        # Skip the checkpoint's attention mask buffers.
        hf_keys = [k for k in sd_hf if not k.endswith(('.attn.masked_bias', '.attn.bias'))]
        transposed = ('attn.c_attn.weight', 'attn.c_proj.weight', 'mlp.c_fc.weight', 'mlp.c_proj.weight')

        # The state_dict tensors share storage with the model parameters, so
        # the in-place copies below already update the model.
        with torch.no_grad():
            for k in hf_keys:
                if any(match in k for match in ignore_matches):
                    continue
                source = sd_hf[k]
                if k.endswith(transposed):
                    assert source.shape[::-1] == sd[k].shape
                    source = source.t()
                else:
                    assert source.shape == sd[k].shape
                sd[k].copy_(source)

        model.load_state_dict(sd)

        return model

    def embedding_images(self, videos):
        """Embed each video as the frame-wise mean of its token embeddings.

        Args:
            videos: Batch of clips, indexed as [batch, frames, channels, height, width].

        Returns:
            Tensor with one entry per video: [batch, tokens, embed_dim], where
            tokens is the class token plus the patches.

        All frames of a clip are pushed through the patch embedding, the
        positional embedding and the LAST encoder block in one batch, then
        averaged over the frame dimension. Zero-padded frames take part in the
        mean like any other frame.

        NOTE(review): this reproduces the previous result exactly. The old loop
        fed the same un-updated embeddings to every block and kept only the
        output of the final one, so the other block evaluations were discarded
        work. ``forward`` runs the full block stack on the returned tensor
        again; whether a block should be applied here at all is worth confirming.
        """
        final_block = self.blocks[self.config.depth - 1]
        video_embeddings = []

        for video in videos:  # video: [frames, channels, height, width]
            tokens = self._pos_embed(self.patch_embed(video))  # frames act as the batch dim
            tokens = final_block(tokens)
            video_embeddings.append(tokens.mean(dim=0, keepdim=True))

        return torch.cat(video_embeddings, dim=0)

    def forward(self, videos, input_ids, labels = None):
        """Return the LM loss when ``labels`` is given, else last-position logits.

        Args:
            videos: Batch of clips, see ``embedding_images``.
            input_ids: Token ids, [batch, seq_len].
            labels: Optional target ids shaped like ``input_ids``. They are
                passed to ``F.cross_entropy`` with its default settings.

        Returns:
            A scalar loss if ``labels`` is not None; otherwise logits of shape
            [batch, 1, vocab_size] for the final position only.
        """
        video_embedding = self.embedding_images(videos)

        token_embeddings = self.transformer.wte(input_ids)  # [batch, seq_len, embed_dim]
        positions = torch.arange(0, input_ids.size(1), device=input_ids.device)
        positional_embeddings = self.transformer.wpe(positions)
        gpt_input = self.transformer.drop(token_embeddings + positional_embeddings)

        # Interleave encoder and decoder: decoder layer i cross-attends to the
        # encoder states after encoder block i.
        for i in range(self.config.depth):
            video_embedding = self.blocks[i](video_embedding)
            gpt_input = self.transformer.h[i](gpt_input, video_embedding)

        gpt_output = self.transformer.ln_f(gpt_input)

        if labels is not None:
            lm_logits = self.lm_head(gpt_output)
            loss = F.cross_entropy(lm_logits.view(-1, lm_logits.shape[-1]), labels.view(-1))
            return loss

        lm_logits = self.lm_head(gpt_output[:, [-1], :])
        return lm_logits

    def generate(self, frames, sequence, max_tokens=50, temperature=1.0, deterministic=False):
        """Extend ``sequence`` token by token, conditioned on ``frames``.

        Args:
            frames: Clip batch passed to ``forward``.
            sequence: Start token ids, [1, start_len]. ``next_token.item()``
                below requires a batch size of 1.
            max_tokens: Upper bound on the number of generated tokens.
            temperature: Divisor applied to the logits before the softmax.
            deterministic: Pick the arg-max token instead of sampling.

        Returns:
            Flat CPU tensor with the start tokens followed by the generated ones.

        NOTE(review): the stop condition reads the module-level ``tokenizer``
        rather than a value owned by the model.
        """
        for _ in range(max_tokens):
            out = self(frames, sequence)
            out = out[:,-1,:] / temperature
            probs = F.softmax(out,dim=-1)
            if deterministic:
                next_token = torch.argmax(probs,dim=-1,keepdim=True)
            else:
                next_token = torch.multinomial(probs,num_samples=1)
            sequence = torch.cat([sequence,next_token],dim=1)
            if next_token.item() == tokenizer.eos_token_id:
                break

        return sequence.cpu().flatten()

### Train

#### Trainer

In [None]:
class Trainer:
    """Fine-tunes a ``VisionGPT2Model`` and generates captions with it.

    Args:
        model_config: Config passed to ``VisionGPT2Model.from_pretrained``.
        train_config: Namespace providing ``device``, ``lr``, ``epochs``,
            ``freeze_epochs_gpt``, ``freeze_epochs_all`` and ``model_path``.
        dls: Pair ``(train_dataloader, val_dataloader)``.
    """

    # File name shared by save_model and load_best_model so they cannot drift apart.
    BEST_CHECKPOINT_NAME = 'captioner_best.pt'

    def __init__(self, model_config, train_config, dls):

        self.train_config = train_config
        self.model_config = model_config
        self.device = self.train_config.device

        self.model = VisionGPT2Model.from_pretrained(model_config).to(self.device)
        # Start with all pretrained layers frozen; fit() unfreezes them on schedule.
        self.model.pretrained_layers_trainable(trainable=False)

        print(f'trainable parameters: {sum([p.numel() for p in self.model.parameters() if p.requires_grad])}')

        self.tokenizer = GPT2TokenizerFast.from_pretrained('gpt2')
        self.tokenizer.pad_token = self.tokenizer.eos_token

        self.scaler = GradScaler()

        self.train_dl, self.val_dl = dls

        total_steps = len(self.train_dl)

        # All parameters are handed to the optimizer, including the frozen
        # ones, because they are unfrozen later during fit().
        self.optim = torch.optim.Adam(self.model.parameters(), lr=self.train_config.lr / 25.)
        self.sched = torch.optim.lr_scheduler.OneCycleLR(
            self.optim,
            max_lr=self.train_config.lr,
            epochs=self.train_config.epochs,
            steps_per_epoch=total_steps
        )

        self.metrics = pd.DataFrame()
        self.metrics[['train_loss','train_perplexity','val_loss','val_perplexity']] = None

        # NOTE(review): not referenced by any method of this class;
        # generate_caption takes its transform as an argument instead.
        self.gen_tfms = A.Compose([
            A.Resize(224,224),
            A.Normalize(mean=[0.5,0.5,0.5],std=[0.5,0.5,0.5],always_apply=True),
            ToTensorV2()
        ])

    def _checkpoint_dir(self, model_path):
        """Return the checkpoint directory: ``model_path`` or the configured default."""
        if model_path is not None:
            return Path(model_path)
        return Path(self.train_config.model_path)

    def save_model(self, model_path = None):
        """Save the current weights as the best checkpoint, creating the directory if needed."""
        save_path = self._checkpoint_dir(model_path)
        save_path.mkdir(parents=True, exist_ok=True)
        torch.save(self.model.state_dict(), save_path / self.BEST_CHECKPOINT_NAME)

    def load_best_model(self,model_path = None):
        """Load the checkpoint written by ``save_model`` into the model.

        Tensors are mapped onto ``self.device`` so a checkpoint saved on GPU
        can also be loaded on another device.
        """
        load_path = self._checkpoint_dir(model_path)
        sd = torch.load(load_path / self.BEST_CHECKPOINT_NAME, map_location=self.device)
        self.model.load_state_dict(sd)

    def save_model_epoch(self, epoch, model_path = None):
        """Save the weights after ``epoch`` (0-based) as ``captioner_epoch_{epoch}.pt``."""
        save_path = self._checkpoint_dir(model_path)
        save_path.mkdir(parents=True, exist_ok=True)
        filename = f'captioner_epoch_{epoch}.pt'
        torch.save(self.model.state_dict(), save_path / filename)
        # The message counts epochs from 1 while the file name keeps the 0-based index.
        print(f"Epoch {epoch + 1} 완료. 모델 저장도 완료 :) ")

    def train_one_epoch(self,epoch):
        """Run one training epoch and record mean loss and perplexity in ``self.metrics``."""
        prog = tqdm(self.train_dl,total=len(self.train_dl))

        running_loss = 0.

        for image, input_ids, labels in prog:

            image = image.to(self.device)
            input_ids = input_ids.to(self.device)
            labels = labels.to(self.device)

            # Only the forward pass and loss computation run under autocast;
            # the backward pass and optimizer step belong outside of it.
            with torch.amp.autocast('cuda'):
                loss = self.model(image,input_ids,labels)

            self.scaler.scale(loss).backward()
            self.scaler.step(self.optim)
            self.scaler.update()
            self.sched.step()
            self.optim.zero_grad(set_to_none=True)

            loss_value = loss.item()
            running_loss += loss_value

            prog.set_description(f'train loss: {loss_value:.3f}')

            # Drop references to the batch before the next one is loaded.
            del image, input_ids, labels, loss

        train_loss = running_loss / len(self.train_dl)
        train_pxp = np.exp(train_loss)

        self.metrics.loc[epoch,['train_loss','train_perplexity']] = (train_loss,train_pxp)


    @torch.no_grad()
    def valid_one_epoch(self,epoch):
        """Run one validation epoch, record the metrics and return the perplexity."""
        prog = tqdm(self.val_dl,total=len(self.val_dl))

        running_loss = 0.

        for image, input_ids, labels in prog:

            image = image.to(self.device)
            input_ids = input_ids.to(self.device)
            labels = labels.to(self.device)

            # Same autocast API as in training.
            with torch.amp.autocast('cuda'):
                loss = self.model(image,input_ids,labels)

            loss_value = loss.item()
            running_loss += loss_value

            prog.set_description(f'valid loss: {loss_value:.3f}')

            del image, input_ids, labels, loss

        val_loss = running_loss / len(self.val_dl)
        val_pxp = np.exp(val_loss)

        self.metrics.loc[epoch,['val_loss','val_perplexity']] = (val_loss,val_pxp)

        return val_pxp


    def clean(self):
        """Run the garbage collector and release cached CUDA memory."""
        gc.collect()
        torch.cuda.empty_cache()


    def fit(self,):
        """Train for ``train_config.epochs`` epochs with staged unfreezing.

        The GPT-2 block layers are unfrozen at epoch ``freeze_epochs_gpt`` and
        all pretrained layers at epoch ``freeze_epochs_all``. A checkpoint is
        written after every epoch, plus a "best" checkpoint whenever the
        validation perplexity improves.

        Returns:
            Dict with ``'best_perplexity'`` and ``'best_epoch'``.
        """
        # NOTE(review): checkpoints go to this hard-coded Drive folder rather
        # than train_config.model_path; the prediction cell loads from the same folder.
        checkpoint_dir = '/content/drive/MyDrive/Trained_Model'

        best_pxp = 1e9
        best_epoch = -1
        prog = tqdm(range(self.train_config.epochs))

        for epoch in prog:

            if epoch == self.train_config.freeze_epochs_gpt:
                self.model.unfreeze_gpt_layers()
                print('unfreezing GPT2 entirely...')

            if epoch == self.train_config.freeze_epochs_all:
                self.model.pretrained_layers_trainable(trainable=True)

            self.model.train()
            prog.set_description('training')
            self.train_one_epoch(epoch)
            self.clean()

            self.model.eval()
            prog.set_description('validating')
            pxp = self.valid_one_epoch(epoch)
            self.clean()

            print(self.metrics.tail(1))

            if pxp < best_pxp:
                best_pxp = pxp
                best_epoch = epoch
                print('saving best model...')
                self.save_model(model_path = checkpoint_dir)

            self.save_model_epoch(epoch, model_path = checkpoint_dir)

        return {
            'best_perplexity': best_pxp,
            'best_epoch': best_epoch
        }

    @torch.no_grad()
    def generate_caption(self, video_path, transform, max_tokens = 50,temperature = 1.0,deterministic = False):
        """Generate a caption for one video given as a list of frame image paths.

        Args:
            video_path: Iterable of frame image paths belonging to one video.
            transform: Callable invoked as ``transform(image=...)`` returning a
                mapping with an ``"image"`` entry.
            max_tokens, temperature, deterministic: Forwarded to ``model.generate``.

        Returns:
            The decoded caption string, special tokens removed.

        Raises:
            OSError: If a frame image cannot be read.
            ValueError: If ``video_path`` contains no frames.
        """
        self.model.eval()

        frames = []

        for frame_path in video_path:
            image = cv2.imread(frame_path)
            if image is None:
                # cv2.imread signals failure by returning None.
                raise OSError(f"Could not read frame image: {frame_path}")
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            image = np.asarray(image, dtype = np.uint8)
            image = transform(image = image)["image"]
            frames.append(image)

        if not frames:
            raise ValueError("video_path contains no frames")

        frames_tensor = torch.stack([torch.as_tensor(frame, dtype = torch.float32) for frame in frames])
        # Add the batch dimension expected by the model.
        frames_tensor = frames_tensor.unsqueeze(0).to(self.device)

        # Start sequence: a [1, 1] tensor holding the BOS token id.
        sequence = torch.ones(1,1).to(device=self.device).long() * self.tokenizer.bos_token_id

        caption = self.model.generate(
            frames_tensor,
            sequence,
            max_tokens = max_tokens,
            temperature = temperature,
            deterministic = deterministic
        )
        caption = self.tokenizer.decode(caption.numpy(),skip_special_tokens=True)

        return caption

In [None]:
# Architecture hyper-parameters read by the GPT-2 blocks and VisionGPT2Model.
model_config = SimpleNamespace(**{
    'vocab_size': 50_257,
    'embed_dim': 768,
    'num_heads': 12,
    'seq_len': 1024,
    'depth': 12,
    'attention_dropout': 0.1,
    'residual_dropout': 0.1,
    'mlp_ratio': 4,
    'mlp_dropout': 0.1,
    'emb_dropout': 0.1,
})

# Training schedule and environment settings read by Trainer.
train_config = SimpleNamespace(**{
    'epochs': 3,  # was originally 5
    'freeze_epochs_gpt': 1,
    'freeze_epochs_all': 2,
    'lr': 1e-4,
    'device': 'cuda',
    'model_path': Path('captioner'),
    'batch_size': 32,
})

In [None]:
# Suppress every UserWarning from here on.
import warnings
warnings.simplefilter("ignore", UserWarning)

In [None]:
# Build the trainer from both configs and the (train, validation) dataloader pair.
trainer = Trainer(model_config, train_config, (train_dataloader, val_dataloader))

#### Train

In [None]:
# Run the full training loop; returns a dict with the best perplexity and its epoch.
trainer.fit()

### Predictions

In [None]:
# Folder on the mounted Drive that holds the trained checkpoints.
model_path = '/content/drive/MyDrive/Trained_Model'

# Load the best checkpoint into the trainer's model.
trainer.load_best_model(model_path)

1. 분석하고 싶은 영상을 가져오기
2. 프레임으로 나누는 작업 진행
3. 결과 프레임들을 특정 경로(드라이브)에 저장
4. 프레임들 불러와서 new_frames_path 에 넣어주도록 generate_caption 함수를 조정했습니당

정리) 최종 프레임들 경로가 필요합니다

In [None]:
# 경로들 정리

## 최종 모델 저장, 불러올 때 - /content/drive/MyDrive/Trained_Model
## 테스트용 비디오 불러올 때 - /content/drive/MyDrive/Testing_Videos => "testing_dir"
## 프레임 저장은 Testing_Videos 안에 폴더 만들어서 저장 (영상 이름으로 폴더 만들어서)

#### Framing & Predicting

In [None]:
### 프레임 추출 + 캡션 생성 - 준희 추가

### 영상에서 프레임 추출
# video : 비디오 경로 / frames_dir : 프레임 추출해서 저장할 경로 / fps : 초당 추출할 프레임 수

def extract_frames(video, frames_dir, fps):
    """Extract frames from a video with ffmpeg and return the frame paths.

    Args:
        video: Path of the input video file.
        frames_dir: Folder that receives the frames (created if missing).
        fps: Number of frames to extract per second of video.

    Returns:
        Sorted list of the paths of all ``.jpg`` files found in ``frames_dir``
        after the run. Files already present in the folder are included.

    A failing or missing ffmpeg does not raise: the error is printed and
    whatever frames exist in ``frames_dir`` are still returned.
    """
    os.makedirs(frames_dir, exist_ok=True)
    # Pass the arguments as a list (no shell) so that spaces, quotes or shell
    # metacharacters in the paths cannot change the command that is executed.
    command = [
        "ffmpeg",
        "-i", str(video),
        "-vf", f"fps={fps}",
        f"{frames_dir}/frame_%4d.jpg",
    ]

    try:
        result = subprocess.run(command, capture_output=True, text=True)
    except OSError as err:
        # Raised when the ffmpeg executable cannot be started at all.
        print(f"Error: {err}")
    else:
        if result.returncode != 0:
            print(f"Error: {result.stderr}")

    return sorted(os.path.join(frames_dir, f) for f in os.listdir(frames_dir) if f.endswith(".jpg"))

### 영상 캡션 생성
# frames_path : 프레임들 저장된 폴더
def predicting(frames_path, transform):
    """Generate a caption for the frames stored in ``frames_path``.

    Args:
        frames_path: Folder containing the frame images of one video.
        transform: Image transform forwarded to ``trainer.generate_caption``.

    Returns:
        The caption produced by the module-level ``trainer``.
    """
    # Sort the listing so the frames are passed in a stable, name-based order
    # (os.listdir gives no ordering guarantee).
    frame_names = sorted(os.listdir(frames_path))
    # Build the paths from the function's own argument; the previous code
    # referenced `frames_dir`, a name that does not exist in this scope.
    new_frames_path = [os.path.join(frames_path, frame) for frame in frame_names]
    new_caption = trainer.generate_caption(new_frames_path, transform, temperature = 1.0, deterministic = False)
    return new_caption

In [None]:
### 최종 함수
# video_name : 진짜 비디오 이름 / testing_dir : 영상 저장된 경로

def framing_and_predicting(video_name, testing_dir, fps):
    """Extract the frames of ``<testing_dir>/<video_name>.mp4`` and caption them.

    Args:
        video_name: Video file name without the ``.mp4`` extension.
        testing_dir: Folder that contains the video.
        fps: Frames to extract per second.

    Returns:
        The caption returned by ``predicting``.

    Frames are written to ``<testing_dir>/Frames/<video_name>``. The transform
    is read from the module-level name ``transform``.
    """
    # Input video and output frame folder.
    video_file = os.path.join(testing_dir, "{}.mp4".format(video_name))
    frame_folder = os.path.join(testing_dir, "Frames/{}".format(video_name))

    # Extract and store the frames.
    extracted = extract_frames(video_file, frame_folder, fps=fps)
    print(f"Extracted {len(extracted)} frames.")

    return predicting(frame_folder, transform)

In [None]:
# Try the pipeline on a single cat video first.
testing_dir = "/content/drive/MyDrive/Testing_Videos/Videos" # folder that contains the videos

framing_and_predicting("이비 와작", testing_dir, fps = 2)

#### Framing & Predicting & Visualizing the Video

In [None]:
def framing_and_predicting_and_visualizing(video_name, testing_dir, fps):
    """Extract frames, then display the video with a caption banner underneath.

    Args:
        video_name: Video file name without the ``.mp4`` extension.
        testing_dir: Folder that contains the video.
        fps: Frames to extract per second.

    Frames are written to ``<testing_dir>/Frames/<video_name>``.
    """
    # Input video and output frame folder.
    video_dir = os.path.join(testing_dir, f"{video_name}.mp4")
    frames_dir = os.path.join(testing_dir, f"Frames/{video_name}")

    # Extract and store the frames; the returned path list is not needed here.
    extract_frames(video_dir, frames_dir, fps = fps)

    # TODO(review): the model prediction is disabled and a fixed placeholder
    # caption is shown instead; it should presumably be
    # predicting(frames_dir, transform) once that path is verified.
    new_caption = "a cute fluffy kitty"

    # Show the video.
    display(Video(video_dir, embed=True, width=640, height=480))

    # Read the video width so the caption banner can be sized to match.
    cap = cv2.VideoCapture(video_dir)
    try:
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    finally:
        # Release the capture handle; it is only needed for the width query.
        cap.release()
    if width <= 0:
        # The width query yields 0 when the video cannot be opened; fall back
        # to the display width used above so the figure size stays valid.
        width = 640

    # Banner with the caption. The divisor 90 is an empirical pixels-per-inch
    # value chosen by the author.
    plt.figure(figsize=(width / 90, 0.2), facecolor = 'black')
    plt.text(0.5, 0.5, f"^._.^ {new_caption} ^._.^", ha = 'center', va = 'center', color = 'white')
    plt.axis('off')
    plt.show()

In [None]:
### Generate the caption and play the video
testing_dir = "/content/drive/MyDrive/Testing_Videos/Videos" # folder that contains the videos

framing_and_predicting_and_visualizing("이비 와작", testing_dir, fps = 2 )