# Import

In [4]:
import random
import numpy as np
import pandas as pd
from pathlib import Path
from typing import Dict, List, Optional

import sklearn
import cv2
import torch
import torch.nn.functional as F
from PIL import Image
from tqdm import tqdm
from transformers import ViTForImageClassification, ViTImageProcessor
import wandb

# Settings

In [5]:
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)

torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

In [6]:
# MODEL_ID = "prithivMLmods/Deep-Fake-Detector-v2-Model"
MODEL_ID = "buildborderless/CommunityForensics-DeepfakeDet-ViT"
TEST_DIR = Path("./test_data")  # test 데이터 경로

# Submission
OUTPUT_DIR = Path("./output")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)  # output 폴더 없으면 생성

SAFE_MODEL_ID = MODEL_ID.replace("/", "_")
OUT_CSV = OUTPUT_DIR / f"{SAFE_MODEL_ID}_auxhead_submission.csv"

In [7]:
IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".jfif"}
VIDEO_EXTS = {".mp4", ".mov"}

# TARGET_SIZE = (224, 224)
TARGET_SIZE = (384, 384)
NUM_FRAMES = 10  # 비디오 샘플링 프레임 수

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Device: {DEVICE}")

Device: cuda


# Utils

In [8]:
def uniform_frame_indices(total_frames: int, num_frames: int) -> np.ndarray:
    """비디오 프레임을 균등하게 샘플링"""
    if total_frames <= 0:
        return np.array([], dtype=int)
    if total_frames <= num_frames:
        return np.arange(total_frames, dtype=int)
    return np.linspace(0, total_frames - 1, num_frames, dtype=int)

