In [None]:
# Mount Google Drive so the dataset folder is reachable under /content/drive.
# Upload the task_1 dataset to your Google Drive before running this cell.
# NOTE(review): `drive` is not imported in any visible cell — presumably it is
# `google.colab.drive`; confirm that an earlier cell imports it.
import os  # required by os.listdir below; otherwise only imported in a later cell

drive.mount('/content/drive')

# Name of the dataset folder (or its path relative to "My Drive")
folder_name = "face_db"
folder_path = f"/content/drive/My Drive/{folder_name}/"

# Display the folder contents as a quick check that the path is correct
os.listdir(folder_path)

In [None]:
# List the "real" development images via the shell.
# NOTE(review): this hard-coded relative path (deepfake_det_task1) differs from
# `folder_path`, which is built from folder_name = "face_db" — confirm which
# location is the intended one.
!ls drive/MyDrive/deepfake_det_task1/development/real

In [None]:
def get_image_paths_and_labels(base_path, label):
    """Recursively collect ``(image_path, label)`` pairs below ``base_path``.

    Args:
        base_path: Directory that is walked recursively with ``os.walk``.
        label: Value paired with every image path that is found.

    Returns:
        A list of ``(path, label)`` tuples, one per file whose name ends
        (case-insensitively) with ``.png``, ``.jpg`` or ``.jpeg``. The list is
        empty when nothing matches.
    """
    image_suffixes = ('.png', '.jpg', '.jpeg')
    return [
        (os.path.join(folder, name), label)
        for folder, _, names in os.walk(base_path)
        for name in names
        if name.lower().endswith(image_suffixes)
    ]

In [None]:
# Directories of the four image groups below `folder_path`:
# "development" is used to build the training dataset, "evaluation" the
# held-out one; each contains a "real" and a "fake" sub-folder.
real_path = os.path.join(folder_path, "development", "real")
fake_path = os.path.join(folder_path, "development", "fake")
real_test_path = os.path.join(folder_path, "evaluation", "real")
fake_test_path = os.path.join(folder_path, "evaluation", "fake")

In [None]:
# Collect (path, label) pairs for every image of the training (development) data.
# Label convention used throughout the notebook: 0 = real, 1 = fake.
real_images = get_image_paths_and_labels(real_path, label=0)  # 0 for real
fake_images = get_image_paths_and_labels(fake_path, label=1)  # 1 for fake

In [None]:
# Compare the number of images per class (real first, then fake) to spot
# class imbalance before training.
print(len(real_images))
print(len(fake_images))

In [None]:
import cv2
import numpy as np
import os
from PIL import Image
import torch
from torch.utils.data import Dataset
import random

class DeepFakeDataset(Dataset):
    """Real-vs-fake image dataset that crops every sample to its largest face.

    Images are collected recursively from ``real_dir`` (label 0) and
    ``fake_dir`` (label 1), shuffled with the global ``random`` state, and
    pre-filtered with OpenCV's frontal-face Haar cascade so that only images
    with at least one detectable face are kept.

    Each item is ``(face, label, original, img_path)``: ``face`` is the padded
    crop around the largest detected face and ``original`` is the whole image.
    Both are resized to ``target_size`` and passed through ``transform`` when
    one is given; otherwise both are float CHW tensors scaled to [0, 1].

    NOTE: a later cell of this notebook defines another class with the same
    name; whichever cell runs last is the one bound to ``DeepFakeDataset``.

    Args:
        real_dir: Root directory of the real images.
        fake_dir: Root directory of the fake images.
        transform: Optional callable applied to a PIL image (e.g. a torchvision
            ``Compose``).
        target_size: Size handed to ``cv2.resize`` as ``dsize``, i.e. it is
            interpreted as ``(width, height)``.
    """

    # Haar-cascade settings shared by pre-filtering and item loading so both
    # stages always agree on what counts as a face.
    _DETECT_KWARGS = {'scaleFactor': 1.1, 'minNeighbors': 5, 'minSize': (30, 30)}

    def __init__(self, real_dir, fake_dir, transform=None, target_size=(224, 224)):
        # Get image paths and labels
        self.real_images = self._get_image_paths(real_dir)
        self.fake_images = self._get_image_paths(fake_dir)
        self.images = self.real_images + self.fake_images
        self.labels = [0] * len(self.real_images) + [1] * len(self.fake_images)

        # (path, label) pairs in shuffled order. After pre-filtering this is
        # the source of truth; `self.images` / `self.labels` keep the
        # unfiltered, unshuffled lists.
        self.data = list(zip(self.images, self.labels))
        random.shuffle(self.data)

        self.transform = transform
        self.target_size = target_size

        # Load the Haar cascade classifier for face detection
        self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

        # Drop images without a detectable face
        self._prefilter_dataset()

    def _detect_faces(self, image):
        """Return the Haar-cascade face boxes ``(x, y, w, h)`` found in a BGR image."""
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        return self.face_cascade.detectMultiScale(gray, **self._DETECT_KWARGS)

    def _prefilter_dataset(self):
        """Keep only the images in which at least one face is detected.

        Unreadable images and images that raise during detection are skipped
        with a message. If nothing survives, the unfiltered data is kept so the
        dataset is not left empty.
        """
        filtered_data = []
        print(f"Pre-filtering dataset of {len(self.data)} images...")

        for img_path, label in self.data:
            try:
                image = cv2.imread(img_path)
                if image is None:
                    print(f"Failed to read image {img_path}")
                    continue

                if len(self._detect_faces(image)) > 0:
                    filtered_data.append((img_path, label))
                else:
                    print(f"No face detected in image {img_path}")
            except Exception as e:
                print(f"Error processing image {img_path}: {str(e)}")

        print(f"Filtered dataset contains {len(filtered_data)} images")
        if filtered_data:
            self.data = filtered_data
        else:
            print("WARNING: No images with faces found! Using original dataset.")

    def __len__(self):
        """Number of (image, label) pairs that survived pre-filtering."""
        return len(self.data)

    def _prepare(self, bgr_image):
        """Convert a BGR array to the model input format.

        The image is converted to RGB and resized to ``target_size``. With a
        transform the result of ``transform(PIL image)`` is returned; without
        one a float CHW tensor in [0, 1] is returned, so the item type does not
        depend on whether the face or the dummy path produced it.
        """
        rgb = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB)
        rgb = cv2.resize(rgb, self.target_size)
        if self.transform:
            return self.transform(Image.fromarray(rgb))
        return torch.from_numpy(rgb.transpose((2, 0, 1)) / 255.0).float()

    def __getitem__(self, idx):
        """Return ``(face, label, original, img_path)`` for sample ``idx``.

        Any failure while loading or processing is reported and replaced by an
        all-black dummy item so a single bad file does not abort an epoch.
        """
        img_path, label = self.data[idx]

        try:
            image = cv2.imread(img_path)
            if image is None:
                return self._create_dummy_item(label, img_path)

            faces = self._detect_faces(image)

            if len(faces) > 0:
                # Use the largest face detected
                x, y, w, h = max(faces, key=lambda face: face[2] * face[3])

                # Expand the bounding box by 20% and clamp it to the image
                padding = int(0.2 * max(w, h))
                x_start = max(0, x - padding)
                y_start = max(0, y - padding)
                x_end = min(image.shape[1], x + w + padding)
                y_end = min(image.shape[0], y + h + padding)

                face_bgr = image[y_start:y_end, x_start:x_end]
            else:
                # Should not happen after pre-filtering; fall back to the full image
                face_bgr = image

            # Face first, original second: keeps the order in which a random
            # transform consumes its random state.
            face_image = self._prepare(face_bgr)
            original_tensor = self._prepare(image)

            return face_image, label, original_tensor, img_path

        except Exception as e:
            print(f"Error processing image {img_path}: {str(e)}")
            return self._create_dummy_item(label, img_path)

    def _create_dummy_item(self, label, img_path):
        """Create an all-black placeholder item with the same layout as a real one.

        The dummy array is built as ``(height, width)`` from ``target_size``
        read as ``(width, height)``, matching what ``cv2.resize`` produces for
        real samples. The same object is returned for both image slots.
        """
        width, height = self.target_size
        dummy_image = np.zeros((height, width, 3), dtype=np.uint8)

        if self.transform:
            dummy_tensor = self.transform(Image.fromarray(dummy_image))
        else:
            dummy_tensor = torch.from_numpy(dummy_image.transpose((2, 0, 1)) / 255.0).float()

        return dummy_tensor, label, dummy_tensor, img_path

    def _get_image_paths(self, dir_path):
        """Recursively list the ``.png`` / ``.jpg`` / ``.jpeg`` files below ``dir_path``."""
        image_paths = []
        for root, _, files in os.walk(dir_path):
            for file in files:
                if file.lower().endswith(('.png', '.jpg', '.jpeg')):
                    image_paths.append(os.path.join(root, file))
        return image_paths

In [None]:
# Install dlib and download its pre-trained 68-point facial-landmark model
# (shape_predictor_68_face_landmarks.dat) into the current working directory.
# NOTE(review): the dlib version is not pinned and the archive is fetched again
# on every run — consider pinning and guarding the download.
!pip install dlib
!wget http://dlib.net/files/shape_predictor_68_face_landmarks.dat.bz2
!bzip2 -d shape_predictor_68_face_landmarks.dat.bz2

In [None]:
import dlib

# Path of the landmark model file downloaded and unpacked by the previous cell
predictor_path = "shape_predictor_68_face_landmarks.dat"  # Download from dlib

# dlib's frontal face detector and the 68-point landmark predictor, created
# once at notebook level.
detector = dlib.get_frontal_face_detector()
predictor = dlib.shape_predictor(predictor_path)

In [None]:
import cv2
import numpy as np
import os
from PIL import Image
import torch
from torch.utils.data import Dataset
import random

class DeepFakeDataset(Dataset):
    """Real-vs-fake image dataset that returns rotation-aligned face crops.

    Images are collected recursively from ``real_dir`` (label 0) and
    ``fake_dir`` (label 1), shuffled with the global ``random`` state, and
    pre-filtered with OpenCV's frontal-face Haar cascade.

    For every item the largest detected face is aligned so the eyes are
    horizontal and then cropped:

    * when a dlib shape predictor was passed as ``predictor``, the 68-point
      landmarks are used (``_align_face2``);
    * otherwise, or if landmark alignment fails, the Haar eye cascade is used
      (``_align_face``), which itself falls back to the plain face crop.

    Each item is ``(face, label, original, img_path)``. Both images are resized
    to ``target_size`` and passed through ``transform`` when one is given;
    otherwise both are float CHW tensors scaled to [0, 1].

    Args:
        real_dir: Root directory of the real images.
        fake_dir: Root directory of the fake images.
        transform: Optional callable applied to a PIL image.
        target_size: Size handed to ``cv2.resize`` as ``dsize``, i.e. it is
            interpreted as ``(width, height)``.
        predictor: Optional ``dlib.shape_predictor`` enabling landmark-based
            alignment, e.g. ``DeepFakeDataset(..., predictor=predictor)``.
    """

    # Haar-cascade settings shared by pre-filtering and item loading so both
    # stages always agree on what counts as a face.
    _DETECT_KWARGS = {'scaleFactor': 1.1, 'minNeighbors': 5, 'minSize': (30, 30)}

    def __init__(self, real_dir, fake_dir, transform=None, target_size=(224, 224), predictor=None):
        self.real_images = self._get_image_paths(real_dir)
        self.fake_images = self._get_image_paths(fake_dir)
        self.images = self.real_images + self.fake_images
        self.labels = [0] * len(self.real_images) + [1] * len(self.fake_images)

        # (path, label) pairs in shuffled order; after pre-filtering this is
        # the source of truth.
        self.data = list(zip(self.images, self.labels))
        random.shuffle(self.data)

        self.transform = transform
        self.target_size = target_size
        self.predictor = predictor

        self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
        # Loaded once here instead of on every _align_face call
        self.eye_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_eye.xml')
        self._prefilter_dataset()

    def _detect_faces(self, image):
        """Return the Haar-cascade face boxes ``(x, y, w, h)`` found in a BGR image."""
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        return self.face_cascade.detectMultiScale(gray, **self._DETECT_KWARGS)

    def _prefilter_dataset(self):
        """Keep only the images with at least one detected face.

        Unreadable images are skipped silently and detection errors are
        printed. If nothing survives, the unfiltered data is kept.
        """
        filtered_data = []
        print(f"Pre-filtering dataset of {len(self.data)} images...")
        for img_path, label in self.data:
            try:
                image = cv2.imread(img_path)
                if image is None:
                    continue
                if len(self._detect_faces(image)) > 0:
                    filtered_data.append((img_path, label))
            except Exception as e:
                print(f"Error processing image {img_path}: {str(e)}")
        self.data = filtered_data if filtered_data else self.data

    @staticmethod
    def _as_dlib_rect(face):
        """Return ``face`` as a ``dlib.rectangle``.

        Accepts either an existing dlib rectangle (anything exposing
        ``left()``) or a Haar-style ``(x, y, w, h)`` box.
        """
        if hasattr(face, 'left'):
            return face
        import dlib  # local import: only needed when landmark alignment is used
        x, y, w, h = (int(v) for v in face)
        return dlib.rectangle(x, y, x + w, y + h)

    def _align_face2(self, image, face):
        """Align a face with dlib's 68-point landmarks and return the crop.

        Args:
            image: BGR image array.
            face: ``(x, y, w, h)`` box or ``dlib.rectangle`` of the face.

        Raises:
            RuntimeError: If the dataset was built without a ``predictor``.
        """
        predictor = getattr(self, 'predictor', None)
        if predictor is None:
            raise RuntimeError(
                "Landmark alignment needs a dlib shape predictor; "
                "pass predictor=dlib.shape_predictor(...) to DeepFakeDataset."
            )

        rect = self._as_dlib_rect(face)
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        # Get facial landmarks
        landmarks = predictor(gray, rect)

        # Eye centres: landmarks 36-41 form the eye on the image's left side,
        # landmarks 42-47 the eye on the image's right side.
        left_eye = np.mean([(landmarks.part(i).x, landmarks.part(i).y) for i in range(36, 42)], axis=0)
        right_eye = np.mean([(landmarks.part(i).x, landmarks.part(i).y) for i in range(42, 48)], axis=0)

        # Angle of the line joining the eyes relative to the horizontal
        dx = right_eye[0] - left_eye[0]
        dy = right_eye[1] - left_eye[1]
        angle = float(np.degrees(np.arctan2(dy, dx)))

        # Rotate around the centre of the face box (plain floats for OpenCV)
        center = (float(rect.left() + rect.width() // 2), float(rect.top() + rect.height() // 2))
        M = cv2.getRotationMatrix2D(center, angle, 1.0)
        aligned_image = cv2.warpAffine(image, M, (image.shape[1], image.shape[0]), flags=cv2.INTER_CUBIC)

        # Crop the aligned face, clamped to the image so that negative
        # coordinates cannot wrap around via Python slicing.
        top = max(0, rect.top())
        left = max(0, rect.left())
        bottom = min(image.shape[0], rect.bottom())
        right = min(image.shape[1], rect.right())
        return aligned_image[top:bottom, left:right]

    def _align_face(self, image, face):
        """Align a face using the Haar eye cascade and return the crop.

        Falls back to the unrotated face crop when the face is smaller than
        50 px in either dimension, when fewer than two eyes are found in the
        upper half of the face, or when any step raises.
        """
        (x, y, w, h) = face

        # Don't process faces that are too small
        if w < 50 or h < 50:
            return image[y:y+h, x:x+w]

        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        try:
            eyes = getattr(self, 'eye_cascade', None)
            if eyes is None:
                eyes = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_eye.xml')
                self.eye_cascade = eyes
            roi_gray = gray[y:y+h, x:x+w]

            # Detect eyes with stricter parameters
            detected_eyes = eyes.detectMultiScale(roi_gray,
                                                  scaleFactor=1.1,
                                                  minNeighbors=5,
                                                  minSize=(20, 20))

            if len(detected_eyes) >= 2:
                # Eyes should be in the upper half of the face
                upper_eyes = [eye for eye in detected_eyes if eye[1] < h/2]

                if len(upper_eyes) >= 2:
                    # Sort by x-coordinate to get the left and right eye
                    eye_centers = sorted(upper_eyes, key=lambda ex: ex[0])[:2]

                    # Eye centres in original image coordinates
                    left_eye_center = (x + eye_centers[0][0] + eye_centers[0][2]//2,
                                       y + eye_centers[0][1] + eye_centers[0][3]//2)
                    right_eye_center = (x + eye_centers[1][0] + eye_centers[1][2]//2,
                                        y + eye_centers[1][1] + eye_centers[1][3]//2)

                    # Angle for horizontal alignment
                    dx = right_eye_center[0] - left_eye_center[0]
                    dy = right_eye_center[1] - left_eye_center[1]
                    angle = np.degrees(np.arctan2(dy, dx))

                    # Limit rotation angle to prevent excessive rotation
                    if abs(angle) > 30:
                        print(f"Excessive rotation angle {angle} detected, limiting to ±30 degrees")
                        angle = np.sign(angle) * 30

                    # OpenCV expects the centre as a tuple of plain floats
                    center = (float(x + w//2), float(y + h//2))

                    try:
                        # Rotate the entire image, then crop the face box
                        M = cv2.getRotationMatrix2D(center, float(angle), 1.0)
                        rotated_image = cv2.warpAffine(image, M, (image.shape[1], image.shape[0]),
                                                       flags=cv2.INTER_CUBIC, borderMode=cv2.BORDER_REPLICATE)
                        return rotated_image[y:y+h, x:x+w]
                    except Exception as e:
                        print(f"Error during rotation: {str(e)}")
                        return image[y:y+h, x:x+w]  # Return original cropped face

        except Exception as e:
            print(f"Error during eye detection: {str(e)}")

        # If any step fails, return original face crop
        return image[y:y+h, x:x+w]

    def _align(self, image, face):
        """Align with landmarks when a predictor is available, else with the eye cascade."""
        if getattr(self, 'predictor', None) is not None:
            try:
                aligned = self._align_face2(image, face)
                if aligned.size > 0:
                    return aligned
            except Exception as e:
                print(f"Landmark alignment failed ({str(e)}); using eye-cascade alignment")
        return self._align_face(image, face)

    def __extract_landmarks__(self, image, face=None):
        """Return the 68 landmark points of a face as an ``(N, 2)`` int32 array.

        Args:
            image: BGR image array.
            face: Optional ``(x, y, w, h)`` box or ``dlib.rectangle``. When
                omitted, the largest Haar-detected face is used.

        Returns:
            Array of ``[x, y]`` rows, or an empty ``(0, 2)`` array when no face
            is found.

        Raises:
            RuntimeError: If the dataset was built without a ``predictor``.
        """
        predictor = getattr(self, 'predictor', None)
        if predictor is None:
            raise RuntimeError(
                "Landmark extraction needs a dlib shape predictor; "
                "pass predictor=dlib.shape_predictor(...) to DeepFakeDataset."
            )

        if face is None:
            faces = self._detect_faces(image)
            if len(faces) == 0:
                return np.empty((0, 2), dtype=np.int32)
            face = max(faces, key=lambda f: f[2] * f[3])

        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        landmarks = predictor(gray, self._as_dlib_rect(face))
        # Convert to NumPy array
        points = np.array([[p.x, p.y] for p in landmarks.parts()], dtype=np.int32)

        return points

    def __len__(self):
        """Number of (image, label) pairs that survived pre-filtering."""
        return len(self.data)

    def _prepare(self, bgr_image):
        """Convert a BGR array to the model input format.

        With a transform the result of ``transform(PIL image)`` is returned;
        without one a float CHW tensor in [0, 1] is returned, so every item has
        the same type regardless of which code path produced it.
        """
        rgb = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB)
        rgb = cv2.resize(rgb, self.target_size)
        if self.transform:
            return self.transform(Image.fromarray(rgb))
        return torch.from_numpy(rgb.transpose((2, 0, 1)) / 255.0).float()

    def __getitem__(self, idx):
        """Return ``(face, label, original, img_path)`` for sample ``idx``.

        Any failure while loading or processing is reported and replaced by an
        all-black dummy item so a single bad file does not abort an epoch.
        """
        img_path, label = self.data[idx]
        try:
            image = cv2.imread(img_path)
            if image is None:
                return self._create_dummy_item(label, img_path)

            faces = self._detect_faces(image)

            if len(faces) > 0:
                face = max(faces, key=lambda f: f[2] * f[3])
                face_bgr = self._align(image, face)
            else:
                face_bgr = image

            # Face first, original second: keeps the order in which a random
            # transform consumes its random state.
            face_image = self._prepare(face_bgr)
            original_tensor = self._prepare(image)

            return face_image, label, original_tensor, img_path
        except Exception as e:
            print(f"Error processing image {img_path}: {str(e)}")
            return self._create_dummy_item(label, img_path)

    def _create_dummy_item(self, label, img_path):
        """Create an all-black placeholder item with the same layout as a real one.

        The dummy array is built as ``(height, width)`` from ``target_size``
        read as ``(width, height)``, matching what ``cv2.resize`` produces for
        real samples. The same object is returned for both image slots.
        """
        width, height = self.target_size
        dummy_image = np.zeros((height, width, 3), dtype=np.uint8)
        if self.transform:
            dummy_tensor = self.transform(Image.fromarray(dummy_image))
        else:
            dummy_tensor = torch.from_numpy(dummy_image.transpose((2, 0, 1)) / 255.0).float()
        return dummy_tensor, label, dummy_tensor, img_path

    def _get_image_paths(self, dir_path):
        """Recursively list the ``.png`` / ``.jpg`` / ``.jpeg`` files below ``dir_path``."""
        image_paths = []
        for root, _, files in os.walk(dir_path):
            for file in files:
                if file.lower().endswith(('.png', '.jpg', '.jpeg')):
                    image_paths.append(os.path.join(root, file))
        return image_paths


In [None]:
from PIL import Image, ImageFilter

class EdgeEnhanceTransform:
    """Callable transform that applies PIL's ``EDGE_ENHANCE`` filter to an image."""

    def __call__(self, img):
        """Return the result of ``img.filter`` with ``ImageFilter.EDGE_ENHANCE``."""
        edge_filter = ImageFilter.EDGE_ENHANCE
        enhanced = img.filter(edge_filter)
        return enhanced


In [None]:
# `transforms` is imported here because the other visible cells that import it
# come later in the notebook; without this line the cell raises NameError when
# the notebook is run top to bottom on a fresh kernel.
from torchvision import transforms

# Preprocessing applied to every PIL image produced by the dataset.
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize to 224x224 (optional, if not done earlier)
    EdgeEnhanceTransform(),  # Apply edge enhancement
    transforms.ToTensor(),  # Convert to tensor (also scales values to [0, 1])
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  # Maps [0, 1] to [-1, 1]

    #transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize the image
])

In [None]:
# Build the training dataset from the "development" real/fake folders.
# Construction runs the face pre-filter over every image, so this cell reads
# each file once.
dataset = DeepFakeDataset(real_dir=real_path, fake_dir=fake_path, transform=transform)

# Select an example image


In [None]:
# Build the held-out dataset from the "evaluation" real/fake folders, using the
# same preprocessing as the training dataset.
test_dataset = DeepFakeDataset(real_dir=real_test_path, fake_dir = fake_test_path, transform=transform)

In [None]:
# Fetch the first sample to check that loading works.
# The dataset returns (face image, label, original image, image path).
a,b,c,d = dataset[0]

In [None]:
import matplotlib.pyplot as plt
import torchvision.transforms as transforms
import torch

def visualize_normalized_image(dataset, num_samples=5):
    """Show the first samples of ``dataset`` as original / face image pairs.

    Each row shows the original image (left) and the face image (right).
    Every image is min-max rescaled to [0, 1] for display only; the tensors
    themselves are not modified.

    Args:
        dataset: Indexable dataset whose items are
            ``(face_tensor, label, original_tensor, img_path)`` with CHW tensors.
        num_samples: Maximum number of rows to draw; clamped to ``len(dataset)``.
    """

    def _to_displayable(tensor):
        """Min-max scale a CHW tensor to [0, 1] and return it as an HWC array."""
        image = tensor.detach().cpu().float()
        lo, hi = image.min(), image.max()
        if hi > lo:
            image = (image - lo) / (hi - lo)
        else:
            # Constant image (e.g. an all-black placeholder): avoid 0/0 -> NaN
            image = torch.zeros_like(image)
        return image.permute(1, 2, 0).numpy()  # CHW → HWC

    num_samples = min(num_samples, len(dataset))
    if num_samples <= 0:
        print("No samples to visualize.")
        return

    # squeeze=False keeps `axes` two-dimensional even for a single row
    fig, axes = plt.subplots(num_samples, 2, figsize=(8, num_samples * 4), squeeze=False)

    for i in range(num_samples):
        face_tensor, label, original_tensor, img_path = dataset[i]

        axes[i, 0].imshow(_to_displayable(original_tensor))
        axes[i, 0].set_title(f"Original Image (After Normalization)\n{img_path.split('/')[-1]}")
        axes[i, 0].axis("off")

        axes[i, 1].imshow(_to_displayable(face_tensor))
        axes[i, 1].set_title("Detected Face (After Normalization)")
        axes[i, 1].axis("off")

    plt.tight_layout()
    plt.show()


In [None]:
# Inspect the first 10 samples of the evaluation dataset (original vs. face image)
visualize_normalized_image(test_dataset, num_samples=10)

In [None]:
# Inspect the first 10 samples of the training dataset (original vs. face image)
visualize_normalized_image(dataset, num_samples=10)

In [None]:
# Split into train/val datasets
# `DataLoader` is imported here because the other visible cell that imports it
# comes later in the notebook; without it this cell raises NameError when the
# notebook is run top to bottom on a fresh kernel.
from torch.utils.data import DataLoader
from torch.utils.data import random_split
#train_size = int(0.8 * len(dataset))
#val_size = len(dataset) - train_size
#train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

# Dataloaders: the whole development set is used for training and the
# evaluation set is used as the validation loader (the random split above is
# disabled).
train_loader = DataLoader(dataset, batch_size=32, shuffle=True, num_workers=2)
val_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=2)

print(f'Total images: {len(dataset) + len(test_dataset)}, Train: {len(dataset)}, Validation: {len(test_dataset)}')

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torchvision import models, transforms
from torch.utils.data import DataLoader
from tqdm import tqdm
import matplotlib.pyplot as plt
import time
import copy

# Training loop with early stopping
def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler=None,
                num_epochs=10, device='cuda', patience=5, save_path='best_model.pth'):
    """Train ``model`` with per-epoch validation, checkpointing and early stopping.

    Both output layouts used in this notebook are supported:

    * multi-class scores of shape ``(N, C)`` with ``C > 1`` (e.g. with
      ``nn.CrossEntropyLoss``): the prediction is the arg-max class;
    * a single score per sample, shape ``(N,)`` or ``(N, 1)``: the score is
      flattened, the labels are cast to float for the loss, and the prediction
      is ``score > 0.5`` (or ``score > 0`` when ``criterion`` is
      ``nn.BCEWithLogitsLoss``, whose inputs are logits).

    Args:
        model: Network to train; it is moved to ``device``.
        train_loader: Loader yielding ``(inputs, labels, original_imgs, img_paths)``.
        val_loader: Loader with the same batch layout, used for model selection.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer over ``model``'s parameters.
        scheduler: Optional LR scheduler, stepped once per epoch after the
            training phase. ``ReduceLROnPlateau`` receives the training loss;
            any other scheduler is stepped without arguments.
        num_epochs: Maximum number of epochs.
        device: Device on which the model and the batches are placed.
        patience: Stop after this many consecutive epochs without a new best
            validation accuracy.
        save_path: File that receives the state dict of each new best model.

    Returns:
        ``(model, history)``: the model with the best validation weights
        loaded, and a dict with the lists ``train_loss``, ``train_acc``,
        ``val_loss`` and ``val_acc`` (one float per completed epoch).
    """
    # Move model to device
    model = model.to(device)

    # Initialize variables for early stopping
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    no_improve_epochs = 0

    # Track metrics
    history = {
        'train_loss': [], 'train_acc': [],
        'val_loss': [], 'val_acc': []
    }

    loaders = {'train': train_loader, 'val': val_loader}
    # BCEWithLogitsLoss consumes raw logits, so its decision threshold is 0
    binary_threshold = 0.0 if isinstance(criterion, torch.nn.BCEWithLogitsLoss) else 0.5

    # Time tracking
    start_time = time.time()

    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            is_train = phase == 'train'
            model.train(is_train)  # train() for training, eval() for validation
            dataloader = loaders[phase]

            running_loss = 0.0
            running_corrects = 0
            samples_seen = 0

            # The dataset returns 4 items; only inputs and labels are used here
            for inputs, labels, _original_imgs, _img_paths in tqdm(dataloader, desc=f"{phase}"):
                inputs = inputs.to(device)
                labels = labels.to(device)

                if is_train:
                    optimizer.zero_grad()

                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)

                    if outputs.dim() > 1 and outputs.size(-1) > 1:
                        # One score per class: predict the arg-max class
                        loss = criterion(outputs, labels)
                        preds = outputs.argmax(dim=1)
                    else:
                        # One score per sample: binary decision by threshold
                        scores = outputs.reshape(-1)
                        loss = criterion(scores, labels.float())
                        preds = (scores > binary_threshold).long()

                    # Backward pass and optimization only in training phase
                    if is_train:
                        loss.backward()
                        optimizer.step()

                # Accumulate as plain Python numbers (no tensors kept alive)
                batch_count = inputs.size(0)
                running_loss += loss.item() * batch_count
                running_corrects += (preds == labels.reshape(-1)).sum().item()
                samples_seen += batch_count

            # Average over the samples actually processed; the guard avoids a
            # division by zero for an empty loader.
            epoch_loss = running_loss / max(samples_seen, 1)
            epoch_acc = running_corrects / max(samples_seen, 1)

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            # Store history
            if is_train:
                history['train_loss'].append(epoch_loss)
                history['train_acc'].append(epoch_acc)

                # Update learning rate scheduler
                if scheduler is not None:
                    if isinstance(scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
                        scheduler.step(epoch_loss)
                    else:
                        scheduler.step()
            else:
                history['val_loss'].append(epoch_loss)
                history['val_acc'].append(epoch_acc)

                # Save best model based on validation accuracy
                if epoch_acc > best_acc:
                    best_acc = epoch_acc
                    best_model_wts = copy.deepcopy(model.state_dict())
                    no_improve_epochs = 0

                    # Save best model
                    torch.save(model.state_dict(), save_path)
                    print(f"Saved best model with accuracy: {best_acc:.4f}")
                else:
                    no_improve_epochs += 1

        print()

        # Early stopping
        if no_improve_epochs >= patience:
            print(f"Early stopping triggered after {epoch+1} epochs")
            break

    # Training complete
    time_elapsed = time.time() - start_time
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best val Acc: {best_acc:.4f}')

    # Load best model weights
    model.load_state_dict(best_model_wts)

    return model, history

# Function to visualize results
def plot_training_history(history):
    """Plot training vs. validation loss and accuracy side by side.

    Args:
        history: Dict with the per-epoch lists ``train_loss``, ``val_loss``,
            ``train_acc`` and ``val_acc``.
    """
    epochs = range(1, len(history['train_loss']) + 1)

    # (history key suffix, display name) of each panel, left to right
    panels = (('loss', 'Loss'), ('acc', 'Accuracy'))

    plt.figure(figsize=(12, 5))

    for position, (suffix, name) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(epochs, history[f'train_{suffix}'], 'b-', label=f'Training {name}')
        plt.plot(epochs, history[f'val_{suffix}'], 'r-', label=f'Validation {name}')
        plt.title(f'Training and Validation {name}')
        plt.xlabel('Epochs')
        plt.ylabel(name)
        plt.legend()

    plt.tight_layout()
    plt.show()

# Function to visualize predictions with sample images
import torch
import numpy as np
import matplotlib.pyplot as plt

def visualize_misclassified(model, test_loader, device, num_samples=10, mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]):
    """Display up to ``num_samples`` misclassified images with their predictions.

    Args:
        model: Trained network. Outputs of shape ``(N, C)`` with ``C > 1`` are
            treated as class scores (softmax gives the confidence). A single
            score per sample is treated as the probability of the "fake"
            class — assumes a sigmoid has already been applied.
        test_loader: Loader yielding ``(inputs, labels, original_imgs, img_paths)``.
        device: Device on which the batches are evaluated.
        num_samples: Maximum number of images to show. The grid has up to five
            columns and as many rows as needed.
        mean: Per-channel mean used by the normalization, to undo it for display.
        std: Per-channel std used by the normalization, to undo it for display.
    """
    if num_samples <= 0:
        return

    model.eval()

    # Size the grid from num_samples (the default of 10 gives 2 x 5)
    ncols = min(5, num_samples)
    nrows = -(-num_samples // ncols)  # ceiling division
    fig, axes = plt.subplots(nrows, ncols, figsize=(4 * ncols, 4 * nrows), squeeze=False)
    axes = axes.flatten()

    mean_arr = np.array(mean)
    std_arr = np.array(std)

    count = 0
    with torch.no_grad():
        for inputs, labels, _, img_paths in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device).reshape(-1)

            outputs = model(inputs)
            if outputs.dim() > 1 and outputs.size(-1) > 1:
                # Class scores: confidence is the softmax probability of the
                # predicted class
                confidences, preds = torch.softmax(outputs, dim=1).max(dim=1)
            else:
                # Single score per sample, read as P(fake)
                fake_prob = outputs.reshape(-1)
                preds = (fake_prob > 0.5).long()
                confidences = torch.where(preds == 1, fake_prob, 1 - fake_prob)

            # Indices of the misclassified samples in this batch
            wrong = (preds != labels).nonzero(as_tuple=True)[0].tolist()

            for i in wrong:
                if count >= num_samples:
                    break

                pred_prob = confidences[i].item()

                # Convert tensor to numpy for visualization
                img = inputs[i].cpu().numpy().transpose((1, 2, 0))
                img = img * std_arr + mean_arr  # Unnormalize
                img = np.clip(img, 0, 1)  # Ensure values are in valid range

                axes[count].imshow(img)

                title = f"Pred: {'Fake' if preds[i] else 'Real'} ({pred_prob:.2f})\nTrue: {'Fake' if labels[i] else 'Real'}"
                axes[count].set_title(title, color='red')  # Always red for misclassified
                axes[count].axis('off')

                count += 1

            if count >= num_samples:
                break

    # Hide the panels that received no image
    for unused in axes[count:]:
        unused.axis('off')

    if count == 0:
        print("No misclassified samples found.")

    plt.tight_layout()
    plt.show()



# Example of how to use this code


In [None]:
# Parameters
# NOTE(review): `batch_size` and `learning_rate` are defined here but not used
# in this cell — the optimizer below is created with lr=0.001. Confirm which
# values are intended.
batch_size = 16
num_epochs = 20
learning_rate = 1e-4
#device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
patience = 5  # For early stopping

# NOTE(review): `DeepFakeDetector` is not defined in any visible cell — confirm
# where it comes from.
model = DeepFakeDetector()
# Training is pinned to the CPU; the automatic CUDA selection above is commented out.
device = torch.device("cpu")
model = model.to(device)
#model = models.resnet18()
#model.fc = nn.Linear(model.fc.in_features, 2)  # Adjust for your output classes

# Define loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Add learning rate scheduler
# NOTE(review): this scheduler is created but `scheduler=None` is passed to
# train_model below, so it is never stepped — confirm whether that is intended.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.1,
    patience=3,
    verbose=True
)

# Train the model; returns the model with its best validation weights loaded
# together with the per-epoch metric history
model, history = train_model(
    model,
    train_loader,
    val_loader,
    criterion,
    optimizer,
    scheduler=None,
    num_epochs=num_epochs,
    device=device,
    patience=patience,
    save_path='best_deepfake_model.pth'
)

# Visualize training history
plot_training_history(history)

# Visualize some misclassified validation samples
visualize_misclassified(model, val_loader, device)

In [None]:
# Show misclassified validation samples again without re-running training
visualize_misclassified(model, val_loader, device)

In [None]:
#ciao caro
#extract the testing data

#create the labels


In [None]:
#create a CNN with three layers

In [None]:
#massimo