In [None]:
!pip install insightface onnxruntime-gpu



In [None]:
import cv2
import shutil
import numpy as np
import os
import zipfile
import tempfile
import random
import gc
from pathlib import Path
from tqdm import tqdm
from insightface.app import FaceAnalysis
from google.colab import drive

In [None]:
# Mount Google Drive at /content/drive; the input archive and the output
# directory configured in this notebook both live under this mount point.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# A single seed shared by NumPy's legacy global RNG and Python's `random`
# module, so that random choices made later in the notebook are repeatable.
SEED = 42
np.random.seed(SEED)
random.seed(SEED)

In [None]:
# --- Locations (both under the mounted Google Drive) -------------------------
# Archive the videos are read from.
ZIP_PATH = "/content/drive/MyDrive/Celeb-DF-v2.zip"
# Root directory the extracted frames are written under.
SAVE_BASE_PATH = "/content/drive/MyDrive/HECTO/Dataset/Celeb_frames_ver2"

# --- Extraction settings -----------------------------------------------------
# Frames sampled per video, and the size each saved image is resized to.
NUM_FRAMES, TARGET_SIZE = 15, (256, 256)
# Upper bound on how many fake videos are randomly sampled.
FAKE_SAMPLE_COUNT = 890

In [None]:
# Prefer the GPU, with the CPU provider listed explicitly as the fallback.
# NOTE(review): the recorded output of this cell shows every model applied with
# CPUExecutionProvider only, i.e. CUDA was not in use for that run - check the
# runtime type / onnxruntime-gpu install if GPU speed is expected.
#
# Only the detection model is loaded. 'landmark_2d' is not a task name in this
# model pack (the log reports 'landmark_2d_106' as ignored), so listing it had
# no effect; the alignment code only reads the detector's `kps` keypoints.
detector = FaceAnalysis(
    allowed_modules=['detection'],
    providers=['CUDAExecutionProvider', 'CPUExecutionProvider'],
)
detector.prepare(ctx_id=0, det_size=(640, 640))

Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
model ignore: /root/.insightface/models/buffalo_l/1k3d68.onnx landmark_3d_68
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
model ignore: /root/.insightface/models/buffalo_l/2d106det.onnx landmark_2d_106
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: /root/.insightface/models/buffalo_l/det_10g.onnx detection [1, 3, '?', '?'] 127.5 128.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
model ignore: /root/.insightface/models/buffalo_l/genderage.onnx genderage
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
model ignore: /root/.insightface/models/buffalo_l/w600k_r50.onnx recognition
set det-size: (640, 640)


In [None]:
def get_hybrid_face(image, face_info, target_size=(256, 256)):
    """Return an eye-levelled, square, resized crop of one detected face.

    The frame is rotated about the bounding-box centre so that the line through
    the first two keypoints becomes horizontal, then a square window whose side
    is 1.3x the longer bounding-box side is cut out around that centre. Parts of
    the window that fall outside the frame are padded with black.

    Args:
        image: Frame as an array indexed ``[row, col, ...]``.
        face_info: Detection exposing ``bbox`` (x1, y1, x2, y2) and, optionally,
            ``kps`` keypoints whose first two entries are used as the eyes.
        target_size: Output size handed to ``cv2.resize``.

    Returns:
        The resized crop, or ``None`` when the box is degenerate, the crop
        window misses the frame entirely, or any processing step fails.
    """
    try:
        h, w = image.shape[:2]
        bx1, by1, bx2, by2 = (int(v) for v in face_info.bbox.astype(int)[:4])
        landmarks = getattr(face_info, 'kps', None)

        cx = (bx1 + bx2) // 2
        cy = (by1 + by2) // 2

        # Square window: 30% larger than the longer side of the box.
        side = int(max(bx2 - bx1, by2 - by1) * 1.3)
        half = side // 2
        if half <= 0:
            # Degenerate box -> empty crop; bail out before the costly warp.
            return None

        x1, y1 = cx - half, cy - half
        x2, y2 = cx + half, cy + half
        if x2 <= 0 or y2 <= 0 or x1 >= w or y1 >= h:
            # Window lies completely outside the frame. Without this guard a
            # negative upper bound would wrap around in the slice below and
            # silently produce a wrong, non-square crop.
            return None

        # 1) Alignment: rotate around the box centre so the eyes are level.
        if landmarks is not None and len(landmarks) >= 2:
            left_eye, right_eye = landmarks[0], landmarks[1]
            dy, dx = right_eye[1] - left_eye[1], right_eye[0] - left_eye[0]
            angle = np.degrees(np.arctan2(dy, dx))
            M = cv2.getRotationMatrix2D((float(cx), float(cy)), angle, 1.0)
            image = cv2.warpAffine(image, M, (w, h), flags=cv2.INTER_CUBIC)

        # 2) Square crop: clamp the window to the frame and remember how much
        #    was cut off on each side so it can be padded back.
        pad_left, pad_top = max(0, -x1), max(0, -y1)
        pad_right, pad_bottom = max(0, x2 - w), max(0, y2 - h)

        crop = image[max(0, y1):min(h, y2), max(0, x1):min(w, x2)]

        if pad_left or pad_top or pad_right or pad_bottom:
            crop = cv2.copyMakeBorder(
                crop, pad_top, pad_bottom, pad_left, pad_right,
                cv2.BORDER_CONSTANT, value=[0, 0, 0]
            )

        return cv2.resize(crop, target_size)

    except Exception:
        # Best effort: a face that cannot be cropped is reported as None so the
        # caller can fall back. KeyboardInterrupt/SystemExit are deliberately
        # NOT caught, so a long run can still be interrupted.
        return None

