## 딥페이크 범죄 대응을 위한 AI 탐지 모델 경진대회

**※주의** : 반드시 본 파일을 이용하여 제출을 수행해야 하며, 파일의 이름은 `task.ipynb`로 유지되어야 합니다.

* #### 추론 실행 환경
    * `python 3.9` 환경
    * `CUDA 10.2`, `CUDA 11.8`, `CUDA 12.6`를 지원합니다.
    * 각 CUDA 환경에 미리 설치돼있는 torch 버전은 다음 표를 참고하세요.

<table>
  <thead>
    <tr>
      <th align="center">Python</th>
      <th align="center">CUDA</th>
      <th align="center">torch</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td align="center" style="vertical-align: middle;">3.8</td>
      <td align="center">10.2</td>
      <td align="center">1.6.0</td>
    </tr>
    <tr>
      <td align="center" style="vertical-align: middle;">3.9</td>
      <td align="center">11.8</td>
      <td align="center">1.8.0</td>
    </tr>
    <tr>
      <td align="center">3.10</td>
      <td align="center">12.6</td>
      <td align="center">2.7.1</td>
    </tr>
  </tbody>
</table>

* #### CUDA 버전 관련 안내사항  
  - 이번 경진대회는 3개의 CUDA 버전을 지원합니다.  
  - 참가자는 자신의 모델의 라이브러리 의존성에 맞는 CUDA 환경을 선택하여 모델을 제출하면 됩니다.   
  - 각 CUDA 환경에는 기본적으로 torch가 설치되어 있으나, 참가자는 제출하는 CUDA 버전과 호환되는 torch, 필요한 버전의 라이브러리를 `!pip install` 하여 사용하여도 무관합니다.

* #### `task.ipynb` 작성 규칙
코드는 크게 3가지 파트로 구성되며, 해당 파트의 특성을 지켜서 내용을 편집하세요.   
1. **제출용 aifactory 라이브러리 및 추가 필요 라이브러리 설치**
    - 채점 및 제출을 위한 aifactory 라이브러리를 설치하는 셀입니다. 이 부분은 수정하지 않고 그대로 실행합니다.
    - 그 외로, 모델 추론에 필요한 라이브러리를 직접 설치합니다.
2. **추론용 코드 작성**
    - 모델 로드, 데이터 전처리, 예측 등 실제 추론을 수행하는 모든 코드를 이 영역에 작성합니다.
3. **aif.submit() 함수를 호출하여 최종 결과를 제출**
    - **마이 페이지-활동히스토리**에서 발급받은 key 값을 함수의 인자로 정확히 입력해야 합니다.
    - **※주의** : 제출하고자 하는 CUDA 환경에 맞는 key를 입력하여야 합니다.

<table>
  <thead>
    <tr>
      <th align="left">Competition 이름</th>
      <th align="center">CUDA</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td align="left">딥페이크 범죄 대응을 위한 AI 탐지 모델 경진대회</td>
      <td align="center">11.8</td>
    </tr>
    <tr>
      <td align="left">딥페이크 범죄 대응을 위한 AI 탐지 모델 경진대회 CUDA 12.6</td>
      <td align="center">12.6</td>
    </tr>
    <tr>
      <td align="left">딥페이크 범죄 대응을 위한 AI 탐지 모델 경진대회 CUDA 10.2</td>
      <td align="center">10.2</td>
    </tr>
  </tbody>
</table>

------

#### 1. 제출용 aifactory 라이브러리 설치
※ 결과 전송에 필요하므로 아래와 같이 aifactory 라이브러리가 반드시 최신버전으로 설치될 수 있게끔 합니다

In [None]:
!pip install -U aifactory

* 자신의 모델 추론 실행에 필요한 추가 라이브러리 설치

In [None]:
!pip install transformers==4.30
!pip install -U torch==2.7.1 torchvision==0.22.1 --index-url https://download.pytorch.org/whl/cu118
!pip install -U datasets
!pip install -U opencv-python==4.10.0.82 numpy==1.26.4 scikit-learn==1.3.2 scipy==1.11.4
!pip install -U pathlib
!pip install -U dlib

