<a href="https://colab.research.google.com/github/rohingarg12/python-practice/blob/main/notebookd157e5105b.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.
import kagglehub
# Fetch the DFD dataset through kagglehub and keep the value it returns in a
# module-level variable (named as a path) so later cells can refer to it.
# NOTE(review): the later cells shown in this notebook hard-code /kaggle/input/...
# instead of using this variable — confirm which location is intended.
sanikatiwarekar_deep_fake_detection_dfd_entire_original_dataset_path = kagglehub.dataset_download('sanikatiwarekar/deep-fake-detection-dfd-entire-original-dataset')

# Simple completion marker for the notebook log.
print('Data source import complete.')


In [None]:
!pip install facenet-pytorch timm


In [None]:
import torch
from facenet_pytorch import MTCNN
import cv2
from pathlib import Path
from PIL import Image
from torchvision import transforms
from tqdm import tqdm

# Init face detector: keep_all=False asks MTCNN for a single face per frame;
# run it on the GPU when one is available.
mtcnn = MTCNN(keep_all=False, device='cuda' if torch.cuda.is_available() else 'cpu')

# Dataset root.
# On Kaggle the dataset is mounted under /kaggle/input. Elsewhere (e.g. Colab) that
# mount does not exist, so fall back to the location returned by kagglehub in the
# data-source import cell, if that cell has been run in this kernel.
_kaggle_mount = Path("/kaggle/input/deep-fake-detection-dfd-entire-original-dataset")
_kagglehub_download = globals().get(
    "sanikatiwarekar_deep_fake_detection_dfd_entire_original_dataset_path"
)
if _kaggle_mount.exists() or _kagglehub_download is None:
    dataset_root = _kaggle_mount
else:
    dataset_root = Path(_kagglehub_download)

# Dataset paths
real_videos_dir = dataset_root / "DFD_original sequences"
fake_videos_dir = dataset_root / "DFD_manipulated_sequences" / "DFD_manipulated_sequences"

# Fail early with a clear message instead of silently extracting nothing from a
# directory that is not there.
for _videos_dir in (real_videos_dir, fake_videos_dir):
    if not _videos_dir.is_dir():
        raise FileNotFoundError(f"Video directory not found: {_videos_dir}")

# Output dirs
real_output = Path("/kaggle/working/extracted_faces/real")
fake_output = Path("/kaggle/working/extracted_faces/fake")
real_output.mkdir(parents=True, exist_ok=True)
fake_output.mkdir(parents=True, exist_ok=True)

# Extraction function
def extract_faces(video_path, output_dir, every_n=15, max_faces=60):
    """Sample frames from a video and save one cropped face per sampled frame.

    Every ``every_n``-th frame is converted to RGB and passed to the module-level
    ``mtcnn`` detector. Crops are written as ``<video stem>_<k>.jpg`` into
    ``output_dir``, where ``k`` counts saved faces from 0.

    Args:
        video_path: ``pathlib.Path`` of the video file.
        output_dir: ``pathlib.Path`` of the directory that receives the crops.
        every_n: Sampling stride in frames; must be >= 1.
        max_faces: Stop after this many faces have been saved.

    Returns:
        The number of face crops saved for this video.

    Raises:
        ValueError: If ``every_n`` is smaller than 1.
    """
    if every_n < 1:
        raise ValueError("every_n must be a positive integer")

    name = video_path.stem
    cap = cv2.VideoCapture(str(video_path))
    count = 0
    saved = 0
    try:
        while cap.isOpened() and saved < max_faces:
            ret, frame = cap.read()
            if not ret:
                break
            if count % every_n == 0:
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                img = Image.fromarray(rgb)
                # Let MTCNN write the crop itself. The tensor it returns is
                # standardised (roughly [-1, 1]) by default, so converting that
                # tensor with ToPILImage would store corrupted pixel values;
                # the image written via save_path is the un-normalised crop.
                target = output_dir / f"{name}_{saved}.jpg"
                face = mtcnn(img, save_path=str(target))
                if face is not None:
                    saved += 1
            count += 1
    finally:
        # Release the decoder even if detection or saving raises.
        cap.release()
    return saved

# Extract: run the face extractor over every .mp4 found (recursively) under each
# source directory, writing real and fake crops to their own output folders.
extraction_jobs = (
    ("🟢 Extracting Real", real_videos_dir, real_output),
    ("🔴 Extracting Fake", fake_videos_dir, fake_output),
)
for banner, source_dir, target_dir in extraction_jobs:
    print(banner)
    # Materialise the file list first so tqdm knows the total.
    for vid in tqdm(list(source_dir.rglob("*.mp4"))):
        extract_faces(vid, target_dir)

