# Mesure de la synchronisation de mouvements de danse

Dans le cadre du projet de reconnaissance visuelle, nous aimerions comparer les mouvements de danseurs différents. En particulier, nous aimerons savoir si deux danseurs sont synchrones dans leurs mouvements pour une chorégraphie donnée. Cela nécessite un prétraitement de chaque frame d'une vidéo, puis d'identifier les différentes parties du corps de chaque danseurs, et enfin d'analyser les mouvements de chacune de ces parties. Dans ce notebook, nous allons traiter chacune de ces parties, en évitant au maximum le recours aux méthodes d'apprentissage automatique, mais en se fixant des contraintes sur le format de la vidéo.

## I. Prétraitement des données: suppression du fond

## II. Reconnaissance des parties du corps et analyse du mouvement

Une fois le danseur bien rogné, nous nous plaçons dans le cadre bien précis où les parties du corps à identifier sont identifiés par le port d'un objet de couleurs distinctes. Dans notre cas, le port d'un chapeau kaki pour identifier le haut de la tête, et respectivement d'une chaussette rose et d'une chaussure rouge pour le pied gauche et le pied droit.  
Nous avons instauré ces contraintes dû à la difficulté de reconnaitre les parties du corps sans l'aide d'algorithme de deeplearning. En particulier, même en définissant les pieds comme étant les points les plus bas du rognage, le pied droit n'est pas nécessairement à droite et le pied gauche n'est pas nécessairement à gauche. Ces généralités sont d'autant plus fausses pour des chorégraphies de danse riches en positions. 

In [2]:
import cv2
import numpy as np
import matplotlib.pyplot as plt

video_path = 'data/me2.mov'

Le code commenté ci-dessous a servi à identifier les fenêtres de couleur HSV des différents objets. Nous avons trouvé HSV ici plus adapté que RGB par exemple puisque la teinte de l'objet n'est pas supposée porter de variations importantes au cours de la vidéo, tandis que le contraste et la luminosité doivent varier davantage.

In [244]:
# import cv2
# import numpy as np
# import matplotlib.pyplot as plt

# def apply_hsv_filter(frame, lower_bound, upper_bound):
#     hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
#     mask = cv2.inRange(hsv, lower_bound, upper_bound)
#     filtered_frame = cv2.bitwise_and(frame, frame, mask=mask)
#     return filtered_frame

# def display_frames(original_frame, filtered_frame):
#     # Concaténer les images horizontalement
#     output_frame = np.hstack((original_frame, filtered_frame))
#     # Afficher l'image
#     plt.imshow(cv2.cvtColor(output_frame, cv2.COLOR_BGR2RGB))
#     plt.axis('off')
#     plt.show()

# def process_video(input_file, lower_bound, upper_bound, mode='static'):
#     cap = cv2.VideoCapture(input_file)
    
#     if mode == 'static':
#         for _ in range(5):
#             ret, frame = cap.read()
#             if not ret:
#                 break
#             filtered_frame = apply_hsv_filter(frame, lower_bound, upper_bound)
#             display_frames(frame, filtered_frame)
#     elif mode == 'video':
#         while True:
#             ret, frame = cap.read()
#             if not ret:
#                 break
#             filtered_frame = apply_hsv_filter(frame, lower_bound, upper_bound)
#             display_frames(frame, filtered_frame)
#             if cv2.waitKey(1) & 0xFF == ord('q'):
#                 break
    
#     cap.release()
#     cv2.destroyAllWindows()

# # Paramètres HSV pour le filtrage
# lower_bound_fuchsia = np.array([140, 100, 100])
# upper_bound_fuchsia = np.array([170, 255, 255])

# lower_bound_red = np.array([170, 50, 100])
# upper_bound_red = np.array([179, 255, 255])

# lower_bound_brown = np.array([10, 70, 50])
# upper_bound_brown = np.array([30, 255, 255])

# # Mode d'exécution (statique ou vidéo)
# mode = 'static' 

# # Appel de la fonction pour traiter la vidéo avec les paramètres spécifiés
# process_video(video_path, lower_bound_brown, upper_bound_brown, mode)


