In [2]:
# IMPORTANT: some Kaggle data sources are private.
# Run this cell to authenticate with Kaggle before importing the data sources.
# If kagglehub is not installed in this environment, uncomment the next line first:
# %pip install -q kagglehub
import kagglehub

# Starts the kagglehub authentication flow (in the recorded run this rendered
# an interactive widget as the cell output).
kagglehub.login()

VBox(children=(HTML(value='<center> <img\nsrc=https://www.kaggle.com/static/images/site-logo.png\nalt=\'Kaggle…

In [3]:
# Download the two Kaggle datasets this notebook works with.
# This environment differs from Kaggle's Python image, so some libraries
# used further down may have to be installed manually.
# NOTE(review): the paths returned here are not referenced by the later cells
# visible in this notebook, which read from '/kaggle/input/...' directly.
dalle_recognition_slug = "superpotato9/dalle-recognition-dataset"
random_images_slug = "spectrewolf8/random-images-dataset"

superpotato9_dalle_recognition_dataset_path = kagglehub.dataset_download(dalle_recognition_slug)
spectrewolf8_random_images_dataset_path = kagglehub.dataset_download(random_images_slug)

print("Data source import complete.")

Downloading from https://www.kaggle.com/api/v1/datasets/download/superpotato9/dalle-recognition-dataset?dataset_version_number=7...


  1%|          | 80.0M/13.4G [01:47<5:04:29, 784kB/s] 


KeyboardInterrupt: 

In [None]:
# !pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu124
# !pip install opencv-python --upgrade
# !pip install ultralytics --upgrade
# !pip install torchmetrics --upgrade
# !pip install grad-cam --upgrade
# !pip install natsort --upgrade
# !pip install Pillow --upgrade
# !pip install wandb --upgrade
# !pip install lightning --upgrade

### >Restart kernel!


In [None]:
# Importing necessary libraries
import torch
import torch.nn as nn
import torch.nn.functional as tnf
import torchvision.models as models
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset, random_split
from PIL import Image, ImageChops
import os, shutil
import matplotlib.pyplot as plt
import numpy as np
import cv2
from torchmetrics import Accuracy, Precision, Recall, F1Score, ConfusionMatrix

# from ultralytics import YOLO
import hashlib
from natsort import natsorted
from pytorch_grad_cam import GradCAM
from pytorch_grad_cam.utils.model_targets import ClassifierOutputTarget
from pytorch_grad_cam.utils.image import show_cam_on_image

# Dataset


## Generate Dataset


In [None]:
# !rm -rf '/kaggle/working/temp_dataset'

In [None]:
#create directories
!mkdir '/kaggle/working/temp_dataset'
!mkdir '/kaggle/working/temp_dataset/real'
!mkdir '/kaggle/working/temp_dataset/fake'
!mkdir '/kaggle/working/models'

# #copy real images
# !cp -r '/kaggle/input/random-images-dataset/random_images_dataset/training/all_images/'* '/kaggle/working/temp_dataset/real/'
# !cp -r '/kaggle/input/dalle-recognition-dataset/real/'* '/kaggle/working/temp_dataset/real/'
!find '/kaggle/input/random-images-dataset/random_images_dataset/training/all_images/' -type f -print0 | xargs -0 cp -t '/kaggle/working/temp_dataset/real/'
!find '/kaggle/input/dalle-recognition-dataset/real/' -type f -print0 | xargs -0 cp -t '/kaggle/working/temp_dataset/real/'

# #copy fake images
# !cp -r '/kaggle/input/dalle-recognition-dataset/fakeV2/fake-v2/'* '/kaggle/working/temp_dataset/fake/'
!find '/kaggle/input/dalle-recognition-dataset/fakeV2/fake-v2/' -type f -print0 | xargs -0 cp -t '/kaggle/working/temp_dataset/fake/'

In [None]:
#remove unnecessary files
!rm -rf '/kaggle/working/temp_dataset/real/r-art.txt'
!rm -rf '/kaggle/working/temp_dataset/fake/sort'
!rm -rf '/kaggle/working/temp_dataset/fake/dataset-metadata.json'
!rm -rf '/kaggle/working/temp_dataset/fake/12479.jpg'

In [None]:
# Collect the full path of every regular file in each class directory and
# report how many were found.
real_dir = "/kaggle/working/temp_dataset/real"
fake_dir = "/kaggle/working/temp_dataset/fake"

real_files = list(
    filter(os.path.isfile, (os.path.join(real_dir, name) for name in os.listdir(real_dir)))
)
fake_files = list(
    filter(os.path.isfile, (os.path.join(fake_dir, name) for name in os.listdir(fake_dir)))
)

print(f"Number of real images: {len(real_files)}")
print(f"Number of fake images: {len(fake_files)}")

In [None]:
from PIL import Image
import os


def is_valid_image(file_path):
    """Report whether ``file_path`` can be opened and verified as an image.

    Args:
        file_path (str): Path of the file to check.

    Returns:
        bool: True when the file opens, is not oversized and passes
        ``verify()``; False otherwise. The reason is printed in every
        False case.
    """
    # Pixel-count ceiling (described in the original code as PIL's max pixel limit).
    max_pixels = 178956970

    try:
        with Image.open(file_path) as img:
            pixel_count = img.width * img.height
            if pixel_count > max_pixels:
                print(f"Oversized image: {file_path}")
                return False

            # Integrity check of the opened file.
            img.verify()
    except Exception as e:
        # Any failure while opening or verifying marks the file as invalid.
        print(f"Invalid image {file_path}: {e}")
        return False

    return True


# Validate every copied file.
valid_image_paths_1 = [path for path in real_files if is_valid_image(path)]
valid_image_paths_2 = [path for path in fake_files if is_valid_image(path)]

# The dataset class builds its sample list by listing the class directories,
# so the validation result only takes effect if invalid files are removed
# from the working copy. The originals under /kaggle/input are untouched.
removed_count = 0
for all_paths, valid_paths in (
    (real_files, valid_image_paths_1),
    (fake_files, valid_image_paths_2),
):
    valid_set = set(valid_paths)
    for path in all_paths:
        # isfile() guard keeps the cell re-runnable after files were removed.
        if path not in valid_set and os.path.isfile(path):
            os.remove(path)
            removed_count += 1

print(f"Valid real images: {len(valid_image_paths_1)}")
print(f"Valid fake images: {len(valid_image_paths_2)}")
print(f"Removed invalid files: {removed_count}")

## Prepare Dataset


In [None]:
# Preprocessing pipeline applied to every image that goes through the dataset:
# fixed 224x224 size, random horizontal flip, tensor conversion, then
# per-channel normalisation.
channel_mean = [0.485, 0.456, 0.406]
channel_std = [0.229, 0.224, 0.225]

preprocessing_steps = [
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=channel_mean, std=channel_std),
]
transform = transforms.Compose(preprocessing_steps)

In [None]:
class AiGenDataset(Dataset):
    """Binary image dataset read from ``<image_dir>/real`` and ``<image_dir>/fake``.

    Images found in ``real`` are labelled 0 and images found in ``fake`` are
    labelled 1.
    """

    def __init__(self, image_dir, cap=1000, transform=None):
        """
        Args:
            image_dir (str): Directory containing the ``real`` and ``fake``
                sub-directories.
            cap (int | None): Maximum number of images taken from each
                sub-directory. ``None`` disables the limit.
            transform (callable, optional): Optional transform to be applied on an image.
        """
        self.image_dir = image_dir
        self.transform = transform
        self.image_paths = []
        self.labels = []
        self.samples_cap = cap

        # Label index follows the list order: 'real' -> 0, 'fake' -> 1.
        for label_index, label in enumerate(["real", "fake"]):
            label_dir = os.path.join(image_dir, label)

            # Sort so the selected subset is the same on every run
            # (os.listdir returns entries in arbitrary order), and keep
            # regular files only.
            class_paths = [
                path
                for path in (os.path.join(label_dir, name) for name in sorted(os.listdir(label_dir)))
                if os.path.isfile(path)
            ]

            # Keep at most `cap` samples per class. The earlier `idx > cap`
            # check let cap + 1 samples through.
            if self.samples_cap is not None:
                class_paths = class_paths[: max(self.samples_cap, 0)]

            self.image_paths.extend(class_paths)
            self.labels.extend([label_index] * len(class_paths))

    def __len__(self):
        """Return the total number of samples across both classes."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the sample at position ``idx``.

        The image is loaded from disk, converted to RGB and, if a transform
        was given, passed through it.
        """
        img_path = self.image_paths[idx]
        label = self.labels[idx]

        # The context manager closes the file once the RGB copy has been made.
        with Image.open(img_path) as img:
            image = img.convert("RGB")

        if self.transform:
            image = self.transform(image)

        return image, label

In [None]:
# Expected layout of the working copy:
# /kaggle/working/temp_dataset/
# ├── real/
# └── fake/

# Define the dataset with all data
dataset = AiGenDataset(image_dir="/kaggle/working/temp_dataset", transform=transform)

# Split into 70 % train / 15 % validation / remaining ~15 % test.
# The earlier 85 % / 15 % fractions summed to 100 %, which left the test split
# with only the rounding remainder (0 or 1 sample).
train_size = int(0.70 * len(dataset))
val_size = int(0.15 * len(dataset))
test_size = len(dataset) - train_size - val_size

# A seeded generator makes the split identical after Restart & Run All.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset, test_dataset = random_split(
    dataset, [train_size, val_size, test_size], generator=split_generator
)

# NOTE(review): all three subsets share `transform`, so any random
# augmentation it contains is also applied to validation and test samples.

# Create DataLoaders for each set (only the training data is shuffled)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Print details for confirmation
print(f"Train dataset size: {len(train_dataset)}")
print(f"Validation dataset size: {len(val_dataset)}")
print(f"Test dataset size: {len(test_dataset)}")
print(f"Train loader: {train_loader}")
print(f"Validation loader: {val_loader}")
print(f"Test loader: {test_loader}")

In [None]:
from torchvision.models import ResNeXt101_32X8D_Weights

# ResNeXt-101 (32x8d) initialised with torchvision's default weights.
pretrained_weights = ResNeXt101_32X8D_Weights.DEFAULT
model = models.resnext101_32x8d(weights=pretrained_weights)

# Swap the final fully connected layer for a 2-output head (real / fake),
# keeping the input width of the layer it replaces.
num_ftrs = model.fc.in_features
model.fc = nn.Linear(in_features=num_ftrs, out_features=2)

In [None]:
# Loss function and optimizer for the two-class head; Adam updates every model parameter.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.001)

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
model.to(device)
print(device)

In [None]:
# Metric objects shared by the train, validation and test cells.
# They all use the same two-class configuration; precision, recall and F1
# are macro-averaged.
two_class_task = {"task": "multiclass", "num_classes": 2}

accuracy_metric = Accuracy(**two_class_task).to(device)
precision_metric = Precision(average="macro", **two_class_task).to(device)
recall_metric = Recall(average="macro", **two_class_task).to(device)
f1_metric = F1Score(average="macro", **two_class_task).to(device)
confusion_matrix_metric = ConfusionMatrix(**two_class_task).to(device)

## Train loop


In [None]:
num_epochs = 15

# Metrics tracked during training, handled as a group for reset/update.
train_metrics = (accuracy_metric, precision_metric, recall_metric, f1_metric)

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    # Start every epoch with clean metric state.
    for metric in train_metrics:
        metric.reset()

    for images, labels in train_loader:
        images = images.to(device)
        labels = labels.to(device)

        # Standard optimisation step: clear gradients, forward, backward, update.
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        # Accumulate this batch into the epoch metrics.
        for metric in train_metrics:
            metric.update(outputs, labels)

    # Loss is averaged over batches; metrics are computed over the whole epoch.
    epoch_loss = running_loss / len(train_loader)
    epoch_acc = accuracy_metric.compute() * 100
    epoch_precision = precision_metric.compute()
    epoch_recall = recall_metric.compute()
    epoch_f1 = f1_metric.compute()

    print(
        f"--------\nEpoch [{epoch + 1}/{num_epochs}]"
        f"\nTrain Loss: {epoch_loss:.4f}"
        f"\nTrain Metrics:"
        f"\n  Accuracy: {epoch_acc:.2f}%"
        f"\n  Precision: {epoch_precision:.2f}"
        f"\n  Recall: {epoch_recall:.2f}"
        f"\n  F1 Score: {epoch_f1:.2f}"
    )

## Val loop


In [None]:
# Evaluate the trained model on the validation split.
model.eval()
val_loss = 0.0

# Every metric, including the confusion matrix, starts from a clean state.
val_metrics = (
    accuracy_metric,
    precision_metric,
    recall_metric,
    f1_metric,
    confusion_matrix_metric,
)
for metric in val_metrics:
    metric.reset()

# No gradients are needed for evaluation.
with torch.no_grad():
    for images, labels in val_loader:
        images = images.to(device)
        labels = labels.to(device)

        outputs = model(images)
        loss = criterion(outputs, labels)
        val_loss += loss.item()

        for metric in val_metrics:
            metric.update(outputs, labels)

# Average the loss over batches and compute the metrics over the full split.
val_loss /= len(val_loader)
val_accuracy = accuracy_metric.compute() * 100
val_precision = precision_metric.compute()
val_recall = recall_metric.compute()
val_f1 = f1_metric.compute()
val_confusion_matrix = confusion_matrix_metric.compute()

print(f"Validation Loss: {val_loss:.4f}, Accuracy: {val_accuracy:.2f}%")
print(f"Precision: {val_precision:.2f}, Recall: {val_recall:.2f}, F1 Score: {val_f1:.2f}")
print(f"Confusion Matrix:\n{val_confusion_matrix.cpu().numpy()}")

# How to read the confusion matrix:
# [[TN  FP]
#  [FN  TP]]
#
# TN = True Negatives,  FP = False Positives
# FN = False Negatives, TP = True Positives

## Test loop


In [None]:
# Evaluate the trained model on the held-out test split.

# Fail early with a clear message: an empty loader would otherwise surface as
# a ZeroDivisionError when the loss is averaged after the loop.
if len(test_loader) == 0:
    raise RuntimeError(
        "test_loader is empty - check the train/validation/test split sizes before evaluating."
    )

model.eval()  # Set the model to evaluation mode
test_loss = 0.0

# Every metric, including the confusion matrix, starts from a clean state.
test_metrics = (
    accuracy_metric,
    precision_metric,
    recall_metric,
    f1_metric,
    confusion_matrix_metric,
)
for metric in test_metrics:
    metric.reset()

# No gradients are needed for evaluation.
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        loss = criterion(outputs, labels)
        test_loss += loss.item()

        # Accumulate this batch into the metrics.
        for metric in test_metrics:
            metric.update(outputs, labels)

# Average the loss over batches and compute the metrics over the full split.
test_loss = test_loss / len(test_loader)
test_accuracy = accuracy_metric.compute() * 100
test_precision = precision_metric.compute()
test_recall = recall_metric.compute()
test_f1 = f1_metric.compute()
test_confusion_matrix = confusion_matrix_metric.compute()

print(
    f"Test Metrics:"
    f"\n  Loss: {test_loss:.4f}"
    f"\n  Accuracy: {test_accuracy:.2f}%"
    f"\n  Precision: {test_precision:.2f}"
    f"\n  Recall: {test_recall:.2f}"
    f"\n  F1 Score: {test_f1:.2f}"
)
print(f"Confusion Matrix:\n{test_confusion_matrix.cpu().numpy()}")

## Exporting model


In [None]:
# Persist the whole model object (not only its state_dict) into the models directory.
model_export_path = "/kaggle/working/models/acc94.00_test-1.0_AI_image_detector_resnext101_32x8d.pth"
torch.save(model, model_export_path)

## Manual Testing


In [None]:
def load_test_image(image_path, show_image=False):
    """Load one image from disk and turn it into a normalised single-image batch.

    Args:
        image_path (str): Path of the image file to load.
        show_image (bool): When True, display the image with matplotlib
            before preprocessing.

    Returns:
        torch.Tensor: Tensor of shape (1, 3, 224, 224): the RGB image resized
        to 224x224, normalised, with a leading batch dimension.

    Raises:
        OSError: If the file is missing or cannot be opened as an image
            (raised by ``Image.open``).
    """
    # Decode the file once with PIL and reuse it for display and preprocessing.
    # A missing or unreadable path fails here with a clear error, rather than
    # later as an opaque OpenCV assertion on a None image.
    with Image.open(image_path) as img:
        image = img.convert("RGB")

    if show_image:
        plt.figure(figsize=(8, 6))
        plt.imshow(image)
        plt.axis("off")  # Hide axis
        plt.title("Test image")
        plt.show()

    # Same preprocessing as the training transform, minus the random flip.
    preprocess = transforms.Compose(
        [
            transforms.Resize((224, 224)),  # Resize to the model's input size
            transforms.ToTensor(),  # Convert image to tensor
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ]
    )

    # Models expect a batch, so add a leading batch dimension of size 1.
    return preprocess(image).unsqueeze(0)

In [None]:
# Classify a single image and report the predicted label with its confidence.
image_path = "/kaggle/input/dalle-recognition-dataset/fakeV2/fake-v2/10005.jpg"
image = load_test_image(image_path=image_path, show_image=True)

# Switch to evaluation mode explicitly so this cell does not depend on an
# earlier cell having done it (the training cell leaves the model in train mode).
model.eval()

# Make prediction
with torch.no_grad():  # No need to compute gradients for inference
    image = image.to(device)
    output = model(image)  # Forward pass

    # Apply softmax to turn the two logits into probabilities
    probabilities = tnf.softmax(output, dim=1)

    # Highest probability and the index of the class it belongs to
    confidence, predicted = torch.max(probabilities, 1)

# Class indices as assigned by the dataset: 0 -> real, 1 -> fake
label_map = {0: "real", 1: "fake"}
predicted_label = label_map[predicted.item()]

# Print predicted label and confidence score
confidence_score = confidence.item()
print(f"Predicted label: {predicted_label}")
print(f"Confidence score: {confidence_score*100:.2f}")

# Load Model


In [None]:
# Reload the model written by the export cell.
# The previous path pointed at '/kaggle/working/model/...', a directory and
# file name this notebook never creates, so the load failed on a fresh run.
model_path = "/kaggle/working/models/acc94.00_test-1.0_AI_image_detector_resnext101_32x8d.pth"

# SECURITY: weights_only=False unpickles arbitrary Python objects, which can
# execute code. Only load checkpoint files you created or fully trust.
loaded_model = torch.load(
    model_path,
    map_location="cpu",
    weights_only=False,
)
loaded_model.eval()  # Set the model to evaluation mode

# Move to the GPU when one is available. Assigning the result keeps the cell
# from printing the full model structure as its output.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
loaded_model = loaded_model.to(device)

# Shifting to PyTorch Lightning


In [None]:
# import pytorch_lightning as pl
# import torch
# import torch.nn as nn
# import torchvision.models as models
# from torchmetrics import Accuracy, Precision, Recall, F1Score
# from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping

# class AIImageDetector(pl.LightningModule):
#     def __init__(self, learning_rate=0.001):
#         super().__init__()
#         # Load the ResNeXt model
#         self.model = models.resnext101_64x4d(weights='DEFAULT')

#         # Modify the last layer for binary classification
#         num_ftrs = self.model.fc.in_features
#         self.model.fc = nn.Linear(num_ftrs, 2)

#         # Save hyperparameters
#         self.save_hyperparameters()

#         # Loss function
#         self.criterion = nn.CrossEntropyLoss()

#         # Metrics
#         self.train_accuracy = Accuracy(task="multiclass", num_classes=2)
#         self.val_accuracy = Accuracy(task="multiclass", num_classes=2)
#         self.test_accuracy = Accuracy(task="multiclass", num_classes=2)

#         self.train_precision = Precision(task="multiclass", num_classes=2, average='macro')
#         self.val_precision = Precision(task="multiclass", num_classes=2, average='macro')
#         self.test_precision = Precision(task="multiclass", num_classes=2, average='macro')

#         self.train_recall = Recall(task="multiclass", num_classes=2, average='macro')
#         self.val_recall = Recall(task="multiclass", num_classes=2, average='macro')
#         self.test_recall = Recall(task="multiclass", num_classes=2, average='macro')

#         self.train_f1 = F1Score(task="multiclass", num_classes=2, average='macro')
#         self.val_f1 = F1Score(task="multiclass", num_classes=2, average='macro')
#         self.test_f1 = F1Score(task="multiclass", num_classes=2, average='macro')

#     def forward(self, x):
#         return self.model(x)

#     def training_step(self, batch, batch_idx):
#         x, y = batch
#         logits = self(x)
#         loss = self.criterion(logits, y)

#         # Calculate and log metrics
#         self.train_accuracy(logits, y)
#         self.train_precision(logits, y)
#         self.train_recall(logits, y)
#         self.train_f1(logits, y)

#         self.log('train_loss', loss, prog_bar=True)
#         self.log('train_accuracy', self.train_accuracy, prog_bar=True)
#         self.log('train_precision', self.train_precision)
#         self.log('train_recall', self.train_recall)
#         self.log('train_f1', self.train_f1)

#         return loss

#     def validation_step(self, batch, batch_idx):
#         x, y = batch
#         logits = self(x)
#         loss = self.criterion(logits, y)

#         # Calculate and log metrics
#         self.val_accuracy(logits, y)
#         self.val_precision(logits, y)
#         self.val_recall(logits, y)
#         self.val_f1(logits, y)

#         self.log('val_loss', loss, prog_bar=True)
#         self.log('val_accuracy', self.val_accuracy, prog_bar=True)
#         self.log('val_precision', self.val_precision)
#         self.log('val_recall', self.val_recall)
#         self.log('val_f1', self.val_f1)

#         return loss

#     def test_step(self, batch, batch_idx):
#         x, y = batch
#         logits = self(x)
#         loss = self.criterion(logits, y)

#         # Calculate and log metrics
#         self.test_accuracy(logits, y)
#         self.test_precision(logits, y)
#         self.test_recall(logits, y)
#         self.test_f1(logits, y)

#         self.log('test_loss', loss, prog_bar=True)
#         self.log('test_accuracy', self.test_accuracy, prog_bar=True)
#         self.log('test_precision', self.test_precision)
#         self.log('test_recall', self.test_recall)
#         self.log('test_f1', self.test_f1)

#         return loss

#     def configure_optimizers(self):
#         optimizer = torch.optim.Adam(self.parameters(), lr=self.hparams.learning_rate)
#         return optimizer

In [None]:
# # Create PyTorch Lightning DataModule
# class AIImageDataModule(pl.LightningDataModule):
#     def __init__(self, train_loader, val_loader, test_loader):
#         super().__init__()
#         self.train_loader = train_loader
#         self.val_loader = val_loader
#         self.test_loader = test_loader

#     def train_dataloader(self):
#         return self.train_loader

#     def val_dataloader(self):
#         return self.val_loader

#     def test_dataloader(self):
#         return self.test_loader

In [None]:
# from pytorch_lightning.loggers import WandbLogger  # or TensorBoardLogger

# # Initialize logger
# wandb_logger = WandbLogger(project='ai-image-detection')

In [None]:
# # Create the data module
# data_module = AIImageDataModule(train_loader, val_loader, test_loader)

# # Create the model
# model = AIImageDetector(learning_rate=0.001)

# # Define callbacks
# checkpoint_callback = ModelCheckpoint(
#     monitor='val_accuracy',
#     dirpath='checkpoints/',
#     filename='ai-detector-{epoch:02d}-{val_accuracy:.2f}',
#     save_top_k=3,
#     mode='max'
# )

# early_stop_callback = EarlyStopping(
#     monitor='val_loss',
#     patience=5,
#     mode='min'
# )

# # Initialize trainer
# trainer = pl.Trainer(
#     max_epochs=15,
#     accelerator='auto',  # Automatically detect GPU/CPU
#     devices=1,
#     callbacks=[checkpoint_callback, early_stop_callback],
#     # logger=wandb_logger
# )

# # Train the model
# trainer.fit(model, data_module)

# # Test the model
# trainer.test(model, data_module)

In [None]:
# torch.save(model, '/kaggle/working/models/acc94.00_test-2.0_AI_image_detector_resnext101_32x8d.pth')