print("✅ Done!")


🟢 Extracting Real


100%|██████████| 363/363 [57:17<00:00,  9.47s/it] 


🔴 Extracting Fake


100%|██████████| 3068/3068 [6:58:30<00:00,  8.18s/it]  

✅ Done!





In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from PIL import Image
from pathlib import Path
import timm
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from tqdm import tqdm


In [None]:
# Paths to images
real_dir = Path("/kaggle/working/extracted_faces/real")
fake_dir = Path("/kaggle/working/extracted_faces/fake")

# Sort the listings: glob order depends on the filesystem, and the seeded split
# below is only reproducible when its input order is.
real_paths = sorted(real_dir.glob("*.jpg"))
fake_paths = sorted(fake_dir.glob("*.jpg"))
if not real_paths or not fake_paths:
    raise FileNotFoundError(
        f"Expected face crops in both {real_dir} and {fake_dir}; run the extraction cell first."
    )

all_paths = real_paths + fake_paths
all_labels = [0] * len(real_paths) + [1] * len(fake_paths)

# Group frames by source video. Crops are named "<video stem>_<k>.jpg", so the part
# before the last underscore identifies the video. The label is part of the key so a
# real and a fake video with the same stem stay separate.
frames_by_video = {}
for _path, _label in zip(all_paths, all_labels):
    _key = (_label, _path.stem.rsplit("_", 1)[0])
    frames_by_video.setdefault(_key, []).append(_path)
video_keys = sorted(frames_by_video)

# Train/val split at the VIDEO level (stratified by label). Splitting individual
# frames would put near-identical frames of one video on both sides and inflate
# the validation accuracy.
train_video_idx, val_video_idx = train_test_split(
    list(range(len(video_keys))),
    test_size=0.2,
    stratify=[label for label, _ in video_keys],
    random_state=42,
)


def _frames_for(video_indices):
    """Expand selected video indices into parallel lists of frame paths and labels."""
    paths, labels = [], []
    for i in video_indices:
        label, _ = video_keys[i]
        video_frames = frames_by_video[video_keys[i]]
        paths.extend(video_frames)
        labels.extend([label] * len(video_frames))
    return paths, labels


train_paths, train_labels = _frames_for(train_video_idx)
val_paths, val_labels = _frames_for(val_video_idx)

# Transforms: resize every crop to 224x224, convert it to a tensor, then normalise
# each of the three channels with mean 0.5 and std 0.5.
transform = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ]
)

# Dataset class
class FaceDataset(Dataset):
    """Dataset pairing image files with labels.

    ``paths[i]`` and ``labels[i]`` describe the same sample; ``transform`` is
    applied to every image after it has been loaded as RGB.
    """

    def __init__(self, paths, labels, transform):
        self.paths, self.labels, self.transform = paths, labels, transform

    def __len__(self):
        """Number of samples, i.e. number of image paths."""
        return len(self.paths)

    def __getitem__(self, idx):
        """Return ``(transformed image, label)`` for position ``idx``."""
        rgb_image = Image.open(self.paths[idx]).convert("RGB")
        return self.transform(rgb_image), self.labels[idx]

# Loaders
# Wrap both splits with the same preprocessing.
train_set = FaceDataset(paths=train_paths, labels=train_labels, transform=transform)
val_set = FaceDataset(paths=val_paths, labels=val_labels, transform=transform)

# Batches of 8; only the training loader shuffles.
train_loader = DataLoader(dataset=train_set, batch_size=8, shuffle=True)
val_loader = DataLoader(dataset=val_set, batch_size=8, shuffle=False)


In [None]:
class MesoNet(nn.Module):
    """Small two-stage CNN that turns an image batch into 256-d feature vectors.

    Each stage is Conv -> BatchNorm -> ReLU -> 2x2 max-pool with size-preserving
    padding, so the spatial size is halved twice. The linear head is sized for
    8 channels of 56x56, i.e. it expects 224x224 inputs.
    """

    def __init__(self):
        super().__init__()

        def conv_stage(in_channels, out_channels, kernel_size, padding):
            # One Conv/BN/ReLU/Pool stage; module order fixes the state_dict keys.
            return nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size, padding=padding),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(),
                nn.MaxPool2d(2),
            )

        self.layer1 = conv_stage(3, 8, 3, 1)
        self.layer2 = conv_stage(8, 8, 5, 2)
        self.fc = nn.Linear(8 * 56 * 56, 256)

    def forward(self, x):
        """Run both conv stages, flatten per sample, and project to 256 features."""
        feature_maps = self.layer2(self.layer1(x))
        flattened = feature_maps.view(feature_maps.size(0), -1)
        return self.fc(flattened)

