# Bibliotecas

In [13]:
import os
import math
import cv2
import json
import joblib
import numpy as np

import mediapipe as mp
from mediapipe.framework.formats import landmark_pb2

from sklearn.model_selection import LeaveOneOut
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import classification_report, accuracy_score
from catboost import CatBoostClassifier


# Extraccion de keypoints

### Configuración de MediaPipe

In [14]:
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils
BaseOptions = mp.tasks.BaseOptions
PoseLandmarker = mp.tasks.vision.PoseLandmarker
PoseLandmarkerOptions = mp.tasks.vision.PoseLandmarkerOptions
VisionRunningMode = mp.tasks.vision.RunningMode

options = PoseLandmarkerOptions(
    base_options=BaseOptions(model_asset_path='pose_landmarker_heavy.task'),
    running_mode=VisionRunningMode.VIDEO,
    output_segmentation_masks=False
)



In [15]:
def display_video(video_path, annotated_image):
    """
    Muestra un video con anotaciones en una ventana y permite al usuario interrumpir el procesamiento.

    Esta función muestra un frame anotado de un video en una ventana con el título basado en el nombre del archivo de video.
    El usuario puede interrumpir el procesamiento del video presionando la tecla 'q'.

    Parámetros
    ----------
    video_path : str
        Ruta del archivo de video que se está procesando.
    annotated_image : numpy.ndarray
        Imagen anotada (frame del video) que se mostrará en la ventana.

    Retorna
    -------
    bool
        Retorna `True` si el usuario interrumpe el procesamiento presionando 'q', de lo contrario retorna `False`.

    Notas
    -----
    Esta función utiliza OpenCV para mostrar la imagen y capturar la entrada del usuario. La ventana se actualiza
    continuamente mientras se procesa el video.

    Ejemplos
    --------
    >>> display_video("video.mp4", frame_anotado)
    False  # El procesamiento continúa
    >>> display_video("video.mp4", frame_anotado)
    True   # El usuario presionó 'q' para interrumpir el procesamiento
    """
    
    cv2.imshow(f"Video: {os.path.basename(video_path)}", annotated_image)
    key = cv2.waitKey(1) & 0xFF
    if key == ord('q'):
        print(f"Procesamiento de '{video_path}' interrumpido por el usuario.")
        return True  
    return False  

    

In [16]:
def extract_keypoints(video_path, frame_skip=1, show_video=True):
    """
    Extrae los keypoints (puntos clave) de un video utilizando un modelo de detección de pose.

    Esta función procesa un video frame por frame, detecta los keypoints de la pose humana en cada frame
    utilizando un modelo de detección de pose, y almacena los resultados en una lista. Además, permite
    visualizar el video con los keypoints dibujados en tiempo real.

    Parámetros
    ----------
    video_path : str
        Ruta del archivo de video que se procesará.
    frame_skip : int, opcional
        Número de frames a saltar entre cada procesamiento. Por defecto es 1 (sin saltar frames).
    show_video : bool, opcional
        Si es `True`, muestra el video con los keypoints dibujados en una ventana. Por defecto es `True`.

    Retorna
    -------
    keypoints_data : list
        Lista de listas que contiene los keypoints detectados en cada frame. Cada frame contiene un
        diccionario con las coordenadas (x, y, z) y la visibilidad de cada keypoint.
    frame_count : int
        Número total de frames procesados en el video.
    fps : float
        Tasa de frames por segundo (FPS) del video.

    Notas
    -----
    - Si no se detectan keypoints en un frame, se rellena con diccionarios vacíos.
    - El usuario puede interrumpir el procesamiento presionando la tecla 'q' durante la visualización.
    - La función utiliza OpenCV para la captura de video y MediaPipe para la detección de pose.

    Ejemplos
    --------
    >>> keypoints, count, fps, width, height = extract_keypoints("video.mp4", frame_skip=2, show_video=True)
    >>> print(len(keypoints))  # Número de frames procesados
    >>> print(keypoints[0])     # Keypoints del primer frame
    """
    
    keypoints_data = []

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error al abrir: {video_path}")
        return None, 0, 0, 0

    frame_count = 0
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_skip_counter = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        if frame_count % frame_skip == 0:
            frame_skip_counter += 1
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame_rgb)

            timestamp_ms = int(frame_count * (1000 / fps))
            
            landmarker = PoseLandmarker.create_from_options(options)
            pose_landmarker_result = landmarker.detect_for_video(mp_image, timestamp_ms)

            annotated_image = frame.copy()
            frame_keypoints = []

            if pose_landmarker_result.pose_landmarks:
                for landmark in pose_landmarker_result.pose_landmarks[0]:
                    frame_keypoints.append({
                        'x': landmark.x,
                        'y': landmark.y,
                        'z': landmark.z,
                        'visibility': landmark.visibility
                    })

                landmark_list = landmark_pb2.NormalizedLandmarkList()
                for landmark in pose_landmarker_result.pose_landmarks[0]:
                    pb2_landmark = landmark_pb2.NormalizedLandmark(
                        x=landmark.x, y=landmark.y, z=landmark.z, visibility=landmark.visibility
                    )
                    landmark_list.landmark.append(pb2_landmark)

                mp_drawing.draw_landmarks(
                    annotated_image,
                    landmark_list,
                    mp_pose.POSE_CONNECTIONS,
                    mp_drawing.DrawingSpec(color=(255, 0, 0), thickness=2, circle_radius=2),
                    mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2)
                )
            else:
                frame_keypoints = [{} for _ in range(33)]

            keypoints_data.append(frame_keypoints)

            if show_video:
                if display_video(video_path, annotated_image):
                    cap.release()
                    cv2.destroyAllWindows()
                    landmarker.close()
                    return None, 0, 0, 0
        frame_count += 1

    cap.release()
    cv2.destroyAllWindows()
    landmarker.close()
    print(f"Video procesado: {video_path}")
    return keypoints_data, fps

