In [2]:
# MM-IMDb 멀티모달 영화 장르 예측 모델 비교 실험
# Multi-Modal Movie Genre Prediction Model Comparison on MM-IMDb Dataset

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from transformers import BertTokenizer, BertModel, RobertaTokenizer, RobertaModel
import timm
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, roc_auc_score, average_precision_score
from PIL import Image
import json
import os
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

# GPU 설정
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")


  cpu = _conversion_method_template(device=torch.device("cpu"))


ModuleNotFoundError: No module named 'torchvision'

# 📑 MM-IMDb 멀티모달 영화 장르 예측 모델 개발 및 비교 실험

## 🎯 연구 목표
- **주제**: 멀티모달 융합 기반 영화 장르 예측 모델 개발
- **데이터셋**: MM-IMDb (25,000편 영화, 포스터 이미지 + 줄거리 텍스트 + 23개 멀티라벨 장르)
- **핵심 기여**: Cross-Attention 기반 융합 모델 제안으로 이미지·텍스트 간 상호작용 정교화

## 🔬 실험 구성
### 비교 모델군
1. **텍스트 단일 모달**: BERT, RoBERTa
2. **이미지 단일 모달**: ResNet50, Vision Transformer (ViT)
3. **객체 탐지 기반**: YOLO, Faster R-CNN
4. **멀티모달 융합**: Early Fusion, Late Fusion, Attention Fusion
5. **제안 모델**: Cross-Attention Fusion

### 평가 지표
- Accuracy, Precision, Recall, F1-score, ROC-AUC, mAP

### 설명가능성 (XAI)
- Grad-CAM (이미지), Attention Map (텍스트)

In [None]:
# 1. 데이터셋 설정 및 구성
# MM-IMDb Dataset Configuration

# 데이터셋 경로 설정 (실제 경로에 맞게 수정 필요)
DATASET_PATH = "data/mmimdb"  # MM-IMDb 데이터셋 경로
IMAGE_PATH = os.path.join(DATASET_PATH, "images")
METADATA_PATH = os.path.join(DATASET_PATH, "dataset.json")

# 모델 설정
IMAGE_SIZE = 224
MAX_TEXT_LENGTH = 512
BATCH_SIZE = 16
NUM_EPOCHS = 50
LEARNING_RATE = 2e-5
NUM_GENRES = 23  # MM-IMDb 장르 수

# 장르 라벨 정의 (MM-IMDb 23개 장르)
GENRE_LABELS = [
    'Action', 'Adventure', 'Animation', 'Biography', 'Comedy', 'Crime', 'Documentary',
    'Drama', 'Family', 'Fantasy', 'History', 'Horror', 'Music', 'Musical', 'Mystery',
    'News', 'Romance', 'Sci-Fi', 'Short', 'Sport', 'Thriller', 'War', 'Western'
]

print(f"Dataset configuration:")
print(f"- Image size: {IMAGE_SIZE}x{IMAGE_SIZE}")
print(f"- Max text length: {MAX_TEXT_LENGTH}")
print(f"- Batch size: {BATCH_SIZE}")
print(f"- Number of genres: {NUM_GENRES}")
print(f"- Learning rate: {LEARNING_RATE}")
print(f"- Number of epochs: {NUM_EPOCHS}")

In [None]:
# 2. 데이터 전처리 설정
# Data Preprocessing Setup

# 이미지 전처리 변환 (데이터 증강 포함)
train_transform = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

val_transform = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# 텍스트 전처리 토크나이저 초기화
bert_tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
roberta_tokenizer = RobertaTokenizer.from_pretrained('roberta-base')

print("데이터 전처리 설정 완료:")
print(f"- 이미지 변환: 크기 조정, 정규화, 데이터 증강")
print(f"- 텍스트 토크나이저: BERT, RoBERTa")
print(f"- 최대 텍스트 길이: {MAX_TEXT_LENGTH} 토큰")

In [None]:
# 3. MM-IMDb 데이터셋 클래스 정의
# Custom Dataset Class for MM-IMDb

class MMIMDbDataset(Dataset):
    def __init__(self, metadata_file, image_dir, tokenizer, transform=None, max_length=512):
        """
        MM-IMDb 데이터셋 클래스
        Args:
            metadata_file: 메타데이터 JSON 파일 경로
            image_dir: 이미지 디렉토리 경로
            tokenizer: 텍스트 토크나이저
            transform: 이미지 변환
            max_length: 텍스트 최대 길이
        """
        self.image_dir = image_dir
        self.tokenizer = tokenizer
        self.transform = transform
        self.max_length = max_length
        
        # 메타데이터 로드
        with open(metadata_file, 'r', encoding='utf-8') as f:
            self.data = json.load(f)
        
        self.genre_to_idx = {genre: idx for idx, genre in enumerate(GENRE_LABELS)}
        
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        item = self.data[idx]
        
        # 이미지 로드
        image_path = os.path.join(self.image_dir, item['image'])
        try:
            image = Image.open(image_path).convert('RGB')
            if self.transform:
                image = self.transform(image)
        except:
            # 이미지 로드 실패시 빈 이미지 생성
            image = torch.zeros(3, IMAGE_SIZE, IMAGE_SIZE)
        
        # 텍스트 토크나이징
        plot = item.get('plot', '')
        if isinstance(plot, list):
            plot = ' '.join(plot)
        
        encoding = self.tokenizer(
            plot,
            add_special_tokens=True,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt'
        )
        
        # 장르 라벨 (멀티라벨)
        genres = item.get('genres', [])
        label = torch.zeros(NUM_GENRES)
        for genre in genres:
            if genre in self.genre_to_idx:
                label[self.genre_to_idx[genre]] = 1.0
        
        return {
            'image': image,
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': label,
            'text': plot
        }

print("MM-IMDb 데이터셋 클래스 정의 완료")

In [None]:
# 4. 단일 모달 모델 정의
# Single Modality Models

class BERTClassifier(nn.Module):
    """BERT 기반 텍스트 분류 모델"""
    def __init__(self, num_classes=NUM_GENRES, dropout=0.3):
        super(BERTClassifier, self).__init__()
        self.bert = BertModel.from_pretrained('bert-base-uncased')
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(self.bert.config.hidden_size, num_classes)
        
    def forward(self, input_ids, attention_mask):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = outputs.pooler_output
        output = self.dropout(pooled_output)
        return self.classifier(output)

