In [25]:
import sys
import pandas as pd
import numpy as np

sys.path.append("../")
import biobss
import matplotlib.pyplot as plt
import neurokit2 as nk
from plotly_resampler import register_plotly_resampler

In [26]:
import importlib
import sys
import pandas as pd
import numpy as np

sys.path.append("../")
import biobss
import os
os.getcwd()
import neurokit2 as nk
sample_data = biobss.utils.load_sample_data("ECG")

In [27]:
sr=256

In [28]:
sample=np.array(sample_data[0]).flatten()

In [29]:
clean=biobss.pipeline.Bio_Process(
    process_method=biobss.ecgtools.filter_ecg,process_name="clean",argmap={"sampling_rate":"sampling_rate"},method="pantompkins")

find_peaks = biobss.pipeline.Bio_Process(process_method=biobss.ecgtools.ecg_peaks,
                                         process_name="rpeaks",returns_event=True,event_args={"event_name":"ECG_Peaks"})


pipeline = biobss.pipeline.Bio_Pipeline()

pipeline.process_queue.add_process(clean,input_signals="ECG_Raw",output_signals="ECG_Clean")
pipeline.process_queue.add_process(find_peaks,input_signals="ECG_Clean",output_signals="ECG_Peaks")

channel=biobss.pipeline.Bio_Channel(signal=sample,name="ECG_Raw",sampling_rate=sr)
pipeline.set_input(channel)


In [30]:
pipeline

Bio_Pipeline:
	Preprocessors: Process list:

	Processors: Process list:
	1: clean(ECG_Raw) -> ECG_Clean
	2: rpeaks(ECG_Clean) -> ECG_Peaks

	Postprocessors: Process list:

	Window Size(Seconds): Not Windowed
	Step Size: Not Windowed

In [31]:
# Create feature extraction steps

# Extract time domain features. input_signals dictionary contains feature prefixes as keys and input signals as values
# For example rms of EDA_Tonic is extracted as Tonic_rms in this case
Rpeak_features = biobss.pipeline.Feature(name="time_features", function=biobss.ecgtools.from_Rpeaks, sampling_rate=sr)
stat_features = biobss.pipeline.Feature(name="stat_features", function=biobss.edatools.get_stat_features)

In [32]:
pipeline.run_pipeline()

In [33]:
pipeline.add_feature_step(Rpeak_features,input_signals=['ECG_Raw','Events_ECG_Clean'])
pipeline.add_feature_step(stat_features,input_signals=['ECG_Clean'],prefix="ECG")

In [34]:
pipeline.extract_features()

In [37]:
pipeline.features

Unnamed: 0,ecg_a_R,ecg_RR0,ecg_RR1,ecg_RR2,ecg_RRm,ecg_RR_0_1,ecg_RR_2_1,ecg_RR_m_1,ECG_mean,ECG_std,ECG_max,ECG_min,ECG_range,ECG_kurtosis,ECG_skew,ECG_momentum
0,"[0.093, 0.6484375, 0.6640625, -9.06640625, -2....","[0.372, 0.6640625, -9.06640625, 0.6484375, -2....","[0.454, -9.06640625, 0.6484375, 0.63671875, -2...","[0.125, 0.6484375, 0.63671875, 0.640625, 0.641...","[0.249, 0.63671875, 0.640625, 0.671875, 0.6497...","[0.134, 0.640625, 0.671875, 0.65625, 0.65625, ...","[0.378, 0.671875, 0.65625, 0.6484375, 0.658854...","[0.389, 0.65625, 0.6484375, 0.63671875, 0.6471...",0.000141,0.052429,0.221432,-0.133314,0.354746,1.942784,0.707288,0.002749


In [11]:
ex=pipeline.data['ECG_Clean'].channel

In [12]:
pipeline

Bio_Pipeline:
	Preprocessors: Process list:

	Processors: Process list:
	1: clean(ECG_Raw) -> ECG_Clean
	2: rpeaks(ECG_Clean) -> ECG_Peaks

	Postprocessors: Process list:

	Window Size(Seconds): Not Windowed
	Step Size: Not Windowed

In [13]:
pk = pipeline.data['Events_ECG_Clean'].channel

In [14]:
tst=biobss.edatools.eda_statistical.get_stat_features(ex,prefix='test')

In [26]:
tst['test_mean']

0.00010641522711905394

In [28]:
f=lambda x: np.mean(x)

In [83]:
def foo(*args,**kwargs):
    print(args)
    print(kwargs)

In [84]:
foo()

()
{}


In [20]:
res=biobss.ecgtools.from_Rpeaks(ex,pk,256)

In [28]:
res

{0: {'ecg_a_R': 0.14941537935018082,
  'ecg_RR0': 0.6484375,
  'ecg_RR1': 0.6640625,
  'ecg_RR2': -9.06640625,
  'ecg_RRm': -2.5846354166666665,
  'ecg_RR_0_1': 0.9764705882352941,
  'ecg_RR_2_1': -13.652941176470588,
  'ecg_RR_m_1': -3.892156862745098},
 1: {'ecg_a_R': 0.1990699821912972,
  'ecg_RR0': 0.6640625,
  'ecg_RR1': -9.06640625,
  'ecg_RR2': 0.6484375,
  'ecg_RRm': -2.5846354166666665,
  'ecg_RR_0_1': -0.07324429125376992,
  'ecg_RR_2_1': -0.07152089616544594,
  'ecg_RR_m_1': 0.2850782708602614},
 2: {'ecg_a_R': 0.20239536323008903,
  'ecg_RR0': -9.06640625,
  'ecg_RR1': 0.6484375,
  'ecg_RR2': 0.63671875,
  'ecg_RRm': -2.59375,
  'ecg_RR_0_1': -13.981927710843374,
  'ecg_RR_2_1': 0.9819277108433735,
  'ecg_RR_m_1': -4.0},
 3: {'ecg_a_R': 0.1546145352183906,
  'ecg_RR0': 0.6484375,
  'ecg_RR1': 0.63671875,
  'ecg_RR2': 0.640625,
  'ecg_RRm': 0.6419270833333334,
  'ecg_RR_0_1': 1.01840490797546,
  'ecg_RR_2_1': 1.0061349693251533,
  'ecg_RR_m_1': 1.0081799591002045},
 4: {'ecg

In [25]:
import re 

In [26]:
def mytype(v):
    s = str(v)
    regex = re.compile(r'(?P<list>\[[^]]+\])|(?P<float>\d*\.\d+)|(?P<int>\d+)|(?P<string>[a-zA-Z]+)')
    return  r"<type '%s'>" % regex.search(s).lastgroup

In [29]:
mytype(res)

"<type 'int'>"

In [None]:
def mytype(data):
    retun 