In [17]:

def process_all_videos(dataset_path, output_json, show_videos=True):
    """
    Procesa todos los videos en un directorio, extrae keypoints y guarda los resultados en un archivo JSON.

    Esta función recorre todos los archivos de video en el directorio especificado, extrae los keypoints
    de pose humana utilizando la función `extract_keypoints`, y almacena los resultados en un archivo JSON.
    Los videos deben tener un formato de nombre específico para extraer información como el ID de la persona
    y la etiqueta.

    Parámetros
    ----------
    dataset_path : str
        Ruta del directorio que contiene los videos a procesar.
    output_json : str
        Ruta del archivo JSON de salida donde se guardarán los datos procesados.
    show_videos : bool, opcional
        Si es `True`, muestra los videos con los keypoints dibujados durante el procesamiento.
        Por defecto es `True`.

    Retorna
    -------
    None
        La función no retorna ningún valor, pero guarda los datos procesados en un archivo JSON.

    Notas
    -----
    - Los videos deben tener un nombre en el formato: `person_id_*_label.ext` (por ejemplo, `001_xyz_walk.MOV`).
    - Si un video no cumple con el formato de nombre, se omite y se muestra un mensaje de advertencia.
    - Si el procesamiento de un video es interrumpido por el usuario, no se guarda su información.
    - La función utiliza `extract_keypoints` para extraer los keypoints y OpenCV para la visualización.

    Ejemplos
    --------
    >>> process_all_videos("videos/", "output.json", show_videos=False)
    Procesamiento completado. Todos los datos guardados en: output.json
    """
    data = {}

    for video_file in os.listdir(dataset_path):
        if video_file.endswith(('.MOV', '.mp4')):
            video_path = os.path.join(dataset_path, video_file)

            parts = video_file.split('_')
            if len(parts) < 3:
                print(f"Nombre de archivo inválido: {video_file}. Saltando.")
                continue

            person_id = parts[0]
            label = parts[2].split('.')[0]

            keypoints, fps, = extract_keypoints(video_path, frame_skip=5, show_video=show_videos)

            if keypoints is not None and fps > 0:
                data[video_file] = {
                    "keypoints": keypoints,
                    "label": label,
                    "person_id": person_id
                }
            elif keypoints is None:
                print("No se guarda la información del video por interrupción")
                continue
            else:
                print(f"Error al procesar {video_path}. No se añaden datos al JSON.")

    with open(output_json, 'w') as f:
        json.dump(data, f, indent=4)

    print("Procesamiento completado. Todos los datos guardados en:", output_json)