class RoBERTaClassifier(nn.Module):
    """RoBERTa 기반 텍스트 분류 모델"""
    def __init__(self, num_classes=NUM_GENRES, dropout=0.3):
        super(RoBERTaClassifier, self).__init__()
        self.roberta = RobertaModel.from_pretrained('roberta-base')
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(self.roberta.config.hidden_size, num_classes)
        
    def forward(self, input_ids, attention_mask):
        outputs = self.roberta(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = outputs.pooler_output
        output = self.dropout(pooled_output)
        return self.classifier(output)

class ResNetClassifier(nn.Module):
    """ResNet50 기반 이미지 분류 모델"""
    def __init__(self, num_classes=NUM_GENRES, dropout=0.3):
        super(ResNetClassifier, self).__init__()
        self.resnet = timm.create_model('resnet50', pretrained=True)
        self.resnet.fc = nn.Identity()  # 마지막 층 제거
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(2048, num_classes)
        
    def forward(self, images):
        features = self.resnet(images)
        output = self.dropout(features)
        return self.classifier(output)

class ViTClassifier(nn.Module):
    """Vision Transformer 기반 이미지 분류 모델"""
    def __init__(self, num_classes=NUM_GENRES, dropout=0.3):
        super(ViTClassifier, self).__init__()
        self.vit = timm.create_model('vit_base_patch16_224', pretrained=True)
        self.vit.head = nn.Identity()  # 마지막 층 제거
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(768, num_classes)
        
    def forward(self, images):
        features = self.vit(images)
        output = self.dropout(features)
        return self.classifier(output)

print("단일 모달 모델 정의 완료:")
print("- BERT 텍스트 분류기")
print("- RoBERTa 텍스트 분류기") 
print("- ResNet50 이미지 분류기")
print("- Vision Transformer 이미지 분류기")

In [None]:
# 5. 멀티모달 융합 모델 정의
# Multimodal Fusion Models

class EarlyFusionModel(nn.Module):
    """Early Fusion: 특징을 초기에 결합"""
    def __init__(self, num_classes=NUM_GENRES, dropout=0.3):
        super(EarlyFusionModel, self).__init__()
        # 텍스트 인코더
        self.bert = BertModel.from_pretrained('bert-base-uncased')
        # 이미지 인코더
        self.resnet = timm.create_model('resnet50', pretrained=True)
        self.resnet.fc = nn.Identity()
        
        # 융합 층
        self.fusion_dim = self.bert.config.hidden_size + 2048
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(self.fusion_dim, num_classes)
        
    def forward(self, images, input_ids, attention_mask):
        # 텍스트 특징 추출
        text_outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        text_features = text_outputs.pooler_output
        
        # 이미지 특징 추출
        image_features = self.resnet(images)
        
        # 특징 결합
        fused_features = torch.cat([text_features, image_features], dim=1)
        output = self.dropout(fused_features)
        return self.classifier(output)

class LateFusionModel(nn.Module):
    """Late Fusion: 각 모달리티를 독립적으로 학습 후 결과 결합"""
    def __init__(self, num_classes=NUM_GENRES, dropout=0.3):
        super(LateFusionModel, self).__init__()
        # 텍스트 분류기
        self.text_classifier = BERTClassifier(num_classes, dropout)
        # 이미지 분류기
        self.image_classifier = ResNetClassifier(num_classes, dropout)
        
        # 융합 가중치
        self.fusion_weights = nn.Parameter(torch.tensor([0.5, 0.5]))
        
    def forward(self, images, input_ids, attention_mask):
        # 각 모달리티별 예측
        text_logits = self.text_classifier(input_ids, attention_mask)
        image_logits = self.image_classifier(images)
        
        # 가중 평균
        weights = torch.softmax(self.fusion_weights, dim=0)
        fused_logits = weights[0] * text_logits + weights[1] * image_logits
        return fused_logits

class AttentionFusionModel(nn.Module):
    """Attention Fusion: 어텐션 메커니즘으로 모달리티 중요도 결정"""
    def __init__(self, num_classes=NUM_GENRES, dropout=0.3):
        super(AttentionFusionModel, self).__init__()
        # 특징 추출기
        self.bert = BertModel.from_pretrained('bert-base-uncased')
        self.resnet = timm.create_model('resnet50', pretrained=True)
        self.resnet.fc = nn.Identity()
        
        # 어텐션 메커니즘
        self.text_attention = nn.Linear(self.bert.config.hidden_size, 1)
        self.image_attention = nn.Linear(2048, 1)
        
        # 분류기
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(self.bert.config.hidden_size + 2048, num_classes)
        
    def forward(self, images, input_ids, attention_mask):
        # 특징 추출
        text_outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        text_features = text_outputs.pooler_output
        image_features = self.resnet(images)
        
        # 어텐션 가중치 계산
        text_att = torch.sigmoid(self.text_attention(text_features))
        image_att = torch.sigmoid(self.image_attention(image_features))
        
        # 어텐션 적용
        weighted_text = text_att * text_features
        weighted_image = image_att * image_features
        
        # 특징 결합
        fused_features = torch.cat([weighted_text, weighted_image], dim=1)
        output = self.dropout(fused_features)
        return self.classifier(output)

class CrossAttentionFusionModel(nn.Module):
    """Cross-Attention Fusion: 제안하는 모델"""
    def __init__(self, num_classes=NUM_GENRES, dropout=0.3, d_model=512):
        super(CrossAttentionFusionModel, self).__init__()
        # 특징 추출기
        self.bert = BertModel.from_pretrained('bert-base-uncased')
        self.resnet = timm.create_model('resnet50', pretrained=True)
        self.resnet.fc = nn.Identity()
        
        # 차원 정렬
        self.text_proj = nn.Linear(self.bert.config.hidden_size, d_model)
        self.image_proj = nn.Linear(2048, d_model)
        
        # Cross-Attention 층
        self.cross_attention = nn.MultiheadAttention(d_model, num_heads=8, dropout=dropout)
        
        # 분류기
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(d_model * 2, num_classes)
        
    def forward(self, images, input_ids, attention_mask):
        # 특징 추출
        text_outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        text_features = text_outputs.pooler_output
        image_features = self.resnet(images)
        
        # 차원 정렬
        text_proj = self.text_proj(text_features).unsqueeze(0)  # [1, batch, d_model]
        image_proj = self.image_proj(image_features).unsqueeze(0)  # [1, batch, d_model]
        
        # Cross-Attention
        text_attended, _ = self.cross_attention(text_proj, image_proj, image_proj)
        image_attended, _ = self.cross_attention(image_proj, text_proj, text_proj)
        
        # 특징 결합
        fused_features = torch.cat([
            text_attended.squeeze(0), 
            image_attended.squeeze(0)
        ], dim=1)
        
        output = self.dropout(fused_features)
        return self.classifier(output)

print("멀티모달 융합 모델 정의 완료:")
print("- Early Fusion Model")
print("- Late Fusion Model")
print("- Attention Fusion Model")
print("- Cross-Attention Fusion Model (제안 모델)")

In [None]:
# 6. 학습 및 평가 함수 정의
# Training and Evaluation Functions

def train_model(model, train_loader, val_loader, num_epochs=NUM_EPOCHS, learning_rate=LEARNING_RATE):
    """모델 학습 함수"""
    model.to(device)
    
    # 손실 함수 및 옵티마이저
    criterion = nn.BCEWithLogitsLoss()  # 멀티라벨 분류
    optimizer = optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=0.01)
    
    # 학습률 스케줄러
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=3, factor=0.5)
    
    train_losses = []
    val_losses = []
    best_val_loss = float('inf')
    
    for epoch in range(num_epochs):
        # 학습 단계
        model.train()
        train_loss = 0.0
        train_pbar = tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs} - Training')
        
        for batch in train_pbar:
            optimizer.zero_grad()
            
            images = batch['image'].to(device)
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)
            
            # 모델 유형에 따른 순전파
            if isinstance(model, (BERTClassifier, RoBERTaClassifier)):
                outputs = model(input_ids, attention_mask)
            elif isinstance(model, (ResNetClassifier, ViTClassifier)):
                outputs = model(images)
            else:  # 멀티모달 모델
                outputs = model(images, input_ids, attention_mask)
            
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item()
            train_pbar.set_postfix({'Loss': loss.item()})
        
        # 검증 단계
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for batch in val_loader:
                images = batch['image'].to(device)
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['labels'].to(device)
                
                if isinstance(model, (BERTClassifier, RoBERTaClassifier)):
                    outputs = model(input_ids, attention_mask)
                elif isinstance(model, (ResNetClassifier, ViTClassifier)):
                    outputs = model(images)
                else:
                    outputs = model(images, input_ids, attention_mask)
                
                loss = criterion(outputs, labels)
                val_loss += loss.item()
        
        avg_train_loss = train_loss / len(train_loader)
        avg_val_loss = val_loss / len(val_loader)
        
        train_losses.append(avg_train_loss)
        val_losses.append(avg_val_loss)
        
        print(f'Epoch {epoch+1}: Train Loss = {avg_train_loss:.4f}, Val Loss = {avg_val_loss:.4f}')
        
        # 학습률 조정
        scheduler.step(avg_val_loss)
        
        # 최적 모델 저장
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            torch.save(model.state_dict(), f'best_model_{type(model).__name__}.pth')
    
    return train_losses, val_losses

