## Extraction des données, Creation de vecteur et Normalisation 


Ici on extrait avec open face des csv des informations sur chaques participants par frame.
Ensuite on trie les csv pour ne prendre que les AUs selectionnée et les transformer en tableaux numpy.
Puis enfin on normalise les données de chaques AUs 


In [None]:
import os
import re
import subprocess
from pathlib import Path

import numpy as np
import pandas as pd

# === CONFIGURATION ===
OPENFACE_DIR = Path(r"C:\Users\Ordinateur\Desktop\Projet tutoret\OpenFace_2.2.0_win_x64\OpenFace_2.2.0_win_x64")
VIDEOS_DIR   = Path(r"C:\Users\Ordinateur\Desktop\----------------------Test pipeline----------------------------\Vidéos") #mettre le dossier ou se situe les videos des participants 
CSV_DIR      = VIDEOS_DIR / "openface_csv"
NUMPY_DIR    = VIDEOS_DIR / "openface_numpy"

# Création des dossiers si pas existants
#CSV_DIR.mkdir(exist_ok=True)
#NUMPY_DIR.mkdir(exist_ok=True)

AU_COLS = [
    "AU01_r", "AU02_r", "AU04_r", "AU05_r", "AU06_r", "AU07_r",
    "AU09_r", "AU10_r", "AU12_r", "AU14_r", "AU15_r", "AU17_r",
    "AU20_r", "AU23_r", "AU25_r", "AU26_r", "AU45_r"
]

# === FONCTION 1 : Extraction des CSV depuis les vidéos ===
def extract_openface_features(videos_dir: Path, openface_dir: Path, out_dir: Path):
    """Extrait les features OpenFace pour toutes les vidéos dans videos_dir."""
    extractor = openface_dir / "FeatureExtraction.exe"
    if not extractor.exists():
        raise FileNotFoundError(f"{extractor} introuvable.")

    for i, vid in enumerate(videos_dir.iterdir()):
        if vid.suffix.lower() in {'.mov', '.mp4', '.avi'}:
            output_subdir = out_dir / f"participant_{i+1}"
            output_subdir.mkdir(exist_ok=True)
            print(f"[OpenFace] Extraction de {vid.name} → {output_subdir.name}")
            subprocess.run(
                [
                    str(extractor),
                    "-f", str(vid),
                    "-out_dir", str(output_subdir)
                ],
                cwd=str(openface_dir),
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                check=True
            )

# === FONCTION 2 : Conversion CSV → Tableaux numpy ===
def convert_csvs_to_numpy(csv_dir: Path, robot_csv: Path, au_cols: list[str], numpy_dir: Path):
    """Charge les CSV, extrait les colonnes AU, sauvegarde les tableaux numpy."""

    def load_and_clean(filepath: Path) -> np.ndarray:
        df = pd.read_csv(filepath)
        df.columns = [re.sub(r"\s+", "", col) for col in df.columns]
        missing = set(au_cols) - set(df.columns)
        if missing:
            print(f"⚠ Colonnes manquantes dans {filepath.name} : {missing}")
        return df.loc[:, au_cols].values

    # Charger le robot
    robot_data = load_and_clean(robot_csv)
    np.save(numpy_dir / "robot.npy", robot_data)
    
    # Charger chaque participant
    for part_dir in csv_dir.iterdir():
        if part_dir.is_dir():
            for file in part_dir.glob("*.csv"):
                participant_name = part_dir.name
                data = load_and_clean(file)
                np.save(numpy_dir / f"{participant_name}.npy", data)

# === FONCTION 3 : Normalisation des tableaux numpy ===
def normalize_numpy_arrays(numpy_dir: Path):
    """Charge les .npy, les normalise, et les écrase."""

    def z_normalize(X: np.ndarray, eps: float = 1e-8) -> np.ndarray:
        mu  = X.mean(axis=0, keepdims=True)
        std = X.std(axis=0, keepdims=True)
        std[std < eps] = 1.0
        return (X - mu) / std

    for file in numpy_dir.glob("*.npy"):
        print(f"Normalisation de {file.name}")
        data = np.load(file)
        data_norm = z_normalize(data)
        np.save(file, data_norm)

# === PIPELINE COMPLÈTE (exemple d'appel) ===
def run_data_preparation():
    extract_openface_features(VIDEOS_DIR, OPENFACE_DIR, CSV_DIR)
    convert_csvs_to_numpy(CSV_DIR, Path(r"C:\Users\Ordinateur\Desktop\Test pipeline\Robot_csv\WhatsApp Video 2025-04-30 at 12.37.38.csv"), AU_COLS, NUMPY_DIR)
    normalize_numpy_arrays(NUMPY_DIR)



