In [1]:
import os
import json
import numpy as np
import torch
import random
import pandas as pd
from torch.utils.data import Dataset, DataLoader, Subset, TensorDataset, random_split
from torch.nn.utils.rnn import pad_sequence
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import classification_report
from collections import Counter
import torchvision
import torchvision.transforms as transforms
import torch.nn.functional as F
from torchinfo import summary
from sklearn.model_selection import train_test_split
from sklearn.decomposition import PCA
from tqdm import tqdm
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [2]:
class MedVidNpyDataset(Dataset):
    def __init__(self, npy_dir, json_path, label_map):
        self.npy_dir = npy_dir
        self.label_map = label_map
        with open(json_path, 'r') as f:
            label_data = json.load(f)
        self.data = []
        for item in label_data:
            video_id = item['video_id']
            label_str = item['label']
            label = self.label_map[label_str]
            self.data.append((video_id, label))
    def __getitem__(self, idx):
        video_id, label = self.data[idx]
        npy_path = os.path.join(self.npy_dir, f'{video_id}.npy')
        features = np.load(npy_path)
        features = torch.tensor(features, dtype=torch.float32)  # (T, 768)
        return features, label
    def __len__(self):
        return len(self.data)

In [3]:
class Args:
    seed = 42
    source_dir = '../MedVidCL'
    npy_dir = '../VideoFeatures/MedVidCL/ViT'
    batch_size = 8
    num_epochs = 10
    learning_rate = 1e-4
    embed_dim = 768
    num_heads = 8
    ff_dim = 2048 
    num_layers = 2
    max_T = 20
    num_classes = 3

args = Args()

In [4]:
torch.manual_seed(args.seed)
np.random.seed(args.seed)
random.seed(args.seed)
os.environ['PYTHONHASHSEED'] = str(args.seed)

In [5]:
label_map = {'Medical Instructional': 0, 'Medical Non-instructional': 1, 'Non-medical': 2}
reverse_label_map = {v: k for k, v in label_map.items()}

train_dataset = MedVidNpyDataset(npy_dir='../VideoFeatures/MedVidCL/ViT/train/', json_path='../MedVidCL/train.json', label_map=label_map)
val_dataset = MedVidNpyDataset(npy_dir='../VideoFeatures/MedVidCL/ViT/val/', json_path='../MedVidCL/val.json', label_map=label_map)
test_dataset = MedVidNpyDataset(npy_dir='../VideoFeatures/MedVidCL/ViT/test/', json_path='../MedVidCL/test.json', label_map=label_map)

In [7]:
def collate_fn(batch):
    features, labels = zip(*batch)
    padded = pad_sequence(features, batch_first=True)  # (B, Tmax, 768)
    lengths = torch.tensor([f.shape[0] for f in features])
    return padded, torch.tensor(labels), lengths

# ---------------------- Positional Encoding ----------------------
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=500):
        super().__init__()
        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-np.log(10000.0) / d_model))
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return x + self.pe[:x.size(1), :].unsqueeze(0).to(x.device)

# ---------------------- Transformer Block ----------------------
class TransformerBlock(nn.Module):
    def __init__(self, embed_dim, num_heads, mlp_ratio=4, dropout=0.5):
        super().__init__()
        self.norm1 = nn.LayerNorm(embed_dim)
        self.attn = nn.MultiheadAttention(embed_dim, num_heads, dropout=dropout, batch_first=True)
        self.norm2 = nn.LayerNorm(embed_dim)
        hidden_dim = int(embed_dim * mlp_ratio)
        self.mlp = nn.Sequential(
            nn.Linear(embed_dim, hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, embed_dim)
        )

    def forward(self, x):
        x = x + self.attn(self.norm1(x), x, x, need_weights=False)[0]
        return x + self.mlp(self.norm2(x))

