In [1]:
import modelbit
# Authenticate with Modelbit; `mb` is the handle the `mb.deploy(...)` cells
# further down use to publish each inference function.
mb = modelbit.login()

## REAL TIME MODEL DEPLOYMENT

In [2]:
import warnings
warnings.filterwarnings('ignore')
import torch
from torch import nn
from torchvision import transforms, models
import cv2
import numpy as np
from PIL import Image
from io import BytesIO
import base64

class Model(nn.Module):
    """ResNeXt-50 backbone followed by global pooling and a linear classifier.

    The forward pass is purely per-image (CNN -> pool -> linear -> dropout);
    the LSTM is constructed as a submodule but is not used by ``forward``.

    Args:
        num_classes: Number of output logits.
        latent_dim: Input feature size of the LSTM.
        lstm_layers: Number of stacked LSTM layers.
        hidden_dim: Hidden size of the LSTM.
        bidirectional: Whether the LSTM is bidirectional.
    """

    def __init__(self, num_classes=2, latent_dim=2048, lstm_layers=1, hidden_dim=2048, bidirectional=False):
        super(Model, self).__init__()
        backbone = models.resnext50_32x4d(pretrained=True)  # Residual Network CNN
        # Keep everything except the backbone's last two children so the
        # module outputs a spatial feature map rather than class scores.
        self.model = nn.Sequential(*list(backbone.children())[:-2])
        # nn.LSTM's fourth positional parameter is `bias`, not `bidirectional`.
        # The previous positional call therefore fed the flag into `bias`
        # (yielding a bias-free LSTM for the default bidirectional=False) and
        # never changed the direction. `bias=False` keeps that parameter
        # layout for the default configuration, so state dicts saved from the
        # earlier definition still match, while `bidirectional` is now passed
        # by keyword and actually takes effect.
        self.lstm = nn.LSTM(
            latent_dim,
            hidden_dim,
            lstm_layers,
            bias=False,
            bidirectional=bidirectional,
        )
        self.relu = nn.LeakyReLU()
        self.dp = nn.Dropout(0.4)
        self.linear1 = nn.Linear(2048, num_classes)
        self.avgpool = nn.AdaptiveAvgPool2d(1)

    def forward(self, x):
        """Return class logits for a batch of images.

        The input is fed straight into the CNN, so it is presumably shaped
        ``[batch, channels, height, width]`` -- TODO confirm against callers.
        Dropout is applied to the logits, which only has an effect in
        training mode.
        """
        x = self.model(x)
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)  # flatten pooled features to [batch, features]
        return self.dp(self.linear1(x))

def detection(frame_base64):
    """Classify a single base64-encoded image frame as real or fake.

    Args:
        frame_base64: Base64 string (or bytes) holding an encoded image that
            PIL can open.

    Returns:
        dict: ``{"status": "real" | "fake", "confidence": <percent>}`` where
        class index 1 maps to ``"real"`` and confidence is the winning
        softmax probability multiplied by 100.
    """
    frame_data = base64.b64decode(frame_base64)
    # Force three channels: PNG frames with alpha, greyscale or palette images
    # would otherwise produce a 1- or 4-channel tensor and fail in the
    # 3-channel Normalize below.
    frame = Image.open(BytesIO(frame_data)).convert("RGB")

    im_size = 224
    mean = [0.485, 0.456, 0.406]
    std = [0.229, 0.224, 0.225]

    transform = transforms.Compose([
        transforms.Resize((im_size, im_size)),
        transforms.ToTensor(),
        transforms.Normalize(mean, std)
    ])
    # Leading dimension of size 1 acts as the batch axis.
    # NOTE(review): this sends a 4-D tensor to the model; the sequence Model
    # scripted elsewhere in this notebook unpacks five dimensions. Confirm
    # which architecture the deployed traced_model.pt actually contains.
    frame_tensor = transform(frame).unsqueeze(0)

    # Load the TorchScript model
    model_path = "traced_model.pt"
    model = torch.jit.load(model_path)
    model.eval()

    with torch.no_grad():
        logits = model(frame_tensor)
        probabilities = torch.softmax(logits, dim=1)
        confidence, prediction = torch.max(probabilities, dim=1)

    return {
        "status": "real" if prediction.item() == 1 else "fake",
        "confidence": confidence.item() * 100
    }

In [3]:
# Publish `detection` to Modelbit. The TorchScript file is attached via
# `extra_files`; presumably the key is the local source path and the value is
# the name it gets inside the deployment, which matches the relative
# "traced_model.pt" that `detection` loads -- verify against the Modelbit docs.
# NOTE(review): the source path is absolute and machine-specific, so this cell
# only runs on the original author's machine as written.
mb.deploy(detection, extra_files={"/Users/karthiksagar/DeepFake-Detection/saved_best_model/traced_model.pt" : "traced_model.pt"})