In [18]:
base_path = r'H:\Mi unidad\CICESE\2025-1\CDSI2025Gortarez\practices\Practice 2\DataSet'  
for i in range(1, 6):  # Itera desde 1 hasta 5 
    exercise_num = str(i).zfill(2)  # Formato "01", "02", ..., "05"
    dataset_path = os.path.join(base_path, f'Ejercicio {exercise_num}')
    output_json = f'Ejercicio {exercise_num}.json'

    print(f"Procesando Ejercicio {exercise_num}...")

    # Verifica si la carpeta del ejercicio existe
    if not os.path.exists(dataset_path):
        print(f"ERROR: La carpeta '{dataset_path}' no existe. Saltando...")
        continue  # Pasa al siguiente ejercicio si la carpeta no existe

    # Llama a tu función de procesamiento
    process_all_videos(dataset_path, output_json, show_videos=False)


Procesando Ejercicio 01...
Video procesado: H:\Mi unidad\CICESE\2025-1\CDSI2025Gortarez\practices\Practice 2\DataSet\Ejercicio 01\S01_01_Completo.MOV
Video procesado: H:\Mi unidad\CICESE\2025-1\CDSI2025Gortarez\practices\Practice 2\DataSet\Ejercicio 01\S01_01_Ninguno.MOV
Video procesado: H:\Mi unidad\CICESE\2025-1\CDSI2025Gortarez\practices\Practice 2\DataSet\Ejercicio 01\S01_01_Parcial.MOV
Video procesado: H:\Mi unidad\CICESE\2025-1\CDSI2025Gortarez\practices\Practice 2\DataSet\Ejercicio 01\S02_01_Completo.MOV
Video procesado: H:\Mi unidad\CICESE\2025-1\CDSI2025Gortarez\practices\Practice 2\DataSet\Ejercicio 01\S02_01_Ninguno.MOV
Video procesado: H:\Mi unidad\CICESE\2025-1\CDSI2025Gortarez\practices\Practice 2\DataSet\Ejercicio 01\S02_01_Parcial.MOV
Video procesado: H:\Mi unidad\CICESE\2025-1\CDSI2025Gortarez\practices\Practice 2\DataSet\Ejercicio 01\S03_01_Completo.MOV
Video procesado: H:\Mi unidad\CICESE\2025-1\CDSI2025Gortarez\practices\Practice 2\DataSet\Ejercicio 01\S03_01_Ningun

# Extracción de características

In [19]:
def calculate_angle(p1, p2, p3):
    """Calcula el ángulo entre tres puntos usando la fórmula del coseno."""
    v1 = np.array(p1) - np.array(p2)
    v2 = np.array(p3) - np.array(p2)

    dot_product = np.dot(v1, v2)
    magnitudes = np.linalg.norm(v1) * np.linalg.norm(v2)

    if magnitudes == 0:
        return 0.0

    cosine_angle = dot_product / magnitudes
    angle_rad = np.arccos(np.clip(cosine_angle, -1.0, 1.0))
    return math.degrees(angle_rad)

