In [3]:
import torch
import numpy as np
from pathlib import Path
import json
import cv2
from torch.utils.data import DataLoader, Subset, Dataset
import random
from collections import defaultdict
import torch
import torch.nn as nn

device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')

class EnhancedExerciseHierarchicalDataset(Dataset):
    def __init__(self, data_dir, exercices_points, grouped_sports, max_seq_len=45,
                 augment=False, validate_physics=True): # Removed base_feature_dim here
        self.augment = augment
        self.max_seq_len = max_seq_len
        self.validate_physics = validate_physics
        self.data_dir = data_dir
        self.exercise_names = []

        # Cache des dimensions par exercice
        self.feature_cache = {}
        self.exercises_points = exercices_points
        self.grouped_sports = grouped_sports # Keep grouped_sports for mapping

        # Construction des mappings et chargement avec validation (dynamic)
        self._build_mappings_dynamic()

        # Determine the maximum feature dimension across the dynamically loaded exercises
        self.max_feature_dim = self._calculate_max_feature_dim(self.present_exercises)

        self._load_samples_enhanced()

        if validate_physics:
            self._validate_dataset_quality()

    def __len__(self):
        """Returns the total number of samples in the dataset."""
        return len(self.samples)


    def _calculate_max_feature_dim(self, present_exercises):
        """Calculates the maximum feature dimension across the present exercises."""
        max_dim = 0
        for exercise_name in present_exercises:
            if exercise_name in self.exercises_points:
                num_points = len(self.exercises_points[exercise_name])
                # Coordonn√É¬©es 3D + Angles + Distances + Vitesses + Acc√É¬©l√É¬©rations
                current_dim = num_points * 3 + 3 + 2 + num_points * 3 + num_points * 3
                max_dim = max(max_dim, current_dim)
        # If no exercises are present, return a default or handle appropriately
        return max_dim if max_dim > 0 else 0


    def _validate_dataset_quality(self):
      """Valide les √©chantillons charg√©s (placeholder simple)."""
      # Ici, tu peux ajouter des v√©rifications avanc√©es, par par exemple :
      valid_samples = []
      valid_labels = []
      valid_groups = []
      valid_exercise_names = []
      for i, path in enumerate(self.samples):
          try:
              arr = np.load(path, mmap_mode='r')
              # Exemple : on v√©rifie qu'il n'y a pas de NaN
              if not np.isnan(arr).any() and not np.isinf(arr).any():
                  valid_samples.append(path)
                  valid_labels.append(self.labels[i])
                  valid_groups.append(self.groups[i])
                  valid_exercise_names.append(self.exercise_names[i])
          except Exception as e:
              print(f"Fichier corrompu ou illisible: {path} ({e})")
      print(f"Apr√®s validation: {len(valid_samples)} s√©quences valides / {len(self.samples)}")
      self.samples = valid_samples
      self.labels = valid_labels
      self.groups = valid_groups
      self.exercise_names = valid_exercise_names


    def _build_mappings_dynamic(self):
        """Construit les mappings exercice <-> groupe dynamiquement bas√©s sur les fichiers pr√©trait√©s"""
        self.present_exercises = []
        print(f"Scanning directory for preprocessed data: {self.data_dir}")
        if not os.path.isdir(self.data_dir):
            print(f"Data directory not found: {self.data_dir}")
            self.exercise_to_idx = {}
            self.idx_to_exercise = {}
            self.exercise_to_group = {}
            self.group_to_idx = {}
            self.idx_to_group = {}
            return

        # Determine which exercises have preprocessed data and are in a defined group
        exercises_with_data_and_group = []
        for exercise_name in self.exercises_points:
            ex_dir = os.path.join(self.data_dir, exercise_name)
            if os.path.isdir(ex_dir):
                # Check if there is at least one .npy file in the directory
                npy_files = list(Path(ex_dir).glob('*.npy'))
                # Check if the exercise is in any defined group
                is_in_a_group = any(exercise_name in exercises for exercises in self.grouped_sports.values())

                if npy_files and is_in_a_group:
                    exercises_with_data_and_group.append(exercise_name)

        self.present_exercises = sorted(exercises_with_data_and_group)

        print(f"Found {len(self.present_exercises)} present exercises with data and group mapping.")

        # Mapping exercice -> index (only for present exercises)
        self.exercise_to_idx = {ex: i for i, ex in enumerate(self.present_exercises)}
        self.idx_to_exercise = {i: ex for ex, i in self.exercise_to_idx.items()}
        print(f"exercise_to_idx (dynamic): {self.exercise_to_idx}")
        # print(f"idx_to_exercise (dynamic): {self.idx_to_exercise}") # Avoid printing potentially long dict

        # Mapping exercice -> groupe (with verification against present exercises)
        self.exercise_to_group = {}
        for group_name, exercises in self.grouped_sports.items():
            for ex in exercises:
                if ex in self.exercise_to_idx: # Check if exercise is present and mapped
                    self.exercise_to_group[ex] = group_name
        print(f"exercise_to_group (dynamic): {self.exercise_to_group}")

        # Mapping groupe -> index (only for groups with present exercises)
        present_groups = sorted(list(set(self.exercise_to_group.values())))
        self.group_to_idx = {g: i for i, g in enumerate(present_groups)}
        self.idx_to_group = {i: g for g, i in self.group_to_idx.items()}
        print(f"group_to_idx (dynamic): {self.group_to_idx}")
        # print(f"idx_to_group (dynamic): {self.idx_to_group}") # Avoid printing potentially long dict


    def _load_samples_enhanced(self):
        """Charge les √©chantillons avec validation des chemins"""
        self.samples = []
        self.labels = []
        self.groups = []
        self.exercise_names = []
        loaded_count = 0
        skipped_count = 0

        if not self.present_exercises:
            print("No present exercises found with data and group. Skipping sample loading.")
            return

        for ex in self.present_exercises:
            ex_dir = os.path.join(self.data_dir, ex)
            # This directory should exist based on _build_mappings_dynamic, but double-check
            if not os.path.isdir(ex_dir):
                print(f"Warning: Directory unexpectedly missing for present exercise: {ex_dir}")
                continue

            try:
                for fname in os.listdir(ex_dir):
                    if fname.endswith('.npy'):
                        path = os.path.join(ex_dir, fname)
                        if not os.path.exists(path):
                            print(f"Fichier manquant: {path}")
                            skipped_count += 1
                            continue

                        # Basic check for file size to quickly filter out potentially empty files
                        if os.path.getsize(path) < 100: # Arbitrary small threshold
                            print(f"Skipping potentially empty file: {path}")
                            skipped_count += 1
                            continue

                        self.samples.append(path)
                        # Use the dynamically created exercise_to_idx
                        # Ensure the exercise is still in exercise_to_idx (should be based on _build_mappings)
                        if ex in self.exercise_to_idx:
                             self.labels.append(self.exercise_to_idx[ex])
                        else:
                            # This case should ideally not be reached with the updated _build_mappings
                            print(f"Warning: Exercise {ex} in samples but not in exercise_to_idx. Skipping sample.")
                            skipped_count += 1
                            self.samples.pop() # Remove the appended path
                            continue


                        # Use the dynamically created group_to_idx and exercise_to_group
                        # Ensure the exercise is still in exercise_to_group (should be based on _build_mappings)
                        if ex in self.exercise_to_group:
                            self.groups.append(self.group_to_idx[self.exercise_to_group[ex]])
                            self.exercise_names.append(ex)
                            loaded_count += 1
                        else:
                            # This case should ideally not be reached with the updated _build_mappings
                            print(f"Warning: Exercise {ex} in samples but not in exercise_to_group. Skipping sample.")
                            # Remove the appended path and label as well
                            self.samples.pop()
                            self.labels.pop()
                            skipped_count += 1


            except Exception as e:
                print(f"Error processing directory {ex_dir}: {e}")
                continue

        print(f"Finished loading samples. Loaded {loaded_count} samples, skipped {skipped_count}.")


    def _calculate_expected_features(self, exercise_name):
        """Calculates the expected feature dimension for a given exercise."""
        if exercise_name not in self.exercises_points:
            # Fallback or error handling if exercise is not in the points mapping
            print(f"Warning: Exercise '{exercise_name}' not found in exercises_points.")
            # Return a default or the max dimension
            return self.max_feature_dim # Use max_feature_dim as a safe fallback


        num_points = len(self.exercises_points[exercise_name])
        base_features = num_points * 3

        # Features du pr√©traitement am√©lior√© (Angles, Distances, Vitesses, Acc√©l√©rations)
        # Make sure this calculation matches the actual feature generation in preprocessor
        angles = 3  # coude, genou, torse
        distances = 2  # distances relatives
        dynamics = base_features * 2  # vitesses + acc√©l√©rations (3D * 2 for vel+acc)

        total_features = base_features + angles + distances + dynamics
        return total_features

    def _extract_exercise_name(self, file_path):
        """Extrait le nom d'exercice depuis le chemin de fichier"""
        path_parts = os.path.normpath(file_path).split(os.sep)
        for part in path_parts:
            if part in self.exercises_points:
                return part
        return "unknown"

    def _validate_sample_quality(self, seq, exercise_name):
        """Valide la qualit√© d'un √©chantillon selon les crit√®res physiques"""
        if seq.shape[0] == 0:
            return False

        # V√©rification NaN/Inf
        if np.isnan(seq).any() or np.isinf(seq).any():
            return False

        # V√©rification variance
        variance = np.var(seq, axis=0)
        static_ratio = (variance < 1e-6).sum() / seq.shape[1]

        return static_ratio < 0.3  # Moins de 30% de features statiques

    def __getitem__(self, idx):
        try:
            # Load the numpy array
            seq_np = np.load(self.samples[idx], mmap_mode='r', allow_pickle=False)

            # Ensure the data is float32 and handle potential non-finite values after loading
            seq_np = seq_np.astype(np.float32)
            if np.isnan(seq_np).any() or np.isinf(seq_np).any():
                 print(f"Warning: NaN or Inf found in {self.samples[idx]} after loading. Skipping.")
                 return None # Skip this sample

            seq = torch.from_numpy(seq_np)

            exercise_name = self.exercise_names[idx]
            # We no longer need expected_features per sample for padding in _standardize_sequence_enhanced

            # Validation physique if enabled
            if self.validate_physics and not self._validate_sample_quality(seq.numpy(), exercise_name):
                print(f"Warning: Sample {self.samples[idx]} failed quality validation. Skipping.")
                return None


            # Adaptive standardization using the global max_feature_dim
            seq = self._standardize_sequence_enhanced(seq, self.max_feature_dim)


            if self.augment:
                seq = self.augment_sequence(seq)

            return (
                seq.to(device),
                torch.tensor(self.labels[idx], dtype=torch.long).to(device),
                torch.tensor(self.groups[idx], dtype=torch.long).to(device)
            )

        except Exception as e:
            print(f"‚ùå Error loading or processing {self.samples[idx]}: {str(e)}")
            return None

    def _standardize_sequence_enhanced(self, seq, target_feature_dim):
        """Adaptive standardization to a target feature dimension."""
        # Feature adjustment
        if seq.shape[1] < target_feature_dim:
            padding = target_feature_dim - seq.shape[1]
            seq = torch.nn.functional.pad(seq, (0, padding), mode='constant', value=0)
        elif seq.shape[1] > target_feature_dim:
            seq = seq[:, :target_feature_dim]

        # Temporal adjustment
        if seq.shape[0] > self.max_seq_len:
            seq = seq[:self.max_seq_len]
        elif seq.shape[0] < self.max_seq_len:
            padding = self.max_seq_len - seq.shape[0]
            seq = torch.nn.functional.pad(seq, (0, 0, 0, padding), mode='constant', value=0)

        return seq

    def augment_sequence(self, seq):
      """Enhanced data augmentation"""
      # Controlled Jitter
      if random.random() < 0.5 and seq.numel() > 0: # Added check for empty tensor
          noise_level = 0.02 * torch.std(seq)
          seq += torch.randn_like(seq) * noise_level

      # Adaptive Scaling
      if random.random() < 0.5 and seq.numel() > 0: # Added check for empty tensor
          scale = random.uniform(0.95, 1.05)
          seq *= scale

      # Selective Masking with PyTorch
      if random.random() < 0.3 and seq.numel() > 0: # Added check for empty tensor
          mask = (torch.rand_like(seq) > 0.15).float()
          mean_val = torch.mean(seq) if seq.numel() > 0 else 0.0 # Handle empty tensor mean
          seq = seq * mask + (1 - mask) * mean_val

      # Temporal Permutation with PyTorch
      if random.random() < 0.2 and seq.size(0) > 5: # Ensure enough frames for splitting
          seq_len = seq.size(0)
          try:
              split_indices = sorted(random.sample(range(1, seq_len), random.randint(1, min(3, seq_len - 1))))
              split_sizes = [split_indices[0]]
              for i in range(len(split_indices) - 1):
                  split_sizes.append(split_indices[i+1] - split_indices[i])
              split_sizes.append(seq_len - split_indices[-1])
              segments = torch.split(seq, split_sizes)

              indices = torch.randperm(len(segments))
              seq = torch.cat([segments[i] for i in indices], dim=0)[:seq_len] # Ensure length is preserved
          except ValueError as e:
               print(f"Warning: Temporal permutation failed for a sample. {e}") # Log error

      return seq

