In [None]:
import os
from pathlib import Path

import numpy as np
import pandas as pd
import plotly.graph_objects as go
import pyecap
import scipy.io as sio
from plotly.subplots import make_subplots
from scipy import signal
from scipy.signal import find_peaks

from fnirs_functions import *

def filter_hr(data, cutoff = 10, downsample = False, downsample_factor = 10, check_plot=False):
    """Low-pass filter an ECG recording and optionally downsample it.

    Parameters
    ----------
    data : object
        Recording that exposes ``array.compute()``, ``time().compute()`` and
        ``filter_gaussian(Wn=..., btype=...)``. Only the first row of
        ``array`` is used. (In this notebook it is a ``pyecap.Ephys`` object.)
    cutoff : float, default 10
        Passed as ``Wn`` to ``data.filter_gaussian`` with ``btype='lowpass'``.
        Presumably in Hz - TODO confirm against the pyecap documentation.
    downsample : bool, default False
        When True the filtered trace is decimated by ``downsample_factor``.
    downsample_factor : int, default 10
        Integer decimation factor; only used when ``downsample`` is True.
    check_plot : bool, default False
        When True, show the raw and filtered traces on a shared time axis
        (filtered trace on the secondary y axis) to check their alignment.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(2, n)``: row 0 is the filtered signal, row 1 the
        matching time stamps.
    """
    # Values are scaled by 1e6 (presumably V -> uV; verify the recording units).
    ecg_f = data.filter_gaussian(Wn=cutoff, btype='lowpass').array.compute()[0] * 1e6
    time = data.time().compute()

    if downsample:
        d = signal.decimate(ecg_f, downsample_factor, zero_phase = True)
        # Time stamps are sub-sampled, not decimated: decimate() runs an
        # anti-aliasing filter, which must not be applied to a time axis.
        # decimate() keeps every q-th sample, so the lengths still match.
        t = time[::downsample_factor]
    else:
        d = ecg_f
        t = time

    # Check alignment of the filtered (and downsampled) signal against the original
    if check_plot:
        # The raw trace is only needed for this plot, so it is computed lazily.
        raw_ecg = data.array.compute()[0] * 1e6
        if downsample:
            raw_ecg = signal.decimate(raw_ecg, downsample_factor, zero_phase = True)
        fig = make_subplots(specs = [[{'secondary_y' : True}]])
        fig.add_trace(go.Scatter(x = t, y = raw_ecg, name = 'raw'))
        fig.add_trace(go.Scatter(x = t, y = d, name = 'Filtered'), secondary_y = True)
        fig.show()

    return np.vstack((d, t))

def calc_hr(data, fs, peak_height, check_plot = False, smooth_window = 5):
    """Detect peaks in an ECG trace and derive the beat-to-beat heart rate.

    Parameters
    ----------
    data : array-like
        Two-row array: ``data[0]`` is the signal, ``data[1]`` the matching
        time stamps (the layout returned by ``filter_hr``).
    fs : float
        Sample rate of ``data`` in samples per second.
    peak_height : float
        Minimum height a sample must reach to count as a peak.
    check_plot : bool, default False
        When True, plot the trace, the detected peaks and the heart rate.
    smooth_window : int, default 5
        Number of beats in the rolling mean stored in ``'smooth bpm'``.

    Returns
    -------
    pandas.DataFrame
        One row per interval between consecutive peaks, with columns
        ``'peak_idx'`` (sample index midway between the two peaks),
        ``'peak_dt'`` (interval in seconds), ``'Time (s)'`` (time stamp at
        ``'peak_idx'``), ``'b2b bpm'`` (60 / interval) and ``'smooth bpm'``
        (rolling mean of ``'b2b bpm'``).
    """
    d = data[0]
    time = data[1]

    # Peaks must be at least 0.25 s apart, which caps the detectable rate at
    # 240 bpm. find_peaks rejects a distance below 1 sample, so clamp it.
    peaks = find_peaks(d, height = peak_height, distance = max(1, fs * 0.25))[0]
    # The first and last detected peaks are discarded.
    peaks = peaks[1:-1]

    # Get distance (time) between peaks and calculate heart rate
    peak_dt = np.diff(peaks) / fs  # Time between peaks in seconds

    # Each interval is time-stamped at the sample midway between its two peaks.
    idx = (peaks[:-1] + peaks[1:]) // 2
    peak_time = time[idx]
    bpm = 60 / peak_dt  # Instantaneous heart rate in BPM based on peak_dt

    ecgDF = pd.DataFrame({'peak_idx' : idx,
                          'peak_dt' : peak_dt,
                          'Time (s)' : peak_time,
                          'b2b bpm' : bpm})
    ecgDF['smooth bpm'] = ecgDF['b2b bpm'].rolling(smooth_window).mean()

    if check_plot:
        # ECG trace and peak markers share the secondary axis; HR uses the primary one.
        fig = make_subplots(specs = [[{"secondary_y" : True}]])
        fig.add_trace(go.Scatter(x = time[peaks], y = d[peaks], mode = 'markers'), secondary_y = True)
        fig.add_trace(go.Scatter(x = time,y = d, name = 'ECG Trace'), secondary_y = True)
        fig.add_trace(go.Scatter(x = ecgDF['Time (s)'], y = ecgDF['b2b bpm'], name = 'HR (bpm)'))
        fig.show()

    return ecgDF

