In [None]:
import pathlib as path
import yaml
import matplotlib.pyplot as plt
import pandas as pd
import sys
import numpy as np
from scipy.signal import welch

In [None]:
sys.path.append('../common_eurobench')
from conversion import convert_utils as conv

In [None]:
subjects = [1, 2, 3, 4, 5]
sf = 2048
cond = 'without' #with or whithout (exo)
folder = 'SUBJECT_0X/SESSION_01/IRREGULAR_TERRAIN/EMG'

In [None]:
# Change the path of your data
path_data = '/Users/neuralrehabilitationgroup/PycharmProjects/DATA_ExoStim/DATA/gait_' + cond + '_exo'

In [None]:
path_raw = path.Path(path_data).joinpath('EUROBENCH',folder)
path_gait_filt = path.Path(path_data).joinpath('PROCESSED',folder,'FILTERED')
path_gait_clean = path.Path(path_data).joinpath('PROCESSED',folder,'CLEANED')
path_events = path.Path(path_data).joinpath('EUROBENCH', folder)
path_gait_env = path.Path(path_data).joinpath('PROCESSED',folder,'ENVELOPE')

# Plot EMG signals (filtered or cleaned)

In [None]:
# For filtered signals:
# path_signal = path_gait_filt
# For cleaned signals:
path_signal = path_gait_filt

In [None]:
for subject in subjects:
    print('SUBJECT 0' + str(subject))
    list_events_gait = list(path_events.glob('*subject_0' + str(subject) + '*.yaml'))
    with open(list_events_gait[0], 'r') as archivo:
        events_data = yaml.safe_load(archivo)
        
    list_emg_gait = list(path_signal.glob('*subject_0' + str(subject) + '*.csv'))
    emg_gait = pd.read_csv(list_emg_gait[0])
    
    columns_emg = conv.find_unique_cols_csv(list_emg_gait)
    
    dfs = conv.convert_dir_eurobench_to_defaultdict(list_emg_gait, columns=columns_emg, path_events=path_events, split_using_events=True, split_events_category=['gait_without_estimulation_pre','gait_with_estimulation','gait_without_estimulation_post'])
    
    for i, muscle_name in enumerate(emg_gait.columns):
        if muscle_name != 'time':
            fig, ax = plt.subplots()
        
            muscle = emg_gait.loc[:,muscle_name]
            #print(muscle)
            
            for c in range(0,len(dfs)):
                for s in range(0,len(dfs[c][muscle_name]['col'][0])):
                    ax.plot(dfs[c]['time']['col'][0][s], dfs[c][muscle_name]['col'][0][s], label = str(dfs[c][muscle_name]['split_by'][0]))
                    
                    
                    ax.set_xlabel('time')
                    ax.set_ylabel(muscle_name)
                    ax.set_title('SUBJECT 0' + str(subject))
                    ax.legend()
                             
                            

# Plot both filtered and cleaned

In [None]:
for subject in subjects:
    print('SUBJECT 0' + str(subject))
    list_events_gait = list(path_events.glob('*subject_0' + str(subject) + '*.yaml'))
    with open(list_events_gait[0], 'r') as archivo:
        events_data = yaml.safe_load(archivo)
        
    list_emg_filt = list(path_gait_filt.glob('*subject_0' + str(subject) + '*.csv'))
    
    columns_emg = conv.find_unique_cols_csv(list_emg_filt)
    
    dfs_filt = conv.convert_dir_eurobench_to_defaultdict(list_emg_filt, columns=columns_emg, path_events=path_events, split_using_events=False)
    
    list_emg_clean = list(path_gait_clean.glob('*subject_0' + str(subject) + '*.csv'))
    
    dfs_clean = conv.convert_dir_eurobench_to_defaultdict(list_emg_clean, columns=columns_emg, path_events=path_events, split_using_events=False)
    
    for i, muscle_name in enumerate(columns_emg):
        if muscle_name != 'time':
            fig, ax = plt.subplots() 
            # Plot the original (filtered) and the cleaned signal
            ax.plot(dfs_filt[0][muscle_name]['col'][0], label='Filtered')
            ax.plot(dfs_clean[0][muscle_name]['col'][0], label='Cleaned')
            ax.set_xlabel('Tiempo (s)')
            ax.set_ylabel('Amplitud')
            ax.legend()
            ax.set_title('SUBJECT 0' + str(subject) + '. ' + muscle_name)
            ax.grid(True)

### Compute optimal window size

In [None]:
import librosa
from librosa import feature