In [None]:
def _output_dir_for(file_path):
    """Map an archive member to ``SAVE_BASE_PATH/<label>/<person_id>/<video_name>``.

    'youtube-real' members go under ``real/youtube``, 'celeb-real' members under
    ``real/<id>``, and everything else under ``fake/<id>``, where ``<id>`` is the
    part of the file stem before the first underscore.
    """
    video_name = Path(file_path).stem
    lowered = file_path.lower()
    if 'youtube-real' in lowered:
        label_dir, person_id = 'real', 'youtube'
    else:
        label_dir = 'real' if 'celeb-real' in lowered else 'fake'
        person_id = video_name.split('_')[0]
    return os.path.join(SAVE_BASE_PATH, label_dir, person_id, video_name)


def _collect_face_crops(cap, total):
    """Sample ``NUM_FRAMES`` evenly spaced frames and crop the largest face in each.

    Args:
        cap: An opened ``cv2.VideoCapture``.
        total: Frame count reported for the video (expected to be > 0).

    Returns:
        ``(face_crops, faceless_frames)``. ``face_crops`` is empty when no face
        could be cropped in any sampled frame; in that case ``faceless_frames``
        holds the raw frames that were decoded. Once a face has been found,
        ``faceless_frames`` is emptied because it is no longer needed.
    """
    face_crops = []
    faceless_frames = []   # decoded frames seen before the first usable face
    last_face_crop = None

    for idx in np.linspace(0, total - 1, NUM_FRAMES, dtype=int):
        cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
        ret, frame = cap.read()
        if not ret or frame is None:
            # Unreadable frame: reuse the previous crop if there is one.
            if last_face_crop is not None:
                face_crops.append(last_face_crop)
            continue

        crop = None
        faces = detector.get(frame)
        if faces:
            largest = max(
                faces,
                key=lambda f: (f.bbox[2] - f.bbox[0]) * (f.bbox[3] - f.bbox[1])
            )
            # Pass TARGET_SIZE so face crops and full-frame fallbacks always
            # share one output size.
            crop = get_hybrid_face(frame, largest, TARGET_SIZE)

        if crop is not None:
            if last_face_crop is None:
                # First detection: retro-fill one copy of this crop for every
                # earlier decoded frame, then drop those raw frames.
                face_crops.extend([crop] * len(faceless_frames))
                faceless_frames.clear()
            face_crops.append(crop)
            last_face_crop = crop
        elif last_face_crop is not None:
            # Temporal fallback: repeat the most recent crop.
            face_crops.append(last_face_crop)
        else:
            faceless_frames.append(frame)

    return face_crops, faceless_frames