class DeepfakeEnsemble(nn.Module):
    """Three-branch classifier: ResNet-50, MesoNet and Xception features, fused.

    The outputs of the three branches are concatenated along the feature axis and
    fed to a small MLP head whose first layer expects 2048 + 256 + 2048 inputs.
    """

    def __init__(self, num_classes=2):
        super().__init__()

        # ResNet-50 with default pretrained weights; its final fc is replaced by an
        # identity so the branch returns features instead of class scores.
        resnet_backbone = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
        resnet_backbone.fc = nn.Identity()
        self.resnet = resnet_backbone

        self.meso = MesoNet()
        # num_classes=0 asks timm for a model without a classification layer.
        self.xception = timm.create_model("xception", pretrained=True, num_classes=0)

        fused_features = 2048 + 256 + 2048
        self.classifier = nn.Sequential(
            nn.Linear(fused_features, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, num_classes),
        )

    def forward(self, x):
        """Return class scores for ``x``, or ``None`` if any branch raises.

        NOTE(review): every exception is caught, printed and turned into ``None``,
        so callers must check for ``None`` before using the result.
        """
        try:
            branch_outputs = [self.resnet(x), self.meso(x), self.xception(x)]
            fused = torch.cat(branch_outputs, dim=1)
            return self.classifier(fused)
        except Exception as e:
            print(f"❌ Error during forward pass: {e}")
            return None


In [None]:
# Prefer the GPU when PyTorch can see one; otherwise stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Build the ensemble, then move it to the chosen device.
model = DeepfakeEnsemble()
model = model.to(device)

# Cross-entropy over the class scores, optimised with Adam at lr 1e-4.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-4)

def train(model, train_loader, val_loader, loss_fn, optimizer, device, epochs=3):
    """Train ``model`` for ``epochs`` epochs, validating after each one.

    Batches for which ``model`` returns ``None`` are skipped (with a message during
    training, silently during validation).

    Args:
        model: Module mapping an image batch to class scores (or ``None`` on error).
        train_loader: Iterable of ``(images, labels)`` training batches.
        val_loader: Iterable of ``(images, labels)`` validation batches.
        loss_fn: Callable ``loss_fn(outputs, labels)`` returning a scalar tensor.
        optimizer: Optimizer over the model's parameters.
        device: Device the batches are moved to.
        epochs: Number of passes over ``train_loader``.

    Returns:
        A list with one dict per epoch holding ``train_loss``, ``train_acc``,
        ``val_acc`` and ``train_batches`` (number of batches actually used).
        Metrics are ``nan`` when no batch contributed to them.
    """
    history = []
    for epoch in range(epochs):
        model.train()
        total_loss = 0.0
        batches_used = 0
        all_preds, all_labels = [], []

        for imgs, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}"):
            imgs, labels = imgs.to(device), labels.to(device)
            outputs = model(imgs)
            if outputs is None:
                print("⛔ Skipping batch due to model error")
                continue
            loss = loss_fn(outputs, labels)
            # Clear gradients BEFORE backward so that stale gradients (e.g. left by
            # an interrupted earlier run in the same kernel) never reach a step.
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            batches_used += 1
            all_preds += outputs.argmax(1).cpu().tolist()
            all_labels += labels.cpu().tolist()

        # Average over the batches that actually produced a loss; skipped batches
        # must not dilute the mean, and an empty epoch must not divide by zero.
        train_loss = total_loss / batches_used if batches_used else float("nan")
        acc = accuracy_score(all_labels, all_preds) if all_labels else float("nan")
        print(f"✅ Epoch {epoch+1} | Loss: {train_loss:.4f} | Accuracy: {acc:.4f}")

        # Validation
        model.eval()
        val_preds, val_targets = [], []
        with torch.no_grad():
            for imgs, labels in val_loader:
                imgs, labels = imgs.to(device), labels.to(device)
                outputs = model(imgs)
                if outputs is None:
                    continue
                val_preds += outputs.argmax(1).cpu().tolist()
                val_targets += labels.cpu().tolist()
        val_acc = accuracy_score(val_targets, val_preds) if val_targets else float("nan")
        print(f"🧪 Val Accuracy: {val_acc:.4f}")
        model.train()

        history.append(
            {
                "train_loss": train_loss,
                "train_acc": acc,
                "val_acc": val_acc,
                "train_batches": batches_used,
            }
        )
    return history

