# Description
1. BEiT3 모델을 초기화하여 이미지 임베딩을 추출합니다.
2. HuBERT 모델을 사용하여 오디오 임베딩을 추출합니다.
3. 두 모델을 활용하여 비디오에서 이미지, 오디오 임베딩 정보를 추출합니다.

In [None]:
!pip install torchscale moviepy torchaudio transformers



# Import

In [None]:
import torch
import numpy as np
import os
import tempfile
from PIL import Image
import moviepy.editor as mpy
from transformers import Wav2Vec2FeatureExtractor, HubertModel
from torchscale.architecture.config import EncoderConfig
from torchscale.model.BEiT3 import BEiT3
import torchvision.transforms as transforms
import torch.nn as nn
import torchaudio

# Class, Function

In [None]:
class AudioEncoder:
    """HuBERT 모델을 사용한 오디오 인코더 클래스"""

    def __init__(self, model_name="facebook/hubert-base-ls960"):
        """
        HuBERT 모델을 사용하여 오디오 인코딩을 위한 클래스 초기화

        Args:
            model_name (str): 사용할 HuBERT 모델 이름
        """
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"오디오 인코더 장치 사용: {self.device}")
        self.feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained(model_name)
        self.model = HubertModel.from_pretrained(model_name).to(self.device)
        self.sample_rate = 16000  # HuBERT는 16kHz에서 훈련됨

    def extract_audio_from_clip(self, clip):
        """
        moviepy 클립에서 오디오를 추출하여 임시 파일로 저장합니다.

        Args:
            clip (VideoClip): moviepy 비디오 클립

        Returns:
            str: 추출된 오디오 파일 경로
        """
        # 임시 파일 생성
        temp_dir = tempfile.gettempdir()
        audio_path = os.path.join(temp_dir, 'extracted_audio.wav')

        # 오디오 추출 및 WAV 파일로 저장 (16kHz, 모노)
        clip.audio.write_audiofile(audio_path, fps=self.sample_rate, nbytes=2, codec='pcm_s16le', verbose=False, logger=None)

        return audio_path

    def preprocess_audio(self, audio_path):
        """
        오디오 파일을 로드하고 전처리합니다.

        Args:
            audio_path (str): 오디오 파일 경로

        Returns:
            torch.Tensor: 전처리된 오디오 텐서
        """
        # 오디오 파일 로드
        waveform, sample_rate = torchaudio.load(audio_path)

        # 모노로 변환 (스테레오인 경우)
        if waveform.shape[0] > 1:
            waveform = torch.mean(waveform, dim=0, keepdim=True)

        # 샘플 레이트 재조정 (필요한 경우)
        if sample_rate != self.sample_rate:
            resampler = torchaudio.transforms.Resample(sample_rate, self.sample_rate)
            waveform = resampler(waveform)

        return waveform.squeeze()

    def encode_segment(self, segment):
        """
        단일 오디오 세그먼트를 인코딩합니다.

        Args:
            segment (torch.Tensor): 오디오 세그먼트

        Returns:
            torch.Tensor: 인코딩된 특성 벡터
        """
        # 특성 추출기를 사용하여 입력 준비
        inputs = self.feature_extractor(
            segment,
            sampling_rate=self.sample_rate,
            return_tensors="pt"
        ).to(self.device)

        # 모델 추론 (gradient 계산 없이)
        with torch.no_grad():
            outputs = self.model(**inputs)

        # 출력의 평균을 계산
        embeddings = outputs.last_hidden_state.mean(dim=1)

        return embeddings.cpu()

    def encode_audio_clip(self, clip):
        """
        moviepy 오디오 클립을 인코딩합니다.

        Args:
            clip (VideoClip): moviepy 비디오 클립

        Returns:
            torch.Tensor: 인코딩된 오디오 임베딩
        """
        if clip.audio is None:
            raise ValueError("비디오 클립에 오디오 트랙이 없습니다.")

        # 임시 오디오 파일로 저장
        audio_path = self.extract_audio_from_clip(clip)

        try:
            # 오디오 전처리
            waveform = self.preprocess_audio(audio_path)

            # 오디오 인코딩
            audio_embedding = self.encode_segment(waveform)

            return audio_embedding
        finally:
            # 임시 파일 삭제
            if os.path.exists(audio_path):
                os.remove(audio_path)


