# Introduction: 

### Python Imports

In [None]:
import os
import tqdm
import numpy as np
import pandas as pd

## Data loaders for .mot files

In [None]:
# Data loader for .mot files

def read_header(mot_file,header_line=10): 
    if not os.path.isfile(mot_file): 
        return []

    try:         
        with open(mot_file, 'r') as f:
            lines = f.readlines()
            headers = lines[header_line]
            headers = headers.split()
            return headers
    except Exception as e:
        print(f"Unable to load headers for file:{mot_file}. Error:{e}")
        return [] 

def storage_to_numpy(storage_file, excess_header_entries=0):
    """Returns the data from a storage file in a numpy format. Skips all lines
    up to and including the line that says 'endheader'.
    Parameters
    ----------
    storage_file : str
        Path to an OpenSim Storage (.sto) file.
    Returns
    -------
    data : np.ndarray (or numpy structure array or something?)
        Contains all columns from the storage file, indexable by column name.
    excess_header_entries : int, optional
        If the header row has more names in it than there are data columns.
        We'll ignore this many header row entries from the end of the header
        row. This argument allows for a hacky fix to an issue that arises from
        Static Optimization '.sto' outputs.
    Examples
    --------
    Columns from the storage file can be obtained as follows:
        >>> data = storage2numpy('<filename>')
        >>> data['ground_force_vy']
    """
    # What's the line number of the line containing 'endheader'?
    f = open(storage_file, 'r')

    header_line = False
    for i, line in enumerate(f):
        if header_line:
            column_names = line.split()
            break
        if line.count('endheader') != 0:
            line_number_of_line_containing_endheader = i + 1
            header_line = True
    f.close()
    # With this information, go get the data.
    if excess_header_entries == 0:
        names = True
        skip_header = line_number_of_line_containing_endheader
    else:
        names = column_names[:-excess_header_entries]
        skip_header = line_number_of_line_containing_endheader + 1
    data = np.genfromtxt(storage_file, names=names,
            skip_header=skip_header)

    new_data = []
    for d in data:
        new_data.append(list(d))
    new_data = np.array(new_data)

    return data


def storage_to_dataframe(storage_file, headers):
    # Extract data
    data = storage_to_numpy(storage_file)
    data = np.array(data)
    new_data = []
    for d in data:
        new_data.append(list(d))
    new_data = np.array(new_data)
    header_mapping = {header:i for i,header in enumerate(headers)}

    out = pd.DataFrame(data=data['time'], columns=['time'])
    for count, header in enumerate(headers):
        out.insert(count + 1, header, new_data[:,count+1])    
    
    return out

# Load simulation data into a dictionary called subjects

In [None]:
data_path ='/media/shubh/Elements/RoseYu/UCSD-OpenCap-Fitness-Dataset/MCS_DATA/Data/'

# Load simulation for data for given list of sessions
def load_simulation_data(mcs_sessions):
    subjects = {}

    for subject_name in tqdm.tqdm(mcs_sessions):

        if not os.path.isdir(os.path.join(data_path, subject_name)): continue
        simulation_results_path = os.path.join(data_path, subject_name, 'OpenSimData','Dynamics')

        if not os.path.isdir(simulation_results_path):
            continue
        
        subject = {}
        for trial_name in os.listdir(simulation_results_path): 
            
            if 'segment' not in trial_name: # Contains same results as segment-1 
                continue
            
            trial = {}
            
            kinetics_path = os.path.join(simulation_results_path, trial_name,f"kinetics_{trial_name}_muscle_driven.mot")
            mot_headers = read_header(kinetics_path,header_line=6)            
                        
            if len(mot_headers) == 0:
                print(f"Unable to load headers for file:", subject_name,trial_name, kinetics_path)
                continue 
            
            # Remove time from headers
            mot_headers.remove('time')

            kinetics = storage_to_dataframe(kinetics_path, mot_headers)
            
            trial['kinetics'] = kinetics

            kinematics_path = os.path.join(simulation_results_path, trial_name,f"kinematics_activations_{trial_name}_muscle_driven.mot")
            mot_headers = read_header(kinematics_path,header_line=10)

            if len(mot_headers) == 0:
                print(f"Unable to load headers for file:", subject_name,kinematics_path)
                continue 
            # print("Headers:", mot_headers)
            # Remove time from headers
            mot_headers.remove('time')

            kinematics = storage_to_dataframe(kinematics_path, mot_headers)
            trial['kinematics'] = kinematics

            subject['dof_names'] = kinematics.columns.tolist()    
            
            if len(trial) > 0:
                subject[trial_name] = trial        
                print("Loaded data for:", subject_name, trial_name, "Kinetics:", subject[trial_name]['kinetics'].shape, "Kinematics:", subject[trial_name]['kinematics'].shape)            

        if len(subject) > 0:
            subjects[subject_name] = subject
        else: 
            print("No trials found for subject:", subject_name)
    
    return subjects 


# PPE Files containing with MCS Scores
mcs_sessions = ["349e4383-da38-4138-8371-9a5fed63a56a","015b7571-9f0b-4db4-a854-68e57640640d","c613945f-1570-4011-93a4-8c8c6408e2cf","dfda5c67-a512-4ca2-a4b3-6a7e22599732","7562e3c0-dea8-46f8-bc8b-ed9d0f002a77","275561c0-5d50-4675-9df1-733390cd572f","0e10a4e3-a93f-4b4d-9519-d9287d1d74eb","a5e5d4cd-524c-4905-af85-99678e1239c8","dd215900-9827-4ae6-a07d-543b8648b1da","3d1207bf-192b-486a-b509-d11ca90851d7","c28e768f-6e2b-4726-8919-c05b0af61e4a","fb6e8f87-a1cc-48b4-8217-4e8b160602bf","e6b10bbf-4e00-4ac0-aade-68bc1447de3e","d66330dc-7884-4915-9dbb-0520932294c4","0d9e84e9-57a4-4534-aee2-0d0e8d1e7c45","2345d831-6038-412e-84a9-971bc04da597","0a959024-3371-478a-96da-bf17b1da15a9","ef656fe8-27e7-428a-84a9-deb868da053d","c08f1d89-c843-4878-8406-b6f9798a558e","d2020b0e-6d41-4759-87f0-5c158f6ab86a","8dc21218-8338-4fd4-8164-f6f122dc33d9"]
mcs_scores = [4,4,2,3,2,4,3,3,2,3,4,3,4,2,2,3,4,4,3,3,3 ]
mcs_scores = dict(zip(mcs_sessions,mcs_scores))


mcs_sessions = os.listdir(data_path)


PPE_Subjects = ["PPE09182201","PPE09182202","PPE09182203","PPE09182204","PPE09182205","PPE09182206","PPE09182207","PPE09182208","PPE09182209","PPE091822010","PPE09182211","PPE09182212","PPE09182213","PPE09182214","PPE09182215","PPE09182216","PPE09182217","PPE09182218","PPE09182219","PPE09182220","PPE09182221"]
PPE_Subjects = dict(zip(mcs_sessions,PPE_Subjects))

import pandas as pd
subject2opencap = pd.read_table('/media/shubh/Elements/RoseYu/UCSD-OpenCap-Fitness-Dataset/MCS_DATA/subject2opencap.txt',sep=',')
subject2opencap
PPE_Subjects = dict(zip( subject2opencap[' OpenCap-ID'].tolist(), subject2opencap['PPE'].tolist()))

for session in PPE_Subjects:
    if session not in mcs_scores:
        mcs_scores[session] = -1  
    print(session, PPE_Subjects[session], mcs_scores[session])

subjects = load_simulation_data(mcs_sessions)

In [None]:
print(f"Subjects Loaded:", len(subjects.keys()))

## Plot temporal segmentation results

In [None]:
# Name Mappings from Github copilot
plot_names_mapping = {
    'lumbar_bending': 'Trunk Obliquity',
    'lumbar_extension': 'Trunk Tilt',
    'lumbar_rotation': 'Trunk Rotation',
    'pelvis_list': 'Pelvic Obliquity',
    'pelvis_tilt': 'Pelvic Tilt',
    'pelvis_rotation': 'Pelvic Rotation',
    'hip_adduction_l': 'Left Hip Ab/Adduction',
    'hip_flexion_l': 'Left Hip Flexion/Extension',
    'hip_rotation_l': 'Left Hip Rotation',
    'hip_adduction_r': 'Right Hip Ab/Adduction',
    'hip_flexion_r': 'Right Hip Flexion/Extension',
    'hip_rotation_r': 'Right Hip Rotation',
    'knee_angle_l': 'Left Knee Flexion/Extension',
    'knee_angle_r': 'Right Knee Flexion/Extension',
    'ankle_angle_l': 'Left Ankle Dorsi/Plantar',
    'ankle_angle_r': 'Right Ankle Dorsi/Plantar'
}