Nous définissons une première classe générique pour représenter chaque objet. Chaque objet est défini par le résultat du masque appliqué pour retrouver les pixels définis dans une fenêtre de couleurs HSV. Nous commençons par trouver les contours de ces zones là. Si la frame est très bruitée et contient des pixels "aléatoires" des teintes de notre fenêtre, ils seront éliminés puisque notre fonction find_objects ne prend en compte que le plus grand contour. Le barycentre est alors calculé pour cette zone, et fait office de coordonnées pour notre objet.

In [236]:
class Segmenter:
    def __init__(self, lower_bound, upper_bound, min_area=100, k=1):
        self.lower_bound = np.array(lower_bound)
        self.upper_bound = np.array(upper_bound)
        self.min_area = min_area
        self.k = k

    def convert_to_hsv(self, frame):
        return cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

    def apply_mask(self, frame):
        hsv = self.convert_to_hsv(frame)
        mask = cv2.inRange(hsv, self.lower_bound, self.upper_bound)
        mask = self.filter_small_masks(mask)
        _, binary_mask = cv2.threshold(mask, 127, 255, cv2.THRESH_BINARY)
        return binary_mask


    def filter_small_masks(self, mask):
        contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        filtered_mask = np.zeros_like(mask)
        for contour in contours:
            if cv2.contourArea(contour) > self.min_area:
                cv2.drawContours(filtered_mask, [contour], -1, 255, thickness=cv2.FILLED)
        return filtered_mask

    def display_result(self, original_frame, result_frame, display_mode='static'):
        if display_mode == 'video':
            # Pour le mode vidéo, combine et montre l'image directement sans convertir en RGB
            combined = np.hstack((original_frame, result_frame))
            cv2.imshow('Result', combined)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                cv2.destroyAllWindows()
        elif display_mode == 'static':
            # Pour le mode statique (utilisé avec matplotlib), convertit en RGB
            fig, ax = plt.subplots(1, 2, figsize=(10, 5))
            ax[0].imshow(cv2.cvtColor(original_frame, cv2.COLOR_BGR2RGB))
            ax[0].set_title('Original Frame')
            ax[0].axis('off')
            ax[1].imshow(cv2.cvtColor(result_frame, cv2.COLOR_BGR2RGB))
            ax[1].set_title('Mask Applied')
            ax[1].axis('off')
            plt.show()
    
    def find_objects(self, mask):
        """
        Trouve jusqu'à k objets basés sur les contours dans le masque.
        Retourne les barycentres des k plus grands contours.
        """
        contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        sorted_contours = sorted(contours, key=cv2.contourArea, reverse=True)[:self.k]
        barycentres = []
        for contour in sorted_contours:
            M = cv2.moments(contour)
            if M["m00"] != 0:
                cx = int(M["m10"] / M["m00"])
                cy = int(M["m01"] / M["m00"])
                barycentres.append((cx, cy))
        return barycentres



On définit alors des classes pour chacun des objets, avec des fenêtres de couleurs différentes.

In [237]:
class LeftShoeSegmenter(Segmenter):
    def __init__(self):
        super().__init__(lower_bound=[170, 50, 100], upper_bound=[179, 255, 255], k=1)

class RightShoeSegmenter(Segmenter):
    def __init__(self):
        super().__init__(lower_bound=[140, 100, 100], upper_bound=[170, 255, 255], k=1)

class CapSegmenter(Segmenter):
    def __init__(self):
        super().__init__(lower_bound=[10, 70, 50], upper_bound=[30, 255, 255], k=1)



On définit ensuite une classe ObjectTracker pour stocker les positions, vitesses et accélérations de chaque objet.  
Nous testons diverses techniques pour détecter les impacts d'un mouvement de danse, par un changement drastique du vecteur accélération, ou d'un changement de direction du vecteur vitesse.