## UPLOAD VIDEO DEPLOYMENT

#### without base64 encoding

In [13]:
import cv2
import torch
import torchvision.transforms as transforms
from PIL import Image
import os

# Model class
class Model(nn.Module):
    """ResNeXt-50 backbone followed by global pooling and a linear classifier.

    ``forward`` runs CNN -> pool -> linear -> dropout on each input image; the
    LSTM submodule is created but not used in the forward pass.

    Args:
        num_classes: Number of output logits.
        latent_dim: Input feature size of the LSTM.
        lstm_layers: Number of stacked LSTM layers.
        hidden_dim: Hidden size of the LSTM.
        bidirectional: Whether the LSTM is bidirectional.
    """

    def __init__(self, num_classes=2, latent_dim=2048, lstm_layers=1, hidden_dim=2048, bidirectional=False):
        super(Model, self).__init__()
        backbone = models.resnext50_32x4d(pretrained=True)  # Residual Network CNN
        # Strip the backbone's last two children so it yields a feature map.
        self.model = nn.Sequential(*list(backbone.children())[:-2])
        # The fourth positional parameter of nn.LSTM is `bias`; passing
        # `bidirectional` there (as the earlier code did) silently toggled the
        # bias instead of the direction. `bias=False` reproduces the parameter
        # layout the default configuration used to get, and the direction flag
        # is now forwarded by keyword so it really applies.
        self.lstm = nn.LSTM(
            latent_dim,
            hidden_dim,
            lstm_layers,
            bias=False,
            bidirectional=bidirectional,
        )
        self.relu = nn.LeakyReLU()
        self.dp = nn.Dropout(0.4)
        self.linear1 = nn.Linear(2048, num_classes)
        self.avgpool = nn.AdaptiveAvgPool2d(1)

    def forward(self, x):
        """Return class logits for a batch of images.

        The input goes directly into the CNN, so it is presumably
        ``[batch, channels, height, width]`` -- TODO confirm against callers.
        Dropout on the logits is only active in training mode.
        """
        x = self.model(x)
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)  # flatten pooled features to [batch, features]
        return self.dp(self.linear1(x))

# Preprocessing transforms (same as during training, per the original author).
# These are module-level on purpose: `preprocess_frames` below reads
# `transform` as a global.
# Every frame is resized to im_size x im_size, converted to a tensor and
# normalised per channel with `mean` / `std` (the values match the commonly
# used ImageNet statistics).
im_size = 224
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]
transform = transforms.Compose([
    transforms.Resize((im_size, im_size)),
    transforms.ToTensor(),
    transforms.Normalize(mean, std)
])

def extract_frames(video_path, frame_count=20):
    """Sample up to ``frame_count`` evenly spaced frames from a video file.

    Args:
        video_path: Path of the video to read with OpenCV.
        frame_count: Maximum number of frames to return.

    Returns:
        list: RGB ``PIL.Image`` frames in playback order. The list is empty
        when the video cannot be read or ``frame_count`` is not positive, and
        shorter than ``frame_count`` when the clip has fewer frames.
    """
    if frame_count <= 0:
        return []

    frames = []
    vidcap = cv2.VideoCapture(video_path)
    try:
        total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Clamp the stride to at least 1: clips shorter than frame_count (or
        # containers reporting a frame count of 0) would otherwise give an
        # interval of 0 and a ZeroDivisionError in the modulo below.
        frame_interval = max(1, total_frames // frame_count)

        count = 0
        success, image = vidcap.read()
        while success and len(frames) < frame_count:
            if count % frame_interval == 0:
                # OpenCV decodes to BGR; convert before wrapping in a PIL image.
                frames.append(Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB)))
            success, image = vidcap.read()
            count += 1
    finally:
        # Release the capture even if decoding or conversion raises.
        vidcap.release()

    return frames

def preprocess_frames(frames):
    """Turn a list of frames into a single batched tensor.

    Each frame is passed through the module-level ``transform``; the results
    are stacked along a new first axis and one more leading axis of size 1 is
    added as the batch dimension.
    """
    tensors = []
    for single_frame in frames:
        tensors.append(transform(single_frame))

    sequence = torch.stack(tensors)
    return sequence.unsqueeze(0)  # Add batch dimension

