In [1]:
import numpy as np
import pandas as pd
import scipy.stats as ss
from typing import List

# Series Processing pipeline

`TODO`

In [2]:
# Load your data
df_tmp = pd.read_parquet("data/empatica/tmp.parquet", engine='fastparquet').set_index("timestamp")
df_gsr = pd.read_parquet("data/empatica/gsr.parquet", engine='fastparquet').set_index("timestamp")
df_acc = pd.read_parquet("data/empatica/acc.parquet", engine='fastparquet').set_index("timestamp")

# !!! The data MUST datetime Indexed
assert isinstance(df_tmp.index, pd.DatetimeIndex)

print(df_tmp.head(3))   # ~4Hz
print('-'*60)
print(df_acc.head(10))  # ~32Hz
print('-'*60)
print(df_gsr.head(3))   # ~4Hz

                                         TMP
timestamp                                   
2017-06-13 14:22:13+02:00         382.209991
2017-06-13 14:22:13.250000+02:00  382.209991
2017-06-13 14:22:13.500000+02:00  382.209991
------------------------------------------------------------
                                  ACC_x  ACC_y  ACC_z
timestamp                                            
2017-06-13 14:22:13+02:00             0      5     63
2017-06-13 14:22:13.031250+02:00      0      5     63
2017-06-13 14:22:13.062500+02:00      0      5     63
2017-06-13 14:22:13.093750+02:00      0      5     63
2017-06-13 14:22:13.125000+02:00      0      5     63
2017-06-13 14:22:13.156250+02:00     -1      5     63
2017-06-13 14:22:13.187500+02:00     -1      5     63
2017-06-13 14:22:13.218750+02:00      0      5     63
2017-06-13 14:22:13.250000+02:00      0      5     63
2017-06-13 14:22:13.281250+02:00      0      5     63
------------------------------------------------------------
     

**note**: The `ACC` signal is sampled at a different sample frequency.

> So we deal with `multivariate data` of which each modality has a different sample-frequency.

# Feature extraction

In [3]:
try: 
    from tsflex.features import FeatureCollection, NumpyFuncWrapper
    from tsflex.features import FeatureDescriptor, MultipleFeatureDescriptors
except:
    import sys
    sys.path.append('../')
    from tsflex.features import FeatureCollection, NumpyFuncWrapper
    from tsflex.features import FeatureDescriptor, MultipleFeatureDescriptors

## Defining functions

