# Preprocessing completion

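Notebook pour compléter le jeu de données prétraité : pour chaque vidéo dont le preprocessing a échoué (pour une raison ou une autre), on enregistre à la place un recadrage central de l'image située au milieu de la vidéo.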

In [None]:
import io
import os

import cv2
import dfdetect.utils as utils
import numpy as np
from dfdetect.data_loaders import DFDC, DFDC_preprocessed_single_frames
from tqdm.auto import tqdm
import dfdetect.preprocessing.face_detection as fd

In [None]:
# IPython autoreload in mode 2: imported modules are reloaded before each cell
# runs, so edits to the dfdetect package are picked up without a kernel restart.
%load_ext autoreload
%autoreload 2

In [None]:
# Directory handed to the DFDC loader; overridable through DFDC_DATASET_PATH.
old_path = os.getenv("DFDC_DATASET_PATH", "./dfdc_test_set")

# Directory where the cropped frames and labels.csv are written; overridable
# through DFDC_PREPROCESSED_DATASET_PATH.
new_path = os.getenv(
    "DFDC_PREPROCESSED_DATASET_PATH", "./dfdc_preprocessed_frames_test"
)

In [None]:
# Fix the seed of NumPy's global random generator so that reruns of the
# notebook draw the same random numbers.
np.random.seed(27)  # 27 == 0x1B

In [None]:
# Dataset built from old_path with is_test=True. Later cells read its `desc`
# table ("index", "path", "label" columns) and call its get_filename() method.
dataset = DFDC(old_path, is_test=True)

In [None]:
# Dataset of the frames already present under new_path. Its `desc` table has a
# "dfdc_id" column that is compared against the raw dataset ids to find gaps.
preprocessed_dataset = DFDC_preprocessed_single_frames(new_path)

In [None]:
# Ids listed in the raw dataset versus ids that already have a preprocessed frame.
dfdc_ids = set(dataset.desc["index"])
preprocessed_dfdc_ids = set(preprocessed_dataset.desc["dfdc_id"])

# Videos that still have no preprocessed frame.
missing = list(dfdc_ids.difference(preprocessed_dfdc_ids))
len(missing)

In [None]:
# CSV file inside new_path; the preprocessing functions append one
# "filename,image_path,label" line to it for every image they save.
metadata_fname = os.path.join(new_path, "labels.csv")
# Threshold used by preprocessing_function outside test mode: a crop from a
# FAKE video is kept only when the fraction of array values that differ from
# the matching crop of the original video is strictly greater than this.
target_ratio = 0.5

In [None]:
def fd_waterfall(frame):
    """Detect faces in one frame, falling back through three detectors.

    BlazeFace is tried first, then MTCNN, then RetinaFace. Each detector is
    called with a one-frame batch, and the first non-empty result is returned.
    If all three find nothing, the (empty) RetinaFace result is returned.
    """
    (found,) = fd.detect_face_blazeface([frame])
    if len(found) != 0:
        return found

    (found,) = fd.detect_face_mtcnn([frame])
    if len(found) != 0:
        return found

    (found,) = fd.detect_face_retinaface([frame])
    return found


def preprocessing_function(index, is_test=True):
    """Crop the faces of one video and record them in the label file.

    Reads row ``index`` of the global ``dataset.desc`` table, walks the video
    frame by frame and stops at the first frame that yields at least one
    usable face crop. Each crop is saved as ``<video stem>_<i>.png`` under the
    global ``new_path`` and a ``filename,image_path,label`` line is appended
    to the global ``metadata_fname``.

    Args:
        index: Positional row index into ``dataset.desc``.
        is_test: When true, every video goes through the single-video branch
            and only the first crop is kept. When false, FAKE videos are
            compared with their original video and only crops whose fraction
            of differing values exceeds the global ``target_ratio`` are kept;
            other videos keep every crop of the first frame with a face.

    Returns:
        None. Problems are reported with ``print`` rather than raised.
    """
    meta = dataset.desc.iloc[index]
    output_frames = []
    error_loading = True  # stays True when the video yields no frame at all
    if meta["label"] == "FAKE" and not is_test:
        # In training, the original video is used to tell manipulated faces
        # apart from untouched faces inside a fake video.
        original_name = meta["original"]
        original_path = os.path.join(os.path.dirname(meta["path"]), original_name)
        real_frames = utils.video_to_frames(original_path)[0]
        fake_frames = utils.video_to_frames(meta["path"])[0]
        for real_frame, fake_frame in zip(real_frames, fake_frames):
            error_loading = False
            for bbox in fd_waterfall(real_frame):
                bbox.recast()
                if bbox.width() * bbox.height() == 0:
                    continue
                real_face, fake_face = list(
                    utils.crop_frames([real_frame, fake_frame], [bbox, bbox])
                )
                # np.prod replaces np.product, which was removed in NumPy 2.0.
                n_values = np.prod(real_face.shape)
                if n_values == 0:
                    # Empty crop: nothing to compare.
                    continue
                change_ratio = np.count_nonzero(real_face - fake_face) / n_values
                if change_ratio > target_ratio:
                    output_frames.append(fake_face)
            if len(output_frames) > 0:
                break
    else:  # REAL or testing
        real_frames = utils.video_to_frames(meta["path"])[0]
        for real_frame in real_frames:
            error_loading = False
            # Build a filtered list instead of deleting while indexing, which
            # skipped boxes and raised IndexError after the first deletion.
            valid_bboxes = []
            for bbox in fd_waterfall(real_frame):
                bbox.recast()
                if bbox.width() * bbox.height() != 0:
                    valid_bboxes.append(bbox)
            real_faces = list(
                utils.crop_frames([real_frame] * len(valid_bboxes), valid_bboxes)
            )
            if len(real_faces) > 0:
                if is_test:  # Keep only 1 image per video in test
                    output_frames.append(real_faces[0])
                else:
                    output_frames += real_faces
                break

    if error_loading:
        print("Error while loading video:", meta["path"], meta["label"])
        return

    if len(output_frames) == 0:
        print("Error while processing video:", meta["path"])
        return

    video_original_name = dataset.get_filename(index)
    # Use the stem rather than str.replace on the extension: replace() breaks
    # when the extension is empty or its text occurs elsewhere in the path.
    stem, _ = os.path.splitext(video_original_name)

    for i, face in enumerate(output_frames):
        c_path = os.path.join(new_path, f"{stem}_{i}.png")
        try:
            # cv2.imwrite signals most failures by returning False, so check
            # it before recording a label for a file that does not exist.
            if not cv2.imwrite(c_path, cv2.cvtColor(face, cv2.COLOR_RGB2BGR)):
                raise OSError(f"cv2.imwrite could not write {c_path}")

            with io.open(metadata_fname, "a") as f:
                f.write(f"{video_original_name},{c_path},{meta['label']}\n")
        except Exception as e:
            # Best effort: report the failure and carry on with the next crop.
            print(e)
            print("Error while saving for video:", meta["path"])
            continue

In [None]:
def preprocessing_empty(index, is_test=True):
    """Save a central crop of the middle frame of a video as a fallback image.

    Intended for test videos in which no face could be detected. The crop is
    at most 100x100 pixels, taken around the centre of the middle frame. It is
    written as ``<video stem>_0.png`` under the global ``new_path`` and a
    ``filename,image_path,label`` line is appended to ``metadata_fname``.

    Args:
        index: Positional row index into the global ``dataset.desc`` table.
        is_test: Unused; kept so the signature mirrors
            ``preprocessing_function``.

    Returns:
        None. Problems are reported with ``print`` rather than raised.
    """
    crop_size = 100
    meta = dataset.desc.iloc[index]
    video_original_name = dataset.get_filename(index)
    # Use the stem rather than str.replace on the extension: replace() breaks
    # when the extension is empty or its text occurs elsewhere in the name.
    stem, _ = os.path.splitext(video_original_name)
    image_path = os.path.join(new_path, f"{stem}_0.png")

    # All frames are materialised because the middle one is picked by index.
    frames = list(utils.video_to_frames(meta["path"])[0])
    if len(frames) == 0:
        print("Error while loading video:", meta["path"], meta["label"])
        return

    frame = frames[len(frames) // 2]
    h, w, _ = frame.shape
    # Clamp at 0: for frames smaller than the crop, a negative start would be
    # read as an index from the end and give a wrong or empty crop.
    sh = max(h // 2 - crop_size // 2, 0)
    sw = max(w // 2 - crop_size // 2, 0)
    face = frame[sh : sh + crop_size, sw : sw + crop_size, :]

    # cv2.imwrite signals most failures by returning False; do not record a
    # label for an image that was not written.
    if not cv2.imwrite(image_path, cv2.cvtColor(face, cv2.COLOR_RGB2BGR)):
        print("Error while saving for video:", meta["path"])
        return
    with io.open(metadata_fname, "a") as f:
        f.write(f"{video_original_name},{image_path},{meta['label']}\n")

In [None]:
# Give every still-missing video a fallback crop so it ends up in the label file.
for id_name in tqdm(missing):
    # Row position(s) of this id in the description table; the first is used.
    position = np.where(dataset.desc["index"] == id_name)[0]
    preprocessing_empty(position[0])

In [None]:
# Reload the preprocessed dataset from disk and recompute which ids are still
# absent after the fallback pass.
preprocessed_dataset = DFDC_preprocessed_single_frames(new_path)
preprocessed_dfdc_ids = set(preprocessed_dataset.desc["dfdc_id"])
missing = list(dfdc_ids.difference(preprocessed_dfdc_ids))
len(missing)