In [None]:
# Index label of the metadata row (session) analysed in this notebook.
meta_index = 4
# Session metadata spreadsheet; the 'Tank' and 'fNIRs Folder' columns are read below.
metaDF = pd.read_excel(r'D:\Data\TDT_fNIRs\20260213_fNIRs_QC\metadata.xlsx')
# Tank path = recording directory + the 'Tank' entry of the selected row.
# NOTE(review): a raw string cannot end in a single backslash, so this literal ends
# in two backslashes and the joined path contains a doubled separator - confirm
# that pyecap accepts it.
tank = r'D:\Data\TDT_fNIRs\20260213_fNIRs_QC\CVP_SingleCh_Ramp-260213\\' + metaDF.at[meta_index, 'Tank']
# NOTE(review): no directory is prepended here, so the .mat file is presumably
# resolved relative to the working directory - confirm.
flex_folder = metaDF.loc[meta_index, 'fNIRs Folder'] + '.mat'
# flexNIRs is not imported explicitly; presumably it comes from the star import
# of fnirs_functions - TODO confirm.
fd = flexNIRs(flex_folder)

# Load the 'ECGG' store from the tank, then drop channels 'ECGG 2' to 'ECGG 4'.
ecg_data = pyecap.Ephys(tank, stores = 'ECGG')
ecg_data = ecg_data.remove_ch(channels=['ECGG 2','ECGG 3', 'ECGG 4'])

# Stimulation information for the same tank; stimDF is its parameters table.
stim = pyecap.Stim(tank)
stimDF = stim.parameters

In [None]:
"""Align fNIRs Artifact"""
# Only stimulation rows with a negative pulse amplitude are used for alignment.
# The column key uses a Greek mu; the previous 'Î¼' was a mis-decoded UTF-8 'μ'.
alignDF = stimDF.loc[stimDF['pulse amplitude (μA)'] < 0]
# stim_start_index = 60 * 266.6, presumably 60 s at a 266.6 Hz fNIRs sample
# rate - TODO confirm and move to a named constant.
fd.manual_alignment(stimDF = alignDF, stim_start_index = int(60*266.6))
# Visual check of the alignment on one channel with the stimulation overlay enabled.
fd.plot_artifact(channel = 'D1 Ambient', show_stim = True)

In [None]:
"""Performs short-channel regression to generate SSR-DF"""
# data_type selects which data set of `fd` the regression is run on; the same
# value is passed to the plot call below.
data_type = 'Mua_filt' # Data type of original data to run SSR regression on
fd.ssr_regression(data_type = data_type)
# Plot the result for one channel with the stimulation overlay enabled.
# show_ss presumably overlays the short-separation channel - TODO confirm.
fd.ssr_plot(data_type, channel = 'D1 LL IR', show_stim=True, show_ss=True)

