In [15]:
import os
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
import cv2
from tqdm import tqdm
from facenet_pytorch import MTCNN
from torchvision import models
# from sklearn.cluster import KMeans
# from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
import numpy as np

ImportError: DLL load failed while importing _multiarray_umath: The specified module could not be found.

In [13]:
import torch
# Show torch's reported CUDA version and whether a CUDA device is usable.
print(torch.version.cuda, torch.cuda.is_available(), sep="\n")


11.8
True


In [14]:

# -------------------- STEP 0: GPU SETUP --------------------
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

Using device: cuda


In [None]:
def extract_faces_from_video(video_path, mtcnn, max_frames=30):
    """Collect exactly ``max_frames`` face crops from the start of a video.

    Frames are read in order, converted from BGR to RGB and passed to
    ``mtcnn``. Frames for which ``mtcnn`` returns ``None`` are skipped and do
    not count towards ``max_frames``, so reading continues until enough faces
    were found or the video ends.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        mtcnn: Callable taking an RGB frame and returning a face tensor or ``None``.
        max_frames: Number of face crops required.

    Returns:
        ``torch.stack`` of the collected crops when exactly ``max_frames`` were
        found; ``None`` otherwise (including ``max_frames <= 0``, which
        previously crashed on stacking an empty list).
    """
    cap = cv2.VideoCapture(video_path)
    faces = []
    try:
        while cap.isOpened() and len(faces) < max_frames:
            ret, frame = cap.read()
            if not ret:
                break
            face = mtcnn(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            if face is not None:
                faces.append(face)
    finally:
        # Release the capture handle even if decoding or detection raises.
        cap.release()

    if not faces or len(faces) != max_frames:
        return None
    return torch.stack(faces)

class VideoFaceDataset(Dataset):
    """Dataset that extracts a face stack from each video on access.

    Each item is ``(faces, video_path)``. ``faces`` is whatever
    ``extract_faces_from_video`` returns for that path, with ``transform``
    applied to every element and re-stacked when a transform is given.
    """

    def __init__(self, video_paths, mtcnn, transform=None, max_frames=30):
        self.video_paths, self.mtcnn = video_paths, mtcnn
        self.transform, self.max_frames = transform, max_frames

    def __len__(self):
        return len(self.video_paths)

    def __getitem__(self, idx):
        """Return ``(faces, video_path)``; raise ValueError if extraction yields None."""
        video_path = self.video_paths[idx]
        faces = extract_faces_from_video(video_path, self.mtcnn, self.max_frames)
        if faces is None:
            raise ValueError(f"Insufficient faces in video: {video_path}")
        if not self.transform:
            return faces, video_path
        transformed = [self.transform(face) for face in faces]
        return torch.stack(transformed), video_path

def get_video_paths(main_folder):
    """List the ``.mp4`` / ``.avi`` files directly inside ``main_folder``.

    The extension match is case-insensitive and the result is sorted by file
    name, so the order does not depend on the file system's listing order.

    Args:
        main_folder: Directory to scan (not recursive).

    Returns:
        List of ``os.path.join(main_folder, name)`` paths, sorted by name.

    Raises:
        ValueError: If ``main_folder`` does not exist.
    """
    if not os.path.exists(main_folder):
        raise ValueError(f"Folder not found: {main_folder}")
    return [
        os.path.join(main_folder, name)
        for name in sorted(os.listdir(main_folder))
        if name.lower().endswith(('.mp4', '.avi'))
    ]

class GRUClassifier(nn.Module):
    """Per-frame ResNet-18 features fed through a GRU and a linear classifier.

    Args:
        feature_size: Size of each per-frame feature vector given to the GRU.
            It must equal the backbone's output size once its ``fc`` layer is
            replaced by identity.
        hidden_size: GRU hidden state size.
        num_classes: Number of output logits.
    """

    def __init__(self, feature_size=512, hidden_size=256, num_classes=2):
        super().__init__()
        # torchvision deprecated ``pretrained=True`` in favour of ``weights=``.
        # Use the weights enum when this torchvision provides it, else fall back.
        weights_enum = getattr(models, "ResNet18_Weights", None)
        if weights_enum is not None:
            self.resnet = models.resnet18(weights=weights_enum.IMAGENET1K_V1)
        else:
            self.resnet = models.resnet18(pretrained=True)
        self.resnet.fc = nn.Identity()  # drop the classification head
        self.gru = nn.GRU(input_size=feature_size, hidden_size=hidden_size, batch_first=True)
        self.classifier = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Map a ``(B, T, C, H, W)`` clip batch to ``(B, num_classes)`` logits."""
        B, T, C, H, W = x.size()
        # reshape (not view) so non-contiguous inputs, e.g. after a transpose, work.
        frames = x.reshape(B * T, C, H, W)
        feats = self.resnet(frames).reshape(B, T, -1)
        _, hidden = self.gru(feats)
        # hidden[-1] is the final hidden state of the last GRU layer.
        return self.classifier(hidden[-1])

def extract_video_features(dataloader, model, device):
    """Compute one time-averaged backbone feature vector per video.

    Each batch of shape ``(B, T, C, H, W)`` is flattened to ``(B*T, C, H, W)``,
    passed through ``model.resnet`` and averaged over the ``T`` axis. The batch
    axis is kept, so any ``batch_size`` works; the earlier ``squeeze()`` plus
    ``ndim == 1`` check silently dropped every batch larger than one.

    Args:
        dataloader: Iterable yielding ``(faces, paths)`` pairs, where ``paths``
            holds one entry per item in the batch.
        model: Object exposing ``eval()`` and a ``resnet`` feature extractor.
        device: Device the face tensors are moved to before the forward pass.

    Returns:
        ``(features, video_paths)``: an array with one row per video and the
        matching list of paths.

    Raises:
        ValueError: If no features were extracted.
    """
    model.eval()
    all_feats = []
    video_paths = []
    with torch.no_grad():
        for faces, paths in tqdm(dataloader, desc="Extracting Features"):
            if faces is None or len(faces) == 0:
                print(f"Skipping video (no faces): {paths}")
                continue
            faces = faces.to(device)
            B, T, C, H, W = faces.size()
            feats = model.resnet(faces.reshape(B * T, C, H, W)).reshape(B, T, -1)
            avg_feats = feats.mean(dim=1).cpu().numpy()  # (B, feature_dim)
            all_feats.extend(avg_feats)
            video_paths.extend(paths)
    if len(all_feats) == 0:
        raise ValueError("No valid features extracted. Cannot perform clustering.")
    return np.array(all_feats), video_paths

if __name__ == "__main__":
    # Imported here because the notebook's first import cell has these two
    # imports commented out, which leaves KMeans/TSNE undefined in this cell.
    from sklearn.cluster import KMeans
    from sklearn.manifold import TSNE

    # Raw string: "\D" and "\d" are invalid escape sequences in a normal literal.
    video_dir = r"E:\DFDC dataset\dfdc_train_part_00\dfdc_train_part_0"  # Change accordingly

    print("Loading video paths...")
    video_paths = get_video_paths(video_dir)
    print(f"Total videos found: {len(video_paths)}")

    if len(video_paths) == 0:
        raise ValueError("No videos found in the folder!")

    mtcnn = MTCNN(image_size=160, device=device)
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.Normalize([0.5]*3, [0.5]*3)
    ])

    dataset = VideoFaceDataset(video_paths, mtcnn, transform=transform, max_frames=30)
    dataloader = DataLoader(dataset, batch_size=1, shuffle=False)

    model = GRUClassifier()
    model.to(device)

    # Extract features
    features, paths = extract_video_features(dataloader, model, device)

    print(f"Features shape: {features.shape}")

    # Use 1 cluster since only one folder of videos
    n_clusters = 1
    print(f"Clustering with {n_clusters} cluster(s)...")
    kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    clusters = kmeans.fit_predict(features)

    # Visualize clusters (t-SNE is still possible even for 1 cluster)
    print("Visualizing with t-SNE...")
    n_samples = len(features)
    if n_samples < 2:
        raise ValueError("t-SNE needs at least two videos with valid features.")
    # scikit-learn requires perplexity < n_samples, so cap it for small runs.
    tsne = TSNE(n_components=2, perplexity=min(30, n_samples - 1), random_state=42)
    reduced = tsne.fit_transform(features)

    plt.figure(figsize=(10, 7))
    plt.scatter(reduced[:, 0], reduced[:, 1], c=clusters, cmap='coolwarm', s=15)
    plt.title("t-SNE Visualization of Video Feature Clusters")
    plt.xlabel("Component 1")
    plt.ylabel("Component 2")
    plt.colorbar(label='Cluster')
    plt.show()


In [1]:
import matplotlib.pyplot as plt

In [2]:
# Re-draw the t-SNE scatter. `reduced` and `clusters` must already exist in the
# kernel; if they were never computed in this session the cell raises NameError.
plt.figure(figsize=(10, 7))
xs, ys = reduced[:, 0], reduced[:, 1]
points = plt.scatter(xs, ys, c=clusters, cmap='coolwarm', s=15)
plt.title("t-SNE Visualization of Video Feature Clusters")
plt.xlabel("Component 1")
plt.ylabel("Component 2")
plt.colorbar(points, label='Cluster')
plt.show()

NameError: name 'reduced' is not defined

<Figure size 1000x700 with 0 Axes>

In [25]:
# Save model weights. The path is kept in one variable so the confirmation
# message always names the file that was actually written.
MODEL_WEIGHTS_PATH = "main_model_11.pth"
torch.save(model.state_dict(), MODEL_WEIGHTS_PATH)
print(f"Model saved as {MODEL_WEIGHTS_PATH}")


Model saved as gru_classifier.pth


In [26]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torch.utils.data as data
import cv2
from tqdm import tqdm
import numpy as np
from facenet_pytorch import MTCNN
from torchvision import models
from sklearn.cluster import KMeans
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
import pandas as pd

from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

# -------------------- DEVICE --------------------
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# -------------------- STEP 1: FACE EXTRACTION --------------------
def extract_faces_from_video(video_path, mtcnn, max_frames=30):
    """Collect exactly ``max_frames`` face crops from the start of a video.

    Frames are read in order, converted from BGR to RGB and passed to
    ``mtcnn``. Frames where ``mtcnn`` returns ``None`` are skipped and do not
    count towards ``max_frames``.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        mtcnn: Callable taking an RGB frame and returning a face tensor or ``None``.
        max_frames: Number of face crops required.

    Returns:
        The stacked crops (``max_frames`` along the first axis) when enough
        faces were found, else ``None``. ``max_frames <= 0`` also yields
        ``None`` instead of failing on an empty stack.
    """
    cap = cv2.VideoCapture(video_path)
    faces = []

    try:
        while cap.isOpened() and len(faces) < max_frames:
            ret, frame = cap.read()
            if not ret:
                break
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            face = mtcnn(frame_rgb)
            if face is not None:
                faces.append(face)
    finally:
        # Always free the capture handle, even when the detector raises.
        cap.release()

    if faces and len(faces) == max_frames:
        return torch.stack(faces)
    return None

def load_dataset(main_folder, max_frames=30):
    """Extract face sequences for every video under ``real/`` and ``fake/``.

    Files are visited in sorted order so the returned lists (and any seeded
    split built from them) are reproducible across file systems. Extensions
    are matched case-insensitively.

    Args:
        main_folder: Directory expected to contain ``real`` and/or ``fake`` subfolders.
        max_frames: Number of face crops required per video.

    Returns:
        ``(videos, labels)``: the face sequences that could be extracted and
        their labels (1 for ``real``, 0 for ``fake``), in matching order.
    """
    mtcnn = MTCNN(image_size=160, device=device)
    videos = []
    labels = []

    for label_name in ['real', 'fake']:
        label_folder = os.path.join(main_folder, label_name)
        if not os.path.exists(label_folder):
            # Say so explicitly; a silent skip hides a wrong dataset path.
            print(f"Label folder not found, skipping: {label_folder}")
            continue
        label = 1 if label_name == 'real' else 0

        for file in tqdm(sorted(os.listdir(label_folder))):
            if not file.lower().endswith(('.mp4', '.avi')):
                continue
            video_path = os.path.join(label_folder, file)
            print(f"Processing: {file} ({label_name})")
            face_seq = extract_faces_from_video(video_path, mtcnn, max_frames)
            if face_seq is not None:
                videos.append(face_seq)
                labels.append(label)
            else:
                print(f"Skipped (not enough faces): {file}")

    print(f"Total valid sequences: {len(videos)}")
    return videos, labels

# -------------------- STEP 2: DATASET --------------------
class DeepfakeDataset(Dataset):
    """In-memory dataset pairing each face sequence with an integer label.

    ``__getitem__`` returns ``(sequence, label)`` where ``label`` is a
    ``torch.long`` scalar tensor. When a transform is set it is applied to each
    element of the sequence and the results are stacked again.
    """

    def __init__(self, face_sequences, labels, transform=None):
        self.sequences, self.labels, self.transform = face_sequences, labels, transform

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        frames = self.sequences[idx]
        if self.transform:
            frames = torch.stack(list(map(self.transform, frames)))
        target = torch.tensor(self.labels[idx], dtype=torch.long)
        return frames, target

# -------------------- STEP 3: MODEL --------------------
class GRUClassifier(nn.Module):
    """Per-frame ResNet-18 features fed through a GRU and a linear classifier.

    Args:
        feature_size: Size of each per-frame feature vector given to the GRU.
            It must equal the backbone's output size once its ``fc`` layer is
            replaced by identity.
        hidden_size: GRU hidden state size.
        num_classes: Number of output logits.
    """

    def __init__(self, feature_size=512, hidden_size=256, num_classes=2):
        super().__init__()
        # torchvision deprecated ``pretrained=True`` in favour of ``weights=``.
        # Use the weights enum when this torchvision provides it, else fall back.
        weights_enum = getattr(models, "ResNet18_Weights", None)
        if weights_enum is not None:
            self.resnet = models.resnet18(weights=weights_enum.IMAGENET1K_V1)
        else:
            self.resnet = models.resnet18(pretrained=True)
        self.resnet.fc = nn.Identity()  # drop the classification head
        self.gru = nn.GRU(input_size=feature_size, hidden_size=hidden_size, batch_first=True)
        self.classifier = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Map a ``(B, T, C, H, W)`` clip batch to ``(B, num_classes)`` logits."""
        B, T, C, H, W = x.size()
        # reshape (not view) so non-contiguous inputs, e.g. after a transpose, work.
        frames = x.reshape(B * T, C, H, W)
        feats = self.resnet(frames).reshape(B, T, -1)
        _, hidden = self.gru(feats)
        # hidden[-1] is the final hidden state of the last GRU layer.
        return self.classifier(hidden[-1])

# -------------------- STEP 4: FEATURE EXTRACTION + CLUSTERING --------------------
def extract_video_features(sequences, model):
    """Compute one time-averaged backbone feature vector per face sequence.

    Every frame is resized to 224x224 and normalized with mean/std 0.5, then
    the sequence is passed through ``model.resnet`` on the module-level
    ``device`` and averaged over the time axis.

    Args:
        sequences: Iterable of per-video frame stacks (iterated frame by frame).
        model: Object exposing ``eval()`` and a ``resnet`` feature extractor.

    Returns:
        List with one NumPy feature vector per input sequence.
    """
    model.eval()
    # Build the transforms once; they do not depend on the sequence.
    resize = transforms.Resize((224, 224))
    normalize = transforms.Normalize([0.5]*3, [0.5]*3)
    all_feats = []
    with torch.no_grad():
        for seq in tqdm(sequences, desc="Extracting Features"):
            seq = normalize(torch.stack([resize(img) for img in seq]))
            seq = seq.unsqueeze(0).to(device)  # [1, T, 3, H, W]

            B, T, C, H, W = seq.size()
            feats = model.resnet(seq.reshape(B * T, C, H, W))  # [B*T, feature_dim]
            feats = feats.reshape(B, T, -1)
            avg_feat = feats.mean(dim=1).squeeze().cpu().numpy()  # [feature_dim]
            all_feats.append(avg_feat)
    return all_feats


ModuleNotFoundError: No module named 'pandas'

In [None]:

# -------------------- STEP 5: RUN PIPELINE --------------------
if __name__ == "__main__":
    # Raw string: "\D" and "\d" are invalid escape sequences in a normal literal.
    video_dir = r"E:\DFDC dataset\dfdc_train_part_00\dfdc_train_part_0"  # path to folder containing 'real/' and 'fake/'
    face_sequences, labels = load_dataset(video_dir, max_frames=30)

    if len(face_sequences) == 0:
        raise ValueError("No valid face sequences extracted.")

    model = GRUClassifier()
    model.to(device)

    # Extract features
    features = extract_video_features(face_sequences, model)

    n_samples = len(features)
    if n_samples < 2:
        # Both 2-cluster K-Means and t-SNE need more than one sample.
        raise ValueError("Need at least two valid face sequences to cluster.")

    # K-Means Clustering
    print("Clustering...")
    kmeans = KMeans(n_clusters=2, random_state=42)
    clusters = kmeans.fit_predict(features)

    # Visualize
    print("Visualizing...")
    # scikit-learn requires perplexity < n_samples, so cap it for small runs.
    tsne = TSNE(n_components=2, perplexity=min(5, n_samples - 1), random_state=42)
    reduced = tsne.fit_transform(features)

    plt.figure(figsize=(8, 6))
    plt.scatter(reduced[:, 0], reduced[:, 1], c=clusters, cmap='coolwarm')
    plt.title("t-SNE Clustering of Deepfake Features")
    plt.xlabel("Component 1")
    plt.ylabel("Component 2")
    plt.show()

    # Save cluster pseudo-labels; video_index is the position in face_sequences.
    df = pd.DataFrame({
        'video_index': list(range(len(clusters))),
        'pseudo_label': clusters
    })
    df.to_csv("pseudo_labels.csv", index=False)
    print("Cluster assignments saved to 'pseudo_labels.csv'.")


In [None]:

    # Save model
# Write the current parameters of `model` to disk.
state = model.state_dict()
torch.save(state, "unsupervised_gru_10.pth")
print("Model weights saved to 'unsupervised_gru_10.pth'")


In [None]:


# -------------------- STEP 1: FACE EXTRACTION --------------------
def extract_faces_from_video(video_path, mtcnn, max_frames=30):
    """Collect exactly ``max_frames`` face crops from the start of a video.

    Frames are read in order, converted from BGR to RGB and passed to
    ``mtcnn``. Frames where ``mtcnn`` returns ``None`` are skipped and do not
    count towards ``max_frames``.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        mtcnn: Callable taking an RGB frame and returning a face tensor or ``None``.
        max_frames: Number of face crops required.

    Returns:
        The stacked crops (``max_frames`` along the first axis) when enough
        faces were found, else ``None``. ``max_frames <= 0`` also yields
        ``None`` instead of failing on an empty stack.
    """
    cap = cv2.VideoCapture(video_path)
    faces = []

    try:
        while cap.isOpened() and len(faces) < max_frames:
            ret, frame = cap.read()
            if not ret:
                break
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            face = mtcnn(frame_rgb)
            if face is not None:
                faces.append(face)
    finally:
        # Always free the capture handle, even when the detector raises.
        cap.release()

    if faces and len(faces) == max_frames:
        return torch.stack(faces)
    return None

def load_dataset(main_folder, max_frames=30):
    """Extract face sequences for every video under ``real/`` and ``fake/``.

    Files are visited in sorted order so the returned lists (and any seeded
    split built from them) are reproducible across file systems. Extensions
    are matched case-insensitively.

    Args:
        main_folder: Directory expected to contain ``real`` and/or ``fake`` subfolders.
        max_frames: Number of face crops required per video.

    Returns:
        ``(videos, labels)``: the face sequences that could be extracted and
        their labels (1 for ``real``, 0 for ``fake``), in matching order.
    """
    mtcnn = MTCNN(image_size=160, device=device)
    videos = []
    labels = []

    for label_name in ['real', 'fake']:
        label_folder = os.path.join(main_folder, label_name)
        if not os.path.exists(label_folder):
            # Say so explicitly; a silent skip hides a wrong dataset path.
            print(f"Label folder not found, skipping: {label_folder}")
            continue
        label = 1 if label_name == 'real' else 0

        for file in tqdm(sorted(os.listdir(label_folder))):
            if not file.lower().endswith((".mp4", ".avi")):
                continue
            video_path = os.path.join(label_folder, file)
            print(f"Processing: {file} ({label_name})")
            face_seq = extract_faces_from_video(video_path, mtcnn, max_frames)
            if face_seq is not None:
                videos.append(face_seq)
                labels.append(label)
            else:
                print(f"Skipped (not enough faces): {file}")

    print(f"Total valid sequences: {len(videos)}")
    return videos, labels


In [9]:

# -------------------- STEP 2: DATASET --------------------
class DeepfakeDataset(Dataset):
    """Wrap pre-extracted face sequences and their labels as a torch Dataset.

    Items are ``(sequence, label)`` pairs; the label is returned as a
    ``torch.long`` scalar tensor. An optional per-frame transform is applied
    to each element of the sequence and the results are stacked.
    """

    def __init__(self, face_sequences, labels, transform=None):
        self.sequences = face_sequences
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        clip = self.sequences[idx]
        if self.transform:
            processed = []
            for frame in clip:
                processed.append(self.transform(frame))
            clip = torch.stack(processed)
        label = torch.tensor(self.labels[idx], dtype=torch.long)
        return clip, label


In [10]:

# -------------------- STEP 3: MODEL --------------------
class GRUClassifier(nn.Module):
    """Per-frame ResNet-18 features fed through a GRU and a linear classifier.

    Args:
        feature_size: Size of each per-frame feature vector given to the GRU.
            It must equal the backbone's output size once its ``fc`` layer is
            replaced by identity.
        hidden_size: GRU hidden state size.
        num_classes: Number of output logits.
    """

    def __init__(self, feature_size=512, hidden_size=256, num_classes=2):
        super().__init__()
        # torchvision deprecated ``pretrained=True`` in favour of ``weights=``.
        # Use the weights enum when this torchvision provides it, else fall back.
        weights_enum = getattr(models, "ResNet18_Weights", None)
        if weights_enum is not None:
            self.resnet = models.resnet18(weights=weights_enum.IMAGENET1K_V1)
        else:
            self.resnet = models.resnet18(pretrained=True)
        self.resnet.fc = nn.Identity()  # Remove classification layer

        self.gru = nn.GRU(input_size=feature_size, hidden_size=hidden_size, batch_first=True)
        self.classifier = nn.Linear(hidden_size, num_classes)

    def forward(self, x):  # x: [B, T, 3, H, W]
        """Map a ``(B, T, C, H, W)`` clip batch to ``(B, num_classes)`` logits."""
        B, T, C, H, W = x.size()
        # reshape (not view) so non-contiguous inputs, e.g. after a transpose, work.
        frames = x.reshape(B * T, C, H, W)
        feats = self.resnet(frames).reshape(B, T, -1)  # [B, T, feature_size]
        _, hidden = self.gru(feats)
        # hidden[-1] is the final hidden state of the last GRU layer.
        return self.classifier(hidden[-1])


In [None]:

# -------------------- STEP 4: TRAINING --------------------
def train_model(model, train_loader, val_loader, num_epochs=10):
    """Train ``model`` with Adam and cross-entropy, printing accuracy per epoch.

    The model is moved to the module-level ``device``. After each epoch the
    summed training loss and training accuracy are printed, followed by the
    accuracy on ``val_loader``.

    Args:
        model: Module mapping a batch of inputs to class logits.
        train_loader: Iterable of ``(inputs, labels)`` batches used for updates.
        val_loader: Iterable of ``(inputs, labels)`` batches used for evaluation.
        num_epochs: Number of passes over ``train_loader``.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=1e-4)

    model.to(device)

    for epoch in range(num_epochs):
        model.train()
        total_loss, correct, total = 0, 0, 0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

        # An empty loader leaves total at 0; report 0% instead of dividing by zero.
        acc = correct / total * 100 if total else 0.0
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss:.4f}, Train Accuracy: {acc:.2f}%")

        model.eval()
        correct, total = 0, 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                _, predicted = torch.max(outputs, 1)
                correct += (predicted == labels).sum().item()
                total += labels.size(0)
        # A tiny dataset can produce an empty validation split.
        val_acc = correct / total * 100 if total else 0.0
        print(f"Validation Accuracy: {val_acc:.2f}%")


In [None]:

# -------------------- STEP 5: RUN FULL PIPELINE --------------------

if __name__ == "__main__":
    # Raw string: "\D" and "\d" are invalid escape sequences in a normal literal.
    video_dir = r"E:\DFDC dataset\dfdc_train_part_00\dfdc_train_part_0"  # Folder with 'real/' and 'fake/' subfolders
    face_sequences, labels = load_dataset(video_dir, max_frames=30)

    if len(face_sequences) == 0:
        raise ValueError("No valid face sequences extracted. Check your video files.")

    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.Normalize([0.5]*3, [0.5]*3)  # (x - 0.5) / 0.5 per channel
    ])

    X_train, X_val, y_train, y_val = train_test_split(face_sequences, labels, test_size=0.2, random_state=42)

    train_dataset = DeepfakeDataset(X_train, y_train, transform=transform)
    val_dataset = DeepfakeDataset(X_val, y_val, transform=transform)

    train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=4)

    model = GRUClassifier()
    # nn.Module has no Keras-style summary(); printing the module lists its layers.
    print(model)
    