In [2]:
run_data_preparation()

[OpenFace] Extraction de essai-AVEC-imitation.avi → participant_1
[OpenFace] Extraction de essai-SANS-imitation.avi → participant_2
Normalisation de participant_1.npy
Normalisation de participant_2.npy
Normalisation de robot.npy


# DTWI et DTWD causales 
On utilise la bibliothèque dtaidistance pour performer DTWi et DTWd au tableau d'au de chaque participants par apport au robot.


Dans les DTW classique toutes les correspondances sont acceptées, même si l'humain agit avant le robot, mais cela ne correspond pas à notre définition de l'imitation (puisque le robot doit agir en premier).

Pour respecter la logique d’imitation, on filtre nos données pour que dans l'algorithme on mesure la similiarité que si les données des participants précedes celles du rebot dans lanalyse. Concretement on garde uniquement les paires (i, j) du chemin optimal où j ≥ i.

Ensuite, pour éviter de prendre en compte des réponses trop tardives, on ajoute aussi une contrainte de fenêtre temporelle.On filtre aussi les paires (i, j) pour garder uniquement celles où le décalage est inférieur ou égal à un certain nombre de frames (par exemple 30 frames). Cela nous permet de ne valider l'imitation que si elle se produit rapidement après l'action du robot.

LEs mesures: 
- score: Rapport DTWD / (DTWI + ε). Il donne une idée de combien la séquence globale (DTWD) est proche par rapport à la somme indépendante (DTWI). Plus il est petit, plus il y a une vraie coordination globale.
- Distance DTW dépendante, mais en filtrant uniquement les appariements causaux (le participant réagit après ou très peu après le robot, avec un délai ≤ 30 frames). Plus c’est faible, plus l’imitation causale est forte.
- Distance DTW indépendante en version causale (chaque AU évaluée individuellement, mais toujours en respectant la causalité).

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from dtaidistance import dtw, dtw_ndim