# ---------------------- FTTransformer Model ----------------------
class FTTransformer(nn.Module):
    def __init__(self, input_dim, embed_dim=128, depth=4, num_heads=4, dropout=0.2, num_classes=3, max_len=500):
        super().__init__()
        self.embed = nn.Linear(input_dim, embed_dim)
        self.pos_encoder = PositionalEncoding(embed_dim, max_len=max_len)
        self.cls_token = nn.Parameter(torch.randn(1, 1, embed_dim))
        self.dropout = nn.Dropout(dropout)
        self.blocks = nn.ModuleList([
            TransformerBlock(embed_dim, num_heads, dropout=dropout)
            for _ in range(depth)
        ])
        self.norm = nn.LayerNorm(embed_dim)
        self.head = nn.Linear(embed_dim, num_classes)

    def forward(self, x, lengths):
        B = x.size(0)
        x = self.embed(x)
        x = self.pos_encoder(x)
        cls = self.cls_token.expand(B, 1, -1)
        x = torch.cat([cls, x], dim=1)
        x = self.dropout(x)
        for block in self.blocks:
            x = block(x)
        x = self.norm(x)
        return self.head(x[:, 0])  # use CLS token

In [9]:
def train_model(model, train_loader, val_loader, device, epochs=40, lr=1e-4):
    model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    best_acc = 0.0
    for epoch in range(epochs):
        model.train()
        total_loss = 0
        for features, labels, lengths in train_loader:
            features, labels = features.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(features, lengths)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        model.eval()
        correct, total = 0, 0
        with torch.no_grad():
            for features, labels, lengths in val_loader:
                features, labels = features.to(device), labels.to(device)
                outputs = model(features, lengths)
                preds = outputs.argmax(dim=1)
                correct += (preds == labels).sum().item()
                total += labels.size(0)
        val_acc = correct / total
        print(f"Epoch {epoch+1} | Loss: {total_loss:.4f} | Val Acc: {val_acc:.4f}")
        if val_acc > best_acc:
            best_acc = val_acc
            torch.save(model.state_dict(), "best_model.pth")
    model.load_state_dict(torch.load("best_model.pth"))
    return model

# ---------------------- Evaluation Function ----------------------
def evaluate_model(model, loader, device, target_names):
    model.eval()
    all_preds, all_labels = [], []
    with torch.no_grad():
        for features, labels, lengths in loader:
            features, labels = features.to(device), labels.to(device)
            outputs = model(features, lengths)
            preds = outputs.argmax(dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    print(classification_report(all_labels, all_preds, target_names=target_names, digits=4))

# ---------------------- Example Usage ----------------------
def run_pipeline():
    label_map = {'Medical Instructional': 0, 'Medical Non-instructional': 1, 'Non-medical': 2}
    reverse_label_map = {v: k for k, v in label_map.items()}
    train_dataset = MedVidNpyDataset('../VideoFeatures/MedVidCL/ViT/train/', '../MedVidCL/train.json', label_map)
    val_dataset = MedVidNpyDataset('../VideoFeatures/MedVidCL/ViT/val/', '../MedVidCL/val.json', label_map)
    test_dataset = MedVidNpyDataset('../VideoFeatures/MedVidCL/ViT/test/', '../MedVidCL/test.json', label_map)

    train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, collate_fn=collate_fn)
    val_loader = DataLoader(val_dataset, batch_size=16, collate_fn=collate_fn)
    test_loader = DataLoader(test_dataset, batch_size=16, collate_fn=collate_fn)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = FTTransformer(input_dim=768, embed_dim=128, depth=4, num_heads=4, dropout=0.2, num_classes=3)
    model = train_model(model, train_loader, val_loader, device)
    target_names = [reverse_label_map[i] for i in range(len(reverse_label_map))]
    evaluate_model(model, test_loader, device, target_names)

In [35]:
run_pipeline()

