In [None]:
!pip -q install facenet-pytorch==2.5.3 tqdm>=4.67

import os, sys, math, json, traceback, shutil, glob
from pathlib import Path
from typing import List, Tuple, Optional

import numpy as np
import cv2
from PIL import Image
import torch
from facenet_pytorch import MTCNN
from tqdm.auto import tqdm

# Report whether a CUDA device is visible to PyTorch.
print(f"CUDA available: {torch.cuda.is_available()}")

CUDA available: False


In [None]:
# =========================
# Config
# =========================
# When the videos live on Google Drive, the Drive must be mounted first.
# Uncomment the two lines below to mount it from this cell:
# from google.colab import drive
# drive.mount('/content/drive')

# --- Paths ---
# Directory holding the DFDC part 2 .mp4 files,
# e.g. "/content/drive/MyDrive/DFDC/dfdc_train_part_2".
INPUT_DIR  = "/content/drive/MyDrive/dfdc_train_part_2"
# Directory that receives all generated output.
OUTPUT_DIR = "dfdc_faces_part_2"

# --- Frame sampling ---
FRAMES_PER_SECOND = 1        # number of frames sampled per second of video
MAX_FRAMES_PER_VIDEO = 64    # upper bound on kept frames per video (None disables the cap)

# --- Face crop ---
FACE_MARGIN_RATIO = 0.2      # margin added around the detected box (20%)
MIN_FACE_SIZE = 64           # crops whose short edge is below this many pixels are skipped

# --- Output image size and normalization ---
TARGET_SIZE = (299, 299)     # (width, height)
NORMALIZE_TO = "0_1"         # either "0_1" or "neg1_1"

# --- Save options ---
SAVE_JPEGS = True            # write every processed frame as a .jpg
SAVE_NPY   = True            # write one .npy tensor per video

# --- Device used by MTCNN ---
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

# Create the output directory up front (no-op when it already exists).
os.makedirs(OUTPUT_DIR, exist_ok=True)


In [None]:
# =========================
# Helpers
# =========================
def normalize_img(img_np: np.ndarray, mode: str = "0_1") -> np.ndarray:
    """
    Scale an image to floating point.

    img_np: image array with values on the 0..255 scale (HxWxC uint8 expected).
    mode:   "neg1_1" maps values to [-1, 1]; any other value maps them to [0, 1].

    Returns a float32 array with the same shape as the input.
    """
    unit_scaled = img_np.astype(np.float32) / 255.0
    return unit_scaled * 2.0 - 1.0 if mode == "neg1_1" else unit_scaled

def expand_box(xyxy, margin_ratio, img_w, img_h):
    """
    Grow a bounding box by a margin and clip it to the image.

    xyxy:         (x1, y1, x2, y2) box corners.
    margin_ratio: margin added on every side, as a fraction of the box's longer edge.
    img_w, img_h: image width and height used for clipping.

    Returns (nx1, ny1, nx2, ny2) as ints. Every coordinate is clamped into
    [0, img_w] / [0, img_h], so a box lying partly or fully outside the image
    can never produce negative or inverted coordinates (which array slicing
    would otherwise interpret as from-the-end indices). A box fully outside
    the image collapses to a zero-size box on the nearest border.
    """
    x1, y1, x2, y2 = xyxy
    w = x2 - x1
    h = y2 - y1
    cx = x1 + w / 2.0
    cy = y1 + h / 2.0
    m = margin_ratio * max(w, h)
    new_w = w + 2*m
    new_h = h + 2*m

    def _clamp(value, upper):
        # Round to the nearest pixel, then force the result into [0, upper].
        return min(max(0, int(round(value))), upper)

    nx1 = _clamp(cx - new_w/2, img_w)
    ny1 = _clamp(cy - new_h/2, img_h)
    nx2 = _clamp(cx + new_w/2, img_w)
    ny2 = _clamp(cy + new_h/2, img_h)
    return nx1, ny1, nx2, ny2

def pick_largest_box(boxes: np.ndarray) -> Optional[np.ndarray]:
    """
    Return the row of `boxes` with the largest area.

    boxes: array of (x1, y1, x2, y2) rows, or None.
    Returns None when `boxes` is None or empty.
    """
    if boxes is None or len(boxes) == 0:
        return None
    widths = boxes[:, 2] - boxes[:, 0]
    heights = boxes[:, 3] - boxes[:, 1]
    largest = int(np.argmax(widths * heights))
    return boxes[largest]

def ensure_dir(p: Path) -> None:
    """Create directory `p` together with any missing parents; do nothing if it already exists."""
    p.mkdir(exist_ok=True, parents=True)


In [None]:
# =========================
# Initialize detector
# =========================
# With keep_all=True the detector reports every face it finds in a frame;
# the processing loop then keeps only the largest one for consistency.
mtcnn = MTCNN(
    keep_all=True,
    device=DEVICE,
    thresholds=[0.6, 0.7, 0.7],
)