def evaluate_model(model, test_loader, model_name="Model"):
    """모델 평가 함수"""
    model.eval()
    all_predictions = []
    all_labels = []
    all_probabilities = []
    
    with torch.no_grad():
        for batch in tqdm(test_loader, desc=f'Evaluating {model_name}'):
            images = batch['image'].to(device)
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)
            
            if isinstance(model, (BERTClassifier, RoBERTaClassifier)):
                outputs = model(input_ids, attention_mask)
            elif isinstance(model, (ResNetClassifier, ViTClassifier)):
                outputs = model(images)
            else:
                outputs = model(images, input_ids, attention_mask)
            
            # 시그모이드 활성화로 확률 계산
            probabilities = torch.sigmoid(outputs)
            predictions = (probabilities > 0.5).float()
            
            all_predictions.append(predictions.cpu())
            all_labels.append(labels.cpu())
            all_probabilities.append(probabilities.cpu())
    
    # 결과 결합
    all_predictions = torch.cat(all_predictions, dim=0).numpy()
    all_labels = torch.cat(all_labels, dim=0).numpy()
    all_probabilities = torch.cat(all_probabilities, dim=0).numpy()
    
    # 평가 지표 계산
    metrics = calculate_metrics(all_labels, all_predictions, all_probabilities)
    return metrics

def calculate_metrics(y_true, y_pred, y_prob):
    """평가 지표 계산 함수"""
    metrics = {}
    
    # 전체 정확도 (Exact Match)
    exact_match = np.mean(np.all(y_true == y_pred, axis=1))
    metrics['Exact_Match_Accuracy'] = exact_match
    
    # 라벨별 정확도
    label_accuracy = np.mean(y_true == y_pred)
    metrics['Label_Accuracy'] = label_accuracy
    
    # Precision, Recall, F1-score (Macro)
    precision, recall, f1, _ = precision_recall_fscore_support(y_true, y_pred, average='macro', zero_division=0)
    metrics['Macro_Precision'] = precision
    metrics['Macro_Recall'] = recall
    metrics['Macro_F1'] = f1
    
    # Precision, Recall, F1-score (Micro)
    precision_micro, recall_micro, f1_micro, _ = precision_recall_fscore_support(y_true, y_pred, average='micro', zero_division=0)
    metrics['Micro_Precision'] = precision_micro
    metrics['Micro_Recall'] = recall_micro
    metrics['Micro_F1'] = f1_micro
    
    # ROC-AUC (Macro)
    try:
        roc_auc = roc_auc_score(y_true, y_prob, average='macro')
        metrics['ROC_AUC_Macro'] = roc_auc
    except ValueError:
        metrics['ROC_AUC_Macro'] = 0.0
    
    # mAP (Mean Average Precision)
    try:
        map_score = average_precision_score(y_true, y_prob, average='macro')
        metrics['mAP'] = map_score
    except ValueError:
        metrics['mAP'] = 0.0
    
    return metrics

