In [10]:
# Loading the dataset from the specified paths.
# Preprocessing frames to extract faces.

In [11]:
# # Unzip
# !unzip /content/videos.zip

In [12]:
# Paths to REAL and FAKE videos
REAL_VIDEOS_PATH = "/Users/vittoriostile/Developer/youtube_c40/videos"
FAKE_VIDEOS_PATHS = [
#    "/Volumes/Maxtor 4Tb/FaceForensics++/manipulated_sequences/Deepfakes/c40/videos",
#   "/Volumes/Maxtor 4Tb/FaceForensics++/manipulated_sequences/Face2Face/c40/videos",
#    "/Volumes/Maxtor 4Tb/FaceForensics++/manipulated_sequences/FaceShifter/c40/videos",
    "/Volumes/Maxtor 4Tb/FaceForensics++/manipulated_sequences/FaceSwap/c40/videos",
#    "/Volumes/Maxtor 4Tb/FaceForensics++/manipulated_sequences/NeuralTextures/c40/videos",
]

LABELS = {"REAL": 0, "FAKE": 1}

In [13]:
# from google.colab import files

In [14]:
import os
import cv2
import numpy as np
import zipfile

In [15]:
def extract_faces_from_video(video_path, label, cascade_classifier, frame_skip=1):
    """
    Estrae i volti da un video e li salva in una cartella
    e crea un file zip per i frame e i volti.

    Args:
        video_path: Percorso del video.
        label: Etichetta del video (REAL o FAKE).
        cascade_classifier: Classificatore di volti.
        frame_skip: Numero di frame da saltare tra un'elaborazione e l'altra.
    """
    faces = []
    labels = []

    # Apre il video
    cap = cv2.VideoCapture(video_path)

    # Crea la cartella di output per i volti (relativa alla directory del video)
    video_dir = os.path.dirname(video_path)
    faces_output_dir = os.path.join(os.path.dirname(video_dir), "faces")  # Directory per faces
    os.makedirs(faces_output_dir, exist_ok=True)

    # Crea la cartella di output per i frame (relativa alla directory del video)
    frames_output_dir = os.path.join(os.path.dirname(video_dir), "frames")  # Directory per frames
    os.makedirs(frames_output_dir, exist_ok=True)

    # Get the video file name without extension
    video_file_name = os.path.splitext(os.path.basename(video_path))[0]

    frame_count = 0  # Inizializza frame_count a 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Salva il frame nella cartella frames
        frame_output_path = os.path.join(frames_output_dir, f"{video_file_name}_{frame_count:03d}.jpeg")
        cv2.imwrite(frame_output_path, frame)

        if frame_count % frame_skip == 0:
            # Resize frame for faster processing (optional)
            frame = cv2.resize(frame, (640, 480))

            # Convert to grayscale for Haar Cascades
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            # Detect faces using Haar Cascades
            faces_detected = cascade_classifier.detectMultiScale(gray, scaleFactor=1.3, minNeighbors=5)

            # Save detected faces (with frame number and face index)
            for i, (x, y, w, h) in enumerate(faces_detected):
                face = frame[y:y + h, x:x + w]
                face_resized = cv2.resize(face, (224, 224))
                face_output_path = os.path.join(faces_output_dir, f"{video_file_name}_{frame_count:03d}_face{i}.jpeg")
                cv2.imwrite(face_output_path, face_resized)
                faces.append(face_resized)
                labels.append(label)
        frame_count += 1

    # Rilascia il video
    cap.release()

    # # --- Inizio del codice per la creazione dello zip ---
    # print("Creazione dello zip dei frame e dei volti...")

    # # Nome del file zip (basato sul nome del video)
    # video_file_name = os.path.splitext(os.path.basename(video_path))[0]
    # zip_file_name = f"{video_file_name}_frames_faces.zip"

    # # Crea il file zip
    # with zipfile.ZipFile(zip_file_name, 'w') as zipf:
    #     # Aggiungi i frame allo zip
    #     for frame_file in os.listdir(frames_output_dir):
    #         frame_path = os.path.join(frames_output_dir, frame_file)
    #         zipf.write(frame_path, arcname=os.path.join('frames', frame_file))

    #     # Aggiungi i volti allo zip
    #     for face_file in os.listdir(faces_output_dir):
    #         face_path = os.path.join(faces_output_dir, face_file)
    #         zipf.write(face_path, arcname=os.path.join('faces', face_file))

    # print(f"Zip creato: {zip_file_name}")
    # --- Fine del codice per la creazione dello zip ---

    # Scarica lo zip in locale
    #files.download(zip_file_name)
    #print(f"Zip {zip_file_name} scaricato in locale.")
    # --- Fine del codice per la creazione e il download dello zip ---

    return np.array(faces), np.array(labels)

In [16]:
def load_subset_dataset(real_path, fake_paths, cascade_path):  # Prende all videos, ho rimosso un max_videos parameter usato in precedenza
    face_cascade = cv2.CascadeClassifier(cascade_path)
    faces = []
    labels = []

    # Load all REAL videos
    for video in os.listdir(real_path):  # Removed [:max_videos]
        video_path = os.path.join(real_path, video)
        face_frames, face_labels = extract_faces_from_video(video_path, LABELS["REAL"], face_cascade)
        faces.extend(face_frames)
        labels.extend(face_labels)
        # print(f"Processed REAL video: {video}, extracted {len(face_frames)} faces")  # Print per video REAL

    # Load all FAKE videos
    for fake_path in fake_paths:
        for video in os.listdir(fake_path):  # Removed [:max_videos]
            video_path = os.path.join(fake_path, video)
            face_frames, face_labels = extract_faces_from_video(video_path, LABELS["FAKE"], face_cascade)
            faces.extend(face_frames)
            labels.extend(face_labels)
            # print(f"Processed FAKE video: {video}, extracted {len(face_frames)} faces")  # Print per video FAKE

    return np.array(faces), np.array(labels)

In [17]:
def process_videos_in_batches(real_path, fake_paths, cascade_path, batch_size=50, start_batch=0, start_folder=0, process_real=True, process_fake=True):
    """
    Elabora i video in batch, consentendo di specificare il batch e la cartella di partenza.

    Args:
        real_path: Percorso della cartella dei video reali.
        fake_paths: Lista di percorsi delle cartelle dei video falsi.
        cascade_path: Percorso del classificatore a cascata.
        batch_size: Dimensione del batch di video da elaborare.
        start_batch: Batch da cui iniziare l'elaborazione (0-based).
        start_folder: Indice della cartella da cui iniziare l'elaborazione (0 per REAL, 1-5 per FAKE).
        process_real: Booleano per indicare se elaborare i video reali.
        process_fake: Booleano per indicare se elaborare i video falsi.
    """
    face_cascade = cv2.CascadeClassifier(cascade_path)
    faces = []
    labels = []
    current_batch = start_batch

    def process_folder(folder_path, label, folder_name):
        nonlocal current_batch

        # Carica lo stato dal file, se presente
        try:
            with open("stato_elaborazione.txt", "r") as f:
                current_batch, file_index = map(int, f.read().split(","))
                print(f"Ripresa dell'elaborazione dal batch {current_batch}, file {file_index}")
        except FileNotFoundError:
            file_index = 0  # Inizializza file_index se il file di stato non esiste

        # Ciclo sui batch, 20 batch per 1000 video
        for batch_index in range(start_batch, 20):  # Ciclo sui batch, 20 batch per 1000 video
            print(f"Elaborazione batch {batch_index} in cartella: {folder_name}")  # Mostra il batch effettivo (0-19)

            # Ottieni gli ultimi tre livelli del percorso della cartella
            folder_levels = folder_path.split(os.path.sep)[-3:]
            folder_path_display = os.path.sep.join(folder_levels)
            print(f"Percorso cartella: {folder_path_display}")

            # Print per i file presenti nella cartella
            print(f"File nella cartella: {os.listdir(folder_path)}")

            # Calcola l'inizio del batch corrente
            batch_start = batch_index * batch_size

            # Ottieni tutti i file nella cartella e ordinali
            all_files = sorted(os.listdir(folder_path))

            # Filtra i file per il batch corrente, considerando il formato del nome
            batch_files = [
                f for f in all_files
                if f.endswith(".mp4") and (
                    (folder_name == "REAL" and int(f.split(".")[0]) in range(batch_start, min(batch_start + batch_size, 1000))) or
                    (folder_name != "REAL" and int(f.split("_")[0]) in range(batch_start, min(batch_start + batch_size, 1000)))
                )
            ]

            # Stampa i file selezionati per il batch
            print(f"File selezionati per il batch {current_batch}: {batch_files}")

            # Elabora i file del batch
            for i, video_file in enumerate(batch_files):  # Inserisci qui enumerate
                # Salta i file gi√† elaborati se si riprende l'elaborazione
                if i < file_index:
                    continue

                video_path = os.path.join(folder_path, video_file)
                face_frames, face_labels = extract_faces_from_video(video_path, label, face_cascade)
                faces.extend(face_frames)
                labels.extend(face_labels)
                print(f"Elaborato video: {video_file}, estratti {len(face_frames)} volti")

                # Salva lo stato dopo aver elaborato ogni file
                file_index = i + 1  # Aggiorna l'indice del file
                with open("stato_elaborazione.txt", "w") as f:
                    f.write(f"{current_batch},{file_index}")

            # Resetta l'indice del file all'inizio di ogni nuovo batch
            file_index = 0

            current_batch += 1

    if process_real:
        process_folder(real_path, LABELS["REAL"], "REAL")

    if process_fake:
        for folder_index, fake_path in enumerate(fake_paths, 1):  # Inizia da 1 per le cartelle fake
            if folder_index >= start_folder:
                # Resetta current_batch a start_batch quando si inizia una nuova cartella
                current_batch = start_batch
                folder_name = os.path.basename(fake_path)  # Ottiene il nome della cartella
                process_folder(fake_path, LABELS["FAKE"], folder_name)  # Passa il nome della cartella

    return np.array(faces), np.array(labels)

In [None]:
# Esempio di utilizzo:
# Inizia dall'elaborazione dei video fake, dal batch 0 (video 100-149)
cascade_path = cv2.data.haarcascades + "haarcascade_frontalface_default.xml"
faces, labels = process_videos_in_batches(REAL_VIDEOS_PATH, FAKE_VIDEOS_PATHS, cascade_path, start_batch=14, start_folder=0, process_real=False, process_fake=True)

Ripresa dell'elaborazione dal batch 14, file 0
Elaborazione batch 14 in cartella: videos
Percorso cartella: FaceSwap/c40/videos
File nella cartella: ['475_265.mp4', '366_473.mp4', '944_032.mp4', '135_880.mp4', '002_006.mp4', '078_955.mp4', '051_332.mp4', '193_030.mp4', '068_028.mp4', '654_648.mp4', '151_225.mp4', '460_678.mp4', '173_171.mp4', '139_130.mp4', '030_193.mp4', '862_047.mp4', '762_832.mp4', '268_269.mp4', '503_756.mp4', '213_083.mp4', '748_355.mp4', '751_752.mp4', '666_656.mp4', '892_112.mp4', '734_699.mp4', '876_891.mp4', '620_619.mp4', '391_406.mp4', '343_363.mp4', '448_361.mp4', '750_743.mp4', '402_453.mp4', '304_300.mp4', '765_867.mp4', '817_827.mp4', '228_289.mp4', '499_539.mp4', '155_576.mp4', '917_924.mp4', '380_358.mp4', '628_568.mp4', '777_745.mp4', '219_220.mp4', '603_575.mp4', '982_004.mp4', '939_115.mp4', '408_424.mp4', '107_109.mp4', '461_250.mp4', '140_143.mp4', '627_658.mp4', '185_276.mp4', '991_064.mp4', '759_755.mp4', '902_901.mp4', '592_500.mp4', '430_459.m

In [None]:
# # Extraction for real video
# for video_file in os.listdir(REAL_VIDEOS_PATH):
#     video_path = os.path.join(REAL_VIDEOS_PATH, video_file)

#     # Call extract_faces_from_video to process the video and extract frames
#     faces, labels = extract_faces_from_video(video_path, LABELS["REAL"], cv2.CascadeClassifier(cascade_path))

#     # Now you have the faces and labels for this specific video
#     print(f"Processed video: {video_file}, extracted {len(faces)} faces")