class AdaptiveExerciseStandards:
    """
    Syst√®me de standards d'exercices adaptatifs avec zones de tol√©rance dynamiques
    et apprentissage continu bas√© sur le profil utilisateur.
    """

    def __init__(self, user_profile=None):
        self.user_profile = user_profile or {}
        self.adaptation_history = []
        self.performance_metrics = {}

        # Standards biom√©caniques adaptatifs
        self.exercise_standards = {
            "squat": {
                'biomechanical_parameters': {
                    'knee_angle_min': {'novice': 80, 'intermediate': 75, 'expert': 70},
                    'knee_angle_max': {'novice': 120, 'intermediate': 115, 'expert': 110},
                    'back_straight_threshold': {'novice': 0.15, 'intermediate': 0.12, 'expert': 0.08},
                    'hip_mobility_index': {'novice': 0.6, 'intermediate': 0.75, 'expert': 0.9},
                    'ankle_dorsiflexion': {'novice': 15, 'intermediate': 20, 'expert': 25},
                    'coordination_score': {'novice': 0.5, 'intermediate': 0.7, 'expert': 0.9},
                    'stability_coefficient': {'novice': 0.6, 'intermediate': 0.8, 'expert': 0.95}
                },
                'tolerance_zones': {
                    'novice': 0.20,      # ¬±20% de tol√©rance
                    'intermediate': 0.12, # ¬±12% de tol√©rance
                    'expert': 0.05       # ¬±5% de tol√©rance
                },
                'adaptive_corrections': {
                    'novice': {
                        'knee_too_closed': "Descendez jusqu'√† ce que vos hanches soient au niveau de vos genoux. Prenez votre temps pour ma√Ætriser le mouvement.",
                        'knee_too_open': "Remontez l√©g√®rement, √©vitez de descendre trop bas pour commencer.",
                        'back_not_straight': "Gardez la poitrine haute et regardez droit devant. Imaginez un mur derri√®re votre dos.",
                        'progression_tip': "Excellent progr√®s ! Continuez √† travailler votre mobilit√© de cheville."
                    },
                    'intermediate': {
                        'knee_too_closed': "Travaillez votre mobilit√© de cheville et descendez plus profond√©ment en gardant les talons au sol.",
                        'knee_too_open': "Contr√¥lez mieux la descente, la profondeur optimale se situe √† 90¬∞ aux genoux.",
                        'back_not_straight': "Renforcez votre gainage et initiez le mouvement par les hanches plut√¥t que les genoux.",
                        'progression_tip': "Bonne technique ! Travaillez maintenant la coordination hanche-genou."
                    },
                    'expert': {
                        'knee_too_closed': "Optimisez votre patron moteur : initiation hanche-genou coordin√©e avec activation pr√©alable du tronc.",
                        'knee_too_open': "Ajustez la profondeur selon votre morphologie et vos objectifs de performance.",
                        'back_not_straight': "Travaillez la stabilit√© dynamique du tronc en int√©grant des charges asym√©triques.",
                        'progression_tip': "Technique ma√Ætris√©e ! Explorez les variations avanc√©es."
                    }
                },
                'fatigue_adjustments': {
                    'light': 1.0,    # Pas d'ajustement
                    'moderate': 1.15, # +15% de tol√©rance
                    'high': 1.30     # +30% de tol√©rance
                }
            },

            "push-up": {
                'biomechanical_parameters': {
                    'elbow_angle_min': {'novice': 50, 'intermediate': 45, 'expert': 40},
                    'elbow_angle_max': {'novice': 95, 'intermediate': 90, 'expert': 85},
                    'body_alignment_threshold': {'novice': 0.20, 'intermediate': 0.15, 'expert': 0.10},
                    'scapular_stability': {'novice': 0.5, 'intermediate': 0.7, 'expert': 0.9},
                    'core_activation': {'novice': 0.6, 'intermediate': 0.8, 'expert': 0.95},
                    'movement_tempo': {'novice': 2.0, 'intermediate': 1.5, 'expert': 1.0},
                    'range_of_motion': {'novice': 0.7, 'intermediate': 0.85, 'expert': 1.0}
                },
                'tolerance_zones': {
                    'novice': 0.25,
                    'intermediate': 0.15,
                    'expert': 0.08
                },
                'adaptive_corrections': {
                    'novice': {
                        'arms_too_wide': "Rapprochez vos mains et gardez les coudes pr√®s du corps. Commencez sur les genoux si n√©cessaire.",
                        'not_low_enough': "Descendez progressivement en contr√¥lant le mouvement. L'amplitude viendra avec la pratique.",
                        'body_not_straight': "Contractez les abdominaux et gardez le corps rigide comme une planche.",
                        'progression_tip': "Excellent effort ! Votre stabilit√© s'am√©liore s√©ance apr√®s s√©ance."
                    },
                    'intermediate': {
                        'arms_too_wide': "Optimisez l'angle des coudes √† 45¬∞ pour un meilleur recrutement musculaire.",
                        'not_low_enough': "Travaillez l'amplitude compl√®te en touchant presque le sol avec la poitrine.",
                        'body_not_straight': "Renforcez la cha√Æne post√©rieure et maintenez l'alignement t√™te-bassin.",
                        'progression_tip': "Bonne forme ! Int√©grez maintenant des variations temporelles."
                    },
                    'expert': {
                        'arms_too_wide': "Ajustez finement l'angle selon vos objectifs : force (coudes serr√©s) ou volume (45¬∞).",
                        'not_low_enough': "Exploitez le cycle √©tirement-raccourcissement en pause isom√©trique en bas.",
                        'body_not_straight': "Travaillez la stabilit√© anti-extension avec des charges externes.",
                        'progression_tip': "Technique exemplaire ! Exp√©rimentez les variations unipodales."
                    }
                }
            },

            "deadlift": {
                'biomechanical_parameters': {
                    'back_straight_threshold': {'novice': 0.15, 'intermediate': 0.10, 'expert': 0.05},
                    'hip_angle_min': {'novice': 35, 'intermediate': 30, 'expert': 25},
                    'hip_angle_max': {'novice': 180, 'intermediate': 180, 'expert': 180},
                    'knee_tracking': {'novice': 0.6, 'intermediate': 0.8, 'expert': 0.95},
                    'bar_path_deviation': {'novice': 3.0, 'intermediate': 2.0, 'expert': 1.0},
                    'hip_hinge_ratio': {'novice': 0.6, 'intermediate': 0.8, 'expert': 0.9},
                    'lockout_timing': {'novice': 1.5, 'intermediate': 1.2, 'expert': 1.0}
                },
                'tolerance_zones': {
                    'novice': 0.22,
                    'intermediate': 0.14,
                    'expert': 0.06
                },
                'adaptive_corrections': {
                    'novice': {
                        'back_not_straight': "Gardez la poitrine haute et les √©paules en arri√®re. Commencez avec une barre sur√©lev√©e si n√©cessaire.",
                        'hips_too_low': "Remontez les hanches, le deadlift n'est pas un squat. Poussez les hanches vers l'arri√®re.",
                        'knees_not_aligned': "Gardez les genoux dans l'axe des pieds, poussez le sol avec vos talons.",
                        'progression_tip': "Bon travail ! Votre patron de hanche s'am√©liore."
                    },
                    'intermediate': {
                        'back_not_straight': "Travaillez la mobilit√© thoracique et renforcez les √©recteurs du rachis.",
                        'hips_too_low': "Optimisez l'angle de d√©part selon votre morphologie (longueur f√©mur/tibia).",
                        'knees_not_aligned': "Renforcez les moyens fessiers et travaillez la stabilit√© frontale.",
                        'progression_tip': "Excellente progression ! Votre technique se stabilise."
                    },
                    'expert': {
                        'back_not_straight': "Peaufinez la pr√©-tension du syst√®me et l'activation s√©quentielle des cha√Ænes.",
                        'hips_too_low': "Ajustez selon la variante : conventional, sumo ou trap bar.",
                        'knees_not_aligned': "Optimisez la strat√©gie neuromusculaire selon vos points faibles.",
                        'progression_tip': "Ma√Ætrise technique ! Explorez les variations avanc√©es."
                    }
                }
            }
        }

        # M√©triques de progression
        self.progression_tracking = {
            'consistency_score': 0.0,
            'improvement_rate': 0.0,
            'technique_stability': 0.0,
            'adaptation_speed': 0.0
        }

    def get_user_level(self):
        """D√©termine le niveau de l'utilisateur bas√© sur l'historique et les performances."""
        if not self.user_profile:
            return 'novice'

        experience_months = self.user_profile.get('experience_months', 0)
        consistency_score = self.progression_tracking.get('consistency_score', 0)
        technique_stability = self.progression_tracking.get('technique_stability', 0)

        # Algorithme de classification adaptatif
        if experience_months >= 24 and consistency_score >= 0.8 and technique_stability >= 0.85:
            return 'expert'
        elif experience_months >= 6 and consistency_score >= 0.6 and technique_stability >= 0.7:
            return 'intermediate'
        else:
            return 'novice'

    def calculate_dynamic_thresholds(self, exercise, user_level, fatigue_state='light'):
        """Calcule les seuils adaptatifs en temps r√©el."""
        base_params = self.exercise_standards[exercise]['biomechanical_parameters']
        tolerance = self.exercise_standards[exercise]['tolerance_zones'][user_level]
        fatigue_factor = self.exercise_standards[exercise]['fatigue_adjustments'][fatigue_state]

        adjusted_thresholds = {}
        for param, values in base_params.items():
            if isinstance(values, dict):
                base_value = values[user_level]
                adjusted_tolerance = tolerance * fatigue_factor
                adjusted_thresholds[param] = {
                    'target': base_value,
                    'min': base_value * (1 - adjusted_tolerance),
                    'max': base_value * (1 + adjusted_tolerance),
                    'tolerance': adjusted_tolerance
                }

        return adjusted_thresholds

    def get_adaptive_correction(self, exercise, error_type, user_level, performance_history=None):
        """G√©n√®re une correction adapt√©e au niveau et √† l'historique de l'utilisateur."""
        corrections = self.exercise_standards[exercise]['adaptive_corrections'][user_level]

        base_correction = corrections.get(error_type, "Ajustez votre technique selon les indications.")

        # Ajustement contextuel bas√© sur l'historique
        if performance_history:
            recent_errors = performance_history.get('recent_errors', [])
            if error_type in recent_errors[-3:]:  # Erreur r√©currente
                context_prefix = "Erreur fr√©quente d√©tect√©e : "
                if user_level == 'novice':
                    context_prefix += "Concentrez-vous particuli√®rement sur ce point. "
                elif user_level == 'intermediate':
                    context_prefix += "Travaillez sp√©cifiquement cet aspect entre les s√©ances. "
                else:
                    context_prefix += "Analysez votre patron moteur pour cette compensation. "

                return context_prefix + base_correction

        return base_correction

    def update_adaptation_history(self, exercise, performance_data, user_feedback=None):
        """Met √† jour l'historique d'adaptation pour l'apprentissage continu."""
        timestamp = datetime.now().isoformat()

        adaptation_entry = {
            'timestamp': timestamp,
            'exercise': exercise,
            'performance': performance_data,
            'user_feedback': user_feedback,
            'corrections_applied': performance_data.get('corrections', []),
            'improvement_detected': self._detect_improvement(exercise, performance_data)
        }

        self.adaptation_history.append(adaptation_entry)
        self._update_progression_metrics(exercise, performance_data)

        # Limite la taille de l'historique
        if len(self.adaptation_history) > 1000:
            self.adaptation_history = self.adaptation_history[-800:]

    def _detect_improvement(self, exercise, current_performance):
        """D√©tecte l'am√©lioration bas√©e sur l'historique r√©cent."""
        recent_sessions = [entry for entry in self.adaptation_history[-10:]
                          if entry['exercise'] == exercise]

        if len(recent_sessions) < 3:
            return False

        # Analyse de tendance simple
        recent_scores = [session['performance'].get('overall_score', 0)
                        for session in recent_sessions]
        current_score = current_performance.get('overall_score', 0)

        return current_score > np.mean(recent_scores) + np.std(recent_scores)

    def _update_progression_metrics(self, exercise, performance_data):
        """Met √† jour les m√©triques de progression globales."""
        # Impl√©mentation simplifi√©e des m√©triques de progression
        overall_score = performance_data.get('overall_score', 0)

        if exercise not in self.performance_metrics:
            self.performance_metrics[exercise] = []

        self.performance_metrics[exercise].append(overall_score)

        # Calcul de la consistance (√©cart-type invers√©)
        if len(self.performance_metrics[exercise]) >= 5:
            scores = self.performance_metrics[exercise][-10:]
            self.progression_tracking['consistency_score'] = max(0, 1 - np.std(scores) / max(np.mean(scores), 0.1))

            # Taux d'am√©lioration (pente de r√©gression)
            if len(scores) >= 5:
                x = np.arange(len(scores))
                slope, _ = np.polyfit(x, scores, 1)
                self.progression_tracking['improvement_rate'] = max(0, slope)

    def generate_scoring_feedback(self, exercise, performance_score, user_level):
        """G√©n√®re un feedback graduel bas√© sur le score de performance."""
        if performance_score >= 95:
            level = "Excellent"
            message = f"Performance exceptionnelle ! Technique ma√Ætris√©e au niveau {user_level}."
        elif performance_score >= 85:
            level = "Tr√®s bon"
            message = f"Tr√®s bonne ex√©cution. Quelques ajustements mineurs pour la perfection."
        elif performance_score >= 70:
            level = "Bon"
            message = f"Bonne base technique. Continuez √† travailler les points sp√©cifiques."
        elif performance_score >= 50:
            level = "Moyen"
            message = f"Technique en d√©veloppement. Concentrez-vous sur les fondamentaux."
        else:
            level = "√Ä am√©liorer"
            message = f"Technique √† retravailler. Prenez le temps de ma√Ætriser les bases."

        return {
            'level': level,
            'score': performance_score,
            'message': message,
            'next_focus': self._get_next_focus(exercise, performance_score, user_level)
        }

    def _get_next_focus(self, exercise, score, user_level):
        """Sugg√®re le prochain point d'attention selon le niveau et le score."""
        focus_map = {
            'squat': {
                'novice': ['mobilit√© cheville', 'gainage de base', 'patron de mouvement'],
                'intermediate': ['coordination hanche-genou', 'charge progressive', 'variations'],
                'expert': ['optimisation biom√©canique', 'variations avanc√©es', 'performance']
            },
            'push-up': {
                'novice': ['stabilit√© scapulaire', 'force de base', 'amplitude progressive'],
                'intermediate': ['variations d\'angle', 'tempo contr√¥l√©', 'unilat√©ral'],
                'expert': ['plyom√©trie', 'charges externes', 'patterns complexes']
            },
            'deadlift': {
                'novice': ['patron de hanche', 'position de d√©part', 'mobilit√©'],
                'intermediate': ['timing coordination', 'variations techniques', 'charge'],
                'expert': ['optimisation morphologique', 'sp√©cialisation', 'performance']
            }
        }

        exercise_focuses = focus_map.get(exercise, {}).get(user_level, ['technique g√©n√©rale'])
        score_index = min(len(exercise_focuses) - 1, int(score / 33))

        return exercise_focuses[score_index]