def extract_features(video_data):
    """Extrae features de los datos de video."""
    frames = video_data["keypoints"]
    if not frames:
        return None

    num_keypoints = len(frames[0])
    features = []

    for kp_index in range(num_keypoints):
        x_vals, y_vals, z_vals = [], [], []
        for frame in frames:
            if kp_index < len(frame) and frame[kp_index]:
                kp = frame[kp_index]
                x_vals.append(kp['x'])
                y_vals.append(kp['y'])
                z_vals.append(kp['z'])

        if not x_vals:
            features.extend([0.0] * 8)
            continue

        range_x = max(x_vals) - min(x_vals)
        range_y = max(y_vals) - min(y_vals)

        angles = []
        anglesMirror = []
        if 24 < num_keypoints:
            for i in range(1, len(frames) - 1):

                if frames[i][24] and frames[i][12] and frames[i][16] and frames[i][23] and frames[i][11] and frames[i][15]:
                    p24 = [frames[i][24]['x'], frames[i][24]['y']]
                    p12 = [frames[i][12]['x'], frames[i][12]['y']]
                    p16 = [frames[i][16]['x'], frames[i][16]['y']]                    
                    p23 = [frames[i][23]['x'], frames[i][23]['y']]
                    p11 = [frames[i][11]['x'], frames[i][11]['y']]
                    p15 = [frames[i][15]['x'], frames[i][15]['y']]     

                    angle = calculate_angle(p24, p12, p16)
                    angleMirror = calculate_angle(p23, p11, p15)
                    angles.append(angle)
                    anglesMirror.append(angleMirror)

        avg_angle = np.mean(angles) if angles else 0.0
        std_angle = np.std(angles) if angles else 0.0
        avg_anglesMirror = np.mean(anglesMirror) if anglesMirror else 0.0
        std_anglesMirror = np.std(anglesMirror) if anglesMirror else 0.0

        features.extend([range_x, range_y, std_angle, avg_angle, avg_anglesMirror, std_anglesMirror])

    return features



# Evaluación

In [20]:
def train_and_evaluate_models_loo(data, exercise_num):
    X = []
    y = []
    video_names = []

    for video_name, video_data in data.items():
        feat = extract_features(video_data)
        if feat is not None:
            X.append(feat)
            y.append(video_data["label"])
            video_names.append(video_name)

    # scaler = StandardScaler()
    # X = scaler.fit_transform(np.array(X))
    # joblib.dump(scaler, 'scaler.pkl')
    X = np.array(X)
    y = np.array(y)
    video_names = np.array(video_names)

    loo = LeaveOneOut()
    results = {}

    # Modelos a evaluar
    models = {
        "SVC": SVC(kernel='poly', gamma='scale'),
        "KNN": KNeighborsClassifier(n_neighbors=3),
        "DecisionTree": DecisionTreeClassifier(),
        "CatBoost": CatBoostClassifier(verbose=0)
    }

    for model_name, model in models.items():
        y_true_all = []
        y_pred_all = []
        predictions = []

        for train_index, test_index in loo.split(X):
            X_train, X_test = X[train_index], X[test_index]
            y_train, y_test = y[train_index], y[test_index]
            video_name_test = video_names[test_index][0]

            model.fit(X_train, y_train)
            y_pred = model.predict(X_test)
            y_true_all.extend(y_test)
            y_pred_all.extend(y_pred)
            predictions.append((video_name_test, y_test[0], y_pred[0]))

        report = classification_report(y_true_all, y_pred_all, output_dict=True)
        accuracy = accuracy_score(y_true_all, y_pred_all)

        results[model_name] = {
            # "classification_report": report,
            "average_precision": report['macro avg']['precision'],
            "average_recall": report['macro avg']['recall'],
            "average_f1_score": report['macro avg']['f1-score'],
            "accuracy": accuracy,
            # "predictions": predictions,
        }

        # Guardar el modelo
        joblib.dump(model, f'modelo_{model_name.lower()}_{exercise_num}.pkl')

    return results

In [21]:
for i in range(1, 6): 
    exercise_num = str(i).zfill(2)  # Formato "01", "02", ..., "05"

    with open(f'Ejercicio {exercise_num}.json', "r") as f:
        data = json.load(f)

    print(f"Procesando Ejercicio {exercise_num}...")

    # Llamar a la función de entrenamiento y evaluación
    results = train_and_evaluate_models_loo(data, exercise_num)

    # Imprimir los resultados formateados
    if "error" in results:
        print(results["error"])  # Imprimir solo el mensaje de error
    else:
        print(json.dumps(results, indent=4))

