In [None]:
import cv2
import torch
import numpy as np
from torch import nn
from google.colab import drive
import torchvision.transforms as T
import torchvision.models as models
from torchvision import models, transforms
from torch.utils.data.dataset import Dataset
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [3]:
class Model(nn.Module):
    def __init__(self, num_classes, latent_dim = 2048, lstm_layers = 1, hidden_dim = 2048, bidirectional = False):
        super(Model, self).__init__()
        model = models.resnext50_32x4d(weights=models.ResNeXt50_32X4D_Weights.IMAGENET1K_V1)
        self.model = nn.Sequential(*list(model.children())[:-2])
        self.lstm = nn.LSTM(latent_dim, hidden_dim, lstm_layers, bidirectional = bidirectional, bias=False)
        self.relu = nn.LeakyReLU()
        self.dp = nn.Dropout(0.4)
        self.linear1 = nn.Linear(2048, num_classes)
        self.avgpool = nn.AdaptiveAvgPool2d(1)

    def forward(self, x):
        batch_size, seq_length, c, h, w = x.shape
        x = x.view(batch_size * seq_length, c, h, w)
        fmap = self.model(x)
        x = self.avgpool(fmap)
        x = x.view(batch_size, seq_length, 2048)
        x_lstm, _ = self.lstm(x, None)
        return fmap, self.dp(self.linear1(x_lstm[:, -1, :]))

In [17]:
im_size = 112
mean=[0.485, 0.456, 0.406]
std=[0.229, 0.224, 0.225]

sm = nn.Softmax(dim=1)
inv_normalize =  transforms.Normalize(mean = -1 * np.divide(mean, std), std = np.divide([1, 1, 1], std))

def predict(model, img, path='./'):
    fmap, logits = model(img.to(device))
    params = list(model.parameters())
    weight_softmax = model.linear1.weight.detach().cpu().numpy()
    logits = sm(logits)
    _, prediction = torch.max(logits, 1)
    confidence = logits[:, int(prediction.item())].item() * 100
    return [int(prediction.item()), confidence]

In [5]:
class validation_dataset(Dataset):
    def __init__(self, video_names, sequence_length = 60, transform = None, output_video_path = 'cropped_output_video.mp4'):
        self.video_names = video_names
        self.transform = transform
        self.count = sequence_length
        self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
        self.output_video_path = output_video_path
        self.out_writer = None
        self.last_bbox = None
        self.padding = 50

    def __len__(self):
        return len(self.video_names)

    def __getitem__(self, idx):
        video_path = self.video_names[idx]
        frames = []
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Could not open video: {video_path}")

        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        a = max(1, total_frames // self.count)
        first_frame = np.random.randint(0, a)
        frame_count = 0

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            if frame_count < first_frame:
                frame_count += 1
                continue

            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = self.face_cascade.detectMultiScale(gray, scaleFactor = 1.1, minNeighbors = 5, minSize = (30, 30))

            if len(faces) > 0:
                x, y, w, h = faces[0]
                self.last_bbox = (x, y, w, h)
            elif self.last_bbox is not None:
                x, y, w, h = self.last_bbox
            else:
                continue

            height, width = frame.shape[:2]
            x = max(0, x - self.padding)
            y = max(0, y - self.padding)
            w = min(width - x, w + 2 * self.padding)
            h = min(height - y, h + 2 * self.padding)

            cropped_frame = frame[y:y+h, x:x+w, :]
            if cropped_frame.size == 0:
                continue

            if self.out_writer is None:
                out_height, out_width = cropped_frame.shape[:2]
                fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                self.out_writer = cv2.VideoWriter(self.output_video_path, fourcc, fps, (out_width, out_height))

            self.out_writer.write(cropped_frame)

            transformed_frame = self.transform(cropped_frame)
            frames.append(transformed_frame)

            if len(frames) == self.count:
                break

        cap.release()
        if self.out_writer is not None:
            self.out_writer.release()

        if not frames:
            raise ValueError(f'No valid frames found in video: {video_path}')

        frames = torch.stack(frames)
        frames = frames[:self.count]
        return frames.unsqueeze(0)

In [18]:
my_transforms = T.Compose([
    T.ToPILImage(),
    T.Resize((112, 112)),
    T.ToTensor(),
    T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

model = Model(2).to(device)
path_to_model = '/content/drive/My Drive/model_93_acc_100_frames_celeb_FF_data.pt'
model.load_state_dict(torch.load(path_to_model, map_location = device))
model.eval()

Model(
  (model): Sequential(
    (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (4): Sequential(
      (0): Bottleneck(
        (conv1): Conv2d(64, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=32, bias=False)
        (bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv3): Conv2d(128, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (downsample): Sequential(
          (0)

In [26]:
path_video_fake = '/root/.cache/kagglehub/datasets/sanikatiwarekar/deep-fake-detection-dfd-entire-original-dataset/versions/1/DFD_manipulated_sequences/DFD_manipulated_sequences/01_03__talking_angry_couch__JZUXXFRB.mp4'

video_dataset = validation_dataset([path_video_fake], sequence_length = 20, transform = my_transforms, output_video_path = 'cropped_output_video.mp4')

for i in range(len(video_dataset)):
    prediction = predict(model, video_dataset[0], './')
    if prediction[0] == 1:
        print(f'Confidence Of Prediction: {np.floor(prediction[1])}%')
        print(' ')
        print('Real')
    else:
        print(f'Confidence Of Prediction: {np.floor(prediction[1])}%')
        print(' ')
        print('Fake')

Confidence Of Prediction: 63.0%
 
Fake
