In [29]:
import numpy as np
import scipy as sp
import pandas as pd
import mne
from mne.io import read_raw_ant
import os
from matplotlib.lines import lineStyles, Line2D
from matplotlib.colors import ListedColormap
from matplotlib.patches import Patch
from scipy.signal import butter, sosfiltfilt, savgol_filter
from scipy.stats import zscore
from scipy.ndimage import label
from combined_analysis_bachelor.code.movement_detection_functions import take_out_short_off_onset, new_on_offsets, fill_activity_mask, create_behavioral_array
import matplotlib
matplotlib.use('Qt5Agg')
import matplotlib.pyplot as plt

channel_custom_order = ["BIP3", "BIP4", "BIP5", "BIP9", "BIP10", "BIP12", "BIP6", "BIP1", "BIP2", "BIP11", "BIP8", "BIP7"]
EMG = ["BIP7", "BIP8", "BIP9", "BIP10", "BIP11", "BIP12"]
ACC = ["BIP1", "BIP2", "BIP3", "BIP4", "BIP5", "BIP6"]
locations = {"BIP7":"right M. tibialis anterior",
            "BIP8": "right M. deltoideus",
            "BIP11": "right M. brachioradialis",
            "BIP1" : "ACC right hand : y",
            "BIP2" : "ACC right hand : z",
            "BIP6" : "ACC right hand : x",
            "BIP3" : "ACC left hand : x",
            "BIP4" : "ACC left hand : y",
            "BIP5" : "ACC left hand : z",
            "BIP9" : "left M. brachioradialis",
            "BIP10" : "left M. deltoideus",
            "BIP12" : "left M. tibialis anterior"}
sf=1000

### EMG Values - left forearm ###

In [3]:
A_move2_filtered = pd.read_hdf("C:/Users/User/Documents/bachelorarbeit/data/EMG_ACC/sub-91/"
                              "PTB_01_data_processed/A_1.5_move_processed.h5", key="data")
A_move2_filtered_df = pd.DataFrame(A_move2_filtered)


### location: left forearm ###
left_forearm_move = A_move2_filtered_df["BIP9"]
left_forearm_move *= 1e6

# rectifying and building envelope #
rectified_move = left_forearm_move.abs()
low_pass = 4/(1000/2)
sos = butter(4, low_pass, btype='lowpass', output="sos")
enveloped_forearm_move2 = sosfiltfilt(sos, x=rectified_move)

# taking baseline section
A_move_2_baseline = enveloped_forearm_move2[9*sf:15*sf] # 10 secs for baseline values
p99 = np.percentile(A_move_2_baseline, 99)
mu_move2_baseline, sigma_move2_baseline = A_move_2_baseline.mean(), A_move_2_baseline.std()

# set thresh and get activity
p99_thresh = p99 * 4
std_thresh_move2_forearm = mu_move2_baseline + 12 * sigma_move2_baseline
envelope_activity2 = enveloped_forearm_move2 > std_thresh_move2_forearm

# get on and offsets
min_samples = int(1 * sf)         # 1000-ms-Grenze

labels, n_lbl = label(envelope_activity2)
valid = np.zeros_like(envelope_activity2, dtype=bool)

for lbl in range(1, n_lbl + 1):
    idx = np.where(labels == lbl)[0]
    if idx.size >= min_samples:
        valid[idx] = True

# On- & Offsets
Onsets_move2_forearm  = np.where(np.diff(valid.astype(int)) ==  1)[0] + 1
Offsets_move2_forearm = np.where(np.diff(valid.astype(int)) == -1)[0] + 1

new_onsets_move2_forearm, new_offsets_move2_forearm = take_out_short_off_onset(Onsets_move2_forearm, Offsets_move2_forearm, 1, sf)

### EMG Values - left delt ###

In [4]:
left_delt_move2 = A_move2_filtered_df["BIP10"]
left_delt_move2 *= 1e6

# rectifying and building envelope #
rectified_delt_move2 = left_delt_move2.abs()
enveloped_delt_move2 = sosfiltfilt(sos, x=rectified_delt_move2)

# taking baseline section
A_move2_delt_baseline = enveloped_delt_move2[9*sf:15*sf] # 6 secs for baseline values
p99 = np.percentile(A_move2_delt_baseline, 99)
mu_move2_delt_baseline, sigma_move2_delt_baseline = A_move2_delt_baseline.mean(), A_move2_delt_baseline.std()