Epoch 1 | Loss: 150.9092 | Val Acc: 0.7900
Epoch 2 | Loss: 95.6015 | Val Acc: 0.8167
Epoch 3 | Loss: 89.2646 | Val Acc: 0.7733
Epoch 4 | Loss: 88.3634 | Val Acc: 0.8200
Epoch 5 | Loss: 80.6524 | Val Acc: 0.7833
Epoch 6 | Loss: 78.6010 | Val Acc: 0.8367
Epoch 7 | Loss: 77.1263 | Val Acc: 0.8233
Epoch 8 | Loss: 72.6230 | Val Acc: 0.8367
Epoch 9 | Loss: 71.8934 | Val Acc: 0.8333
Epoch 10 | Loss: 69.1182 | Val Acc: 0.8133
Epoch 11 | Loss: 67.3306 | Val Acc: 0.8067
Epoch 12 | Loss: 65.1314 | Val Acc: 0.8133
Epoch 13 | Loss: 63.0813 | Val Acc: 0.8300
Epoch 14 | Loss: 62.1159 | Val Acc: 0.7967
Epoch 15 | Loss: 59.0107 | Val Acc: 0.8267
Epoch 16 | Loss: 59.3925 | Val Acc: 0.8000
Epoch 17 | Loss: 55.0579 | Val Acc: 0.8200
Epoch 18 | Loss: 52.4115 | Val Acc: 0.8000
Epoch 19 | Loss: 52.9976 | Val Acc: 0.8133
Epoch 20 | Loss: 51.8933 | Val Acc: 0.8033
Epoch 21 | Loss: 50.6187 | Val Acc: 0.8100
Epoch 22 | Loss: 49.8881 | Val Acc: 0.8067
Epoch 23 | Loss: 48.0079 | Val Acc: 0.7833
Epoch 24 | Loss: 47

### ablation study

In [None]:
# 1. Without Positional Encoding
class FTTransformerNoPosEnc(nn.Module):
    def __init__(self, input_dim, embed_dim=128, depth=4, num_heads=4, dropout=0.2, num_classes=3, max_len=500):
        super().__init__()
        self.embed = nn.Linear(input_dim, embed_dim)
        self.cls_token = nn.Parameter(torch.randn(1, 1, embed_dim))
        self.dropout = nn.Dropout(dropout)
        self.blocks = nn.ModuleList([
            TransformerBlock(embed_dim, num_heads, dropout=dropout)
            for _ in range(depth)
        ])
        self.norm = nn.LayerNorm(embed_dim)
        self.head = nn.Linear(embed_dim, num_classes)

    def forward(self, x, lengths):
        B = x.size(0)
        x = self.embed(x)
        # Positional encoding removed
        cls = self.cls_token.expand(B, 1, -1)
        x = torch.cat([cls, x], dim=1)
        x = self.dropout(x)
        for block in self.blocks:
            x = block(x)
        x = self.norm(x)
        return self.head(x[:, 0])

# 2. Without CLS Token (using mean pooling)
class FTTransformerNoCLS(nn.Module):
    def __init__(self, input_dim, embed_dim=128, depth=4, num_heads=4, dropout=0.2, num_classes=3, max_len=500):
        super().__init__()
        self.embed = nn.Linear(input_dim, embed_dim)
        self.pos_encoder = PositionalEncoding(embed_dim, max_len=max_len)
        self.dropout = nn.Dropout(dropout)
        self.blocks = nn.ModuleList([
            TransformerBlock(embed_dim, num_heads, dropout=dropout)
            for _ in range(depth)
        ])
        self.norm = nn.LayerNorm(embed_dim)
        self.head = nn.Linear(embed_dim, num_classes)

    def forward(self, x, lengths):
        x = self.embed(x)
        x = self.pos_encoder(x)
        x = self.dropout(x)
        for block in self.blocks:
            x = block(x)
        x = self.norm(x)
        # Mean pooling instead of CLS token
        return self.head(x.mean(dim=1))

# Ablation Study Runner
def run_ablation_study(train_loader, val_loader, test_loader, device):
    ablations = {
        "Original": FTTransformer(input_dim=768, num_classes=3),
        "No_Positional_Encoding": FTTransformerNoPosEnc(input_dim=768, num_classes=3),
        "No_CLS_Token": FTTransformerNoCLS(input_dim=768, num_classes=3)
    }
    
    results = {}
    target_names = [reverse_label_map[i] for i in range(3)]
    
    for name, model in ablations.items():
        print(f"\n=== Running ablation: {name} ===")
        trained_model = train_model(model, train_loader, val_loader, device)
        print(f"\nResults for {name}:")
        
        # 수정된 부분: evaluate_model 호출 방식 변경
        all_preds, all_labels = evaluate_model(
            trained_model, 
            test_loader, 
            device, 
            target_names, 
            return_preds_labels=True
        )
        
        # 결과 리포트 생성
        print(classification_report(all_labels, all_preds, target_names=target_names, digits=4))
        
        # 결과 저장
        report = classification_report(
            all_labels, all_preds,
            target_names=target_names,
            output_dict=True,
            digits=4
        )
        results[name] = {
            "accuracy": report["accuracy"],
            "macro_f1": report["macro avg"]["f1-score"]
        }
    
    # 결과 비교
    print("\n===== Ablation Study Summary =====")
    for name, metrics in results.items():
        print(f"{name} - Accuracy: {metrics['accuracy']:.4f}, F1: {metrics['macro_f1']:.4f}")

