## Extraction of Combined Kinematic-Muscular Synergies without Pose Data

This Python script performs a detailed muscle and kinematic synergy analysis on preprocessed experimental data for multiple participants. It is designed to be the final processing step, working from standardized data matrices to produce publication-ready results.

**Key Features:**

* **Works from Correctly Preprocessed Data:** The script loads two separate, pre-standardized data matrices (one for phase1, one for phase2) for each participant.

* **Multi-Level Analysis Loop:** It systematically iterates through Participants, number of Synergies, and Trials.

* **Four Distinct, Separate Analyses per Trial:** For every trial, four analyses are conducted and organized under the `Full Phases` and `Lift-Onset` naming convention:
    * **Full Phases / Phase 1:** Synergies from the complete reach-and-grasp phase.
    * **Full Phases / Phase 2:** Synergies from the complete lift-and-hold phase.
    * **Lift-Onset / Pre-Lift:** Synergies from the last 0.5s of Phase 1.
    * **Lift-Onset / Post-Lift:** Synergies from the first 1.0s of Phase 2.

* **Adaptive MMF Algorithm:** Uses an intelligent Mixed Matrix Factorization (MMF) method with adaptive iterations.

* **Exception Handling:** Handles and flags known missing trials for Participant 7.

* **Organized Output:** All results are saved into a highly organized folder structure.