In [None]:


plot_muscle_activations_mapping = {
    'addbrev_l/activation': 'Adductor Brevis (Left)',
    'addlong_l/activation': 'Adductor Longus (Left)',
    'addmagDist_l/activation': 'Adductor Magnus Distal (Left)',
    'addmagIsch_l/activation': 'Adductor Magnus Ischial (Left)',
    'addmagMid_l/activation': 'Adductor Magnus Middle (Left)',
    'addmagProx_l/activation': 'Adductor Magnus Proximal (Left)',
    'bflh_l/activation': 'Biceps Femoris Long Head (Left)',
    'bfsh_l/activation': 'Biceps Femoris Short Head (Left)',
    'edl_l/activation': 'Extensor Digitorum Longus (Left)',
    'ehl_l/activation': 'Extensor Hallucis Longus (Left)',
    'fdl_l/activation': 'Flexor Digitorum Longus (Left)',
    'fhl_l/activation': 'Flexor Hallucis Longus (Left)',
    'gaslat_l/activation': 'Gastrocnemius Lateral (Left)',
    'gasmed_l/activation': 'Gastrocnemius Medial (Left)',
    'glmax1_l/activation': 'Gluteus Maximus 1 (Left)',
    'glmax2_l/activation': 'Gluteus Maximus 2 (Left)',
    'glmax3_l/activation': 'Gluteus Maximus 3 (Left)',
    'glmed1_l/activation': 'Gluteus Medius 1 (Left)',
    'glmed2_l/activation': 'Gluteus Medius 2 (Left)',
    'glmed3_l/activation': 'Gluteus Medius 3 (Left)',
    'glmin1_l/activation': 'Gluteus Minimus 1 (Left)',
    'glmin2_l/activation': 'Gluteus Minimus 2 (Left)',
    'glmin3_l/activation': 'Gluteus Minimus 3 (Left)',
    'grac_l/activation': 'Gracilis (Left)',
    'iliacus_l/activation': 'Iliacus (Left)',
    'perbrev_l/activation': 'Peroneus Brevis (Left)',
    'perlong_l/activation': 'Peroneus Longus (Left)',
    'piri_l/activation': 'Piriformis (Left)',
    'psoas_l/activation': 'Psoas (Left)',
    'recfem_l/activation': 'Rectus Femoris (Left)',
    'sart_l/activation': 'Sartorius (Left)',
    'semimem_l/activation': 'Semimembranosus (Left)',
    'semiten_l/activation': 'Semitendinosus (Left)',
    'soleus_l/activation': 'Soleus (Left)',
    'tfl_l/activation': 'Tensor Fasciae Latae (Left)',
    'tibant_l/activation': 'Tibialis Anterior (Left)',
    'tibpost_l/activation': 'Tibialis Posterior (Left)',
    'vasint_l/activation': 'Vastus Intermedius (Left)',
    'vaslat_l/activation': 'Vastus Lateralis (Left)',
    'vasmed_l/activation': 'Vastus Medialis (Left)',
    'addbrev_r/activation': 'Adductor Brevis (Right)',
    'addlong_r/activation': 'Adductor Longus (Right)',
    'addmagDist_r/activation': 'Adductor Magnus Distal (Right)',
    'addmagIsch_r/activation': 'Adductor Magnus Ischial (Right)',
    'addmagMid_r/activation': 'Adductor Magnus Middle (Right)',
    'addmagProx_r/activation': 'Adductor Magnus Proximal (Right)',
    'bflh_r/activation': 'Biceps Femoris Long Head (Right)',
    'bfsh_r/activation': 'Biceps Femoris Short Head (Right)',
    'edl_r/activation': 'Extensor Digitorum Longus (Right)',
    'ehl_r/activation': 'Extensor Hallucis Longus (Right)',
    'fdl_r/activation': 'Flexor Digitorum Longus (Right)',
    'fhl_r/activation': 'Flexor Hallucis Longus (Right)',
    'gaslat_r/activation': 'Gastrocnemius Lateral (Right)',
    'gasmed_r/activation': 'Gastrocnemius Medial (Right)',
    'glmax1_r/activation': 'Gluteus Maximus 1 (Right)',
    'glmax2_r/activation': 'Gluteus Maximus 2 (Right)',
    'glmax3_r/activation': 'Gluteus Maximus 3 (Right)',
    'glmed1_r/activation': 'Gluteus Medius 1 (Right)',
    'glmed2_r/activation': 'Gluteus Medius 2 (Right)',
    'glmed3_r/activation': 'Gluteus Medius 3 (Right)',
    'glmin1_r/activation': 'Gluteus Minimus 1 (Right)',
    'glmin2_r/activation': 'Gluteus Minimus 2 (Right)',
    'glmin3_r/activation': 'Gluteus Minimus 3 (Right)',
    'grac_r/activation': 'Gracilis (Right)',
    'iliacus_r/activation': 'Iliacus (Right)',
    'perbrev_r/activation': 'Peroneus Brevis (Right)',
    'perlong_r/activation': 'Peroneus Longus (Right)',
    'piri_r/activation': 'Piriformis (Right)',
    'psoas_r/activation': 'Psoas (Right)',
    'recfem_r/activation': 'Rectus Femoris (Right)',
    'sart_r/activation': 'Sartorius (Right)',
    'semimem_r/activation': 'Semimembranosus (Right)',
    'semiten_r/activation': 'Semitendinosus (Right)',
    'soleus_r/activation': 'Soleus (Right)',
    'tfl_r/activation': 'Tensor Fasciae Latae (Right)',
    'tibant_r/activation': 'Tibialis Anterior (Right)',
    'tibpost_r/activation': 'Tibialis Posterior (Right)',
    'vasint_r/activation': 'Vastus Intermedius (Right)',
    'vaslat_r/activation': 'Vastus Lateralis (Right)',
    'vasmed_r/activation': 'Vastus Medialis (Right)'
}

from collections import defaultdict

# Group muscles by their base name (without _l or _r)
grouped_muscles = defaultdict(list)
for key in plot_muscle_activations_mapping.keys():
    base_name = key.split('_')[0]
    grouped_muscles[base_name].append(key)

# Create a list of muscle pairs
muscle_pairs = []
for base_name, keys in grouped_muscles.items():
    if len(keys) == 2:  # Ensure both left and right muscles are present
        muscle_pairs.append((keys[0], keys[1]))
        
# Number of muscle pairs per page (3 columns * 3 rows)
num_pairs_per_page = 5 * 2

# Split muscle pairs into groups for each page
pages = [muscle_pairs[i:i + num_pairs_per_page] for i in range(0, len(muscle_pairs), num_pairs_per_page)]

# Create dictionaries for each page
plot_muscle_activations_mapping_pages = []
for page in pages:
    page_dict = {}
    for left_key, right_key in page:
        page_dict[left_key] = plot_muscle_activations_mapping[left_key]
        page_dict[right_key] = plot_muscle_activations_mapping[right_key]
    plot_muscle_activations_mapping_pages.append(page_dict)

# Print the dictionaries for each page
for i, page_dict in enumerate(plot_muscle_activations_mapping_pages):
    print(f"Muscle Activation Page {i + 1}:")
    print(page_dict)
    print()



In [None]:

def get_plotting_data(subject,trial,remove_headers=['pelvis_tx','pelvis_ty','pelvis_tz']): 
    # Get the subject data and remove pelvis translation 

    headers = subject['dof_names'] 

    keep_index = [headers.index(header)  for header in plot_names_mapping.keys()]

    plot_headers = [headers[i] for i in keep_index]

    plot_data = {}
    plot_data['kinematics'] = subject[trial]['kinematics'] 
    plot_data['kinematics'] = plot_data['kinematics'][plot_headers]


    plot_headers_kinetics = [ h +'_moment'  for h in plot_headers]
    plot_data['kinetics'] = subject[trial]['kinetics'] 
    plot_data['kinetics'] = plot_data['kinetics'][plot_headers_kinetics]

    for page_index, page_dict in enumerate(plot_muscle_activations_mapping_pages):
        plot_muscle_activations_index = [headers.index(header) for header in page_dict.keys()]
        plot_muscle_activations_headers = [headers[i] for i in plot_muscle_activations_index]
        plot_data[f'muscle_activations-{page_index}'] = subject[trial]['kinematics'][plot_muscle_activations_headers]


    for k in plot_data:
        if 'muscle_activations' in k:
            mc_page_index = int(k.split('-')[-1])
            assert plot_data[k].shape[-1] == len(plot_muscle_activations_mapping_pages[mc_page_index]), f"Length of headers should match headers length. Found:{plot_data[k].shape[-1]} , expected:{plot_muscle_activations_mapping_pages[mc_page_index]}"
        else: 
            assert plot_data[k].shape[-1] == len(plot_headers), f"Length of headers should match headers length. Found:{plot_data[k].shape[-1]} , expected:{len(plot_headers)}"

        plot_data[k] = plot_data[k].to_numpy()

    return plot_headers, plot_data

# get_plotting_data(subjects["015b7571-9f0b-4db4-a854-68e57640640d"],'SQT01_segment_3')

## Temporal Segmentation

In [None]:
import math
import plotly

import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Temporal Segmentation
from scipy.signal import savgol_filter
from scipy.signal import find_peaks

from scipy.interpolate import CubicSpline


def time_normalization(time_series,duration=101): 

    orig_time_space = np.linspace(0,1,len(time_series))
        
    spline = CubicSpline(orig_time_space, time_series)

    spline_input = np.linspace(0,1,duration)
    split_output = spline(spline_input)
            
    return split_output

def find_valleys_in_max_angular_velocity(max_angular_velocity,seconds_per_frame=0.01,allowed_height_difference_threshold=0.1):
    """
        Find peaks in the angular velocity of a time series of rotation vectors.

        Args:
        angular_velocity: A numpy array of shape (N, 24) containing angular velocities in radians per second.
        framerate: The frame rate of the data in Hz.

        Returns: A numpy array of shape (M,) containing the indices of the peaks in the angular velocity.
    """

    diff = np.max(max_angular_velocity) - np.min(max_angular_velocity)
    min_height = np.min(max_angular_velocity) + allowed_height_difference_threshold*diff

    distance_between_valleys = max(1,int(1/(10*seconds_per_frame)))
    # distance_between_valleys = 1

    print(f"distance between valleys={distance_between_valleys}")
    print(f"Max allowed valley height={min_height} allowed_height_difference_threshold={allowed_height_difference_threshold}")
    valleys, meta_data = find_peaks(-max_angular_velocity,distance=distance_between_valleys,height=-min_height)

    print("Valleys",valleys, "Meta data: ",meta_data)

    return valleys


def find_best_n_segments(temporal_segmentation_data, num_segments, duplicate_threshold=0.75, rms_threshold=0.75):
    """
        Validate segments using DTW

    """

    for trial in temporal_segmentation_data:

        max_pose_velocity = temporal_segmentation_data[trial]['max_pose_velocity']
        valleys = temporal_segmentation_data[trial]['change_points']
        

        if len(valleys) < num_segments -1 : continue

        if len(valleys) > 25: 
            valleys = np.sort(np.random.choice(valleys,25,replace=False))

        # Add first and last frame for completeness
        valleys = [0] + list(valleys) + [len(max_pose_velocity)-1]
        valleys = np.array(valleys)

        # In some cases there could a single valley which defines the start and the stop cycle. 
        # If that is the case, we need to duplicate the valley
        # Check if a particular valley is too far to the adjacent valleys. If so, duplicate it.
        # If the distance between the two adjacent valleys is greater than 1/2*average segment duaration, duplicate the valley 
        valley_copys = []
        valley_threshold = (len(max_pose_velocity)/num_segments)*duplicate_threshold
        for i,v in enumerate(valleys):
            if i == 0: continue
            if i == len(valleys)-1: continue 


            if (valleys[i+1] - valleys[i])  > valley_threshold and (valleys[i] - valleys[i-1])  > valley_threshold: 
                valley_copys.append(valleys[i])
        
        valleys = np.concatenate([valleys,valley_copys]).astype(int)
        valleys.sort()


        # Start-Stop candidates
        print(f"Start-Stop Candidates for {trial_name}",valleys)
        temporal_segmentation_data[trial]['change_points'] = valleys

    # from tslearn.barycenters import dtw_barycenter_averaging
    from itertools import combinations

    best_combination = {} 
    best_combination_score = np.inf
    best_combination_data = ()


    ## Hard tests for filtet out invalid combinations 
    rms_sequence = np.sqrt(np.sum(max_pose_velocity**2)) # RMS for entire sequence. If some segment has motion less than the 1/2*num_segments, probably no motion is happening in it.  
    
    for trial in temporal_segmentation_data:
        valleys = temporal_segmentation_data[trial]['change_points']
        max_pose_velocity = temporal_segmentation_data[trial]['max_pose_velocity']
        
        for cur_comb in combinations(valleys, 2*num_segments):
        
            segments = [    [cur_comb[i*2 + 0],cur_comb[i*2 + 1]] for i in range(num_segments)    ]

            rms_segments = [np.sqrt(np.sum(max_pose_velocity[segment[0]:segment[1]]**2)) for segment in segments]

            if np.min(rms_segments) < rms_sequence*rms_threshold/num_segments:
                continue

            print("Testing segments",segments,"RMS Sequence:", rms_sequence,"RMS Threshold:",rms_sequence*rms_threshold/num_segments)
            print(rms_segments)

            if 'valid_segments' not in temporal_segmentation_data[trial]: 
                temporal_segmentation_data[trial]['valid_segments'] = []
            
            temporal_segmentation_data[trial]['valid_segments'].append(segments)
    
    
    def compute_combination_score(combination): 
        
        print(f"Computing combination score:{combination}")
        time_series = []
        for trial in combination: 
            segments = combination[trial]   
            max_pose_velocity = temporal_segmentation_data[trial]['max_pose_velocity']
            # Normalize Time Series
            time_normalized_series = np.array([time_normalization(max_pose_velocity[segment[0]:segment[1]]) for segment in segments])
            time_series.append(time_normalized_series)
        # Minimize STD at for entire duration
        combination_score = np.sum(np.std(time_series,axis=0))
        
        return combination_score        
    
    # Sort the trial names in data
    trials_names = sorted([k for k in temporal_segmentation_data], key=lambda x : int(x.split('_')[-1]))
    
    # Run recursion on all trials and return the combinations with the best matching score acrross trials. on all valid combinations. If num_combiations 
    def dfs(ind, combination):
        if ind == len(temporal_segmentation_data): 
            combination_score = compute_combination_score(combination)
            return combination, combination_score
            
        trial_name = trials_names[ind] 
        if 'valid_segments' not in temporal_segmentation_data[trial_name] or len(temporal_segmentation_data[trial_name]['valid_segments']) == 0:
            return dfs(ind+1, combination)
        
        print(f"{ind} combinations:{combinations}")
        
        best_combination_score = np.inf
        best_combination = None
        for trial_combination in temporal_segmentation_data[trial_name]['valid_segments']: 
            combination[trial_name] = trial_combination
            combination, combination_score = dfs(ind+1, combination)
            
            
            if combination_score < best_combination_score: 
                best_combination_score = combination_score
                best_combination = combination.copy()
        
            del combination[trial_name] # Remove combination, try another combination
        
        print(f"Best Score for: {ind} {best_combination_score} best_combination:{best_combination} ")
        
        return best_combination, best_combination_score
                     
    best_combination, best_combination_score = dfs(0, {}) 
    
    if np.isinf(best_combination_score):
        print("Could not find a valid segments. All combinations below RMS threshold") 
        return {}, np.inf
        
    return best_combination,best_combination_score


