In [1]:
# Importing Python and external packages
import os
import sys
import json
import importlib
import pandas as pd
import numpy as np
import sklearn as sk
from scipy import stats, signal
from scipy import __version__ as scipy_version
import matplotlib.pyplot as plt

from itertools import product

In [2]:
# Report interpreter and package versions for documentation and
# reproducibility (mne and mne_bids are not imported here, so they are
# not part of the report)
version_report = (
    ('Python sys', sys.version),
    ('pandas', pd.__version__),
    ('numpy', np.__version__),
    ('sci-py', scipy_version),
    ('sci-kit learn', sk.__version__),
)
for package_label, package_version in version_report:
    print(package_label, package_version)

Python sys 3.9.13 (main, Oct 13 2022, 21:23:06) [MSC v.1916 64 bit (AMD64)]
pandas 1.4.4
numpy 1.23.3
sci-py 1.9.1
sci-kit learn 1.1.2


In [3]:
import retap_utils.utils_dataManagement as utils_dataMn


In [4]:
# DEFINE RETAP OUTCOME DIRECTORY
# Folder with the ReTap feature files that the cells below read; it has
# to exist already, so fail early with a clear message if it does not.
ft_path = os.path.join(utils_dataMn.get_local_proj_dir(),
                       'aDBS_tapping',
                       'retap_results', 'features')
assert os.path.exists(ft_path), 'defined ft_path does not exist'

# Output folder for figures, created on demand.
fig_path = os.path.join(utils_dataMn.get_local_proj_dir(),
                        'aDBS_tapping', 'figures')
# exist_ok=True makes the call idempotent and avoids the gap between a
# separate exists() check and the creation of the directory
os.makedirs(fig_path, exist_ok=True)

In [5]:
# DEFINE FEATURES OF INTEREST
# Keys that are looked up in every loaded feature file and that are
# shown, in this order, in the box plot further down
FEAT_SEL = [
    'trace_RMSn',
    'coefVar_impactRMS',
    'coefVar_intraTapInt',
    'mean_raise_velocity',
]

# Normalisation handed to extract_feature_lists ('norm', 'std' or 'none')
NORM_METHOD = 'norm'

Explore aDBS data content


In [6]:
def get_unique_subs(ft_path):
    """
    Collect the unique subject codes found in a feature directory.

    The subject code is the second underscore-separated field of a
    file name (``<prefix>_<sub>_...``).

    Arguments:
        - ft_path: directory containing the feature files

    Returns:
        - subs: sorted numpy array with the unique subject codes

    Raises:
        - FileNotFoundError (from os.listdir) if ft_path does not exist
    """
    sub_codes = []

    for fname in os.listdir(ft_path):
        # hidden entries (e.g. '.ipynb_checkpoints', '.DS_Store') contain
        # an underscore and would otherwise yield a bogus subject code
        if fname.startswith('.'):
            continue

        name_parts = fname.split('_')
        # names without a second field (e.g. 'desktop.ini') carry no
        # subject code; skip them instead of raising an IndexError
        if len(name_parts) < 2:
            continue

        sub_codes.append(name_parts[1])

    subs = np.unique(sub_codes)

    return subs

In [7]:
def _find_feature_file(files, sub, stim_task, sel):
    """Return the first name in `files` matching a stim-task and a part.

    `stim_task` is split on '-' into two tokens that both have to occur
    (case-insensitively) in the file name; `sel` has to occur
    case-sensitively.
    """
    s1, s2 = stim_task.split('-')
    matches = [f for f in files if sel in f
               and s1.lower() in f.lower()
               and s2.lower() in f.lower()]

    if not matches:
        raise FileNotFoundError(
            f'no Med-OFF feature file found for sub "{sub}", '
            f'stim-task "{stim_task}" and part "{sel}"'
        )

    # when several files match, the first one is used
    return matches[0]