# 수정된 evaluate_model 함수
def evaluate_model(model, loader, device, target_names, return_preds_labels=False):
    model.eval()
    all_preds, all_labels = [], []
    with torch.no_grad():
        for features, labels, lengths in loader:
            features, labels = features.to(device), labels.to(device)
            outputs = model(features, lengths)
            preds = outputs.argmax(dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    
    if return_preds_labels:
        return all_preds, all_labels
    
    print(classification_report(all_labels, all_preds, target_names=target_names, digits=4))

# Run ablation study
if __name__ == "__main__":
    # Initialize datasets and loaders
    label_map = {'Medical Instructional': 0, 'Medical Non-instructional': 1, 'Non-medical': 2}
    reverse_label_map = {v: k for k, v in label_map.items()}
    
    train_dataset = MedVidNpyDataset(npy_dir='../VideoFeatures/MedVidCL/ViT/train/', 
                                    json_path='../MedVidCL/train.json', 
                                    label_map=label_map)
    val_dataset = MedVidNpyDataset(npy_dir='../VideoFeatures/MedVidCL/ViT/val/', 
                                  json_path='../MedVidCL/val.json', 
                                  label_map=label_map)
    test_dataset = MedVidNpyDataset(npy_dir='../VideoFeatures/MedVidCL/ViT/test/', 
                                   json_path='../MedVidCL/test.json', 
                                   label_map=label_map)
    
    train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, collate_fn=collate_fn)
    val_loader = DataLoader(val_dataset, batch_size=16, collate_fn=collate_fn)
    test_loader = DataLoader(test_dataset, batch_size=16, collate_fn=collate_fn)
    
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    # Run ablation study
    run_ablation_stu  dy(train_loader, val_loader, test_loader, device)



=== Running ablation: Original ===
Epoch 1 | Loss: 164.5487 | Val Acc: 0.7833
Epoch 2 | Loss: 96.8387 | Val Acc: 0.7667
Epoch 3 | Loss: 88.7873 | Val Acc: 0.8167
Epoch 4 | Loss: 85.2697 | Val Acc: 0.7800
Epoch 5 | Loss: 80.9070 | Val Acc: 0.8200
Epoch 6 | Loss: 77.1699 | Val Acc: 0.8100
Epoch 7 | Loss: 75.4817 | Val Acc: 0.8233
Epoch 8 | Loss: 73.5645 | Val Acc: 0.8267
Epoch 9 | Loss: 69.4273 | Val Acc: 0.8233
Epoch 10 | Loss: 68.1826 | Val Acc: 0.8267
Epoch 11 | Loss: 64.7166 | Val Acc: 0.8300
Epoch 12 | Loss: 66.0535 | Val Acc: 0.8000
Epoch 13 | Loss: 60.4093 | Val Acc: 0.8333
Epoch 14 | Loss: 60.1501 | Val Acc: 0.8433
Epoch 15 | Loss: 59.7575 | Val Acc: 0.8267
Epoch 16 | Loss: 57.9551 | Val Acc: 0.8467
Epoch 17 | Loss: 54.8884 | Val Acc: 0.8000
Epoch 18 | Loss: 53.3789 | Val Acc: 0.8467
Epoch 19 | Loss: 50.6114 | Val Acc: 0.8367
Epoch 20 | Loss: 50.3679 | Val Acc: 0.8133
Epoch 21 | Loss: 49.9425 | Val Acc: 0.8300
Epoch 22 | Loss: 46.5220 | Val Acc: 0.8200
Epoch 23 | Loss: 47.9519 |