# Train the frame-level ensemble for three epochs.
train(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    loss_fn=loss_fn,
    optimizer=optimizer,
    device=device,
    epochs=3,
)


  model = create_fn(
Epoch 1: 100%|██████████| 15616/15616 [42:51<00:00,  6.07it/s]


✅ Epoch 1 | Loss: 0.1705 | Accuracy: 0.9374
🧪 Val Accuracy: 0.9545


Epoch 2: 100%|██████████| 15616/15616 [42:50<00:00,  6.07it/s]


✅ Epoch 2 | Loss: 0.1021 | Accuracy: 0.9635
🧪 Val Accuracy: 0.9638


Epoch 3: 100%|██████████| 15616/15616 [42:52<00:00,  6.07it/s]


✅ Epoch 3 | Loss: 0.0826 | Accuracy: 0.9707
🧪 Val Accuracy: 0.9674


In [None]:
# Persist only the parameters (state_dict); a later cell reloads them by this path.
torch.save(obj=model.state_dict(), f="/kaggle/working/deepfake_ensemble.pt")
print("✅ Model saved to /kaggle/working/deepfake_ensemble.pt")


✅ Model saved to /kaggle/working/deepfake_ensemble.pt


In [None]:
import torch
from torch.utils.data import Dataset
from pathlib import Path
from collections import defaultdict
from PIL import Image

class VideoFaceSequenceDataset(Dataset):
    """Dataset of per-video face-crop sequences.

    Crops are read from ``root_dir/<class name>/*.jpg`` for every class name in
    ``label_map``. Files are grouped into videos by the part of the file stem before
    the last underscore (``<video id>_<frame index>.jpg``). Each sample is a stack of
    exactly ``max_frames`` frames; shorter videos are padded with all-zero frames.

    Args:
        root_dir: Directory containing one sub-folder per class.
        label_map: Mapping from sub-folder name to integer label.
        max_frames: Number of frames per returned sequence.
        transform: Callable applied to each RGB PIL image. It has to return tensors
            of one common shape, because frames are padded with ``torch.zeros_like``
            and combined with ``torch.stack``.
    """

    def __init__(self, root_dir, label_map, max_frames=30, transform=None):
        self.root_dir = Path(root_dir)
        self.transform = transform
        self.max_frames = max_frames
        self.samples = []  # list of (video_id, label, [frame paths in frame order])

        grouped = defaultdict(list)
        for cls, label in label_map.items():
            folder = self.root_dir / cls
            for img_path in folder.glob("*.jpg"):
                video_id = img_path.stem.rsplit("_", 1)[0]
                grouped[(video_id, label)].append(img_path)

        for (video_id, label), paths in grouped.items():
            # Order by the numeric frame index. A plain sort is lexicographic and
            # would put "_10" before "_2", scrambling temporal order and keeping the
            # wrong frames when truncating to max_frames.
            paths = sorted(paths, key=self._frame_sort_key)[:max_frames]
            self.samples.append((video_id, label, paths))

    @staticmethod
    def _frame_sort_key(img_path):
        """Sort key: numeric frame index first; non-numeric names last, by name."""
        suffix = img_path.stem.rsplit("_", 1)[-1]
        try:
            return (0, int(suffix), img_path.name)
        except ValueError:
            return (1, 0, img_path.name)

    def __len__(self):
        """Number of videos."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(frames, label)`` where ``frames`` stacks ``max_frames`` frames."""
        _, label, img_paths = self.samples[idx]
        frames = []
        for path in img_paths:
            img = Image.open(path).convert("RGB")
            if self.transform:
                img = self.transform(img)
            frames.append(img)

        # Pad short videos at the end so every sample has the same length.
        while len(frames) < self.max_frames:
            frames.append(torch.zeros_like(frames[0]))

        return torch.stack(frames), label


In [None]:
from torchvision import transforms
from torch.utils.data import DataLoader

# Per-frame preprocessing: 224x224 resize, tensor conversion, and per-channel
# normalisation with mean 0.5 / std 0.5.
transform = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ]
)

# Sub-folder name -> class index.
label_map = dict(real=0, fake=1)

# One sample per video, 30 frames each, read from the extracted face crops.
video_dataset = VideoFaceSequenceDataset(
    "/kaggle/working/extracted_faces",
    label_map,
    30,
    transform,
)