```markdown
└── P(1)/  (Example for a standard participant)
    └── Synergies Publication/
        ├── Full Phases/
        │   ├── Phase 1/
        │   │   ├── 1_Syn/
        │   │   │   ├── Trial_01/
        │   │   │   │   ├── W_synergies.npy
        │   │   │   │   ├── C_activations.npy
        │   │   │   │   └── synergies_Full_Phases_Phase_1.png
        │   │   │   ├── Trial_02/
        │   │   │   └── ... (up to Trial_24)
        │   │   ├── 2_Syn/
        │   │   └── ... (up to 7_Syn)
        │   │
        │   └── Phase 2/
        │       └── ... (same structure as Phase 1)
        │
        └── Lift-Onset/
            ├── Phase 1/
            │   └── ... (same structure as in Full Phases)
            │
            └── Phase 2/
                └── ... (same structure as in Full Phases)

In [None]:
import os
import numpy as np
import joblib
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
from scipy.fft import rfft, rfftfreq

# =====================================================================
# 0) Global Parameters & Configuration
# =====================================================================
BASE_DIR = r"C:\Users\schmi\Documents\Studium\TUM\Masterthesis\Experimental Data"
PARTICIPANTS = [1, 2, 3, 4, 5, 6, 7, 8]
PREPROCESSED_DATA_DIR_NAME = "Preprocessed_Data_Matrix"

P7_SKIPPED_TRIALS = [20, 22]

MU_S = 0.1
LAMBD_S = 50
NUM_EMG_CHANNELS = 117
BASE_ITERATIONS = 5000
ITERATION_STEP = 5000
MAX_TOTAL_ITERATIONS = 20000

SMOOTHNESS_FREQ_THRESHOLD_HZ = 5.0
HIGH_FREQ_POWER_RATIO = 0.10

SAMPLING_RATE = 500.0

PRE_LIFT_CLIP_SECONDS = 0.5
POST_LIFT_CLIP_SECONDS = 1.0
PRE_LIFT_SAMPLES = int(PRE_LIFT_CLIP_SECONDS * SAMPLING_RATE)
POST_LIFT_SAMPLES = int(POST_LIFT_CLIP_SECONDS * SAMPLING_RATE)

# =====================================================================
# 1) Helper Functions (Trial Info, R^2)
# =====================================================================
def trial_info(trial_number):
    """Returns a dictionary with details for a given trial number."""
    protocol = {
        1: ("Precision Grasp (Four Fingers and Thumb)", "Precision Handle", 0.25, "Left Lever", "No"),
        2: ("Precision Grasp (Four Fingers and Thumb)", "Precision Handle", 0.25, "Left Lever", "Yes"),
        3: ("Lateral Pinch Grasp", "Lateral Pinch Handle", 0.25, "Right Lever", "No"),
        4: ("Lateral Pinch Grasp", "Lateral Pinch Handle", 0.25, "Right Lever", "Yes"),
        5: ("Ball Grasp", "Ball Handle", 0.50, "Left Lever", "No"),
        6: ("Ball Grasp", "Ball Handle", 0.50, "Left Lever", "Yes"),
        7: ("Precision Grasp (Thumb and Index)", "Precision Handle", 0.25, "Front Lever", "No"),
        8: ("Precision Grasp (Thumb and Index)", "Precision Handle", 0.25, "Front Lever", "Yes"),
        9: ("Disc Grip", "Disc Handle", 0.50, "Back Lever", "No"),
        10: ("Disc Grip", "Disc Handle", 0.50, "Back Lever", "Yes"),
        11: ("Power Bar Grasp", "Power Bar Handle", 0.50, "Front Lever", "No"),
        12: ("Power Bar Grasp", "Power Bar Handle", 0.50, "Front Lever", "Yes"),
        13: ("Precision Grasp (Four Fingers and Thumb)", "Precision Handle", 0.25, "Front Lever", "No"),
        14: ("Precision Grasp (Four Fingers and Thumb)", "Precision Handle", 0.25, "Front Lever", "Yes"),
        15: ("Lateral Pinch Grasp", "Lateral Pinch Handle", 0.25, "Back Lever", "No"),
        16: ("Lateral Pinch Grasp", "Lateral Pinch Handle", 0.25, "Back Lever", "Yes"),
        17: ("Ball Grasp", "Ball Handle", 0.50, "Front Lever", "No"),
        18: ("Ball Grasp", "Ball Handle", 0.50, "Front Lever", "Yes"),
        19: ("Precision Grasp (Thumb and Index)", "Precision Handle", 0.25, "Back Lever", "No"),
        20: ("Precision Grasp (Thumb and Index)", "Precision Handle", 0.25, "Back Lever", "Yes"),
        21: ("Disc Grip", "Disc Handle", 0.50, "Left Lever", "No"),
        22: ("Disc Grip", "Disc Handle", 0.50, "Left Lever", "Yes"),
        23: ("Power Bar Grasp", "Power Bar Handle", 0.50, "Right Lever", "No"),
        24: ("Power Bar Grasp", "Power Bar Handle", 0.50, "Right Lever", "Yes"),
    }
    if trial_number not in protocol: return None
    tup = protocol[trial_number]
    return {'grasp_type': tup[0], 'handle_type': tup[1], 'weight_kg': tup[2], 'lever_side': tup[3], 'knowledge': tup[4]}

def r2_score_matrix(original_2d, reconstructed_2d):
    """Computes a single R^2 value for entire matrices."""
    ss_res = np.sum((original_2d - reconstructed_2d)**2)
    ss_tot = np.sum((original_2d - np.mean(original_2d))**2)
    return 1.0 - (ss_res / ss_tot) if ss_tot != 0 else 1.0

# =====================================================================
# 2) MMF Core & Adaptive Learning Logic
# =====================================================================
def check_smoothness_fft(c_vector, fs, freq_thresh, power_ratio_thresh):
    """Evaluates smoothness of a 1D signal using FFT."""
    n = len(c_vector)
    if n < 2: return True
    yf = rfft(c_vector - np.mean(c_vector))
    xf = rfftfreq(n, 1 / fs)
    power = np.abs(yf)**2
    total_power = np.sum(power)
    if total_power < 1e-9: return True
    high_freq_indices = np.where(xf > freq_thresh)
    high_freq_power = np.sum(power[high_freq_indices])
    ratio = high_freq_power / total_power
    return ratio < power_ratio_thresh

def run_mmf_iterations(X, W, C, k, mu_s, lambd_s, num_iters):
    """Runs MMF for a fixed number of iterations."""
    n_vars, n_synergies = W.shape
    normX = np.linalg.norm(X)
    if normX == 0: return W, C
    mu_W = mu_s / normX
    mu_C = mu_s / normX
    lambd = lambd_s / (n_vars * n_synergies)
    for _ in range(num_iters):
        W_update = mu_W * ((X @ C.T) - (W @ (C @ C.T)) - lambd * W)
        C_update = mu_C * ((W.T @ X) - ((W.T @ W) @ C))
        W += W_update
        C += C_update
        W[:k, :] = np.maximum(W[:k, :], 0)
    return W, C

def adaptive_mmf_wrapper(data_matrix, n_synergies):
    """
    A wrapper for MMF that adaptively increases iterations until C is smooth.
    
    MODIFIED: For very short signals (like the lift-onset clips), the FFT-based
    smoothness check is unreliable. This function now detects short signals and
    runs a fixed number of iterations instead of getting stuck in the adaptive loop.
    """
    if data_matrix is None or data_matrix.shape[0] < 5:
        print("   -> WARNING: Input data matrix is too short or None. Skipping MMF.")
        return None, None
        
    X = data_matrix.T
    n_vars, n_samples = X.shape
    
    np.random.seed(42)
    W = np.random.rand(n_vars, n_synergies) * 0.01
    C = np.random.rand(n_synergies, n_samples) * 0.01

    # --- FIX: Conditional Smoothness Check ---
    # Check if the signal is a short clip. We use POST_LIFT_SAMPLES as the
    # threshold since it's the longer of the two clip lengths.
    is_short_signal = n_samples <= POST_LIFT_SAMPLES

    if is_short_signal:
        # For short signals, the FFT check is unreliable. Run a fixed number of iterations.
        print(f"   -> Short signal detected (len={n_samples}). Running fixed iterations.")
        W, C = run_mmf_iterations(X, W, C, NUM_EMG_CHANNELS, MU_S, LAMBD_S, BASE_ITERATIONS)
        print(f"   -> Fixed {BASE_ITERATIONS} iterations complete.")
        return W, C

    # --- For longer signals, use the original adaptive approach ---
    total_iters = 0
    all_smooth = False
    while total_iters < MAX_TOTAL_ITERATIONS:
        iters_to_run = BASE_ITERATIONS if total_iters == 0 else ITERATION_STEP
        W, C = run_mmf_iterations(X, W, C, NUM_EMG_CHANNELS, MU_S, LAMBD_S, iters_to_run)
        total_iters += iters_to_run
        smoothness_results = [check_smoothness_fft(C[i, :], SAMPLING_RATE, SMOOTHNESS_FREQ_THRESHOLD_HZ, HIGH_FREQ_POWER_RATIO) for i in range(n_synergies)]
        if all(smoothness_results):
            print(f"   -> Smoothness achieved after {total_iters} iterations.")
            all_smooth = True
            break
        else:
            print(f"   -> Not smooth after {total_iters} iterations. Continuing...")
    
    if not all_smooth:
        print(f"   -> WARNING: Max iterations ({MAX_TOTAL_ITERATIONS}) reached. Coefficients may not be smooth.")
        
    return W, C


# =====================================================================
# 3) Plotting Function
# =====================================================================
def plot_and_save_synergies(W, C, save_dir, prefix, index_dict, r2_score):
    """Plots and saves synergy results."""
    n_features, n_synergies = W.shape
    os.makedirs(save_dir, exist_ok=True)
    otb_start, otb_end = index_dict['otb_indices']
    myo_start, myo_end = index_dict['myo_indices']
    kin_hand_start, kin_hand_end = index_dict['kin_hand_indices']
    
    colors = ["gray"] * n_features
    for i in range(otb_start, otb_end): colors[i] = '#4F81BD'
    for i in range(myo_start, myo_end): colors[i] = 'pink'
    for i in range(kin_hand_start, kin_hand_end): colors[i] = 'orange'
    
    fig_height = 4 * n_synergies
    fig, axes = plt.subplots(n_synergies, 2, figsize=(20, fig_height), squeeze=False)
    fig.suptitle(f"Synergies for {prefix} (R² = {r2_score:.3f})", fontsize=16)
    
    for i in range(n_synergies):
        ax_bar = axes[i, 0]
        ax_bar.bar(np.arange(n_features), W[:, i], color=colors, alpha=0.7, width=1.0)
        ax_bar.set_title(f"Synergy Vector {i+1}", fontsize=12)
        ax_bar.set_ylabel("Component Value")
        ax_bar.set_xticks([])
        if i == n_synergies - 1:
            ax_bar.set_xlabel("Concatenated Features")
        ax_bar.legend(handles=[Patch(facecolor='#4F81BD', label='OTB'), Patch(facecolor='pink', label='Myo'), Patch(facecolor='orange', label='Kinematic Hand')])
        
        ax_line = axes[i, 1]
        time_axis = np.arange(C.shape[1]) / SAMPLING_RATE
        ax_line.plot(time_axis, C[i, :], color='green', linewidth=1.5)
        ax_line.set_title(f"Activation Coefficient {i+1}", fontsize=12)
        ax_line.set_ylabel('Activation')
        if i == n_synergies - 1:
            ax_line.set_xlabel('Time (s)')
        ax_line.grid(True, linestyle='--', linewidth=0.5)
        
    plt.tight_layout(rect=[0, 0.03, 1, 0.97])
    out_path = os.path.join(save_dir, f"{prefix}_synergies.png")
    plt.savefig(out_path, dpi=150)
    plt.close(fig)

# =====================================================================
# 4) Main Execution
# =====================================================================
def run_analysis_block(analysis_path, data_matrix, n_syn, trial_idx, participant_dir, index_dict, is_skipped):
    """Helper to run and save one full analysis block."""
    analysis_name = os.path.normpath(analysis_path).replace(os.sep, '_')
    print(f" Running {analysis_path} analysis...")
    
    save_dir = os.path.join(participant_dir, "Synergies Publication", analysis_path, f"{n_syn}_Syn", f"Trial_{trial_idx:02d}")
    if is_skipped:
        save_dir += "_SKIPPED_TRIAL"
    os.makedirs(save_dir, exist_ok=True)
    
    if not is_skipped and data_matrix is not None:
        W, C = adaptive_mmf_wrapper(data_matrix, n_syn)
        if W is not None and C is not None:
            r2 = r2_score_matrix(data_matrix.T, W @ C)
            np.save(os.path.join(save_dir, "W_synergies.npy"), W)
            np.save(os.path.join(save_dir, "C_activations.npy"), C)
            plot_prefix = f"synergies_{analysis_name}"
            plot_and_save_synergies(W, C, save_dir, plot_prefix, index_dict, r2)
    elif is_skipped:
        print("   -> Skipping analysis for this trial as marked.")


def main():
    """Main function to run the complete analysis pipeline."""
    for pid in PARTICIPANTS:
        participant_str = f"P({pid})"
        participant_dir = os.path.join(BASE_DIR, participant_str)
        preprocessed_dir = os.path.join(participant_dir, PREPROCESSED_DATA_DIR_NAME)
        
        matrix_p1_path = os.path.join(preprocessed_dir, f"P{pid}_combined_matrix_phase1.npy")
        matrix_p2_path = os.path.join(preprocessed_dir, f"P{pid}_combined_matrix_phase2.npy")
        indices_path = os.path.join(preprocessed_dir, f"P{pid}_feature_indices.joblib")
        
        if not all(os.path.exists(p) for p in [matrix_p1_path, matrix_p2_path, indices_path]):
            print(f"ERROR: Preprocessed files not found for {participant_str}. Skipping.")
            continue
            
        print(f"\n{'='*80}\nProcessing Participant: {pid}\n{'='*80}")
        p1_full_matrix = np.load(matrix_p1_path)
        p2_full_matrix = np.load(matrix_p2_path)
        index_dict = joblib.load(indices_path)
        
        if 'phase1_trial_lengths' not in index_dict or 'phase2_trial_lengths' not in index_dict:
            print(f"ERROR: Trial length information not found in {indices_path}.")
            print("Please update your preprocessing script and re-run it. Skipping participant.")
            continue
            
        p1_trial_lengths = index_dict['phase1_trial_lengths']
        p2_trial_lengths = index_dict['phase2_trial_lengths']
        
        participant_skip_list = P7_SKIPPED_TRIALS if pid == 7 else []
        valid_trials = [t for t in range(1, 25) if t not in participant_skip_list]

        if len(p1_trial_lengths) != len(valid_trials):
            print(f"ERROR: Mismatch between number of trial lengths ({len(p1_trial_lengths)}) and valid trials ({len(valid_trials)}) for P{pid}. Skipping.")
            continue

        p1_trial_map, p2_trial_map = {}, {}
        p1_current_row, p2_current_row = 0, 0
        for i, trial_idx in enumerate(valid_trials):
            p1_len = p1_trial_lengths[i]
            p1_start, p1_end = p1_current_row, p1_current_row + p1_len
            p1_trial_map[trial_idx] = (p1_start, p1_end)
            p1_current_row = p1_end

            p2_len = p2_trial_lengths[i]
            p2_start, p2_end = p2_current_row, p2_current_row + p2_len
            p2_trial_map[trial_idx] = (p2_start, p2_end)
            p2_current_row = p2_end
        
        for n_syn in range(1, 8):
            for trial_idx in range(1, 25):
                print(f"-- P{pid}, N_Synergies={n_syn}, Trial={trial_idx} --")
                is_skipped_trial = (pid == 7 and trial_idx in P7_SKIPPED_TRIALS)

                full_phase1_data, full_phase2_data = None, None
                lift_onset_p1_data, lift_onset_p2_data = None, None
                
                if not is_skipped_trial:
                    p1_rows = slice(*p1_trial_map[trial_idx])
                    p2_rows = slice(*p2_trial_map[trial_idx])
                    
                    full_phase1_data = p1_full_matrix[p1_rows]
                    full_phase2_data = p2_full_matrix[p2_rows]
                    
                    lift_onset_p1_data = full_phase1_data[-PRE_LIFT_SAMPLES:]
                    lift_onset_p2_data = full_phase2_data[:POST_LIFT_SAMPLES]
                
                run_analysis_block(os.path.join("Full Phases", "Phase 1"), full_phase1_data, n_syn, trial_idx, participant_dir, index_dict, is_skipped_trial)
                run_analysis_block(os.path.join("Full Phases", "Phase 2"), full_phase2_data, n_syn, trial_idx, participant_dir, index_dict, is_skipped_trial)
                run_analysis_block(os.path.join("Lift-Onset", "Phase 1"), lift_onset_p1_data, n_syn, trial_idx, participant_dir, index_dict, is_skipped_trial)
                run_analysis_block(os.path.join("Lift-Onset", "Phase 2"), lift_onset_p2_data, n_syn, trial_idx, participant_dir, index_dict, is_skipped_trial)

    print("\n\nAnalysis complete for all participants.")

if __name__ == "__main__":
    main()


Processing Participant: 1
-- P1, N_Synergies=1, Trial=1 --
 Running Full Phases\Phase 1 analysis...
   -> Smoothness achieved after 5000 iterations.
 Running Full Phases\Phase 2 analysis...
   -> Smoothness achieved after 5000 iterations.
 Running Lift-Onset\Phase 1 analysis...
   -> Not smooth after 5000 iterations. Continuing...
   -> Not smooth after 10000 iterations. Continuing...
   -> Not smooth after 15000 iterations. Continuing...
   -> Not smooth after 20000 iterations. Continuing...
 Running Lift-Onset\Phase 2 analysis...
   -> Not smooth after 5000 iterations. Continuing...
   -> Not smooth after 10000 iterations. Continuing...
   -> Not smooth after 15000 iterations. Continuing...
   -> Not smooth after 20000 iterations. Continuing...
-- P1, N_Synergies=1, Trial=2 --
 Running Full Phases\Phase 1 analysis...
   -> Smoothness achieved after 5000 iterations.
 Running Full Phases\Phase 2 analysis...
   -> Smoothness achieved after 5000 iterations.
 Running Lift-Onset\Phase 1 a

KeyboardInterrupt: 