def get_full_frame_padded(pil_img: Image.Image, target_size=(384, 384)) -> Image.Image:
    """전체 이미지를 비율 유지하며 정사각형 패딩 처리"""
    img = pil_img.convert("RGB")
    img.thumbnail(target_size, Image.BICUBIC)
    new_img = Image.new("RGB", target_size, (0, 0, 0))
    new_img.paste(img, ((target_size[0] - img.size[0]) // 2,
                        (target_size[1] - img.size[1]) // 2))
    return new_img

def read_rgb_frames(file_path: Path, num_frames: int = NUM_FRAMES) -> List[np.ndarray]:
    """이미지 또는 비디오에서 RGB 프레임 추출"""
    ext = file_path.suffix.lower()
    
    # 이미지 파일
    if ext in IMAGE_EXTS:
        try:
            img = Image.open(file_path).convert("RGB")
            return [np.array(img)]
        except Exception:
            return []
    
    # 비디오 파일
    if ext in VIDEO_EXTS:
        cap = cv2.VideoCapture(str(file_path))
        total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        
        if total <= 0:
            cap.release()
            return []
        
        frame_indices = uniform_frame_indices(total, num_frames)
        frames = []
        
        for idx in frame_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
            ret, frame = cap.read()
            if not ret:
                continue
            frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        
        cap.release()
        return frames
    
    return []

# Data Preprocessing

In [9]:
class PreprocessOutput:
    def __init__(
        self,
        filename: str,
        imgs: List[Image.Image],
        error: Optional[str] = None
    ):
        self.filename = filename
        self.imgs = imgs
        self.error = error

def preprocess_one(file_path: Path, num_frames: int = NUM_FRAMES) -> PreprocessOutput:
    """
    파일 하나에 대한 전처리 수행
    
    Args:
        file_path: 처리할 파일 경로
        num_frames: 비디오에서 추출할 프레임 수
    
    Returns:
        PreprocessOutput 객체
    """
    try:
        frames = read_rgb_frames(file_path, num_frames=num_frames)
              
        imgs: List[Image.Image] = []
        
        for rgb in frames:     
            imgs.append(get_full_frame_padded(Image.fromarray(rgb), TARGET_SIZE))
        
        return PreprocessOutput(file_path.name, imgs, None)
    
    except Exception as e:
        return PreprocessOutput(file_path.name, [], str(e))

# Model Load

In [10]:
import torch
import torch.nn as nn
from transformers import ViTForImageClassification


In [11]:
class ViTWithAuxHead(nn.Module):
    def __init__(self, model_id, device):
        super().__init__()

        # 기존 모델 그대로 로드
        self.model = ViTForImageClassification.from_pretrained(model_id)
        self.model.to(device)

        hidden = self.model.config.hidden_size
        num_labels = self.model.config.num_labels

        
        # auxiliary head
        self.aux_classifier = nn.Linear(hidden, num_labels)

        # backbone freeze
        for p in self.model.vit.parameters():
            p.requires_grad = False

    def forward(self, pixel_values, labels=None, aux_weight=0.3):
        outputs = self.model.vit(
            pixel_values=pixel_values,
            return_dict=True
        )
    
        cls = outputs.last_hidden_state[:, 0]
    
        logits = self.model.classifier(cls)
        aux_logits = self.aux_classifier(cls)
    
        loss = None
        if labels is not None:
            ce = nn.CrossEntropyLoss()
            main_loss = ce(logits, labels)
            aux_loss = ce(aux_logits, labels)
            loss = main_loss + aux_weight * aux_loss
    
        return {
            "loss": loss,
            "logits": logits,
            "aux_logits": aux_logits
        }


In [12]:
print("Loading model with auxiliary head...")
model = ViTWithAuxHead(MODEL_ID, DEVICE).to(DEVICE)
model.eval()

processor = ViTImageProcessor.from_pretrained(
    MODEL_ID,
    size={"height": 384, "width": 384},
    do_resize=True
)

print(f"Model loaded: {MODEL_ID}")
print(f"num_labels: {model.model.config.num_labels}")
print(f"id2label: {model.model.config.id2label}")


Loading model with auxiliary head...




Model loaded: buildborderless/CommunityForensics-DeepfakeDet-ViT
num_labels: 2
id2label: {0: 'LABEL_0', 1: 'LABEL_1'}


In [13]:
def infer_fake_logits(imgs):
    logits_list = []

    with torch.no_grad():
        for img in imgs:
            out = model(pixel_values=img)
            logit = out["logits"].item()   # binary logit (1D)
            logits_list.append(logit)

    return logits_list


In [14]:
def infer_fake_probs(imgs, w_main=0.7, w_aux=0.3):
    inputs = processor(images=imgs, return_tensors="pt").to(DEVICE)

    with torch.no_grad():
        outputs = model(**inputs)

        logits = outputs["logits"]
        aux_logits = outputs["aux_logits"]

        # logit-level ensemble
        final_logits = w_main * logits + w_aux * aux_logits

    probs = torch.softmax(final_logits, dim=-1)[:, 1]
    return probs.cpu().numpy().tolist()


# Dataset 정의
- 학습시킬 dataset 처리
- ds = load_dataset(
    "Hemgg/deep-fake-detection-dfd-entire-original-dataset",
    streaming = True
)
- ds2 = load_dataset("OpenRL/DeepFakeFace",
                   streaming = True)

- ds3 = load_dataset("UniDataPro/deepfake-videos-dataset",
                   cache_dir="C:/Users/yjneo/workspace/hecto_deepfake/data")

In [15]:
from torch.utils.data import Dataset
from pathlib import Path
from PIL import Image
import torch

In [16]:
# 라벨링 정의
LABEL_MAP = {
    "wiki": 0,           # real
    "inpainting": 1,     # fake
    "insight": 1,        # fake
    "text2img": 1,       # fake
}



In [17]:
# jpeg augmentation
import io
import random
from PIL import Image

class RandomJPEGCompression:
    def __init__(self, quality_range=(30, 100), p=0.5):
        self.quality_range = quality_range
        self.p = p

    def __call__(self, img):
        if random.random() > self.p:
            return img

        quality = random.randint(*self.quality_range)
        buffer = io.BytesIO()
        img.save(buffer, format="JPEG", quality=quality)
        buffer.seek(0)
        return Image.open(buffer).convert("RGB")


In [18]:
# random gamma
import random
import random
import torchvision.transforms.functional as TF

class RandomGamma:
    def __init__(self, gamma_range=(0.7, 1.5), p=0.5):
        self.gamma_range = gamma_range
        self.p = p

    def __call__(self, img):
        if random.random() > self.p:
            return img

        gamma = random.uniform(*self.gamma_range)
        return TF.adjust_gamma(img, gamma)


In [19]:
# augmentation transform 정의
from torchvision import transforms
from torchvision import transforms

train_transform = transforms.Compose([
    transforms.Resize(448),
    transforms.RandomCrop(384),

    transforms.ColorJitter(
        brightness=0.2,
        contrast=0.2,
        saturation=0.05,
    ),

    RandomGamma(gamma_range=(0.7, 1.5), p=0.4),
    RandomJPEGCompression(quality_range=(30, 100), p=0.4),

    transforms.RandomApply(
        [transforms.GaussianBlur(kernel_size=3)],
        p=0.2
    ),

    transforms.ToTensor(),
])

val_transform = transforms.Compose([
    transforms.Resize(448),
    transforms.CenterCrop(384),
    transforms.ToTensor(),
])



In [20]:
from torch.utils.data import Dataset
from pathlib import Path
from PIL import Image

from torch.utils.data import Dataset
from pathlib import Path
from PIL import Image
import torch
from torchvision import transforms

class DeepFakeImageDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = Path(root_dir)
        self.transform = transform
        self.samples = []

        for folder in self.root_dir.iterdir():
            if not folder.is_dir():
                continue
            if folder.name not in LABEL_MAP:
                continue

            label = LABEL_MAP[folder.name]

            for img_path in folder.rglob("*"):
                if img_path.suffix.lower() in [".jpg", ".jpeg", ".png"]:
                    self.samples.append((img_path, label))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        img_path, label = self.samples[idx]
        image = Image.open(img_path).convert("RGB")

        # transform 적용 (Tensor 변환)
        image = self.transform(image)

        return {
            "pixel_values": image,
            "labels": torch.tensor(label, dtype=torch.long)  # label도 Tensor로
        }


In [17]:
# zip 파일로 다운로드
from huggingface_hub import snapshot_download

local_dir = "./deepfakeface_raw"

snapshot_download(
    repo_id="OpenRL/DeepFakeFace",
    repo_type="dataset",          # ← 이 줄이 없어서 404가 난 것
    allow_patterns=["*.zip"],
    local_dir=local_dir,
    local_dir_use_symlinks=False
)


For more details, check out https://huggingface.co/docs/huggingface_hub/main/en/guides/download#download-files-to-local-folder.
Fetching 4 files: 100%|██████████| 4/4 [00:09<00:00,  2.32s/it]


'/workspace/hecto_deepfake/notebooks/deepfakeface_raw'

In [18]:
# 압축해제
import zipfile
from pathlib import Path

raw_dir = Path("./deepfakeface_raw")
out_dir = Path("./deepfakeface_extracted")

out_dir.mkdir(exist_ok=True)

for zip_path in raw_dir.glob("*.zip"):
    target_dir = out_dir / zip_path.stem
    target_dir.mkdir(exist_ok=True)

    with zipfile.ZipFile(zip_path, "r") as z:
        z.extractall(target_dir)


In [21]:
# 라벨링 확인
full_train_dataset = DeepFakeImageDataset(
    "./deepfakeface_extracted",
    transform=train_transform
)

full_val_dataset = DeepFakeImageDataset(    # no augmentation
    "./deepfakeface_extracted",
    transform=val_transform
)

from collections import Counter
labels = [label for _, label in full_train_dataset.samples]
pos_ratio = sum(labels) / len(labels)
print(Counter(labels))
print(pos_ratio)

Counter({1: 90000, 0: 30000})
0.75


In [22]:
# real/fake 가중치
pos_weight = (1 - pos_ratio) / pos_ratio
pos_weight = torch.tensor(pos_weight).to(DEVICE)
criterion = torch.nn.BCEWithLogitsLoss(pos_weight=pos_weight)

In [23]:
# train/test split
from torch.utils.data import random_split

train_size = int(0.8 * len(full_train_dataset))
val_size = len(full_train_dataset) - train_size

train_dataset, val_dataset = random_split(
    full_train_dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(42)
)


In [24]:
# 일부만 사용 ()
from torch.utils.data import Subset

train_subset_size = min(500, len(train_dataset))
val_subset_size = min(100, len(val_dataset))

small_train_dataset = Subset(train_dataset, range(train_subset_size))
small_val_dataset = Subset(val_dataset, range(val_subset_size))
print(len(small_train_dataset))
print(len(small_val_dataset))


500
100


# Dataset 준비

In [52]:
num_epochs = 50

In [53]:
# collate_fn 학습과 추론 입력 구조를 동일하게 유지 

def collate_fn(batch):
    images, labels = zip(*batch)
    return {
        "pixel_values": torch.stack(images),
        "labels": torch.tensor(labels)
    }



In [54]:
from torch.utils.data import DataLoader

train_loader = DataLoader(
    train_dataset,
    batch_size=16,
    num_workers=4,        # 먼저 0으로 시작
    pin_memory=True
)

val_loader = DataLoader(
    val_dataset,
    batch_size=16,
    num_workers=4,
    pin_memory=True
)


## model 구성
- backbone을 freeze 하고 classifier만 학습시킨다.
- optimizer, scheduler, earlystopping

In [55]:
# backbone 먼저 freeze
# for param in model.vit.parameters():
#     param.requires_grad = False

In [56]:
# # unfreeze
# N = 2  # 마지막 N개 block unfreeze
# for layer in model.vit.encoder.layer[-N:]:
#     for param in layer.parameters():
#         param.requires_grad = True



In [57]:

# 학습 되는 layer 확인 (선택)
trainable = sum(p.requires_grad for p in model.parameters())
total = sum(1 for _ in model.parameters())
print(f"Trainable params: {trainable} / {total}")

Trainable params: 4 / 202


In [58]:
# Oprimizer/Scheduler
Learning_Rate = 1e-4
DEVICE = "cuda"
optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=Learning_Rate,
    weight_decay=1e-4
)

scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=len(train_loader) * num_epochs
)


