## 딥페이크 범죄 대응을 위한 AI 탐지 모델 경진대회

**※주의** : 반드시 본 파일을 이용하여 제출을 수행해야 하며, 파일의 이름은 `task.ipynb`로 유지되어야 합니다.

* #### 추론 실행 환경
    * `python 3.10` 환경
    * `CUDA 12.6`를 지원합니다.
    * torch 버전: `2.7.1`

| Python | CUDA | torch |
|--------|------|-------|
| 3.10   | 12.6 | 2.7.1 |

* #### `task.ipynb` 작성 규칙
코드는 크게 3가지 파트로 구성되며, 해당 파트의 특성을 지켜서 내용을 편집하세요.   
1. **제출용 aifactory 라이브러리 및 추가 필요 라이브러리 설치**
    - 채점 및 제출을 위한 aifactory 라이브러리를 설치하는 셀입니다. 이 부분은 수정하지 않고 그대로 실행합니다.
    - 그 외로, 모델 추론에 필요한 라이브러리를 직접 설치합니다.
2. **추론용 코드 작성**
    - 모델 로드, 데이터 전처리, 예측 등 실제 추론을 수행하는 모든 코드를 이 영역에 작성합니다.
3. **aif.submit() 함수를 호출하여 최종 결과를 제출**
    - **마이 페이지-활동히스토리**에서 발급받은 key 값을 함수의 인자로 정확히 입력해야 합니다.

------

#### 1. 제출용 aifactory 라이브러리 설치
※ 결과 전송에 필요하므로 아래와 같이 aifactory 라이브러리가 반드시 최신버전으로 설치될 수 있게끔 합니다

In [1]:
!pip install -U aifactory

Defaulting to user installation because normal site-packages is not writeable


* 자신의 모델 추론 실행에 필요한 추가 라이브러리 설치

In [None]:
# FSFM 모델 추론에 필요한 라이브러리 설치
# 대회 서버 환경: Python 3.10 + CUDA 12.6 + torch 2.7.1 (기본 설치)

!pip install timm==0.4.5 --no-cache-dir --quiet
!pip install opencv-python-headless==4.10.0.82 --no-cache-dir --quiet
!pip install numpy==1.26.4 --no-cache-dir --quiet
!pip install Pillow==10.0.0 --no-cache-dir --quiet
!pip install mediapipe==0.10.9 --no-cache-dir --quiet
!pip install tqdm==4.66.1 --no-cache-dir --quiet

-----

#### 2. 추론용 코드 작성

##### 추론 환경의 기본 경로 구조

- 평가 데이터셋 경로: `./data/`
   - 채점에 사용될 테스트 데이터셋은 `./data/` 디렉토리 안에 포함되어 있습니다.
   - 해당 디렉토리에는 이미지(JPG, PNG)와 동영상(MP4) 파일이 별도의 하위 폴더 없이 혼합되어 있습니다.
```bash
/aif/
└── data/
    ├── {이미지 데이터1}.jpg
    ├── {이미지 데이터2}.png
    ├── {동영상 데이터1}.mp4
    ├── {이미지 데이터3}.png
    ├── {동영상 데이터2}.mp4
    ...
```

- 모델 및 자원 경로: 예시 : `./model/`
   - 추론 스크립트가 실행되는 위치를 기준으로, 제출된 모델 관련 파일들이 위치해야 하는 상대 경로입니다.
   - 학습된 모델 가중치(.pt, .ckpt, .pth 등)

* 제출 파일은 `submission.csv`로 저장돼야 합니다.
  * submission.csv는 *filename*과 *label* 컬럼으로 구성돼야 합니다.
  * filename은 추론한 파일의 이름(확장자 포함), label은 추론 결과입니다. (real:0, fake:1)
  * filename은 *string*, label은 *int* 자료형이어야 합니다.

| filename | label |
|----------|-------|
| {이미지 데이터1}.jpg | 0 |
| {동영상 데이터1}.mp4 | 1 |
| ... | ... |

**※ 주의 사항**

* argparse 사용시 `args, _ = parser.parse_known_args()`로 인자를 지정하세요.   
   - `args = parser.parse_args()`는 jupyter에서 오류가 발생합니다.
