# Compute co-activation

## NOTE: Data is not currently shared. This code is only provided for reference.

Ccomputing co-activation requires a Guassian resampling to provide fewer datapoints per seocnd (a method for downsampling), so the analysis can be perfomred more handily.
Futher, we need to define the *target-pairs* and then quantify coactivations for those muscles.

In [1]:
# Import libraries and setup values
import os
import pickle as pkl

import numpy as np
import pandas as pd
import ray
from scipy.io import loadmat

import EssentialEMGFuncs as ess

ray.init(dashboard_host='127.0.0.1', ignore_reinit_error=True)
save_data = False
load_savedData = True
resample_data, use_resampledData = False, True

2022-11-11 19:30:34,386	INFO worker.py:1509 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


In [None]:
# Setup specific variables
SUBJS = {'young': ["PS04", "PS05", "PS06", "PS07", "PS15", "PS16", "PS17", "PS18", "PS19", "PS20", "PS21", "PS22",
                   "PS23", "PS24", "PS25", "PS26", "PS27"],
         'old': ["PS08", "PS09", "PS10", "PS28", "PS29", "PS31", "PS32", "PS33", "PS34", "PS35", "PS36"]
         }
CONDS = ['LME', 'LEI', 'RME', 'REI']
BLOCKS = {'pre': [0], 'pert': [1, 3], 'catch': [2], 'post': [4]}
DATA_PATH = './data/emg/'
SUBJ_PATH = {s: DATA_PATH + s + '/' for a in SUBJS for s in SUBJS[a]}
RESMAPLED_DATA = 'gaussian-resmped_young-old_raw-data.pickle'
COACTIVATION_DATA = 'coactivation_young-and-old_resampled_perturb-recovery-steps.pickle'
STEP_DATA = 'stepping_time_young-and-old.pickle'

if not os.path.isdir(DATA_PATH + 'group_response'):
    os.mkdir(DATA_PATH + 'group_response')
for a in SUBJS:
    for s in SUBJS[a]:
        if not os.path.isdir(SUBJ_PATH[s] + 'figs'):
            os.mkdir(SUBJ_PATH[s] + 'figs')

In [None]:
# Load individual EMG data
if not load_savedData:
    par_read_feather = ray.remote(ess.read_normal_profile)
    normal_envelope, read_idx = [{c: {s: [] for a in SUBJS for s in SUBJS[a]} for c in CONDS} for _ in range(2)]
    for a in SUBJS:
        for s in SUBJS[a]:
            for c in CONDS:
                read_idx[c][s] = par_read_feather.remote(f'{SUBJ_PATH[s]}{s}_{c}_normalEnvelope.feather')
    for a in SUBJS:
        for s in SUBJS[a]:
            for c in CONDS:
                try:
                    normal_envelope[c][s] = ray.get(read_idx[c][s]).copy()
                    del read_idx[c][s]
                except Exception:
                    print(f'cannot load {s}_{c}, disregard if you are aware of this unavalibity')
                    normal_envelope[c][s] = pd.DataFrame()


## Gaussian resampling
It turns out that 2000 time points is a lot for this co-contraction analysis.
So, let's have it with 200 time points. We will use Gaussian smoothing from SMART for this.

In [None]:
if resample_data and not load_savedData:
    kernel_size, num_dp = 0.01, 200
    resamp_envelope = {c: {s: pd.DataFrame(data=np.zeros((num_dp, len(normal_envelope[c][s].columns))),
                           columns=normal_envelope[c][s].columns) for a in SUBJS for s in SUBJS[a]} for c in CONDS}
    for a in SUBJS:
        for s in SUBJS[a]:
            for c in CONDS:
                if not normal_envelope[c][s].empty:
                    for m in resamp_envelope[c][s].columns.get_level_values(0).unique().to_list():
                        resamp_envelope[c][s][m] = ess.gaussian_smoothing_df(
                            normal_envelope[c][s][m], normal_envelope[c][s]['time'], kernel_size, num_dp)
                    print(f'completed resampling for {s}_{c}')

    if save_data:
        pkl.dump(resamp_envelope, open(
            DATA_PATH + 'group_response/' + RESMAPLED_DATA, 'wb+'))