In [238]:
class ObjectTracker:
    def __init__(self, fps):
        self.fps = fps
        self.positions = []  # Liste des tuples (x, y)
        self.velocities = []  # Liste des tuples (vx, vy)
        self.accelerations = []  # Liste des tuples (ax, ay)

    def update_position(self, position):
        """Mise à jour de la position de l'objet avec une position tuple (x, y) directe."""
        if position:
            self.positions.append(position)
        else:
            self.positions.append((0, 0))
    
    def get_latest_position(self):
        """Retourne la dernière position connue de l'objet."""
        if self.positions:
            return self.positions[-1]
        else:
            return None  # Retourne None si aucune position n'a été enregistrée

    def calculate_velocity(self):
        """Calcule la vitesse (vx, vy) entre les positions consécutives."""
        if len(self.positions) > 1:
            dx = self.positions[-1][0] - self.positions[-2][0]
            dy = self.positions[-1][1] - self.positions[-2][1]
            # Calculer la vitesse comme un vecteur (vx, vy)
            vx = dx * self.fps
            vy = dy * self.fps
            self.velocities.append((vx, vy))
        else:
            self.velocities.append((0, 0))

    def calculate_acceleration(self):
        """Calcule l'accélération (ax, ay) entre les vitesses consécutives."""
        if len(self.velocities) > 1:
            dvx = self.velocities[-1][0] - self.velocities[-2][0]
            dvy = self.velocities[-1][1] - self.velocities[-2][1]
            # Calculer l'accélération comme un vecteur (ax, ay)
            ax = dvx * self.fps
            ay = dvy * self.fps
            self.accelerations.append((ax, ay))
        else:
            self.accelerations.append((0, 0))

    def get_latest_velocity(self):
        """Retourne la dernière vitesse connue de l'objet."""
        if self.velocities:
            return self.velocities[-1]
        else:
            return (0, 0)

    def get_latest_acceleration(self):
        """Retourne la dernière accélération connue de l'objet."""
        if self.accelerations:
            return self.accelerations[-1]
        else:
            return (0, 0)

    def smooth_velocity(self, window_size=10):
        """Lisse la vitesse en utilisant une moyenne mobile."""
        smoothed_velocities = []
        for i in range(len(self.velocities)):
            # Déterminer la fenêtre pour la moyenne mobile
            start_index = max(0, i - window_size + 1)
            end_index = i + 1
            window = self.velocities[start_index:end_index]
            
            # Calculer la moyenne dans la fenêtre
            vx_avg = sum(v[0] for v in window) / len(window)
            vy_avg = sum(v[1] for v in window) / len(window)
            smoothed_velocities.append((vx_avg, vy_avg))
        
        return smoothed_velocities

    def smooth_acceleration(self, window_size=20):
        """Lisse la vitesse en utilisant une moyenne mobile."""
        smoothed_acceleration = []
        for i in range(len(self.accelerations)):
            # Déterminer la fenêtre pour la moyenne mobile
            start_index = max(0, i - window_size + 1)
            end_index = i + 1
            window = self.accelerations[start_index:end_index]
            
            # Calculer la moyenne dans la fenêtre
            vx_avg = sum(v[0] for v in window) / len(window)
            vy_avg = sum(v[1] for v in window) / len(window)
            smoothed_acceleration.append((vx_avg, vy_avg))
        
        return smoothed_acceleration
    
    def calculate_acceleration_change(self):
        """Calcule la différence entre les accélérations consécutives."""
        acceleration_changes = []
        for i in range(1, len(self.accelerations)):
            current_acc = self.accelerations[i]
            prev_acc = self.accelerations[i-1]
            # Calculer la différence vectorielle entre accélérations consécutives
            delta_acc = (current_acc[0] - prev_acc[0], current_acc[1] - prev_acc[1])
            # Calculer la norme de la différence
            delta_acc_norm = np.sqrt(delta_acc[0]**2 + delta_acc[1]**2)
            acceleration_changes.append(delta_acc_norm)
        return acceleration_changes

    def detect_impacts(self, threshold=5000.0):
        impacts = []
        acceleration_changes = self.calculate_acceleration_change()
        for i, change in enumerate(acceleration_changes):
            if change > threshold:
                # Assurez-vous que l'accès à self.positions est valide.
                # Si i + 1 est utilisé pour aligner avec les changements d'accélération, 
                # il faut s'assurer que cela correspond correctement aux indices des positions.
                if i < len(self.positions) - 1:  # S'assurer que l'index est dans la plage
                    impacts.append((i + 1, self.positions[i + 1]))  
                else:
                    # Si l'index dépasse, utilisez le dernier disponible.
                    impacts.append((i + 1, self.positions[-1]))  
        print(f"Impacts détectés : {impacts}")
        return impacts

    def detect_acceleration_peaks(self, acceleration_threshold=5.0):
        """Détecte les instants d'impact basés sur des pics d'accélération."""
        impacts = []
        for i in range(1, len(self.accelerations)):
            # Calculer la norme de l'accélération actuelle
            acceleration_norm = np.sqrt(self.accelerations[i][0]**2 + self.accelerations[i][1]**2)

            if acceleration_norm > acceleration_threshold:
                impacts.append(i)  # L'indexation commence à 0
                
        return impacts

    def calculate_angle_changes(self):
        """Calcule les changements d'angle entre les vecteurs de vitesse consécutifs."""
        angle_changes = []
        for i in range(1, len(self.velocities)):
            v1 = self.velocities[i-1]
            v2 = self.velocities[i]
            # Calculer l'angle en radians entre v1 et v2
            dot_product = np.dot(v1, v2)
            norm_v1 = np.linalg.norm(v1)
            norm_v2 = np.linalg.norm(v2)
            # Éviter la division par zéro
            if norm_v1 == 0 or norm_v2 == 0:
                angle = 0
            else:
                cos_angle = dot_product / (norm_v1 * norm_v2)
                # S'assurer que cos_angle reste dans les limites valides pour l'arc cosinus
                cos_angle = np.clip(cos_angle, -1, 1)
                angle = np.arccos(cos_angle)
            
            # Convertir l'angle en degrés pour une interprétation plus facile
            angle_degrees = np.degrees(angle)
            angle_changes.append(angle_degrees)
        
        return angle_changes

    def detect_direction_changes(self, angle_threshold=30, speed_threshold=0.5):
        """Détecte les instants où le changement d'angle dépasse un seuil donné et la vitesse est au-dessus d'un certain seuil."""
        impacts = []
        angle_changes = self.calculate_angle_changes()
        for i in range(len(angle_changes)):
            # Vérifier également que la vitesse à cet instant dépasse le seuil minimal pour être considérée comme un mouvement
            speed_norm = np.linalg.norm(self.velocities[i+1])  # i+1 car les angles changent sont calculés à partir de la seconde vitesse
            if angle_changes[i] > angle_threshold and speed_norm > speed_threshold:
                impacts.append(i + 1)  # Ajouter 1 si l'indexation de frames commence à 1
                
        return impacts