In [None]:
"""Perform HR calculations"""
# Heart rate derived from the fNIRs recording itself (method of `fd`); a later
# cell reads the result from fd.ecgDF.
#fNIRs HR
fd.calc_hr(channel = 'SS Red', filter_cutoffs=(0.5,15),peak_height = 2000, check_plot = False)

# Heart rate from the TDT ECG at the recording's native sample rate.
#Filter TDT ECG without Down-sampling
tdt_ecg = filter_hr(ecg_data, cutoff = 10, downsample=False, check_plot=False)
ecgDF = calc_hr(tdt_ecg, peak_height = 250, fs = ecg_data.sample_rate)

# Same pipeline on a decimated copy; the sample rate handed to calc_hr is
# divided by the same factor so peak spacing is still converted to seconds correctly.
#Filter and down sample TDT ECG data for Plotting
DS_factor = 10
tdt_ecg_DS = filter_hr(ecg_data, cutoff = 10, downsample=True, downsample_factor = DS_factor, check_plot=False)
ecgDF_DS = calc_hr(tdt_ecg_DS, peak_height = 250, fs = ecg_data.sample_rate / DS_factor)

In [None]:
# Compare the beat-to-beat heart rate from the TDT ECG with the fNIRs-derived one.
fig = go.Figure()
fig.add_trace(go.Scatter(x = ecgDF['Time (s)'], y = ecgDF['b2b bpm'], name = 'TDT HR'))
fig.add_trace(go.Scatter(x = fd.ecgDF['Time (s)'], y = fd.ecgDF['b2b bpm'],name = 'fNIRs HR'))
# Optional overlay of the downsampled TDT result:
#fig.add_trace(go.Scatter(x = ecgDF_DS['Time (s)'], y = ecgDF_DS['b2b bpm'], name = 'downsampled'))

# Mark every stimulation window. Iterate over fd.stimDF itself rather than over
# stimDF.index: the onset/offset times are read from fd.stimDF, so looking up
# labels taken from a different frame raises KeyError whenever fd.stimDF holds
# only a subset of the stimulation rows.
for _, stim_row in fd.stimDF.iterrows():
    fig.add_vrect(x0 = stim_row['onset time (s)'],
                  x1 = stim_row['offset time (s)'])
fig.update_layout(xaxis_title = 'Time (s)', yaxis_title = 'Heart rate (bpm)')
fig.show()

In [None]:
# Interactive view of one regressed channel with the stimulation and heart-rate
# overlays enabled. hr_chan='smooth' presumably selects the smoothed heart-rate
# column - TODO confirm against fnirs_functions.
fd.plot_channel_interactive(data_type = 'SSR_filt', channel = 'D1 LL Red SSR', show_stim =True, show_hr=True, hr_chan='smooth')

In [None]:
# NOTE(review): this repeats the short-channel regression that an earlier cell
# already runs with the same data_type; confirm whether the second run is
# needed, since it is unclear from here whether ssr_regression is idempotent.
data_type = 'Mua_filt' # Data type of original data
fd.ssr_regression(data_type = data_type)
#fd.ssr_plot(data_type, channel = 'D1 LL Red', show_stim=True, show_ss=True)

In [None]:
"""Plot for data presentation
Plots of interest:
HbO/HbR or each channel with different stims
Priority: HbT of each channel of different stim parameters overlaid on top of each to assess differences in stim amplitude effects?
Plot showing effect of SSR regression data
Add HR as a secondary plot?
"""
# The string above is a planning note (a to-do list of figures), not executed logic.
# Plot the regressed 'D1 LL' data. pre_time / post_time are presumably seconds
# shown before and after each stimulation - TODO confirm against fnirs_functions.
fd.plot_channel(data_type = 'SSR_filt', channel = 'D1 LL', pre_time =5, post_time = 5, plot_style='Full', fig_size =(4,10), zero_shift = True)