def temporal_segementation(data,headers, seconds_per_frame=0.01,visualize=True,num_segments=5,allowed_height_difference_threshold=0.1,fig_title=None,isdeg=True):
    """
        Find segments in the angular velocity of a time series of rotation vectors.
        params:
            data: A dictionary containing the pose_params

        returns: 
            segments: A list of tuples containing the start and end indices of the segments.
    """
    trials_names = sorted([k for k in data], key=lambda x : int(x.split('_')[-1])) 
 
    # Create subplots
    fig = plotly.subplots.make_subplots(rows=2, cols=len(data), subplot_titles=[ f"{trial_index}. Start-Stop:{trial_name} " for trial_index, trial_name in enumerate(trials_names)])
    
    temporal_segmentation_data = {} 
    
    # Sort the trial names in data


    for trial_index, trial_name in enumerate(trials_names):
        
        print(f"Finding valleys in For {trial_index}:{trial_name}")
        
        pose_velocity = plot_data[trial_name]['kinematics']
        
        print(f"  Time series: {pose_velocity.shape}")

        
        # Smoothing Filter ## Note window length and polyorder should be adjusted based on the data
        for i in range(len(headers)): 
            pose_velocity[:,i] = savgol_filter(pose_velocity[:,i], window_length=21, polyorder=3)

        # Only consider knee joints kinematics
        knee_indices = [i for i,header in enumerate(headers) if "knee" in header.lower()]
        max_pose_velocity = np.max(pose_velocity[:,knee_indices],axis=1)

        x = np.arange(len(max_pose_velocity))*seconds_per_frame

        if not isdeg:
            max_pose_velocity = np.rad2deg(max_pose_velocity)

        for i in range(len(headers)): 
            fig.add_trace(go.Scatter(
                x=x,
                y=pose_velocity[:,i],
                mode='lines',
                name=plot_headers[i],
                showlegend=True),row=1,col=trial_index+1)

        fig.add_trace(go.Scatter(
            x=x,
            y=max_pose_velocity,
            mode='lines',
            name='Knee Kinematics',
            showlegend=False
        ),row=2,col=trial_index+1)
        
        change_points = find_valleys_in_max_angular_velocity(max_pose_velocity,seconds_per_frame=seconds_per_frame,allowed_height_difference_threshold=allowed_height_difference_threshold)
        
        fig.add_trace(go.Scatter(
            x=x[change_points],
            y=max_pose_velocity[change_points],
            mode='markers',
            marker=dict(
                color='red',
                size=16,
                symbol='arrow-up'
            ),
            name='change_points'
        ),row=2,col=trial_index+1)
            
        temporal_segmentation_data[trial_name] = {'change_points':change_points, 'max_pose_velocity': max_pose_velocity}
    
    # fig.show()
        
    segments_all_trial,segment_score = find_best_n_segments(temporal_segmentation_data,num_segments=num_segments)
    
            
    for trial_index, trial_name in enumerate(trials_names):

        change_points = temporal_segmentation_data[trial_name]['change_points']
        max_pose_velocity = temporal_segmentation_data[trial_name]['max_pose_velocity']
        segments = np.array(segments_all_trial[trial_name])

        print(segments,trial_name)

        x = np.arange(len(max_pose_velocity))*seconds_per_frame

        if not isdeg:
            max_pose_velocity = np.rad2deg(max_pose_velocity)
        
        # Plot the line segments 
        if not np.isinf(segment_score): 
            empty_array = np.array([None]*segments.shape[0]).reshape((-1,1))
            plot_y_segments = np.tile( np.max(max_pose_velocity).reshape((1,1)), segments.shape )
            plot_y_segments = np.concatenate([plot_y_segments,empty_array],axis=1).reshape(-1)

            plot_x_segments = np.concatenate([segments*seconds_per_frame,empty_array],axis=1).reshape(-1)

            fig.add_trace(go.Scatter(
                x=plot_x_segments,
                y=plot_y_segments,
                line_shape='linear',
                name='Selected Segments'
            ),row=2,col=trial_index+1)

        else: 
            segments = None


    # Update subplot titles
    fig.update_layout(
        title_text=fig_title if fig_title != '' or fig_title is not None else f'Temporal Segmentation using Knee Kinematics',
        font=dict(family="Times New Roman"),
    )
 
    # Set figure size
    fig.update_layout(width=1000, height=400)
    
    fig.update_xaxes(title_text="Time (s)", row=1, col=1)
    fig.update_yaxes(title_text="Max Knee Flexion (deg)", row=1, col=1)


    if visualize: 
        # Show the figure
        fig.show()

    # fig.write_image(image_path + "_angular.png")
    return fig, segments_all_trial



###############################################################

# skip_subjects = ["c08f1d89-c843-4878-8406-b6f9798a558e","0d9e84e9-57a4-4534-aee2-0d0e8d1e7c45","c28e768f-6e2b-4726-8919-c05b0af61e4a","0e10a4e3-a93f-4b4d-9519-d9287d1d74eb","349e4383-da38-4138-8371-9a5fed63a56a"]
# skip_subjects = [mcs_sessions[0], mcs_sessions[1]]
skip_subjects = []
plot_headers = None
for subject_ind, subject_name in tqdm.tqdm(enumerate(mcs_sessions)):
    
    print(f"Evaluating Id: {subject_ind} Name: {subject_name}")
    
    if subject_name in skip_subjects: continue
    
    if subject_name not in subjects: 
        print(f"Subject not found in data:{subject_name}")
        continue
    
    if len(subjects[subject_name]) <= 1:  # If dict is empty skip.
        print(f"Subject is empty:{subjects[subject_name]}")    
        continue
    
    print(f" Data:{subjects[subject_name].keys()}")
    
    
     
    plot_data = {}

    # Get seconds per frame 
    seconds_per_frame = 0 

    for trial_name in subjects[subject_name]: 
        if trial_name == 'dof_names': continue
        
        if trial_name == 'seconds_per_frame': continue  
    

        if len(subjects[subject_name][trial_name]) <= 1:  # If dict is empty skip.
            print(f"Trial is empty:{subjects[subject_name][trial_name]}") 
            continue 
        print(subjects[subject_name][trial_name].keys())
    
        trial_length = subjects[subject_name][trial_name]['kinematics']['time'].iloc[-1] - subjects[subject_name][trial_name]['kinematics']['time'].iloc[0]
        if trial_length < 1: continue # Can't perform squat in less tha a second . 
        
        
        seconds_per_frame += trial_length
        
        plot_headers, plot_data[trial_name] = get_plotting_data(subjects[subject_name],trial_name)
        
        print(seconds_per_frame,subjects[subject_name][trial_name]['kinematics']['time'].iloc[-1],subjects[subject_name][trial_name]['kinematics']['time'].iloc[0])

        print(f"Subject:{subject_name} Trial Index:{trial_name} Length: {trial_length} Headers:{plot_headers}")

    if seconds_per_frame == 0: 
        print("Tracks are empty, skipping subject")
        continue

    assert seconds_per_frame > 0, f"Subject Index:{subject_ind} seconds_per_frame should be greater 0. Likely no trial found to evaluate." 
    
    seconds_per_frame /= sum([len(plot_data[trial_name]['kinematics']) for trial_name in plot_data])


    
    
    fig_title = f"Temporal Segmentation using Knee Kinematics for Subject:{subject_name}"

    num_segments = 1 # Number of segments per trial

    # Temporal Segmentation (using knee angles kinematics since it gave the most reasonable results) 
    segments_fig, segments_all_trials = temporal_segementation(plot_data,plot_headers,\
                                      num_segments=num_segments, seconds_per_frame=seconds_per_frame,\
                                      allowed_height_difference_threshold=0.15,\
                                      isdeg=True,visualize=False,fig_title=fig_title)



    os.makedirs("pdfs",exist_ok=True)
    plotly.io.write_image(segments_fig, f'pdfs/{PPE_Subjects[subject_name]}_segmentation.pdf', format='pdf')

    if len(segments_all_trials) == 0: 
        print("Could not find segments")
        continue  
    
    # Update data information
    for trial_name in segments_all_trials:
        subjects[subject_name][trial_name]['segments'] = segments_all_trials[trial_name]
    
    subjects[subject_name]['seconds_per_frame'] = seconds_per_frame
        