In [4]:
import os
import json
import random
import numpy as np
import torch
from torch.utils.data import Dataset

class EnhancedExerciseHierarchicalDataset(Dataset):
    def __init__(self,
                 exercices_points: dict,
                 grouped_sports: dict,
                 max_seq_len: int = 45,
                 augment: bool = False,
                 validate_physics: bool = True,
                 data_dir: str = None):
        """
        Dataset flexible pour exercices Posture √¢‚Ç¨‚Äú supports preprocessed files or webcam-only mode.
        """
        self.exercises_points  = exercices_points
        self.grouped_sports    = grouped_sports
        self.max_seq_len       = max_seq_len
        self.augment           = augment
        self.validate_physics  = validate_physics
        self.data_dir          = data_dir

        if self.data_dir:
            # Mode pr√É¬©trait√É¬© : scan des fichiers et chargement des samples
            self._build_mappings_dynamic()
            self.max_feature_dim = self._calculate_max_feature_dim(self.present_exercises)
            self._load_samples_enhanced()
            if self.validate_physics:
                self._validate_dataset_quality()
        else:
            # Mode webcam only : initialisation directe des mappings
            self.present_exercises = list(self.exercises_points.keys())
            self.exercise_to_idx   = {ex: i for i, ex in enumerate(self.present_exercises)}
            self.idx_to_exercise   = {i: ex for ex, i in self.exercise_to_idx.items()}
            self.exercise_to_group = {
                ex: grp
                for grp, exercises in self.grouped_sports.items()
                for ex in exercises
                if ex in self.exercise_to_idx
            }
            present_groups = sorted(set(self.exercise_to_group.values()))
            self.group_to_idx = {g: i for i, g in enumerate(present_groups)}
            self.idx_to_group = {i: g for g, i in self.group_to_idx.items()}
            self.max_feature_dim = self._calculate_max_feature_dim(self.present_exercises)

    def __len__(self):
        return len(getattr(self, 'samples', []))

    def _build_mappings_dynamic(self):
        """Scan du r√É¬©pertoire data_dir pour g√É¬©n√É¬©rer mappings et liste de samples."""
        self.present_exercises = []
        if not os.path.isdir(self.data_dir):
            self.exercise_to_idx = {}
            self.idx_to_exercise = {}
            self.exercise_to_group = {}
            self.group_to_idx = {}
            self.idx_to_group = {}
            return

        # Rep√É¬©rage des exercices valides
        for ex in self.exercises_points:
            ex_dir = os.path.join(self.data_dir, ex)
            if os.path.isdir(ex_dir) and any(f.endswith('.npy') for f in os.listdir(ex_dir)):
                if any(ex in lst for lst in self.grouped_sports.values()):
                    self.present_exercises.append(ex)
        self.present_exercises.sort()

        # Mappings indices √¢‚Ä†‚Äù exercices
        self.exercise_to_idx = {ex: i for i, ex in enumerate(self.present_exercises)}
        self.idx_to_exercise = {i: ex for ex, i in self.exercise_to_idx.items()}

        # Mapping exercice √¢‚Ä†‚Äô groupe
        self.exercise_to_group = {
            ex: grp
            for grp, exercises in self.grouped_sports.items()
            for ex in exercises
            if ex in self.exercise_to_idx
        }

        # Mappings indices √¢‚Ä†‚Äù groupes
        present_groups = sorted(set(self.exercise_to_group.values()))
        self.group_to_idx = {g: i for i, g in enumerate(present_groups)}
        self.idx_to_group = {i: g for g, i in self.group_to_idx.items()}

    def _calculate_max_feature_dim(self, exercises: list) -> int:
        """Calcule la dimension maximale de features selon exercises_points."""
        max_dim = 0
        for ex in exercises:
            num_pts = len(self.exercises_points.get(ex, []))
            # 3 coordonn√É¬©es + angles (3) + distances (2) + dynamiques (2√É‚Äî3√É‚Äînum_pts)
            dim = num_pts*3 + 3 + 2 + num_pts*3*2
            max_dim = max(max_dim, dim)
        return max_dim

    def _load_samples_enhanced(self):
        """Charge les chemins de fichiers .npy, labels et groupes pour l√¢‚Ç¨‚Ñ¢entra√É¬Ænement."""
        self.samples, self.labels, self.groups, self.exercise_names = [], [], [], []
        for ex in self.present_exercises:
            ex_dir = os.path.join(self.data_dir, ex)
            for fname in os.listdir(ex_dir):
                if not fname.endswith('.npy'): continue
                path = os.path.join(ex_dir, fname)
                if os.path.getsize(path) < 100: continue  # Fichiers trop petits
                self.samples.append(path)
                self.labels.append(self.exercise_to_idx[ex])
                self.groups.append(self.group_to_idx[self.exercise_to_group[ex]])
                self.exercise_names.append(ex)

    def _validate_dataset_quality(self):
        """Filtre les samples contenant NaN/Inf ou features statiques excessives."""
        valid_samples, valid_labels, valid_groups, valid_names = [], [], [], []
        for s, l, g, n in zip(self.samples, self.labels, self.groups, self.exercise_names):
            arr = np.load(s, mmap_mode='r')
            if np.isnan(arr).any() or np.isinf(arr).any(): continue
            var = np.var(arr, axis=0)
            if (var < 1e-6).sum()/arr.shape[1] > 0.3: continue
            valid_samples.append(s); valid_labels.append(l)
            valid_groups.append(g); valid_names.append(n)
        self.samples, self.labels = valid_samples, valid_labels
        self.groups, self.exercise_names = valid_groups, valid_names

    def __getitem__(self, idx):
        path = self.samples[idx]
        seq = np.load(path).astype(np.float32)
        if np.isnan(seq).any() or np.isinf(seq).any():
            return None
        seq = torch.from_numpy(seq)
        # Standardisation temporelle et dimensionnelle
        seq = self._standardize_sequence(seq)
        if self.augment: seq = self.augment_sequence(seq)
        label = torch.tensor(self.labels[idx], dtype=torch.long)
        group = torch.tensor(self.groups[idx], dtype=torch.long)
        return seq, label, group

    def _standardize_sequence(self, seq: torch.Tensor) -> torch.Tensor:
        """Pad/trim features et s√É¬©quences pour obtenir (max_seq_len √É‚Äî max_feature_dim)."""
        # Dimension features
        d = seq.shape[1]
        if d < self.max_feature_dim:
            pad = self.max_feature_dim - d
            seq = torch.nn.functional.pad(seq, (0, pad))
        elif d > self.max_feature_dim:
            seq = seq[:, :self.max_feature_dim]
        # Temporal
        t = seq.shape[0]
        if t < self.max_seq_len:
            seq = torch.nn.functional.pad(seq, (0, 0, 0, self.max_seq_len - t))
        else:
            seq = seq[:self.max_seq_len]
        return seq

    def augment_sequence(self, seq: torch.Tensor) -> torch.Tensor:
        """Applique jitter, scaling, masking, permutation temporelle."""
        if random.random() < 0.5:
            seq += torch.randn_like(seq) * (0.02 * torch.std(seq))
        if random.random() < 0.5:
            seq *= random.uniform(0.95, 1.05)
        if random.random() < 0.3:
            mask = (torch.rand_like(seq) > 0.15).float()
            seq = seq * mask + (1-mask)*seq.mean()
        if random.random() < 0.2 and seq.size(0) > 5:
            splits = sorted(random.sample(range(1, seq.size(0)), random.randint(1,3)))
            sizes = [splits[0]] + [splits[i+1]-splits[i] for i in range(len(splits)-1)] + [seq.size(0)-splits[-1]]
            segments = torch.split(seq, sizes)
            perm = torch.randperm(len(segments))
            seq = torch.cat([segments[i] for i in perm], dim=0)[:self.max_seq_len]
        return seq
import json
# Chargement des mappings depuis le JSON g√É¬©n√É¬©r√É¬© avec le mod√É¬®le
with open("model_mappings.json", "r") as f:
    mappings = json.load(f)

exercise_points = mappings["exercise_points"]
group_mapping  = mappings["group_mapping"]

# Instanciation sans dossier pr√É¬©trait√É¬© (mode webcam uniquement)
dataset = EnhancedExerciseHierarchicalDataset(
    exercices_points=exercise_points,
    grouped_sports=group_mapping,
    max_seq_len=50,
    augment=False,
    validate_physics=True,
    data_dir=None
)

print("Groupes d√É¬©tect√É¬©s :", dataset.group_to_idx)          # Affiche les groupes [3]
print("Exercices d√É¬©tect√É¬©s :", dataset.exercise_to_idx)    # Affiche les exercices [3]

Groupes d√É¬©tect√É¬©s : {'core training': 0, 'hip dominant': 1, 'horizontal pulls': 2, 'horizontal pushes': 3, 'quad dominant': 4, 'vertical pulls': 5, 'vertical pushes': 6}
Exercices d√É¬©tect√É¬©s : {'t bar row': 0, 'pull up': 1, 'hammer curl': 2, 'decline bench press': 3, 'tricep pushdown': 4, 'chest fly machine': 5, 'squat': 6, 'bench press': 7, 'push-up': 8, 'deadlift': 9, 'leg raises': 10, 'russian twist': 11, 'barbell biceps curl': 12, 'lateral raise': 13, 'hip thrust': 14, 'lat pulldown': 15, 'plank': 16, 'incline bench press': 17, 'tricep dips': 18, 'leg extension': 19, 'romanian deadlift': 20, 'shoulder press': 21}


In [3]:
import json
import torch

import cv2
import numpy as np
import cv2
import mediapipe as mp
import numpy as np
import pyttsx3
import torch
import json


class PoseClassificationModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_groups, num_exercises):
        super().__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim,
                           bidirectional=True,
                           num_layers=2,
                           dropout=0.3)

        # Attention Temporelle
        self.attention = nn.MultiheadAttention(2*hidden_dim, 4)

        # Branches Sp√©cialis√©es
        self.group_net = nn.Sequential(
            nn.Linear(2*hidden_dim, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, num_groups)
        )

        self.exercise_net = nn.Sequential(
            nn.Linear(2*hidden_dim, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, num_exercises)
        )

    def forward(self, x):
        out, _ = self.lstm(x)  # [batch, seq, 2*hidden]
        out = out.permute(1, 0, 2)  # [seq, batch, 2*hidden]

        # M√©canisme d'Attention
        attn_out, _ = self.attention(out, out, out)
        pooled = torch.mean(attn_out, dim=0)  # [batch, 2*hidden]

        group = self.group_net(pooled)
        exercise = self.exercise_net(pooled)

        return group, exercise

