In [23]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, random_split
from tqdm import tqdm
from sklearn.metrics import classification_report

# --- Configuration ---
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Data / optimisation settings.
IMG_SIZE = 128     # images are resized to IMG_SIZE x IMG_SIZE by the dataset transform
BATCH_SIZE = 64    # batch size shared by the train and test loaders
NUM_CLASSES = 2    # width of the classifier's output layer
EPOCHS = 1         # default only: the training cells reassign EPOCHS before looping

# Adversarial-attack settings.
EPSILON = 0.03     # default per-pixel (L-inf) perturbation budget for fgsm_attack / pgd_attack
PGD_STEPS = 7      # default number of PGD iterations

In [17]:
# --- Adversarial Attack Functions ---
def fgsm_attack(model, images, labels, epsilon=EPSILON):
    """Craft single-step FGSM adversarial examples.

    Each input element is moved by ``epsilon`` in the direction of the sign of
    the cross-entropy loss gradient, then the result is clamped to ``[0, 1]``.

    Args:
        model: Callable mapping an image batch to class logits.
        images: Input batch; it is read but never modified.
        labels: Ground-truth class indices for ``images``.
        epsilon: Step size (maximum per-element change).

    Returns:
        A detached tensor with the same shape as ``images``.
    """
    # Differentiate w.r.t. a private leaf copy so the caller's tensor is not
    # switched to requires_grad=True behind its back.
    adv_input = images.clone().detach().requires_grad_(True)

    # enable_grad makes the attack usable even when called inside torch.no_grad().
    with torch.enable_grad():
        loss = F.cross_entropy(model(adv_input), labels)
        # autograd.grad yields only d(loss)/d(input); the model's parameter
        # .grad buffers are neither cleared nor filled by the attack.
        (input_grad,) = torch.autograd.grad(loss, adv_input)

    perturbed = adv_input.detach() + epsilon * input_grad.sign()
    return torch.clamp(perturbed, 0, 1).detach()

def pgd_attack(model, images, labels, epsilon=EPSILON, alpha=0.01, iters=PGD_STEPS):
    """Craft PGD adversarial examples (iterated signed-gradient ascent).

    Starting from the clean batch, each iteration takes a step of size
    ``alpha`` along the sign of the cross-entropy gradient, projects the result
    back into the ``[images - epsilon, images + epsilon]`` box and clamps it to
    ``[0, 1]``.

    Args:
        model: Callable mapping an image batch to class logits.
        images: Clean input batch; it is read but never modified.
        labels: Ground-truth class indices for ``images``.
        epsilon: Maximum per-element deviation from ``images``.
        alpha: Step size of a single iteration.
        iters: Number of iterations; ``0`` returns a copy of ``images``.

    Returns:
        A detached tensor with the same shape as ``images``.
    """
    clean = images.clone().detach()
    # The projection box does not change between iterations, so build it once.
    lower_bound = clean - epsilon
    upper_bound = clean + epsilon

    perturbed = clean.clone()
    for _ in range(iters):
        perturbed.requires_grad_(True)
        # enable_grad makes the attack usable even when called inside torch.no_grad().
        with torch.enable_grad():
            loss = F.cross_entropy(model(perturbed), labels)
            # Only the input gradient is needed; autograd.grad leaves the
            # model's parameter .grad buffers untouched.
            (input_grad,) = torch.autograd.grad(loss, perturbed)

        with torch.no_grad():
            stepped = perturbed + alpha * input_grad.sign()
            stepped = torch.max(torch.min(stepped, upper_bound), lower_bound)
            perturbed = torch.clamp(stepped, 0, 1)
    return perturbed.detach()

