## Code Information
<small> <quote>
**Author:**  Shafagh Keyvanian [shkey@seas.upenn.edu]  <br>
**Date**: *Spring 2024*
</quote> </small>

<Large> <quote>
**All Subjects**<br>
**Filter extracted angles (7dof)**<br>
**Bones: Hip, Ab, Chest, UArm, FArm, Hand** </quote> </Large>

Input:  
.npz file containing Upper-body Euler angles
- Folder: 
<small> <blockquote> ../data/processed_exports/ </blockquote> </small>
- Columns:  
<small> <blockquote> 'time', 'eul_right', 'eul_left', 'eul_head' </blockquote> </small>
- Chains:  
<small> <blockquote>
        chain_R = ['Skeleton', 'Ab', 'Chest', 'RUArm','RFArm','RHand']<br>
        chain_L = ['Skeleton', 'Ab', 'Chest', 'LUArm','LFArm','LHand']<br>
        chain_H = ['Skeleton', 'Ab', 'Chest', 'Neck', 'Head'] </blockquote> </small>
    
Output: 
- arms_dof 
<small> <blockquote> ['sh_abd', 'sh_rot', 'sh_ext', 'el_fle', 'el_sup', 'wr_dev', 'wr_sup', 'wr_fle'] </blockquote> </small>
- Data diagnostics
- Plots

In [None]:
#%% import libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D
from mpl_toolkits.mplot3d import Axes3D
from tf.transformations import quaternion_matrix, euler_from_matrix, euler_matrix

In [None]:
#%% Input data: 7 DoF

# Switches and figure layout
PLOT = True     # Plot the data
n_ax = 3        # one axis each for the angles, their first and their second difference

# Degrees of freedom: short codes (used to build variable names) and display names.
# NOTE: eight entries are listed although the exports are labelled "7dof".
dof_codes = ['sh_abd', 'sh_rot', 'sh_ext', 'el_fle', 'el_sup', 'wr_dev', 'wr_sup', 'wr_fle']
dof_names = ['Shoulder Abduction', 'Shoulder Rotation', 'Shoulder Extension', 'Elbow Flexion',
             'Elbow Supination', 'Wrist Deviation', 'Wrist Supination', 'Wrist Flexion']
n_dof = 8

n_subjects = 5
subject_labels = [f'Subject {k}' for k in range(5)] + ['All Subjects']

# One exported .npz archive per subject
npz_file_0, npz_file_1, npz_file_2, npz_file_3, npz_file_4 = [
    f'../data/processed_exports/subject{k}_7dof_eul_angles.npz' for k in range(5)
]
npz_data_0 = np.load(npz_file_0)
npz_data_1 = np.load(npz_file_1)
npz_data_2 = np.load(npz_file_2)
npz_data_3 = np.load(npz_file_3)
npz_data_4 = np.load(npz_file_4)

# Publish time_<s>, eul_right_<s> and eul_left_<s> as module-level variables;
# the following cells look these arrays up by name.
for s in range(n_subjects):
    _npz = globals()[f"npz_data_{s}"]
    globals()[f"time_{s}"] = _npz['time']
    globals()[f"eul_right_{s}"] = _npz['eul_right']
    globals()[f"eul_left_{s}"] = _npz['eul_left']

# Three-line header of the min/max table: joint, motion, then a min/max pair per DoF.
_group_fmt = "{:^9} {:^16} {:^16} {:^20} {:^17} {:^20} {:^17} {:^16} {:^16}"
_column_fmt = "{:<10} {:<8} {:<8} {:<8} {:<8} {:<8} {:<11} {:<8} {:<8} {:<8} {:<11} {:<8} {:<8} {:<8} {:<8} {:<8} {:<12}"
print(_group_fmt.format("", *[name.split()[0] for name in dof_names]))
print(_group_fmt.format("", *[name.split()[1] for name in dof_names]))
print(_column_fmt.format("Subject", *(["min", "max"] * n_dof)))

#%% Plot the data vs time for diagnostics
i_end_s = np.zeros(n_subjects, dtype=int)
colors = ['b', 'g', 'r', 'c', 'm', 'orange', 'purple', 'brown']

min_angles = np.zeros((n_subjects, n_dof))
max_angles = np.zeros((n_subjects, n_dof))

# Per-subject data lives in dynamically named module globals (time_0, sh_abd_0,
# jump_ind_0, ...) because other cells look these names up. They are read and
# written through this mapping instead of exec()/eval() on generated source.
_ns = globals()