-----

#### 2. 추론용 코드 작성

##### 추론 환경의 기본 경로 구조

- 평가 데이터셋 경로: `./data/`
   - 채점에 사용될 테스트 데이터셋은 `./data/` 디렉토리 안에 포함되어 있습니다.
   - 해당 디렉토리에는 이미지(JPG, PNG)와 동영상(MP4) 파일이 별도의 하위 폴더 없이 혼합되어 있습니다.
```bash
/aif/
└── data/
    ├── {이미지 데이터1}.jpg
    ├── {이미지 데이터2}.png
    ├── {동영상 데이터1}.mp4
    ├── {이미지 데이터3}.png
    ├── {동영상 데이터2}.mp4
    ...
```

- 모델 및 자원 경로: 예시 : `./model/`
   - 추론 스크립트가 실행되는 위치를 기준으로, 제출된 모델 관련 파일들이 위치해야하 하는 상대 경로입니다.
   - 학습된 모델 가중치(.pt, .ckpt, .pth 등)

* 제출 파일은 `submission.csv`로 저장돼야 합니다.
  * submission.csv는 *filename*과 *label* 컬럼으로 구성돼야 합니다.
  * filename은 추론한 파일의 이름(확장자 포함), label은 추론 결과입니다. (real:0, fake:1)
  * filename은 *string*, label은 *int* 자료형이어야 합니다.
  * 추론하는 데이터의 순서는 무작위로 섞여도 상관 없습니다.

<table>
  <thead>
    <tr>
      <th align="center">filename</th>
      <th align="center">label</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td align="center">{이미지 데이터1}.jpg</td>
      <td align="center">0</td>
    </tr>
    <tr>
      <td align="center">{동영상 데이터1}.mp4</td>
      <td align="center">1</td>
    </tr>
    <tr>
      <td colspan="2" align="center">...</td>
    </tr>
  </tbody>
</table>

**※ 주의 사항**

* argparse 사용시 `args, _ = parser.parse_known_args()`로 인자를 지정하세요.   
   - `args = parser.parse_args()`는 jupyter에서 오류가 발생합니다.
* return 할 결과물과 양식에 유의하세요.

In [None]:
## 필요 라이브러리 import
import os
from transformers import ViTForImageClassification, ViTImageProcessor
from PIL import Image
import cv2
import dlib
from pathlib import Path
import numpy as np
import csv
import torch
import torch.nn.functional as F
from tqdm import tqdm
import multiprocessing

### 추론 환경 경로 설정
model_path = "./model/deep-fake-detector-v2-model"
test_dataset_path = Path("./data")
output_csv_path = Path("submission.csv")

# 상수
IMAGE_EXTS = {".jpg", ".jpeg", ".png"}
VIDEO_EXTS = {".avi", ".mp4"}
TARGET_SIZE = (224, 224)
NUM_FRAMES = 30
BATCH_SIZE = 16  # GPU 배치 처리

# 집계 가중치 (mean vs max)
MEAN_WEIGHT = 0.6
MAX_WEIGHT = 0.4