elif load_savedData:
    resamp_envelope = pkl.load(open(DATA_PATH + 'group_response/' + RESMAPLED_DATA, 'rb'))

if use_resampledData:
    normal_envelope = resamp_envelope


In [None]:
# Load step times
step_time = {c: {s: [] for a in SUBJS for s in SUBJS[a]} for c in CONDS}
for a in SUBJS:
    for s in SUBJS[a]:
        for c in CONDS:
            data = loadmat(SUBJ_PATH[s] + 'times/' + s + '_' + c + '_time.mat')
            step_time[c][s] = pd.DataFrame(
                data=data['strideTime'], columns=['step_index', 'type', 'start', 'pert', 'other', 'end'])
            step_time[c][s] = ess.fill_perturb_time(step_time[c][s])

if save_data:
    pkl.dump(step_time, open(
        DATA_PATH + 'group_response/' + STEP_DATA, 'wb+'))

# Separate the steps into pre, post, perturbations and catches
idx = pd.IndexSlice
seperated_envelope = {c: {s: {b: [] for b in BLOCKS} for a in SUBJS for s in SUBJS[a]} for c in CONDS}
for a in SUBJS:
    for s in SUBJS[a]:
        for c in CONDS:
            last_pert = step_time[c][s][[c in BLOCKS['pert'] for c in step_time[c][s]['type']]].index[-1]
            step_time[c][s].loc[last_pert + 1:, 'type'] = 4
            if not bool(normal_envelope[c][s].empty):
                col0_length = len(normal_envelope[c][s].columns.get_level_values(0).unique())
                for b in BLOCKS:
                    seperated_envelope[c][s][b] = normal_envelope[c][s].loc[idx[:], idx[:, np.tile(
                        [t in BLOCKS[b] for t in step_time[c][s]['type']], (1, col0_length)).squeeze()]]


In [None]:
# Rename the muscles we need
MUSCLES = {'LTA': ['l tibia'], 'LSO': ['l sol'], 'LRF': ['l Vast', 'l rect'], 'LST': ['l semi'],
           'LAD': ['l deltoid ant', 'l ant deltoid'], 'LPD': ['l deltoid post', 'l post deltoid'], 'RTA': ['r tibia'],
           'RSO': ['r sol'], 'RRF': ['r Vast', 'r rect'], 'RST': ['r semi'], 'RAD': ['r deltoid ant', 'r ant deltoid'],
           'RPD': ['r deltoid post', 'r post deltoid']}  # the full name for the muscles paired with their keys

for a in SUBJS:
    for s in SUBJS[a]:
        for c in CONDS:
            for b in BLOCKS:
                try:
                    seperated_envelope[c][s][b].sort_index(axis=1, level=0, inplace=True)
                    cols = seperated_envelope[c][s][b].columns.get_level_values(0).unique().to_list()
                    for i, m in enumerate(cols):
                        if any(t.upper() in m for mk in MUSCLES for t in MUSCLES[mk]):
                            cols[i] = [mk for mk in MUSCLES for t in MUSCLES[mk] if t.upper() in m][0]
                    seperated_envelope[c][s][b].columns.set_levels(cols, level=0, inplace=True)
                    seperated_envelope[c][s][b].sort_index(axis=1, level=0, inplace=True)
                except Exception:
                    print(f'there was a problem in {a}:{s}_{c}_{b}')


## Compute co-activation
Simialr to the Guassian resmapling, the coactivation analysis needs using the `Ray` toolbox, so we can benefit form all the computational capacity fo the processor.
Co-activation requres *target-pairs* to compute if they are working agasints each other. This is the general case of co-contraction, which is the co-activatoin of the agonist and antagonist muscles acting on the same joint. As our study is related to the co-aontraction analysis, we defeined the *target-pairs* as the muscle pairs that act on the same joint. For each step, the first muscle is agonist and the second muscle in the pair is antagonist (See table 1 in the paper).