In [None]:
class BEiT3ForEmbedding(nn.Module):
    """BEiT3 모델을 사용한 이미지 임베딩 추출 클래스"""

    def __init__(self):
        super().__init__()

    def initialize(self, checkpoint_path, model_type='base', input_size=224, device=None):
        """
        Args:
            checkpoint_path: 체크포인트 파일 경로
            model_type: 'base' 또는 'large'
            input_size: 입력 이미지 크기
            device: 사용할 디바이스 (None이면 자동 선택)
        """
        # 디바이스 설정
        self.device = device if device is not None else torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        print(f"비디오 인코더 장치 사용: {self.device}")

        # 이미지 전처리 설정
        self.transform = transforms.Compose([
            transforms.Resize((input_size, input_size), interpolation=transforms.InterpolationMode.BICUBIC),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

        # 모델 설정
        if model_type == 'base':
            config = self._get_base_config(input_size)
        else:
            config = self._get_large_config(input_size)

        # BEiT3 모델 초기화
        self.beit3 = BEiT3(config)

        # 레이어 정규화 추가 (패치 임베딩용)
        self.fc_norm = nn.LayerNorm(config.encoder_embed_dim)

        # 체크포인트 로드
        checkpoint = torch.load(checkpoint_path, map_location='cpu')

        # 체크포인트 구조 확인
        if 'model' in checkpoint:
            checkpoint_model = checkpoint['model']
        else:
            checkpoint_model = checkpoint

        # 모델 가중치 로드
        missing_keys, unexpected_keys = self._load_state_dict(checkpoint_model)
        print(f"BEiT3 가중치 로드 - Missing keys: {len(missing_keys)}, Unexpected keys: {len(unexpected_keys)}")

        # 평가 모드로 설정하고 디바이스로 이동
        self.eval()
        self.to(self.device)

        print(f"BEiT3 ({model_type}) 임베딩 모델 초기화 완료")
        return self

    def _get_base_config(self, img_size=224, patch_size=16, vocab_size=64010):
        """BEiT3 base 모델 설정"""
        return EncoderConfig(
            img_size=img_size,
            patch_size=patch_size,
            vocab_size=vocab_size,
            multiway=True,
            layernorm_embedding=False,
            normalize_output=True,
            no_output_layer=True,
            drop_path_rate=0.1,
            encoder_embed_dim=768,
            encoder_attention_heads=12,
            encoder_ffn_embed_dim=3072,
            encoder_layers=12
        )

    def _get_large_config(self, img_size=224, patch_size=16, vocab_size=64010):
        """BEiT3 large 모델 설정"""
        return EncoderConfig(
            img_size=img_size,
            patch_size=patch_size,
            vocab_size=vocab_size,
            multiway=True,
            layernorm_embedding=False,
            normalize_output=True,
            no_output_layer=True,
            drop_path_rate=0.1,
            encoder_embed_dim=1024,
            encoder_attention_heads=16,
            encoder_ffn_embed_dim=4096,
            encoder_layers=24
        )

    def _load_state_dict(self, state_dict):
        """모델 가중치 로드 (원래 BEiT3 포맷에서 일부 수정)"""
        model_state_dict = self.state_dict()

        # 모델 가중치 로드
        missing_keys = []
        unexpected_keys = []

        # state_dict의 키 중에서 model_state_dict의 키와 일치하는 것만 로드
        for k, v in state_dict.items():
            if k in model_state_dict:
                if model_state_dict[k].shape != v.shape:
                    print(f"Shape mismatch: {k}, model: {model_state_dict[k].shape}, checkpoint: {v.shape}")
                    continue
                model_state_dict[k].copy_(v)
            else:
                unexpected_keys.append(k)

        # model_state_dict의 키 중에서 state_dict에 없는 것은 missing_keys에 추가
        for k in model_state_dict:
            if k not in state_dict:
                missing_keys.append(k)

        self.load_state_dict(model_state_dict)
        return missing_keys, unexpected_keys

    def forward(self, images):
        """
        이미지에서 임베딩 추출

        Args:
            images: 이미지 텐서 [B, C, H, W]

        Returns:
            dict: 다양한 임베딩이 포함된 딕셔너리
        """
        outputs = self.beit3(textual_tokens=None, visual_tokens=images)
        x = outputs["encoder_out"]

        # CLS 토큰 (첫 번째 토큰)
        cls_embedding = x[:, 0, :]

        # 패치 토큰들 (CLS 토큰 제외)
        patch_tokens = x[:, 1:, :]

        # 평균 풀링 임베딩
        avg_embedding = self.fc_norm(patch_tokens.mean(1))

        return {
            'cls_embedding': cls_embedding,         # CLS 토큰 임베딩
            'avg_embedding': avg_embedding,         # 평균 풀링 임베딩
            'all_tokens': x,                        # 모든 토큰 임베딩
            'patch_tokens': patch_tokens            # 패치 토큰 임베딩 (CLS 제외)
        }

    def get_embedding_from_pil(self, pil_image):
        """
        PIL 이미지에서 임베딩 추출

        Args:
            pil_image: PIL 이미지 객체

        Returns:
            dict: 다양한 임베딩이 포함된 딕셔너리
        """
        # RGB 모드로 변환
        if pil_image.mode != 'RGB':
            pil_image = pil_image.convert('RGB')

        # 이미지 전처리
        image_tensor = self.transform(pil_image).unsqueeze(0).to(self.device)

        # 임베딩 추출
        with torch.no_grad():
            embeddings = self.forward(image_tensor)

        # CPU로 변환하여 반환
        for key in embeddings:
            embeddings[key] = embeddings[key].cpu()

        return embeddings

    def encode_image_frames(self, frames):
        """
        프레임 리스트에서 비디오 임베딩을 추출합니다.

        Args:
            frames (list): PIL 이미지 객체 리스트

        Returns:
            torch.Tensor: 프레임 임베딩의 평균 텐서
        """
        if not frames:
            raise ValueError("프레임 리스트가 비어있습니다.")

        frame_embeddings = []
        for frame in frames:
            emb = self.get_embedding_from_pil(frame)
            frame_embeddings.append(emb['cls_embedding'])  # CLS 토큰 사용

        # 프레임 임베딩 평균 계산
        video_embedding = torch.cat(frame_embeddings).mean(dim=0)
        return video_embedding

In [None]:
class VideoAudioEncoder:
    """비디오와 오디오 임베딩을 함께 추출하는 통합 인코더 클래스"""

    def __init__(self, beit3_checkpoint_path, beit3_model_type='base', hubert_model_name="facebook/hubert-base-ls960", input_size=224):
        """
        통합 인코더 초기화

        Args:
            beit3_checkpoint_path (str): BEiT3 체크포인트 파일 경로
            beit3_model_type (str): BEiT3 모델 타입 ('base' 또는 'large')
            hubert_model_name (str): HuBERT 모델 이름
            input_size (int): 이미지 입력 크기 (224 또는 384)
        """
        print("통합 비디오-오디오 인코더 초기화 중...")

        # 비디오 인코더 초기화
        self.video_encoder = BEiT3ForEmbedding().initialize(
            checkpoint_path=beit3_checkpoint_path,
            model_type=beit3_model_type,
            input_size=input_size
        )

        # 오디오 인코더 초기화
        self.audio_encoder = AudioEncoder(model_name=hubert_model_name)

        print("통합 인코더 초기화 완료")

    def extract_frames_from_clip(self, clip, fps=1):
        """
        moviepy 클립에서 지정한 fps로 프레임을 추출하여 PIL Image 리스트를 반환합니다.

        Args:
            clip (VideoClip): moviepy 비디오 클립
            fps (int): 초당 추출할 프레임 수

        Returns:
            list: PIL Image 리스트
        """
        frames = []
        for frame in clip.iter_frames(fps=fps, dtype="uint8"):
            image = Image.fromarray(frame)  # numpy array -> PIL Image
            frames.append(image)
        return frames

    def encode_video_segment(self, video_path, start_time=None, end_time=None, fps=1):
        """
        비디오 파일의 특정 세그먼트에서 비디오와 오디오 임베딩을 추출합니다.

        Args:
            video_path (str): 비디오 파일 경로
            start_time (float, optional): 시작 시간(초). None이면 처음부터.
            end_time (float, optional): 종료 시간(초). None이면 끝까지.
            fps (int): 초당 추출할 프레임 수

        Returns:
            tuple: (비디오 임베딩 텐서, 오디오 임베딩 텐서)
        """
        # 비디오 로드
        video = mpy.VideoFileClip(video_path)

        # 세그먼트 추출 (전체 비디오 또는 지정된 부분)
        if start_time is not None or end_time is not None:
            start = 0 if start_time is None else start_time
            end = video.duration if end_time is None else end_time
            clip = video.subclip(start, end)
        else:
            clip = video

        try:
            # 비디오 임베딩
            frames = self.extract_frames_from_clip(clip, fps=fps)
            video_embedding = self.video_encoder.encode_image_frames(frames)

            # 오디오 임베딩
            audio_embedding = self.audio_encoder.encode_audio_clip(clip)

            return video_embedding, audio_embedding.squeeze()
        finally:
            # 자원 해제
            video.close()

    def encode_video_by_segments(self, video_path, segment_duration=5, fps=1):
        """
        비디오를 일정 길이의 세그먼트로 나누어 각 세그먼트의 임베딩을 추출합니다.

        Args:
            video_path (str): 비디오 파일 경로
            segment_duration (int): 세그먼트 길이(초)
            fps (int): 초당 추출할 프레임 수

        Returns:
            tuple: (비디오 임베딩 텐서 리스트, 오디오 임베딩 텐서 리스트)
        """
        # 비디오 로드
        video = mpy.VideoFileClip(video_path)
        duration = video.duration

        video_embeddings = []
        audio_embeddings = []

        try:
            # 세그먼트별 처리
            for start in range(0, int(duration), segment_duration):
                end = min(start + segment_duration, duration)
                print(f"세그먼트 {start}~{end}초 처리 중...")

                # 세그먼트 추출
                subclip = video.subclip(start, end)

                # 비디오 임베딩
                frames = self.extract_frames_from_clip(subclip, fps=fps)
                if frames:
                    video_emb = self.video_encoder.encode_image_frames(frames)
                    video_embeddings.append(video_emb)

                # 오디오 임베딩
                if subclip.audio is not None:
                    audio_emb = self.audio_encoder.encode_audio_clip(subclip)
                    audio_embeddings.append(audio_emb.squeeze())

            return video_embeddings, audio_embeddings

        finally:
            # 자원 해제
            video.close()


# Run

```
## 경로 설정
beit3_checkpoint = "path/to/beit3_base_patch16_224.pth"
video_path = "path/to/video.mp4"

## 1. 인코더 초기화 (한 번만 실행)
encoder = VideoAudioEncoder(
    beit3_checkpoint_path=beit3_checkpoint,
    beit3_model_type='base',
    hubert_model_name='facebook/hubert-base-ls960',
    input_size=224  # 224 또는 384 (체크포인트에 맞게 설정)
)

## 2. 전체 비디오의 단일 임베딩 추출
video_emb, audio_emb = encoder.encode_video_segment(video_path)
print(f"전체 비디오 임베딩 크기: {video_emb.shape}")
print(f"전체 오디오 임베딩 크기: {audio_emb.shape}")

## 3. 비디오를 세그먼트로 나누어 임베딩 추출
video_embs, audio_embs = encoder.encode_video_by_segments(video_path, segment_duration=5, fps=1)
print(f"세그먼트 수: {len(video_embs)}")

## 4. 세그먼트 임베딩을 텐서로 변환
video_embs_tensor = torch.stack(video_embs)
audio_embs_tensor = torch.stack(audio_embs)
print(f"비디오 임베딩 텐서 크기: {video_embs_tensor.shape}")
print(f"오디오 임베딩 텐서 크기: {audio_embs_tensor.shape}")

## 5. 특정 구간만 임베딩 추출
segment_video_emb, segment_audio_emb = encoder.encode_video_segment(
    video_path,
    start_time=10,
    end_time=15,
    fps=1
)
print(f"지정 구간 비디오 임베딩 크기: {segment_video_emb.shape}")
print(f"지정 구간 오디오 임베딩 크기: {segment_audio_emb.shape}")
```



In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# 1. 인코더 초기화 (한 번만 실행)
encoder = VideoAudioEncoder(
    beit3_checkpoint_path="/content/drive/MyDrive/sentiment_data/phr/beit3_base_patch16_384_coco_retrieval.pth",
    beit3_model_type='base',
    input_size=384
)

통합 비디오-오디오 인코더 초기화 중...
비디오 인코더 장치 사용: cuda
BEiT3 가중치 로드 - Missing keys: 2, Unexpected keys: 3
BEiT3 (base) 임베딩 모델 초기화 완료
오디오 인코더 장치 사용: cuda
통합 인코더 초기화 완료


In [None]:
# 3. 비디오를 세그먼트로 나누어 임베딩 추출
video_embs, audio_embs = encoder.encode_video_by_segments('/content/drive/MyDrive/sentiment_data/phr/TOO BAD.mp4', segment_duration=5, fps=1)
print(f"세그먼트 수: {len(video_embs)}")

세그먼트 0~5초 처리 중...
세그먼트 5~10초 처리 중...
세그먼트 10~15초 처리 중...
세그먼트 15~20초 처리 중...
세그먼트 20~25초 처리 중...
세그먼트 25~30초 처리 중...
세그먼트 30~35초 처리 중...
세그먼트 35~40초 처리 중...
세그먼트 40~45초 처리 중...
세그먼트 45~50초 처리 중...
세그먼트 50~55초 처리 중...
세그먼트 55~60초 처리 중...
세그먼트 60~65초 처리 중...





세그먼트 65~68.82초 처리 중...
세그먼트 수: 14


In [None]:
for i in range(len(video_embs)):
  print("segment", i)
  print(video_embs[i][:5])
  print(audio_embs[i][:5])

segment 0
tensor([-0.2494, -0.6885,  0.1377,  0.4764, -0.1825])
tensor([ 0.0623,  0.0137, -0.0132, -0.0499, -0.1548])
segment 1
tensor([-1.2907, -0.0029,  0.5133,  0.7455, -0.1321])
tensor([-0.0035,  0.0667, -0.0173,  0.0430, -0.0956])
segment 2
tensor([-1.1876, -0.7312,  0.7007,  0.5254, -0.7939])
tensor([ 0.0145,  0.0032, -0.0426,  0.0218, -0.0253])
segment 3
tensor([-0.9164,  0.0519,  0.5960,  0.9317, -0.5235])
tensor([ 0.0156,  0.0154, -0.0573,  0.0045, -0.0125])
segment 4
tensor([ 0.2548,  0.0559,  0.2362,  0.0281, -0.4912])
tensor([ 0.0039,  0.0250, -0.0551,  0.0162, -0.0052])
segment 5
tensor([-0.4852, -0.0937,  0.2378,  0.0432,  0.0352])
tensor([-0.0101, -0.0216, -0.0126,  0.0566, -0.0318])
segment 6
tensor([ 0.2970, -0.1300,  0.3104,  0.1456, -0.6024])
tensor([-0.0572,  0.0638, -0.0412, -0.0182,  0.0071])
segment 7
tensor([-0.2529, -1.7990,  0.3514,  0.0387, -1.0951])
tensor([ 0.0294, -0.0698, -0.0288, -0.0650, -0.0505])
segment 8
tensor([-0.3489, -1.0902,  0.0550,  0.5993, -1

# 사용

In [None]:
## 경로 설정
beit3_checkpoint = "path/to/beit3_base_patch16_224.pth"
video_path = "path/to/video.mp4"

## 1. 인코더 초기화
encoder = VideoAudioEncoder(
    beit3_checkpoint_path=beit3_checkpoint,
    beit3_model_type='base',
    hubert_model_name='facebook/hubert-base-ls960',
    input_size=224  # 224 또는 384 (체크포인트에 맞게 설정)
)

## 2. 전체 비디오의 단일 임베딩 추출
video_emb, audio_emb = encoder.encode_video_segment(video_path)
print(f"전체 비디오 임베딩 크기: {video_emb.shape}")
print(f"전체 오디오 임베딩 크기: {audio_emb.shape}")

## 3. 비디오를 세그먼트로 나누어 임베딩 추출
video_embs, audio_embs = encoder.encode_video_by_segments(video_path, segment_duration=5, fps=1)
print(f"세그먼트 수: {len(video_embs)}")

## 4. 세그먼트 임베딩을 텐서로 변환
video_embs_tensor = torch.stack(video_embs)
audio_embs_tensor = torch.stack(audio_embs)
print(f"비디오 임베딩 텐서 크기: {video_embs_tensor.shape}")
print(f"오디오 임베딩 텐서 크기: {audio_embs_tensor.shape}")

## 5. 특정 구간만 임베딩 추출
segment_video_emb, segment_audio_emb = encoder.encode_video_segment(
    video_path,
    start_time=10,
    end_time=15,
    fps=1
)

print(f"지정 구간 비디오 임베딩 크기: {segment_video_emb.shape}")
print(f"지정 구간 오디오 임베딩 크기: {segment_audio_emb.shape}")