In [59]:
# early stopping
class EarlyStopping:
    def __init__(self, patience=4, min_delta=0.0001):
        self.patience = patience
        self.min_delta = min_delta
        self.best_score = None
        self.counter = 0

    def step(self, score):
        if self.best_score is None:
            self.best_score = score
            return False  # stop = False

        if score < self.best_score + self.min_delta:
            self.counter += 1
            if self.counter >= self.patience:
                return True  # stop = True
        else:
            self.best_score = score
            self.counter = 0

        return False


In [60]:
# training loop  
from torch.cuda.amp import autocast
pos_weight = torch.tensor(pos_ratio, device=DEVICE)
criterion = torch.nn.BCEWithLogitsLoss(pos_weight=pos_weight)
def train_one_epoch(model, loader, aux_weight):
    model.train()
    total_loss = 0.0

    for batch in loader:
        batch = {k: v.to(DEVICE) for k, v in batch.items()}
        optimizer.zero_grad(set_to_none=True)

        with autocast():
            outputs = model(
                pixel_values=batch["pixel_values"],
                labels=batch["labels"],
                aux_weight=aux_weight
            )
            loss = outputs["loss"]

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        total_loss += loss.item()

    return total_loss / len(loader)


In [61]:
def train_one_epoch_cpu(model, loader):
    model.train()
    total_loss = 0.0

    for batch in loader:
        batch = {k: v.to(DEVICE) for k, v in batch.items()}
        optimizer.zero_grad()

        outputs = model(**batch)
        loss = outputs.loss

        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    return total_loss / len(loader)