In [None]:
# Merge trials across distributions for trials 
def plot_simulation_data(headers,plot_data,title_text="Plot Data",visualize=True,data_type='kinematics',num_cols=3): 
    
    assert plot_data.shape[-1] == 101, "Length of data should be 101"
    assert len(headers) == plot_data.shape[0], "Length of headers should match headers length"
    assert num_cols > 0, "Number of columns should be greater than 0"
    
    
    num_rows = int(math.ceil(len(headers)/3))
    print(data_type)
    if data_type == 'kinematics' or data_type == 'kinetics':
        fig = make_subplots(rows=num_rows, cols=num_cols, subplot_titles=[plot_names_mapping[header] for header in headers]) 
    elif  'muscle_activations' in data_type:
        fig = make_subplots(rows=num_rows, cols=num_cols, subplot_titles=[plot_muscle_activations_mapping[header] for header in headers])
    else: 
        raise ValueError("Invalid data type. Should be either kinematics or kinetics or muscle_activations")
    
    
    # Colors for left and right sides
    colors = {'left': 'blue', 'right': 'red'}

    # Create each subplot
    for i, header in enumerate(headers):
        row = i // num_cols + 1
        col = i % num_cols + 1

        if 'muscle_activations' in data_type:
            title = plot_muscle_activations_mapping[header]
        elif data_type == 'kinematics' or data_type == 'kinetics':
            title = plot_names_mapping[header]
        else:
            raise ValueError("Invalid data type. Should be either kinematics or kinetics or muscle_activations")


        # Plot every kinematics data
        x = np.linspace(0,1,num=plot_data[i].shape[-1])
        for j in range(plot_data[i].shape[0]):
            fig.add_trace(go.Scatter(x=x, y=plot_data[i,j], name=f'{title}',showlegend=False), row=row, col=col)
    
        # Update y-axis label
        if data_type == 'kinematics':
            fig.update_yaxes(title_text='deg', title_standoff=10, row=row, col=col)
        elif data_type == 'kinetics':
            fig.update_yaxes(title_text='Nm', row=row, col=col)
        else:
            fig.update_yaxes(title_text='0-1', row=row, col=col)
            
        # fig.update_yaxes(title_text='deg', row=row, col=col)

    # Update x-axis label for the bottom row
    for col in range(1, num_rows+1):
        for row in range(1,num_cols+1):
            fig.update_xaxes(title_text='% SQT Cycle (Seconds)', row=row, col=col)

    # Update layout
    plot_width = 1800 if 'muscle_activations' in data_type else 1000
    fig.update_layout(height=2000, width = 1000,
                        showlegend=False,  title_x=0.5,
                        title_text=title_text,
                        font_family="Times New Roman",
                        font_color="black",
                        title_font_family="Times New Roman",
                        title_font_color="black")

    # Show the figure
    if visualize: 
        fig.show()
    
    return fig

# Plot indivifual sample & Store aggregate (mean, std, list ) values 

In [None]:
import copy

# Skip following subjects for torque simulation
# skip_subjects = ["c08f1d89-c843-4878-8406-b6f9798a558e","0d9e84e9-57a4-4534-aee2-0d0e8d1e7c45","c28e768f-6e2b-4726-8919-c05b0af61e4a","349e4383-da38-4138-8371-9a5fed63a56a", "0e10a4e3-a93f-4b4d-9519-d9287d1d74eb",]


# Skip following subjects for muscle simulation
# skip_subjects = ["c08f1d89-c843-4878-8406-b6f9798a558e", "0d9e84e9-57a4-4534-aee2-0d0e8d1e7c45", "c28e768f-6e2b-4726-8919-c05b0af61e4a", "349e4383-da38-4138-8371-9a5fed63a56a", "3d1207bf-192b-486a-b509-d11ca90851d7",   "0e10a4e3-a93f-4b4d-9519-d9287d1d74eb", "2345d831-6038-412e-84a9-971bc04da597", ""] # Skip subject for muscle simulation  
skip_subjects = []
num_segments = 1

# mcs_sessions = ["349e4383-da38-4138-8371-9a5fed63a56a","015b7571-9f0b-4db4-a854-68e57640640d","c613945f-1570-4011-93a4-8c8c6408e2cf","dfda5c67-a512-4ca2-a4b3-6a7e22599732","7562e3c0-dea8-46f8-bc8b-ed9d0f002a77","275561c0-5d50-4675-9df1-733390cd572f","0e10a4e3-a93f-4b4d-9519-d9287d1d74eb","a5e5d4cd-524c-4905-af85-99678e1239c8","dd215900-9827-4ae6-a07d-543b8648b1da","3d1207bf-192b-486a-b509-d11ca90851d7","c28e768f-6e2b-4726-8919-c05b0af61e4a","fb6e8f87-a1cc-48b4-8217-4e8b160602bf","e6b10bbf-4e00-4ac0-aade-68bc1447de3e","d66330dc-7884-4915-9dbb-0520932294c4","0d9e84e9-57a4-4534-aee2-0d0e8d1e7c45","2345d831-6038-412e-84a9-971bc04da597","0a959024-3371-478a-96da-bf17b1da15a9","ef656fe8-27e7-428a-84a9-deb868da053d","c08f1d89-c843-4878-8406-b6f9798a558e","d2020b0e-6d41-4759-87f0-5c158f6ab86a","8dc21218-8338-4fd4-8164-f6f122dc33d9"]

# mcs_scores = [4,4,2,3,2,4,3,3,2,3,0,3,4,2,2,3,4,4,3,3,3]
# mcs_scores = dict(zip(mcs_sessions,mcs_scores))

# PPE_Subjects = ["PPE09182201","PPE09182202","PPE09182203","PPE09182204","PPE09182205","PPE09182206","PPE09182207","PPE09182208","PPE09182209","PPE091822010","PPE09182211","PPE09182212","PPE09182213","PPE09182214","PPE09182215","PPE09182216","PPE09182217","PPE09182218","PPE09182219","PPE09182220","PPE09182221"]
# PPE_Subjects = dict(zip(mcs_sessions,PPE_Subjects))

############### STORE Manual segmentation results here: 
# manually_segment_subjects_list = [("3d1207bf-192b-486a-b509-d11ca90851d7","SQT01_segment_1"),
#                                   ("3d1207bf-192b-486a-b509-d11ca90851d7","SQT01_segment_2"),
#                                   ("3d1207bf-192b-486a-b509-d11ca90851d7","SQT01_segment_3"), 
                                  
#                                   ("2345d831-6038-412e-84a9-971bc04da597","SQT01_segment_1")]

manual_segments = {} 
# for subject_name,trial_name in manually_segment_subjects_list: 
#     if subject_name not in manual_segments: 
#         manual_segments[subject_name] = {}
#         continue 
    
#     if trial_name not in manual_segments[subject_name]:
#         manual_segments[subject_name][trial_name] = np.zeros((1,2)) # If not segment found, skip the trial
#         continue 
    
#     if 'segments' not in subjects[subject_name][trial_name]: 
#         continue 
    
#     manual_segments[subject_name][trial_name] = copy.deepcopy(subjects[subject_name][trial_name]['segments'])

    
# # Check and update the first set of keys
# if "3d1207bf-192b-486a-b509-d11ca90851d7" in manual_segments:
#     if "SQT01_segment_3" in manual_segments["3d1207bf-192b-486a-b509-d11ca90851d7"]:
#         manual_segments["3d1207bf-192b-486a-b509-d11ca90851d7"]["SQT01_segment_3"][0][0] += 15
#         manual_segments["3d1207bf-192b-486a-b509-d11ca90851d7"]["SQT01_segment_3"][0][1] += 15

#     # if "SQT01_segment_1" in manual_segments["3d1207bf-192b-486a-b509-d11ca90851d7"]:
#     #     manual_segments["3d1207bf-192b-486a-b509-d11ca90851d7"]["SQT01_segment_1"][0][0] += 40
#     #     manual_segments["3d1207bf-192b-486a-b509-d11ca90851d7"]["SQT01_segment_1"][0][1] += 40

# # Check and update the second set of keys
# if "2345d831-6038-412e-84a9-971bc04da597" in manual_segments:
#     if "SQT01_segment_1" in manual_segments["2345d831-6038-412e-84a9-971bc04da597"]:
#         manual_segments["2345d831-6038-412e-84a9-971bc04da597"]["SQT01_segment_1"][0][0] += 40 
        




##########################################################


os.makedirs("pdfs",exist_ok=True)

aggregate_data = {}
mcs_aggregate_data = {2:{}, 3:{}, 4:{}, 0:{}, -1:{}}
for plotting_variable in ['kinematics','kinetics']:
    aggregate_data[plotting_variable] = {}
    aggregate_data[plotting_variable]['mean'] = np.zeros((len(plot_names_mapping),101))
    aggregate_data[plotting_variable]['std'] = np.zeros((len(plot_names_mapping),101))


######## For muscle activations  #####################
for page_index, page_dict in enumerate(plot_muscle_activations_mapping_pages):
    aggregate_data[f'muscle_activations-{page_index}'] = {}
    aggregate_data[f'muscle_activations-{page_index}']['mean'] = np.zeros((len(page_dict),101))
    aggregate_data[f'muscle_activations-{page_index}']['std'] = np.zeros((len(page_dict),101))