def load_on_off_first10(subs, ft_path, sel_time='first',
                        stim_task_sel=['cDBS-StimOff', 'cDBS-StimOn',
                                       'aDBS-Preset4'],
                        FEAT_SEL=['trace_RMSn',
                                  'coefVar_impactRMS',
                                  'coefVar_intraTapInt',
                                  'mean_raise_velocity']):
    """
    Load the JSON feature files of Med-OFF recordings per subject and
    stimulation condition.

    Arguments:
        - subs: iterable with subject codes; a file belongs to a subject
            when its name contains '<sub>_'
        - ft_path: directory containing the JSON feature files
        - sel_time: 'first', 'last' or 'full' load the file whose name
            contains that word; 'diff' loads both the 'first' and the
            'last' file and stores last minus first
        - stim_task_sel: conditions written as '<token1>-<token2>'; both
            tokens have to occur in the file name (case-insensitive)
        - FEAT_SEL: feature keys for which the difference is computed,
            only used when sel_time is 'diff'

    Returns:
        - files_dict: {sub: {'<stim_task>_<sel_time>': features}}, where
            features is the full content of the JSON file, or for 'diff'
            a dict {feature: last - first} restricted to FEAT_SEL

    Raises:
        - AssertionError if sel_time is not an allowed value
        - FileNotFoundError if no file matches a subject / condition
    """
    allowed_part_sel = ['first', 'last', 'full', 'diff']

    assert sel_time in allowed_part_sel, (
        f'sel_time should be in {allowed_part_sel}'
    )

    # the directory content does not depend on the subject: list it once
    all_files = os.listdir(ft_path)

    # 'diff' needs the first AND the last part of every recording
    parts = ['first', 'last'] if sel_time == 'diff' else [sel_time]

    files_dict = {}
    for sub in subs:

        # only consider Med OFF recordings
        # NOTE(review): substring match, a sub code that is the tail of
        # another code would also pick up the other subject's files
        files = [f for f in all_files if
                 f'{sub}_' in f
                 and 'medoff' in f.lower()]

        loaded = {}
        for sel, stim_task in product(parts, stim_task_sel):
            json_file = _find_feature_file(files, sub, stim_task, sel)
            # load json with features
            with open(os.path.join(ft_path, json_file), 'r') as f:
                loaded[f'{stim_task}_{sel}'] = json.load(f)

        if sel_time != 'diff':
            # keys follow the order of stim_task_sel
            files_dict[sub] = loaded
            continue

        # absolute change from the first to the last part per feature
        files_dict[sub] = {}
        for stim_task in stim_task_sel:
            last_fts = loaded[f'{stim_task}_last']
            first_fts = loaded[f'{stim_task}_first']
            files_dict[sub][f'{stim_task}_diff'] = {
                ft: last_fts[ft] - first_fts[ft] for ft in FEAT_SEL
            }

    return files_dict

In [8]:
def norm_feat_lists(box_lists, feat_names,
                    conditions,
                    norm_method):
    """
    Normalise feature values against the first condition and print
    paired t-tests of the first condition versus every other condition.

    box_lists is expected to be ordered feature-major, i.e. for every
    feature one list per condition, in the order of `conditions`.

    Arguments:
        - box_lists: list of value lists, modified in place
        - feat_names: feature names, one per block of len(conditions)
            consecutive lists in box_lists
        - conditions: condition names, the first one is the reference
        - norm_method: 'norm' divides all conditions by the (nan-)max of
            the reference condition, 'std' subtracts the (nan-)mean and
            divides by the (nan-)std of the reference condition; any
            other value leaves the values untouched

    Returns:
        - box_lists: the same list object, holding numpy arrays for
            'norm' and 'std'

    Raises:
        - IndexError if box_lists holds fewer than
            len(feat_names) * len(conditions) lists
    """
    n_conditions = len(conditions)

    for i_ft, ft in enumerate(feat_names):

        # index of the reference condition of this feature
        i_start = i_ft * n_conditions
        ft_values = [box_lists[i_start + i_con]
                     for i_con in range(n_conditions)]

        if norm_method == 'norm':
            norm_value = np.nanmax(ft_values[0])
            ft_values = [np.array(v) / norm_value for v in ft_values]

        elif norm_method == 'std':
            norm_m = np.nanmean(ft_values[0])
            norm_sd = np.nanstd(ft_values[0])
            ft_values = [(np.array(v) - norm_m) / norm_sd
                         for v in ft_values]

        for i_con, values in enumerate(ft_values):
            box_lists[i_start + i_con] = values

        # paired test of the reference against each other condition;
        # ttest_rel returns the t statistic, so it is labelled as such
        for con_name, con_values in zip(conditions[1:], ft_values[1:]):
            S, p = stats.ttest_rel(ft_values[0], con_values)
            print(f'\n{ft}, {conditions[0]} vs {con_name}: '
                  f't: {round(S, 3)}, p = {round(p, 5)}')

    return box_lists

In [9]:
def extract_feature_lists(files_dict, feat_names, subs,
                          norm_method=False,):
    """
    Reshape the nested feature dict into flat lists for box plotting.

    Arguments:
        - files_dict: {sub: {condition: {feature: value}}}
        - feat_names: features to extract
        - subs: subjects to include, defines the order within each list
        - norm_method: 'norm', 'std' or 'none' passes the lists through
            norm_feat_lists (which also prints paired t-tests); any
            non-string value (default False) skips that step

    Returns:
        - box_lists: one list of per-subject values for every
            (feature, condition) pair, ordered feature-major
        - box_labels: '<feature> <condition>' label per list

    Raises:
        - ValueError if files_dict is empty
        - AssertionError if norm_method is an unknown string
    """
    if not files_dict:
        raise ValueError('files_dict is empty, no conditions found')

    # conditions are taken from the first subject, in insertion order
    first_sub_content = next(iter(files_dict.values()))
    conditions = list(first_sub_content.keys())
    print(f'conditions found: {conditions}')

    box_lists = []
    box_labels = []

    for ft, con in product(feat_names, conditions):
        box_labels.append(f'{ft} {con}')
        box_lists.append([files_dict[s][con][ft] for s in subs])

    if isinstance(norm_method, str):
        assert norm_method in ['norm', 'std', 'none'], 'incorrect norm_method'
        # pass the requested feat_names (not the global FEAT_SEL), the
        # normalisation indexes box_lists by feature position
        box_lists = norm_feat_lists(box_lists=box_lists,
                                    feat_names=feat_names,
                                    conditions=conditions,
                                    norm_method=norm_method)

    return box_lists, box_labels