def one_pixel_attack(images, pixel_count=1):
    """Overwrite randomly chosen pixels of every image with random values.

    For each image, ``pixel_count`` spatial positions are drawn (with
    replacement, so the same position may be hit twice) and all channels at
    that position are replaced by fresh uniform values in ``[0, 1)``.

    Args:
        images: Batch indexed as (batch, channel, row, column); any channel
            count is accepted. The input is never modified.
        pixel_count: Number of positions to overwrite per image.

    Returns:
        A detached copy of ``images`` with the selected pixels replaced.
    """
    # detach(): the attack is not differentiable, so the copy must not stay
    # attached to the autograd graph of an input that requires grad.
    perturbed = images.detach().clone()
    batch_size, channels, h, w = images.shape
    for i in range(batch_size):
        for _ in range(pixel_count):
            row, col = np.random.randint(0, h), np.random.randint(0, w)
            # Draw one value per channel directly on the images' device
            # instead of assuming three channels and copying from the CPU.
            perturbed[i, :, row, col] = torch.rand(channels, device=perturbed.device)
    return perturbed

In [16]:
# --- Robust Model Architecture ---
class GaussianNoise(nn.Module):
    """Add zero-mean Gaussian noise to the input while in training mode.

    In evaluation mode the layer is the identity and returns its input object
    unchanged.

    Args:
        std: Standard deviation of the injected noise.
    """

    def __init__(self, std=0.1):
        super().__init__()
        self.std = std

    def forward(self, x):
        # Noise acts purely as a training-time regulariser.
        if not self.training:
            return x
        noise = torch.randn_like(x) * self.std
        return x + noise

class RobustHybridModel(nn.Module):
    """Hybrid CNN + transformer-encoder image classifier.

    Forward pipeline: batch-norm on the raw 3-channel input -> small
    convolutional stack (``input_denoiser``, 3 -> 32 -> 3 channels) -> CNN
    backbone with two 2x2 max-pools and a training-time noise layer -> 4x4
    strided convolution that turns the feature map into 128-d tokens ->
    2-layer transformer encoder -> mean over tokens -> MLP head that outputs
    ``NUM_CLASSES`` logits.

    All 3x3 convolutions use ``padding=1``, so only the pools and the patch
    embedding shrink the spatial size (by a factor of 16 overall).
    """

    def __init__(self):
        super().__init__()

        # Input processing
        self.input_norm = nn.BatchNorm2d(3)
        denoiser_layers = [
            nn.Conv2d(3, 32, 3, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 3, 3, padding=1),
        ]
        self.input_denoiser = nn.Sequential(*denoiser_layers)

        # CNN backbone
        backbone_layers = [
            nn.Conv2d(3, 64, 3, padding=1),
            nn.ReLU(),
            nn.Conv2d(64, 128, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            GaussianNoise(0.05),
            nn.Conv2d(128, 256, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
        ]
        self.cnn = nn.Sequential(*backbone_layers)

        # Transformer: non-overlapping 4x4 patches of the feature map become tokens
        self.patch_embed = nn.Conv2d(256, 128, kernel_size=4, stride=4)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=128, nhead=4, dim_feedforward=256
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=2)

        # Classifier
        head_layers = [
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, NUM_CLASSES),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, x):
        """Map an image batch to a (batch, NUM_CLASSES) tensor of logits."""
        cleaned = self.input_denoiser(self.input_norm(x))
        feature_map = self.cnn(cleaned)
        patches = self.patch_embed(feature_map)
        # (batch, 128, h', w') -> (tokens, batch, 128): the encoder layer is built
        # with its default sequence-first layout.
        tokens = patches.flatten(2).permute(2, 0, 1)
        encoded = self.transformer(tokens)
        pooled = encoded.mean(dim=0)  # average over the token axis
        return self.classifier(pooled)