total_trials = 0

for subject_ind, subject_name in tqdm.tqdm(enumerate(subjects)):

    # Check if all the details that have to be plotted exist    
    if subject_name in skip_subjects: 
        continue 
    
    if len(subjects[subject_name]) <= 1:  # If dict is empty skip.
        print(f"Subject is empty:{subjects[subject_name]}")    
        continue
    
    print(f" Data:{subjects[subject_name].keys()}")
    
    plot_headers = None
    plot_data = {}

    for trial_name in subjects[subject_name]: 
        if trial_name == 'dof_names': continue
        if trial_name == 'seconds_per_frame': continue  
        
        if len(subjects[subject_name][trial_name]) <= 1:  # If dict is empty skip.
            print(f"Trial is empty:{subjects[subject_name][trial_name]}") 
            continue 
        print(subjects[subject_name][trial_name].keys())
    
        trial_length = subjects[subject_name][trial_name]['kinematics']['time'].iloc[-1] - subjects[subject_name][trial_name]['kinematics']['time'].iloc[0]
        if trial_length < 1: continue # Can't perform squat in less tha a second. 
        
 
        plot_headers, plot_data_trial = get_plotting_data(subjects[subject_name],trial_name)
        
    
        # Temporal Segmentation (using knee angles kinematics since it gave the most reasonable results) 
        try: 
            if subject_name in manual_segments and trial_name in manual_segments[subject_name]:
                segments = manual_segments[subject_name][trial_name]
            else: 
                segments = subjects[subject_name][trial_name]['segments']            
        except Exception as e: 
            print(f"Error computing segments using temporal segmetnation",e) 
            continue
        
        
        segment_time = sum([  segments[i][1] - segments[i][0] for i in range(len(segments))])*subjects[subject_name]['seconds_per_frame']
        
        print(f"    Subject:{subject_name} Trial Index:{trial_name} Length: {trial_length} Segment Length:{segment_time}  {segments} Headers:{plot_headers}")
        
        for plotting_variable in plot_data_trial:
            assert len(plot_data_trial[plotting_variable].shape) == 2, "Data should be 2D"

            time_normalized_series = [time_normalization(plot_data_trial[plotting_variable][segment[0]:segment[1]]) for segment in segments if segment[1] > segment[0] ] 
            
            if plotting_variable not in plot_data:
                plot_data[plotting_variable] = []
            plot_data[plotting_variable].extend(time_normalized_series)

    if len(plot_data) == 0: # Tracks are empty  
        continue
    
    for plotting_variable in plot_data: 
        plot_data[plotting_variable] = np.array(plot_data[plotting_variable]).transpose((2,0,1))

        print(subject_name)

        fig_title = f"{plotting_variable} for Subject:{PPE_Subjects[subject_name]} MCS:{mcs_scores[subject_name]}"
        if plotting_variable == 'kinematics' or plotting_variable == 'kinetics':
            fig = plot_simulation_data(plot_headers, plot_data[plotting_variable], fig_title, visualize=False , data_type=plotting_variable)
        elif 'muscle_activations' in plotting_variable:
            mc_page_index = int(plotting_variable.split('-')[-1])
            plot_mc_headers = plot_muscle_activations_mapping_pages[mc_page_index]
            fig = plot_simulation_data(plot_mc_headers, plot_data[plotting_variable], fig_title, visualize=False , data_type=plotting_variable,num_cols = 4)
        else: 
            raise ValueError(f"Unknown plotting variable:{plotting_variable}")        
        plotly.io.write_image(fig, f'pdfs/{PPE_Subjects[subject_name]}_{plotting_variable}.pdf', format='pdf')


        if not np.isnan(plot_data[plotting_variable]).any(): 
            aggregate_data[plotting_variable]['mean'] += plot_data[plotting_variable].sum(axis=1)
            aggregate_data[plotting_variable]['std'] += (plot_data[plotting_variable]**2).sum(axis=1)
            
            mcs_score = mcs_scores[subject_name]

            if plotting_variable not in mcs_aggregate_data[mcs_score]: 
                mcs_aggregate_data[mcs_score][plotting_variable] = {} 
                
                if 'muscle_activations' in plotting_variable:
                    mc_page_index = int(plotting_variable.split('-')[-1])
                    mcs_aggregate_data[mcs_score][plotting_variable]['mean'] = np.zeros((len(plot_muscle_activations_mapping_pages[mc_page_index]),101))
                    mcs_aggregate_data[mcs_score][plotting_variable]['std'] = np.zeros((len(plot_muscle_activations_mapping_pages[mc_page_index]),101))
                    mcs_aggregate_data[mcs_score][plotting_variable]['list'] = np.zeros((0,len(plot_muscle_activations_mapping_pages[mc_page_index]),101))
                elif plotting_variable == 'kinematics' or plotting_variable == 'kinetics': 
                    mcs_aggregate_data[mcs_score][plotting_variable]['mean'] = np.zeros((len(plot_names_mapping),101))
                    mcs_aggregate_data[mcs_score][plotting_variable]['std'] = np.zeros((len(plot_names_mapping),101))
                    mcs_aggregate_data[mcs_score][plotting_variable]['list'] = np.zeros((0,len(plot_names_mapping),101))

                else: 
                    raise ValueError(f"Unknown plotting variable:{plotting_variable}")

                mcs_aggregate_data[mcs_score][plotting_variable]['ppe_names'] = []
                mcs_aggregate_data[mcs_score][plotting_variable]['ppe_trial'] = []
                mcs_aggregate_data[mcs_score][plotting_variable]['total_trials'] = 0
            
            mcs_aggregate_data[mcs_score][plotting_variable]['mean'] += plot_data[plotting_variable].sum(axis=1)
            mcs_aggregate_data[mcs_score][plotting_variable]['std'] += (plot_data[plotting_variable]**2).sum(axis=1)

            mcs_aggregate_data[mcs_score][plotting_variable]['list'] = np.concatenate([ mcs_aggregate_data[mcs_score][plotting_variable]['list'],
                                                                                       np.transpose(plot_data[plotting_variable], (1,0,2)   ) ])
            
            mcs_aggregate_data[mcs_score][plotting_variable]['ppe_names'].extend([PPE_Subjects[subject_name]]*plot_data[plotting_variable].shape[1])
            # mcs_aggregate_data[mcs_score][plotting_variable]['ppe_trial'].extend([trial_index]*plot_data[plotting_variable].shape[1])
            mcs_aggregate_data[mcs_score][plotting_variable]['total_trials'] += plot_data[plotting_variable].shape[1]
            
            
    total_trials += plot_data[plotting_variable].shape[1]
        



for k in aggregate_data:
    aggregate_data[k]['mean'] /= total_trials
    aggregate_data[k]['std'] = np.sqrt(aggregate_data[k]['std']/total_trials - aggregate_data[k]['mean']**2)

    # break

for mcs_score in mcs_aggregate_data:
    for plotting_variable in mcs_aggregate_data[mcs_score]: 
        mcs_aggregate_data[mcs_score][plotting_variable]['mean'] /= mcs_aggregate_data[mcs_score][plotting_variable]['total_trials']
        mcs_aggregate_data[mcs_score][plotting_variable]['std'] = np.sqrt(mcs_aggregate_data[mcs_score][plotting_variable]['std']/mcs_aggregate_data[mcs_score][plotting_variable]['total_trials'] - mcs_aggregate_data[mcs_score][plotting_variable]['mean']**2)

In [None]:
mcs_scores

In [None]:
plot_data.keys()

In [None]:
assert plot_headers is not None, "Something went wrong. Headers should not be None"

# Plot Aggregate values

In [None]:
from matplotlib.pyplot import plot