print("학습 및 평가 함수 정의 완료:")
print("- train_model(): 모델 학습")
print("- evaluate_model(): 모델 평가")
print("- calculate_metrics(): 평가 지표 계산")

In [None]:
# 7. 데이터 로딩 및 분할
# Data Loading and Splitting

def load_and_split_data():
    """
    MM-IMDb 데이터셋을 로드하고 훈련/검증/테스트로 분할
    논문 계획에 따라 70% / 15% / 15%로 분할
    """
    # 실제 데이터 로딩 (예시 코드 - 실제 경로에 맞게 수정 필요)
    try:
        # 메타데이터 로드
        with open(METADATA_PATH, 'r', encoding='utf-8') as f:
            data = json.load(f)
        
        print(f"전체 데이터 개수: {len(data)}")
        
        # 데이터 섞기
        np.random.seed(42)
        indices = np.random.permutation(len(data))
        
        # 분할 지점 계산
        train_end = int(0.7 * len(data))
        val_end = int(0.85 * len(data))
        
        train_indices = indices[:train_end]
        val_indices = indices[train_end:val_end]
        test_indices = indices[val_end:]
        
        # 분할된 데이터 생성
        train_data = [data[i] for i in train_indices]
        val_data = [data[i] for i in val_indices]
        test_data = [data[i] for i in test_indices]
        
        print(f"훈련 데이터: {len(train_data)} ({len(train_data)/len(data)*100:.1f}%)")
        print(f"검증 데이터: {len(val_data)} ({len(val_data)/len(data)*100:.1f}%)")
        print(f"테스트 데이터: {len(test_data)} ({len(test_data)/len(data)*100:.1f}%)")
        
        return train_data, val_data, test_data
        
    except FileNotFoundError:
        print(f"데이터셋 파일을 찾을 수 없습니다: {METADATA_PATH}")
        print("샘플 데이터를 생성합니다...")
        return create_sample_data()

def create_sample_data():
    """
    테스트용 샘플 데이터 생성 (실제 데이터가 없을 경우)
    """
    sample_data = []
    for i in range(1000):  # 1000개 샘플
        sample_data.append({
            'imdb_id': f'tt{i:07d}',
            'image': f'movie_{i}.jpg',
            'plot': f'This is a sample movie plot for movie {i}. It contains various elements of storytelling.',
            'genres': np.random.choice(GENRE_LABELS, size=np.random.randint(1, 4), replace=False).tolist()
        })
    
    # 70% / 15% / 15% 분할
    train_data = sample_data[:700]
    val_data = sample_data[700:850]
    test_data = sample_data[850:]
    
    print("샘플 데이터 생성 완료:")
    print(f"훈련 데이터: {len(train_data)}")
    print(f"검증 데이터: {len(val_data)}")
    print(f"테스트 데이터: {len(test_data)}")
    
    return train_data, val_data, test_data

def create_data_loaders(train_data, val_data, test_data, tokenizer):
    """데이터 로더 생성"""
    # 데이터셋 생성
    train_dataset = MMIMDbDataset(train_data, IMAGE_PATH, tokenizer, train_transform, MAX_TEXT_LENGTH)
    val_dataset = MMIMDbDataset(val_data, IMAGE_PATH, tokenizer, val_transform, MAX_TEXT_LENGTH)
    test_dataset = MMIMDbDataset(test_data, IMAGE_PATH, tokenizer, val_transform, MAX_TEXT_LENGTH)
    
    # 데이터 로더 생성
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)
    
    return train_loader, val_loader, test_loader

# 데이터 로드 및 분할 실행
print("데이터 로딩 중...")
train_data, val_data, test_data = load_and_split_data()

In [None]:
# MM-IMDb 데이터셋 다운로드
# MM-IMDb Dataset Download

import requests
import zipfile
import tarfile
from pathlib import Path
import shutil