# set thresh and get activity
p99_thresh = p99 * 4
std_thresh_move2_delt = mu_move2_delt_baseline + 18 * sigma_move2_delt_baseline
envelope_activity2_delt = enveloped_delt_move2 > std_thresh_move2_delt

# get on and offsets
min_samples = int(1.5 * sf)         # 1500-ms-Grenze

labels, n_lbl = label(envelope_activity2_delt)
valid = np.zeros_like(envelope_activity2_delt, dtype=bool)

for lbl in range(1, n_lbl + 1):
    idx = np.where(labels == lbl)[0]
    if idx.size >= min_samples:
        valid[idx] = True

# On- & Offsets
Onsets_move2_delt  = np.where(np.diff(valid.astype(int)) ==  1)[0] + 1
Offsets_move2_delt = np.where(np.diff(valid.astype(int)) == -1)[0] + 1

new_onsets_move2_delt, new_offsets_move2_delt = take_out_short_off_onset(Onsets_move2_delt, Offsets_move2_delt, 0.2, sf)

### ACC (z-axis) of left side ###

In [5]:
ACC_left_move2 = A_move2_filtered_df["BIP5"] # * -1

# smooth signal
ACC_left_move2_smooth = savgol_filter(ACC_left_move2, window_length=21, polyorder=3)

# RMS
window_size = 60
ACC_left_move2_rms = np.sqrt(np.convolve(ACC_left_move2_smooth**2, np.ones(window_size)/window_size, mode='same'))

# get baseline parts
A_move2_baseline = ACC_left_move2_rms[:15*sf] # 10 secs for baseline values
mu_move2_baseline, sigma_move2_baseline = A_move2_baseline.mean(), A_move2_baseline.std()
p99 = np.percentile(ACC_left_move2_rms, 99)

# define threshold and activity
ACC_thresh = mu_move2_baseline + 4 * sigma_move2_baseline
ACC_activity = ACC_left_move2_rms > ACC_thresh

# get on and offsets
min_samples = int(1.6 * sf) # 1600-ms-Grenze --> min länge der aktivitäten

labels, n_lbl = label(ACC_activity)
valid = np.zeros_like(ACC_activity, dtype=bool)

for lbl in range(1, n_lbl + 1):
    idx = np.where(labels == lbl)[0]
    if idx.size >= min_samples:
        valid[idx] = True

# On- & Offsets
Acc_move2_Onsets  = np.where(np.diff(valid.astype(int)) ==  1)[0] + 1
Acc_move2_Offsets = np.where(np.diff(valid.astype(int)) == -1)[0] + 1

#sieht gut aus, jetzt noch einmal die funktion anwenden!!
new_Acc_move2_onsets, new_Acc_move2_offsets = take_out_short_off_onset(Acc_move2_Onsets,            Acc_move2_Offsets, 0.17, sf) # -> max länge der zwischen spikes!

### plot erstellen ###

In [6]:
fig, axs = plt.subplots(3, sharex=False)
axs[0].plot(A_move2_filtered_df["Sync_Time (s)"], enveloped_delt_move2 , "b", label="Enveloped EMG Signal")
axs[0].axhline(std_thresh_move2_delt, color="r", linestyle="--", label="Threshold (baseline.mean + k *std")
for x in new_onsets_move2_delt:
    axs[0].axvline(A_move2_filtered_df["Sync_Time (s)"][x], ls="--", c="green")
for x in new_offsets_move2_delt:
    axs[0].axvline(A_move2_filtered_df["Sync_Time (s)"][x], ls="--", c="black")
axs[0].set_ylabel("Amplitude (µV)")
axs[0].set_title("EMG signal left delt")


axs[1].plot(A_move2_filtered_df["Sync_Time (s)"], enveloped_forearm_move2, "b", label="Enveloped EMG Signal")
axs[1].axhline(std_thresh_move2_forearm, color="r", linestyle="--", label="Threshold (baseline.mean + 12*std")
for x in new_onsets_move2_forearm:
    axs[1].axvline(A_move2_filtered_df["Sync_Time (s)"][x], ls="--", c="green")
for x in new_offsets_move2_forearm:
    axs[1].axvline(A_move2_filtered_df["Sync_Time (s)"][x], ls="--", c="black")