In [62]:
# auxiliary weight 제어
def get_aux_weight(epoch):
    if epoch < 10:
        return 0.3
    elif epoch < 20:
        return 0.2
    else:
        return 0.1


In [63]:
import torch.nn.functional as F

from sklearn.metrics import roc_auc_score


@torch.inference_mode()

def validate(model, loader):
    model.eval()
    total_loss = 0.0
    all_labels = []
    all_probs = []

    for batch in loader:
        batch = {k: v.to(DEVICE, non_blocking=True) for k, v in batch.items()}
        outputs = model(**batch)
        loss = outputs.loss
        total_loss += loss.item()

        probs = torch.softmax(outputs.logits, dim=1)[:, 1]  # fake probability
        all_probs.extend(probs.cpu().tolist())
        all_labels.extend(batch["labels"].cpu().tolist())

    val_loss = total_loss / len(loader)
    val_auc = roc_auc_score(all_labels, all_probs)
    return val_loss, val_auc



# training

In [64]:
# # wandb
# num_epochs = 1
# Learning_Rate = 1e-4 
# import wandb

# run = wandb.init(
#     entity="yjneon339-kyonggi-university",   # 팀명 또는 계정명
#     project="dacon_hecto_deepfake",          # 프로젝트명
#     config={
#         "learning_rate": Learning_Rate,
#         "architecture": MODEL_ID,
#         "dataset": 'hf_openrl',
#         "epochs": num_epochs,
#         "batch_size": train_loader.batch_size
#     }
# )