# Where each DoF sits inside eul_right_<s> / eul_left_<s>: (index on axis 1, index on axis 2).
_DOF_SOURCE = {'sh_abd': (3, 0), 'sh_rot': (3, 1), 'sh_ext': (3, 2), 'el_fle': (4, 0),
               'el_sup': (4, 1), 'wr_dev': (5, 0), 'wr_sup': (5, 1), 'wr_fle': (5, 2)}
# DoFs whose sign is flipped per arm (+ and - for right and left arm consistency).
_NEGATED = {'R': {'sh_abd', 'sh_ext', 'wr_dev', 'wr_fle'}, 'L': {'el_fle'}}

_ROW_FMT = "{:<9} {:<8} {:<8} {:<8} {:<8} {:<8} {:<11} {:<8} {:<8} {:<8} {:<11} {:<8} {:<8} {:<8} {:<8} {:<8} {:<12}"


def _print_angle_row(s):
    """Print the current min/max of every DoF of subject ``s`` as one table row."""
    cells = [str(v) for pair in zip(min_angles[s], max_angles[s]) for v in pair]
    print(_ROW_FMT.format("#" + str(s), *cells))


for s in range(n_subjects):
    side = 'R' if s == 0 or s == 4 else 'L'
    time_s = _ns[f"time_{s}"]
    n_samples = len(time_s)

    # Arm angles in degrees, rounded to 2 decimals: R_<dof>_<s> and L_<dof>_<s>.
    for arm, eul in (('R', _ns[f"eul_right_{s}"]), ('L', _ns[f"eul_left_{s}"])):
        for dof_code in dof_codes:
            row, col = _DOF_SOURCE[dof_code]
            deg = np.around(eul[:, row, col] * 180/np.pi, 2)
            _ns[f"{arm}_{dof_code}_{s}"] = -deg if dof_code in _NEGATED[arm] else deg

    # Healthy arm angles: <dof>_<s> is an alias of the selected side's array, so
    # the in-place filtering below is visible under both names.
    angles = [_ns[f"{side}_{dof_code}_{s}"] for dof_code in dof_codes]
    for d, dof_code in enumerate(dof_codes):
        _ns[f"{dof_code}_{s}"] = angles[d]
        min_angles[s, d], max_angles[s, d] = angles[d].min(), angles[d].max()

    # Range of the unfiltered data; the filter thresholds below are fractions of it.
    ang_range = max_angles[s, :] - min_angles[s, :]
    _ns[f"ang_range_{s}"] = ang_range

    ax_min, ax_max = np.min(min_angles[s, :]), np.max(max_angles[s, :])
    _ns[f"ax_min_{s}"], _ns[f"ax_max_{s}"] = ax_min, ax_max

    _print_angle_row(s)     # min/max before filtering

    # plot data
    i_start, i_end = 0, 20000

    i_end_s[s] = min(i_end, n_samples - 1)
    # Nothing can be plotted when the window starts at or past the end of the recording.
    has_window = i_start < n_samples
    fig1, ax1 = plt.subplots(n_ax, 1, sharex=True, figsize=(n_ax*6, 10))
    ax1[0].set(position=(0.1, 0.5, 0.8, 0.45), ylim=(ax_min-10, ax_max+10), ylabel=f'Angles (deg)', title=f'{subject_labels[s]}')
    ax1[1].set(position=(0.1, 0.27, 0.8, 0.2), ylim=(-10, 10), ylabel=f'Angles_First derivative (deg/s)')
    ax1[2].set(position=(0.1, 0.05, 0.8, 0.2), ylim=(-10, 10), ylabel=f'Angles_Second derivative (deg/s^2)', xlabel='Time (s)')
    if has_window:
        for n in range(n_ax):
            ax1[n].set(xlim=(time_s[i_start]-1, time_s[i_start]+(i_end-i_start)/100+10))
            for d in range(n_dof):
                # np.diff(..., n=0) returns its input as-is, i.e. a view of the array
                # that is filtered in place below; copy so the raw trace stays raw.
                raw = np.array(np.diff(angles[d][i_start:i_end_s[s]], n=n))
                ax1[n].plot(time_s[i_start:i_end_s[s]-n], raw, label=f'{dof_names[d]}', color=colors[d])

    # Jumps, Switches, and Filters in the data
    take_ind, jump_ind, switch_ind, filt_ind = [0], [], [], []
    _ns[f"take_ind_{s}"] = take_ind
    _ns[f"jump_ind_{s}"] = jump_ind
    _ns[f"switch_ind_{s}"] = switch_ind
    _ns[f"filt_ind_{s}"] = filt_ind
    for t in range(2, n_samples):
        if time_s[t] - time_s[t-1] < 0.001:
            # Time did not advance by at least 1 ms: recorded as a take boundary and
            # the sample is overwritten with the following one. The index is clamped
            # so a boundary on the very last sample does not read past the array.
            take_ind.append(t)
            t_next = min(t + 1, n_samples - 1)
            for angle in angles:
                angle[t] = angle[t_next]
            continue
        ang_dot = np.array([angle[t] - angle[t-1] for angle in angles])
        ang_dot_prev = np.array([angle[t-1] - angle[t-2] for angle in angles])
        ang_ddot = np.abs(ang_dot - ang_dot_prev)    # second difference (not used by the active rules)
        ang_dot = np.abs(ang_dot)                    # first difference
        if np.count_nonzero(ang_dot > 20) > 2:
            # More than two DoFs stepped by over 20 deg: recorded as a jump, data untouched.
            # NOTE: the original also did `t += 1` here, which has no effect inside a
            # for loop (the next iteration rebinds t), so no sample was ever skipped.
            jump_ind.append(t)
            continue
        over_range = ang_dot > ang_range/4
        if np.count_nonzero(over_range) > 1:
            # More than one DoF stepped by over a quarter of its range: recorded as a switch.
            switch_ind.append(t)
            continue
        if over_range.any():
            # Exactly one DoF exceeded a quarter of its range: hold the previous value
            # for every DoF whose step is larger than 10 deg.
            for d in range(n_dof):
                if ang_dot[d] > 10:
                    angles[d][t] = angles[d][t-1]
            filt_ind.append(t)

    for d in range(n_dof):
        min_angles[s, d], max_angles[s, d] = angles[d].min(), angles[d].max()

    _print_angle_row(s)     # min/max after filtering
    print()

    if has_window:
        for n in range(n_ax):
            for d in range(n_dof):
                ax1[n].plot(time_s[i_start:i_end_s[s]-n], np.diff(angles[d][i_start:i_end_s[s]], n=n), \
                            label=f'{dof_names[d]}_filtered', linestyle='dashed', color=colors[d])

            # Yellow marker at every filtered sample
            for filt in filt_ind:
                if filt < i_end_s[s]:
                    ax1[n].fill_between(time_s[filt], -200, 200, color='yellow', alpha=0.5)

            ax1[n].grid()
            ax1[n].xaxis.set_major_locator(plt.MultipleLocator(20))

        # Event arrows go on the angle axis only, so they are drawn once
        # (previously they were re-drawn on ax1[0] for every axis n).
        annotations = {'Take': ('black', take_ind), 'Jump': ('red', jump_ind), 'Switch': ('purple', switch_ind)}
        for label, (color, indices) in annotations.items():
            for index in indices:
                if index < i_end_s[s]:
                    ax1[0].annotate(label, xy=(time_s[index], 90), xytext=(time_s[index], 150), arrowprops=dict(facecolor=color, shrink=0.05))
    else:
        ax1[0].axis('off')
    ax1[0].legend(loc='right')

    # Save the filtered angles of the selected arm together with the time vector
    npz_file = f'../data/processed_exports/filtered/subject{s}_{side}h_filtered_7dof_eul_angles.npz'
    kwargs = {dof_codes[d]: angles[d] for d in range(n_dof)}
    kwargs['time'] = time_s
    np.savez(npz_file, **kwargs)