* return 할 결과물과 양식에 유의하세요.

In [None]:
import os
import sys
from PIL import Image
import cv2
from pathlib import Path
import numpy as np
import csv
import torch
import torch.nn as nn
import torch.nn.functional as F
from tqdm import tqdm
from torchvision import transforms
import time
import warnings
from functools import partial
import argparse
warnings.filterwarnings('ignore')

# Mediapipe 로그 억제
os.environ['GLOG_minloglevel'] = '3'
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

try:
    import mediapipe as mp
    MEDIAPIPE_AVAILABLE = True
except ImportError:
    MEDIAPIPE_AVAILABLE = False

# ============================================================
# FSFM Vision Transformer Model Definition
# ============================================================
import timm.models.vision_transformer

class VisionTransformer(timm.models.vision_transformer.VisionTransformer):
    """ Vision Transformer with support for global average pooling """
    def __init__(self, global_pool=False, **kwargs):
        super(VisionTransformer, self).__init__(**kwargs)
        self.global_pool = global_pool
        if self.global_pool:
            norm_layer = kwargs['norm_layer']
            embed_dim = kwargs['embed_dim']
            self.fc_norm = norm_layer(embed_dim)
            del self.norm

    def forward_features(self, x):
        B = x.shape[0]
        x = self.patch_embed(x)
        cls_tokens = self.cls_token.expand(B, -1, -1)
        x = torch.cat((cls_tokens, x), dim=1)
        x = x + self.pos_embed
        x = self.pos_drop(x)
        for blk in self.blocks:
            x = blk(x)
        if self.global_pool:
            x = x[:, 1:, :].mean(dim=1)
            outcome = self.fc_norm(x)
        else:
            x = self.norm(x)
            outcome = x[:, 0]
        return outcome

def vit_base_patch16(**kwargs):
    model = VisionTransformer(
        patch_size=16, embed_dim=768, depth=12, num_heads=12, mlp_ratio=4, qkv_bias=True,
        norm_layer=partial(nn.LayerNorm, eps=1e-6), **kwargs)
    return model

# ============================================================
# Face Detection Utilities (Mediapipe + Haar Cascade only)
# ============================================================
IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".JPG", ".JPEG", ".PNG"}
VIDEO_EXTS = {".avi", ".mp4", ".AVI", ".MP4"}

HAAR_FACE_CASCADE = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_alt2.xml')

if MEDIAPIPE_AVAILABLE:
    try:
        mp_face_detection = mp.solutions.face_detection
        MEDIAPIPE_DETECTOR = mp_face_detection.FaceDetection(model_selection=1, min_detection_confidence=0.3)
    except:
        MEDIAPIPE_DETECTOR = None
        MEDIAPIPE_AVAILABLE = False
else:
    MEDIAPIPE_DETECTOR = None

def detect_face_haar(image_np, target_size=(224, 224)):
    """Face detection using OpenCV Haar Cascade"""
    gray = cv2.cvtColor(image_np, cv2.COLOR_RGB2GRAY)
    faces = HAAR_FACE_CASCADE.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=4, minSize=(30, 30))
    if len(faces) == 0:
        return None
    x, y, w, h = max(faces, key=lambda f: f[2] * f[3])
    expansion = int(max(w, h) * 0.3)
    x = max(0, x - expansion)
    y = max(0, y - expansion)
    w = min(image_np.shape[1] - x, w + 2 * expansion)
    h = min(image_np.shape[0] - y, h + 2 * expansion)
    cropped_np = image_np[y:y + h, x:x + w]
    face_img = Image.fromarray(cropped_np).resize(target_size, Image.BICUBIC)
    return face_img