In [None]:
# =========================
# Processing loop
# =========================
def _crop_largest_face(rgb: np.ndarray) -> Optional[np.ndarray]:
    """
    Detect faces in an RGB frame and return the largest one, cropped and resized.

    rgb: HxWx3 frame in RGB channel order.
    Returns the crop resized to TARGET_SIZE, or None when no face is detected,
    when the margin-expanded box has a short edge below MIN_FACE_SIZE, or when
    the crop is empty.
    """
    boxes, _probs = mtcnn.detect(Image.fromarray(rgb))
    box = pick_largest_box(boxes)  # None when there are no detections
    if box is None:
        return None

    img_h, img_w = rgb.shape[:2]
    x1, y1, x2, y2 = expand_box(box, FACE_MARGIN_RATIO, img_w, img_h)

    # Skip tiny faces (this also rejects degenerate boxes with non-positive size).
    if min(x2 - x1, y2 - y1) < MIN_FACE_SIZE:
        return None

    crop = rgb[y1:y2, x1:x2]
    if crop.size == 0:
        return None

    return cv2.resize(crop, TARGET_SIZE, interpolation=cv2.INTER_AREA)


def _process_video(vpath: Path) -> int:
    """
    Sample frames from one video, extract the largest face from each, and save the results.

    Writes one .jpg per kept frame into OUTPUT_DIR/<video stem>/ when SAVE_JPEGS
    is set, and one float32 array of shape (N, H, W, 3) to
    OUTPUT_DIR/<video stem>.npy when SAVE_NPY is set and at least one face was kept.

    Returns the number of face crops kept.
    Raises RuntimeError if the video cannot be opened.
    """
    cap = cv2.VideoCapture(str(vpath))
    try:
        if not cap.isOpened():
            raise RuntimeError("Could not open video")

        # Fall back to 30 fps when the container does not report a usable rate.
        fps = cap.get(cv2.CAP_PROP_FPS)
        fps = fps if fps and fps > 0 else 30.0
        step = max(1, int(round(fps / FRAMES_PER_SECOND)))

        # Output folders
        vname = vpath.stem
        out_dir = Path(OUTPUT_DIR) / vname
        if SAVE_JPEGS:
            ensure_dir(out_dir)

        processed_frames = []
        save_count = 0
        read_idx = -1  # index of the most recently grabbed frame

        while True:
            # grab() advances without decoding, so skipped frames stay cheap.
            if not cap.grab():
                break
            read_idx += 1
            if read_idx % step != 0:
                continue

            # Decode only the frames that are actually sampled.
            ok, frame = cap.retrieve()
            if not ok:
                continue

            # OpenCV decodes to BGR; the detector and the saved tensor use RGB.
            face = _crop_largest_face(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            if face is None:
                continue

            # save jpeg (uint8); imwrite expects BGR
            if SAVE_JPEGS:
                out_file = out_dir / f"{vname}_{save_count:05d}.jpg"
                cv2.imwrite(str(out_file), cv2.cvtColor(face, cv2.COLOR_RGB2BGR))

            # normalized float
            processed_frames.append(normalize_img(face, NORMALIZE_TO))
            save_count += 1

            if MAX_FRAMES_PER_VIDEO is not None and save_count >= MAX_FRAMES_PER_VIDEO:
                break
    finally:
        # Always free the capture handle, including when processing fails mid-video.
        cap.release()

    # save npy (N, 299, 299, 3), float32
    if SAVE_NPY and processed_frames:
        arr = np.stack(processed_frames, axis=0).astype(np.float32)
        np.save(str(Path(OUTPUT_DIR) / f"{vname}.npy"), arr)

    return save_count


video_paths = sorted(Path(INPUT_DIR).glob("*.mp4"))

if not video_paths:
    print(f"No .mp4 files found in {INPUT_DIR}. Check your path.")
else:
    print(f"Found {len(video_paths)} videos.")

error_log = []

for vpath in tqdm(video_paths, desc="Videos"):
    try:
        _process_video(vpath)
    except Exception as e:
        # Best effort: record the failure and continue with the next video.
        error_info = {
            "video": str(vpath),
            "error": repr(e),
            # Full traceback, so the failing call inside the helpers is visible.
            "trace": traceback.format_exc()
        }
        error_log.append(error_info)

# Write error log if any
if error_log:
    with open(Path(OUTPUT_DIR) / "errors.json", "w") as f:
        json.dump(error_log, f, indent=2)
    print(f"Completed with {len(error_log)} errors. See errors.json in {OUTPUT_DIR}.")
elif video_paths:
    # Only claim success when there was something to process.
    print("All videos processed successfully.")


Found 1518 videos.


Videos:   0%|          | 0/1518 [00:00<?, ?it/s]

All videos processed successfully.


In [1]:
# Mount Google Drive at /content/drive.
# NOTE(review): INPUT_DIR in the Config cell points under /content/drive, so on a
# fresh kernel this cell has to run before the processing loop; it currently sits
# last in the notebook — consider moving it to the top.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive
