In [73]:
import cv2
import numpy as np
import os
import glob
import random
import matplotlib.pyplot as plt

import pandas as pd
from sklearn.decomposition import PCA

from scipy import stats
from scipy.stats import skew
from sklearn.feature_selection import f_classif # for ANOVA F-test

import mediapipe as mp

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix

import pickle
from sklearn.impute import SimpleImputer

In [83]:
SEED = 1
np.random.seed(SEED)
random.seed(SEED)

### Histogram analysis

In [84]:
# Load the pre-trained Haar Cascade classifier for face detection.
# This file is typically located in the OpenCV data directory.
# You might need to adjust this path depending on your OpenCV installation.
FACE_CASCADE_PATH = '/workspaces/proyecto/.venv/lib/python3.14/site-packages/cv2/data/' + 'haarcascade_frontalface_default.xml'
face_cascade = cv2.CascadeClassifier(FACE_CASCADE_PATH)

# --- Check if the cascade file loaded successfully ---
if face_cascade.empty():
    print(f"ERROR: Could not load the Haar Cascade classifier from: {FACE_CASCADE_PATH}")
    print("Please ensure you have the OpenCV data files installed and the path is correct.")
    # In a real application, you might raise an error here.
# ---------------------------------------------------


def get_random_face_sequence(video_path, window_size, face_cascade):
    """
    Finds a random continuous sequence of 'window_size' frames, detects the face, 
    crops the frames to the face bounding box, and converts them to YCbCr color space.

    Args:
        video_path (str): The path to the video file.
        window_size (int): The required number of consecutive frames in the sequence.
        face_cascade (cv2.CascadeClassifier): The pre-loaded Haar Cascade classifier.

    Returns:
        list of np.ndarray: A list of cropped face frames in YCbCr color space, 
                            or an empty list if no face is detected or the video 
                            is too short/invalid.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return []

    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # Determine the starting frame index for the sequence
    start_frame = 0
    end_frame = total_frames - 1

    if total_frames >= window_size:
        # Select a random starting frame index for a continuous sequence of 'window_size'
        max_start_index = total_frames - window_size
        start_frame = random.randint(0, max_start_index)
        end_frame = start_frame + window_size - 1
    else:
        # Video is too short, use the entire video
        window_size = total_frames
        end_frame = total_frames - 1
        
    # --- 1. Detect Face in the FIRST frame of the sequence ---
    # We assume the face position doesn't change significantly over 'window_size' frames.
    cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
    ret, frame_t_rgb = cap.read()
    
    if not ret:
        cap.release()
        return []

    # Convert frame to grayscale for faster face detection
    gray_t = cv2.cvtColor(frame_t_rgb, cv2.COLOR_BGR2GRAY) 
    
    # Detect faces. Returns (x, y, w, h) for each face.
    faces = face_cascade.detectMultiScale(
        gray_t, 
        scaleFactor=1.1, 
        minNeighbors=5, 
        minSize=(30, 30)
    )

    if len(faces) == 0:
        # Skip if no face is detected in the starting frame
        cap.release()
        return []
    
    # Use the largest detected face (x, y, w, h)
    x, y, w, h = max(faces, key=lambda f: f[2] * f[3])

    # --- 2. Extract and Crop all Frames in the Sequence ---
    face_sequence_ycbcr = []
    
    # Reset to the start of the sequence
    cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

    for _ in range(window_size):
        ret, frame = cap.read()
        if not ret:
            # Should not happen if total_frames was calculated correctly, but safe check
            break 
            
        # Crop to the face bounding box
        cropped_frame_rgb = frame[y:y+h, x:x+w]
        
        # Convert to YCbCr color space
        cropped_frame_ycbcr = cv2.cvtColor(cropped_frame_rgb, cv2.COLOR_BGR2YCrCb)
        
        face_sequence_ycbcr.append(cropped_frame_ycbcr)

    cap.release()
    return face_sequence_ycbcr

In [85]:
def extract_emd_temporal_features_ycc(face_sequence_ycbcr, window_size=30, n_bins=64):
    """
    Calcula la Distancia EMD de cada histograma a la media temporal de la secuencia 
    en los canales de color Y, Cb y Cr, y extrae estadísticas de la variación.

    Args:
        face_sequence_ycbcr (list): Lista de arreglos numpy (cuadros) en espacio YCbCr,
                                    ya recortados al bounding box de la cara.
        window_size (int): Longitud de la secuencia (N).
        n_bins (int): Número de bins para el histograma.

    Returns:
        dict: Diccionario con las estadísticas (Media, Varianza, Rango) 
              de las variaciones EMD para Y, Cb y Cr (9 características en total).
    """
    
    if len(face_sequence_ycbcr) != window_size:
        window_size = len(face_sequence_ycbcr)
        if window_size == 0:
            return {f'emd_{c}_mean': 0.0 for c in ['y', 'cb', 'cr']} # Retorno seguro

    channels = [0, 1, 2] # 0=Y, 1=Cb, 2=Cr
    channel_names = ['y', 'cb', 'cr']
    
    all_hists = {name: [] for name in channel_names}
    
    hist_range = [0, 256] 
    
    # --- 1. Cálculo de Histograma para cada canal de cada cuadro ---
    for frame in face_sequence_ycbcr:
        for i, name in zip(channels, channel_names):
            # Extraer histograma para el canal i
            hist = cv2.calcHist([frame], [i], None, [n_bins], hist_range)
            # Normalizar y aplanar
            hist = cv2.normalize(hist, hist).flatten() 
            all_hists[name].append(hist)

    results = {}
    
    # --- 2. Iterar sobre cada canal (Y, Cb, Cr) para el análisis EMD ---
    for name in channel_names:
        hists_array = np.array(all_hists[name])
        
        # Cálculo del Histograma Promedio Temporal (H_bar)
        h_bar = np.mean(hists_array, axis=0)

        variation_emd = []
        
        # Convertir a float32 para la función cv2.compareHist
        h_bar_f32 = h_bar.astype('float32')

        EMD_COMPARE_METHOD = 3  # cv2.HISTCMP_EMD

        # Cálculo de las Variaciones EMD respecto a la media
        for i in range(window_size):
            hist_f32 = hists_array[i].astype('float32')
            
            # Usamos cv2.HISTCMP_EMD (Distancia de Earth Mover)
            emd_value = cv2.compareHist(hist_f32, h_bar_f32, EMD_COMPARE_METHOD)
            variation_emd.append(emd_value)

        # --- 3. Extracción de Métricas Estadísticas de la Variación EMD ---
        var_array = np.array(variation_emd)
        
        results[f'emd_{name}_mean'] = np.mean(var_array)
        results[f'emd_{name}_variance'] = np.var(var_array)
        results[f'emd_{name}_range'] = np.max(var_array) - np.min(var_array)

    return results

In [86]:
def get_histogram_analysis(root_dir, window_size, videos_per_category, num_bins=64):

    all_histograms = {}
    category_dirs = [d for d in os.listdir(root_dir) if os.path.isdir(os.path.join(root_dir, d))]

    # Loop through each category
    for category in category_dirs:
        print(f"--- Processing category: **{category}** ---")
        category_path = os.path.join(root_dir, category)
        all_video_paths = glob.glob(os.path.join(category_path, '*.mp4')) 
        
        # Sample videos if there are more than required
        if len(all_video_paths) > videos_per_category:
            video_paths = random.sample(all_video_paths, videos_per_category)
        else:
            video_paths = all_video_paths
        
        category_features = {}
        
        # Process each video
        for video_path in video_paths:
            
            # --- MODULAR CALL: Get the face sequence in YCbCr ---
            face_sequence_ycbcr = get_random_face_sequence(video_path, window_size, face_cascade)
            
            if not face_sequence_ycbcr:
                # No face detected or video invalid/too short
                continue
            
            # --- MODULAR CALL: Extract EMD Temporal Features ---
            emd_features = extract_emd_temporal_features_ycc(face_sequence_ycbcr, window_size, num_bins)
            category_features[video_path] = emd_features
        
        all_histograms[category] = category_features
        print(f"Processed **{len(video_paths)}** videos. Total **{len(category_features)}** face-focused features extracted.")

    return all_histograms

#### Llamada

In [99]:
random.seed(SEED)
VIDEO_ROOT_DIR = '/workspaces/proyecto/workspace/src_c40'
WINDOW_SIZE = 100
VIDEOS_PER_CATEGORY = 250
NUM_BINS = 64

histogram_features = get_histogram_analysis(VIDEO_ROOT_DIR, WINDOW_SIZE, VIDEOS_PER_CATEGORY, NUM_BINS)

--- Processing category: **Deepfakes** ---
Processed **250** videos. Total **248** face-focused features extracted.
--- Processing category: **Original** ---
Processed **250** videos. Total **250** face-focused features extracted.
--- Processing category: **FaceShifter** ---
Processed **250** videos. Total **250** face-focused features extracted.
--- Processing category: **FaceSwap** ---
Processed **250** videos. Total **248** face-focused features extracted.
--- Processing category: **NeuralTextures** ---
Processed **250** videos. Total **250** face-focused features extracted.
--- Processing category: **Face2Face** ---
Processed **250** videos. Total **249** face-focused features extracted.


In [87]:
def get_dataframe_from_dictionary(features, imputer_strategy = None, imputer_fill_value = None):
    rows = []
    for category, features_dict in features.items():
        for video, features in features_dict.items():
            # Keep the dictionary structure to preserve column names
            row = features.copy()
            row['label'] = category
            row['video_id'] = video
            rows.append(row)
    
    if imputer_strategy == None:
        df =  pd.DataFrame(rows)
        return df.set_index('video_id')
    
    imputer = SimpleImputer(missing_values=np.nan, strategy=imputer_strategy, fill_value=imputer_fill_value)
    return imputer.fit_transform(pd.DataFrame(rows)) 

def pca_and_plot(features: dict, imputer_strategy = None, imputer_fill_value = None):

    try:
        # Convertir las características del histograma a un DataFrame
        df = get_dataframe_from_dictionary(features, imputer_strategy, imputer_fill_value)
        # Separar características y etiquetas
        X = df.drop(columns=['label']).values
        y = df['label'].values
        # Aplicar PCA
        pca = PCA(n_components=2)
        X_pca = pca.fit_transform(X)
    except ValueError:
        df = get_dataframe_from_dictionary(features).dropna()
        X = df.drop(columns=['label']).values
        y = df['label'].values
        X_pca = pca.fit_transform(X)

    # Crear un DataFrame para los resultados PCA
    pca_df = pd.DataFrame(data=X_pca, columns=['PC1', 'PC2'])
    pca_df['label'] = y

    # Graficar los resultados PCA
    plt.figure(figsize=(10, 8))
    for category in pca_df['label'].unique():
        subset = pca_df[pca_df['label'] == category]
        plt.scatter(subset['PC1'], subset['PC2'], label=category)
    
    plt.title('PCA of Histogram Features')
    plt.xlabel('Principal Component 1')
    plt.ylabel('Principal Component 2')
    plt.legend()
    plt.grid()
    plt.show()

    # Plot explained variance for all PCA components
    pca_full = PCA()  # fit PCA with all possible components
    pca_full.fit(X)
    explained = pca_full.explained_variance_ratio_
    cumulative = np.cumsum(explained)

    plt.figure(figsize=(10, 4))
    plt.bar(np.arange(1, len(explained) + 1), explained, alpha=0.7, label='Individual explained variance')
    plt.plot(np.arange(1, len(explained) + 1), cumulative, color='r', marker='o', label='Cumulative explained variance')
    plt.xlabel('Principal Component')
    plt.ylabel('Explained Variance Ratio')
    plt.title('Explained Variance by PCA Components')
    plt.xticks(np.arange(1, len(explained) + 1))
    plt.grid(axis='y', linestyle='--', alpha=0.5)
    plt.legend()
    plt.tight_layout()
    plt.show()

In [None]:
pca_and_plot(histogram_features)

In [15]:
# Simple random forrest classifier to validate feature usefulness
def validate_features_with_random_forest(features_dataframe):
    """
    Validates the usefulness of extracted features using a Random Forest classifier.

    Args:
        features_dataframe: a dataframe where each row corresponds to a sample,
                            columns are features, and there is a 'label' column for categories.
    """
    print("\n--- Validating Features with Random Forest Classifier ---")

    # 1. Prepare Data
    all_features = features_dataframe.drop(columns=['label']).values
    all_labels = features_dataframe['label'].values
    
    X = np.array(all_features)
    y = np.array(all_labels)

    if X.ndim != 2 or X.shape[1] == 0:
        print("Error: Feature matrix X is empty or incorrectly shaped.")
        return

    # 2. Split Data into Training and Testing Sets
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # 3. Initialize and Train Random Forest Classifier
    clf = RandomForestClassifier(n_estimators=100, random_state=42)
    clf.fit(X_train, y_train)

    # 4. Make Predictions
    y_pred = clf.predict(X_test)

    # 5. Evaluate the Classifier
    print("\n--- Classification Report ---")
    print(classification_report(y_test, y_pred))

    print("\n--- Confusion Matrix ---")
    print(confusion_matrix(y_test, y_pred))

In [None]:
validate_features_with_random_forest(get_dataframe_from_dictionary(histogram_features))

### Facial landmarks analysis

#### Funciones

In [92]:
# --------------------------------------------------------------------
# 0. Extraer landmarks
# --------------------------------------------------------------------

def get_landmark_sequence(video_path: str, window_size: int) -> list:
    """
    Extrae una secuencia de landmarks faciales de un video usando MediaPipe Face Mesh.

    Args:
        video_path (str): Ruta al archivo de video.
        window_size (int): Número de cuadros a extraer.

    Returns:
        list: Lista de objetos 'NormalizedLandmarkList' (uno por cuadro).
    """
    landmark_sequence = []
    
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: No se pudo abrir el video {video_path}")
        return landmark_sequence

    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_indices = random.sample(range(total_frames), min(window_size, total_frames))

    for frame_idx in sorted(frame_indices):
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
        ret, frame = cap.read()
        if not ret:
            continue

        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = face_mesh.process(rgb_frame)

        if results.multi_face_landmarks:
            landmark_sequence.append(results.multi_face_landmarks[0])
    
    cap.release()
    return landmark_sequence

In [None]:
# --------------------------------------------------------------------
# 1. CONSTANTES Y CONFIGURACIÓN
# --------------------------------------------------------------------

# Initialize MediaPipe Face Mesh
mp_face_mesh = mp.solutions.face_mesh
# Configuration for the face mesh model
face_mesh = mp_face_mesh.FaceMesh(
    static_image_mode=False, 
    max_num_faces=1, 
    refine_landmarks=True,  # Use a refined model for better accuracy
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5
)

# Índices EAR (p1, p2, p3, p4, p5, p6) para el cálculo 3D
LEFT_EYE_INDICES_EAR = [33, 160, 158, 133, 153, 144] 
RIGHT_EYE_INDICES_EAR = [362, 385, 387, 263, 373, 380] 

EAR_BLINK_THRESHOLD = 0.25      # Umbral del Eye Aspect Ratio (EAR) para ojo cerrado.
MIN_CONSECUTIVE_FRAMES = 2      # Mínimo de cuadros cerrados para registrar un parpadeo.

# --------------------------------------------------------------------
# 2. FUNCIONES DE CÁLCULO BÁSICO (3D OPTIMIZADO)
# --------------------------------------------------------------------

def euclidean_distance(p1: np.ndarray, p2: np.ndarray) -> float:
    return np.linalg.norm(p1 - p2)

def calculate_ear(eye_landmarks: list) -> float:
    """
    Calcula el Eye Aspect Ratio (EAR) para un ojo dado sus 6 puntos 3D.
    
    EAR = (|p2-p6| + |p3-p5|) / (2 * |p1-p4|)
    
    Args:
        eye_landmarks: Lista de 6 puntos [p1, ..., p6] como arrays (x, y, z).
    
    Returns:
        El valor del EAR 3D.
    """
    # p1, p2, p3, p4, p5, p6 corresponden a índices 0 a 5
    
    # Distancias verticales (p2-p6 y p3-p5)
    vertical_sum = euclidean_distance(eye_landmarks[1], eye_landmarks[5]) + \
                   euclidean_distance(eye_landmarks[2], eye_landmarks[4])
    
    # Distancia horizontal (p1-p4)
    horizontal_distance = euclidean_distance(eye_landmarks[0], eye_landmarks[3])
    
    if horizontal_distance == 0:
        return 0.0
        
    ear = vertical_sum / (2.0 * horizontal_distance)
    return ear

# --------------------------------------------------------------------
# 3. FUNCIÓN PRINCIPAL DE ANÁLISIS DE LANDMARKS POR FRAME (CONCISA)
# --------------------------------------------------------------------

def analyze_frame_landmarks(face_landmarks) -> float:
    """
    Extrae los 6 puntos clave de cada ojo y calcula el EAR mínimo en 3D.
    
    Args:
        face_landmarks: Un objeto 'NormalizedLandmarkList' de MediaPipe Face Mesh. 

    Returns:
        El EAR mínimo entre el ojo izquierdo y derecho. Retorna 1.0 si no hay landmarks.
    """
    if not face_landmarks:
        raise ValueError("No face landmarks detected in this frame.")

    def get_eye_coords(indices):
        """Usa comprensión de listas para extraer puntos (x, y, z) de forma concisa."""
        return [np.array([face_landmarks.landmark[i].x, 
                          face_landmarks.landmark[i].y, 
                          face_landmarks.landmark[i].z]) 
                for i in indices]

    # Extracción concisa y cálculo 3D
    left_eye_coords = get_eye_coords(LEFT_EYE_INDICES_EAR)
    right_eye_coords = get_eye_coords(RIGHT_EYE_INDICES_EAR)
        
    ear_left = calculate_ear(left_eye_coords)
    ear_right = calculate_ear(right_eye_coords)

    return min(ear_left, ear_right)

# --------------------------------------------------------------------
# 4. FUNCIÓN DE EXTRACCIÓN DE CARACTERÍSTICAS DE PARPADEO
# --------------------------------------------------------------------

def extract_blink_features(landmark_sequence: list) -> dict:
    """
    Calcula las 6 métricas de parpadeo a partir de una secuencia de resultados 
    de análisis de EAR.
    
    Args:
        landmark_sequence (list): Lista de objetos 'NormalizedLandmarkList' (un elemento por frame).
        fps (float): Frames por segundo de la secuencia de video.

    Returns:
        dict: Diccionario con las 6 características de parpadeo solicitadas.
    """
    
    frame_count = len(landmark_sequence)

    if not landmark_sequence or frame_count == 0:
        return {
            'blink_frequency': 0.0,
            'interval_variance': 0.0, 'interval_skewness': 0.0,
            'duration_mean': 0.0, 'duration_variance': 0.0, 'duration_range': 0.0
        }

    # 1. Generación de la Serie Temporal EAR
    ears_min = []
    
    for lm in landmark_sequence:
        try:
            ears_min.append(analyze_frame_landmarks(lm))
        except ValueError:
            # Se continúa con el siguiente frame.
            pass 

    # Se continúa con la lógica solo si quedaron cuadros válidos
    if not ears_min:
        print("Advertencia: Ningún frame pudo ser analizado tras la detección.")
        return extract_blink_features([])

    # 2. Detección de Parpadeos y Extracción de Duraciones/Intervalos
    blink_durations_frames = []      
    intervals_frames = []            
    
    is_blinking = False
    current_blink_duration = 0      
    frames_since_last_blink = 0     
    total_blinks = 0
    
    for ear in ears_min:
        frames_since_last_blink += 1 
        is_closed = ear < EAR_BLINK_THRESHOLD
        
        if is_closed:
            current_blink_duration += 1
            if not is_blinking:
                is_blinking = True
        else:
            if is_blinking:
                if current_blink_duration >= MIN_CONSECUTIVE_FRAMES:
                    total_blinks += 1
                    blink_durations_frames.append(current_blink_duration)
                    # El intervalo es el tiempo abierto antes de este parpadeo
                    intervals_frames.append(frames_since_last_blink - current_blink_duration)
                
                # Resetear el estado
                is_blinking = False
                current_blink_duration = 0
                frames_since_last_blink = 0 

    # 3. Cálculo de Métricas Finales
    frame_count = len(ears_min)
    results = {}
    
    # A. Frecuencia de Parpadeo
    results['total_blinks'] = total_blinks
    results['blink_frequency'] = total_blinks / frame_count if frame_count > 0 else 0.0

    # B. Intervalo entre Parpadeos
    if intervals_frames and len(intervals_frames) >= 2:
        results['interval_variance'] = np.var(intervals_frames)
        results['interval_skewness'] = skew(intervals_frames)
    else:
        results['interval_variance'] = 0.0
        results['interval_skewness'] = 0.0

    # C. Duración de Parpadeo
    if blink_durations_frames:
        results['duration_mean'] = np.mean(blink_durations_frames)
        results['duration_variance'] = np.var(blink_durations_frames)
        results['duration_range'] = np.max(blink_durations_frames) - np.min(blink_durations_frames)
    else:
        results['duration_mean'] = 0.0
        results['duration_variance'] = 0.0
        results['duration_range'] = 0.0

    return results

W0000 00:00:1765298659.747197   70927 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


W0000 00:00:1765298659.757550   70927 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


In [94]:
# --- Índices Clave de MediaPipe Face Mesh ---
# NARIZ: Puntos centrales de la punta de la nariz
NOSE_POINT = 4
# OREJAS/Caras Laterales: Puntos de las caras laterales (orejas conceptuales)
LEFT_FACE_POINT = 133
RIGHT_FACE_POINT = 362
# FRENTE: Punto central superior (frente conceptual)
FOREHEAD_POINT = 10
# BOCA: Punto central superior del labio (filtrum superior)
UPPER_MOUTH_POINT = 0

def extract_facial_distance_features(landmark_sequence: list) -> dict:
    """
    Calcula y normaliza las distancias clave de la nariz (orejas, frente, boca) 
    para una secuencia de landmarks, y extrae estadísticas de la serie.

    Args:
        landmark_sequence (list): Lista de objetos 'NormalizedLandmarkList' (un elemento por frame).

    Returns:
        dict: Diccionario con el máximo, mínimo y asimetría (skewness) de cada serie de distancia.
    """
    if not landmark_sequence:
        return {}

    series = {
        'nose_to_left_ear': [],
        'nose_to_right_ear': [],
        'nose_to_forehead': [],
        'nose_to_upper_mouth': []
    }

    # --- 1. Procesamiento Frame a Frame ---
    for face_landmarks in landmark_sequence:
        
        # Validación de landmarks (manejo de errores robusto)
        if not face_landmarks or not face_landmarks.landmark:
            # Omitir frames sin detección (el llamador ya maneja la excepción del frame completo)
            continue
        
        # Función auxiliar para obtener el punto 3D
        def get_point(index):
            lm = face_landmarks.landmark[index]
            return np.array([lm.x, lm.y, lm.z])
        
        try:
            # a. Extracción de Puntos
            p_nose = get_point(NOSE_POINT)
            p_left_face = get_point(LEFT_FACE_POINT)
            p_right_face = get_point(RIGHT_FACE_POINT)
            p_forehead = get_point(FOREHEAD_POINT)
            p_upper_mouth = get_point(UPPER_MOUTH_POINT)

            # b. Distancia de Normalización (Ancho Horizontal del Rostro)
            # Normalizar con la distancia entre las caras laterales.
            normalization_distance = euclidean_distance(p_left_face, p_right_face)
            
            # Evitar división por cero
            if normalization_distance == 0:
                continue

            # c. Cálculo de Distancias y Normalización
            dist_left_ear = euclidean_distance(p_nose, p_left_face) / normalization_distance
            dist_right_ear = euclidean_distance(p_nose, p_right_face) / normalization_distance
            dist_forehead = euclidean_distance(p_nose, p_forehead) / normalization_distance
            dist_upper_mouth = euclidean_distance(p_nose, p_upper_mouth) / normalization_distance

            # d. Almacenamiento
            series['nose_to_left_ear'].append(dist_left_ear)
            series['nose_to_right_ear'].append(dist_right_ear)
            series['nose_to_forehead'].append(dist_forehead)
            series['nose_to_upper_mouth'].append(dist_upper_mouth)

        except IndexError:
            # Esto puede ocurrir si un punto clave no existe por alguna razón (poco común)
            continue

    # --- 2. Cálculo de Métricas Finales ---
    final_metrics = {}
    
    for name, data_series in series.items():
        # Calcular las métricas solo si hay datos válidos en la serie
        if data_series:
            arr = np.array(data_series)
            
            # Máximo
            final_metrics[f'{name}_max'] = np.max(arr)
            # Mínimo
            final_metrics[f'{name}_min'] = np.min(arr)
            # Asimetría (Skewness)
            # La asimetría requiere al menos 3 puntos para ser significativa, pero funciona con 2+
            final_metrics[f'{name}_skewness'] = skew(arr)
        else:
            # Si la serie está vacía, se retorna 0.0 o un valor neutro
            final_metrics[f'{name}_max'] = 0.0
            final_metrics[f'{name}_min'] = 0.0
            final_metrics[f'{name}_skewness'] = 0.0

    return final_metrics

#### Llamadas

In [98]:
# --------------------------------------------------------------------
#  Extracción de landmarks
# --------------------------------------------------------------------

random.seed(SEED)
PATH = '/workspaces/proyecto/workspace/src_c40'
SAMPLE_VIDEOS_PER_CATEGORY = 250
WINDOW_SIZE = 100
PICKLED_FILE_PATH = ""
PICKLED_FILE_NAME = f"allLandmarks_{SAMPLE_VIDEOS_PER_CATEGORY}_with_IDs.pkl"

try:
    with open(PICKLED_FILE_NAME, 'rb') as file:
        all_landmarks = pickle.load(file)

except FileNotFoundError:
    print("Unable to open pickled file; extracting landmarks.")
    all_landmarks = {}
    category_dirs = [d for d in os.listdir(PATH) if os.path.isdir(os.path.join(PATH, d))]
    print("   ***** Extracting landmarks from videos *****")
    for category in category_dirs:
        print(f"--- Processing category: **{category}** ---")
        category_path = os.path.join(PATH, category)
        video_paths = glob.glob(os.path.join(category_path, '*.mp4')) 
        if len(video_paths) > SAMPLE_VIDEOS_PER_CATEGORY:
            video_paths = random.sample(video_paths, SAMPLE_VIDEOS_PER_CATEGORY)
        
        category_landmarks = {}
        
        for video_path in video_paths:
            landmark_seq = get_landmark_sequence(video_path, window_size=WINDOW_SIZE)
            if not landmark_seq:
                print(f"No landmarks for video {video_path}")
                continue
            category_landmarks[video_path] = landmark_seq

        all_landmarks[category] = category_landmarks
        
        print(f"Processed **{len(video_paths)}** videos. Total **{len(category_landmarks)}** landmark sequences extracted.")

    with open(PICKLED_FILE_NAME, 'wb') as file:
        pickle.dump(all_landmarks, file)

    print(f"Landmarks successfully pickled to {PICKLED_FILE_NAME}")

Unable to open pickled file; extracting landmarks.
   ***** Extracting landmarks from videos *****
--- Processing category: **Deepfakes** ---
No landmarks for video /workspaces/proyecto/workspace/src_c40/Deepfakes/817_827.mp4
No landmarks for video /workspaces/proyecto/workspace/src_c40/Deepfakes/923_023.mp4
No landmarks for video /workspaces/proyecto/workspace/src_c40/Deepfakes/427_637.mp4
No landmarks for video /workspaces/proyecto/workspace/src_c40/Deepfakes/759_755.mp4
No landmarks for video /workspaces/proyecto/workspace/src_c40/Deepfakes/862_047.mp4
No landmarks for video /workspaces/proyecto/workspace/src_c40/Deepfakes/254_261.mp4
No landmarks for video /workspaces/proyecto/workspace/src_c40/Deepfakes/776_676.mp4
Processed **250** videos. Total **243** landmark sequences extracted.
--- Processing category: **Original** ---
No landmarks for video /workspaces/proyecto/workspace/src_c40/Original/824.mp4
No landmarks for video /workspaces/proyecto/workspace/src_c40/Original/647.mp4


In [105]:
# --------------------------------------------------------------------
#  Análisis de parpadeos
# --------------------------------------------------------------------

all_blink_features = {}
print("   ***** Extracting features from landmarks *****")
for category in all_landmarks:
    print(f"--- Processing category: **{category}** ---")
    category_features = {}
    
    for video_path, landmark_seq in all_landmarks[category].items():
        blink_features = extract_blink_features(landmark_seq)
        category_features[video_path] = blink_features
        
    all_blink_features[category] = category_features

    print(f"Processed **{len(all_landmarks[category])}** landmark sequences. Total **{len(category_features)}** blink features extracted.")

   ***** Extracting features from landmarks *****
--- Processing category: **Deepfakes** ---


  results['interval_skewness'] = skew(intervals_frames)


Processed **243** landmark sequences. Total **243** blink features extracted.
--- Processing category: **Original** ---
Processed **247** landmark sequences. Total **247** blink features extracted.
--- Processing category: **FaceShifter** ---
Processed **245** landmark sequences. Total **245** blink features extracted.
--- Processing category: **FaceSwap** ---
Processed **250** landmark sequences. Total **250** blink features extracted.
--- Processing category: **NeuralTextures** ---
Processed **241** landmark sequences. Total **241** blink features extracted.
--- Processing category: **Face2Face** ---
Processed **244** landmark sequences. Total **244** blink features extracted.


In [None]:
pca_and_plot(all_blink_features)

In [106]:
# --------------------------------------------------------------------
#  Análisis de distancias
# --------------------------------------------------------------------

all_distance_features = {}
print("   ***** Extracting features from landmarks *****")
for category in all_landmarks:
    print(f"--- Processing category: **{category}** ---")
    category_features = {}
    
    for video_path, landmark_seq in all_landmarks[category].items():
        distance_features = extract_facial_distance_features(landmark_seq)
        category_features[video_path] = distance_features

    all_distance_features[category] = category_features

    print(f"Processed **{len(all_landmarks[category])}** landmark sequences. Total **{len(category_features)}** facial distance features extracted.")

   ***** Extracting features from landmarks *****
--- Processing category: **Deepfakes** ---
Processed **243** landmark sequences. Total **243** facial distance features extracted.
--- Processing category: **Original** ---
Processed **247** landmark sequences. Total **247** facial distance features extracted.
--- Processing category: **FaceShifter** ---
Processed **245** landmark sequences. Total **245** facial distance features extracted.
--- Processing category: **FaceSwap** ---
Processed **250** landmark sequences. Total **250** facial distance features extracted.
--- Processing category: **NeuralTextures** ---
Processed **241** landmark sequences. Total **241** facial distance features extracted.
--- Processing category: **Face2Face** ---
Processed **244** landmark sequences. Total **244** facial distance features extracted.


In [None]:
pca_and_plot(all_distance_features)

In [None]:
all_features = {}
for category in all_blink_features:
    all_features[category] = {}
    for sequence1, sequence2 in zip(all_blink_features[category], all_distance_features[category]):
        all_features[category].append({**sequence1, **sequence2})

TypeError: 'str' object is not a mapping

In [None]:
validate_features_with_random_forest(get_dataframe_from_dictionary(all_features))

### Combinar variables de histogramas y de razgos faciales

In [None]:
df_h = get_dataframe_from_dictionary(histogram_features)
df_l = get_dataframe_from_dictionary(all_features)
# --------------------------------------------------------------------
#  Export dataframes to CSV
# -------------------------------------------------------------------- 

df_h.to_csv('histogram_features_with_ids.csv', index=False)
df_l.to_csv('landmark_features_with_ids.csv', index=False)

In [115]:
all_features_df = pd.merge(get_dataframe_from_dictionary(all_distance_features), get_dataframe_from_dictionary(all_blink_features), left_index=True, right_index=True)
all_features_df.drop(columns=['label_x'], inplace=True)
all_features_df.rename(columns={'label_y': 'label'}, inplace=True)
all_features_df.to_csv('landmark_features_with_ids.csv', index=True)

In [117]:
histogram_features_df = get_dataframe_from_dictionary(histogram_features)
histogram_features_df.to_csv("histogram_features_with_ids.csv", index=True)

#### Pruebas

In [None]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
import os

def extract_and_plot_first_frame(video_path):
    print(f"Attempting to open video: {video_path}")
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Could not open video file '{video_path}'")
        return
    ret, frame_array = cap.read() # The frame is automatically stored as a NumPy array
    if ret:
        rgb_frame = cv2.cvtColor(frame_array, cv2.COLOR_BGR2RGB)
        plt.figure(figsize=(10, 6))
        plt.imshow(rgb_frame)
        plt.title(f"First Frame of Video: {os.path.basename(video_path)}")
        plt.axis('off') # Hide axis ticks
        plt.show() # Display the plot
        
    else:
        print("Error: Could not read the first frame from the video.")
    cap.release()

# path = '/workspaces/proyecto/workspace/src_c40/Original'
# video = glob.glob(os.path.join(path, '*.mp4'))[0]
video_file = '/workspaces/proyecto/workspace/src_c40/Original/000.mp4'

if os.path.exists(video_file):
    extract_and_plot_first_frame(video_file)
else:
    print(f"\nNote: Video file '{video_file}' not found. Please ensure '{video_file}' exists in this directory or update the 'video_file' path.")