In [10]:
# Subject codes are derived from the feature file names; with 'diff' the
# last-minus-first change of every feature is stored per condition
subs = get_unique_subs(ft_path)

files_dict = load_on_off_first10(subs, ft_path, sel_time='diff')



In [None]:
NORM_METHOD = 'norm'

# Reload the features, this time from the 'first' part of each recording
subs = get_unique_subs(ft_path)
files_dict = load_on_off_first10(subs, ft_path, sel_time='first')

# One list of per-subject values for every (feature, condition) pair
box_lists, box_labels = extract_feature_lists(
    files_dict, FEAT_SEL, subs, norm_method=NORM_METHOD,
)
print(len(box_lists), box_labels)

In [None]:
# Box plot of every (feature, condition) pair, optionally overlaid with
# the single-subject values and lines connecting each subject's values
# across the conditions of one feature.
SCATTER = True
SCATTER_LINES = True

SAVE_FIG = False
SHOW_FIG = True

# NOTE(review): the name says 'diffRun' while the cell above loads the
# features with sel_time='first' - confirm before saving
fig_name = f'diffRun_4feats_OffvsOnvsA4_{NORM_METHOD}'

fig, ax = plt.subplots(1, 1, figsize=(16, 12))

fontsize = 18

ax.boxplot(box_lists, positions=np.arange(len(box_lists)),)
ax.set_xticklabels(box_labels, rotation=90,
                   size=fontsize)

if NORM_METHOD == 'norm':
    ylabel = 'Normalised feature values\n(against max OFF value)'
elif NORM_METHOD == 'std':
    ylabel = 'Standardised feature values\n(against OFF values)'
else:
    # e.g. NORM_METHOD 'none': without this fallback ylabel would be
    # undefined when it is used below
    ylabel = 'Feature values'

if SCATTER:
    for i_x, values in enumerate(box_lists):
        ax.scatter([i_x] * len(values), values,
                   alpha=.4, s=100,)

if SCATTER_LINES:
    # box_lists is ordered feature-major: derive the number of
    # conditions per feature instead of assuming three
    n_conditions = len(box_lists) // max(len(FEAT_SEL), 1)

    for i_ft in range(len(FEAT_SEL)):
        x_start = i_ft * n_conditions
        # number of subjects, read from this feature's own first box
        n_subs = len(box_lists[x_start])

        for i_sub in range(n_subs):
            # one segment between every pair of neighbouring conditions
            for x in range(x_start, x_start + n_conditions - 1):
                ax.plot([x, x + 1],
                        [box_lists[x][i_sub], box_lists[x + 1][i_sub]],
                        c='gray', alpha=.5)

ax.set_ylabel(ylabel,
              size=fontsize)

plt.tick_params(axis='both', labelsize=fontsize, size=fontsize)

plt.tight_layout()

if SAVE_FIG:
    plt.savefig(os.path.join(fig_path, fig_name), dpi=300,
                facecolor='w',)

if SHOW_FIG:
    plt.show()
else:
    plt.close()

Open single result file

In [34]:
# get FEATURES
# Load the feature file and the raw accelerometer trace of one recording
# (file names containing 'task-cDbs', 'StimOn' and '536').
files = os.listdir(ft_path)

# first file matching all substrings (case-sensitive); the trailing [0]
# raises an IndexError when no file matches
f = [f for f in files if 'task-cDbs' in f
 and 'StimOn' in f and '536' in f and 'first' in f][0]

# load json file
with open(os.path.join(ft_path, f), 'r') as file:
    fts = json.load(file)

# get ACC-DATA
acc_path = os.path.join(utils_dataMn.get_local_proj_dir(),
                       'aDBS_tapping', 'acc_raw')
# note: `files` and `f` are rebound here, from now on they refer to the
# accelerometer directory and not to the feature directory
files = os.listdir(acc_path)

# same selection as above, but the 'full' file instead of the 'first' one
f = [f for f in files if 'task-cDbs' in f
 and 'StimOn' in f and '536' in f and 'full' in f][0]

# load csv file
# header=0: the first row of the csv is used as column names
acc = pd.read_csv(os.path.join(acc_path, f), header=0)

In [36]:
# Switch matplotlib to the Qt backend, figures then open in a separate
# window instead of inline (requires Qt bindings in the environment)
%matplotlib qt
# `acc` is the DataFrame read from the csv in the previous cell; it is
# plotted as-is, without selecting columns
plt.plot(acc)

plt.show()