In [None]:
# Install facenet-pytorch, from : https://www.kaggle.com/timesler/facial-recognition-model-in-pytorch/data

!pip install /kaggle/input/facenetpytorch/facenet_pytorch-2.0.1-py3-none-any.whl

from facenet_pytorch.models.inception_resnet_v1 import get_torch_home

torch_home = get_torch_home()

In [None]:

# Copy model checkpoints to torch cache so they are loaded automatically by the package

!mkdir -p $torch_home/checkpoints/

!cp /kaggle/input/facenetpytorch/20180402-114759-vggface2-logits.pth $torch_home/checkpoints/vggface2_DG3kwML46X.pt

!cp /kaggle/input/facenetpytorch/20180402-114759-vggface2-features.pth $torch_home/checkpoints/vggface2_G5aNV2VSMn.pt

In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
from fastai.core import *
from fastai.vision import *
import os
import glob
import time
from tqdm import tqdm
import cv2

for dirname, _, filenames in os.walk('/kaggle/input'):

    for filename in filenames:

        print(os.path.join(dirname, filename))

from facenet_pytorch import MTCNN

In [None]:
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'

In [None]:
class DetectionPipeline:

    """Pipeline class for detecting faces in the frames of a video file."""

    def __init__(self, detector, n_frames=None, batch_size=60, resize=None):

        """Constructor for DetectionPipeline class.

        Keyword Arguments:

            n_frames {int} -- Total number of frames to load. These will be evenly spaced

                throughout the video. If not specified (i.e., None), all frames will be loaded.

                (default: {None})

            batch_size {int} -- Batch size to use with MTCNN face detector. (default: {32})

            resize {float} -- Fraction by which to resize frames from original prior to face

                detection. A value less than 1 results in downsampling and a value greater than

                1 result in upsampling. (default: {None})
        """

        self.detector = detector
        self.n_frames, self.batch_size, self.resize = n_frames, batch_size, resize


    def __call__(self, filename, label=None, save_dir=None):
        """Load frames from an MP4 video and detect faces.
        Arguments:
            filename {str} -- Path to video.
        """
        v_cap = cv2.VideoCapture(filename)
        v_len = int(v_cap.get(cv2.CAP_PROP_FRAME_COUNT))

        if self.n_frames is None: sample = np.arange(0, v_len)
        else: sample = np.linspace(0, v_len - 1, self.n_frames).astype(int)

        faces = []
        idxs, frames = [], []
        for j in range(v_len):
            success = v_cap.grab()
            if j in sample:
                success, frame = v_cap.retrieve()
                if not success: continue
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frame = PIL.Image.fromarray(frame)  

                if self.resize is not None:
                    frame = frame.resize([int(d * self.resize) for d in frame.size])
                idxs.append(j); frames.append(frame)

                if len(frames) % self.batch_size == 0 or j == sample[-1]:
                    if save_dir is not None:
                        save_paths = self.get_savepaths(filename, idxs, label, save_dir)
                        faces.extend(self.detector(frames, save_path=save_paths))
                    else: faces.extend(self.detector(frames))
                    idxs, frames = [], []
        v_cap.release()
        return faces

    def get_savepaths(self, filename, idxs, label=None, save_dir=None):
        if isinstance(filename, str): filename = Path(filename)
        if save_dir is None: save_dir = Path('./')
        if label is None: save_paths = [save_dir/f'{filename.stem}_{i:03d}.png' for i in idxs]
        else: save_paths = [save_dir/f'{filename.stem}_{i:03d}_{label}.png' for i in idxs]
        return [str(o) for o in save_paths]

In [None]:
def process_faces(faces, net):
    # Filter out frames without faces
    faces = [f for f in faces if f is not None]
    faces = torch.cat(faces).to(device)

    # Generate facial feature vectors using a pretrained model
    embeddings = net(faces)

    # Calculate centroid for video and distance of each face's feature vector from centroid
    centroid = embeddings.mean(dim=0)
    x = (embeddings - centroid).norm(dim=1).cpu().numpy()

    return x

In [None]:
detector = MTCNN(device=device, post_process=False)

facepipe = DetectionPipeline(detector, batch_size=60, n_frames=1, resize=1/2)

In [None]:
# Get all test videos

filenames = glob.glob('/kaggle/input/deepfake-detection-challenge/test_videos/*.mp4')

In [None]:
sample_filenames = filenames[:10]

In [None]:
!mkdir /kaggle/working/test_faces

In [None]:
DIR_FACE = Path('/kaggle/working/test_faces/')

In [None]:
for i, filename in tqdm(enumerate(sample_filenames), total=len(sample_filenames)):
    faces = facepipe(str(filename), save_dir=DIR_FACE)

In [None]:
WEIGHTS = Path('/kaggle/input/weight/')

In [None]:
test = ImageList.from_folder(DIR_FACE)
learner = load_learner(WEIGHTS, fname='export.pkl', test=test)

In [None]:
preds,_ = learn.get_preds(DatasetType.Test)

In [None]:
#submission = pd.DataFrame(preds, columns=['filename', 'label'])
#submission.sort_values('filename').to_csv('submission.csv', index=False)