class ModelEvaluator:
    """
    Charge un mod√®le PyTorch entra√Æn√© et ses mappings JSON,
    puis fournit une m√©thode `predict` pour d√©duire le groupe et l‚Äôexercice.
    """
    def __init__(self, model_path, mappings_path, device='cpu'):
        self.device = device
        # Lecture des mappings JSON (idx_to_group, idx_to_exercise, dims)
        with open(mappings_path, 'r') as f:
            self.mappings = json.load(f)
        # Instanciation du mod√®le avec les dimensions sauvegard√©es
        self.model = PoseClassificationModel(
            input_dim   = self.mappings['input_dim'],
            hidden_dim  = self.mappings['hidden_dim'],
            num_groups      = self.mappings['num_groups'],
            num_exercises   = self.mappings['num_exercises']
        ).to(self.device)
        # Chargement du checkpoint PyTorch
        checkpoint = torch.load(model_path, map_location=self.device)
        state_dict = checkpoint.get('model_state_dict', checkpoint)
        self.model.load_state_dict(state_dict)
        self.model.eval()

    def predict(self, sequence_tensor):
        """
        Pr√©dit le groupe, l‚Äôexercice et la confiance pour une s√©quence de features.
        """
        with torch.no_grad():
            grp_out, ex_out = self.model(sequence_tensor)
        grp_idx = grp_out.argmax(dim=1).item()
        ex_idx  = ex_out.argmax(dim=1).item()
        confidence = float(torch.softmax(ex_out, dim=1).max())
        group    = self.mappings['idx_to_group'][str(grp_idx)]
        exercise = self.mappings['idx_to_exercise'][str(ex_idx)]
        return group, exercise, confidence

    def _load_model_and_mappings(self, model_path, mappings_path):
        """Charge le meilleur mod√®le sauvegard√© et ses mappings"""
        # Charger les mappings
        with open(mappings_path, 'r') as f:
            self.mappings = json.load(f)

    def _calculate_biomech_metrics(self, sequences):
        """Calcule les m√©triques biom√©caniques pour une s√©quence"""
        # Simulation des m√©triques - √† remplacer par votre logique r√©elle
        return {
            'angle_accuracy': np.random.uniform(0.7, 0.95),
            'movement_quality': np.random.uniform(0.6, 0.9),
            'stability_score': np.random.uniform(0.5, 0.85)
        }

    def generate_evaluation_report(self, results):
        """G√©n√®re un rapport d√©taill√© d'√©valuation"""
        total_samples = sum(len(class_results) for class_results in results.values())
        total_exercise_correct = sum(
            sum(1 for sample in class_results if sample['exercise_correct'])
            for class_results in results.values()
        )
        total_group_correct = sum(
            sum(1 for sample in class_results if sample['group_correct'])
            for class_results in results.values()
        )

        print(f"\nüìà RAPPORT d'√âVALUATION GLOBAL")
        print(f"=" * 50)
        print(f"√âchantillons analys√©s: {total_samples}")
        print(f"Pr√©cision exercices: {total_exercise_correct/total_samples:.2%}")
        print(f"Pr√©cision groupes: {total_group_correct/total_samples:.2%}")

        return {
            'total_samples': total_samples,
            'exercise_accuracy': total_exercise_correct/total_samples,
            'group_accuracy': total_group_correct/total_samples,
            'detailed_results': results
        }
CHECKPOINT_PATH = "pose_classification_model_best.pth"
MAPPINGS_PATH = "model_mappings.json"

with open("model_mappings.json", "r") as f:
    mappings = json.load(f)