Procesando Ejercicio 01...
{
    "SVC": {
        "average_precision": 0.9696969696969697,
        "average_recall": 0.9666666666666667,
        "average_f1_score": 0.9665831244778613,
        "accuracy": 0.9666666666666667
    },
    "KNN": {
        "average_precision": 1.0,
        "average_recall": 1.0,
        "average_f1_score": 1.0,
        "accuracy": 1.0
    },
    "DecisionTree": {
        "average_precision": 1.0,
        "average_recall": 1.0,
        "average_f1_score": 1.0,
        "accuracy": 1.0
    },
    "CatBoost": {
        "average_precision": 1.0,
        "average_recall": 1.0,
        "average_f1_score": 1.0,
        "accuracy": 1.0
    }
}
Procesando Ejercicio 02...
{
    "SVC": {
        "average_precision": 0.9444444444444445,
        "average_recall": 0.9333333333333332,
        "average_f1_score": 0.9326599326599326,
        "accuracy": 0.9333333333333333
    },
    "KNN": {
        "average_precision": 1.0,
        "average_recall": 1.0,
        "average_f1

# Validación externa

In [22]:
def process_videos(video_path, output_json, model_path, show_videos=False):
    """
    Procesa videos en un directorio, realiza predicciones utilizando un modelo preentrenado y muestra los resultados.

    Esta función procesa todos los videos en el directorio especificado, extrae características de los keypoints
    detectados, carga un modelo preentrenado (SVC) y realiza predicciones sobre las etiquetas de los videos.
    Los resultados se imprimen en la consola.

    Parámetros
    ----------
    video_path : str
        Ruta del directorio que contiene los videos a procesar.
    output_json : str
        Ruta del archivo JSON donde se guardarán los datos procesados de los videos.
    model_path : str
        Ruta del archivo del modelo preentrenado (SVC) que se utilizará para las predicciones.
    show_videos : bool, opcional
        Si es `True`, muestra los videos con los keypoints dibujados durante el procesamiento.
        Por defecto es `False`.

    Retorna
    -------
    None
        La función no retorna ningún valor, pero imprime las predicciones en la consola.

    Notas
    -----
    - La función utiliza `process_all_videos` para procesar los videos y guardar los datos en un archivo JSON.
    - Se utiliza `extract_features` para extraer características de los keypoints detectados.
    - El modelo preentrenado debe ser compatible con las características extraídas.
    - Si el archivo JSON ya existe, se sobrescribirá con los nuevos datos.

    Ejemplos
    --------
    >>> process_videos(new_video_path, output_json, 'modelo_entrenado.pkl', show_videos=False)
    Video: video1.MOV
    Etiqueta real: walk
    Predicción: walk
    """
    process_all_videos(video_path, output_json, show_videos=show_videos)

    with open(output_json, "r") as f:
        data = json.load(f)

    model = joblib.load(model_path)

    for video_name, video_data in data.items():
        X = extract_features(video_data)
        X = np.array(X).reshape(1, -1)
        y = np.array(video_data["label"])

        y_pred = model.predict(X)

        print(f"Video: {video_name}")
        print("Etiqueta real: ", y)
        print("Predicción: ", y_pred, "\n")

In [23]:
new_video_path = r'H:\Mi unidad\CICESE\2025-1\CDSI2025Gortarez\practices\Practice 2\DataSet\Nueva carpeta'
output_json = 'EjercicioExterno.json'
model_path = 'modelo_catboost.pkl'

process_videos(new_video_path, output_json, model_path, show_videos=False)

Video procesado: H:\Mi unidad\CICESE\2025-1\CDSI2025Gortarez\practices\Practice 2\DataSet\Nueva carpeta\S11_01_Parcial.mp4
Video procesado: H:\Mi unidad\CICESE\2025-1\CDSI2025Gortarez\practices\Practice 2\DataSet\Nueva carpeta\S12_01_Completo.mp4
Procesamiento completado. Todos los datos guardados en: EjercicioExterno.json


FileNotFoundError: [Errno 2] No such file or directory: 'modelo_catboost.pkl'

# Conclusiones
La incorporación de landmarks específicos de las manos puede mejorar significativamente la precisión de los modelos en ejercicios que involucran movimientos manuales. Esto permite una detección más detallada y precisa de las posiciones y gestos de las manos.

Implementar una técnica de escalado de datos en modo espejo puede ser beneficioso. Al reflejar los datos, podemos analizar ambos lados del cuerpo de una persona, lo que ayuda a identificar posibles variaciones o asimetrías en los movimientos. Esto podría proporcionar una visión más completa y equilibrada del rendimiento del ejercicio.
