# Inference & Submission using Hugging Face Model
This notebook loads the `OwensLab/commfor-model-384` model from Hugging Face, runs deepfake-detection inference on the test images and videos, and writes the results to a submission CSV.

# Import

In [3]:
import random
import numpy as np
import pandas as pd
from pathlib import Path
from typing import Dict, List, Optional

import cv2
import torch
from PIL import Image
from tqdm import tqdm

# Hugging Face model imports
import models
import dataprocessor_hf as dphf

# Settings

In [4]:
# Seed every random number generator used in this notebook with the same value.
SEED = 42
for seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
    seed_fn(SEED)

# Turn off cuDNN benchmark mode and request deterministic cuDNN algorithms.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

In [11]:
# Hugging Face identifiers for the model and its data processor, plus the input size
MODEL_NAME = 'OwensLab/commfor-model-384'
PROCESSOR_NAME = 'OwensLab/commfor-data-preprocessor'
INPUT_SIZE = 384

# Directory that holds the test data
TEST_DIR = Path("./test_images")

# Directory the submission is written to; created here if it does not exist yet
OUTPUT_DIR = Path("./output")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# The model id contains "/", so swap it for "_" before using it in a file name
SAFE_MODEL_NAME = "_".join(MODEL_NAME.split("/"))
OUT_CSV = OUTPUT_DIR.joinpath(SAFE_MODEL_NAME + "_submission.csv")

In [12]:
# Lower-case file suffixes treated as still images and as videos
VIDEO_EXTS = {".mp4", ".mov"}
IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".jfif"}

NUM_FRAMES = 10  # number of frames sampled from each video
TARGET_SIZE = (384, 384)

# Use the GPU when CUDA is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"
print(f"Device: {DEVICE}")

Device: cuda


# Utils

In [13]:
def uniform_frame_indices(total_frames: int, num_frames: int) -> np.ndarray:
    """Pick evenly spaced frame indices from a video.

    Args:
        total_frames: Number of frames available.
        num_frames: Number of frames wanted.

    Returns:
        An integer array of indices. Every index is returned when the video
        has no more than ``num_frames`` frames, and the array is empty when
        ``total_frames`` is not positive.
    """
    needs_subsampling = total_frames > 0 and total_frames > num_frames
    if needs_subsampling:
        # Spread the picks across the whole clip, first and last frame included
        return np.linspace(0, total_frames - 1, num_frames, dtype=int)
    # Short (or empty) clip: take whatever frames exist
    return np.arange(max(total_frames, 0), dtype=int)

def get_full_frame_padded(pil_img: Image.Image, target_size=(384, 384)) -> Image.Image:
    """Fit the whole image onto a black canvas of ``target_size``, keeping its aspect ratio.

    Args:
        pil_img: Source image; it is converted to RGB first.
        target_size: ``(width, height)`` of the returned canvas.

    Returns:
        A new RGB image of ``target_size`` with the shrunken source pasted in the centre.
    """
    fitted = pil_img.convert("RGB")
    # thumbnail() resizes in place; it runs on the converted image, not on pil_img
    fitted.thumbnail(target_size, Image.BICUBIC)

    # Offsets that centre the fitted image on the canvas
    left = (target_size[0] - fitted.size[0]) // 2
    top = (target_size[1] - fitted.size[1]) // 2

    canvas = Image.new("RGB", target_size, (0, 0, 0))
    canvas.paste(fitted, (left, top))
    return canvas

def read_rgb_frames(file_path: Path, num_frames: int = NUM_FRAMES) -> List[np.ndarray]:
    """Extract RGB frames from an image or a video file.

    Args:
        file_path: Path to the media file. Its lower-cased suffix selects the
            image branch (``IMAGE_EXTS``) or the video branch (``VIDEO_EXTS``).
        num_frames: Number of frames to sample from a video; not used for images.

    Returns:
        A list of RGB arrays: one entry for an image, one entry per
        successfully read sampled frame for a video. The list is empty when
        the suffix is unsupported, the image cannot be opened or converted,
        or the video reports no frames.
    """
    ext = file_path.suffix.lower()

    # Image file
    if ext in IMAGE_EXTS:
        try:
            # The context manager closes the underlying file once the pixels
            # have been copied into the array, so no handle is left open.
            with Image.open(file_path) as img:
                return [np.array(img.convert("RGB"))]
        except Exception:
            # Best effort: an unreadable image yields no frames instead of
            # aborting the whole run.
            return []

    # Video file
    if ext in VIDEO_EXTS:
        cap = cv2.VideoCapture(str(file_path))
        try:
            total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            if total <= 0:
                return []

            frames = []
            for idx in uniform_frame_indices(total, num_frames):
                # Seek to the sampled frame, then decode it
                cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
                ret, frame = cap.read()
                if not ret:
                    # Skip frames that fail to decode; keep the rest
                    continue
                frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            return frames
        finally:
            # Release the capture on every exit path, including exceptions
            cap.release()

    return []

