In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

# import numpy as np # linear algebra
# import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the "../input/" directory.
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

# import os
# for dirname, _, filenames in os.walk('/kaggle/input/'):
#     for filename in filenames:
#         print(os.path.join(dirname, filename))

# import face_recognition
# Any results you write to the current directory are saved as output.

In [1]:
import json
import os
import tqdm
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms.functional as t_F
import torchvision.models as models
import torchvision.transforms as transforms
import torch.utils.data as data
import torchvision
from torch.autograd import Variable
from torch.utils.data import Dataset
import cv2

In [8]:
# --- Filesystem locations ---
# NOTE(review): these are absolute, machine-specific paths; adjust them for the
# environment the notebook actually runs in.
data_path = "E:\\deepfake\\deepfake-detection-challenge\\test_videos"  # folder scanned for test videos
save_model_path = "E:\\deepfake\\"  # folder the encoder/decoder checkpoints are loaded from

# Number of output classes handed to the decoder
k = 2

# --- EncoderCNN architecture ---
CNN_fc_hidden1 = 1024   # width of the first fully connected layer
CNN_fc_hidden2 = 768    # width of the second fully connected layer
CNN_embed_dim = 512     # latent dim extracted by the 2D CNN
res_size = 224          # image size setting (not referenced by the cells visible here)
dropout_p = 0.0         # dropout probability

# --- DecoderRNN architecture ---
RNN_hidden_layers = 3
RNN_hidden_nodes = 512
RNN_FC_dim = 256

In [9]:
# Per-frame 2D CNN encoder built on torchvision's Inception V3
class InceptV3Encoder(nn.Module):
    """Encode every time step of a clip with Inception V3 followed by a small FC head.

    The backbone is created with ``models.inception_v3()`` without arguments, so
    no pretrained weights are requested here; trained weights have to be loaded
    separately (e.g. with ``load_state_dict``).
    """

    def __init__(self, fc_hidden1=512, fc_hidden2=512, drop_p=0.3, CNN_embed_dim=300):
        """Create the Inception V3 backbone and replace its top fc layer.

        Args:
            fc_hidden1: output width of the layer that replaces the backbone's ``fc``.
            fc_hidden2: output width of the second fully connected layer.
            drop_p: dropout probability applied before the final projection.
            CNN_embed_dim: size of the embedding produced for each time step.
        """
        super().__init__()

        self.fc_hidden1 = fc_hidden1
        self.fc_hidden2 = fc_hidden2
        self.drop_p = drop_p

        backbone = models.inception_v3()
        backbone.aux_logits = False
        self.inceptV3 = backbone

        # fc1 is registered both as self.fc1 and as the backbone's fc layer, so it
        # is the same module under two names (and two sets of state-dict keys).
        self.fc1 = nn.Linear(backbone.fc.in_features, fc_hidden1)
        self.inceptV3.fc = self.fc1
        self.bn1 = nn.BatchNorm1d(fc_hidden1, momentum=0.01)
        self.fc2 = nn.Linear(fc_hidden1, fc_hidden2)
        self.bn2 = nn.BatchNorm1d(fc_hidden2, momentum=0.01)
        self.fc3 = nn.Linear(fc_hidden2, CNN_embed_dim)

    def forward(self, x_3d):
        """Embed each time step of ``x_3d`` independently.

        Args:
            x_3d: 5-D tensor indexed as ``x_3d[:, t, :, :, :]``; dim 1 is the time step.

        Returns:
            Tensor of shape (batch, time_step, CNN_embed_dim).
        """
        per_step = []
        n_steps = x_3d.size(1)
        for step in range(n_steps):
            # The backbone (including its replaced fc layer, i.e. fc1) runs
            # without gradient tracking.
            with torch.no_grad():
                frame_batch = x_3d[:, step, :, :, :]
                feats = self.inceptV3(frame_batch)

            # Fully connected head
            hidden = F.relu(self.bn1(feats))
            hidden = F.relu(self.bn2(self.fc2(hidden)))
            hidden = F.dropout(hidden, p=self.drop_p, training=self.training)
            per_step.append(self.fc3(hidden))

        # Stack along a leading time dim, then swap it with the batch dim:
        # (time_step, batch, dim) -> (batch, time_step, dim)
        stacked = torch.stack(per_step, dim=0)
        return stacked.transpose(0, 1)

