# Processing Signals with Pipelines

Now that we have identified (or created) several primitives for signal feature generation, we would like to combine them into a reusable *pipeline*.

First, let's import the required libraries and functions.


```python
import sigpro
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from sigpro.demo import _load_demo as get_demo
from sigpro.demo import get_demo_data
```


## Defining Primitives

Recall that we can obtain the list of available primitives with the `get_primitives` method:


```python
from sigpro import get_primitives

get_primitives()
```

```
['sigpro.SigPro',
 'sigpro.aggregations.amplitude.statistical.crest_factor',
 'sigpro.aggregations.amplitude.statistical.kurtosis',
 'sigpro.aggregations.amplitude.statistical.mean',
 'sigpro.aggregations.amplitude.statistical.rms',
 'sigpro.aggregations.amplitude.statistical.skew',
 'sigpro.aggregations.amplitude.statistical.std',
 'sigpro.aggregations.amplitude.statistical.var',
 'sigpro.aggregations.frequency.band.band_mean',
 'sigpro.transformations.amplitude.identity.identity',
 'sigpro.transformations.amplitude.spectrum.power_spectrum',
 'sigpro.transformations.frequency.band.frequency_band',
 'sigpro.transformations.frequency.fft.fft',
 'sigpro.transformations.frequency.fft.fft_real',
 'sigpro.transformations.frequency_time.stft.stft',
 'sigpro.transformations.frequency_time.stft.stft_real']
```

We can also define our own custom primitives.

## Building a Pipeline

Let’s define a feature processing pipeline that sequentially applies the `identity` and `fft` transformations and then the `std` aggregation. To pass these primitives to the signal processor, we write each one as a dictionary with the following fields:

- `name`: Name of the transformation or aggregation step.
- `primitive`: Fully qualified name of the primitive to apply, as listed by `get_primitives()`.
- `init_params` (optional): Dictionary containing the initialization parameters for the primitive.

Since we do not specify any initialization parameters here, we omit `init_params` from these dictionaries.
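Because every step is just a plain dictionary, a small helper can keep the specs consistent. The sketch below uses a hypothetical convenience function, `make_primitive_spec`, which is not part of SigPro; it only adds `init_params` when parameters are actually supplied, mirroring the convention described above.

```python
def make_primitive_spec(name, primitive, init_params=None):
    """Build a primitive dictionary in the format described above.

    Hypothetical helper for illustration only; it is not part of the SigPro API.
    """
    spec = {'name': name, 'primitive': primitive}
    # Only include init_params when the caller actually supplies parameters
    if init_params:
        spec['init_params'] = dict(init_params)
    return spec


identity_spec = make_primitive_spec(
    'identity1', 'sigpro.transformations.amplitude.identity.identity')
print(identity_spec)
```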

```python
identity_transform = {'name': 'identity1',
                      'primitive': 'sigpro.transformations.amplitude.identity.identity'}

fft_transform = {'name': 'fft1',
                 'primitive': 'sigpro.transformations.frequency.fft.fft'}

std_agg = {'name': 'std1',
           'primitive': 'sigpro.aggregations.amplitude.statistical.std'}
```
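Conceptually, the three steps form a chain: `identity` presumably passes the amplitude values through unchanged, `fft` maps them to the frequency domain, and `std` reduces the result to a single number. The following plain NumPy sketch illustrates that chain, assuming the `fft` step behaves like `numpy.fft.fft` and the `std` step like `numpy.std`. It is not SigPro's implementation, and all function names in it are hypothetical stand-ins.

```python
import numpy as np


def identity(amplitude_values):
    # Hypothetical stand-in: pass the signal through unchanged
    return np.asarray(amplitude_values)


def fft(amplitude_values):
    # Hypothetical stand-in: complex discrete Fourier transform of the signal
    return np.fft.fft(amplitude_values)


def std(amplitude_values):
    # Hypothetical stand-in: standard deviation; for complex input NumPy
    # returns the real value sqrt(mean(|x - mean(x)|**2))
    return float(np.std(amplitude_values))


def run_chain(amplitude_values):
    # Apply the transformations in order, then the aggregation
    return std(fft(identity(amplitude_values)))


# For a constant signal of N ones the FFT is [N, 0, ..., 0], whose
# standard deviation works out to sqrt(N - 1)
print(run_chain(np.ones(4)))  # prints approximately 1.732 (sqrt(3))
```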


We now define a new pipeline containing the primitives we would like to apply. At minimum, we need to pass in a list of transformations and a list of aggregations; the full list of available arguments is given below.

- Inputs:
    - `transformations (list)`: List of dictionaries containing the transformation primitives.
    - `aggregations (list)`: List of dictionaries containing the aggregation primitives.
    - `values_column_name (str)` (optional): Name of the column that contains the signal values. Defaults to `'values'`.
    - `keep_columns (Union[bool, list])` (optional): Whether to keep non-feature columns in the output DataFrame. If a list of column names is passed, those columns are kept. Defaults to `False`.
    - `input_is_dataframe (bool)` (optional): Whether the input is a pandas DataFrame. Defaults to `True`.
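To make the three `keep_columns` options concrete, here is a small pandas sketch of the column selection they describe. `select_output_columns` is a hypothetical helper written for this illustration, not SigPro's internal code, and it assumes the feature columns are always kept.

```python
import pandas as pd


def select_output_columns(frame, feature_columns, keep_columns=False):
    """Hypothetical sketch of the documented keep_columns behaviour."""
    if keep_columns is True:
        # Keep every column: the original inputs plus the generated features
        return frame
    if keep_columns is False:
        # Drop all non-feature columns
        return frame[list(feature_columns)]
    # A list of names: keep those columns, followed by the features
    extra = [col for col in keep_columns if col not in feature_columns]
    return frame[extra + list(feature_columns)]


example = pd.DataFrame({
    'turbine_id': ['T001', 'T002'],
    'signal_id': ['S1', 'S1'],
    'feature': [1.5, 2.5],
})
print(select_output_columns(example, ['feature'], ['turbine_id']).columns.tolist())
# prints ['turbine_id', 'feature']
```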

Returning to the example:

```python
transformations = [identity_transform, fft_transform]
aggregations = [std_agg]

mypipeline = sigpro.SigPro(transformations, aggregations,
                           values_column_name='yvalues', keep_columns=True)
```


Under the hood, SigPro builds an `MLPipeline` that can be reused to generate features.

To check that `mypipeline` was defined correctly, we can inspect its input and output arguments with the `get_input_args` and `get_output_args` methods.

```python
input_args = mypipeline.get_input_args()
output_args = mypipeline.get_output_args()

print(input_args)
print(output_args)
```

```
[{'name': 'readings', 'keyword': 'data', 'type': 'pandas.DataFrame'}, {'name': 'feature_columns', 'default': None, 'type': 'list'}]
[{'name': 'readings', 'type': 'pandas.DataFrame'}, {'name': 'feature_columns', 'type': 'list'}]
```
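The argument specs are plain lists of dictionaries, so they can be inspected programmatically. As a small sketch, an argument without a `'default'` key can be treated as required, and its `'keyword'` (when present) as the name it is passed under. Both readings are assumptions based on the printout above, where only `feature_columns` carries a default and `readings` is passed as `data`, not documented SigPro behaviour.

```python
# Argument specs copied from the printout above
input_args = [
    {'name': 'readings', 'keyword': 'data', 'type': 'pandas.DataFrame'},
    {'name': 'feature_columns', 'default': None, 'type': 'list'},
]

# Assumption: an argument without a 'default' entry must be supplied by the caller,
# under its 'keyword' if one is given, otherwise under its 'name'
required = [arg.get('keyword', arg['name']) for arg in input_args if 'default' not in arg]
print(required)  # prints ['data']
```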


## Applying a Pipeline with `process_signal`

Once our pipeline is correctly defined, we can apply its `process_signal` method to a demo dataset. Recall that `process_signal` is defined as follows:


```python
def process_signal(self, data=None, window=None, time_index=None, groupby_index=None,
                   feature_columns=None, **kwargs):
    ...
    return data, feature_columns
```

`process_signal` accepts as input the following arguments:

- `data (pd.DataFrame)`: DataFrame with a column containing the signal values.
- `window (str)`: Duration of the window, e.g. `'1h'`.
- `time_index (str)`: Name of the column in `data` that represents the time index.
- `groupby_index (str or list[str])`: Column name(s) to group by before taking the window.
- `feature_columns (list)`: List of columns from the input data that should be treated as features (and not dropped).
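To build intuition for how `window`, `time_index`, and `groupby_index` interact, the sketch below buckets rows by group and by fixed-length time window with plain pandas and counts the rows in each bucket. `count_rows_per_window` is a hypothetical helper; it illustrates the idea of grouping plus windowing, not what `process_signal` does internally.

```python
import pandas as pd


def count_rows_per_window(data, window, time_index, groupby_index):
    """Hypothetical helper: count rows per (group, time window) bucket."""
    # Accept a single column name or a list of names, like groupby_index
    keys = [groupby_index] if isinstance(groupby_index, str) else list(groupby_index)
    # pd.Grouper bins the time column into consecutive windows of length `window`
    return data.groupby(keys + [pd.Grouper(key=time_index, freq=window)]).size()


# Toy readings for two turbines, sorted by time
readings = pd.DataFrame({
    'turbine_id': ['T001', 'T002', 'T001', 'T001'],
    'timestamp': pd.to_datetime([
        '2020-01-01 00:00', '2020-01-01 00:05',
        '2020-01-01 00:30', '2020-01-01 01:10',
    ]),
})
counts = count_rows_per_window(readings, '60min', 'timestamp', 'turbine_id')
print(counts)
```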

`process_signal` outputs the following:

- `data (pd.DataFrame)`: DataFrame containing the output feature values constructed from the signal.
- `feature_columns (list)`: List of (generated) feature names.

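We now apply our pipeline to a toy dataset in the `xvalues`, `yvalues` format, where each row holds one hour of timestamps (`xvalues`) and the matching signal values (`yvalues`) as lists. We define the toy dataset as follows: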

```python
demo_dataset = get_demo_data()
demo_dataset['xvalues'] = demo_dataset['timestamp'].copy()
demo_dataset['yvalues'] = demo_dataset['values'].copy()

# Collapse each hour of readings into one row whose cells hold lists of samples
demo_dataset = (demo_dataset.set_index('timestamp')
                .resample(rule='60min')
                .apply(lambda x: x.to_list())
                .reset_index())

# Replace each row's list of repeated metadata with its first element
metadata = ['turbine_id', 'signal_id', 'sampling_frequency']
demo_dataset[metadata] = demo_dataset[metadata].apply(lambda col: col.str[0])

demo_dataset = demo_dataset[['turbine_id', 'signal_id', 'xvalues', 'yvalues', 'sampling_frequency']]
demo_dataset.head()
```

| | turbine_id | signal_id | xvalues | yvalues | sampling_frequency |
|---|---|---|---|---|---|
| 0 | T001 | Sensor1_signal1 | [2020-01-01 00:00:00, 2020-01-01 00:00:01, 202... | [0.43616983763682876, -0.17662312586241055, 0.... | 1000.0 |
| 1 | T001 | Sensor1_signal1 | [2020-01-01 01:00:00, 2020-01-01 01:00:01, 202... | [0.8023828754411122, -0.14122063493312714, -0.... | 1000.0 |
| 2 | T001 | Sensor1_signal1 | [2020-01-01 02:00:00, 2020-01-01 02:00:01, 202... | [-1.3143142430046044, -1.1055740033788437, -0.... | 1000.0 |
| 3 | T001 | Sensor1_signal1 | [2020-01-01 03:00:00, 2020-01-01 03:00:01, 202... | [-0.45981995520032104, -0.3255426061995603, -0... | 1000.0 |
| 4 | T001 | Sensor1_signal1 | [2020-01-01 04:00:00, 2020-01-01 04:00:01, 202... | [-0.6380405111460377, -0.11924167777027689, 0.... | 1000.0 |
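The reshaping step turns a long table of per-second readings into one row per hour whose cells hold lists. The sketch below reproduces that idea on synthetic data so it runs without the demo dataset. The signal values are random numbers generated for illustration only, and grouping on `dt.floor` is swapped in as an alternative to the `resample` call used in the cell that builds `demo_dataset`.

```python
import numpy as np
import pandas as pd

# Synthetic readings (illustration only): one sample per second for two hours
timestamps = pd.date_range('2020-01-01', periods=2 * 3600, freq='s')
raw = pd.DataFrame({
    'turbine_id': 'T001',
    'timestamp': timestamps,
    'values': np.random.default_rng(0).normal(size=len(timestamps)),
})
raw['xvalues'] = raw['timestamp']
raw['yvalues'] = raw['values']

# Group readings by the hour they fall in and collect each column into lists
hourly = (
    raw.groupby(raw['timestamp'].dt.floor('60min'))[['turbine_id', 'xvalues', 'yvalues']]
    .agg(list)
    .reset_index()
)

# Metadata repeats within an hour, so keep the first element of each row's list
hourly['turbine_id'] = hourly['turbine_id'].str[0]
print(hourly[['timestamp', 'turbine_id']])
```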


Finally, we apply the `process_signal` method of our previously defined pipeline:

```python
processed_data, feature_columns = mypipeline.process_signal(demo_dataset, time_index='xvalues')

processed_data.head()
```


| | turbine_id | signal_id | xvalues | yvalues | sampling_frequency | identity1.fft1.std1.std_value |
|---|---|---|---|---|---|---|
| 0 | T001 | Sensor1_signal1 | [2020-01-01 00:00:00, 2020-01-01 00:00:01, 202... | [0.43616983763682876, -0.17662312586241055, 0.... | 1000.0 | 14.444991 |
| 1 | T001 | Sensor1_signal1 | [2020-01-01 01:00:00, 2020-01-01 01:00:01, 202... | [0.8023828754411122, -0.14122063493312714, -0.... | 1000.0 | 12.326223 |
| 2 | T001 | Sensor1_signal1 | [2020-01-01 02:00:00, 2020-01-01 02:00:01, 202... | [-1.3143142430046044, -1.1055740033788437, -0.... | 1000.0 | 12.051415 |
| 3 | T001 | Sensor1_signal1 | [2020-01-01 03:00:00, 2020-01-01 03:00:01, 202... | [-0.45981995520032104, -0.3255426061995603, -0... | 1000.0 | 10.657243 |
| 4 | T001 | Sensor1_signal1 | [2020-01-01 04:00:00, 2020-01-01 04:00:01, 202... | [-0.6380405111460377, -0.11924167777027689, 0.... | 1000.0 | 12.640728 |



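Success! We have applied the primitives to generate a new feature column, `identity1.fft1.std1.std_value`, for every row of the input dataset. Because we set `keep_columns=True`, the original columns are kept alongside it.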
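The generated column name appears to follow a simple convention: the step names joined with dots, followed by the name of the aggregation's output (`std_value` here). The sketch below rebuilds the name from the primitive dictionaries under that assumption, which is inferred from the single output shown above rather than from SigPro's documentation; `feature_column_name` is a hypothetical helper.

```python
def feature_column_name(transformations, aggregation, output_name):
    """Hypothetical helper: join step names and the aggregation output with dots."""
    steps = [step['name'] for step in transformations] + [aggregation['name']]
    return '.'.join(steps + [output_name])


transformations = [
    {'name': 'identity1', 'primitive': 'sigpro.transformations.amplitude.identity.identity'},
    {'name': 'fft1', 'primitive': 'sigpro.transformations.frequency.fft.fft'},
]
std_agg = {'name': 'std1', 'primitive': 'sigpro.aggregations.amplitude.statistical.std'}

print(feature_column_name(transformations, std_agg, 'std_value'))
# prints identity1.fft1.std1.std_value
```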