def predict_video(video_path):
    """Classify a video file as real or fake from a sample of its frames.

    Args:
        video_path: Path of the video to analyse.

    Returns:
        dict: ``{"status": "real" | "fake", "confidence": <percent>}`` on
        success (class index 1 maps to ``"real"``), or
        ``{"status": "error", "message": ...}`` when no frame could be read.
    """
    # Extract frames first so an unreadable video is reported without paying
    # for a model load, and without crashing in torch.stack on an empty list.
    frames = extract_frames(video_path, frame_count=20)
    if not frames:
        return {"status": "error", "message": "No frames extracted"}

    # Load the model
    model_path = 'traced_model.pt'
    model = torch.jit.load(model_path)
    model.eval()

    frame_tensor = preprocess_frames(frames)

    # Make prediction on the frame sequence
    with torch.no_grad():
        logits = model(frame_tensor)
        probabilities = torch.softmax(logits, dim=1)
        confidence, prediction = torch.max(probabilities, dim=1)

    return {
        "status": "real" if prediction.item() == 1 else "fake",
        "confidence": confidence.item() * 100
    }

In [14]:
# Publish `predict_video` to Modelbit with the TorchScript file attached;
# the value "traced_model.pt" matches the relative path the function loads.
# NOTE(review): the source path is absolute and machine-specific.
mb.deploy(predict_video, extra_files={"/Users/karthiksagar/DeepFake-Detection/saved_best_model/traced_model.pt" : "traced_model.pt"})

#### with base64 encoding

In [42]:
import warnings
warnings.filterwarnings('ignore')
import torch
from torch import nn
from torchvision import transforms, models
import cv2
import numpy as np
from PIL import Image
from io import BytesIO
import base64
import os