def full_causal_dtw_analysis(numpy_dir: Path, fps: int = 30, delay_sec: float = 1.0):
    """
    Pipeline complète : calcul DTWD_causal, DTWI_causal, heatmaps, alignements, courbes brutes.
    
    Args:
        numpy_dir (Path): Dossier contenant les .npy normalisés.
        fps (int): Images par seconde de la vidéo (défaut = 30).
        delay_sec (float): Délai maximum pour l'imitation (en secondes).
    """
    print("Chargement des données...")
    robot_array = np.load(numpy_dir / "robot.npy")
    participant_files = [f for f in numpy_dir.glob("*.npy") if "robot" not in f.name]
    max_delay_frames = int(delay_sec * fps)

    results = []

    print("Calcul des distances DTW causales...")
    for part_file in participant_files:
        part_array = np.load(part_file)

        # DTWD causal
        path_d = dtw_ndim.warping_path(robot_array, part_array)
        valid_pairs_d = [(i, j) for (i, j) in path_d if 0 <= j - i <= max_delay_frames]
        dtwd_causal = np.mean([np.linalg.norm(robot_array[i] - part_array[j]) for (i, j) in valid_pairs_d]) if valid_pairs_d else np.nan

        # DTWI causal
        distances_i = []
        for d in range(robot_array.shape[1]):
            path_i = dtw.warping_path(robot_array[:, d], part_array[:, d])
            valid_pairs_i = [(i, j) for (i, j) in path_i if 0 <= j - i <= max_delay_frames]
            if valid_pairs_i:
                distances_i.extend([abs(robot_array[i, d] - part_array[j, d]) for (i, j) in valid_pairs_i])
        dtwi_causal = np.mean(distances_i) if distances_i else np.nan

        # Score causal
        score_causal = dtwd_causal / (dtwi_causal + 1e-6) if dtwi_causal else np.nan

        results.append({
            "participant": part_file.stem,
            "DTWD_causal": dtwd_causal,
            "DTWI_causal": dtwi_causal,
            "score_causal": score_causal
        })

    results_df = pd.DataFrame(results)
    print("Résultats calculés :")
    print(results_df)

    # --- Graphiques ---
    print("Génération des graphiques...")

    # Courbes brutes (moyennes des AUs)
    robot_mean = robot_array.mean(axis=1)
    plt.figure(figsize=(12,6))
    plt.plot(robot_mean, label="Robot", linewidth=2)
    for part_file in participant_files:
        part_array = np.load(part_file)
        plt.plot(part_array.mean(axis=1), label=part_file.stem, linestyle='--')
    plt.title("Courbes brutes (sans DTW) - Moyenne des AUs par frame")
    plt.xlabel("Frames")
    plt.ylabel("Intensité normalisée")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

    # Heatmaps
    dtwd_values = [0] + results_df["DTWD_causal"].tolist()
    dtwi_values = [0] + results_df["DTWI_causal"].tolist()
    labels = ["Robot"] + results_df["participant"].tolist()

    dtwd_matrix = np.full((len(dtwd_values), len(dtwd_values)), np.nan)
    dtwi_matrix = np.full((len(dtwi_values), len(dtwi_values)), np.nan)
    for i in range(1, len(dtwd_values)):
        dtwd_matrix[0, i] = dtwd_values[i]
        dtwd_matrix[i, 0] = dtwd_values[i]
        dtwi_matrix[0, i] = dtwi_values[i]
        dtwi_matrix[i, 0] = dtwi_values[i]

    plt.figure(figsize=(6,5))
    sns.heatmap(dtwd_matrix, annot=True, fmt=".3f", cmap="Blues", xticklabels=labels, yticklabels=labels)
    plt.title("Heatmap DTWD_causal")
    plt.show()

    plt.figure(figsize=(6,5))
    sns.heatmap(dtwi_matrix, annot=True, fmt=".3f", cmap="Greens", xticklabels=labels, yticklabels=labels)
    plt.title("Heatmap DTWI_causal")
    plt.show()

    # Alignements globaux et par AU
    for part_file in participant_files:
        part_array = np.load(part_file)
        participant_name = part_file.stem

        # Alignement global DTWD
        path = dtw_ndim.warping_path(robot_array, part_array)
        valid_pairs = [(i, j) for (i, j) in path if 0 <= j - i <= max_delay_frames]
        robot_mean = robot_array.mean(axis=1)
        part_mean = part_array.mean(axis=1)

        offset = 2.5
        plt.figure(figsize=(14,5))
        plt.plot(robot_mean, label="Robot", color='red')
        plt.plot(part_mean + offset, label=participant_name, color='blue')
        for i, j in valid_pairs:
            plt.plot([i, j], [robot_mean[i], part_mean[j] + offset], color='gray', alpha=0.5, linewidth=0.6)
        plt.title(f"Alignement DTWD Causal (robot vs {participant_name})")
        plt.xlabel("Frames")
        plt.yticks([])
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.show()

        # Alignement par AU (DTWI causal)
        n_aus = robot_array.shape[1]
        for d in range(n_aus):
            robot_seq = robot_array[:, d]
            part_seq = part_array[:, d]
            path = dtw.warping_path(robot_seq, part_seq)
            valid_pairs = [(i, j) for (i, j) in path if 0 <= j - i <= max_delay_frames]

            plt.figure(figsize=(14,5))
            plt.plot(robot_seq, label="Robot", color='red')
            plt.plot(part_seq + offset, label=participant_name, color='blue')
            for i, j in valid_pairs:
                plt.plot([i, j], [robot_seq[i], part_seq[j] + offset], color='gray', alpha=0.5, linewidth=0.6)
            plt.title(f"Alignement DTWI Causal - AU{d+1} ({participant_name})")
            plt.xlabel("Frames")
            plt.yticks([])
            plt.legend()
            plt.grid(True)
            plt.tight_layout()
            plt.show()

    return results_df


In [1]:
full_causal_dtw_analysis(numpy_dir=NUMPY_DIR, fps=30, delay_sec=1.0)

NameError: name 'full_causal_dtw_analysis' is not defined

## Representation graphiques

### Matrice 
Plus la distance est petite, plus les séquences sont similaires.

Si DTWA(Robot, Part1) < DTWA(Robot, Part2), c’est un indice fort d’imitation réussie par le participant 1.

### Alignement global

### Alignement par AUS

### Moyenne des AUs tab (trace les courbes moyennes des AUs par frame, sans appariement DTW du tout, encore moins filtré.)
Si les courbes se ressemblent en forme (pics, plateaux...), c’est un signe d’imitation ou de synchronisation.

Si elles sont décalées mais suivent les mêmes motifs, cela peut être corrigé par la DTW.

Une courbe très différente (autres pics, autres tendances) = pas d’imitation.

# Graph pour chaques AUs

L'alignement par Aus pour un participant semble plus pertinent à analyser graphiquement on peux voir si des actions musculaire ont lieux apres celles du robot 