In [15]:
# --- Adversarial Training Loop ---
def robust_train(model, loader, optimizer, criterion):
    """Run one epoch of adversarial training.

    Every clean batch is extended with its FGSM, PGD and one-pixel variants,
    and the model takes a single optimisation step on the combined batch
    (four times the loader's batch size).

    Args:
        model: Network to train; switched to training mode.
        loader: Iterable of ``(images, labels)`` batches.
        optimizer: Optimiser updating ``model``'s parameters.
        criterion: Loss called as ``criterion(logits, labels)``.

    Returns:
        ``(mean_batch_loss, accuracy)``, where accuracy is measured over the
        samples actually processed. ``(0.0, 0.0)`` if the loader yields nothing.
    """
    model.train()
    running_loss, correct = 0.0, 0
    seen, num_batches = 0, 0
    for x, y in tqdm(loader, desc="Training"):
        x, y = x.to(device), y.to(device)

        # Generate adversarial examples
        # NOTE(review): the model is in training mode here, so the attack
        # forward passes also go through the train-mode BatchNorm and noise
        # layers — confirm this is intended.
        with torch.enable_grad():
            x_fgsm = fgsm_attack(model, x, y)
            x_pgd = pgd_attack(model, x, y)

        x_pixel = one_pixel_attack(x)

        # Combined training batch. detach() keeps the training backward pass
        # from also computing input gradients if an attack left x attached.
        mixed_x = torch.cat([x.detach(), x_fgsm, x_pgd, x_pixel.detach()])
        mixed_y = torch.cat([y] * 4)

        optimizer.zero_grad()
        outputs = model(mixed_x)
        loss = criterion(outputs, mixed_y)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        correct += (outputs.argmax(1) == mixed_y).sum().item()
        # Count what was really processed: len(loader.dataset) overstates the
        # denominator whenever the loader uses a subset sampler.
        seen += mixed_y.size(0)
        num_batches += 1

    if seen == 0:
        return 0.0, 0.0
    return running_loss / num_batches, correct / seen


In [20]:
# --- Evaluation with Attacks ---
def adversarial_test(model, loader):
    """Measure accuracy on clean, FGSM, PGD and one-pixel inputs.

    The model is put in evaluation mode. For every batch the clean accuracy is
    recorded first, then the three attacks are generated from that batch and
    scored against the same labels.

    Args:
        model: Network to evaluate.
        loader: Iterable of ``(images, labels)`` batches.

    Returns:
        Dict with keys ``'clean'``, ``'fgsm'``, ``'pgd'``, ``'pixel'`` (in that
        order) mapping to the fraction of correctly classified samples.
    """
    model.eval()
    attack_names = ('fgsm', 'pgd', 'pixel')
    hits = {name: 0 for name in ('clean',) + attack_names}
    total = 0  # every variant is scored on the same samples

    for x, y in tqdm(loader, desc="Testing"):
        x, y = x.to(device), y.to(device)
        total += y.size(0)

        # Clean samples
        with torch.no_grad():
            clean_pred = model(x).argmax(1)
        hits['clean'] += (clean_pred == y).sum().item()

        # Generate attacks (the gradient-based ones need autograd enabled)
        with torch.enable_grad():
            x_fgsm = fgsm_attack(model, x, y)
            x_pgd = pgd_attack(model, x, y)
        x_pixel = one_pixel_attack(x)
        attacked = dict(zip(attack_names, (x_fgsm, x_pgd, x_pixel)))

        # Test attacks
        with torch.no_grad():
            for name, batch in attacked.items():
                hits[name] += (model(batch).argmax(1) == y).sum().item()

    # Calculate accuracies
    return {name: count / total for name, count in hits.items()}


In [8]:
import kagglehub

# Download the "manjilkarki/deepfake-and-real-images" dataset through kagglehub
# and print the local directory it was stored in.
# NOTE(review): `path` is not read by the dataset-loading cell, which hard-codes
# the /kaggle/input/... location instead — confirm the two stay in sync.
path = kagglehub.dataset_download("manjilkarki/deepfake-and-real-images")
print("Downloaded to:", path)

Downloaded to: /kaggle/input/deepfake-and-real-images