def detect_face_mediapipe(image_np, target_size=(224, 224)):
    """Face detection using Mediapipe"""
    if MEDIAPIPE_DETECTOR is None:
        return None
    image_rgb = cv2.cvtColor(image_np, cv2.COLOR_RGB2RGB)
    results = MEDIAPIPE_DETECTOR.process(image_rgb)
    if not results.detections:
        return None
    detection = max(results.detections, key=lambda d: d.score[0])
    h, w = image_np.shape[:2]
    bbox = detection.location_data.relative_bounding_box
    x1 = int(bbox.xmin * w)
    y1 = int(bbox.ymin * h)
    x2 = int((bbox.xmin + bbox.width) * w)
    y2 = int((bbox.ymin + bbox.height) * h)
    margin = int(max(x2 - x1, y2 - y1) * 0.1)
    x1 = max(0, x1 - margin)
    y1 = max(0, y1 - margin)
    x2 = min(w, x2 + margin)
    y2 = min(h, y2 + margin)
    cropped_np = image_np[y1:y2, x1:x2]
    if cropped_np.size == 0:
        return None
    face_img = Image.fromarray(cropped_np).resize(target_size, Image.BICUBIC)
    return face_img

def detect_and_crop_face_multi(image: Image.Image, target_size=(224, 224)):
    """Multi-method face detection with fallback (Mediapipe -> Haar -> Full Image)"""
    if image.mode != 'RGB':
        image = image.convert('RGB')
    image_np = np.array(image)
    
    # Try Mediapipe first
    if MEDIAPIPE_AVAILABLE:
        try:
            face_img = detect_face_mediapipe(image_np, target_size)
            if face_img:
                return face_img
        except:
            pass
    
    # Try Haar Cascade
    try:
        face_img = detect_face_haar(image_np, target_size)
        if face_img:
            return face_img
    except:
        pass
    
    # Fallback: resize original image
    resized_img = Image.fromarray(image_np).resize(target_size, Image.BICUBIC)
    return resized_img

def process_video_frames(video_path, num_frames=10, max_duration=10):
    """Extract and process frames from video"""
    face_images = []
    cap = None
    try:
        cap = cv2.VideoCapture(str(video_path))
        if not cap.isOpened():
            return face_images
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if total_frames <= 0:
            return face_images
        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps > 0:
            max_frames = int(fps * max_duration)
            total_frames = min(total_frames, max_frames)
        frame_indices = np.linspace(0, total_frames - 1, num_frames, dtype=int)
        for idx in frame_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
            ret, frame = cap.read()
            if not ret:
                continue
            image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            face_img = detect_and_crop_face_multi(image)
            if face_img:
                face_images.append(face_img)
    except:
        pass
    finally:
        if cap is not None:
            cap.release()
    return face_images