def download_mmimdb_dataset():
    """MM-IMDb 데이터셋 다운로드 및 압축 해제"""
    
    # 데이터셋 URL들
    urls = {
        'metadata': 'https://archive.org/download/mmimdb/mmimdb.tar.gz',
        'images': 'https://archive.org/download/mmimdb/mmimdb.tar.gz'  # 같은 파일에 포함
    }
    
    # 다운로드 디렉토리 생성
    download_dir = Path("downloads")
    data_dir = Path("data/mmimdb")
    
    download_dir.mkdir(exist_ok=True)
    data_dir.mkdir(parents=True, exist_ok=True)
    
    print("📥 MM-IMDb 데이터셋 다운로드를 시작합니다...")
    
    # 데이터셋 다운로드
    dataset_file = download_dir / "mmimdb.tar.gz"
    
    if not dataset_file.exists():
        print("🌐 데이터셋 파일 다운로드 중...")
        
        try:
            response = requests.get(urls['metadata'], stream=True)
            response.raise_for_status()
            
            total_size = int(response.headers.get('content-length', 0))
            downloaded_size = 0
            
            with open(dataset_file, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
                        downloaded_size += len(chunk)
                        
                        # 진행률 표시
                        if total_size > 0:
                            percent = (downloaded_size / total_size) * 100
                            print(f"\r진행률: {percent:.1f}% ({downloaded_size/1024/1024:.1f}MB / {total_size/1024/1024:.1f}MB)", end='')
            
            print(f"\n✅ 다운로드 완료: {dataset_file}")
            
        except requests.exceptions.RequestException as e:
            print(f"❌ 다운로드 실패: {e}")
            return False
    else:
        print(f"✅ 이미 다운로드됨: {dataset_file}")
    
    # 압축 해제
    print("📦 압축 파일 해제 중...")
    try:
        with tarfile.open(dataset_file, 'r:gz') as tar:
            # 압축 파일 내용 확인
            members = tar.getmembers()
            print(f"압축 파일 내 파일 수: {len(members)}")
            
            # 진행률과 함께 압축 해제
            for i, member in enumerate(members):
                tar.extract(member, path=download_dir)
                if i % 100 == 0:  # 100개마다 진행률 업데이트
                    percent = (i / len(members)) * 100
                    print(f"\r압축 해제 진행률: {percent:.1f}%", end='')
            
            print(f"\n✅ 압축 해제 완료")
            
    except Exception as e:
        print(f"❌ 압축 해제 실패: {e}")
        return False
    
    # 파일 정리
    print("📁 파일 정리 중...")
    
    # 압축 해제된 디렉토리 찾기
    extracted_dirs = [d for d in download_dir.iterdir() if d.is_dir()]
    
    if extracted_dirs:
        source_dir = extracted_dirs[0]  # 첫 번째 디렉토리
        
        # 파일들을 data/mmimdb로 이동
        for item in source_dir.rglob('*'):
            if item.is_file():
                # 상대 경로 계산
                rel_path = item.relative_to(source_dir)
                dest_path = data_dir / rel_path
                
                # 대상 디렉토리 생성
                dest_path.parent.mkdir(parents=True, exist_ok=True)
                
                # 파일 이동
                shutil.copy2(item, dest_path)
        
        print(f"✅ 파일 정리 완료: {data_dir}")
    
    # 다운로드 임시 파일 정리
    print("🧹 임시 파일 정리 중...")
    if dataset_file.exists():
        dataset_file.unlink()
    
    for temp_dir in extracted_dirs:
        if temp_dir.exists():
            shutil.rmtree(temp_dir)
    
    # 결과 확인
    print("\n📊 다운로드 결과:")
    if data_dir.exists():
        file_count = len(list(data_dir.rglob('*')))
        print(f"- 저장 위치: {data_dir}")
        print(f"- 총 파일 수: {file_count}")
        
        # 주요 파일들 확인
        important_files = ['dataset.json', 'split.json']
        for file_name in important_files:
            file_path = data_dir / file_name
            if file_path.exists():
                print(f"- ✅ {file_name} 발견")
            else:
                print(f"- ❌ {file_name} 없음")
        
        # 이미지 디렉토리 확인
        image_dirs = ['images', 'imgs', 'posters']
        for dir_name in image_dirs:
            dir_path = data_dir / dir_name
            if dir_path.exists() and dir_path.is_dir():
                image_count = len(list(dir_path.glob('*')))
                print(f"- ✅ {dir_name} 디렉토리: {image_count}개 파일")
    
    return True

def download_alternative_mmimdb():
    """대안 다운로드 방법 (Kaggle 등)"""
    print("🔄 대안 다운로드 방법을 시도합니다...")
    
    # GitHub에서 샘플 데이터 또는 다른 소스 시도
    alternative_urls = [
        "https://github.com/johnarevalo/mmimdb/archive/refs/heads/master.zip",
        "https://raw.githubusercontent.com/johnarevalo/mmimdb/master/dataset.json"
    ]
    
    data_dir = Path("data/mmimdb")
    data_dir.mkdir(parents=True, exist_ok=True)
    
    for i, url in enumerate(alternative_urls):
        try:
            print(f"시도 {i+1}: {url}")
            response = requests.get(url, stream=True, timeout=30)
            response.raise_for_status()
            
            if url.endswith('.zip'):
                file_path = data_dir / "mmimdb_alt.zip"
                with open(file_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
                
                # ZIP 압축 해제
                with zipfile.ZipFile(file_path, 'r') as zip_ref:
                    zip_ref.extractall(data_dir)
                file_path.unlink()  # ZIP 파일 삭제
                
            elif url.endswith('.json'):
                file_path = data_dir / "dataset.json"
                with open(file_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
            
            print(f"✅ 성공: {url}")
            return True
            
        except Exception as e:
            print(f"❌ 실패: {e}")
            continue
    
    return False

# 실행
print("MM-IMDb 데이터셋 다운로드를 시작합니다...\n")

success = download_mmimdb_dataset()

if not success:
    print("\n기본 다운로드 실패. 대안 방법을 시도합니다...")
    success = download_alternative_mmimdb()

if not success:
    print("\n❌ 모든 다운로드 방법이 실패했습니다.")
    print("수동으로 다음 작업을 수행해주세요:")
    print("1. https://archive.org/details/mmimdb 에서 데이터셋 다운로드")
    print("2. 압축 해제 후 data/mmimdb 폴더에 저장")
    print("3. 또는 Kaggle, GitHub 등에서 MM-IMDb 데이터셋 검색")
else:
    print(f"\n🎉 MM-IMDb 데이터셋 다운로드 완료!")
    print(f"저장 위치: data/mmimdb")
    print("이제 다음 셀에서 데이터를 로드할 수 있습니다.")

In [None]:
# 8. 모델 실험 실행
# Model Experiments Execution

# 실험할 모델들 정의
models_to_test = {
    'BERT': BERTClassifier(),
    'RoBERTa': RoBERTaClassifier(),
    'ResNet50': ResNetClassifier(),
    'ViT': ViTClassifier(),
    'Early_Fusion': EarlyFusionModel(),
    'Late_Fusion': LateFusionModel(),
    'Attention_Fusion': AttentionFusionModel(),
    'Cross_Attention_Fusion': CrossAttentionFusionModel()  # 제안 모델
}

# 실험 결과 저장용 딕셔너리
experiment_results = {}

def run_experiment(model_name, model, train_loader, val_loader, test_loader):
    """단일 모델 실험 실행"""
    print(f"\n{'='*50}")
    print(f"실험 시작: {model_name}")
    print(f"{'='*50}")
    
    # 모델 학습
    print("모델 학습 중...")
    train_losses, val_losses = train_model(model, train_loader, val_loader)
    
    # 최적 모델 로드
    model.load_state_dict(torch.load(f'best_model_{type(model).__name__}.pth'))
    
    # 모델 평가
    print("모델 평가 중...")
    metrics = evaluate_model(model, test_loader, model_name)
    
    # 결과 저장
    experiment_results[model_name] = {
        'metrics': metrics,
        'train_losses': train_losses,
        'val_losses': val_losses
    }
    
    # 결과 출력
    print(f"\n{model_name} 실험 결과:")
    print("-" * 30)
    for metric, value in metrics.items():
        print(f"{metric}: {value:.4f}")
    
    return metrics

# 축약된 실험 (시간 절약을 위해)
print("축약된 실험을 실행합니다 (각 모델 5 에포크)...")
NUM_EPOCHS = 5  # 빠른 테스트를 위해 에포크 수 줄임

# BERT 텍스트 모델만 우선 테스트 (예시)
print("BERT 모델 테스트를 위한 데이터 로더 생성...")
try:
    train_loader, val_loader, test_loader = create_data_loaders(
        train_data, val_data, test_data, bert_tokenizer
    )
    print("데이터 로더 생성 완료")
    
    # BERT 모델 테스트
    bert_model = BERTClassifier()
    print("BERT 모델 초기화 완료")
    
except Exception as e:
    print(f"데이터 로더 생성 중 오류 발생: {e}")
    print("실제 데이터가 없어 모델 구조만 확인합니다.")

In [None]:
# 9. 결과 시각화 및 분석
# Results Visualization and Analysis

def plot_training_curves(experiment_results):
    """학습 곡선 시각화"""
    fig, axes = plt.subplots(2, 4, figsize=(20, 10))
    axes = axes.flatten()
    
    for idx, (model_name, results) in enumerate(experiment_results.items()):
        if idx >= len(axes):
            break
            
        ax = axes[idx]
        train_losses = results['train_losses']
        val_losses = results['val_losses']
        
        epochs = range(1, len(train_losses) + 1)
        ax.plot(epochs, train_losses, 'b-', label='Training Loss')
        ax.plot(epochs, val_losses, 'r-', label='Validation Loss')
        ax.set_title(f'{model_name} Learning Curves')
        ax.set_xlabel('Epoch')
        ax.set_ylabel('Loss')
        ax.legend()
        ax.grid(True)
    
    # 빈 subplot 숨기기
    for idx in range(len(experiment_results), len(axes)):
        axes[idx].set_visible(False)
    
    plt.tight_layout()
    plt.show()

def plot_performance_comparison(experiment_results):
    """모델 성능 비교 시각화"""
    if not experiment_results:
        print("실험 결과가 없습니다. 먼저 모델을 학습해주세요.")
        return
    
    # 주요 지표들 추출
    metrics_to_plot = ['Exact_Match_Accuracy', 'Label_Accuracy', 'Macro_F1', 'ROC_AUC_Macro', 'mAP']
    
    model_names = list(experiment_results.keys())
    metric_data = {metric: [] for metric in metrics_to_plot}
    
    for model_name in model_names:
        for metric in metrics_to_plot:
            value = experiment_results[model_name]['metrics'].get(metric, 0)
            metric_data[metric].append(value)
    
    # 히트맵 생성
    df = pd.DataFrame(metric_data, index=model_names)
    
    plt.figure(figsize=(12, 8))
    sns.heatmap(df, annot=True, cmap='YlOrRd', fmt='.3f', cbar=True)
    plt.title('Model Performance Comparison Heatmap')
    plt.xlabel('Metrics')
    plt.ylabel('Models')
    plt.xticks(rotation=45)
    plt.yticks(rotation=0)
    plt.tight_layout()
    plt.show()
    
    # 막대 그래프
    fig, axes = plt.subplots(2, 3, figsize=(18, 12))
    axes = axes.flatten()
    
    for idx, metric in enumerate(metrics_to_plot):
        if idx >= len(axes):
            break
            
        ax = axes[idx]
        values = metric_data[metric]
        bars = ax.bar(model_names, values, color='skyblue', alpha=0.7)
        ax.set_title(f'{metric} Comparison')
        ax.set_ylabel(metric)
        ax.tick_params(axis='x', rotation=45)
        
        # 최고 성능 모델 강조
        if values:
            max_idx = values.index(max(values))
            bars[max_idx].set_color('orange')
        
        # 값 표시
        for bar, value in zip(bars, values):
            ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.001,
                   f'{value:.3f}', ha='center', va='bottom')
    
    # 빈 subplot 숨기기
    for idx in range(len(metrics_to_plot), len(axes)):
        axes[idx].set_visible(False)
    
    plt.tight_layout()
    plt.show()

def generate_performance_table(experiment_results):
    """성능 비교 테이블 생성"""
    if not experiment_results:
        print("실험 결과가 없습니다.")
        return
    
    # 성능 테이블 생성
    metrics_to_include = [
        'Exact_Match_Accuracy', 'Label_Accuracy', 'Macro_Precision', 
        'Macro_Recall', 'Macro_F1', 'ROC_AUC_Macro', 'mAP'
    ]
    
    table_data = []
    for model_name, results in experiment_results.items():
        row = [model_name]
        for metric in metrics_to_include:
            value = results['metrics'].get(metric, 0)
            row.append(f"{value:.4f}")
        table_data.append(row)
    
    # DataFrame으로 변환
    columns = ['Model'] + metrics_to_include
    df = pd.DataFrame(table_data, columns=columns)
    
    # 최고 성능 찾기
    print("📊 모델 성능 비교 결과")
    print("=" * 120)
    print(df.to_string(index=False))
    print("=" * 120)
    
    # 최고 성능 모델 요약
    for metric in metrics_to_include[1:]:  # Model 컬럼 제외
        values = [float(row[metrics_to_include.index(metric)]) for row in table_data]
        best_idx = values.index(max(values))
        best_model = table_data[best_idx][0]
        best_value = max(values)
        print(f"🏆 {metric} 최고 성능: {best_model} ({best_value:.4f})")
    
    return df

# 샘플 결과 시각화 (실제 실험 후 사용)
print("결과 시각화 함수 정의 완료:")
print("- plot_training_curves(): 학습 곡선 그래프")
print("- plot_performance_comparison(): 성능 비교 히트맵 및 막대그래프")
print("- generate_performance_table(): 성능 비교 테이블")
print("\n실제 모델 학습 후 다음과 같이 사용하세요:")
print("plot_training_curves(experiment_results)")
print("plot_performance_comparison(experiment_results)")
print("generate_performance_table(experiment_results)")

In [None]:
# 10. 설명가능성 분석 (XAI) - Grad-CAM & Attention Map
# Explainable AI Analysis

import cv2
from matplotlib import cm

class GradCAM:
    """Grad-CAM 구현 클래스"""
    def __init__(self, model, target_layer):
        self.model = model
        self.target_layer = target_layer
        self.gradients = None
        self.activations = None
        
        # Hook 등록
        self.target_layer.register_forward_hook(self.save_activation)
        self.target_layer.register_backward_hook(self.save_gradient)
    
    def save_activation(self, module, input, output):
        self.activations = output.detach()
    
    def save_gradient(self, module, grad_input, grad_output):
        self.gradients = grad_output[0].detach()
    
    def generate_cam(self, input_image, class_idx=None):
        """Grad-CAM 생성"""
        # 순전파
        output = self.model(input_image)
        
        if class_idx is None:
            class_idx = output.argmax(dim=1)
        
        # 역전파
        self.model.zero_grad()
        class_score = output[:, class_idx].squeeze()
        class_score.backward(retain_graph=True)
        
        # Grad-CAM 계산
        gradients = self.gradients[0]  # [C, H, W]
        activations = self.activations[0]  # [C, H, W]
        
        # Global Average Pooling
        weights = torch.mean(gradients, dim=(1, 2))  # [C]
        
        # 가중 합
        cam = torch.zeros(activations.shape[1:])  # [H, W]
        for i, w in enumerate(weights):
            cam += w * activations[i, :, :]
        
        # ReLU 적용
        cam = torch.relu(cam)
        
        # 정규화
        cam = cam - cam.min()
        cam = cam / cam.max()
        
        return cam.cpu().numpy()

def visualize_gradcam(model, image, genre_idx, image_transform):
    """Grad-CAM 시각화"""
    # ResNet 기반 모델의 마지막 컨볼루션 레이어 찾기
    if hasattr(model, 'resnet'):
        target_layer = model.resnet.layer4[-1].conv3
    elif hasattr(model, 'vit'):
        # ViT의 경우 다른 방식 필요
        print("ViT 모델의 Grad-CAM은 별도 구현이 필요합니다.")
        return
    else:
        print("이미지 특징 추출 레이어를 찾을 수 없습니다.")
        return
    
    # Grad-CAM 생성
    gradcam = GradCAM(model, target_layer)
    model.eval()
    
    # 입력 이미지 준비
    input_tensor = image.unsqueeze(0).to(device)
    
    # CAM 생성
    cam = gradcam.generate_cam(input_tensor, genre_idx)
    
    # 원본 이미지로 변환 (정규화 해제)
    mean = torch.tensor([0.485, 0.456, 0.406])
    std = torch.tensor([0.229, 0.224, 0.225])
    
    original_image = image.clone()
    for t, m, s in zip(original_image, mean, std):
        t.mul_(s).add_(m)
    original_image = torch.clamp(original_image, 0, 1)
    
    # 시각화
    fig, axes = plt.subplots(1, 3, figsize=(15, 5))
    
    # 원본 이미지
    axes[0].imshow(original_image.permute(1, 2, 0))
    axes[0].set_title('Original Image')
    axes[0].axis('off')
    
    # Grad-CAM 히트맵
    axes[1].imshow(cam, cmap='jet')
    axes[1].set_title(f'Grad-CAM for {GENRE_LABELS[genre_idx]}')
    axes[1].axis('off')
    
    # 오버레이
    overlay = original_image.permute(1, 2, 0).numpy()
    cam_resized = cv2.resize(cam, (overlay.shape[1], overlay.shape[0]))
    cam_colored = cm.jet(cam_resized)[:, :, :3]
    
    overlay_image = 0.6 * overlay + 0.4 * cam_colored
    axes[2].imshow(overlay_image)
    axes[2].set_title('Overlay')
    axes[2].axis('off')
    
    plt.tight_layout()
    plt.show()

class AttentionVisualizer:
    """어텐션 맵 시각화 클래스"""
    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer
    
    def extract_attention_weights(self, input_ids, attention_mask):
        """어텐션 가중치 추출"""
        self.model.eval()
        
        # BERT의 경우
        if hasattr(self.model, 'bert'):
            outputs = self.model.bert(
                input_ids=input_ids,
                attention_mask=attention_mask,
                output_attentions=True
            )
            attentions = outputs.attentions  # Tuple of attention weights
            
            # 마지막 레이어의 첫 번째 헤드 사용
            attention_weights = attentions[-1][0, 0, :, :].detach().cpu().numpy()
            return attention_weights
        
        return None
    
    def visualize_attention(self, text, input_ids, attention_mask):
        """어텐션 맵 시각화"""
        attention_weights = self.extract_attention_weights(input_ids, attention_mask)
        
        if attention_weights is None:
            print("어텐션 가중치를 추출할 수 없습니다.")
            return
        
        # 토큰 디코딩
        tokens = self.tokenizer.convert_ids_to_tokens(input_ids[0])
        
        # [CLS] 토큰의 어텐션만 사용 (첫 번째 토큰)
        cls_attention = attention_weights[0, :]
        
        # 유효한 토큰만 선별 (패딩 제외)
        valid_length = attention_mask.sum().item()
        tokens = tokens[:valid_length]
        cls_attention = cls_attention[:valid_length]
        
        # 시각화
        plt.figure(figsize=(15, 8))
        
        # 어텐션 히트맵
        plt.subplot(2, 1, 1)
        plt.imshow(cls_attention.reshape(1, -1), cmap='Blues', aspect='auto')
        plt.xticks(range(len(tokens)), tokens, rotation=45, ha='right')
        plt.yticks([0], ['[CLS]'])
        plt.title('Attention Weights from [CLS] Token')
        plt.colorbar()
        
        # 어텐션 막대 그래프
        plt.subplot(2, 1, 2)
        bars = plt.bar(range(len(tokens)), cls_attention, color='skyblue', alpha=0.7)
        plt.xticks(range(len(tokens)), tokens, rotation=45, ha='right')
        plt.ylabel('Attention Weight')
        plt.title('Token-wise Attention Weights')
        
        # 높은 어텐션 토큰 강조
        top_indices = cls_attention.argsort()[-5:]  # 상위 5개
        for idx in top_indices:
            bars[idx].set_color('orange')
        
        plt.tight_layout()
        plt.show()
        
        # 중요한 토큰들 출력
        print("🔍 높은 어텐션을 받은 토큰들:")
        for idx in top_indices[::-1]:
            print(f"  '{tokens[idx]}': {cls_attention[idx]:.4f}")

def run_xai_analysis(model, sample_batch, model_name="Model"):
    """XAI 분석 실행"""
    print(f"\n{'='*50}")
    print(f"설명가능성 분석: {model_name}")
    print(f"{'='*50}")
    
    model.eval()
    
    # 샘플 데이터 선택
    sample_image = sample_batch['image'][0]
    sample_input_ids = sample_batch['input_ids'][0:1]
    sample_attention_mask = sample_batch['attention_mask'][0:1]
    sample_text = sample_batch['text'][0]
    sample_labels = sample_batch['labels'][0]
    
    # 예측 수행
    with torch.no_grad():
        if isinstance(model, (BERTClassifier, RoBERTaClassifier)):
            outputs = model(sample_input_ids.to(device), sample_attention_mask.to(device))
        elif isinstance(model, (ResNetClassifier, ViTClassifier)):
            outputs = model(sample_image.unsqueeze(0).to(device))
        else:  # 멀티모달 모델
            outputs = model(
                sample_image.unsqueeze(0).to(device),
                sample_input_ids.to(device),
                sample_attention_mask.to(device)
            )
    
    probabilities = torch.sigmoid(outputs)
    predictions = (probabilities > 0.5).float()
    
    # 예측된 장르들
    predicted_genres = [GENRE_LABELS[i] for i, pred in enumerate(predictions[0]) if pred == 1]
    actual_genres = [GENRE_LABELS[i] for i, label in enumerate(sample_labels) if label == 1]
    
    print(f"실제 장르: {actual_genres}")
    print(f"예측 장르: {predicted_genres}")
    print()
    
    # 이미지 모델인 경우 Grad-CAM
    if isinstance(model, (ResNetClassifier, ViTClassifier)) or hasattr(model, 'resnet'):
        print("Grad-CAM 분석 중...")
        if predicted_genres:
            top_genre_idx = GENRE_LABELS.index(predicted_genres[0])
            visualize_gradcam(model, sample_image, top_genre_idx, val_transform)
    
    # 텍스트 모델인 경우 Attention Map
    if isinstance(model, (BERTClassifier, RoBERTaClassifier)) or hasattr(model, 'bert'):
        print("Attention Map 분석 중...")
        if isinstance(model, BERTClassifier) or hasattr(model, 'bert'):
            visualizer = AttentionVisualizer(model, bert_tokenizer)
            visualizer.visualize_attention(sample_text, sample_input_ids, sample_attention_mask)

print("설명가능성 분석 도구 정의 완료:")
print("- GradCAM: 이미지 영역 중요도 시각화")
print("- AttentionVisualizer: 텍스트 토큰 중요도 시각화")
print("- run_xai_analysis(): 통합 XAI 분석 실행")

# 📋 실험 결론 및 요약

## 🎯 연구 목표 달성도

### 1. 모델 비교 분석
- **단일 모달리티 모델**: BERT, RoBERTa (텍스트), ResNet50, ViT (이미지)
- **멀티모달 융합 모델**: Early Fusion, Late Fusion, Attention Fusion
- **제안 모델**: Cross-Attention Fusion

### 2. 평가 지표
- **정확도 지표**: Exact Match Accuracy, Label Accuracy
- **분류 성능**: Macro/Micro Precision, Recall, F1-score
- **순위 기반**: ROC-AUC, mAP (Mean Average Precision)

### 3. 설명가능성 (XAI)
- **이미지 분석**: Grad-CAM을 통한 포스터 내 중요 영역 시각화
- **텍스트 분석**: Attention Map을 통한 줄거리 내 중요 토큰 식별

## 🔬 주요 발견사항

### 예상 결과
1. **Cross-Attention Fusion 모델**이 기존 융합 방식 대비 우수한 성능 예상
2. **멀티모달 모델**이 단일 모달리티 모델보다 높은 성능 예상
3. **텍스트 정보**가 이미지보다 장르 예측에 더 중요할 것으로 예상

### 성능 비교 순서 (예상)
1. Cross-Attention Fusion (제안 모델)
2. Attention Fusion
3. Late Fusion
4. Early Fusion
5. BERT/RoBERTa (텍스트 단일)
6. ResNet50/ViT (이미지 단일)

## 📊 활용 방법

### 실험 실행 순서
1. **환경 설정**: 필요한 라이브러리 설치 및 GPU 설정
2. **데이터 준비**: MM-IMDb 데이터셋 다운로드 및 경로 설정
3. **모델 학습**: 각 모델별 순차 학습 및 검증
4. **성능 평가**: 테스트 데이터셋으로 최종 평가
5. **결과 분석**: 시각화 및 XAI 분석

### 실제 데이터 사용시 수정사항
- `DATASET_PATH`, `IMAGE_PATH`, `METADATA_PATH` 변수를 실제 경로로 수정
- 데이터셋 형식에 맞게 `MMIMDbDataset` 클래스 조정
- GPU 메모리에 따라 `BATCH_SIZE` 조정
- 수렴 속도에 따라 `NUM_EPOCHS` 조정

## 🚀 확장 가능성

### 추가 실험 아이디어
1. **객체 탐지 통합**: YOLO, Faster R-CNN 특징 활용
2. **데이터 증강**: 더 다양한 이미지/텍스트 증강 기법
3. **앙상블 학습**: 여러 모델의 예측 결합
4. **하이퍼파라미터 최적화**: Optuna 등을 활용한 자동 튜닝
5. **전이 학습**: 다른 영화 데이터셋으로 일반화 성능 검증

### 논문 기여점
- Cross-Attention 기반 멀티모달 융합 방법론 제안
- MM-IMDb 데이터셋에서의 체계적인 모델 비교 분석
- 설명가능성 관점에서의 모델 해석 및 분석