In [None]:
def compute_rms(spec_1, sf, b_plot, b_save, win_size, muscle):

    rms_1 =librosa.feature.rms(S = spec_1, frame_length = sf)

    if b_plot:
        fig, ax = plt.subplots(1, 2, sharex=True, sharey=False, figsize=(10, 4), constrained_layout=True)
        times_1 = librosa.times_like(rms_1)
        ax[0].semilogy(times_1, rms_1[0], label='RMS Energy')
        ax[0].set(xticks=[])
        ax[0].legend()
        ax[0].label_outer()
        # librosa.display.specshow(librosa.amplitude_to_db(np.abs(spec_1), ref=np.max), y_axis='log', x_axis='time', ax=ax[1,0])
        # ax[1,0].set(title='log Power spectrogram')

        ax[0].set(xlabel = 'Time (s)', ylabel = 'Frequency (Hz)')

        if b_save:
            plt.figure()
            # plt.show()
    
            fig.suptitle(subject + '. Vrms (ws = ' + str(win_size) + ')')
            plt.savefig('/Users/neuralrehabilitationgroup/PycharmProjects/ictus-synergies/Spectrogram_analysis/RESULTS/' + subject + '/Vrms_' + str(win_size) + '_' + subject + '_' + muscle)
            plt.close()

    return rms_1

In [None]:
for subject in subjects:
    print('SUBJECT 0' + str(subject))
    list_events_gait = list(path_events.glob('*subject_0' + str(subject) + '*.yaml'))
    with open(list_events_gait[0], 'r') as archivo:
        events_data = yaml.safe_load(archivo)
        
    list_emg_gait = list(path_signal.glob('*subject_0' + str(subject) + '*.csv'))
    emg_gait = pd.read_csv(list_emg_gait[0])
    
    columns_emg = conv.find_unique_cols_csv(list_emg_gait)
    
    dfs = conv.convert_dir_eurobench_to_defaultdict(list_emg_gait, columns=columns_emg, path_events=path_events, split_using_events=True, split_events_category=['gait_without_estimulation_pre','gait_with_estimulation','gait_without_estimulation_post'])
    
    for i, muscle_name in enumerate(emg_gait.columns):
        if muscle_name != 'time':
            for c in range(0,len(dfs)): 
                fig, ax = plt.subplots()
                for s in range(0,len(dfs[c][muscle_name]['col'][0])):
                    rms1 = compute_rms(dfs[c][muscle_name]['col'][0][s], sf, muscle_name, b_plot=False, b_save=False)
                    #S1, f1 = plt.psd(dfs[c][muscle_name]['col'][0][s], NFFT=256, Fs=sf, window=np.hanning(256), noverlap=128, label=str(dfs[c][muscle_name]['split_by'][0]))
                    # plt.close()
                    ax.plot(f1, S1, label=str(dfs[c][muscle_name]['split_by'][0]))
                    ax.set(title= 'SUBJECT 0' + str(subject) + '. ' + muscle_name, xlabel = 'frequency [Hz]', ylabel = 'PSD (dB)')
                    ax.legend()
                    ax.set_xlim(0,500)
                    

# Compute PSD for each part of the trial

In [None]:
from scipy.signal import find_peaks

In [None]:
def compute_psd_welch(data, sf, wind_size, muscle, b_plot=False, b_save=False):
    S1, f1 = welch(data, fs=sf, nperseg = wind_size, window=np.hanning(wind_size), noverlap=wind_size/2)
    return S1, f1

In [None]:
for subject in subjects:
    print('SUBJECT 0' + str(subject))
    list_events_gait = list(path_events.glob('*subject_0' + str(subject) + '*.yaml'))
    with open(list_events_gait[0], 'r') as archivo:
        events_data = yaml.safe_load(archivo)
        
    list_emg_gait = list(path_signal.glob('*subject_0' + str(subject) + '*.csv'))
    emg_gait = pd.read_csv(list_emg_gait[0])
    
    columns_emg = conv.find_unique_cols_csv(list_emg_gait)
    
    dfs = conv.convert_dir_eurobench_to_defaultdict(list_emg_gait, columns=columns_emg, path_events=path_events, split_using_events=True, split_events_category=['gait_without_estimulation_pre','gait_with_estimulation','gait_without_estimulation_post'])
    
    for i, muscle_name in enumerate(emg_gait.columns):
        if muscle_name != 'time':
            for c in range(0,len(dfs)): 
                fig, ax = plt.subplots()
                for s in range(0,len(dfs[c][muscle_name]['col'][0])):
                    # wind_size = [32, 64, 128, 256, 512, 1024, 2048]
                    
                    #for wind in wind_size:
                    f1, S1 = compute_psd_welch(dfs[c][muscle_name]['col'][0][s], sf, 256, muscle_name, b_plot=False, b_save=False)
                    #S1, f1 = plt.psd(dfs[c][muscle_name]['col'][0][s], NFFT=256, Fs=sf, window=np.hanning(256), noverlap=128, label=str(dfs[c][muscle_name]['split_by'][0]))
                    # plt.close()
                    picos, _ = find_peaks(S1)
                    freq_peaks = f1[picos]
                    ax.plot(f1, S1, label=str(dfs[c][muscle_name]['split_by'][0]))
                    ax.plot(f1[picos], S1[picos], "x", markersize=10, label="Picos")
                    ax.set(title= 'SUBJECT 0' + str(subject) + '. ' + muscle_name + '. Ws = ' + str(256), xlabel = 'frequency [Hz]', ylabel = 'PSD (dB)')
                    ax.legend()
                    ax.axvline(x=50, color='r', linestyle='--')
                    ax.set_xlim(0,500)
                    
                    
                    