# ============================================================
# Main Inference Logic
# ============================================================
if __name__ == "__main__" or True:
    print("Starting inference...")


    # Model path and test data path
    model_weights_path = "./model/fsfm_vit_base_checkpoint.pth"
    test_dataset_path = Path("./data")
    output_csv_path = Path("submission.csv")

    # Load model
    print("Loading model...")
    model = vit_base_patch16(num_classes=2, global_pool=True, drop_path_rate=0.1)
    
    # Load checkpoint with safe globals for PyTorch 2.6+
    torch.serialization.add_safe_globals([argparse.Namespace])
    checkpoint = torch.load(model_weights_path, map_location='cpu', weights_only=True)
    if 'model' in checkpoint:
        checkpoint_model = checkpoint['model']
    else:
        checkpoint_model = checkpoint
    
    # Remove head weights if shape mismatch
    state_dict = model.state_dict()
    for k in ['head.weight', 'head.bias']:
        if k in checkpoint_model and checkpoint_model[k].shape != state_dict[k].shape:
            print(f"Removing key {k} from pretrained checkpoint")
            del checkpoint_model[k]
    
    load_result = model.load_state_dict(checkpoint_model, strict=False)


    # Device setup
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = model.to(device)
    model.eval()
    print(f"Model ready on {device}")

    # Image preprocessing
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    

    # Get all files from data directory (including subdirectories)
    files = []
    dir_stats = {}
    for root, dirs, filenames in os.walk(test_dataset_path):
        root_path = Path(root)
        rel_path = root_path.relative_to(test_dataset_path) if root_path != test_dataset_path else Path(".")
        dir_stats[str(rel_path)] = len(filenames)
        for filename in filenames:
            files.append(Path(root) / filename)
    
    total_files = len(files)
    print(f"Processing {total_files} files")
    for dir_name, count in sorted(dir_stats.items()):
        print(f"  {dir_name}: {count} files")
    
    # CSV header
    with open(output_csv_path, mode="w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["filename", "label"])

    num_frames_to_extract = 10
    results = []
    error_count = 0
    skipped_count = 0
    start_time = time.time()

    # Process files
    for idx, file_path in enumerate(tqdm(files, desc="Processing", ncols=80)):
        face_images = []
        ext = file_path.suffix.lower()
        predicted_class = 0

        try:
            if ext in IMAGE_EXTS:
                image = Image.open(file_path)
                face_img = detect_and_crop_face_multi(image)
                if face_img:
                    face_images = [face_img]

            elif ext in VIDEO_EXTS:
                face_images = process_video_frames(file_path, num_frames_to_extract, max_duration=10)
                if len(face_images) == 0:
                    # Fallback: use first frame
                    try:
                        cap = cv2.VideoCapture(str(file_path))
                        ret, frame = cap.read()
                        cap.release()
                        if ret:
                            fallback_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                            face_img = detect_and_crop_face_multi(fallback_image)
                            if face_img:
                                face_images = [face_img]
                    except:
                        pass
            else:
                # Unknown extension - still process as image attempt
                try:
                    image = Image.open(file_path)
                    face_img = detect_and_crop_face_multi(image)
                    if face_img:
                        face_images = [face_img]
                except:
                    pass

            # Inference
            if len(face_images) > 0:
                with torch.no_grad():
                    # Use all available frames (up to 10 for videos)
                    batch = face_images[:min(len(face_images), 10)]
                    img_tensors = torch.stack([transform(img) for img in batch]).to(device)
                    
                    # Forward pass for each frame
                    logits_list = []
                    for img_tensor in img_tensors:
                        logits = model(img_tensor.unsqueeze(0))
                        logits_list.append(logits)
                    
                    # Average predictions across frames
                    avg_logits = torch.mean(torch.cat(logits_list, dim=0), dim=0, keepdim=True)
                    probs = F.softmax(avg_logits, dim=1)
                    predicted_class = torch.argmax(probs).item()
                    
                    # GPU memory cleanup
                    del img_tensors, logits_list, avg_logits, probs
                    if device == "cuda":
                        torch.cuda.empty_cache()

        except Exception as e:
            error_count += 1
            predicted_class = 0

        # Store result - ALWAYS store a result for every file
        results.append([file_path.name, int(predicted_class)])



    # Final statistics
    elapsed_total = time.time() - start_time
    print(f"\nCompleted {len(results)} files in {elapsed_total/60:.1f} min")

    # Write results to CSV
    with open(output_csv_path, mode="a", newline="") as f:
        writer = csv.writer(f)
        for row in results:
            writer.writerow(row)

    # CSV validation
    with open(output_csv_path, mode="r") as f:
        data_rows = sum(1 for _ in f) - 1
    print(f"CSV: {data_rows}/{total_files} rows | {'✓ OK' if data_rows == total_files else '✗ MISMATCH'}")

----

#### 3. `aif.submit()` 함수를 호출하여 최종 결과를 제출

**※주의** : task별, 참가자별로 key가 다릅니다. 잘못 입력하지 않도록 유의바랍니다.
- key는 대회 페이지 [베이스라인 코드](https://aifactory.space/task/9197/baseline) 탭에 기재된 가이드라인을 따라 task 별로 확인하실 수 있습니다.
- key가 틀리면 제출이 진행되지 않거나 잘못 제출되므로 task에 맞는 자신의 key를 사용해야 합니다.

In [2]:
import aifactory.score as aif
import time
t = time.time()

#-----------------------------------------------------#
aif.submit(model_name="FSFM-ViT-Base",
    key="cae0fbcb-0410-4084-a308-21c98d8d886b"  # ← 여기에 본인의 key를 입력하세요
)
#-----------------------------------------------------#
print(time.time() - t)

file : task
jupyter notebook
제출 완료
283.56489753723145