def plot_data_distribution(headers,plot_data_mean,plot_data_std,title_text="Plot Data",data_type="kinematics",num_cols=3,visualize=False): 
    
    # assert plot_data.shape[-1] == 101, "Length of data should be 101"
    num_rows = int(math.ceil(len(headers)/num_cols))


    if data_type == 'kinematics' or data_type == 'kinetics':
        fig = make_subplots(rows=num_rows, cols=num_cols, subplot_titles=[ plot_names_mapping[header] + ( ' moments' if data_type == 'kinetics' else '' )  for header in headers]) 
    elif  'muscle_activations' in data_type:
        fig = make_subplots(rows=num_rows, cols=num_cols, subplot_titles=[plot_muscle_activations_mapping[header] for header in headers])
    else: 
        raise ValueError("Invalid data type. Should be either kinematics or kinetics or muscle_activations")


    # Colors for left and right sides
    colors = {'left': 'blue', 'right': 'red'}

    # Create each subplot
    for i, header in enumerate(headers):
        row = i // num_cols + 1
        col = i % num_cols + 1
        
        if i >= len(headers): 
            break


        if 'muscle_activations' in data_type:
            title = plot_muscle_activations_mapping[header]
        elif data_type == 'kinematics' or data_type == 'kinetics':
            title = plot_names_mapping[header]
        else:
            raise ValueError("Invalid data type. Should be either kinematics or kinetics or muscle_activations")

        # Plot every kinematics data
        x = np.linspace(0,1,num=plot_data_mean[i].shape[-1])

    
        fig.add_trace(go.Scatter(x=x, y=plot_data_mean[i], showlegend=False, name=f'{title}'), row=row, col=col)
        fig.add_trace(go.Scatter(x=list(x) + list(x)[::-1], 
                                    y=list(plot_data_mean[i] + np.array(plot_data_std[i])) + list(np.array(plot_data_mean[i]) - np.array(plot_data_std[i]))[::-1] ,
                                        mode='lines', line=dict(width=0), name=f'{title} Bounds', showlegend=False, fill='toself',hoverinfo="skip",),
        row=row, col=col)

        # Update y-axis label
        if data_type == 'kinematics':
            fig.update_yaxes(title_text='deg', row=row, col=col)
        elif data_type == 'kinetics':
            fig.update_yaxes(title_text='Nm', row=row, col=col)
        elif 'muscle_activations' in data_type: 
            fig.update_yaxes(title_text='0-1', row=row, col=col)

        fig.update_xaxes(title_text='% SQT Cycle (Seconds)', row=row, col=col)

    

    # Update layout
    # plot_height = 1800 if 'muscle_activations' in data_type else 1000
    fig.update_layout(height=2000, width=1000,
                        showlegend=False,  title_x=0.5,
                        title_text=title_text,
                        font_family="Times New Roman",
                        font_color="black",
                        title_font_family="Times New Roman",
                        title_font_color="black")

    # Show the figure
    if visualize:
        fig.show()
    
    return fig

for k in aggregate_data:
    if k == 'kinematics' or k == 'kinetics':
        fig = plot_data_distribution(plot_headers, aggregate_data[k]['mean'],aggregate_data[k]['std'], f"Aggregate {k} Data",data_type=k)
        plotly.io.write_image(fig, f'pdfs/all_subject_{k}.pdf', format='pdf')
    
    elif 'muscle_activations' in k:
        mc_page_index = int(k.split('-')[-1])
        fig = plot_data_distribution(plot_muscle_activations_mapping_pages[mc_page_index], aggregate_data[k]['mean'],aggregate_data[k]['std'], f"Aggregate {k} Data",data_type=k)
        plotly.io.write_image(fig, f'pdfs/all_subject_{k}.pdf', format='pdf')
    else: 
        raise ValueError(f"Unknown plotting variable:{k}")

In [None]:
# Plot Per MCS data

def plot_mcs_distribution(headers,mcs_aggregate_data,title_text="Plot Data",data_type="kinematics", num_cols=3): 
    
    # assert plot_data.shape[-1] == 101, "Length of data should be 101"
    num_rows = int(math.ceil(len(headers)/num_cols))

    # Create 4x4 subplots (we'll only use 14 of them)
    if data_type == 'kinematics' or data_type == 'kinetics':
        fig = make_subplots(rows=num_rows, cols=num_cols, subplot_titles=[plot_names_mapping[header] + ( ' moments' if data_type == 'kinetics' else '' ) for header in headers])
    elif  'muscle_activations' in data_type:
        fig = make_subplots(rows=num_rows, cols=num_cols, subplot_titles=[plot_muscle_activations_mapping[header] for header in headers])
    else:
        raise ValueError("Invalid data type. Should be either kinematics or kinetics or muscle_activations")


    # Colors for left and right sides
    colors = {2: 'rgba(255, 0, 0, 0.2)', 3: 'rgba(0, 0, 255, 0.2)' , 4: 'rgba(0, 255, 0, 0.2)', -1: 'rgba(0, 0, 0, 0.2)', 0: 'rgba(0, 0, 0, 0.2)' }
    colors_mean = {2: 'rgba(255, 0, 0, 1.0)', 3: 'rgba(0, 0, 255, 1.0)' , 4: 'rgba(0, 255, 0, 1.0)', -1: 'rgba(0, 0, 0, 1.0)', 0: 'rgba(0, 0, 0, 1.0)'}

    y_max = 0 
    y_min = 0 

    # Create each subplot
    for i, header in enumerate(headers):
        row = i // num_cols + 1
        col = i % num_cols + 1
        
        if i >= len(headers): 
            break
        
        if 'muscle_activations' in data_type:
            title = plot_muscle_activations_mapping[header]
        elif data_type == 'kinematics' or data_type == 'kinetics':
            title = plot_names_mapping[header]
        else:
            raise ValueError("Invalid data type. Should be either kinematics or kinetics or muscle_activations")

        # Plot every kinematics data
        x = None

        for mcs_score in mcs_aggregate_data: 
            if len(mcs_aggregate_data[mcs_score]) == 0: 
                continue  
            plot_data_mean = mcs_aggregate_data[mcs_score][data_type]['mean']
            plot_data_std = mcs_aggregate_data[mcs_score][data_type]['std']
            # print(plot_data_mean.shape)
            if x is None: 
                x = np.linspace(0,1,num=plot_data_mean[i].shape[-1]) 

            fig.add_trace(go.Scatter(x=x, y=plot_data_mean[i], showlegend = (i==len(headers)-1), 
                                    name=f'MCS:{mcs_score}',line=dict(color=colors_mean[mcs_score])), row=row, col=col)
            fig.add_trace(go.Scatter(x=list(x) + list(x)[::-1], 
                                        y=list(plot_data_mean[i] + np.array(plot_data_std[i]/2)) + list(np.array(plot_data_mean[i]) - np.array(plot_data_std[i]/2))[::-1] ,
                                            mode='lines', line=dict(width=0), name=f'MCS:{mcs_score} Bounds', showlegend=False, fill='toself',hoverinfo="skip",fillcolor=colors[mcs_score]), row=row, col=col)

            y_min = min(y_min, np.min(plot_data_mean[i] - plot_data_std[i]/2))
            y_max = max(y_max, np.max(plot_data_mean[i] + plot_data_std[i]/2))
            
        # Update y-axis label
        if data_type == 'kinematics':
            fig.update_yaxes(title_text='deg', row=row, col=col)
        elif data_type == 'kinetics':
            fig.update_yaxes(title_text='Nm', row=row, col=col)
        else:
            fig.update_yaxes(title_text='0-1', row=row, col=col)
        # fig.update_yaxes(title_text='deg', row=row, col=col)

        fig.update_xaxes(title_text='% SQT Cycle (Seconds)', row=row, col=col)

    

    # Update layout
    fig.update_layout(height=2000, width=1000,
                        showlegend=True,  title_x=0.5,
                        title_text=title_text,
                        font_family="Times New Roman",
                        font_color="black",
                        title_font_family="Times New Roman",
                        title_font_color="black")

    # fig.update_yaxes(range=[y_min,y_max])
    
    # Show the figure
    fig.show()
    
    return fig


for k in ['kinematics','kinetics']:
    fig = plot_mcs_distribution(plot_headers, mcs_aggregate_data, f"MCS {k} Distributions",data_type=k)
    plotly.io.write_image(fig, f'pdfs/mcs_subject_{k}_distribution.pdf', format='pdf')
    
for page_index,ma_page in enumerate(plot_muscle_activations_mapping_pages):
    k = f'muscle_activations-{page_index}'
    fig = plot_mcs_distribution(ma_page.keys(), mcs_aggregate_data, f"MCS {k} Distributions",data_type=k)
    plotly.io.write_image(fig, f'pdfs/mcs_subject_{k}_distribution.pdf', format='pdf')

# Plot all mcs lines

In [None]:
# Plot Per MCS data

from math import e