**[link to docs](https://predict-idlab.github.io/tsflex/features/index.html#getting-started)**

![features uml](https://raw.githubusercontent.com/predict-idlab/tsflex/main/docs/_static/features_uml.png)

As shown above, there are 3 relevant classes for feature-extraction.

1. [FeatureCollection](https://predict-idlab.github.io/tsflex/features/#tsflex.features.FeatureCollection): serves as a registry, withholding the to-be-calculated _features_
2. [FeatureDescriptor](https://predict-idlab.github.io/tsflex/features/#tsflex.features.FeatureDescriptor): an instance of this class describes a _feature_. <br>Features are defined by:
      * `series_name`: the names of the signal(s) which this feature will use. 
      * `function`: the _Callable_ feature-function - e.g. _np.mean_
      * `window`: the _time-based_ window -  e.g. _"1hour"_
      * `stride`: the _time-based_ stride - e.g. _"2days"_
3. [NumpyFuncWrapper](https://predict-idlab.github.io/tsflex/features/#tsflex.features.NumpyFuncWrapper): a wrapper around _Callable_ functions, intended for advanced feature function definitions, such as:
    * features with multiple output columns
    * passing _**kwargs_ to feature functions


**Note**: this library does `not` provide any feature-functions as:
* There already exist many other feature extraction libraries such as numpy, scipy, tsfresh with which `tsflex` integrates.
* (Relevant) features are dependent on the objective and signals-modalites, making features methods very problem specific.
* Finally, as can be seen in the example below, our `NumpyFuncWrapper`'s `func`-attribute is versatile enough to wrap the end-user's desired features.

In [4]:
# --------------------- some custom feature extraction functions ---------------------
# -- 1. one-to-many functions
#    To compute quantiles, you need sort the windowed data, which is a rather expensive
#    operation O(n*log(n)). Hence, you might want to calculate all your desired 
#    quantiles in a single function-wrapper, returning multiple outputs.

quantiles = [0.25, 0.5, 0.75]
f_quantiles = NumpyFuncWrapper(
    func=np.quantile,  # the wrapped function that will operate on numpy arrays
    output_names=[f"quantile_{q}" for q in quantiles],  # the output column names
    q=quantiles,  # optional - additional function-related kwargs
)


# -- 2. in-line functions
#    You can define your functions locally; these will serialize flawlessly
def slope(x):
    return np.polyfit(np.arange(0, len(x)), x, 1)[0]

f_slope = NumpyFuncWrapper(slope, output_names="slope")

# -- 3. Lambda's
#    Or even use lambda's and other modules' functions
f_rms = NumpyFuncWrapper(lambda x: np.sqrt(np.mean(x ** 2)), output_names="rms")
f_area = NumpyFuncWrapper(np.sum, output_names="area")


# (For convenience) we store the constructed `NumpyFuncWrappers` in a list
segment_funcs = [
    np.mean,
    np.std,
    np.var,
    np.max,
    np.min,
    ss.skew,  # use other libraries such as scipy
    ss.kurtosis,
    f_quantiles,
    f_slope,
    f_rms,
    f_area,
]
segment_funcs

[<function numpy.mean(a, axis=None, dtype=None, out=None, keepdims=<no value>, *, where=<no value>)>,
 <function numpy.std(a, axis=None, dtype=None, out=None, ddof=0, keepdims=<no value>, *, where=<no value>)>,
 <function numpy.var(a, axis=None, dtype=None, out=None, ddof=0, keepdims=<no value>, *, where=<no value>)>,
 <function numpy.amax(a, axis=None, out=None, keepdims=<no value>, initial=<no value>, where=<no value>)>,
 <function numpy.amin(a, axis=None, out=None, keepdims=<no value>, initial=<no value>, where=<no value>)>,
 <function scipy.stats.stats.skew(a, axis=0, bias=True, nan_policy='propagate')>,
 <function scipy.stats.stats.kurtosis(a, axis=0, fisher=True, bias=True, nan_policy='propagate')>,
 NumpyFuncWrapper(quantile, ['quantile_0.25', 'quantile_0.5', 'quantile_0.75'], {'q': [0.25, 0.5, 0.75]}),
 NumpyFuncWrapper(slope, ['slope'], {}),
 NumpyFuncWrapper(<lambda>, ['rms'], {}),
 NumpyFuncWrapper(sum, ['area'], {})]

## Single series feature extraction

The defined functions above will be encapsulated in a [FeatureDescriptor](https://predict-idlab.github.io/tsflex/features/index.html#tsflex.features.FeatureDescriptor) object.

A `FeatureDescriptor` describes a feature, and has 4 main attributes:

<center>

## Featuredescriptor constructor args:

|  attribute 	|                  type                 	| info                                                                                                             	|
|-----------:	|:-------------------------------------:	|------------------------------------------------------------------------------------------------------------------	|
| `function` 	| Union[Callable, <br>NumpyFuncWrapper] 	| The `function` that calculates this feature.                                                                     	|
|      `key` 	|                 Tuple[str, ...]                	| The signal key; i.e., the `pd.DataFrame` column name or <br> `pd.Series` name on which the function will operate.     	|
|   `window` 	|                  Union[str, pd.timedelta]               	| The window size on which this feature will be applied, <br> expressed in a **time-based** manner. 	|
|   `stride` 	|                  Union[str, pd.timedelta]                  	| The stride of the window rolling process, also as <br> expressed in a **time-based** manner|

</center>


**note**: [MultipleFeatureDescriptor](https://predict-idlab.github.io/tsflex/features/index.html#tsflex.features.MultipleFeatureDescriptors) is actaully a factory for `FeatureDescriptor` objects.

### Fixed window size & stride

**note**: this functionality is exposed by most existing time-series libraries (often on a sample-based matter).

In this example, we will use the _temperature_ signal from a wearable

In [1]:
df_tmp.sample(2)

NameError: name 'df_tmp' is not defined

Note how the `TMP`-column is used as signal_key in the `FeatureCollection`

In [5]:
# Define the sample frequency and window size
tmp_feat_extr = FeatureCollection(
    feature_descriptors=[
        MultipleFeatureDescriptors(
            functions=segment_funcs,  # The list of functions we constructed earlier
            series_names=["TMP"],
            windows='60s',
            strides='30s',
        )
    ]
)

# The FeatureCollection's __repr__() gives a nice overview of the structure
tmp_feat_extr

TMP: (
	win: 1m    , stride: 30s: [
		FeatureDescriptor - func: NumpyFuncWrapper(mean, ['mean'], {}),
		FeatureDescriptor - func: NumpyFuncWrapper(std, ['std'], {}),
		FeatureDescriptor - func: NumpyFuncWrapper(var, ['var'], {}),
		FeatureDescriptor - func: NumpyFuncWrapper(amax, ['amax'], {}),
		FeatureDescriptor - func: NumpyFuncWrapper(amin, ['amin'], {}),
		FeatureDescriptor - func: NumpyFuncWrapper(skew, ['skew'], {}),
		FeatureDescriptor - func: NumpyFuncWrapper(kurtosis, ['kurtosis'], {}),
		FeatureDescriptor - func: NumpyFuncWrapper(quantile, ['quantile_0.25', 'quantile_0.5', 'quantile_0.75'], {'q': [0.25, 0.5, 0.75]}),
		FeatureDescriptor - func: NumpyFuncWrapper(slope, ['slope'], {}),
		FeatureDescriptor - func: NumpyFuncWrapper(<lambda>, ['rms'], {}),
		FeatureDescriptor - func: NumpyFuncWrapper(sum, ['area'], {}),
	]
)

In [6]:
# to extract the features we just call the collection's `calculate()` function
extracted_feats = tmp_feat_extr.calculate(
    data=df_tmp,     # The signals on which features are calculated
    return_df=True,  # If true, an outer merge on the feature-outputs will be performed
    n_jobs=2         # If > 1, the feature extraction is parallellized
)

extracted_feats.sample(2)

Unnamed: 0_level_0,TMP__mean__w=1m_s=30s,TMP__var__w=1m_s=30s,TMP__std__w=1m_s=30s,TMP__amax__w=1m_s=30s,TMP__amin__w=1m_s=30s,TMP__skew__w=1m_s=30s,TMP__kurtosis__w=1m_s=30s,TMP__quantile_0.25__w=1m_s=30s,TMP__quantile_0.5__w=1m_s=30s,TMP__quantile_0.75__w=1m_s=30s,TMP__rms__w=1m_s=30s,TMP__slope__w=1m_s=30s,TMP__area__w=1m_s=30s
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1
2017-06-13 16:03:43+02:00,32.084496,0.002701,0.051975,32.150002,31.93,-1.664221,2.031371,32.085,32.110001,32.110001,32.084541,-0.000589,7700.279297
2017-06-13 14:23:13+02:00,33.346001,2062.81958,45.418274,382.209991,27.370001,7.550932,55.016679,27.389999,27.41,27.474999,56.34515,-0.146092,8003.040039


### Multiple `time-based` window sizes and strides

_In this example, we use **multiple** stride-window-size combinations on a wearables' ElectorDermal Activity (EDA)_

Note that we do not use int-based window-stride combinations, but `time-based` ones. Also take a closer look at the `__repr__` string.

In [7]:
# PoC: we will select a random combination of the window_size stride combination
window_size_s = ['30s', '120s', '90s', '1h']
stride_size_s = ['15s', '30s']

import random

gsr_feat_extr = FeatureCollection(
    [
        FeatureDescriptor(
            series_name="EDA",
            window=random.choice(window_size_s),
            stride=random.choice(stride_size_s),
            function=f,
        )
        for f in segment_funcs
    ]
)

# the __repr__ string outputs the windows & strides in a time-string representation :)
print(gsr_feat_extr)
print('-'*60)
gsr_feat_extr.calculate(df_gsr, return_df=True, show_progress=False, n_jobs=None).sample(2)

EDA: (
	win: 1h    , stride: 30s: [
		FeatureDescriptor - func: NumpyFuncWrapper(mean, ['mean'], {}),
		FeatureDescriptor - func: NumpyFuncWrapper(slope, ['slope'], {}),
	]
	win: 30s   , stride: 15s: [
		FeatureDescriptor - func: NumpyFuncWrapper(std, ['std'], {}),
		FeatureDescriptor - func: NumpyFuncWrapper(kurtosis, ['kurtosis'], {}),
		FeatureDescriptor - func: NumpyFuncWrapper(quantile, ['quantile_0.25', 'quantile_0.5', 'quantile_0.75'], {'q': [0.25, 0.5, 0.75]}),
		FeatureDescriptor - func: NumpyFuncWrapper(sum, ['area'], {}),
	]
	win: 1m30s , stride: 15s: [
		FeatureDescriptor - func: NumpyFuncWrapper(var, ['var'], {}),
		FeatureDescriptor - func: NumpyFuncWrapper(amax, ['amax'], {}),
	]
	win: 1m30s , stride: 30s: [
		FeatureDescriptor - func: NumpyFuncWrapper(amin, ['amin'], {}),
	]
	win: 2m    , stride: 30s: [
		FeatureDescriptor - func: NumpyFuncWrapper(skew, ['skew'], {}),
		FeatureDescriptor - func: NumpyFuncWrapper(<lambda>, ['rms'], {}),
	]
)

----------------------------

Unnamed: 0_level_0,EDA__amin__w=1m30s_s=30s,EDA__amax__w=1m30s_s=15s,EDA__mean__w=1h_s=30s,EDA__area__w=30s_s=15s,EDA__std__w=30s_s=15s,EDA__rms__w=2m_s=30s,EDA__var__w=1m30s_s=15s,EDA__quantile_0.25__w=30s_s=15s,EDA__quantile_0.5__w=30s_s=15s,EDA__quantile_0.75__w=30s_s=15s,EDA__skew__w=2m_s=30s,EDA__kurtosis__w=30s_s=15s,EDA__slope__w=1h_s=30s
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1
2017-06-13 16:19:58+02:00,,0.919896,,106.948418,0.003712,,9e-05,0.887922,0.89048,0.894317,,-0.368449,
2017-06-13 14:33:43+02:00,0.707522,0.825185,,96.396545,0.007705,0.760067,0.000587,0.799606,0.804083,0.808879,-0.289965,1.149474,


**note**: The `NaN` values in the above pd.DataFrame are cause by the outer merge which we do to retain the time-indices. Various feature were extracted at different windows & strides, thus making few features share the same time-indices.
<br><br>
If we set `return_df=False`, a `List[pd.Series]` will be returned

In [8]:
feat_list : List[pd.Series] = gsr_feat_extr.calculate(df_gsr, return_df=False, show_progress=False, n_jobs=None)
print(len(feat_list))
feat_list[0].sample(3)

11


Unnamed: 0_level_0,EDA__amin__w=1m30s_s=30s
timestamp,Unnamed: 1_level_1
2017-06-13 16:04:43+02:00,1.297283
2017-06-13 15:40:13+02:00,1.596755
2017-06-13 15:41:43+02:00,1.875565


## Multiple series feature extraction

In [9]:
# Construct the feature FeatureCollection
#   =  higher order wrapper which aggregates the featuredescriptions
multimodal_feature_extraction = FeatureCollection(
    feature_descriptors=[gsr_feat_extr, tmp_feat_extr]
)

print(multimodal_feature_extraction)
print('-'*60)

df_feat = multimodal_feature_extraction.calculate(
    [df_gsr, df_tmp], return_df=True
)
df_feat.sample(2)

EDA: (
	win: 1h    , stride: 30s: [
		FeatureDescriptor - func: NumpyFuncWrapper(mean, ['mean'], {}),
		FeatureDescriptor - func: NumpyFuncWrapper(slope, ['slope'], {}),
	]
	win: 30s   , stride: 15s: [
		FeatureDescriptor - func: NumpyFuncWrapper(std, ['std'], {}),
		FeatureDescriptor - func: NumpyFuncWrapper(kurtosis, ['kurtosis'], {}),
		FeatureDescriptor - func: NumpyFuncWrapper(quantile, ['quantile_0.25', 'quantile_0.5', 'quantile_0.75'], {'q': [0.25, 0.5, 0.75]}),
		FeatureDescriptor - func: NumpyFuncWrapper(sum, ['area'], {}),
	]
	win: 1m30s , stride: 15s: [
		FeatureDescriptor - func: NumpyFuncWrapper(var, ['var'], {}),
		FeatureDescriptor - func: NumpyFuncWrapper(amax, ['amax'], {}),
	]
	win: 1m30s , stride: 30s: [
		FeatureDescriptor - func: NumpyFuncWrapper(amin, ['amin'], {}),
	]
	win: 2m    , stride: 30s: [
		FeatureDescriptor - func: NumpyFuncWrapper(skew, ['skew'], {}),
		FeatureDescriptor - func: NumpyFuncWrapper(<lambda>, ['rms'], {}),
	]
)
TMP: (
	win: 1m    , stride: 

Unnamed: 0_level_0,EDA__mean__w=1h_s=30s,EDA__amin__w=1m30s_s=30s,EDA__amax__w=1m30s_s=15s,EDA__area__w=30s_s=15s,TMP__mean__w=1m_s=30s,TMP__var__w=1m_s=30s,TMP__amin__w=1m_s=30s,TMP__amax__w=1m_s=30s,EDA__var__w=1m30s_s=15s,EDA__rms__w=2m_s=30s,...,EDA__quantile_0.75__w=30s_s=15s,TMP__quantile_0.25__w=1m_s=30s,TMP__quantile_0.5__w=1m_s=30s,TMP__quantile_0.75__w=1m_s=30s,TMP__slope__w=1m_s=30s,TMP__kurtosis__w=1m_s=30s,TMP__skew__w=1m_s=30s,EDA__kurtosis__w=30s_s=15s,EDA__skew__w=2m_s=30s,EDA__slope__w=1h_s=30s
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2017-06-13 14:37:58+02:00,,,1.074579,111.493195,,,,,0.006337,,...,0.937732,,,,,,,3.663326,,
2017-06-13 14:39:28+02:00,,,1.207589,108.660507,,,,,0.00111,,...,0.92757,,,,,,,14.081461,,


## Logging

`TODO`

## Use case: batch-based feature extraction

In [10]:
from tsflex.chunking import chunk_data

* maybe execute this on a highdimensional series, like the `sleep data`

In [11]:
same_range_chunks = chunk_data(
    data=[df_tmp],
    fs_dict={"EDA": 4, "TMP": 4},
    max_chunk_dur='10min'
)

## Serialization

Serialization is mandatory to store and share your pipelines.
`TODO`

In [12]:
multimodal_feature_extraction.serialize("data/example_serialization.pkl")

# Bonus - Get LAYD: Look At Your Data

And as a bonus, for running/reading this notebook, you get some nice visualization code, for
ofcourse time-series.

In [13]:
# !pip install ipywidgets
# !pip install plotly
import ipywidgets as widgets
import plotly.graph_objects as go
from ipywidgets import interact_manual
from plotly.subplots import make_subplots

In [14]:
df_dict = {"tmp": df_tmp, "gsr": df_gsr}
feat_widget = widgets.SelectMultiple(options=df_feat.columns)
sig_widget = widgets.SelectMultiple(options=["gsr", "tmp"])


@interact_manual
def visuzalize(features=feat_widget, signals=sig_widget):
    row_titles = list(signals) + ["features"] if len(features) else []
    fig = make_subplots(
        rows=len(row_titles),
        cols=1,
        shared_xaxes=True,
        vertical_spacing=0.1 / len(row_titles),
        row_titles=row_titles,
    )
    fig.update_layout(height=300 * len(row_titles))

    # first, visualize the "raw" signals
    row_idx = 1
    for sig in signals:
        df_sig = df_dict[sig][10:].resample("1s").mean()
        for col in set(df_sig.columns).difference(["index", "timestamp"]):
            fig.add_trace(
                go.Scattergl(x=df_sig.index, y=df_sig[col], name=col, hoverinfo="skip"),
                row=row_idx,
                col=1,
            )
        row_idx += 1

    # then visualize the features
    for feature in features:
        df_ff = df_feat[feature].dropna()
        fig.add_trace(
            go.Scattergl(
                connectgaps=True,
                x=df_ff.index,
                y=df_ff,
                name=feature,
                hoverinfo="skip",
                mode="markers",
                showlegend=True,
            ),
            row=row_idx,
            col=1,
        )

    return fig.show()

interactive(children=(SelectMultiple(description='features', options=('EDA__mean__w=1h_s=30s', 'EDA__amin__w=1…