class DecoderRNN(nn.Module):
    """LSTM over a sequence of embeddings followed by a two-layer classifier head."""

    def __init__(self, CNN_embed_dim=300, h_RNN_layers=3, h_RNN=256, h_FC_dim=128, drop_p=0.3, num_classes=50):
        """Build the LSTM and the fully connected head.

        Args:
            CNN_embed_dim: size of each input embedding (LSTM input size).
            h_RNN_layers: number of stacked LSTM layers.
            h_RNN: number of hidden units per LSTM layer.
            h_FC_dim: width of the hidden fully connected layer.
            drop_p: dropout probability applied after the hidden FC layer.
            num_classes: number of output scores per sample.
        """
        super().__init__()

        self.RNN_input_size = CNN_embed_dim
        self.h_RNN_layers = h_RNN_layers
        self.h_RNN = h_RNN
        self.h_FC_dim = h_FC_dim
        self.drop_p = drop_p
        self.num_classes = num_classes

        # batch_first=True: inputs and outputs are (batch, time_step, feature)
        self.LSTM = nn.LSTM(
            input_size=CNN_embed_dim,
            hidden_size=h_RNN,
            num_layers=h_RNN_layers,
            batch_first=True,
        )

        self.fc1 = nn.Linear(h_RNN, h_FC_dim)
        self.fc2 = nn.Linear(h_FC_dim, num_classes)

    def forward(self, x_RNN):
        """Return one row of ``num_classes`` raw scores per sample.

        Args:
            x_RNN: tensor of shape (batch, time_step, CNN_embed_dim).

        Returns:
            Tensor of shape (batch, num_classes); no softmax is applied here.
        """
        self.LSTM.flatten_parameters()
        # Passing None starts from a zero initial hidden state.
        # seq_out has shape (batch, time_step, hidden_size); the final
        # (h_n, c_n) state is not used.
        seq_out, _final_state = self.LSTM(x_RNN, None)

        # Classify from the LSTM output at the last time step only.
        last_step = seq_out[:, -1, :]
        hidden = F.relu(self.fc1(last_step))
        hidden = F.dropout(hidden, p=self.drop_p, training=self.training)
        return self.fc2(hidden)

In [10]:
# Default Haar cascade definition used for frontal-face detection.
_FACE_CASCADE_PATH = '/kaggle/input/trained-models/haarcascade_frontalface_default.xml'
# Classifiers that were already loaded, keyed by cascade file path, so the XML
# file is parsed once instead of once per frame.
_FACE_CASCADE_CACHE = {}


def _get_face_cascade(cascade_path):
    """Return the cascade classifier for ``cascade_path``, loading it at most once.

    Raises:
        IOError: if OpenCV could not load a classifier from ``cascade_path``.
    """
    cascade = _FACE_CASCADE_CACHE.get(cascade_path)
    if cascade is None:
        cascade = cv2.CascadeClassifier(cascade_path)
        # CascadeClassifier does not raise on a missing/invalid file; it just
        # stays empty, so fail here with a message that names the path.
        if cascade.empty():
            raise IOError("Could not load face cascade from '%s'" % cascade_path)
        _FACE_CASCADE_CACHE[cascade_path] = cascade
    return cascade