def _write_frames(save_dir, images):
    """Write ``images`` into ``save_dir`` as ``frame_0.jpg``, ``frame_1.jpg``, ..."""
    for i, img in enumerate(images):
        out_path = os.path.join(save_dir, f"frame_{i}.jpg")
        # imwrite signals failure through its return value instead of raising.
        if not cv2.imwrite(out_path, img):
            tqdm.write(f"[warn] could not write {out_path}")


def run_preprocessing():
    """Extract ``NUM_FRAMES`` face crops per video from the archive at ``ZIP_PATH``.

    All real videos plus a random sample of at most ``FAKE_SAMPLE_COUNT`` fake
    videos are processed. For each video the output directory is recreated from
    scratch and filled with ``frame_<i>.jpg`` files:

    * if at least one face was cropped, exactly ``NUM_FRAMES`` crops are written
      (gaps are filled with neighbouring crops);
    * if no face was cropped, the decoded frames are written resized to
      ``TARGET_SIZE``;
    * if nothing could be decoded, the directory is left empty.
    """
    # A dedicated RNG seeded with SEED gives the same selection as a freshly
    # seeded global RNG, but also on every re-run of this cell - otherwise a
    # re-run would pick a different subset and leave stale folders behind.
    rng = random.Random(SEED)

    with zipfile.ZipFile(ZIP_PATH, 'r') as z:
        all_videos = [
            f for f in z.namelist()
            if f.lower().endswith('.mp4') and not os.path.basename(f).startswith('._')
        ]

        real_videos = [f for f in all_videos if 'celeb-real' in f.lower() or 'youtube-real' in f.lower()]
        fake_all = [f for f in all_videos if 'celeb-synthesis' in f.lower()]
        fake_videos = rng.sample(fake_all, min(len(fake_all), FAKE_SAMPLE_COUNT))

        print(f"📊 REAL: {len(real_videos)}, FAKE: {len(fake_videos)}")

        for video_list, label_type in [(real_videos, "REAL"), (fake_videos, "FAKE")]:
            print(f"\n🔥 {label_type} 처리 중...")

            for file_path in tqdm(video_list):
                save_dir = _output_dir_for(file_path)
                if os.path.exists(save_dir):
                    shutil.rmtree(save_dir)
                os.makedirs(save_dir, exist_ok=True)

                face_crops, faceless_frames = [], []
                with tempfile.NamedTemporaryFile(suffix=".mp4") as tmp:
                    # Stream the member to disk instead of loading the whole
                    # video into memory first.
                    with z.open(file_path) as src:
                        shutil.copyfileobj(src, tmp)
                    tmp.flush()

                    cap = cv2.VideoCapture(tmp.name)
                    try:
                        total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                        if total > 0:
                            face_crops, faceless_frames = _collect_face_crops(cap, total)
                    finally:
                        # Release the capture even if detection raises.
                        cap.release()

                # ===== Final fix-up =====
                if face_crops:
                    # At least one face was found: pad to NUM_FRAMES by
                    # repeating the last crop.
                    face_crops += [face_crops[-1]] * (NUM_FRAMES - len(face_crops))
                    _write_frames(save_dir, face_crops[:NUM_FRAMES])
                else:
                    # No face at all: save the decoded frames resized as a
                    # whole. If nothing was decoded either, nothing is written
                    # and the (empty) folder is kept.
                    _write_frames(
                        save_dir,
                        [cv2.resize(f, TARGET_SIZE) for f in faceless_frames[:NUM_FRAMES]]
                    )

                gc.collect()

    print(f"\n✅ 완료: {SAVE_BASE_PATH}")

# Run the full extraction. This is long-running: the progress bars recorded
# below show roughly 1h50m for each of the REAL and FAKE passes.
run_preprocessing()

📦 압축 파일 내 전체 파일 개수: 6533
🎥 찾은 MP4 영상 개수: 6529
📊 최종 타겟 -> REAL: 890, FAKE: 890

🔥 REAL 처리 중...


100%|██████████| 890/890 [1:50:42<00:00,  7.46s/it]



🔥 FAKE 처리 중...


100%|██████████| 890/890 [1:50:02<00:00,  7.42s/it]


✅ 모든 작업이 완료되었습니다. 저장소: /content/drive/MyDrive/HECTO/Dataset/Celeb_frames_ver2