In [239]:
# Définition des paramètres
fps = 30
cap = cv2.VideoCapture(video_path)

# Création des segmenteurs
left_shoe_segmenter = LeftShoeSegmenter() # Exemple
right_shoe_segmenter = RightShoeSegmenter() # Exemple
cap_segmenter = CapSegmenter()

# Création des trackers correspondants
left_shoe_tracker = ObjectTracker(fps=fps)
right_shoe_tracker = ObjectTracker(fps=fps)
cap_tracker = ObjectTracker(fps=fps)
 

Les fonctions suivantes servent à dessiner les positions et les vecteurs:

In [240]:
def draw_positions(frame, positions, color=(100, 150, 30)):
    """
    Dessine des cercles aux positions fournies sur la frame.
    """
    for (x, y) in positions:
        cv2.circle(frame, (x, y), radius=30, color=color, thickness=-1)
    return frame

def draw_vectors(frame, position, velocity, acceleration, color_velocity=(255, 0, 0), color_acceleration=(0, 0, 255)):
    """
    Dessine les vecteurs vitesse et accélération pour un objet sur la frame.
    
    :param frame: L'image sur laquelle dessiner.
    :param position: La position actuelle de l'objet (x, y).
    :param velocity: La vitesse de l'objet sous forme de tuple (vx, vy).
    :param acceleration: L'accélération de l'objet sous forme de tuple (ax, ay).
    :param color_velocity: La couleur du vecteur vitesse.
    :param color_acceleration: La couleur du vecteur accélération.
    """
    if position and velocity:
        end_point_velocity = (int(position[0] + velocity[0]), int(position[1] + velocity[1]))
        cv2.arrowedLine(frame, position, end_point_velocity, color_velocity, 2, tipLength=0.5)
    
    if position and acceleration:
        end_point_acceleration = (int(position[0] + acceleration[0]), int(position[1] + acceleration[1]))
        cv2.arrowedLine(frame, position, end_point_acceleration, color_acceleration, 2, tipLength=0.5)

    return frame