# Data Preprocessing

In [14]:
class PreprocessOutput:
    """Container for the outcome of preprocessing one input file."""

    def __init__(self, filename: str, imgs: List[Image.Image], error: Optional[str] = None):
        # filename: name of the processed file
        # imgs: images prepared for inference
        # error: error message, or None when nothing was reported
        self.filename, self.imgs, self.error = filename, imgs, error

def preprocess_one(file_path: Path, num_frames: int = NUM_FRAMES) -> PreprocessOutput:
    """Preprocess a single file.

    Args:
        file_path: Path of the file to process.
        num_frames: Number of frames to extract from a video.

    Returns:
        A PreprocessOutput holding the padded images, or an empty image list
        plus the exception text when preprocessing raised.
    """
    try:
        # Read the frames, then pad each one to TARGET_SIZE
        padded = [
            get_full_frame_padded(Image.fromarray(frame), TARGET_SIZE)
            for frame in read_rgb_frames(file_path, num_frames=num_frames)
        ]
        return PreprocessOutput(file_path.name, padded, None)
    except Exception as exc:
        return PreprocessOutput(file_path.name, [], str(exc))

# Model Load

In [15]:
# Announce the load step before it starts.
print("Loading data processor and model...")

# Load data processor from Hugging Face
# NOTE(review): `data_processor` is not referenced by any other code visible in
# this notebook — confirm whether inference is meant to run inputs through it.
data_processor = dphf.CommForImageProcessor.from_pretrained(
    PROCESSOR_NAME, 
    size=INPUT_SIZE
)

# Load model from Hugging Face, move it to DEVICE, then call eval() on it
model = models.ViTClassifier.from_pretrained(MODEL_NAME).to(DEVICE)
model.eval()

# Report what was loaded and which device the first model parameter lives on
print(f"Model loaded: {MODEL_NAME}")
print(f"Data processor loaded: {PROCESSOR_NAME}")
print(f"Model is on device: {next(model.parameters()).device}")

Loading data processor and model...
Model loaded: OwensLab/commfor-model-384
Data processor loaded: OwensLab/commfor-data-preprocessor
Model is on device: cuda:0


# Inference Function

In [25]:
import torchvision.transforms as T

# Kept so that any code reading the lowercase `device` global still works;
# infer_fake_probs itself uses the device the model actually lives on.
device = "cuda" if torch.cuda.is_available() else "cpu"

# PIL image -> tensor: resize to the 384x384 model input size, then ToTensor
to_tensor = T.Compose([
    T.Resize((384, 384)),
    T.ToTensor()
])


def infer_fake_probs(imgs):
    """Run the model on each image and return one fake probability per image.

    Args:
        imgs: Iterable of PIL images and/or tensors. PIL images are passed
            through ``to_tensor``; tensors are used as given. A 3-D input
            gets a leading batch axis before the forward pass.

    Returns:
        A list of floats in input order, each the sigmoid of the model output
        for that image. An empty input yields an empty list.
    """
    probs = []
    # Send inputs to wherever the model's parameters are, so the function
    # cannot disagree with the device chosen when the model was loaded.
    model_device = next(model.parameters()).device

    with torch.no_grad():
        for img in imgs:
            # PIL.Image -> torch.Tensor
            if isinstance(img, Image.Image):
                img = to_tensor(img)

            # (C, H, W) -> (1, C, H, W)
            if img.ndim == 3:
                img = img.unsqueeze(0)

            logit = model(img.to(model_device))
            # .item() requires the model output to hold exactly one element
            probs.append(torch.sigmoid(logit).item())
    return probs


# Run Inference on Test Data

In [30]:
# Gather every regular file in the test directory, in sorted order
files = sorted(entry for entry in TEST_DIR.iterdir() if entry.is_file())
print(f"Test data length: {len(files)}")