def extract_faces_from_video(video_path, frame_count=20):
    """Collect face crops from evenly spaced frames of a video.

    Frames are sampled at a fixed stride and every face the Haar cascade
    finds in a sampled frame is cropped and kept.

    Args:
        video_path: Path of the video to read with OpenCV.
        frame_count: Target number of face crops. The limit is checked once
            per frame, so a frame containing several faces can push the
            result slightly past this number.

    Returns:
        list: RGB ``PIL.Image`` face crops; empty when the video cannot be
        read, no face is detected, or ``frame_count`` is not positive.
    """
    if frame_count <= 0:
        return []

    # Load face cascade dynamically
    face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

    faces = []
    vidcap = cv2.VideoCapture(video_path)
    try:
        total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Clamp the stride to at least 1: a clip with fewer frames than
        # frame_count (or a reported frame count of 0) would otherwise make
        # the modulo below divide by zero.
        frame_interval = max(1, total_frames // frame_count)

        count = 0
        success, image = vidcap.read()
        while success and len(faces) < frame_count:
            if count % frame_interval == 0:
                gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
                detected_faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
                for (x, y, w, h) in detected_faces:
                    face = image[y:y+h, x:x+w]
                    if face.size > 0:
                        # Crop is BGR; convert before wrapping in a PIL image.
                        faces.append(Image.fromarray(cv2.cvtColor(face, cv2.COLOR_BGR2RGB)))
            success, image = vidcap.read()
            count += 1
    finally:
        # Release the capture even if detection or conversion raises.
        vidcap.release()

    return faces

def preprocess_faces(faces):
    """Convert a list of face images into one batched tensor.

    Every face is resized to 224x224, turned into a tensor and normalised
    per channel; the results are stacked along a new first axis and a further
    leading axis of size 1 is added as the batch dimension.
    """
    # Preprocessing transforms (same as during training)
    side = 224
    channel_mean = [0.485, 0.456, 0.406]
    channel_std = [0.229, 0.224, 0.225]
    pipeline = transforms.Compose([
        transforms.Resize((side, side)),
        transforms.ToTensor(),
        transforms.Normalize(channel_mean, channel_std)
    ])

    tensors = []
    for face_image in faces:
        tensors.append(pipeline(face_image))

    sequence = torch.stack(tensors)
    return sequence.unsqueeze(0)  # Add batch dimension

def decode_base64_video(video_base64):
    """Decode a base64 video payload into a uniquely named temporary file.

    Args:
        video_base64: Base64 string (or bytes) containing the video data.

    Returns:
        str: Path of the ``.mp4`` file that was written. The caller owns the
        file and is responsible for deleting it.

    Raises:
        binascii.Error: If the payload is not valid base64 (raised by
            ``base64.b64decode`` before any file is created).
    """
    # Imported locally so this function stays self-contained when deployed.
    import tempfile

    # Decode the base64 video to binary data
    video_data = base64.b64decode(video_base64)

    # A unique file in the system temp directory replaces the fixed
    # 'temp_video.mp4' in the working directory, which overlapping requests
    # would overwrite (or delete) underneath each other.
    # delete=False keeps the file after the handle closes so OpenCV can open
    # it by path.
    with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as f:
        f.write(video_data)
        video_path = f.name

    return video_path

def predict_video_base64(video_base64):
    """Classify a base64-encoded video as real or fake from its face crops.

    The payload is written to a temporary file, faces are extracted from it
    and the face sequence is scored by the TorchScript model.

    Args:
        video_base64: Base64 string (or bytes) containing the video data.

    Returns:
        dict: ``{"status": "real" | "fake", "confidence": <percent>}`` on
        success (class index 1 maps to ``"real"``), or
        ``{"status": "error", "message": "No faces detected"}`` when no face
        was found.
    """
    # Decode the base64 video
    video_path = decode_base64_video(video_base64)

    try:
        # Extract faces from the video
        faces = extract_faces_from_video(video_path, frame_count=20)
        if not faces:
            # Handle case where no faces were detected
            return {"status": "error", "message": "No faces detected"}

        # Load the model only once there is something to score.
        model_path = 'traced_model.pt'
        model = torch.jit.load(model_path)
        model.eval()

        # Preprocess faces
        face_tensor = preprocess_faces(faces)

        # Make prediction on the face sequence
        with torch.no_grad():
            logits = model(face_tensor)
            probabilities = torch.softmax(logits, dim=1)
            confidence, prediction = torch.max(probabilities, dim=1)

        return {
            "status": "real" if prediction.item() == 1 else "fake",
            "confidence": confidence.item() * 100
        }
    finally:
        # Always remove the temporary video: previously the "no faces" early
        # return and any exception left the file behind.
        os.remove(video_path)


In [43]:
# Publish `predict_video_base64` to Modelbit with the TorchScript file
# attached; the value "traced_model.pt" matches the relative path the
# function loads.
# NOTE(review): the source path is absolute and machine-specific.
mb.deploy(predict_video_base64, extra_files={"/Users/karthiksagar/DeepFake-Detection/saved_best_model/traced_model.pt" : "traced_model.pt"})

### PyTorch to TorchScript (JIT) model

In [27]:
class Model(nn.Module):
    """ResNeXt-50 + LSTM classifier over a sequence of frames.

    Each frame is encoded by the CNN, the per-frame features are run through
    a batch-first LSTM, and the last time step is classified.

    Args:
        num_classes: Number of output logits.
        latent_dim: Feature size fed to the LSTM (must equal the pooled CNN
            feature size).
        lstm_layers: Number of stacked LSTM layers.
        hidden_dim: Hidden size of the LSTM.
        bidirectional: Whether the LSTM is bidirectional; the classifier's
            input width is doubled accordingly.
    """

    def __init__(self, num_classes=2, latent_dim=2048, lstm_layers=1, hidden_dim=2048, bidirectional=False):
        super(Model, self).__init__()
        backbone = models.resnext50_32x4d(pretrained=True)  # Residual Network CNN
        # Strip the backbone's last two children so it yields a feature map.
        self.model = nn.Sequential(*list(backbone.children())[:-2])
        # nn.LSTM's fourth positional parameter is `bias`. Passing
        # `bidirectional` there (as before) left the LSTM unidirectional, so
        # bidirectional=True sized `linear1` for 2 * hidden_dim inputs while
        # the LSTM still emitted hidden_dim and forward failed. The flag is
        # now passed by keyword. `bias=False` keeps the parameter layout the
        # default configuration previously produced, so state dicts saved
        # from the earlier definition still match.
        self.lstm = nn.LSTM(
            latent_dim,
            hidden_dim,
            lstm_layers,
            bias=False,
            batch_first=True,
            bidirectional=bidirectional,
        )
        self.relu = nn.LeakyReLU()
        self.dp = nn.Dropout(0.4)
        self.linear1 = nn.Linear(hidden_dim * (2 if bidirectional else 1), num_classes)
        self.avgpool = nn.AdaptiveAvgPool2d(1)

    def forward(self, x):
        """Return class logits for a batch of frame sequences.

        Args:
            x: Tensor of shape [batch_size, seq_len, channels, height, width].
        """
        batch_size, seq_len, c, h, w = x.size()

        # Process each frame independently through the CNN
        x = x.view(-1, c, h, w)  # [batch_size * seq_len, channels, height, width]
        x = self.model(x)
        x = self.avgpool(x)
        x = x.view(batch_size, seq_len, -1)  # [batch_size, seq_len, latent_dim]

        # Apply LSTM and keep only the output of the last time step
        x, _ = self.lstm(x)
        x = x[:, -1]
        x = self.relu(x)
        x = self.dp(x)  # dropout before the classifier; identity in eval mode
        x = self.linear1(x)

        return x

In [34]:
class Model(nn.Module):
    """ResNeXt-50 + LSTM classifier over a sequence of frames.

    Each frame is encoded by the CNN, the per-frame features are run through
    a batch-first LSTM, and the last time step is classified. Dropout is
    applied to the logits, so it only has an effect in training mode.

    Args:
        num_classes: Number of output logits.
        latent_dim: Feature size fed to the LSTM (must equal the pooled CNN
            feature size).
        lstm_layers: Number of stacked LSTM layers.
        hidden_dim: Hidden size of the LSTM.
        bidirectional: Whether the LSTM is bidirectional; the classifier's
            input width is doubled accordingly.
    """

    def __init__(self, num_classes=2, latent_dim=2048, lstm_layers=1, hidden_dim=2048, bidirectional=False):
        super(Model, self).__init__()
        backbone = models.resnext50_32x4d(pretrained=True)  # Residual Network CNN
        # Strip the backbone's last two children so it yields a feature map.
        self.model = nn.Sequential(*list(backbone.children())[:-2])
        # nn.LSTM's fourth positional parameter is `bias`. Passing
        # `bidirectional` there (as before) left the LSTM unidirectional, so
        # bidirectional=True sized `linear1` for 2 * hidden_dim inputs while
        # the LSTM still emitted hidden_dim and forward failed. The flag is
        # now passed by keyword. `bias=False` keeps the parameter layout the
        # default configuration previously produced, so state dicts saved
        # from the earlier definition still match.
        self.lstm = nn.LSTM(
            latent_dim,
            hidden_dim,
            lstm_layers,
            bias=False,
            batch_first=True,
            bidirectional=bidirectional,
        )
        self.relu = nn.LeakyReLU()
        self.dp = nn.Dropout(0.4)
        self.linear1 = nn.Linear(hidden_dim * (2 if bidirectional else 1), num_classes)
        self.avgpool = nn.AdaptiveAvgPool2d(1)

    def forward(self, x):
        """Return class logits for a batch of frame sequences.

        Args:
            x: Tensor of shape [batch_size, seq_len, channels, height, width].
        """
        batch_size, seq_len, c, h, w = x.size()

        # Process each frame independently
        x = x.view(-1, c, h, w)  # [batch_size * seq_len, channels, height, width]
        x = self.model(x)
        x = self.avgpool(x)
        x = x.view(batch_size, seq_len, -1)  # [batch_size, seq_len, latent_dim]

        # Apply LSTM
        x, _ = self.lstm(x)

        # Classify the output of the last time step
        x = self.relu(x[:, -1])
        x = self.dp(self.linear1(x))

        return x


In [35]:
# Build the model with two output classes, compile it to TorchScript and save
# the file that the deploy cells attach as "traced_model.pt".
# NOTE(review): no trained checkpoint is loaded in this cell, so apart from
# the pretrained backbone the saved module carries freshly initialised
# weights -- confirm whether a state dict should be loaded before scripting.
# NOTE(review): despite the variable name, this uses torch.jit.script, not
# torch.jit.trace. The output path is absolute and machine-specific.
model = Model(2)
ff_traced_model = torch.jit.script(model)
ff_traced_model.save('/Users/karthiksagar/DeepFake-Detection/saved_best_model/traced_model.pt')


In [None]:
# def predict_video_base64(video_base64):
#     # Decode the base64 video
#     video_path = decode_base64_video(video_base64)

#     # Load the model
#     model_path = 'traced_model.pt'
#     model = torch.jit.load(model_path)
#     model.eval()

#     # Extract and preprocess frames from the video
#     frames = extract_frames(video_path, frame_count=60)

#     highest_confidence_result = None
#     highest_confidence = 0  # Initialize the highest confidence to 0
    
#     with torch.no_grad():
#         for frame in frames:
#             frame_tensor = transform(frame).unsqueeze(0)  # Transform and add batch dimension
#             logits = model(frame_tensor)
#             sm = torch.nn.Softmax(dim=1)
#             probabilities = sm(logits)
#             confidence, prediction = torch.max(probabilities, dim=1)

#             # Create the result dictionary for the current frame
#             result = {
#                 "status": "real" if prediction.item() == 1 else "fake",
#                 "confidence": confidence.item() * 100
#             }

#             # Update the highest confidence result if the current confidence is higher
#             if confidence.item() > highest_confidence:
#                 highest_confidence = confidence.item()
#                 highest_confidence_result = result

#     return highest_confidence_result