# MediaPipe Pose Classification for Artistic Swimming

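Sistema de clasificación de posiciones de natación artística: MediaPipe Pose extrae los keypoints del cuerpo en cada imagen y, a partir de ellos, se calculan características geométricas (distancias y ángulos entre articulaciones) con las que se entrena un clasificador (Random Forest, con SVM como alternativa).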

## 1. Import Required Libraries

In [None]:
import cv2
import mediapipe as mp
import numpy as np
import pandas as pd
from pathlib import Path
import pickle
import joblib
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report, ConfusionMatrixDisplay
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

print("Librerías importadas correctamente")

## 2. Define Dataset Paths and Classes

In [None]:
# --- Experiment settings ---------------------------------------------------
RANDOM_SEED = 42       # shared by the train/test split and both classifiers
TEST_SIZE = 0.2        # fraction of samples held out for the test set
IMG_SIZE = (640, 480)  # NOTE(review): not referenced in the visible cells - confirm it is still needed

# --- Class names; each one is also the name of a dataset sub-folder --------
CLASSES = [
    'Bent Knee Surface Arch Position',
    'Bent Knee Vertical',
    'Double Leg Vertical',
    'Fishtail',
    'Knight',
]
NUM_CLASSES = len(CLASSES)

# --- Input dataset and output artefact locations ----------------------------
DATASET_ROOT = Path('../Data/Augmented')
MODEL_SAVE_PATH = Path('./pose_classifier_model.pkl')
SCALER_SAVE_PATH = Path('./pose_classifier_scaler.pkl')
ENCODER_SAVE_PATH = Path('./label_encoder.pkl')
RESULTS_SAVE_PATH = Path('./classification_results.pkl')

# Echo the configuration so the notebook output records what was used.
print(
    f"Clases detectadas: {CLASSES}",
    f"Número de clases: {NUM_CLASSES}",
    f"Ruta dataset: {DATASET_ROOT.resolve()}",
    sep="\n",
)

## 3. Load Image File Paths and Labels

In [None]:
def load_dataset_paths(root_dir: Path, classes: list) -> tuple:
    """
    Collect image file paths and class labels from a folder-per-class dataset.

    Each entry of ``classes`` is looked up as a direct sub-directory of
    ``root_dir``; a missing directory is reported and skipped. Files are
    selected by extension (.jpg, .jpeg, .png) compared case-insensitively, so
    files such as ``IMG_01.JPG`` are not silently dropped. Within each class
    the files are returned in sorted order so the result (and therefore any
    seeded split built on it) does not depend on filesystem enumeration order.

    Args:
        root_dir: Root directory of the dataset.
        classes: Class names (sub-directory names).

    Returns:
        Tuple ``(image_paths, labels)``: two parallel lists holding each image
        path as ``str`` and the class name it was found under.
    """
    image_extensions = {'.jpg', '.jpeg', '.png'}
    root_dir = Path(root_dir)
    image_paths = []
    labels = []

    for class_name in classes:
        class_dir = root_dir / class_name

        if not class_dir.is_dir():
            print(f"Advertencia: No se encontró carpeta {class_name}")
            continue

        # Only direct children are considered; sub-folders are not searched.
        image_files = sorted(
            entry for entry in class_dir.iterdir()
            if entry.is_file() and entry.suffix.lower() in image_extensions
        )

        print(f"Clase '{class_name}': {len(image_files)} imágenes")

        image_paths.extend(str(img_path) for img_path in image_files)
        labels.extend([class_name] * len(image_files))

    print(f"\nTotal de imágenes cargadas: {len(image_paths)}")
    return image_paths, labels

# Gather (path, label) pairs for every class folder and tabulate them.
image_paths, labels = load_dataset_paths(DATASET_ROOT, CLASSES)
df_dataset = pd.DataFrame({'image_path': image_paths, 'label': labels})

# Per-class image counts, to spot class imbalance early.
class_counts = df_dataset['label'].value_counts()
print("\nDistribución de clases:")
print(class_counts)

## 4. Preprocess Images and Extract MediaPipe Landmarks

In [None]:
# Initialise the MediaPipe Pose detector for still images
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(
    static_image_mode=True,
    model_complexity=2,  # 0=lite, 1=full, 2=heavy (most accurate but slowest)
    # NOTE: per the MediaPipe Pose docs this option is ignored when
    # static_image_mode=True; kept only so the call stays unchanged.
    smooth_landmarks=True
)

print("MediaPipe Pose inicializado")
# PoseLandmark is the enum of landmark ids, so its length is the landmark
# count (previously the enum's whole __dict__ was printed here).
print(f"Número de landmarks: {len(mp_pose.PoseLandmark)}")

In [None]:
def extract_pose_landmarks(image_path: str, pose_detector) -> np.ndarray:
    """
    Run pose detection on one image file and flatten the detected landmarks.

    Args:
        image_path: Path of the image to read.
        pose_detector: Initialised MediaPipe pose detector; its ``process``
            method is called with the RGB image.

    Returns:
        float32 array with (x, y, z, visibility) of every landmark,
        concatenated in landmark order. ``None`` is returned when the image
        cannot be read, when no pose is detected, or when any exception is
        raised (the exception is printed, not propagated).
    """
    try:
        bgr_frame = cv2.imread(image_path)
        if bgr_frame is None:
            return None

        # OpenCV loads images as BGR; the detector is fed the RGB conversion.
        detection = pose_detector.process(cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB))
        detected_pose = detection.pose_landmarks
        if detected_pose is None:
            return None

        # One row of four values per landmark, then flatten row by row.
        rows = [
            (point.x, point.y, point.z, point.visibility)
            for point in detected_pose.landmark
        ]
        return np.array(rows, dtype=np.float32).ravel()

    except Exception as e:
        print(f"Error procesando {image_path}: {e}")
        return None

print("Función de extracción de landmarks definida")

In [None]:
def _angle_3points(p1, p2, p3):
    """Return the angle in radians at ``p2`` between the segments p2->p1 and p2->p3."""
    v1 = p1 - p2
    v2 = p3 - p2
    # The epsilon keeps the division finite when two points coincide.
    cos_angle = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2) + 1e-6)
    # Clipping protects arccos from rounding slightly outside [-1, 1].
    return np.arccos(np.clip(cos_angle, -1, 1))


def extract_geometric_features(landmarks: np.ndarray) -> np.ndarray:
    """
    Derive a fixed-length vector of geometric pose descriptors from landmarks.

    The 17 features are, in order: shoulder distance, hip distance, larger of
    the nose-to-hip distances, knee-hip distance (left, right), ankle-hip
    distance (left, right), ankle-hip / shoulder-hip ratio (left, right),
    hip-knee-ankle angle (left, right), angle at the left shoulder between the
    left hip and the nose, mean y, mean z, |y difference| between shoulders,
    |y difference| between hips, and mean visibility.

    Args:
        landmarks: Landmark values, 132 elements = 33 landmarks * 4 values
            (x, y, z, visibility).

    Returns:
        float32 array with the 17 features, or ``None`` when ``landmarks`` is
        ``None``, empty, does not hold exactly 132 values, or the computation
        raises (the error is printed, not propagated).
    """
    if landmarks is None or len(landmarks) == 0:
        return None

    # Validate the size up front: reshaping a wrong-size array would raise
    # instead of returning None like every other failure path here.
    flat = np.asarray(landmarks)
    expected_size = 33 * 4
    if flat.size != expected_size:
        print(
            "Error extrayendo características: "
            f"se esperaban {expected_size} valores, se recibieron {flat.size}"
        )
        return None

    # 33 points, each with (x, y, z, visibility)
    lm = flat.reshape(33, 4)
    xyz = lm[:, :3]

    # MediaPipe Pose landmark ids used below. For reference, the foot ids are
    # 29=left_heel, 30=right_heel, 31=left_foot_index, 32=right_foot_index
    # (not used by any feature).
    nose = 0
    l_shoulder, r_shoulder = 11, 12
    l_hip, r_hip = 23, 24
    l_knee, r_knee = 25, 26
    l_ankle, r_ankle = 27, 28

    def dist(a, b):
        """Euclidean distance between landmarks ``a`` and ``b`` in (x, y, z)."""
        return np.linalg.norm(xyz[a] - xyz[b])

    try:
        ankle_hip_left = dist(l_ankle, l_hip)
        ankle_hip_right = dist(r_ankle, r_hip)
        # NOTE(review): both leg ratios are divided by the LEFT shoulder-hip
        # length. Kept as is so the feature values stay identical to what
        # already-trained models saw; confirm whether the right side should
        # use the right shoulder-hip length.
        torso_left = dist(l_shoulder, l_hip)

        features = [
            # 1. Shoulder-to-shoulder distance
            dist(l_shoulder, r_shoulder),
            # 2. Hip-to-hip distance
            dist(l_hip, r_hip),
            # 3. Nose-to-hip distance (larger of the two sides)
            max(dist(nose, l_hip), dist(nose, r_hip)),
            # 4. Knee-to-hip distance, left then right
            dist(l_knee, l_hip),
            dist(r_knee, r_hip),
            # 5. Ankle-to-hip distance, left then right
            ankle_hip_left,
            ankle_hip_right,
            # 6. Ankle-hip length relative to shoulder-hip length. The
            #    hip-ankle distance equals the ankle-hip distance above, so
            #    it is reused rather than recomputed.
            ankle_hip_left / (torso_left + 1e-6),
            ankle_hip_right / (torso_left + 1e-6),
            # 7. Angle at the knee between hip and ankle, left then right
            _angle_3points(xyz[l_hip], xyz[l_knee], xyz[l_ankle]),
            _angle_3points(xyz[r_hip], xyz[r_knee], xyz[r_ankle]),
            # 8. Angle at the left shoulder between the left hip and the nose
            _angle_3points(xyz[l_hip], xyz[l_shoulder], xyz[nose]),
            # 9. Mean of the y column over all landmarks
            np.mean(lm[:, 1]),
            # 10. Mean of the z column over all landmarks
            np.mean(lm[:, 2]),
            # 11. Absolute y difference between the shoulders
            abs(lm[l_shoulder, 1] - lm[r_shoulder, 1]),
            # 12. Absolute y difference between the hips
            abs(lm[l_hip, 1] - lm[r_hip, 1]),
            # 13. Mean visibility over all landmarks
            np.mean(lm[:, 3]),
        ]

        return np.array(features, dtype=np.float32)

    except Exception as e:
        # Best-effort contract: callers skip samples for which None is returned.
        print(f"Error extrayendo características: {e}")
        return None

print("Función de extracción de características geométricas definida")

In [None]:
# Run pose detection and feature extraction over every image in the dataset
features_list = []   # one feature vector per successfully processed image
labels_valid = []    # class label matching each entry of features_list
failed_images = []   # paths for which no landmarks were returned

print(f"Procesando {len(df_dataset)} imágenes...\n")

samples = zip(df_dataset['image_path'], df_dataset['label'])
for image_path, label in tqdm(samples, total=len(df_dataset)):
    landmarks = extract_pose_landmarks(image_path, pose)
    if landmarks is None:
        failed_images.append(image_path)
        continue

    features = extract_geometric_features(landmarks)
    if features is None:
        # Feature-extraction failures are skipped without being recorded.
        continue

    features_list.append(features)
    labels_valid.append(label)

print("\nProcesamiento completado")
print(f"Imágenes procesadas exitosamente: {len(features_list)}")
print(f"Imágenes sin detección de pose: {len(failed_images)}")

if failed_images:
    # Show a small sample of the failures for manual inspection.
    print("\nPrimeras 5 imágenes sin detección:")
    print("\n".join(f"  - {failed_path}" for failed_path in failed_images[:5]))

## 5. Create Feature and Label Arrays

In [None]:
# Fail early with a clear message: with no samples, X would be a 1-D empty
# array and X.shape[1] below would raise an uninformative IndexError.
if not features_list:
    raise ValueError(
        "No se extrajeron características de ninguna imagen; "
        "verifique la ruta del dataset y la detección de pose."
    )

# Stack the per-image feature vectors into a (n_samples, n_features) matrix
X = np.array(features_list, dtype=np.float32)
y_raw = np.array(labels_valid)

# Encode the string labels as integer ids
le = LabelEncoder()
y = le.fit_transform(y_raw)

print(f"Forma del conjunto de features: {X.shape}")
print(f"Número de feature por muestra: {X.shape[1]}")
print(f"\nClases codificadas:")
for i, class_name in enumerate(le.classes_):
    # Position i in le.classes_ is the integer id assigned to class_name.
    count = np.sum(y == i)
    print(f"  {class_name}: {i} (n={count})")

## 6. Split Data into Training and Test Sets

In [None]:
# Stratified hold-out split, so both parts keep the class proportions of y
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=TEST_SIZE,
    stratify=y,
    random_state=RANDOM_SEED,
)

# Scaling statistics come from the training part only and are then applied
# to both parts, so nothing from the test set leaks into the scaler.
scaler = StandardScaler()
scaler.fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)

print(f"Conjunto de entrenamiento: {X_train_scaled.shape}")
print(f"Conjunto de test: {X_test_scaled.shape}")

# Per-class sample counts for each part of the split
for heading, split_labels in (
    ("\nDistribución en entrenamiento:", y_train),
    ("\nDistribución en test:", y_test),
):
    print(heading)
    for class_id, class_name in enumerate(le.classes_):
        count = np.sum(split_labels == class_id)
        print(f"  {class_name}: {count}")

## 7. Train a Classification Model

In [None]:
# --- Random Forest ----------------------------------------------------------
print("Entrenando Random Forest Classifier...")
rf_params = dict(
    n_estimators=200,
    max_depth=20,
    min_samples_split=5,
    min_samples_leaf=2,
    random_state=RANDOM_SEED,
    n_jobs=-1,
    verbose=1,
)
clf_rf = RandomForestClassifier(**rf_params)
clf_rf.fit(X_train_scaled, y_train)
print("✓ Random Forest entrenado")

# --- SVM (alternative model) ------------------------------------------------
# NOTE(review): this model is trained but not evaluated or saved in the
# visible cells - confirm whether it is used elsewhere.
print("\nEntrenando Support Vector Machine...")
svm_params = dict(
    kernel='rbf',
    C=10.0,
    gamma='scale',
    probability=True,
    random_state=RANDOM_SEED,
    verbose=1,
)
clf_svm = SVC(**svm_params)
clf_svm.fit(X_train_scaled, y_train)
print("✓ SVM entrenado")

# The Random Forest is the model used by the evaluation and saving cells
clf = clf_rf
print("\nModelo principal: Random Forest")

## 8. Evaluate Model Performance

In [None]:
# Predictions on both parts of the split
y_pred_train = clf.predict(X_train_scaled)
y_pred_test = clf.predict(X_test_scaled)

# Accuracy on each part; a large train/test gap indicates overfitting
train_accuracy = accuracy_score(y_train, y_pred_train)
test_accuracy = accuracy_score(y_test, y_pred_test)

print("="*60)
print("RESULTADOS DE EVALUACIÓN")
print("="*60)
print(f"\nAccuracy en entrenamiento: {train_accuracy:.4f}")
print(f"Accuracy en test: {test_accuracy:.4f}")

print("\n" + "="*60)
print("REPORTE DE CLASIFICACIÓN (Test Set)")
print("="*60)
# Pass the full list of encoded class ids explicitly: without `labels`, the
# report infers them from y_test/y_pred_test and raises when a class is
# absent from the test split, because target_names would no longer match.
print(classification_report(
    y_test,
    y_pred_test,
    labels=np.arange(len(le.classes_)),
    target_names=le.classes_,
))

In [None]:
# Confusion matrix on the test set.
# `labels` fixes one row/column per encoded class, so the matrix always lines
# up with display_labels even if a class never appears in y_test/y_pred_test.
cm = confusion_matrix(y_test, y_pred_test, labels=np.arange(len(le.classes_)))
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=le.classes_)
disp.plot(cmap='Blues', values_format='d', xticks_rotation='vertical')
plt.title('Matriz de Confusión - Test Set')
plt.tight_layout()
# Save before show(): the figure may be cleared once it has been displayed.
plt.savefig('confusion_matrix.png', dpi=150, bbox_inches='tight')
plt.show()

print("Matriz de confusión guardada como 'confusion_matrix.png'")

In [None]:
# Feature importance (only for models exposing feature_importances_, e.g. Random Forest)
if hasattr(clf, 'feature_importances_'):
    feature_importance = clf.feature_importances_

    # Display names, in the same order as the vector built by
    # extract_geometric_features
    feature_names = [
        'Shoulder Distance', 'Hip Distance', 'Head-Hip Distance',
        'Knee-Hip Left', 'Knee-Hip Right',
        'Ankle-Hip Left', 'Ankle-Hip Right',
        'Hip-Ankle Ratio Left', 'Hip-Ankle Ratio Right',
        'Leg Angle Left', 'Leg Angle Right', 'Torso Angle',
        'Mean Y', 'Mean Z', 'Shoulder Y Diff', 'Hip Y Diff', 'Visibility Mean'
    ]

    # Show at most 10 features; the cap adapts when the model has fewer, which
    # previously made the hard-coded range(10) mismatch the data length.
    n_top = min(10, len(feature_importance))
    top_indices = np.argsort(feature_importance)[-n_top:][::-1]

    fig, ax = plt.subplots(figsize=(10, 6))
    ax.barh(range(n_top), feature_importance[top_indices])
    ax.set_yticks(range(n_top))
    # Fall back to a generic label if the model has more features than names.
    ax.set_yticklabels([feature_names[i] if i < len(feature_names) else f'Feature {i}' for i in top_indices])
    ax.set_xlabel('Importancia')
    ax.set_title(f'Top {n_top} Features más importantes')
    plt.tight_layout()
    plt.savefig('feature_importance.png', dpi=150, bbox_inches='tight')
    plt.show()

    print("Gráfico de importancia de features guardado como 'feature_importance.png'")

In [None]:
# Predictions on individual test examples
print("\n" + "="*60)
print("EJEMPLOS DE PREDICCIONES EN TEST SET")
print("="*60)

# Pick up to 10 test samples. A generator seeded with RANDOM_SEED makes the
# displayed examples reproducible, like the rest of the pipeline.
rng = np.random.default_rng(RANDOM_SEED)
sample_indices = rng.choice(len(y_test), size=min(10, len(y_test)), replace=False)

# One batched call instead of one predict_proba call per sample (each call
# also emits the forest's verbose output).
sample_probas = clf.predict_proba(X_test_scaled[sample_indices])

for idx_in_sample, (idx_in_test, proba) in enumerate(zip(sample_indices, sample_probas), 1):
    actual = le.classes_[y_test[idx_in_test]]
    predicted = le.classes_[y_pred_test[idx_in_test]]
    # Confidence = highest class probability reported for this sample
    confidence = proba.max()
    match = "✓" if actual == predicted else "✗"

    print(f"\n{idx_in_sample}. {match}")
    print(f"   Actual: {actual}")
    print(f"   Predicción: {predicted} (confianza: {confidence:.2%})")

## 9. Save Trained Model

In [None]:
# Persist the fitted objects needed to reproduce predictions:
# the classifier, the feature scaler and the label encoder.
for fitted_object, destination, message_prefix in (
    (clf, MODEL_SAVE_PATH, "✓ Modelo guardado en: "),
    (scaler, SCALER_SAVE_PATH, "✓ Scaler guardado en: "),
    (le, ENCODER_SAVE_PATH, "✓ Label Encoder guardado en: "),
):
    joblib.dump(fitted_object, destination)
    print(f"{message_prefix}{destination.resolve()}")

# Summary of the run, stored as plain Python types (floats, lists, ints)
results = {
    'train_accuracy': float(train_accuracy),
    'test_accuracy': float(test_accuracy),
    'confusion_matrix': cm.tolist(),
    'classes': le.classes_.tolist(),
    'num_features': X.shape[1],
    'num_train_samples': len(X_train),
    'num_test_samples': len(X_test),
    'total_samples': len(X),
}

joblib.dump(results, RESULTS_SAVE_PATH)
print(f"✓ Resultados guardados en: {RESULTS_SAVE_PATH.resolve()}")

In [None]:
# Final summary: model, data sizes, accuracy and generated files
rule = "=" * 60
summary_lines = [
    "\n" + rule,
    "RESUMEN FINAL DEL MODELO",
    rule,
    "\nModelo: Random Forest Classifier",
    f"Clases: {', '.join(le.classes_)}",
    "\nDatos:",
    f"  - Total de muestras: {len(X)}",
    f"  - Muestras de entrenamiento: {len(X_train)}",
    f"  - Muestras de test: {len(X_test)}",
    f"  - Número de features: {X.shape[1]}",
    "\nRendimiento:",
    f"  - Accuracy entrenamiento: {train_accuracy:.4f}",
    f"  - Accuracy test: {test_accuracy:.4f}",
    "\nArchivos generados:",
    f"  - Modelo: {MODEL_SAVE_PATH.name}",
    f"  - Scaler: {SCALER_SAVE_PATH.name}",
    f"  - Encoder: {ENCODER_SAVE_PATH.name}",
    f"  - Resultados: {RESULTS_SAVE_PATH.name}",
    "  - Matriz de confusión: confusion_matrix.png",
    "  - Importancia de features: feature_importance.png",
    "\n" + rule,
]
# Emitting the lines with a newline separator gives the same output as
# printing them one by one.
print(*summary_lines, sep="\n")