def face_detect(frame, cascade_path=_FACE_CASCADE_PATH):
    """Detect faces in a frame using a Haar cascade on a downscaled grayscale copy.

    Args:
        frame: image array in BGR channel order (it is converted with
            ``cv2.COLOR_BGR2GRAY``).
        cascade_path: path to the Haar cascade XML file.

    Returns:
        The result of ``detectMultiScale``: one ``(x, y, w, h)`` box per detected
        face, expressed in coordinates of the 1/4-size frame (callers have to
        multiply by 4 to get full-resolution coordinates).

    Raises:
        IOError: if the cascade file cannot be loaded.
    """
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    # Resize frame of video to 1/4 size for faster face detection processing
    small_frame = cv2.resize(gray, (0, 0), fx=0.25, fy=0.25)
    # Positional arguments are scaleFactor=1.1 and minNeighbors=4
    return _get_face_cascade(cascade_path).detectMultiScale(small_frame, 1.1, 4)


def _frame_to_tensor(frame, transform):
    """Convert one HWC image array into a transformed CHW tensor."""
    chw = torch.from_numpy(frame).permute(2, 0, 1)  # HWC -> CHW
    return transform(t_F.to_pil_image(chw))


def readVideo(videoFile, num_frames=20, max_attempts=90):
    """Read a short clip from a video as a stack of preprocessed frames.

    Frames are read starting at frame index 10. Until a frame with exactly one
    detected face is found, whole frames are collected. From that frame on, the
    same face box (detected once, not re-detected) is cropped from every frame.

    Args:
        videoFile: path of the video to open.
        num_frames: collection stops once a list holds more than this many
            frames, so a full clip has ``num_frames + 1`` frames.
            NOTE(review): the ``>`` stopping rule is kept from the original code;
            confirm whether ``num_frames + 1`` is the intended sequence length.
        max_attempts: maximum number of ``cap.read()`` calls.

    Returns:
        Tensor of shape (n_frames, 3, 299, 299). Face crops are returned when a
        full clip of them was collected; otherwise the whole-frame list is
        returned, or the partial face-crop list if no whole frame was collected.

    Raises:
        RuntimeError: if not a single frame could be read.
    """
    # NOTE(review): frames are passed through in OpenCV's channel order (no
    # BGR->RGB conversion) before normalisation; this is kept unchanged because
    # the saved model was presumably trained with the same pipeline - confirm.
    transform = transforms.Compose([
        transforms.Resize(299),
        transforms.CenterCrop(299),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    face_frames = []    # crops of the detected face
    plain_frames = []   # whole frames seen before a face was found
    face_box = None     # (x, y, w, h) in full-resolution pixels once detected

    cap = cv2.VideoCapture(videoFile)
    try:
        # Skip the first frames of the video
        cap.set(cv2.CAP_PROP_POS_FRAMES, 10)
        for _ in range(max_attempts):
            ret, frame = cap.read()
            if not ret:
                continue

            if face_box is None:
                faces = face_detect(frame)
                if len(faces) != 1:
                    # No unambiguous face yet: keep the whole frame instead
                    plain_frames.append(_frame_to_tensor(frame, transform))
                    if len(plain_frames) > num_frames:
                        break
                    continue
                # Detection ran on a 1/4-size frame, so scale the box back up
                face_box = faces[0] * 4

            x, y, w, h = face_box
            face_frames.append(_frame_to_tensor(frame[y: y + h, x: x + w], transform))
            if len(face_frames) > num_frames:
                break
    finally:
        # Release the capture even if detection or preprocessing raised
        cap.release()

    if len(face_frames) > num_frames:
        return torch.stack(face_frames, dim=0)
    if plain_frames:
        return torch.stack(plain_frames, dim=0)
    if face_frames:
        # A face was found on the first readable frame but the video ended
        # early: use the partial face clip rather than failing.
        return torch.stack(face_frames, dim=0)
    raise RuntimeError("No frames could be read from video: %s" % videoFile)

In [11]:
def test(model, device, test_vidoes, output_file='submission.csv'):
    """Predict a score for every video and write the results to a CSV file.

    Args:
        model: pair ``(cnn_encoder, rnn_decoder)``; both are switched to eval mode.
        device: torch device the input frames are moved to.
        test_vidoes: iterable of video file paths.
        output_file: path of the CSV to write (columns ``filename``, ``label``).
            An existing file at this path is removed first.

    For each video the stored label is the softmax probability of class index 1.
    If anything fails for a video, the failure is reported and 0.5 is stored.
    """
    # Remove a stale result file up front so a crashed run cannot leave an old
    # submission behind.
    if os.path.exists(output_file):
        os.remove(output_file)

    # set model as testing mode
    cnn_encoder, rnn_decoder = model
    cnn_encoder.eval()
    rnn_decoder.eval()

    results = {}
    with torch.no_grad():
        for video_file in tqdm.tqdm(test_vidoes):
            # basename handles the platform's own path separators, which a
            # plain split('/') does not on Windows-style paths
            file_name = os.path.basename(video_file)
            # Make prediction
            try:
                X = readVideo(video_file).to(device).unsqueeze(0)
                output = rnn_decoder(cnn_encoder(X))
                output_prob = F.softmax(output, dim=1)
                results[file_name] = output_prob[0][1].item()
            except Exception as err:
                # Best effort: fall back to a neutral score, but say why instead
                # of hiding the failure (KeyboardInterrupt is no longer swallowed).
                tqdm.tqdm.write("Prediction failed for %s: %r" % (file_name, err))
                results[file_name] = 0.5

    df = pd.DataFrame({
        'filename': list(results.keys()),
        'label': list(results.values()),
    })
    # fillna returns a new object; assign it so NaN predictions really become 0.5
    df['label'] = df['label'].fillna(0.5)
    df.to_csv(output_file, sep=',', index=False)
    print("Finished prediction!!!")

In [12]:
def get_videos(data_folder):
    """List the entries of ``data_folder`` whose name ends with 'mp4'.

    Args:
        data_folder: directory to scan (not recursive).

    Returns:
        List of paths, each one ``data_folder`` joined with a matching entry
        name, in the order returned by ``os.listdir``.
    """
    return [
        os.path.join(data_folder, entry)
        for entry in os.listdir(data_folder)
        if entry.endswith('mp4')
    ]

In [13]:
# Pick the compute device: the GPU when CUDA is available, otherwise the CPU
use_cuda = torch.cuda.is_available()
if use_cuda:
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Collect the video files found directly under data_path
video_files = get_videos(data_path)

In [14]:
# Create model
cnn_encoder = InceptV3Encoder(fc_hidden1=CNN_fc_hidden1, fc_hidden2=CNN_fc_hidden2, drop_p=dropout_p, CNN_embed_dim=CNN_embed_dim).to(device)
rnn_decoder = DecoderRNN(CNN_embed_dim=CNN_embed_dim, h_RNN_layers=RNN_hidden_layers, h_RNN=RNN_hidden_nodes,
                         h_FC_dim=RNN_FC_dim, drop_p=dropout_p, num_classes=k).to(device)

# Load model
encoder_model_path = os.path.join(save_model_path, 'cnn_encoder_epoch1_inceptv3.pth')
decoder_model_path = os.path.join(save_model_path, 'rnn_decoder_epoch1_inceptv3.pth')
# map_location remaps the stored tensors onto the selected device, so a
# checkpoint saved on a GPU also loads when this notebook falls back to the CPU.
cnn_encoder.load_state_dict(torch.load(encoder_model_path, map_location=device))
rnn_decoder.load_state_dict(torch.load(decoder_model_path, map_location=device))



FileNotFoundError: [Errno 2] No such file or directory: 'E:\\deepfake\\cnn_encoder_epoch1_inceptv3.pth'

In [10]:
# Run inference on every collected video and write the submission CSV
test((cnn_encoder, rnn_decoder), device, video_files)

100%|██████████| 400/400 [09:38<00:00,  1.45s/it]


Finished prediction!!!