On traite alors la vidéo. Notons que nous avons ici décidé de lisser les vecteurs de vitesse et d'accélération en faisant une moyenne glissée pour aténuer les variations dues au bruit des zones de couleurs. Enfin, le résultat pourra être affiché sous forme de vidéos ou pour les premières frames de la vidéo avec le paramètre display_mode.

In [242]:
def process_video(video_path, segmenters_and_trackers, display_mode='static'):
    cap = cv2.VideoCapture(video_path)
    frame_count = 0

    while True:
        ret, frame = cap.read()
        if not ret or (display_mode == 'static' and frame_count >= 10):
            break

        combined_masks = np.zeros(frame.shape[:2], dtype=np.uint8)

        # Mise à jour des positions, calcul et lissage des vitesses et accélérations pour chaque tracker
        for segmenter, tracker in segmenters_and_trackers:
            mask = segmenter.apply_mask(frame)
            positions = segmenter.find_objects(mask)
            for pos in positions:
                tracker.update_position(pos)
            tracker.calculate_velocity()
            tracker.calculate_acceleration()

        # Après avoir calculé les vitesses et accélérations, appliquez le lissage
        for _, tracker in segmenters_and_trackers:
            tracker.velocities = tracker.smooth_velocity()  # Remplacer par la méthode de lissage appropriée
              # Remplacer par la méthode de lissage appropriée
        for _, tracker in segmenters_and_trackers:
            tracker.accelerations = tracker.smooth_acceleration()
        
        # Conversion du masque combiné et préparation de la frame pour le dessin
        combined_masks_bgr = cv2.cvtColor(combined_masks, cv2.COLOR_GRAY2BGR)
        combined_frame = np.hstack((frame, combined_masks_bgr))

        # Dessin des positions et vecteurs lissés
        for segmenter, tracker in segmenters_and_trackers:
            latest_position = tracker.get_latest_position()
            if latest_position:
                adjusted_position = (latest_position[0] + frame.shape[1], latest_position[1])
                draw_positions(combined_frame, [adjusted_position], color=(0, 255, 0))

                velocity = tracker.get_latest_velocity()
                acceleration = tracker.get_latest_acceleration()
                
                # Ajustement et dessin des vecteurs sur la frame combinée
                velocity_adjusted = (int(velocity[0] * 5), int(velocity[1] * 5))
                acceleration_adjusted = (int(acceleration[0] * 0.3), int(acceleration[1] * 0.3))
                draw_vectors(combined_frame, adjusted_position, velocity_adjusted, acceleration_adjusted, color_velocity=(255, 0, 0), color_acceleration=(0, 255, 255))
                
        for _, tracker in segmenters_and_trackers:
            impacts = tracker.detect_direction_changes(angle_threshold=30, speed_threshold=15)  # Ajustez les seuils selon vos besoins
            for impact_frame in impacts:
                if 0 <= impact_frame < len(tracker.positions):
                    if frame_count == impact_frame:  # Vérifie si la frame actuelle correspond à un impact
                        impact_pos = tracker.positions[impact_frame]
                        # Dessiner un point rouge sur l'impact
                        cv2.circle(combined_frame, impact_pos, radius=50, color=(0, 0, 255), thickness=-1)

        # Affichage ou sauvegarde de la frame traitée
        if display_mode == 'video':
            cv2.imshow('Tracked Objects', combined_frame)
            if cv2.waitKey(25) & 0xFF == ord('q'):
                break
        elif display_mode == 'static' and frame_count < 10:
            plt.figure(figsize=(10, 5))
            plt.imshow(cv2.cvtColor(combined_frame, cv2.COLOR_BGR2RGB))
            plt.axis('off')
            plt.show()

        frame_count += 1

    cap.release()
    cv2.destroyAllWindows()
    for _ in range(4): cv2.waitKey(1)


In [243]:
# Associez chaque segmenteur à son tracker
segmenters_and_trackers = [(left_shoe_segmenter, left_shoe_tracker), (right_shoe_segmenter, right_shoe_tracker), (cap_segmenter, cap_tracker)]

# Processus d'exemple
process_video(video_path, segmenters_and_trackers, display_mode='video')  # Ou 'video' pour l'affichage vidéo