# One video per batch, visited in random order.
video_loader = DataLoader(dataset=video_dataset, batch_size=1, shuffle=True)


In [None]:
class VideoLSTMModel(nn.Module):
    """Video classifier: frozen per-frame CNN features followed by an LSTM.

    The frame encoder is a ``DeepfakeEnsemble`` loaded from ``cnn_weights_path`` with
    its classifier replaced by an identity, so it emits the fused feature vector. Its
    parameters are frozen and it is always kept in eval mode; only the LSTM and the
    final linear layer are trained.

    Args:
        cnn_weights_path: Path to a ``DeepfakeEnsemble`` state_dict file.
        hidden_size: LSTM hidden size.
        num_layers: Number of stacked LSTM layers.
        num_classes: Number of output classes.
    """

    def __init__(self, cnn_weights_path, hidden_size=256, num_layers=1, num_classes=2):
        super().__init__()
        self.cnn = DeepfakeEnsemble(num_classes=2)
        self.cnn.load_state_dict(torch.load(cnn_weights_path, map_location='cpu'))
        self.cnn.classifier = nn.Identity()
        for param in self.cnn.parameters():
            param.requires_grad = False
        self.cnn.eval()

        self.lstm = nn.LSTM(
            input_size=2048 + 256 + 2048,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True
        )
        self.fc = nn.Linear(hidden_size, num_classes)

    def train(self, mode=True):
        """Switch the trainable head between train/eval; keep the frozen CNN in eval.

        Without this, ``model.train()`` would put the CNN's BatchNorm layers into
        training mode, where they use batch statistics and update their running
        statistics even under ``torch.no_grad()`` — the "frozen" encoder would drift.
        """
        super().train(mode)
        self.cnn.eval()
        return self

    def forward(self, video_seq):  # [B, T, C, H, W]
        """Encode each frame with the CNN, run the LSTM, classify its last output."""
        # Use the model's own device rather than a notebook-global `device`.
        target_device = next(self.lstm.parameters()).device

        embeddings = []
        for t in range(video_seq.size(1)):
            x = video_seq[:, t].to(target_device)
            with torch.no_grad():
                emb = self.cnn(x)
            if emb is None:
                # DeepfakeEnsemble.forward reports failures by returning None.
                raise RuntimeError(f"Frame encoder failed on time step {t}")
            embeddings.append(emb)

        seq = torch.stack(embeddings, dim=1)  # [B, T, D]
        lstm_out, _ = self.lstm(seq)
        # NOTE(review): the last time step is used even when the sequence was
        # zero-padded by the dataset — confirm this is intended.
        final = lstm_out[:, -1, :]
        return self.fc(final)


In [None]:
# Prefer the GPU when PyTorch can see one; otherwise stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Build the video model on top of the saved frame-level weights and move it over.
video_model = VideoLSTMModel("/kaggle/working/deepfake_ensemble.pt")
video_model = video_model.to(device)

# Note: this rebinds the notebook-level `optimizer` to the video model's parameters.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=video_model.parameters(), lr=1e-4)

def train_video_model(model, dataloader, criterion, optimizer, device, epochs=3):
    """Train ``model`` on ``dataloader`` for ``epochs`` epochs.

    After every epoch one line is printed with the mean batch loss (total loss
    divided by ``len(dataloader)``) and the training accuracy. Nothing is returned.
    """
    model.train()
    for epoch_idx in range(epochs):
        predicted, targets = [], []
        running_loss = 0.0

        progress = tqdm(dataloader, desc=f"Epoch {epoch_idx+1}")
        for videos, labels in progress:
            videos = videos.to(device)
            labels = labels.to(device)

            logits = model(videos)
            batch_loss = criterion(logits, labels)

            # Standard update: clear old gradients, backpropagate, step.
            optimizer.zero_grad()
            batch_loss.backward()
            optimizer.step()

            running_loss += batch_loss.item()
            predicted.extend(logits.argmax(1).cpu().tolist())
            targets.extend(labels.cpu().tolist())

        acc = accuracy_score(targets, predicted)
        print(f"✅ Epoch {epoch_idx+1} | Loss: {running_loss / len(dataloader):.4f} | Accuracy: {acc:.4f}")


In [None]:
# Train the LSTM head on per-video sequences for three epochs.
train_video_model(
    model=video_model,
    dataloader=video_loader,
    criterion=criterion,
    optimizer=optimizer,
    device=device,
    epochs=3,
)


Epoch 1:   7%|▋         | 246/3431 [04:01<51:28,  1.03it/s]