In [11]:
# Resize every image to IMG_SIZE x IMG_SIZE and convert it to a tensor.
transform = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor()
])

train_dataset = datasets.ImageFolder('/kaggle/input/deepfake-and-real-images/Dataset/Train', transform=transform)
test_dataset = datasets.ImageFolder('/kaggle/input/deepfake-and-real-images/Dataset/Test', transform=transform)

# Remap targets: FAKE=1, REAL=0
# ImageFolder.__getitem__ takes each label from `samples`, so assigning only
# `targets` would leave the labels the DataLoaders yield unchanged. Rewrite
# `samples` (and its `imgs` alias) and rebuild `targets` from it so all three
# agree. `class_to_idx` is left as the original folder-name -> index mapping.
for dataset in [train_dataset, test_dataset]:
    fake_idx = dataset.class_to_idx['Fake']
    dataset.samples = [
        (img_path, 1 if label == fake_idx else 0)
        for img_path, label in dataset.samples
    ]
    dataset.imgs = dataset.samples
    dataset.targets = [label for _, label in dataset.samples]

In [13]:
# Training batches are reshuffled every epoch; the test set keeps a fixed order.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [27]:
def normal_train(model, loader, optimizer, criterion):
    """Run one epoch of standard training on clean data.

    Args:
        model: Network to train; switched to training mode.
        loader: Iterable of ``(images, labels)`` batches.
        optimizer: Optimiser updating ``model``'s parameters.
        criterion: Loss called as ``criterion(logits, labels)``.

    Returns:
        ``(mean_batch_loss, accuracy)``, where accuracy is measured over the
        samples actually processed. ``(0.0, 0.0)`` if the loader yields nothing.
    """
    model.train()
    running_loss, correct = 0.0, 0
    seen, num_batches = 0, 0
    for x, y in tqdm(loader, desc="Normal Training"):
        x, y = x.to(device), y.to(device)

        optimizer.zero_grad()
        outputs = model(x)
        loss = criterion(outputs, y)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        correct += (outputs.argmax(1) == y).sum().item()
        # Count processed samples rather than trusting len(loader.dataset),
        # which is wrong for loaders built with a subset sampler.
        seen += y.size(0)
        num_batches += 1

    if seen == 0:
        return 0.0, 0.0
    return running_loss / num_batches, correct / seen

In [28]:
# --- Initialize Models ---
# Baseline: the same architecture trained on clean data only, used as the
# reference point for the adversarially trained model.
normal_model = RobustHybridModel().to(device)

optimizer_normal = torch.optim.AdamW(normal_model.parameters(), lr=1e-4, weight_decay=1e-4)
criterion = nn.CrossEntropyLoss()
EPOCHS = 3
# --- Training Loop ---
for epoch in range(EPOCHS):
    print(f"\nEpoch {epoch+1}/{EPOCHS}")

    # Normal Training (Clean Data)
    normal_loss, normal_acc = normal_train(normal_model, train_loader, optimizer_normal, criterion)
    print(f"Normal Train Acc: {normal_acc:.4f}")

    # Periodic Evaluation (every second epoch; the attack-based test is slow)
    if (epoch+1) % 2 == 0:
        print("\nEvaluating Normal Model:")
        normal_metrics = adversarial_test(normal_model, test_loader)
        for attack, acc in normal_metrics.items():
            print(f"Normal {attack.upper():<6} Acc: {acc:.2%}")

# The second copy of `normal_train` that used to follow this loop was removed:
# it duplicated the definition from the earlier cell and shadowed it whenever
# this cell was run.


Epoch 1/3


Normal Training: 100%|██████████| 2188/2188 [16:55<00:00,  2.15it/s]


Normal Train Acc: 0.6982

Epoch 2/3


Normal Training: 100%|██████████| 2188/2188 [11:34<00:00,  3.15it/s]


Normal Train Acc: 0.8816

Evaluating Normal Model:


Testing: 100%|██████████| 171/171 [04:01<00:00,  1.41s/it]


Normal CLEAN  Acc: 81.02%
Normal FGSM   Acc: 0.24%
Normal PGD    Acc: 0.04%
Normal PIXEL  Acc: 81.02%

Epoch 3/3


Normal Training: 100%|██████████| 2188/2188 [11:54<00:00,  3.06it/s]

Normal Train Acc: 0.9212





In [29]:
# --- Final Test ---
# Report clean and under-attack accuracy of the baseline model after training.
print("\n=== Final Results ===")
print("\n[ Normal Model ]")

normal_metrics = adversarial_test(normal_model, test_loader)
for attack in normal_metrics:
    acc = normal_metrics[attack]
    print(f"{attack.upper():<6} Accuracy: {acc:.2%}")


=== Final Results ===

[ Normal Model ]


Testing: 100%|██████████| 171/171 [04:02<00:00,  1.42s/it]

CLEAN  Accuracy: 84.98%
FGSM   Accuracy: 0.23%
PGD    Accuracy: 0.02%
PIXEL  Accuracy: 85.00%





In [None]:
# Model Setup
model = RobustHybridModel().to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-4)
criterion = nn.CrossEntropyLoss()

# Loader over the even-indexed half of the training set, visited in random order.
half_train_loader = torch.utils.data.DataLoader(
    train_loader.dataset,
    batch_size=train_loader.batch_size,
    sampler=torch.utils.data.SubsetRandomSampler(
        range(0, len(train_loader.dataset), 2)  # Take every other sample
    )
)

best_acc = float("-inf")   # so the first evaluation always counts as an improvement
patience = 3               # evaluations without improvement tolerated before stopping
no_improve = 0
checkpoint_saved = False   # becomes True once this run has written best_model.pth
EPOCHS = 2
# Training
for epoch in range(EPOCHS):
    print(f"\nEpoch {epoch+1}/{EPOCHS}")

    # Train on half dataset
    train_loss, train_acc = robust_train(model, half_train_loader, optimizer, criterion)
    print(f"Train Acc: {train_acc:.4f}")

    # Periodic evaluation
    if (epoch+1) % 2 == 0:
        metrics = adversarial_test(model, test_loader)
        print("\nAdversarial Performance:")
        for attack, acc in metrics.items():
            print(f"{attack.upper():<6} Accuracy: {acc:.2%}")

        # Early stopping check
        current_acc = sum(metrics.values())/len(metrics)  # Average accuracy
        if current_acc > best_acc:
            best_acc = current_acc
            no_improve = 0
            torch.save(model.state_dict(), 'best_model.pth')
            checkpoint_saved = True
        else:
            no_improve += 1
            if no_improve >= patience:
                # The counter advances once per evaluation, not once per epoch.
                print(f"No improvement for {patience} evaluations, stopping early")
                break

# Load best model
# Restore only a checkpoint written by this run: if no evaluation happened,
# 'best_model.pth' is either missing or a stale file from an earlier run.
if checkpoint_saved:
    model.load_state_dict(torch.load('best_model.pth', map_location=device))
else:
    print("No checkpoint was saved during this run; keeping the final-epoch weights")

In [31]:
# Final Evaluation
# Score the adversarially trained model on clean and attacked test data.
print("\nFinal Test Results:")
metrics = adversarial_test(model, test_loader)
for attack in metrics:
    acc = metrics[attack]
    print(f"{attack.upper():<6} Accuracy: {acc:.2%}")

# Save Model
robust_weights = model.state_dict()
torch.save(robust_weights, "robust_ViT_DFRI.pth")


Final Test Results:


Testing: 100%|██████████| 171/171 [04:01<00:00,  1.41s/it]

CLEAN  Accuracy: 49.64%
FGSM   Accuracy: 49.64%
PGD    Accuracy: 49.64%
PIXEL  Accuracy: 49.64%