axs[1].set_ylabel("Amplitude (µV)")
axs[1].set_title("EMG signal left forearm")
    
    
axs[2].plot(A_move2_filtered_df["Sync_Time (s)"], ACC_left_move2_rms, "b", label="Enveloped EMG Signal")
axs[2].axhline(ACC_thresh, color="r", linestyle="--", label="Threshold (baseline.mean + k *std")
for x in new_Acc_move2_onsets:
    axs[2].axvline(A_move2_filtered_df["Sync_Time (s)"][x], ls="--", c="green")
for x in new_Acc_move2_offsets:
    axs[2].axvline(A_move2_filtered_df["Sync_Time (s)"][x], ls="--", c="black")
axs[2].set_xlabel("Time (s)")
axs[2].set_ylabel("Acceleration")
axs[2].set_title("ACC signal - z-axis - on left hand")
    
for i,ax in enumerate(axs):
    ax.set_xlim(0,95)
    ax.spines[['right', 'top', "bottom"]].set_visible(False)
plt.legend()
plt.tight_layout()
plt.show()

### create behavioral data ### 

In [13]:
# create binary array of emg delt activity # 
# - liste erstellen mit paaren von onsets und offsets gepaart
# - einen array erstellen der länge der time samples 
# - dann jede subliste nehmen und benutzen um dazwischen True in array einzufügen
 # - der rest = False

# create binary array of ACC left activity #
 ## so wie oben beschrieben!
 
# new array where:
  # 1 = both True = movement
  # 2 = both False = rest
  # 3 = only EMG True = supression
  # 4 = only ACC True = EMG electrode off??

##### create binary arrays of emg/acc activity #####

In [7]:
# get final on- and offsets after movement detection 
delt_on_offsets = new_on_offsets(new_onsets_move2_delt, new_offsets_move2_delt)
acc_left_on_offsets = new_on_offsets(new_Acc_move2_onsets, new_Acc_move2_offsets)

In [34]:
mask = np.zeros(len(A_move2_filtered_df["Sync_Time (s)"]))

In [17]:
## create binary arrays for EMG & ACC ##
delt_binary_arr = fill_activity_mask(delt_on_offsets, sf, A_move2_filtered_df["Sync_Time (s)"])
acc_binary_arr = fill_activity_mask(acc_left_on_offsets, sf, A_move2_filtered_df["Sync_Time (s)"])

#### create behavioral array ####

In [24]:
move2_left_behavioral = create_behavioral_array(delt_binary_arr, acc_binary_arr)

### plotting final plot ###

In [101]:
# define colors 
state_colors = {
    1: 'darkseagreen',   # Both active
    2: 'white',          # Nothing active
    3: 'palevioletred',  # Only EMG
    4: 'slateblue'       # Only ACC
}

# Create figure with minimal spacing
fig, axs = plt.subplots(3, 1, sharex=True, figsize=(12, 7), 
                       gridspec_kw={'height_ratios': [1, 1, 0.4], 'hspace': 0.3})

# --- Plot 1: EMG Signal with subtle background ---
# Add very light behavioral background 
time_vals = A_move2_filtered_df["Sync_Time (s)"].values
time_boundaries = np.linspace(time_vals[0], time_vals[-1], len(move2_left_behavioral)+1)
current_state = move2_left_behavioral[0]
start_idx = 0
for i in range(1, len(move2_left_behavioral)):
    if move2_left_behavioral[i] != current_state:
        axs[0].axvspan(time_boundaries[start_idx], time_boundaries[i],
                      facecolor=state_colors[current_state], alpha=0.2)
        current_state = move2_left_behavioral[i]
        start_idx = i
axs[0].axvspan(time_boundaries[start_idx], time_boundaries[-1],
              facecolor=state_colors[current_state], alpha=0.2)

# Plot EMG signal
emg_line = axs[0].plot(A_move2_filtered_df["Sync_Time (s)"], enveloped_delt_move2, "b", label="Enveloped EMG Signal")
thresh_line = axs[0].axhline(std_thresh_move2_delt, color="r", linestyle="--", label="EMG Threshold")
onset_lines = [axs[0].axvline(A_move2_filtered_df["Sync_Time (s)"][x], ls="--", c="green") for x in new_onsets_move2_delt]
offset_lines = [axs[0].axvline(A_move2_filtered_df["Sync_Time (s)"][x], ls="--", c="black") for x in new_offsets_move2_delt]