def get_boundingbox(face, width, height, margin=1.3):
    """얼굴 바운딩 박스 추출 (여유 공간 포함)"""
    x1, y1, x2, y2 = face.left(), face.top(), face.right(), face.bottom()
    size_bb = int(max(x2 - x1, y2 - y1) * margin)
    center_x, center_y = (x1 + x2) // 2, (y1 + y2) // 2
    x1 = max(int(center_x - size_bb // 2), 0)
    y1 = max(int(center_y - size_bb // 2), 0)
    size_bb = min(width - x1, size_bb)
    size_bb = min(height - y1, size_bb)
    return x1, y1, size_bb


def center_crop_image(image: Image.Image, target_size=(224, 224)):
    """얼굴 미검출 시 중앙 크롭 사용"""
    if image.mode != 'RGB':
        image = image.convert('RGB')
    
    width, height = image.size
    crop_size = min(width, height)
    left = (width - crop_size) // 2
    top = (height - crop_size) // 2
    right = left + crop_size
    bottom = top + crop_size
    
    cropped = image.crop((left, top, right, bottom))
    return cropped.resize(target_size, Image.BICUBIC)


def detect_and_crop_face(image: Image.Image, target_size=(224, 224), resize_for_detection=640):
    """얼굴 검출 및 크롭 (개선: 미검출 시 중앙 크롭)"""
    if image.mode != 'RGB':
        image = image.convert('RGB')
    
    original_np = np.array(image)
    original_h, original_w, _ = original_np.shape
    
    # 얼굴 검출용 리사이즈
    if original_w > resize_for_detection:
        scale = resize_for_detection / float(original_w)
        resized_h = int(original_h * scale)
        resized_np = cv2.resize(original_np, (resize_for_detection, resized_h), interpolation=cv2.INTER_AREA)
    else:
        scale = 1.0
        resized_np = original_np
    
    # dlib 얼굴 검출
    face_detector = dlib.get_frontal_face_detector()
    faces = face_detector(resized_np, 1)
    
    # 얼굴 검출 실패 시 중앙 크롭 사용
    if not faces:
        return center_crop_image(image, target_size), False  # False = 얼굴 미검출
    
    # 가장 큰 얼굴 선택
    face = max(faces, key=lambda rect: rect.width() * rect.height())
    
    # 원본 이미지 좌표로 변환
    scaled_face_rect = dlib.rectangle(
        left=int(face.left() / scale),
        top=int(face.top() / scale),
        right=int(face.right() / scale),
        bottom=int(face.bottom() / scale)
    )
    
    x, y, size = get_boundingbox(scaled_face_rect, original_w, original_h)
    cropped_np = original_np[y:y + size, x:x + size]
    face_img = Image.fromarray(cropped_np).resize(target_size, Image.BICUBIC)
    
    return face_img, True  # True = 얼굴 검출 성공


def process_single_file(file_path):
    """파일 하나 전처리 (병렬 처리용)"""
    face_images = []
    face_detected_flags = []
    ext = file_path.suffix.lower()
    
    try:
        if ext in IMAGE_EXTS:
            image = Image.open(file_path)
            face_img, detected = detect_and_crop_face(image, TARGET_SIZE)
            face_images.append(face_img)
            face_detected_flags.append(detected)
            
        elif ext in VIDEO_EXTS:
            cap = cv2.VideoCapture(str(file_path))
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            
            if total_frames > 0:
                frame_indices = np.linspace(0, total_frames - 1, NUM_FRAMES, dtype=int)
                for idx in frame_indices:
                    cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
                    ret, frame = cap.read()
                    if not ret:
                        continue
                    
                    image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                    face_img, detected = detect_and_crop_face(image, TARGET_SIZE)
                    face_images.append(face_img)
                    face_detected_flags.append(detected)
            
            cap.release()
            
    except Exception as e:
        return file_path.name, [], [], str(e)
    
    return file_path.name, face_images, face_detected_flags, None


def predict_batch(model, processor, images, device="cuda"):
    """배치 단위 GPU 추론"""
    if not images:
        return []
    
    inputs = processor(images=images, return_tensors="pt").to(device)
    
    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits
        probs = F.softmax(logits, dim=1)  # [N, 2]
    
    return probs.cpu().numpy()


def aggregate_video_predictions(probs_list, face_detected_flags):
    """
    동영상 프레임 확률 집계 (개선)
    - mean + max 확률 조합
    - 얼굴 검출 실패 프레임 가중치 낮춤
    """
    if not probs_list:
        return 0  # 기본값 Real
    
    probs_array = np.array(probs_list)  # [N, 2]
    
    # 얼굴 검출 성공 프레임만 사용 (최소 1개는 사용)
    detected_indices = [i for i, flag in enumerate(face_detected_flags) if flag]
    if detected_indices:
        probs_array = probs_array[detected_indices]
    
    # Mean과 Max 확률 조합
    mean_probs = probs_array.mean(axis=0)
    max_probs = probs_array.max(axis=0)
    
    combined_probs = MEAN_WEIGHT * mean_probs + MAX_WEIGHT * max_probs
    
    return np.argmax(combined_probs)


# --- 메인 추론 로직 ---
if __name__ == "__main__":
    print("=" * 50)
    print("딥페이크 탐지 모델 - Improved Version")
    print("=" * 50)
    
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Device: {device}")
    
    # 모델 로드
    print("\n[1/4] 모델 로딩...")
    model = ViTForImageClassification.from_pretrained(model_path).to(device)
    processor = ViTImageProcessor.from_pretrained(model_path)
    model.eval()
    print("✓ 모델 로드 완료")
    
    # 파일 리스트
    files = sorted([p for p in test_dataset_path.iterdir() if p.is_file()])
    print(f"\n[2/4] 테스트 데이터: {len(files)}개 파일")
    
    # CPU 워커 설정
    num_workers = min(max(1, multiprocessing.cpu_count() - 1), 8)
    print(f"\n[3/4] 전처리 시작 (workers: {num_workers})")
    
    results = {}
    
    # 병렬 전처리 + 순차 추론
    with multiprocessing.Pool(processes=num_workers) as pool:
        with tqdm(total=len(files), desc="Processing") as pbar:
            for filename, face_images, face_flags, error in pool.imap_unordered(process_single_file, files):
                if error:
                    print(f"\n⚠ Error: {filename} - {error}")
                    results[filename] = 0
                    pbar.update(1)
                    continue
                
                if not face_images:
                    results[filename] = 0
                    pbar.update(1)
                    continue
                
                # 배치 단위로 GPU 추론
                all_probs = []
                for i in range(0, len(face_images), BATCH_SIZE):
                    batch = face_images[i:i + BATCH_SIZE]
                    batch_probs = predict_batch(model, processor, batch, device)
                    all_probs.extend(batch_probs)
                
                # 이미지: 단일 예측, 동영상: 집계
                if len(all_probs) == 1:
                    predicted_class = np.argmax(all_probs[0])
                else:
                    predicted_class = aggregate_video_predictions(all_probs, face_flags)
                
                results[filename] = predicted_class
                pbar.update(1)
    
    # CSV 저장
    print("\n[4/4] 결과 저장 중...")
    with open(output_csv_path, mode="w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["filename", "label"])
        for p in files:
            filename = p.name
            label = results.get(filename, 0)
            writer.writerow([filename, label])
    
    print(f"\n✓ 추론 완료! 결과 저장: {output_csv_path}")
    print("=" * 50)

----

#### 3. `aif.submit()` 함수를 호출하여 최종 결과를 제출

**※주의** : task별, 참가자별로 key가 다릅니다. 잘못 입력하지 않도록 유의바랍니다.
- key는 대회 페이지 [베이스라인 코드](https://aifactory.space/task/9197/baseline) 탭에 기재된 가이드라인을 따라 task 별로 확인하실 수 있습니다.
- key가 틀리면 제출이 진행되지 않거나 잘못 제출되므로 task에 맞는 자신의 key를 사용해야 합니다.
-  **NOTE** : 이번 경진대회에서는 3개의 CUDA 버전을 지원하며, 각 CUDA 버전에 따라 task key가 상이합니다. 함수를 실행하기 전에 현재 key가 제출하고자 하는 CUDA 환경에 대한 key인지 반드시 확인하세요.

In [1]:
import aifactory.score as aif
import time
t = time.time()

#-----------------------------------------------------#
aif.submit(model_name="your_model_name",
    key="your_key"
)
#-----------------------------------------------------#
print(time.time() - t)

file : task
jupyter notebook
제출 완료
35.576070070266724