results: Dict[str, float] = {}

# Preprocess and score each file; anything that cannot be scored keeps 0.0 (real)
for file_path in tqdm(files, desc="Processing"):
    out = preprocess_one(file_path)
    score = 0.0

    if out.error:
        # Preprocessing reported an error: log it and keep the default score
        print(f"[WARN] {out.filename}: {out.error}")
    elif out.imgs:
        # Average the per-image probabilities into a single score for the file
        probs = infer_fake_probs(out.imgs)
        if probs:
            score = float(np.mean(probs))

    results[out.filename] = score

print(f"Inference completed. Processed: {len(results)} files")

Test data length: 10


Processing: 100%|██████████| 10/10 [00:03<00:00,  3.24it/s]

Inference completed. Processed: 10 files





In [31]:
# Print the full filename -> probability mapping for a quick manual check
print(results)

{'00000274.png': 0.9872192144393921, '00000420.png': 0.9979440569877625, '00000845.png': 0.000356019358150661, '00000916.png': 0.9264843463897705, '00000989.png': 0.8770726919174194, 'faceswap1.png': 0.36961260437965393, 'faceswap2.png': 0.8823122978210449, 'faceswap3.png': 0.10990641266107559, 'faceswap4.png': 0.17557691037654877, 'realimage_genvideo_kling_20260129_Image_to_Video_A_very_sho_3811_0.mp4': 0.003448340226896107}


# Create Submission

In [32]:
# Load sample submission file
# Note: Update the path to your actual sample_submission.csv file
SAMPLE_SUBMISSION_PATH = './sample_submission.csv'

submission = pd.read_csv(SAMPLE_SUBMISSION_PATH)

# Look up every requested filename in the inference results. Filenames with no
# entry in `results` come back as NaN; report them before they are replaced by
# 0.0, otherwise a mismatch between the test directory and the sample
# submission produces an all-zero file without any visible sign.
mapped_probs = submission['filename'].map(results)
unmatched = mapped_probs.isna()
if unmatched.any():
    print(
        f"[WARN] {int(unmatched.sum())} of {len(submission)} submission filenames "
        f"have no inference result and default to 0.0."
    )
    print(f"[WARN] First unmatched filenames: {submission.loc[unmatched, 'filename'].head(5).tolist()}")
submission['prob'] = mapped_probs.fillna(0.0)

# Display first few rows
print("\nSubmission preview:")
print(submission.head(10))

print("\nSubmission statistics:")
print(f"Total files: {len(submission)}")
print(f"Mean probability: {submission['prob'].mean():.4f}")
print(f"Min probability: {submission['prob'].min():.4f}")
print(f"Max probability: {submission['prob'].max():.4f}")

# Save submission
submission.to_csv(OUT_CSV, encoding='utf-8-sig', index=False)
print(f"\nSaved submission to: {OUT_CSV}")


Submission preview:
       filename  prob
0  TEST_000.mp4   0.0
1  TEST_001.jpg   0.0
2  TEST_002.mp4   0.0
3  TEST_003.mp4   0.0
4  TEST_004.jpg   0.0
5  TEST_005.mp4   0.0
6  TEST_006.mp4   0.0
7  TEST_007.jpg   0.0
8  TEST_008.jpg   0.0
9  TEST_009.png   0.0

Submission statistics:
Total files: 500
Mean probability: 0.0000
Min probability: 0.0000
Max probability: 0.0000

Saved submission to: output/OwensLab_commfor-model-384_submission.csv


# Test on Sample Images (Optional)

In [None]:
# Optional sanity check: score a handful of hand-picked images.
# Leave the list empty to skip this cell's inference entirely.

# Example test images (update paths as needed)
test_images_paths = [
    # Add your test image paths here
    # "test_images/00000274.png",
    # "test_images/00000420.png",
]

# Run only when at least one path is listed and every listed path exists
paths_ready = bool(test_images_paths) and all(Path(p).exists() for p in test_images_paths)

if not paths_ready:
    print("No test images found or paths not configured. Skipping sample test.")
else:
    test_imgs = [Image.open(p).convert('RGB') for p in test_images_paths]

    # Run inference
    probs = infer_fake_probs(test_imgs)

    print("\nTest image results:")
    for path, prob in zip(test_images_paths, probs):
        print(f"{Path(path).name}: {prob:.4f}")