In [None]:
target_pair = {
    'left_step': {
        'LTA-LSO': ['LSO', 'LTA'],
        'LRF-LST': ['LRF', 'LST'],
        'LAD-LPD': ['LPD', 'LAD'],
        'RTA-RSO': ['RTA', 'RSO'],
        'RRF-RST': ['RST', 'RRF'],
        'RAD-RPD': ['RAD', 'RPD']
    },
    'right_step': {
        'LTA-LSO': ['LTA', 'LSO'],
        'LRF-LST': ['LST', 'LRF'],
        'LAD-LPD': ['LAD', 'LPD'],
        'RTA-RSO': ['RSO', 'RTA'],
        'RRF-RST': ['RRF', 'RST'],
        'RAD-RPD': ['RPD', 'RAD']
    }
}
tp_names = list(target_pair['left_step'])
TESTS = {'perturbed': ['start', 'other'], 'recovery': ['other', 'end']}
par_compute_coActivation = ray.remote(ess.compute_coActivation)
coact_idx = {t: {c: {s: {b: {p: [] for p in tp_names} for b in BLOCKS} for a in SUBJS for s in SUBJS[a]}
                 for c in CONDS} for t in TESTS}

for t in TESTS:
    for a in SUBJS:
        for s in SUBJS[a]:
            for c in CONDS:
                for b in BLOCKS:
                    try:
                        if c in ['LEI', 'LME']:
                            if t == 'perturbed':
                                t_p = target_pair['left_step']
                            else:
                                t_p = target_pair['right_step']
                        else:
                            if t == 'perturbed':
                                t_p = target_pair['right_step']
                            else:
                                t_p = target_pair['left_step']
                        strides = seperated_envelope[c][s][b].columns.get_level_values(1).unique().to_list()
                        s_t = step_time[c][s].iloc[strides, :]
                        for p in t_p:
                            coact_idx[t][c][s][b][p] = par_compute_coActivation.remote(
                                seperated_envelope[c][s][b][t_p[p]], seperated_envelope[c][s][b]['time'], s_t,
                                fillnopert=False, event=TESTS[t][0], duration=TESTS[t][1], mode='fixed')
                    except Exception:
                        print(f'there was a problem in {a}:{s}_{c}_{b}')
# Sanity check
# for t in TESTS:
#     for a in SUBJS:
#         for s in SUBJS[a]:
#             for c in CONDS:
#                 for b in BLOCKS:
#                     if c in ['LEI', 'LME']:
#                         if t != 'perturbed':
#                             t_p = target_pair['left_step']
#                         else:
#                             t_p = target_pair['right_step']
#                     else:
#                         if t != 'perturbed':
#                             t_p = target_pair['right_step']
#                         else:
#                             t_p = target_pair['left_step']
#                     strides = seperated_envelope[c][s][b].columns.get_level_values(1).unique().to_list()
#                     s_t = step_time[c][s].iloc[strides, :]
#                     for p in t_p:
#                         coactivation[t][c][s][b][p] = ess.compute_coActivation(
#                             seperated_envelope[c][s][b][t_p[p]], seperated_envelope[c][s][b]['time'], s_t,
#                             fillnopert=False, event=TESTS[t][0], duration=TESTS[t][1], mode='wasted_contraction')

coactivation = {t: {c: {s: {b: pd.DataFrame(columns=tp_names) for b in BLOCKS} for a in SUBJS for s in SUBJS[a]}
                    for c in CONDS} for t in TESTS}
for t in TESTS:
    for a in SUBJS:
        for s in SUBJS[a]:
            for c in CONDS:
                for b in BLOCKS:
                    try:
                        for p in tp_names:
                            coactivation[t][c][s][b][p] = ray.get(coact_idx[t][c][s][b][p]).copy()
                    except Exception:
                        print(f'there was a problem in {a}:{s}_{c}_{b}')
if save_data:
    pkl.dump(coactivation, open(DATA_PATH + 'group_response/' + COACTIVATION_DATA, 'wb+'))