for s in range(n_subjects):
    print(f"Subject {s}, switches: {_ns[f'switch_ind_{s}']}, filtered: {len(_ns[f'filt_ind_{s}'])}")

dof_min, dof_max = np.min(min_angles, axis=0), np.max(max_angles, axis=0)


In [None]:
#%% Plot angles vs each other for each subject in a 2 by 3 figure
subject_colors = ['b', 'r', 'g', 'c', 'm']
pad = 5     # margin (deg) added around the overall min/max when setting axis limits

def plot_dof_comparison(d1, d2, figsize=(12, 10)):
    """Scatter one DoF against another, one panel per subject plus a combined panel.

    Panels are laid out on a 2 x 3 grid: subject ``s`` goes to ``ax_2d[s//3, s%3]``
    and the last panel ``ax_2d[1, 2]`` overlays all subjects. Samples listed in
    ``jump_ind_<s>``, ``switch_ind_<s>`` and ``filt_ind_<s>`` are highlighted on the
    subject's own panel. The figure is saved under ``../figures/``.

    The per-subject arrays are module-level variables named ``<dof_code>_<s>`` and
    are looked up by name.

    Args:
        d1: index into ``dof_codes`` / ``dof_names`` for the x axis.
        d2: index into ``dof_codes`` / ``dof_names`` for the y axis.
        figsize: figure size passed to ``plt.subplots``.
    """
    ns = globals()
    fig_2d, ax_2d = plt.subplots(2, 3, figsize=figsize, num=f'all_7dof_{dof_names[d1]} vs {dof_names[d2]}') #, constrained_layout=True)
    fig_2d.suptitle(f'{dof_names[d1]} vs {dof_names[d2]} for each subject', fontsize='xx-large')
    # Invisible full-figure axis that only carries the shared x/y labels
    big_ax = fig_2d.add_subplot(111, frameon=False)
    big_ax.tick_params(labelcolor='none', top=False, bottom=False, left=False, right=False)
    big_ax.set_xlabel(f'{dof_names[d1]} (deg)', fontsize='x-large', labelpad=15)
    big_ax.set_ylabel(f'{dof_names[d2]} (deg)', fontsize='x-large', labelpad=15)
    for s in range(n_subjects):
        x = ns[f"{dof_codes[d1]}_{s}"]
        y = ns[f"{dof_codes[d2]}_{s}"]
        ax = ax_2d[s//3, s%3]
        ax.scatter(x, y, label=f'subject {s}', color=subject_colors[s], alpha=0.5)

        # Jump samples: one scatter call for all indices instead of one call per index
        jumps = np.asarray(ns[f"jump_ind_{s}"], dtype=int)
        if jumps.size:
            ax.scatter(x[jumps], y[jumps], color='gray', marker='o', label='jump', alpha=0.5)

        # Switches: window from 10 samples before to 100 after. The start is clamped
        # at 0; a negative start would be read as an index counted from the end.
        for switch in ns[f"switch_ind_{s}"]:
            lo = max(switch - 10, 0)
            ax.scatter(x[lo:switch+100], y[lo:switch+100], color='k', marker='x', label='switch')

        # Filtered samples
        filtered = np.asarray(ns[f"filt_ind_{s}"], dtype=int)
        if filtered.size:
            ax.scatter(x[filtered], y[filtered], color='gray', marker='x', label='filtered', alpha=0.5)

        # Combined panel: every subject overlaid, later subjects more transparent
        ax_2d[1,2].scatter(x, y, label=f'subject {s}', color=subject_colors[s], alpha=1-s*0.2)
    # Same limits on every panel so the subjects are directly comparable
    for ax, title in zip(ax_2d.flat, subject_labels):
        ax.set(title=title, aspect='equal', xlim=(dof_min[d1]-pad, dof_max[d1]+pad), ylim=(dof_min[d2]-pad, dof_max[d2]+pad))
    fig_2d.savefig(f'../figures/all_7dof_{dof_names[d1]} vs {dof_names[d2]}.png')

# ['sh_abd', 'sh_rot', 'sh_ext', 'el_fle', 'el_sup', 'wr_dev', 'wr_sup', 'wr_fle']

plot_dof_comparison(0, 1, figsize=(12, 5))     # Shoulder Abduction vs Shoulder Rotation
plot_dof_comparison(0, 2, figsize=(12, 8))     # Shoulder Abduction vs Shoulder Extension
plot_dof_comparison(1, 2, figsize=(12, 10))     # Shoulder Rotation vs Shoulder Extension
plot_dof_comparison(3, 0, figsize=(12, 10))     # Elbow Flexion vs Shoulder Abduction
plot_dof_comparison(3, 1, figsize=(12, 8))      # Elbow Flexion vs Shoulder Rotation
plot_dof_comparison(3, 2, figsize=(12, 12))     # Elbow Flexion vs Shoulder Extension
plot_dof_comparison(3, 4, figsize=(12, 8))      # Elbow Flexion vs Elbow Supination
plot_dof_comparison(7, 5, figsize=(12, 5.5))    # Wrist Flexion vs Wrist Deviation



In [None]:
#%% Print min and max values for healthy arm angles

# Column layouts of the table: two centred header lines, the min/max caption line,
# and the data rows (one min/max pair per DoF).
_group_fmt = "{:^9} {:^16} {:^16} {:^20} {:^17} {:^20} {:^17} {:^16} {:^16}"
_caption_fmt = "{:<10} {:<8} {:<8} {:<8} {:<8} {:<8} {:<11} {:<8} {:<8} {:<8} {:<11} {:<8} {:<8} {:<8} {:<8} {:<8} {:<12}"
_row_fmt = "{:<9} {:<8} {:<8} {:<8} {:<8} {:<8} {:<11} {:<8} {:<8} {:<8} {:<11} {:<8} {:<8} {:<8} {:<8} {:<8} {:<12}"


def _min_max_cells(lows, highs):
    """Return [min0, max0, min1, max1, ...] as strings."""
    return [str(value) for pair in zip(lows, highs) for value in pair]


print(_group_fmt.format(
    "", "Shoulder", "Shoulder", "Shoulder", "Elbow", "Elbow", "Wrist", "Wrist", "Wrist"))
print(_group_fmt.format(
    "", "Abduction", "Rotation", "Extension", "Flexion", "Supination", "Deviation", "Supination", "Flexion"))
print(_caption_fmt.format("Subject", *(["min", "max"] * 8)))

# One row per subject (the last label, 'All Subjects', is printed separately below)
for s in range(len(subject_labels)-1):
    print(_row_fmt.format("#"+str(s), *_min_max_cells(min_angles[s], max_angles[s])))

# Overall extremes across all subjects
print(_row_fmt.format("All", *_min_max_cells(dof_min, dof_max)))

# print("{:<20} {:<8} {:<8} {:<8} {:<8} {:<12} {:<8} {:<8} {:<8} {:<8} {:<8}".format\
#       ("DoF", "min_0", "min_1", "min_2", "min_3", "min_4", "max_0", "max_1", "max_2", "max_3", "max_4"))
# for i in range(len(dof_codes)):
#     print("{:<20} {:<8} {:<8} {:<8} {:<8} {:<12} {:<8} {:<8} {:<8} {:<8} {:<8}".format(dof_names[i], \
#       str(min(eval(f"{dof_codes[i]}_0"))), str(min(eval(f"{dof_codes[i]}_1"))), str(min(eval(f"{dof_codes[i]}_2"))), \
#       str(min(eval(f"{dof_codes[i]}_3"))), str(min(eval(f"{dof_codes[i]}_4"))), \
#       str(max(eval(f"{dof_codes[i]}_0"))), str(max(eval(f"{dof_codes[i]}_1"))), str(max(eval(f"{dof_codes[i]}_2"))), \
#       str(max(eval(f"{dof_codes[i]}_3"))), str(max(eval(f"{dof_codes[i]}_4")))))

In [None]:
# %% Plot angles over time

# fig, ax = plt.subplots(4, 2, sharex=False, figsize=(14, 20))
# for i in range(4):
#     for j in range(2):
#         for s in range(n_subjects):
#             exec(f"ax[i,j].plot(time_{s}, {dof_codes[i+j*4]}_{s}, label='{dof_names[i+j*4]} subj{s}')")
#         ax[i,j].set(xlabel='Time (s)', ylabel=f'{dof_names[i+j*4]} (deg)')
#         ax[i,j].legend()
#         ax[i,j].grid()

# Sample window shown for every subject (clamped per subject to the recording length)
i_start = 12000
i_end = 13500
i_end_s = np.zeros(n_subjects, dtype=int)    # per-subject end index of the plotted slice

# One row per subject; squeeze=False keeps ax0 indexable even for a single subject.
fig0, ax0 = plt.subplots(n_subjects, 1, sharex=False, figsize=(12, 20), squeeze=False)
ax0 = ax0[:, 0]
for s in range(n_subjects):
    time_s = globals()[f"time_{s}"]
    n_samples = len(time_s)
    i_end_s[s] = min(i_end, n_samples)
    if i_start < n_samples:
        for d in range(n_dof):
            angle = globals()[f"{dof_codes[d]}_{s}"]
            ax0[s].plot(time_s[i_start:i_end_s[s]], angle[i_start:i_end_s[s]], label=f'{dof_names[d]}')
        # Right x limit: time at i_end, or at the last sample when the recording is
        # shorter than i_end (indexing time_s[i_end] directly would raise IndexError).
        i_last = min(i_end, n_samples - 1)
        ax0[s].set(xlabel='Time (s)', ylabel=f'{subject_labels[s]} (deg)', xlim=(time_s[i_start], time_s[i_last]+4))
        ax0[s].legend()
    ax0[s].grid()