In [65]:
@torch.no_grad()
def validate(model, loader):
    model.eval()
    total_loss = 0.0
    all_probs, all_labels = [], []

    for batch in loader:
        batch = {k: v.to(DEVICE) for k, v in batch.items()}

        outputs = model(
            pixel_values=batch["pixel_values"],
            labels=batch["labels"]  # loss 계산용 (aux_weight 없이)
        )

        logits = outputs["logits"]          # [B, 2]
        loss = outputs["loss"]              # CE 기반 loss

        probs = F.softmax(logits, dim=1)[:, 1]  # fake class prob

        total_loss += loss.item()
        all_probs.append(probs.cpu())
        all_labels.append(batch["labels"].cpu())

    val_auc = roc_auc_score(
        torch.cat(all_labels).numpy(),
        torch.cat(all_probs).numpy()
    )

    return total_loss / len(loader), val_auc


In [66]:
from tqdm import tqdm
import torch
from torch.cuda.amp import autocast, GradScaler

scaler = GradScaler()

best_val_auc = 0.0
early_stopper = EarlyStopping(patience=10, min_delta=0.0003)
epoch_bar = tqdm(range(num_epochs), desc="Training", position=0)

for epoch in epoch_bar:
    aux_weight = get_aux_weight(epoch)
    train_loss = train_one_epoch(model, train_loader, aux_weight)
    val_loss, val_auc = validate(model, val_loader)

    # tqdm에 실시간 표시
    epoch_bar.set_postfix({
        "train_loss": f"{train_loss:.4f}",
        "val_loss": f"{val_loss:.4f}",
        "val_auc": f"{val_auc:.4f}"
    })

    # # W&B 로깅
    # wandb.log({
    #     "epoch": epoch + 1,
    #     "train_loss": train_loss,
    #     "val_loss": val_loss,
    #     "val_auc": val_auc
    # })

    # best model 저장
    if val_auc > best_val_auc:
        best_val_auc = val_auc
        torch.save(model.state_dict(), "best_model.pt")

    # early stopping
    if early_stopper.step(val_auc):
        epoch_bar.write(
            f"Early stopping triggered at epoch {epoch+1} "
            f"(best val_auc={early_stopper.best_score:.4f})"
        )
        break


Training:  14%|█▍        | 7/50 [1:25:08<8:42:58, 729.74s/it, train_loss=0.7148, val_loss=0.7144, val_auc=0.6145]


KeyboardInterrupt: 

# inference

In [None]:
# 학습된 모델 가져오기
print("Loading model...")

model = ViTWithAuxHead(MODEL_ID, DEVICE)
ckpt = torch.load("best_model.pt", map_location=DEVICE)

model.load_state_dict(ckpt)
model.to(DEVICE)
model.eval()

processor = ViTImageProcessor.from_pretrained(
    MODEL_ID,
    size={"height": 384, "width": 384},
    do_resize=True
)

print("Model loaded from best_model.pt")
print(f"num_labels: {model.model.config.num_labels}")
print(f"id2label: {model.model.config.id2label}")


In [None]:
# 추론하기
files = sorted([p for p in TEST_DIR.iterdir() if p.is_file()])
print(f"Test data length: {len(files)}")

results: Dict[str, float] = {}

# 전처리 및 추론
for file_path in tqdm(files, desc="Processing"):
    out = preprocess_one(file_path)
    
    # 1. 에러 로깅
    if out.error:
        print(f"[WARN] {out.filename}: {out.error}")
    
    # 2. 정상 추론
    elif out.imgs:
        logits = infer_fake_logits(out.img)

        if logits:
            mean_logit = floar(np.mean(logits))
            prob = 1 / (1 + np.exp(-mean_logit))
            results[out.filename] = prob
    
    # 3. 둘 다 없으면 0.0 (real)
    else:
        results[out.filename] = 0.0

print(f"Inference completed. Processed: {len(results)} files")

# submission

In [None]:
submission = pd.read_csv('C:/Users/yjneo/workspace/hecto_deepfake/sample_submission.csv')
submission['prob'] = submission['filename'].map(results).fillna(0.0)

# CSV 저장
submission.to_csv(OUT_CSV, encoding='utf-8-sig', index=False)
print(f"Saved submission to: {OUT_CSV}")