axs[0].set_ylabel("Amplitude (µV)")
axs[0].set_title("EMG Signal - Left Delt", pad=5)
axs[0].spines[['right', 'top']].set_visible(False)

# --- Plot 2: ACC Signal with subtle background ---
# Add very light behavioral background 
current_state = move2_left_behavioral[0]
start_idx = 0
for i in range(1, len(move2_left_behavioral)):
    if move2_left_behavioral[i] != current_state:
        axs[1].axvspan(time_boundaries[start_idx], time_boundaries[i],
                      facecolor=state_colors[current_state], alpha=0.2)
        current_state = move2_left_behavioral[i]
        start_idx = i
axs[1].axvspan(time_boundaries[start_idx], time_boundaries[-1],
              facecolor=state_colors[current_state], alpha=0.2)

# Plot ACC signal
acc_line = axs[1].plot(A_move2_filtered_df["Sync_Time (s)"], ACC_left_move2_rms, "b", label="ACC Signal - RMS")
acc_thresh = axs[1].axhline(ACC_thresh, color="r", linestyle="--", label="ACC Threshold")
acc_onset = [axs[1].axvline(A_move2_filtered_df["Sync_Time (s)"][x], ls="--", c="green") for x in new_Acc_move2_onsets]
acc_offset = [axs[1].axvline(A_move2_filtered_df["Sync_Time (s)"][x], ls="--", c="black") for x in new_Acc_move2_offsets]

axs[1].set_ylabel("Acceleration (g)")
axs[1].set_title("ACC Signal - Z-axis", pad=5)
axs[1].spines[['right', 'top']].set_visible(False)

# --- Plot 3: Behavioral States ---
current_state = move2_left_behavioral[0]
start_idx = 0
for i in range(1, len(move2_left_behavioral)):
    if move2_left_behavioral[i] != current_state:
        axs[2].axvspan(time_boundaries[start_idx], time_boundaries[i],
                      facecolor=state_colors[current_state], alpha=0.7)
        current_state = move2_left_behavioral[i]
        start_idx = i
axs[2].axvspan(time_boundaries[start_idx], time_boundaries[-1],
              facecolor=state_colors[current_state], alpha=0.7)

axs[2].set_yticks([])
axs[2].set_xlabel("Time (s)")
axs[2].spines[['right', 'top', 'left']].set_visible(False)

# --- Legends ---
# EMG Legend
emg_elements = [
    Line2D([0], [0], color='b', label='Enveloped EMG Signal'),
    Line2D([0], [0], color='r', linestyle='--', label='Threshold: baseline.mean + 18*std'),
    Line2D([0], [0], color='green', linestyle='--', label='Onset'),
    Line2D([0], [0], color='black', linestyle='--', label='Offset')
]
axs[0].legend(handles=emg_elements, loc='center left', bbox_to_anchor=(0.97, 0.5), frameon=False)

# ACC Legend
acc_elements = [
    Line2D([0], [0], color='b', label='ACC Signal: RMS'),
    Line2D([0], [0], color='r', linestyle='--', label='Threshold: baseline.mean + 4*std'),
    Line2D([0], [0], color='green', linestyle='--', label='Onset'),
    Line2D([0], [0], color='black', linestyle='--', label='Offset')
]
axs[1].legend(handles=acc_elements, loc='center left', bbox_to_anchor=(0.97, 0.5), frameon=False)

# "behavioral" legend
behavior_patches = [
    Patch(facecolor=state_colors[1], label='Both Active'),
    Patch(facecolor=state_colors[2], label='No Movement'),
    Patch(facecolor=state_colors[3], label='EMG Only'),
    Patch(facecolor=state_colors[4], label='ACC Only')
]
axs[2].legend(handles=behavior_patches, loc='center left', bbox_to_anchor=(0.97, 0.5), frameon=False)

# formatting
for ax in axs:
    ax.set_xlim(0, 95)
    ax.set_xticks(np.arange(0, 100, 10))
    ax.set_xticklabels([f"{int(t)}" for t in np.arange(0, 100, 10)])
    
    # x-axis labels to show on all subplots
    ax.xaxis.set_visible(True)
    ax.xaxis.set_tick_params(labelbottom=True) 

plt.tight_layout()
plt.subplots_adjust(right=0.85, hspace=0.2)
#plt.show()
plt.savefig("../images/movement_Detec_move2_left.png", bbox_inches="tight", dpi=300)

  plt.tight_layout()