class WebcamCorrector:
    def __init__(self, evaluator, user_level='intermediate'):
        self.evaluator = evaluator
        self.user_level = user_level
        self.pose = mp.solutions.pose.Pose(
            static_image_mode=False,
            model_complexity=1,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5
        )
        self.drawing = mp.solutions.drawing_utils
        self.tts = pyttsx3.init()
        self.tts.setProperty('rate', 150)
        self.tts.setProperty('volume', 0.8)
        self.sequence, self.max_len = [], 30
        self.current_ex = None
        self.max_feature_dim = evaluator.mappings['input_dim'] # Get expected input dimension from model mappings
        self.standards = AdaptiveExerciseStandards() # Initialize AdaptiveExerciseStandards

    def _extract_features(self, lm):
        coords = [coord for p in lm.landmark for coord in (p.x, p.y, p.z)]
        angles = self._compute_angles(lm)
        dists  = self._compute_distances(lm)
        features = np.array(coords + angles + dists, dtype=np.float32)

        # Pad features to match the expected input dimension of the model
        if features.shape[0] < self.max_feature_dim:
            padding = self.max_feature_dim - features.shape[0]
            features = np.pad(features, (0, padding), 'constant', constant_values=0)
        elif features.shape[0] > self.max_feature_dim:
            features = features[:self.max_feature_dim]

        return features

    def _process_frame(self, frame):
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        res = self.pose.process(rgb)
        corrs = []
        if res.pose_landmarks:
            self.drawing.draw_landmarks(frame, res.pose_landmarks, mp.solutions.pose.POSE_CONNECTIONS)
            features = self._extract_features(res.pose_landmarks)
            self.sequence.append(features)
            if len(self.sequence) > self.max_len:
                self.sequence.pop(0)
            if len(self.sequence) >= 15:
                seq_tensor = torch.tensor(self.sequence, dtype=torch.float32).unsqueeze(0).to(self.evaluator.device)
                grp, ex, conf = self.evaluator.predict(seq_tensor)
                # Generate corrections based on the predicted exercise and current user level
                corrs = self._generate_corrections(ex, self.user_level)
                return ex, grp, conf, corrs
        return None, None, 0.0, corrs

    def _compute_angles(self, lm):
        def angle(a, b, c):
            v1, v2 = a - b, c - b
            cosang = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
            return np.degrees(np.arccos(np.clip(cosang, -1, 1)))
        idxs = [(12,14,16), (11,13,15), (24,26,28), (23,25,27)]  # 4 angles
        pts = lm.landmark
        return [angle(np.array([pts[i].x, pts[i].y]),
                      np.array([pts[j].x, pts[j].y]),
                      np.array([pts[k].x, pts[k].y]))
                for i,j,k in idxs]

    def _compute_distances(self, lm):
        pts = lm.landmark
        sl, sr = np.array([pts[11].x, pts[11].y]), np.array([pts[12].x, pts[12].y])
        hl, hr = np.array([pts[23].x, pts[23].y]), np.array([pts[24].x, pts[24].y])
        nose, hipc = np.array([pts[0].x, pts[0].y]), (hl + hr) / 2
        return [np.linalg.norm(sr - sl),
                np.linalg.norm(hr - hl),
                np.linalg.norm(nose - hipc)]

    def _generate_corrections(self, exercise, user_level):
        """Generate corrections based on exercise and user level."""
        # This is a placeholder. In a real application, you would analyze the
        # biomechanical data (angles, distances, etc.) from the 'features'
        # variable to determine specific errors and generate corrections
        # using the AdaptiveExerciseStandards class.
        # For now, we'll just return some generic corrections for demonstration.
        if exercise in self.standards.exercise_standards:
            # Simulate detecting some common errors based on user level
            errors = []
            if user_level == 'novice':
                errors = ['back_not_straight', 'not_low_enough'] # Example novice errors
            elif user_level == 'intermediate':
                 errors = ['arms_too_wide', 'body_not_straight'] # Example intermediate errors
            elif user_level == 'expert':
                 errors = ['back_not_straight'] # Example expert errors

            corrections = []
            for error_type in errors:
                 correction_message = self.standards.get_adaptive_correction(exercise, error_type, user_level)
                 corrections.append({'message': correction_message, 'severity': 'medium'}) # Simulate medium severity

            if not corrections and exercise in ['squat', 'push-up', 'deadlift']:
                 # Add a progression tip if no specific errors are detected for these exercises
                 progression_tip = self.standards.get_adaptive_correction(exercise, 'progression_tip', user_level)
                 if progression_tip:
                     corrections.append({'message': progression_tip, 'severity': 'low'})

            return corrections

        return []

    def _draw(self, frame, corrs):
        """Superpose l‚Äôoverlay, le niveau utilisateur et les corrections."""
        h, w = frame.shape[:2]
        overlay = frame.copy()
        cv2.rectangle(overlay, (5,5), (w-5,140), (0,0,0), -1)
        frame = cv2.addWeighted(frame, 0.6, overlay, 0.4, 0)
        y = 30
        cv2.putText(frame, f"Niveau: {self.user_level}", (10,y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2)
        for c in corrs[:3]:
            y += 25
            color = (0,0,255) if c.get('severity','low')=='high' else (0,165,255)
            cv2.putText(frame, c['message'], (10,y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
        return frame

    def _speak(self, text):
        """Synth√©tise et prononce un texte."""
        try:
            self.tts.say(text)
            self.tts.runAndWait()
        except Exception as e:
            print(f"Error during text-to-speech: {e}")


    def start(self):
        # Variables pour estimation de niveau
        confidence_buffer = []
        WINDOW_SIZE = 30
        last_spoken_exercise = None
        last_spoken_correction = None

        # D√©marrage de la capture vid√©o
        cap = cv2.VideoCapture(0)
        if not cap.isOpened():
            raise RuntimeError("Impossible d'ouvrir la webcam")

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Pr√©diction et corrections
            ex, grp, conf, corrs = self._process_frame(frame)
            if ex:
                # Mise √† jour du buffer de confiance
                confidence_buffer.append(conf)
                if len(confidence_buffer) > WINDOW_SIZE:
                    confidence_buffer.pop(0)
                avg_conf = np.mean(confidence_buffer)

                # Estimation simple du niveau
                if avg_conf >= 0.85:
                    level = "expert"
                elif avg_conf >= 0.70:
                    level = "intermediate"
                else:
                    level = "novice"

                # Update user level in the corrector instance
                self.user_level = level

                # Affichage des informations
                cv2.putText(frame, f"Groupe: {grp}", (10,30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2)
                cv2.putText(frame, f"Exercice: {ex}", (10,60),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2)
                cv2.putText(frame, f"Niveau estim√©: {level}", (10,90),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,255,0), 2)

                # Annonce vocale de l'exercice et des corrections
                if ex != last_spoken_exercise:
                    self._speak(f"Exercice d√©tect√© : {ex}")
                    last_spoken_exercise = ex
                    last_spoken_correction = None # Reset correction spoken when exercise changes

                if corrs and corrs[0]['message'] != last_spoken_correction:
                     self._speak(corrs[0]['message']) # Speak the first correction
                     last_spoken_correction = corrs[0]['message']


            # Dessin des corrections et affichage
            annotated = self._draw(frame, corrs)
            cv2.imshow('Analyse Niveau', annotated)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        cap.release()
        cv2.destroyAllWindows()


exercise_points = mappings["exercise_points"]
group_mapping  = mappings["group_mapping"]
# Initialisation de l‚Äô√©valuateur de mod√®le
evaluator = ModelEvaluator(
    model_path=CHECKPOINT_PATH,
    mappings_path=MAPPINGS_PATH,
    device='cpu'
)                                                                                    # [3]


# Instanciation du correcteur webcam
wc = WebcamCorrector(evaluator=evaluator, user_level='novice')

# 3. D√©marrer la boucle
wc.start()




  seq_tensor = torch.tensor(self.sequence, dtype=torch.float32).unsqueeze(0).to(self.evaluator.device)


In [None]:

import json
import torch

import cv2
import numpy as np
import cv2
import mediapipe as mp
import numpy as np
import pyttsx3
import torch
import json
import speech_recognition as sr # Import the speech_recognition library


class PoseClassificationModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_groups, num_exercises):
        super().__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim,
                           bidirectional=True,
                           num_layers=2,
                           dropout=0.3)

        # Attention Temporelle
        self.attention = nn.MultiheadAttention(2*hidden_dim, 4)

        # Branches Sp√©cialis√©es
        self.group_net = nn.Sequential(
            nn.Linear(2*hidden_dim, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, num_groups)
        )

        self.exercise_net = nn.Sequential(
            nn.Linear(2*hidden_dim, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, num_exercises)
        )

    def forward(self, x):
        out, _ = self.lstm(x)  # [batch, seq, 2*hidden]
        out = out.permute(1, 0, 2)  # [seq, batch, 2*hidden]

        # M√©canisme d'Attention
        attn_out, _ = self.attention(out, out, out)
        pooled = torch.mean(attn_out, dim=0)  # [batch, 2*hidden]

        group = self.group_net(pooled)
        exercise = self.exercise_net(pooled)

        return group, exercise

class ModelEvaluator:
    """
    Charge un mod√®le PyTorch entra√Æn√© et ses mappings JSON,
    puis fournit une m√©thode `predict` pour d√©duire le groupe et l‚Äôexercice.
    """
    def __init__(self, model_path, mappings_path, device='cpu'):
        self.device = device
        # Lecture des mappings JSON (idx_to_group, idx_to_exercise, dims)
        with open(mappings_path, 'r') as f:
            self.mappings = json.load(f)
        # Instanciation du mod√®le avec les dimensions sauvegard√©es
        self.model = PoseClassificationModel(
            input_dim   = self.mappings['input_dim'],
            hidden_dim  = self.mappings['hidden_dim'],
            num_groups      = self.mappings['num_groups'],
            num_exercises   = self.mappings['num_exercises']
        ).to(self.device)
        # Chargement du checkpoint PyTorch
        checkpoint = torch.load(model_path, map_location=self.device)
        state_dict = checkpoint.get('model_state_dict', checkpoint)
        self.model.load_state_dict(state_dict)
        self.model.eval()

    def predict(self, sequence_tensor):
        """
        Pr√©dit le groupe, l‚Äôexercice et la confiance pour une s√©quence de features.
        """
        with torch.no_grad():
            grp_out, ex_out = self.model(sequence_tensor)
        grp_idx = grp_out.argmax(dim=1).item()
        ex_idx  = ex_out.argmax(dim=1).item()
        confidence = float(torch.softmax(ex_out, dim=1).max())
        group    = self.mappings['idx_to_group'][str(grp_idx)]
        exercise = self.mappings['idx_to_exercise'][str(ex_idx)]
        return group, exercise, confidence

    def _load_model_and_mappings(self, model_path, mappings_path):
        """Charge le meilleur mod√®le sauvegard√© et ses mappings"""
        # Charger les mappings
        with open(mappings_path, 'r') as f:
            self.mappings = json.load(f)

    def _calculate_biomech_metrics(self, sequences):
        """Calcule les m√©triques biom√©caniques pour une s√©quence"""
        # Simulation des m√©triques - √† remplacer par votre logique r√©elle
        return {
            'angle_accuracy': np.random.uniform(0.7, 0.95),
            'movement_quality': np.random.uniform(0.6, 0.9),
            'stability_score': np.random.uniform(0.5, 0.85)
        }

    def generate_evaluation_report(self, results):
        """G√©n√®re un rapport d√©taill√© d'√©valuation"""
        total_samples = sum(len(class_results) for class_results in results.values())
        total_exercise_correct = sum(
            sum(1 for sample in class_results if sample['exercise_correct'])
            for class_results in results.values()
        )
        total_group_correct = sum(
            sum(1 for sample in class_results if sample['group_correct'])
            for class_results in results.values()
        )

        print(f"\nüìà RAPPORT d'√âVALUATION GLOBAL")
        print(f"=" * 50)
        print(f"√âchantillons analys√©s: {total_samples}")
        print(f"Pr√©cision exercices: {total_exercise_correct/total_samples:.2%}")
        print(f"Pr√©cision groupes: {total_group_correct/total_samples:.2%}")

        return {
            'total_samples': total_samples,
            'exercise_accuracy': total_exercise_correct/total_samples,
            'group_accuracy': total_group_correct/total_samples,
            'detailed_results': results
        }
CHECKPOINT_PATH = "pose_classification_model_best.pth"
MAPPINGS_PATH = "model_mappings.json"

with open("model_mappings.json", "r") as f:
    mappings = json.load(f)


class WebcamCorrector:
    def __init__(self, evaluator, user_level='intermediate'):
        self.evaluator = evaluator
        self.user_level = user_level
        self.pose = mp.solutions.pose.Pose(
            static_image_mode=False,
            model_complexity=1,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5
        )
        self.drawing = mp.solutions.drawing_utils
        self.tts = pyttsx3.init()
        self.tts.setProperty('rate', 150)
        self.tts.setProperty('volume', 0.8)
        self.sequence, self.max_len = [], 30
        self.current_ex = None
        self.max_feature_dim = evaluator.mappings['input_dim'] # Get expected input dimension from model mappings
        self.standards = AdaptiveExerciseStandards() # Initialize AdaptiveExerciseStandards
        self.recognizer = sr.Recognizer() # Initialize speech recognizer


    def _extract_features(self, lm):
        coords = [coord for p in lm.landmark for coord in (p.x, p.y, p.z)]
        angles = self._compute_angles(lm)
        dists  = self._compute_distances(lm)
        features = np.array(coords + angles + dists, dtype=np.float32)

        # Pad features to match the expected input dimension of the model
        if features.shape[0] < self.max_feature_dim:
            padding = self.max_feature_dim - features.shape[0]
            features = np.pad(features, (0, padding), 'constant', constant_values=0)
        elif features.shape[0] > self.max_feature_dim:
            features = features[:self.max_feature_dim]

        return features

    def _process_frame(self, frame):
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        res = self.pose.process(rgb)
        corrs = []
        if res.pose_landmarks:
            self.drawing.draw_landmarks(frame, res.pose_landmarks, mp.solutions.pose.POSE_CONNECTIONS)
            features = self._extract_features(res.pose_landmarks)
            self.sequence.append(features)
            if len(self.sequence) > self.max_len:
                self.sequence.pop(0)
            if len(self.sequence) >= 15:
                seq_tensor = torch.tensor(self.sequence, dtype=torch.float32).unsqueeze(0).to(self.evaluator.device)
                grp, ex, conf = self.evaluator.predict(seq_tensor)
                # Generate corrections based on the predicted exercise and current user level
                corrs = self._generate_corrections(ex, self.user_level)
                return ex, grp, conf, corrs
        return None, None, 0.0, corrs

    def _compute_angles(self, lm):
        def angle(a, b, c):
            v1, v2 = a - b, c - b
            cosang = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
            return np.degrees(np.arccos(np.clip(cosang, -1, 1)))
        idxs = [(12,14,16), (11,13,15), (24,26,28), (23,25,27)]  # 4 angles
        pts = lm.landmark
        return [angle(np.array([pts[i].x, pts[i].y]),
                      np.array([pts[j].x, pts[j].y]),
                      np.array([pts[k].x, pts[k].y]))
                for i,j,k in idxs]

    def _compute_distances(self, lm):
        pts = lm.landmark
        sl, sr = np.array([pts[11].x, pts[11].y]), np.array([pts[12].x, pts[12].y])
        hl, hr = np.array([pts[23].x, pts[23].y]), np.array([pts[24].x, pts[24].y])
        nose, hipc = np.array([pts[0].x, pts[0].y]), (hl + hr) / 2
        return [np.linalg.norm(sr - sl),
                np.linalg.norm(hr - hl),
                np.linalg.norm(nose - hipc)]

    def _generate_corrections(self, exercise, user_level):
        """Generate corrections based on exercise and user level."""
        # This is a placeholder. In a real application, you would analyze the
        # biomechanical data (angles, distances, etc.) from the 'features'
        # variable to determine specific errors and generate corrections
        # using the AdaptiveExerciseStandards class.
        # For now, we'll just return some generic corrections for demonstration.
        if exercise in self.standards.exercise_standards:
            # Simulate detecting some common errors based on user level
            errors = []
            if user_level == 'novice':
                errors = ['back_not_straight', 'not_low_enough'] # Example novice errors
            elif user_level == 'intermediate':
                 errors = ['arms_too_wide', 'body_not_straight'] # Example intermediate errors
            elif user_level == 'expert':
                 errors = ['back_not_straight'] # Example expert errors

            corrections = []
            for error_type in errors:
                 correction_message = self.standards.get_adaptive_correction(exercise, error_type, user_level)
                 corrections.append({'message': correction_message, 'severity': 'medium'}) # Simulate medium severity

            if not corrections and exercise in ['squat', 'push-up', 'deadlift']:
                 # Add a progression tip if no specific errors are detected for these exercises
                 progression_tip = self.standards.get_adaptive_correction(exercise, 'progression_tip', user_level)
                 if progression_tip:
                     corrections.append({'message': progression_tip, 'severity': 'low'})

            return corrections

        return []

    def _draw(self, frame, corrs):
        """Superpose l‚Äôoverlay, le niveau utilisateur et les corrections."""
        h, w = frame.shape[:2]
        overlay = frame.copy()
        cv2.rectangle(overlay, (5,5), (w-5,140), (0,0,0), -1)
        frame = cv2.addWeighted(frame, 0.6, overlay, 0.4, 0)
        y = 30
        cv2.putText(frame, f"Niveau: {self.user_level}", (10,y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2)
        for c in corrs[:3]:
            y += 25
            color = (0,0,255) if c.get('severity','low')=='high' else (0,165,255)
            cv2.putText(frame, c['message'], (10,y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
        return frame

    def _speak(self, text):
        """Synth√©tise et prononce un texte."""
        try:
            self.tts.say(text)
            self.tts.runAndWait()
        except Exception as e:
            print(f"Error during text-to-speech: {e}")

    def _listen_command(self):
        """Listens for a voice command from the microphone and transcribes it."""
        with sr.Microphone() as source:
            print("Say something!")
            self.recognizer.adjust_for_ambient_noise(source, duration=1) # Adjust for noise
            audio = self.recognizer.listen(source)

        try:
            command = self.recognizer.recognize_google(audio, language="en-US") # You can change the language if needed
            print(f"You said: {command}")
            return command
        except sr.UnknownValueError:
            print("Could not understand audio")
            return None
        except sr.RequestError as e:
            print(f"Could not request results from Google Speech Recognition service; {e}")
            return None

    def start(self):
        # Variables pour estimation de niveau
        confidence_buffer = []
        WINDOW_SIZE = 30
        last_spoken_exercise = None
        last_spoken_correction = None
        command_active = False # Flag to indicate if a voice command is active
        command_exercise = None # Exercise specified by voice command
        command_reps = 0 # Repetitions specified by voice command
        current_reps = 0 # Counter for repetitions performed


        # D√©marrage de la capture vid√©o
        cap = cv2.VideoCapture(0)
        if not cap.isOpened():
            raise RuntimeError("Impossible d'ouvrir la webcam")

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Listen for a voice command (can be triggered by a key press or at intervals)
            # For now, let's listen when 'c' is pressed
            key = cv2.waitKey(1) & 0xFF
            if key == ord('c'):
                command = self._listen_command()
                if command:
                    # Process the command (This will be refined in the next step)
                    # Basic parsing for "number exercise" format
                    parts = command.split()
                    if len(parts) >= 2 and parts[0].isdigit():
                        try:
                            command_reps = int(parts[0])
                            command_exercise = " ".join(parts[1:]).lower()
                            print(f"Command received: {command_reps} {command_exercise}")
                            command_active = True
                            current_reps = 0 # Reset rep counter for new command
                            self._speak(f"Okay, let's do {command_reps} {command_exercise}s.")
                        except ValueError:
                            self._speak("Sorry, I didn't understand the number of repetitions.")
                    else:
                        self._speak("Sorry, I didn't understand the command format. Please say 'number exercise'.")


            # Pr√©diction et corrections
            ex, grp, conf, corrs = self._process_frame(frame)

            # If a command is active, use the commanded exercise instead of the predicted one
            if command_active and command_exercise:
                 ex = command_exercise
                 # Regenerate corrections based on the commanded exercise and current user level
                 corrs = self._generate_corrections(ex, self.user_level)


            if ex:
                # Mise √† jour du buffer de confiance
                confidence_buffer.append(conf)
                if len(confidence_buffer) > WINDOW_SIZE:
                    confidence_buffer.pop(0)
                avg_conf = np.mean(confidence_buffer)

                # Estimation simple du niveau
                if avg_conf >= 0.85:
                    level = "expert"
                elif avg_conf >= 0.70:
                    level = "intermediate"
                else:
                    level = "novice"

                # Update user level in the corrector instance
                self.user_level = level


                # Affichage des informations
                cv2.putText(frame, f"Groupe: {grp}", (10,30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2)
                cv2.putText(frame, f"Exercice: {ex}", (10,60),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2)
                cv2.putText(frame, f"Niveau estim√©: {level}", (10,90),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,255,0), 2)

                # Annonce vocale de l'exercice et des corrections
                if ex != last_spoken_exercise:
                    if not command_active: # Only announce detected exercise if no command is active
                        self._speak(f"Exercice d√©tect√© : {ex}")
                    last_spoken_exercise = ex
                    last_spoken_correction = None # Reset correction spoken when exercise changes

                if corrs and corrs[0]['message'] != last_spoken_correction:
                    # Only speak corrections if a command is active or if no command is active and an exercise is detected
                    if command_active or (not command_active and ex):
                         self._speak(corrs[0]['message']) # Speak the first correction
                         last_spoken_correction = corrs[0]['message']

                # Repetition tracking (Placeholder - needs actual movement detection logic)
                # This is where you would add logic to detect a completed repetition
                # and increment current_reps. For now, we'll just demonstrate the display.
                if command_active:
                     cv2.putText(frame, f"Reps: {current_reps}/{command_reps}", (10,120),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,0), 2)
                     if current_reps >= command_reps and command_reps > 0:
                          self._speak(f"Great job! You completed {command_reps} {command_exercise}s.")
                          command_active = False # Deactivate command after completing reps
                          command_exercise = None
                          command_reps = 0


            # Dessin des corrections et affichage
            annotated = self._draw(frame, corrs)
            cv2.imshow('Analyse Niveau', annotated)

            if key == ord('q'):
                break

        cap.release()
        cv2.destroyAllWindows()


exercise_points = mappings["exercise_points"]
group_mapping  = mappings["group_mapping"]
# Initialisation de l‚Äô√©valuateur de mod√®le
evaluator = ModelEvaluator(
    model_path=CHECKPOINT_PATH,
    mappings_path=MAPPINGS_PATH,
    device='cpu'
)                                                                                    # [3]


# Instanciation du correcteur webcam
wc = WebcamCorrector(evaluator=evaluator, user_level='novice')

# 3. D√©marrer la boucle
wc.start()

In [5]:
import json
import torch

import cv2
import numpy as np
import cv2
import mediapipe as mp
import numpy as np
import pyttsx3
import torch
import json
import speech_recognition as sr # Import the speech_recognition library


class PoseClassificationModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_groups, num_exercises):
        super().__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim,
                           bidirectional=True,
                           num_layers=2,
                           dropout=0.3)

        # Attention Temporelle
        self.attention = nn.MultiheadAttention(2*hidden_dim, 4)

        # Branches Sp√©cialis√©es
        self.group_net = nn.Sequential(
            nn.Linear(2*hidden_dim, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, num_groups)
        )

        self.exercise_net = nn.Sequential(
            nn.Linear(2*hidden_dim, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, num_exercises)
        )

    def forward(self, x):
        out, _ = self.lstm(x)  # [batch, seq, 2*hidden]
        out = out.permute(1, 0, 2)  # [seq, batch, 2*hidden]

        # M√©canisme d'Attention
        attn_out, _ = self.attention(out, out, out)
        pooled = torch.mean(attn_out, dim=0)  # [batch, 2*hidden]

        group = self.group_net(pooled)
        exercise = self.exercise_net(pooled)

        return group, exercise

class ModelEvaluator:
    """
    Charge un mod√®le PyTorch entra√Æn√© et ses mappings JSON,
    puis fournit une m√©thode `predict` pour d√©duire le groupe et l‚Äôexercice.
    """
    def __init__(self, model_path, mappings_path, device='cpu'):
        self.device = device
        # Lecture des mappings JSON (idx_to_group, idx_to_exercise, dims)
        with open(mappings_path, 'r') as f:
            self.mappings = json.load(f)
        # Instanciation du mod√®le avec les dimensions sauvegard√©es
        self.model = PoseClassificationModel(
            input_dim   = self.mappings['input_dim'],
            hidden_dim  = self.mappings['hidden_dim'],
            num_groups      = self.mappings['num_groups'],
            num_exercises   = self.mappings['num_exercises']
        ).to(self.device)
        # Chargement du checkpoint PyTorch
        checkpoint = torch.load(model_path, map_location=self.device)
        state_dict = checkpoint.get('model_state_dict', checkpoint)
        self.model.load_state_dict(state_dict)
        self.model.eval()

    def predict(self, sequence_tensor):
        """
        Pr√©dit le groupe, l‚Äôexercice et la confiance pour une s√©quence de features.
        """
        with torch.no_grad():
            grp_out, ex_out = self.model(sequence_tensor)
        grp_idx = grp_out.argmax(dim=1).item()
        ex_idx  = ex_out.argmax(dim=1).item()
        confidence = float(torch.softmax(ex_out, dim=1).max())
        group    = self.mappings['idx_to_group'][str(grp_idx)]
        exercise = self.mappings['idx_to_exercise'][str(ex_idx)]
        return group, exercise, confidence

    def _load_model_and_mappings(self, model_path, mappings_path):
        """Charge le meilleur mod√®le sauvegard√© et ses mappings"""
        # Charger les mappings
        with open(mappings_path, 'r') as f:
            self.mappings = json.load(f)

    def _calculate_biomech_metrics(self, sequences):
        """Calcule les m√©triques biom√©caniques pour une s√©quence"""
        # Simulation des m√©triques - √† remplacer par votre logique r√©elle
        return {
            'angle_accuracy': np.random.uniform(0.7, 0.95),
            'movement_quality': np.random.uniform(0.6, 0.9),
            'stability_score': np.random.uniform(0.5, 0.85)
        }

    def generate_evaluation_report(self, results):
        """G√©n√®re un rapport d√©taill√© d'√©valuation"""
        total_samples = sum(len(class_results) for class_results in results.values())
        total_exercise_correct = sum(
            sum(1 for sample in class_results if sample['exercise_correct'])
            for class_results in results.values()
        )
        total_group_correct = sum(
            sum(1 for sample in class_results if sample['group_correct'])
            for class_results in results.values()
        )

        print(f"\nüìà RAPPORT d'√âVALUATION GLOBAL")
        print(f"=" * 50)
        print(f"√âchantillons analys√©s: {total_samples}")
        print(f"Pr√©cision exercices: {total_exercise_correct/total_samples:.2%}")
        print(f"Pr√©cision groupes: {total_group_correct/total_samples:.2%}")

        return {
            'total_samples': total_samples,
            'exercise_accuracy': total_exercise_correct/total_samples,
            'group_accuracy': total_group_correct/total_samples,
            'detailed_results': results
        }
CHECKPOINT_PATH = "pose_classification_model_best.pth"
MAPPINGS_PATH = "model_mappings.json"

with open("model_mappings.json", "r") as f:
    mappings = json.load(f)


class WebcamCorrector:
    def __init__(self, evaluator, user_level='intermediate'):
        self.evaluator = evaluator
        self.user_level = user_level
        self.pose = mp.solutions.pose.Pose(
            static_image_mode=False,
            model_complexity=1,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5
        )
        self.drawing = mp.solutions.drawing_utils
        self.tts = pyttsx3.init()
        self.tts.setProperty('rate', 150)
        self.tts.setProperty('volume', 0.8)
        self.sequence, self.max_len = [], 30
        self.current_ex = None
        self.max_feature_dim = evaluator.mappings['input_dim'] # Get expected input dimension from model mappings
        self.standards = AdaptiveExerciseStandards() # Initialize AdaptiveExerciseStandards
        self.recognizer = sr.Recognizer() # Initialize speech recognizer
        self.command_queue = [] # To store list of (reps, exercise) tuples
        self.current_command = None # To store the current (reps, exercise) tuple being guided
        self.current_command_reps_completed = 0 # To track completed reps for the current command


    def _extract_features(self, lm):
        coords = [coord for p in lm.landmark for coord in (p.x, p.y, p.z)]
        angles = self._compute_angles(lm)
        dists  = self._compute_distances(lm)
        features = np.array(coords + angles + dists, dtype=np.float32)

        # Pad features to match the expected input dimension of the model
        if features.shape[0] < self.max_feature_dim:
            padding = self.max_feature_dim - features.shape[0]
            features = np.pad(features, (0, padding), 'constant', constant_values=0)
        elif features.shape[0] > self.max_feature_dim:
            features = features[:self.max_feature_dim]

        return features

    def _process_frame(self, frame):
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        res = self.pose.process(rgb)
        corrs = []
        predicted_ex = None # Store the predicted exercise separately

        if res.pose_landmarks:
            self.drawing.draw_landmarks(frame, res.pose_landmarks, mp.solutions.pose.POSE_CONNECTIONS)
            features = self._extract_features(res.pose_landmarks)
            self.sequence.append(features)
            if len(self.sequence) > self.max_len:
                self.sequence.pop(0)

            if len(self.sequence) >= 15:
                seq_tensor = torch.tensor(self.sequence, dtype=torch.float32).unsqueeze(0).to(self.evaluator.device)
                grp, predicted_ex, conf = self.evaluator.predict(seq_tensor)

                # If a command is active, use the commanded exercise for corrections
                if self.current_command:
                     commanded_reps, commanded_exercise = self.current_command
                     corrs = self._generate_corrections(commanded_exercise, self.user_level)
                     return commanded_exercise, grp, conf, corrs # Return commanded exercise

                else:
                    # Otherwise, use the predicted exercise for corrections
                    corrs = self._generate_corrections(predicted_ex, self.user_level)
                    return predicted_ex, grp, conf, corrs # Return predicted exercise

        return None, None, 0.0, corrs # Return None for exercise if no landmarks


    def _compute_angles(self, lm):
        def angle(a, b, c):
            v1, v2 = a - b, c - b
            cosang = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
            return np.degrees(np.arccos(np.clip(cosang, -1, 1)))
        idxs = [(12,14,16), (11,13,15), (24,26,28), (23,25,27)]  # 4 angles
        pts = lm.landmark
        return [angle(np.array([pts[i].x, pts[i].y]),
                      np.array([pts[j].x, pts[j].y]),
                      np.array([pts[k].x, pts[k].y]))
                for i,j,k in idxs]

    def _compute_distances(self, lm):
        pts = lm.landmark
        sl, sr = np.array([pts[11].x, pts[11].y]), np.array([pts[12].x, pts[12].y])
        hl, hr = np.array([pts[23].x, pts[23].y]), np.array([pts[24].x, pts[24].y])
        nose, hipc = np.array([pts[0].x, pts[0].y]), (hl + hr) / 2
        return [np.linalg.norm(sr - sl),
                np.linalg.norm(hr - hl),
                np.linalg.norm(nose - hipc)]

    def _generate_corrections(self, exercise, user_level):
        """Generate corrections based on exercise and user level."""
        # This is a placeholder. In a real application, you would analyze the
        # biomechanical data (angles, distances, etc.) from the 'features'
        # variable to determine specific errors and generate corrections
        # using the AdaptiveExerciseStandards class.
        # For now, we'll just return some generic corrections for demonstration.
        if exercise in self.standards.exercise_standards:
            # Simulate detecting some common errors based on user level
            errors = []
            if user_level == 'novice':
                errors = ['back_not_straight', 'not_low_enough'] # Example novice errors
            elif user_level == 'intermediate':
                 errors = ['arms_too_wide', 'body_not_straight'] # Example intermediate errors
            elif user_level == 'expert':
                 errors = ['back_not_straight'] # Example expert errors

            corrections = []
            for error_type in errors:
                 correction_message = self.standards.get_adaptive_correction(exercise, error_type, user_level)
                 corrections.append({'message': correction_message, 'severity': 'medium'}) # Simulate medium severity

            if not corrections and exercise in ['squat', 'push-up', 'deadlift']:
                 # Add a progression tip if no specific errors are detected for these exercises
                 progression_tip = self.standards.get_adaptive_correction(exercise, 'progression_tip', user_level)
                 if progression_tip:
                     corrections.append({'message': progression_tip, 'severity': 'low'})

            return corrections

        return []

    def _draw(self, frame, corrs):
        """Superpose l‚Äôoverlay, le niveau utilisateur et les corrections."""
        h, w = frame.shape[:2]
        overlay = frame.copy()
        # Adjust overlay height to accommodate command info
        cv2.rectangle(overlay, (5,5), (w-5,165), (0,0,0), -1)
        frame = cv2.addWeighted(frame, 0.6, overlay, 0.4, 0)
        y = 30
        cv2.putText(frame, f"Niveau: {self.user_level}", (10,y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2)

        if self.current_command:
            commanded_reps, commanded_exercise = self.current_command
            cv2.putText(frame, f"Commande: {commanded_reps} {commanded_exercise}", (10, y + 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
            cv2.putText(frame, f"Reps: {self.current_command_reps_completed}/{commanded_reps}", (10, y + 60),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
            y += 60 # Adjust starting y for corrections

        elif self.current_ex: # Display predicted exercise if no command is active
             cv2.putText(frame, f"Exercice: {self.current_ex}", (10,y+30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2)
             cv2.putText(frame, f"Groupe: {self.current_grp}", (10,y+60),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2)
             y += 60 # Adjust starting y for corrections


        for c in corrs[:3]:
            y += 25
            color = (0,0,255) if c.get('severity','low')=='high' else (0,165,255)
            cv2.putText(frame, c['message'], (10,y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
        return frame

    def _speak(self, text):
        """Synth√©tise et prononce un texte."""
        try:
            self.tts.say(text)
            self.tts.runAndWait()
        except Exception as e:
            print(f"Error during text-to-speech: {e}")

    def _listen_command(self):
        """Listens for a voice command from the microphone and transcribes it."""
        with sr.Microphone() as source:
            print("Say something!")
            self.recognizer.adjust_for_ambient_noise(source, duration=1) # Adjust for noise
            audio = self.recognizer.listen(source)

        try:
            command = self.recognizer.recognize_google(audio, language="en-US") # You can change the language if needed
            print(f"You said: {command}")
            return command
        except sr.UnknownValueError:
            print("Could not understand audio")
            return None
        except sr.RequestError as e:
            print(f"Could not request results from Google Speech Recognition service; {e}")
            return None

    def _parse_command(self, command_string):
        """Parses a command string like '10 squat then 20 deadlift' into a list of (reps, exercise) tuples."""
        commands = []
        parts = command_string.lower().split(" then ")
        for part in parts:
            sub_parts = part.split()
            if len(sub_parts) >= 2 and sub_parts[0].isdigit():
                try:
                    reps = int(sub_parts[0])
                    exercise = " ".join(sub_parts[1:])
                    # Simple check if exercise is in our known exercises (can be improved)
                    if exercise in self.evaluator.mappings['exercise_to_idx']:
                         commands.append((reps, exercise))
                    else:
                         print(f"Warning: Exercise '{exercise}' not recognized.")
                         self._speak(f"Sorry, I don't recognize the exercise {exercise}.")
                except ValueError:
                    print(f"Warning: Could not parse repetitions from '{part}'.")
                    self._speak("Sorry, I didn't understand the number of repetitions in your command.")
            else:
                 print(f"Warning: Could not parse command part '{part}'. Expected 'number exercise'.")
                 self._speak("Sorry, I didn't understand the format of your command.")

        return commands


    def start(self):
        # Variables pour estimation de niveau
        confidence_buffer = []
        WINDOW_SIZE = 30
        last_spoken_exercise = None
        last_spoken_correction = None
        # command_active = False # Flag to indicate if a voice command is active - managed by current_command now
        # command_exercise = None # Exercise specified by voice command
        # command_reps = 0 # Repetitions specified by voice command
        # current_reps = 0 # Counter for repetitions performed - managed by current_command_reps_completed now


        # D√©marrage de la capture vid√©o
        cap = cv2.VideoCapture(0)
        if not cap.isOpened():
            raise RuntimeError("Impossible d'ouvrir la webcam")

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Display instruction for voice command
            command_prompt = "Press 'c' and say 'reps exercise (then reps exercise)'"
            cv2.putText(frame, command_prompt, (10, frame.shape[0] - 20),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

            # Listen for a voice command when 'c' is pressed
            key = cv2.waitKey(1) & 0xFF
            if key == ord('c'):
                self._speak("Listening for command...")
                command_string = self._listen_command()
                if command_string:
                     parsed_commands = self._parse_command(command_string)
                     if parsed_commands:
                         self.command_queue.extend(parsed_commands)
                         if not self.current_command: # Start the first command if none is active
                             self.current_command = self.command_queue.pop(0)
                             self.current_command_reps_completed = 0
                             self._speak(f"Starting workout: {self.current_command[0]} {self.current_command[1]}s.")
                     else:
                         self._speak("No valid commands received.")
                else:
                    self._speak("Could not understand your command.")


            # Pr√©diction et corrections
            ex, grp, conf, corrs = self._process_frame(frame)

            # Update current exercise and group for display if no command is active
            if not self.current_command and ex:
                self.current_ex = ex
                self.current_grp = grp


            if ex: # ex here is either the predicted or commanded exercise
                # Mise √† jour du buffer de confiance
                confidence_buffer.append(conf)
                if len(confidence_buffer) > WINDOW_SIZE:
                    confidence_buffer.pop(0)
                avg_conf = np.mean(confidence_buffer)

                # Estimation simple du niveau
                if avg_conf >= 0.85:
                    level = "expert"
                elif avg_conf >= 0.70:
                    level = "intermediate"
                else:
                    level = "novice"

                # Update user level in the corrector instance
                self.user_level = level

                # Announce exercise or correction based on command state
                if self.current_command:
                    commanded_reps, commanded_exercise = self.current_command
                    # Repetition tracking (Placeholder - needs actual movement detection logic)
                    # This is where you would add logic to detect a completed repetition
                    # and increment self.current_command_reps_completed.
                    # For demonstration, let's simulate completing a rep on space bar press
                    if key == ord(' '):
                         self.current_command_reps_completed += 1
                         self._speak(f"Repetition {self.current_command_reps_completed} of {commanded_reps}.")
                         if self.current_command_reps_completed >= commanded_reps:
                              self._speak(f"Completed {commanded_reps} {commanded_exercise}s.")
                              if self.command_queue:
                                  self.current_command = self.command_queue.pop(0)
                                  self.current_command_reps_completed = 0
                                  self._speak(f"Next exercise: {self.current_command[0]} {self.current_command[1]}s.")
                              else:
                                  self._speak("Workout complete!")
                                  self.current_command = None # End of workout
                                  self.current_command_reps_completed = 0


                if corrs and corrs[0]['message'] != last_spoken_correction:
                     # Only speak corrections if a command is active or if no command is active and an exercise is detected
                     if self.current_command or (not self.current_command and ex):
                          self._speak(corrs[0]['message']) # Speak the first correction
                          last_spoken_correction = corrs[0]['message']
                elif ex != last_spoken_exercise and not self.current_command: # Announce new detected exercise only if no command is active
                     self._speak(f"Exercice d√©tect√© : {ex}")
                     last_spoken_exercise = ex
                     last_spoken_correction = None # Reset correction spoken when exercise changes



            # Dessin des informations et corrections
            annotated = self._draw(frame, corrs)
            cv2.imshow('Analyse Niveau', annotated)

            if key == ord('q'):
                break

        cap.release()
        cv2.destroyAllWindows()


exercise_points = mappings["exercise_points"]
group_mapping  = mappings["group_mapping"]
# Initialisation de l‚Äô√©valuateur de mod√®le
evaluator = ModelEvaluator(
    model_path=CHECKPOINT_PATH,
    mappings_path=MAPPINGS_PATH,
    device='cpu'
)                                                                                    # [3]


# Instanciation du correcteur webcam
wc = WebcamCorrector(evaluator=evaluator, user_level='novice')

# 3. D√©marrer la boucle
wc.start()

KeyboardInterrupt: 

In [None]:
import json
import torch
import cv2
import numpy as np
import cv2
import mediapipe as mp
import numpy as np
import pyttsx3
import torch
import json
import speech_recognition as sr # Import the speech_recognition library


class PoseClassificationModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_groups, num_exercises):
        super().__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim,
                           bidirectional=True,
                           num_layers=2,
                           dropout=0.3)

        # Attention Temporelle
        self.attention = nn.MultiheadAttention(2*hidden_dim, 4)

        # Branches Sp√©cialis√©es
        self.group_net = nn.Sequential(
            nn.Linear(2*hidden_dim, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, num_groups)
        )

        self.exercise_net = nn.Sequential(
            nn.Linear(2*hidden_dim, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, num_exercises)
        )

    def forward(self, x):
        out, _ = self.lstm(x)  # [batch, seq, 2*hidden]
        out = out.permute(1, 0, 2)  # [seq, batch, 2*hidden]

        # M√©canisme d'Attention
        attn_out, _ = self.attention(out, out, out)
        pooled = torch.mean(attn_out, dim=0)  # [batch, 2*hidden]

        group = self.group_net(pooled)
        exercise = self.exercise_net(pooled)

        return group, exercise

class ModelEvaluator:
    """
    Charge un mod√®le PyTorch entra√Æn√© et ses mappings JSON,
    puis fournit une m√©thode `predict` pour d√©duire le groupe et l‚Äôexercice.
    """
    def __init__(self, model_path, mappings_path, device='cpu'):
        self.device = device
        # Lecture des mappings JSON (idx_to_group, idx_to_exercise, dims)
        with open(mappings_path, 'r') as f:
            self.mappings = json.load(f)
        # Instanciation du mod√®le avec les dimensions sauvegard√©es
        self.model = PoseClassificationModel(
            input_dim   = self.mappings['input_dim'],
            hidden_dim  = self.mappings['hidden_dim'],
            num_groups      = self.mappings['num_groups'],
            num_exercises   = self.mappings['num_exercises']
        ).to(self.device)
        # Chargement du checkpoint PyTorch
        checkpoint = torch.load(model_path, map_location=self.device)
        state_dict = checkpoint.get('model_state_dict', checkpoint)
        self.model.load_state_dict(state_dict)
        self.model.eval()

    def predict(self, sequence_tensor):
        """
        Pr√©dit le groupe, l‚Äôexercice et la confiance pour une s√©quence de features.
        """
        with torch.no_grad():
            grp_out, ex_out = self.model(sequence_tensor)
        grp_idx = grp_out.argmax(dim=1).item()
        ex_idx  = ex_out.argmax(dim=1).item()
        confidence = float(torch.softmax(ex_out, dim=1).max())
        group    = self.mappings['idx_to_group'][str(grp_idx)]
        exercise = self.mappings['idx_to_exercise'][str(ex_idx)]
        return group, exercise, confidence

    def _load_model_and_mappings(self, model_path, mappings_path):
        """Charge le meilleur mod√®le sauvegard√© et ses mappings"""
        # Charger les mappings
        with open(mappings_path, 'r') as f:
            self.mappings = json.load(f)

    def _calculate_biomech_metrics(self, sequences):
        """Calcule les m√©triques biom√©caniques pour une s√©quence"""
        # Simulation des m√©triques - √† remplacer par votre logique r√©elle
        return {
            'angle_accuracy': np.random.uniform(0.7, 0.95),
            'movement_quality': np.random.uniform(0.6, 0.9),
            'stability_score': np.random.uniform(0.5, 0.85)
        }

    def generate_evaluation_report(self, results):
        """G√©n√®re un rapport d√©taill√© d'√©valuation"""
        total_samples = sum(len(class_results) for class_results in results.values())
        total_exercise_correct = sum(
            sum(1 for sample in class_results if sample['exercise_correct'])
            for class_results in results.values()
        )
        total_group_correct = sum(
            sum(1 for sample in class_results if sample['group_correct'])
            for class_results in results.values()
        )

        print(f"\nüìà RAPPORT d'√âVALUATION GLOBAL")
        print(f"=" * 50)
        print(f"√âchantillons analys√©s: {total_samples}")
        print(f"Pr√©cision exercices: {total_exercise_correct/total_samples:.2%}")
        print(f"Pr√©cision groupes: {total_group_correct/total_samples:.2%}")

        return {
            'total_samples': total_samples,
            'exercise_accuracy': total_exercise_correct/total_samples,
            'group_accuracy': total_group_correct/total_samples,
            'detailed_results': results
        }
CHECKPOINT_PATH = "pose_model_best.pth"
MAPPINGS_PATH = "model_mappings3.json"

with open("model_mappings3.json", "r") as f:
    mappings = json.load(f)


class WebcamCorrector:
    def __init__(self, evaluator, user_level='intermediate'):
        self.evaluator = evaluator
        self.user_level = user_level
        self.pose = mp.solutions.pose.Pose(
            static_image_mode=False,
            model_complexity=1,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5
        )
        self.drawing = mp.solutions.drawing_utils
        self.tts = pyttsx3.init()
        self.tts.setProperty('rate', 150)
        self.tts.setProperty('volume', 0.8)
        self.sequence, self.max_len = [], 30
        self.current_ex = None
        self.max_feature_dim = evaluator.mappings['input_dim'] # Get expected input dimension from model mappings
        self.standards = AdaptiveExerciseStandards() # Initialize AdaptiveExerciseStandards
        self.recognizer = sr.Recognizer() # Initialize speech recognizer
        self.command_queue = [] # To store list of (reps, exercise) tuples
        self.current_command = None # To store the current (reps, exercise) tuple being guided
        self.current_command_reps_completed = 0 # To track completed reps for the current command


    def _extract_features(self, lm):
        coords = [coord for p in lm.landmark for coord in (p.x, p.y, p.z)]
        angles = self._compute_angles(lm)
        dists  = self._compute_distances(lm)
        features = np.array(coords + angles + dists, dtype=np.float32)

        # Pad features to match the expected input dimension of the model
        if features.shape[0] < self.max_feature_dim:
            padding = self.max_feature_dim - features.shape[0]
            features = np.pad(features, (0, padding), 'constant', constant_values=0)
        elif features.shape[0] > self.max_feature_dim:
            features = features[:self.max_feature_dim]

        return features

    def _process_frame(self, frame):
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        res = self.pose.process(rgb)
        corrs = []
        predicted_ex = None # Store the predicted exercise separately

        if res.pose_landmarks:
            self.drawing.draw_landmarks(frame, res.pose_landmarks, mp.solutions.pose.POSE_CONNECTIONS)
            features = self._extract_features(res.pose_landmarks)
            self.sequence.append(features)
            if len(self.sequence) > self.max_len:
                self.sequence.pop(0)

            if len(self.sequence) >= 15:
                seq_tensor = torch.tensor(self.sequence, dtype=torch.float32).unsqueeze(0).to(self.evaluator.device)
                grp, predicted_ex, conf = self.evaluator.predict(seq_tensor)

                # If a command is active, use the commanded exercise for corrections
                if self.current_command:
                     commanded_reps, commanded_exercise = self.current_command
                     corrs = self._generate_corrections(commanded_exercise, self.user_level)
                     return commanded_exercise, grp, conf, corrs # Return commanded exercise

                else:
                    # Otherwise, use the predicted exercise for corrections
                    corrs = self._generate_corrections(predicted_ex, self.user_level)
                    return predicted_ex, grp, conf, corrs # Return predicted exercise

        return None, None, 0.0, corrs # Return None for exercise if no landmarks


    def _compute_angles(self, lm):
        def angle(a, b, c):
            v1, v2 = a - b, c - b
            cosang = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
            return np.degrees(np.arccos(np.clip(cosang, -1, 1)))
        idxs = [(12,14,16), (11,13,15), (24,26,28), (23,25,27)]  # 4 angles
        pts = lm.landmark
        return [angle(np.array([pts[i].x, pts[i].y]),
                      np.array([pts[j].x, pts[j].y]),
                      np.array([pts[k].x, pts[k].y]))
                for i,j,k in idxs]

    def _compute_distances(self, lm):
        pts = lm.landmark
        sl, sr = np.array([pts[11].x, pts[11].y]), np.array([pts[12].x, pts[12].y])
        hl, hr = np.array([pts[23].x, pts[23].y]), np.array([pts[24].x, pts[24].y])
        nose, hipc = np.array([pts[0].x, pts[0].y]), (hl + hr) / 2
        return [np.linalg.norm(sr - sl),
                np.linalg.norm(hr - hl),
                np.linalg.norm(nose - hipc)]

    def _generate_corrections(self, exercise, user_level):
        """Generate corrections based on exercise and user level."""
        # This is a placeholder. In a real application, you would analyze the
        # biomechanical data (angles, distances, etc.) from the 'features'
        # variable to determine specific errors and generate corrections
        # using the AdaptiveExerciseStandards class.
        # For now, we'll just return some generic corrections for demonstration.
        if exercise in self.standards.exercise_standards:
            # Simulate detecting some common errors based on user level
            errors = []
            if user_level == 'novice':
                errors = ['back_not_straight', 'not_low_enough'] # Example novice errors
            elif user_level == 'intermediate':
                 errors = ['arms_too_wide', 'body_not_straight'] # Example intermediate errors
            elif user_level == 'expert':
                 errors = ['back_not_straight'] # Example expert errors

            corrections = []
            for error_type in errors:
                 correction_message = self.standards.get_adaptive_correction(exercise, error_type, user_level)
                 corrections.append({'message': correction_message, 'severity': 'medium'}) # Simulate medium severity

            if not corrections and exercise in ['squat', 'push-up', 'deadlift']:
                 # Add a progression tip if no specific errors are detected for these exercises
                 progression_tip = self.standards.get_adaptive_correction(exercise, 'progression_tip', user_level)
                 if progression_tip:
                     corrections.append({'message': progression_tip, 'severity': 'low'})

            return corrections

        return []

    def _draw(self, frame, corrs):
        """Superpose l‚Äôoverlay, le niveau utilisateur et les corrections."""
        h, w = frame.shape[:2]
        overlay = frame.copy()
        # Adjust overlay height to accommodate command info
        cv2.rectangle(overlay, (5,5), (w-5,165), (0,0,0), -1)
        frame = cv2.addWeighted(frame, 0.6, overlay, 0.4, 0)
        y = 30
        cv2.putText(frame, f"Niveau: {self.user_level}", (10,y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2)

        if self.current_command:
            commanded_reps, commanded_exercise = self.current_command
            cv2.putText(frame, f"Commande: {commanded_reps} {commanded_exercise}", (10, y + 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
            cv2.putText(frame, f"Reps: {self.current_command_reps_completed}/{commanded_reps}", (10, y + 60),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
            y += 60 # Adjust starting y for corrections

        elif self.current_ex: # Display predicted exercise if no command is active
             cv2.putText(frame, f"Exercice: {self.current_ex}", (10,y+30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2)
             cv2.putText(frame, f"Groupe: {self.current_grp}", (10,y+60),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2)
             y += 60 # Adjust starting y for corrections


        for c in corrs[:3]:
            y += 25
            color = (0,0,255) if c.get('severity','low')=='high' else (0,165,255)
            cv2.putText(frame, c['message'], (10,y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
        return frame

    def _speak(self, text):
        """Synth√©tise et prononce un texte."""
        try:
            self.tts.say(text)
            self.tts.runAndWait()
        except Exception as e:
            print(f"Error during text-to-speech: {e}")

    def _listen_command(self):
        """Listens for a voice command from the microphone and transcribes it."""
        with sr.Microphone() as source:
            print("Say something!")
            self.recognizer.adjust_for_ambient_noise(source, duration=1) # Adjust for noise
            audio = self.recognizer.listen(source)

        try:
            command = self.recognizer.recognize_google(audio, language="en-US") # You can change the language if needed
            print(f"You said: {command}")
            return command
        except sr.UnknownValueError:
            print("Could not understand audio")
            return None
        except sr.RequestError as e:
            print(f"Could not request results from Google Speech Recognition service; {e}")
            return None

    def _parse_command(self, command_string):
        """Parses a command string like '10 squat then 20 deadlift' into a list of (reps, exercise) tuples."""
        commands = []
        # Handle both "reps exercise" and "reps:exercise" formats, and "then" separator
        parts = command_string.lower().replace(':', ' ').split(" then ")
        for part in parts:
            sub_parts = part.split()
            if len(sub_parts) >= 2 and sub_parts[0].isdigit():
                try:
                    reps = int(sub_parts[0])
                    exercise = " ".join(sub_parts[1:])
                    # Simple check if exercise is in our known exercises (can be improved)
                    if exercise in self.evaluator.mappings['exercise_to_idx']:
                         commands.append((reps, exercise))
                    else:
                         print(f"Warning: Exercise '{exercise}' not recognized.")
                         self._speak(f"Sorry, I don't recognize the exercise {exercise}.")
                except ValueError:
                    print(f"Warning: Could not parse repetitions from '{part}'.")
                    self._speak("Sorry, I didn't understand the number of repetitions in your command.")
            else:
                 print(f"Warning: Could not parse command part '{part}'. Expected 'number exercise'.")
                 self._speak("Sorry, I didn't understand the format of your command.")

        return commands


    def start(self):
        # Variables pour estimation de niveau
        confidence_buffer = []
        WINDOW_SIZE = 30
        last_spoken_exercise = None
        last_spoken_correction = None
        # command_active = False # Flag to indicate if a voice command is active - managed by current_command now
        # command_exercise = None # Exercise specified by voice command
        # command_reps = 0 # Repetitions specified by voice command
        # current_reps = 0 # Counter for repetitions performed - managed by current_command_reps_completed now


        # D√©marrage de la capture vid√©o
        cap = cv2.VideoCapture(0)
        if not cap.isOpened():
            raise RuntimeError("Impossible d'ouvrir la webcam")

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Display instruction for voice/text command
            command_prompt = "Press 'c' for voice command ('reps exercise (then reps exercise)') or 'g' for text ('reps:exercise, reps:exercise,...')"
            cv2.putText(frame, command_prompt, (10, frame.shape[0] - 20),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)


            key = cv2.waitKey(1) & 0xFF
            if key == ord('c'):
                self._speak("Listening for voice command...")
                command_string = self._listen_command()
                if command_string:
                     parsed_commands = self._parse_command(command_string)
                     if parsed_commands:
                         self.command_queue.extend(parsed_commands)
                         if not self.current_command: # Start the first command if none is active
                             self.current_command = self.command_queue.pop(0)
                             self.current_command_reps_completed = 0
                             self._speak(f"Starting workout: {self.current_command[0]} {self.current_command[1]}s.")
                     else:
                         self._speak("No valid commands received.")
                else:
                    self._speak("Could not understand your voice command.")
            elif key == ord('g'):
                 # Text input via console (blocking)
                 cv2.destroyAllWindows() # Close video window temporarily
                 print("\nEnter workout command (e.g., '10:squat, 20:deadlift'):")
                 command_string = input()
                 # Re-open video window
                 cv2.namedWindow('Analyse Niveau', cv2.WINDOW_NORMAL)
                 cv2.imshow('Analyse Niveau', frame) # Show last frame
                 if command_string:
                      # Parse text command
                      parsed_commands = []
                      parts = command_string.lower().split(',')
                      for part in parts:
                           sub_parts = part.strip().split(':')
                           if len(sub_parts) == 2 and sub_parts[0].isdigit():
                               try:
                                    reps = int(sub_parts[0])
                                    exercise = sub_parts[1].strip()
                                    if exercise in self.evaluator.mappings['exercise_to_idx']:
                                         parsed_commands.append((reps, exercise))
                                    else:
                                         print(f"Warning: Exercise '{exercise}' not recognized.")
                                         self._speak(f"Sorry, I don't recognize the exercise {exercise}.")
                               except ValueError:
                                    print(f"Warning: Could not parse repetitions from '{part}'.")
                                    self._speak("Sorry, I didn't understand the number of repetitions in your command.")
                           else:
                                print(f"Warning: Could not parse command part '{part}'. Expected 'number:exercise'.")
                                self._speak("Sorry, I didn't understand the format of your command.")

                      if parsed_commands:
                          self.command_queue.extend(parsed_commands)
                          if not self.current_command: # Start the first command if none is active
                              self.current_command = self.command_queue.pop(0)
                              self.current_command_reps_completed = 0
                              self._speak(f"Starting workout: {self.current_command[0]} {self.current_command[1]}s.")
                      else:
                          self._speak("No valid commands received.")
                 else:
                     self._speak("No command entered.")



            # Pr√©diction et corrections
            ex, grp, conf, corrs = self._process_frame(frame)

            # Update current exercise and group for display if no command is active
            if not self.current_command and ex:
                self.current_ex = ex
                self.current_grp = grp


            if ex: # ex here is either the predicted or commanded exercise
                # Mise √† jour du buffer de confiance
                confidence_buffer.append(conf)
                if len(confidence_buffer) > WINDOW_SIZE:
                    confidence_buffer.pop(0)
                avg_conf = np.mean(confidence_buffer)

                # Estimation simple du niveau
                if avg_conf >= 0.85:
                    level = "expert"
                elif avg_conf >= 0.70:
                    level = "intermediate"
                else:
                    level = "novice"

                # Update user level in the corrector instance
                self.user_level = level

                # Annonce exercise or correction based on command state
                if self.current_command:
                    commanded_reps, commanded_exercise = self.current_command
                    # Repetition tracking (Placeholder - needs actual movement detection logic)
                    # This is where you would add logic to detect a completed repetition
                    # and increment self.current_command_reps_completed.
                    # For demonstration, let's simulate completing a rep on space bar press
                    if key == ord(' '):
                         self.current_command_reps_completed += 1
                         self._speak(f"Repetition {self.current_command_reps_completed} of {commanded_reps}.")
                         if self.current_command_reps_completed >= commanded_reps:
                              self._speak(f"Completed {commanded_reps} {commanded_exercise}s.")
                              if self.command_queue:
                                  self.current_command = self.command_queue.pop(0)
                                  self.current_command_reps_completed = 0
                                  self._speak(f"Next exercise: {self.current_command[0]} {self.current_command[1]}s.")
                              else:
                                  self._speak("Workout complete!")
                                  self.current_command = None # End of workout
                                  self.current_command_reps_completed = 0


                if corrs and corrs[0]['message'] != last_spoken_correction:
                     # Only speak corrections if a command is active or if no command is active and an exercise is detected
                     if self.current_command or (not self.current_command and ex):
                          self._speak(corrs[0]['message']) # Speak the first correction
                          last_spoken_correction = corrs[0]['message']
                elif ex != last_spoken_exercise and not self.current_command: # Announce new detected exercise only if no command is active
                     self._speak(f"Exercice d√©tect√© : {ex}")
                     last_spoken_exercise = ex
                     last_spoken_correction = None # Reset correction spoken when exercise changes



            # Dessin des informations et corrections
            annotated = self._draw(frame, corrs)
            cv2.imshow('Analyse Niveau', annotated)

            if key == ord('q'):
                break

        cap.release()
        cv2.destroyAllWindows()


exercise_points = mappings["exercise_points"]
group_mapping  = mappings["group_mapping"]
# Initialisation de l‚Äô√©valuateur de mod√®le
evaluator = ModelEvaluator(
    model_path=CHECKPOINT_PATH,
    mappings_path=MAPPINGS_PATH,
    device='cpu'
)                                                                                    # [3]


# Instanciation du correcteur webcam
wc = WebcamCorrector(evaluator=evaluator, user_level='novice')

# 3. D√©marrer la boucle
wc.start()

  seq_tensor = torch.tensor(self.sequence, dtype=torch.float32).unsqueeze(0).to(self.evaluator.device)