def plot_mcs_data(headers,mcs_aggregate_data,title_text="Plot Data",data_type="kinematics",num_cols=3, visualize=False, mcs_scores = None): 
    
    mcs_scores = mcs_aggregate_data.keys() if mcs_scores is None else mcs_scores

    # assert plot_data.shape[-1] == 101, "Length of data should be 101"
    num_rows = int(math.ceil(len(headers)/num_cols))

    # Create 4x4 subplots (we'll only use 14 of them)
    if data_type == 'kinematics' or data_type == 'kinetics':
        fig = make_subplots(rows=num_rows, cols=num_cols, subplot_titles=[plot_names_mapping[header] + ( ' moments' if data_type == 'kinetics' else '' ) for header in headers])
    elif  'muscle_activations' in data_type:
        fig = make_subplots(rows=num_rows, cols=num_cols, subplot_titles=[plot_muscle_activations_mapping[header] for header in headers])
    else:
        raise ValueError("Invalid data type. Should be either kinematics or kinetics or muscle_activations")
    

    colors = {2: 'rgba(255, 0, 0, 0.2)', 3: 'rgba(0, 0, 255, 0.2)' , 4: 'rgba(0, 255, 0, 0.2)', -1: 'rgba(0, 0, 0, 0.2)', 0: 'rgba(0, 0, 0, 0.2)' }
    colors_mean = {2: 'rgba(255, 0, 0, 1.0)', 3: 'rgba(0, 0, 255, 1.0)' , 4: 'rgba(0, 255, 0, 1.0)', -1: 'rgba(0, 0, 0, 0.1)', 0: 'rgba(0, 0, 0, 0.2)'}
    # Colors for left and right sides
    # colors = {2: 'rgba(255, 0, 0, 0.2)', 3: 'rgba(0, 0, 255, 0.2)' , 4: 'rgba(0, 255, 0, 0.2)', -1: 'rgba(0, 0, 0, 0.2)' }
    # colors_mean = {2: 'rgba(255, 0, 0, 1.0)', 3: 'rgba(0, 0, 255, 1.0)' , 4: 'rgba(0, 255, 0, 1.0)', -1: 'rgba(0, 0, 0, 1.0)'}

    y_min = 0 # Max value for y-axis
    y_max = 0 # Max value for y-axis

    # Create each subplot
    for i, header in enumerate(headers):
        row = i // num_cols + 1
        col = i % num_cols + 1
        
        if i >= len(headers): 
            break   
        
        if 'muscle_activations' in data_type:
            title = plot_muscle_activations_mapping[header]
        elif data_type == 'kinematics' or data_type == 'kinetics':
            title = plot_names_mapping[header]
        else: 
            raise ValueError("Invalid data type. Should be either kinematics or kinetics or muscle_activations")

        # Plot every kinematics data
        x = None

        for mcs_score in mcs_scores: 
            
            if data_type not in mcs_aggregate_data[mcs_score]: 
                continue 
            for y_ind, y in enumerate(mcs_aggregate_data[mcs_score][data_type]['list']):

                if x is None: 
                    x = np.linspace(0,1,num=y.shape[-1])                            

                show_lengend = (i==len(headers)-1) and y_ind==0 
                if show_lengend:
                    plot_name = f"MCS:{mcs_score}"
                else: 
                    plot_name = f'{mcs_aggregate_data[mcs_score][data_type]["ppe_names"][y_ind]}'

                fig.add_trace(go.Scatter(x=x, y=y[i], showlegend=show_lengend, 
                                        name=plot_name,line=dict(color=colors_mean[mcs_score])), row=row, col=col)
                y_min = min(y_min, y[i].min()) 
                y_max = max(y_max, y[i].max())
        # Update y-axis label
        if data_type == 'kinematics':
            fig.update_yaxes(title_text='deg', title_standoff=0, row=row, col=col)
        elif data_type == 'kinetics':
            fig.update_yaxes(title_text='Nm', title_standoff=0, row=row, col=col)
        else:
            fig.update_yaxes(title_text='0-1', title_standoff=0, row=row, col=col)

        fig.update_xaxes(title_text='% SQT Cycle (Seconds)', row=row, col=col)



    # Update layout
    # plot_width = 1800 if 'muscle_activations' in data_type else 1000
    # print("Setting y-axis range:",y_min,y_max)
    # fig.update_yaxes(range=[y_min, y_max])
    
    fig.update_layout(height=2000, width=1000,
                        showlegend=True,  title_x=0.5,
                        title_text=title_text,
                        font_family="Times New Roman",
                        font_color="black",
                        title_font_family="Times New Roman",
                        title_font_color="black",
                        )
    

    # Show the figure
    if visualize:
        fig.show()
    
    return fig


for k in ['kinematics','kinetics']:
    
    fig = plot_mcs_data(plot_headers, mcs_aggregate_data, f"MCS {k}",data_type=k,visualize=True)
    plotly.io.write_image(fig, f'pdfs/mcs_subject_{k}.pdf', format='pdf')
    
for page_index,ma_page in enumerate(plot_muscle_activations_mapping_pages):
    k = f'muscle_activations-{page_index}'
    fig = plot_mcs_data(ma_page.keys(), mcs_aggregate_data, f"MCS {k}",data_type=k,visualize=True, mcs_scores=[x for x in mcs_aggregate_data.keys() if x > 1])
    plotly.io.write_image(fig, f'pdfs/mcs_subject_{k}.pdf', format='pdf')

In [None]:

mcs_aggregate_data[mcs_score].keys()

In [None]:
from pypdf import PdfMerger
import glob

pdfs = os.listdir("pdfs")
pdfs = sorted(pdfs)

if ".ipynb_checkpoints" in pdfs:
    pdfs.remove(".ipynb_checkpoints")

for i in range(3,-1,-1):
    pdfs.remove(f"all_subject_muscle_activations-{i}.pdf")
    pdfs.insert(0,f"all_subject_muscle_activations-{i}.pdf")


pdfs.remove("all_subject_kinetics.pdf")
pdfs.insert(0,"all_subject_kinetics.pdf")

pdfs.remove("all_subject_kinematics.pdf")
pdfs.insert(0,"all_subject_kinematics.pdf")



for i in range(3,-1,-1):
    pdfs.remove(f"mcs_subject_muscle_activations-{i}.pdf")
    pdfs.insert(0,f"mcs_subject_muscle_activations-{i}.pdf")

pdfs.remove("mcs_subject_kinetics.pdf")
pdfs.insert(0,"mcs_subject_kinetics.pdf")

pdfs.remove("mcs_subject_kinematics.pdf")
pdfs.insert(0,"mcs_subject_kinematics.pdf")



for i in range(3,-1,-1):
    try:
        pdfs.remove(f"mcs_subject_muscle_activations-{i}_distribution.pdf")
        pdfs.insert(0,f"mcs_subject_muscle_activations-{i}_distribution.pdf")
    except Exception as e:
        print(f"mcs_subject_muscle_activations-{i}_distribution.pdf")
        

pdfs.remove("mcs_subject_kinetics_distribution.pdf")
pdfs.insert(0,"mcs_subject_kinetics_distribution.pdf")

pdfs.remove("mcs_subject_kinematics_distribution.pdf")
pdfs.insert(0,"mcs_subject_kinematics_distribution.pdf")


PPE_Subjects2session = {v:k for k,v in PPE_Subjects.items()}

# Remove individual plots for non mcs scores
keep_indices = []
for i in range(len(pdfs)):
    name = pdfs[i].split("_")[0]
    if name in PPE_Subjects2session: 
        # print(f"Checking:{name}")
        if PPE_Subjects2session[name] in mcs_scores and mcs_scores[PPE_Subjects2session[name]] > 1:
            # print(f"keep_indices:{name} MCS:{mcs_scores[PPE_Subjects2session[name]]}")
            keep_indices.append(i)
        else: 
            continue
    elif "subject" in pdfs[i]:
        keep_indices.append(i)
    

pdfs = [pdfs[i] for i in keep_indices]



merger = PdfMerger()

for pdf in pdfs:
    merger.append(os.path.join("pdfs",pdf))

merger.write("MCS-SQT-Muscle-Simulation.pdf")
merger.close()


# Plot the subject info

In [None]:
for mcs_score in mcs_aggregate_data:
    data_type = 'kinematics'
    if data_type not in mcs_aggregate_data[mcs_score]:
        print(mcs_score, {}, 0)
    else:
        subjects_set = set(mcs_aggregate_data[mcs_score][data_type]["ppe_names"])
        print(mcs_score, sorted(subjects_set), len(mcs_aggregate_data[mcs_score][data_type]["ppe_names"]))