# Graphical Pipelines

### Content of this Notebook:
* Understanding what graphical pipelines are
* Understanding the API of graphical pipelines
* Examples of simple pipelines and how they can be implemented with graphical pipelines.
* More complex graphical pipeline (Forecasting + GridSearch)
* Grid search with a graphical pipeline



### What are Graphical Pipelines?
Recap sequential pipelines:

<img src="img/sequential_pipeline.png" width=750 />

Many tasks are non-sequential. There are two ways to handle them:
1. Nesting Sequential Pipelines.
2. Using Graphical Pipelines.


This motivates the generalised graphical pipeline:
* Graphical means that different steps may share the same predecessor or provide their outputs to the same successor (the dataflows can branch and merge).
<img src="img/graphical_pipeline.png" width=750 />


* Generalised means that the pipeline can be used for multiple tasks (e.g. forecasting, classification, ...).
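
To make "branch and merge" concrete, the following minimal sketch represents a small pipeline as a plain dictionary and derives a valid execution order with the standard library. The step names (`preprocess`, `forecaster_a`, `forecaster_b`, `combiner`) are hypothetical and chosen only for illustration; this is not how the graphical pipeline is implemented internally.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical steps: each step maps to the set of steps it reads from.
predecessors = {
    "preprocess": {"X"},
    "forecaster_a": {"preprocess"},  # branch: two steps share one predecessor
    "forecaster_b": {"preprocess"},
    "combiner": {"forecaster_a", "forecaster_b"},  # merge: one step, two inputs
}

# A step can only run once all of its predecessors have produced their output.
order = list(TopologicalSorter(predecessors).static_order())

# Invert the mapping to find where the data flow branches.
successors = {}
for step, preds in predecessors.items():
    for pred in preds:
        successors.setdefault(pred, set()).add(step)

branching = sorted(s for s, succ in successors.items() if len(succ) > 1)
merging = sorted(s for s, preds in predecessors.items() if len(preds) > 1)

print("execution order:", order)
print("branching at:", branching)
print("merging at:", merging)
```

A sequential pipeline is the special case in which every step has exactly one predecessor and at most one successor, so both `branching` and `merging` would be empty.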

**Note**

The graphical pipeline is a new feature. If you encounter any issues, we would be happy to receive feedback on the graphical pipeline.


### Potential Use-Cases
There are various potential use cases for the graphical pipeline. In the following, we focus on a forecasting and a classification pipeline.
#### Forecasting Use-Case for Graphical Pipelines


The input of some forecasters depends on the output of other forecasters, which share the same input.
* Forecasters could use the same preprocessing (branching of the data flow)
* Forecasters could use the outputs of multiple predecessors (merging of the data flow)

<img src="img/graphical_pipeline_example.png" width=900 />


**Note:** The current experimental state of the graphical pipeline does not fully support this use-case. However, we are working on this. If you are interested in this use-case and want to contribute, please contact us.

### Credits
The graphical pipeline was first developed by pyWATTS [1] and was then adapted for sktime. The original implementation can be found in the [pyWATTS](https://github.com/KIT-IAI/pyWATTS) repository. pyWATTS is an open-source library developed at the Institute of Applied Informatics and Automation at the KIT and funded by HelmholtzAI.

> [1] Heidrich, Benedikt, et al. "pyWATTS: Python workflow automation tool for time series." arXiv preprint arXiv:2106.10157 (2021).





## How to build a Graphical Pipeline

Let us first visualise the simple forecasting pipeline that we want to construct:


<img src="img/forecasting_pipeline.png" width=750 />


There are two ways to construct this pipeline with the graphical pipeline:

1. Pass all steps to the pipeline during initialisation as for the sequential pipelines.



In [1]:
from sktime.forecasting.sarimax import SARIMAX
from sktime.pipeline.pipeline import Pipeline
from sktime.transformations.series.difference import Differencer

differencer = Differencer()

general_pipeline = Pipeline(
    [
        {"skobject": differencer, "name": "differencer", "edges": {"X": "y"}},
        {
            "skobject": SARIMAX(),
            "name": "sarimax",
            "edges": {"X": "X", "y": "differencer"},
        },
        {
            "skobject": differencer,
            "name": "differencer_inv",
            "edges": {"X": "sarimax"},
            "method": "inverse_transform",
        },
    ]
)



2. Create a pipeline object and add the steps one by one.




In [2]:

general_pipeline = Pipeline()
differencer = Differencer()

general_pipeline = general_pipeline.add_step(
    differencer, "differencer", edges={"X": "y"}
)
general_pipeline = general_pipeline.add_step(
    SARIMAX(), "sarimax", edges={"X": "X", "y": "differencer"}
)
general_pipeline = general_pipeline.add_step(
    differencer, "differencer_inv", edges={"X": "sarimax"}, method="inverse_transform"
)




## Explanation of the parameters

The parameters of `add_step`, which are also the keys of the dicts in the step list passed during initialisation, are:

* skobject: The sktime object added to the pipeline
* name: The name of the step
* edges: The keys of the dictionary indicate the input of the skobject (X or y), and the values are the names of the steps that should be connected to that input argument. Note that subsetting using `__` and feature union via lists are supported.
* method: The skobject's method that should be called. If not provided, the default method is inferred from the added skobject. For example, this parameter is needed to call `inverse_transform`. Optional.
* kwargs: Additional keyword arguments passed to the sktime object. Optional.
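
To illustrate how an `edges` value can be read, the sketch below splits edge strings into a source step and an optional column subset, and treats a list as a feature union. The helpers `parse_source` and `parse_edges` are hypothetical and not part of sktime. The sketch also assumes that column names after `__` are separated by a single underscore, as the edge strings used later in this notebook (e.g. `X__realgdp_realdpi`) suggest.

```python
def parse_source(source):
    """Split 'step__col1_col2' into (step, [col1, col2]).

    A source without '__' selects the whole output of the step (columns=None).
    Assumption: columns are separated by single underscores.
    """
    step, sep, cols = source.partition("__")
    return step, (cols.split("_") if sep else None)


def parse_edges(edges):
    """Normalise an edges dict so that every input maps to a list of sources."""
    parsed = {}
    for arg, sources in edges.items():
        if isinstance(sources, str):
            sources = [sources]  # a single source is a union of one element
        parsed[arg] = [parse_source(s) for s in sources]
    return parsed


# Subsetting: take three columns of the pipeline input X.
print(parse_edges({"X": "X__realgdp_realdpi_unemp"}))

# Feature union: X is built from the outputs of two steps.
print(parse_edges({"y": "scaler__unemp", "X": ["forecaster_gdp", "forecaster_dpi"]}))
```

Under this reading, column names that themselves contain underscores would be ambiguous, so the sketch should be taken as a mental model rather than as the exact parsing rule.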


Now let us fit the pipeline and make a prediction

In [3]:
from sktime.datasets import load_longley
from sktime.forecasting.model_selection import temporal_train_test_split

y, X = load_longley()
y_train, y_test, X_train, X_test = temporal_train_test_split(y, X)

general_pipeline.fit(y=y_train, X=X_train, fh=[1, 2, 3, 4])
general_pipeline.predict(X=X_test)

1959    67213.735362
1960    68328.076310
1961    68737.861398
1962    71322.894026
Freq: A-DEC, Name: TOTEMP, dtype: float64

## Further Examples


### Classification Pipeline
A simple classification pipeline implemented using the graphical pipeline.

In [4]:
from sktime.classification.distance_based import KNeighborsTimeSeriesClassifier
from sktime.transformations.series.exponent import ExponentTransformer

general_pipeline = Pipeline()
general_pipeline = general_pipeline.add_step(
    ExponentTransformer(), "exponent", edges={"X": "X"}
)
general_pipeline = general_pipeline.add_step(
    KNeighborsTimeSeriesClassifier(), "classifier", edges={"X": "exponent", "y": "y"}
)



Or alternatively defined using the constructor API.

In [5]:
general_pipeline = Pipeline(
    [
        {"skobject": ExponentTransformer(), "name": "exponent", "edges": {"X": "X"}},
        {
            "skobject": KNeighborsTimeSeriesClassifier(),
            "name": "classifier",
            "edges": {"X": "exponent", "y": "y"},
        },
    ]
)

This pipeline can be visualised as follows:

<img src="img/classification_pipeline.png" width=750 />


In [6]:
from sktime.datasets import load_arrow_head

X, y = load_arrow_head(split="train", return_X_y=True)
general_pipeline.fit(X=X, y=y)
general_pipeline.predict(X=X)

array(['0', '1', '2', '0', '1', '2', '0', '1', '2', '0', '1', '2', '0',
       '1', '2', '0', '1', '2', '0', '1', '2', '0', '1', '2', '0', '1',
       '2', '0', '1', '2', '0', '1', '2', '0', '1', '2'], dtype='<U1')

## A More Complex Example

The considered use case is to forecast inflation using forecasts of the real gross domestic product, real disposable personal income, and the unemployment rate. Furthermore, the unemployment rate is forecast using the same features except the unemployment rate itself.

<img src="img/graphical_pipeline_example.png" width=750 />


The data is taken from the macrodata dataset from the statsmodels package.


**Note:** We use `add_step` in the following.

Create Graphical Pipeline Instance

In [7]:
pipe = Pipeline()
pipe.set_config(warnings="off")




Add Preprocessing


In [8]:
from sklearn.preprocessing import StandardScaler

from sktime.transformations.series.adapt import TabularToSeriesAdaptor
from sktime.transformations.series.detrend import Deseasonalizer

pipe = pipe.add_step(
    TabularToSeriesAdaptor(StandardScaler()),
    name="scaler",
    edges={"X": "X__realgdp_realdpi_unemp"},
)
pipe = pipe.add_step(
    Deseasonalizer(sp=4), name="deseasonalizer", edges={"X": "X__realgdp_realdpi"}
)

Add forecasters for GDP and DPI

In [9]:
from sklearn.linear_model import Lasso, Ridge

from sktime.forecasting.compose import make_reduction

pipe = pipe.add_step(
    make_reduction(Ridge(), windows_identical=False, window_length=5),
    name="forecaster_gdp",
    edges={"y": "deseasonalizer__realgdp"},
)

pipe = pipe.add_step(
    make_reduction(Ridge(), windows_identical=False, window_length=5),
    name="forecaster_dpi",
    edges={"y": "deseasonalizer__realdpi"},
)

Add Forecaster for unemployment rate that depends on forecasts of GDP and DPI

In [10]:
pipe = pipe.add_step(
    make_reduction(Ridge(), windows_identical=False, window_length=5),
    name="forecaster_unemp",
    edges={
        "y": "scaler__unemp",
        "X": [
            "forecaster_gdp",
            "forecaster_dpi",
        ],
    },
)

Add forecaster for the inflation that depends on forecasted DPI and unemployment rate



In [11]:
pipe = pipe.add_step(
    make_reduction(Ridge(), windows_identical=False, window_length=5),
    name="forecaster_inflation",
    edges={"X": ["forecaster_dpi", "forecaster_unemp"], "y": "y"},
)

Load data and split them into train and test

In [12]:
from sktime.datasets import load_macroeconomic
from sktime.forecasting.base import ForecastingHorizon

data = load_macroeconomic()

X = data[["realgdp", "realdpi", "unemp"]]
y = data[["infl"]]
fh = ForecastingHorizon([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12])

y_train, y_test, X_train, X_test = temporal_train_test_split(y, X=X, fh=fh)
X_train

| Period | realgdp | realdpi | unemp |
|---|---|---|---|
| 1959Q1 | 2710.349 | 1886.9 | 5.8 |
| 1959Q2 | 2778.801 | 1919.7 | 5.1 |
| 1959Q3 | 2775.488 | 1916.4 | 5.3 |
| 1959Q4 | 2785.204 | 1931.3 | 5.6 |
| 1960Q1 | 2847.699 | 1955.5 | 5.2 |
| ... | ... | ... | ... |
| 2005Q3 | 12683.153 | 9308.0 | 5.0 |
| 2005Q4 | 12748.699 | 9358.7 | 4.9 |
| 2006Q1 | 12915.938 | 9533.8 | 4.7 |
| 2006Q2 | 12962.462 | 9617.3 | 4.7 |


In [13]:
pipe.fit(y=y_train, X=X_train, fh=fh)
result = pipe.predict(X=None, fh=y_test.index)
result



| Period | infl |
|---|---|
| 2006Q4 | 3.090428 |
| 2007Q1 | 1.676421 |
| 2007Q2 | 0.219586 |
| 2007Q3 | 1.570087 |
| 2007Q4 | 0.350137 |
| 2008Q1 | 0.438966 |
| 2008Q2 | 0.615457 |
| 2008Q3 | 0.119022 |
| 2008Q4 | 0.257887 |
| 2009Q1 | 0.129785 |


In [14]:
((result - y_test) ** 2).mean()

infl    20.103326
dtype: float64
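
The value above is a mean squared error, so it is expressed in squared units of the inflation series. As a small worked step that uses only the printed value, the root mean squared error, which is in the same units as `infl`, can be recovered as follows:

```python
import math

# Mean squared error as printed in the output above.
mse = 20.103326

# The root mean squared error is in the same units as the target series.
rmse = math.sqrt(mse)
print(f"RMSE: {rmse:.2f}")  # roughly 4.48
```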

#### Grid Search with graphical pipeline

This pipeline has multiple parameters that can be tuned to find the best configuration. These parameters include:
* which forecaster should be used for which variable -> `MultiplexForecaster`
* what should be the hyperparameters of the forecaster
* which features should be used for the different forecasters -> Tune the edges of the graphical pipeline!

<img src="img/graphical_pipeline_example_grid.png" width=900 />

Since we do forecasting, we use the ForecastingGridSearchCV.

1. Create a blueprint of the pipeline


In [15]:
from sktime.forecasting.compose import MultiplexForecaster

pipe = Pipeline()
sklearn_scaler = StandardScaler()
sktime_scaler = TabularToSeriesAdaptor(sklearn_scaler)
deseasonalizer = Deseasonalizer(sp=4)

pipe = pipe.add_step(
    sktime_scaler, name="scaler", edges={"X": "X__realgdp_realdpi_unemp"}
)
pipe = pipe.add_step(
    deseasonalizer, name="deseasonalizer", edges={"X": "X__realgdp_realdpi"}
)

pipe = pipe.add_step(
    MultiplexForecaster(
        [
            (
                "ridge",
                make_reduction(Ridge(), windows_identical=False, window_length=5),
            ),
            (
                "lasso",
                make_reduction(Lasso(), windows_identical=False, window_length=5),
            ),
        ]
    ),
    name="forecaster_gdp",
    edges={"y": "deseasonalizer__realgdp"},
)

pipe = pipe.add_step(
    MultiplexForecaster(
        [
            (
                "ridge",
                make_reduction(Ridge(), windows_identical=False, window_length=5),
            ),
            (
                "lasso",
                make_reduction(Lasso(), windows_identical=False, window_length=5),
            ),
        ]
    ),
    name="forecaster_dpi",
    edges={"y": "deseasonalizer__realdpi"},
)

pipe = pipe.add_step(
    MultiplexForecaster(
        [
            (
                "ridge",
                make_reduction(Ridge(), windows_identical=False, window_length=5),
            ),
            (
                "lasso",
                make_reduction(Lasso(), windows_identical=False, window_length=5),
            ),
        ]
    ),
    name="forecaster_unemp",
    edges={
        "y": "scaler__unemp",
        "X": [
            "forecaster_gdp",
            "forecaster_dpi",
        ],
    },
)

pipe = pipe.add_step(
    MultiplexForecaster(
        [
            (
                "ridge",
                make_reduction(Ridge(), windows_identical=False, window_length=5),
            ),
            (
                "lasso",
                make_reduction(Lasso(), windows_identical=False, window_length=5),
            ),
        ]
    ),
    name="forecaster_inflation",
    edges={"X": ["forecaster_dpi", "forecaster_unemp"], "y": "y"},
)




2. Specify the parameter grid:

The keys of the dictionary are the names of the parameters in the pipeline, and the values specify which options should be tested.
Keys have the following structure: `<step_name>__skobject__<parameter_name>` for a parameter of a step, and `<step_name>__edges__<X or y>` for the input edges of a step.

In [16]:
param_grid = {
    "forecaster_inflation__skobject__selected_forecaster": ["ridge", "lasso"],
    "forecaster_unemp__skobject__selected_forecaster": ["ridge", "lasso"],
    "forecaster_dpi__skobject__selected_forecaster": ["ridge", "lasso"],
    "forecaster_gdp__skobject__selected_forecaster": ["ridge", "lasso"],
    "forecaster_inflation__edges__X": [
        ["forecaster_unemp"],
        ["forecaster_unemp", "forecaster_dpi"],
    ],
    "forecaster_unemp__edges__X": [
        [],
        ["forecaster_dpi"],
        ["forecaster_gdp", "forecaster_dpi"],
    ],
    "deseasonalizer__edges__X": ["X__realgdp_realdpi", "scaler__realgdp_realdpi"],
}
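
Before running the search, it can help to know how many candidate configurations the grid contains. The sketch below repeats the grid from the cell above and enumerates its Cartesian product with the standard library. It assumes that the grid search evaluates every combination exhaustively.

```python
from itertools import product

# Same grid as in the cell above, repeated so that the sketch is self-contained.
param_grid = {
    "forecaster_inflation__skobject__selected_forecaster": ["ridge", "lasso"],
    "forecaster_unemp__skobject__selected_forecaster": ["ridge", "lasso"],
    "forecaster_dpi__skobject__selected_forecaster": ["ridge", "lasso"],
    "forecaster_gdp__skobject__selected_forecaster": ["ridge", "lasso"],
    "forecaster_inflation__edges__X": [
        ["forecaster_unemp"],
        ["forecaster_unemp", "forecaster_dpi"],
    ],
    "forecaster_unemp__edges__X": [
        [],
        ["forecaster_dpi"],
        ["forecaster_gdp", "forecaster_dpi"],
    ],
    "deseasonalizer__edges__X": ["X__realgdp_realdpi", "scaler__realgdp_realdpi"],
}

# Number of candidates = product of the number of options per key.
n_candidates = 1
for options in param_grid.values():
    n_candidates *= len(options)

# Enumerate the candidates explicitly as one dict per configuration.
candidates = [
    dict(zip(param_grid, combo)) for combo in product(*param_grid.values())
]

print(n_candidates)  # 2 * 2 * 2 * 2 * 2 * 3 * 2 = 192
```

Each candidate is fitted once per cross-validation split, so the total number of pipeline fits is the number of candidates multiplied by the number of splits.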

Initialise the grid search using the pipeline, cross-validation strategy, scoring, and param_grid.


In [17]:
from sktime.forecasting.model_selection import (
    ForecastingGridSearchCV,
    SlidingWindowSplitter,
)
from sktime.performance_metrics.forecasting import mean_absolute_error

gridcv = ForecastingGridSearchCV(
    pipe,
    cv=SlidingWindowSplitter(
        window_length=len(X_train) - 20,
        step_length=4,
        fh=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12],
    ),
    scoring=mean_absolute_error,
    param_grid=param_grid,
)
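
To get a feeling for the cross-validation strategy, the following sketch enumerates the windows of a simple sliding-window scheme in plain Python. The function `sliding_window_splits` is a simplified, hypothetical model and not sktime's implementation, which may differ in details. The value `n_obs = 190` is an assumption based on the `X_train` printout above (1959Q1 to 2006Q2).

```python
def sliding_window_splits(n_obs, window_length, step_length, fh):
    """Yield (train_indices, test_indices) for a simple sliding-window scheme.

    The training window has a fixed length and moves forward by step_length.
    The test indices are the last training index plus each horizon in fh.
    """
    max_fh = max(fh)
    start = 0
    # Stop as soon as the furthest horizon would fall outside the data.
    while start + window_length + max_fh <= n_obs:
        train = list(range(start, start + window_length))
        cutoff = train[-1]
        test = [cutoff + h for h in fh]
        yield train, test
        start += step_length


n_obs = 190  # assumed length of the training data, see the lead-in
fh = list(range(1, 13))
splits = list(
    sliding_window_splits(
        n_obs, window_length=n_obs - 20, step_length=4, fh=fh
    )
)

for train, test in splits:
    print(f"train {train[0]}..{train[-1]}, test {test[0]}..{test[-1]}")
print("number of splits:", len(splits))
```

Under these assumptions only a few splits fit into the data, because the window leaves 20 observations free, of which 12 are needed for the forecasting horizon.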

Call fit on the gridsearch object.

In [18]:
gridcv.fit(y=y_train, X=X_train)


Examine the results of the gridsearch

In [19]:
gridcv.cv_results_

| | mean_test__DynamicForecastingErrorMetric | mean_fit_time | mean_pred_time | params | rank_test__DynamicForecastingErrorMetric |
|---|---|---|---|---|---|
| 0 | 1.539329 | 0.075673 | 0.023929 | {'deseasonalizer__edges__X': 'X__realgdp_reald... | 107.5 |
| 1 | 1.720565 | 0.076208 | 0.025338 | {'deseasonalizer__edges__X': 'X__realgdp_reald... | 119.5 |
| 2 | 1.394329 | 0.141800 | 0.046452 | {'deseasonalizer__edges__X': 'X__realgdp_reald... | 97.5 |
| 3 | 1.942051 | 0.151181 | 0.045115 | {'deseasonalizer__edges__X': 'X__realgdp_reald... | 129.5 |
| 4 | 2.033714 | 0.160442 | 0.059711 | {'deseasonalizer__edges__X': 'X__realgdp_reald... | 136.0 |
| ... | ... | ... | ... | ... | ... |
| 187 | 1.329079 | 0.096761 | 0.037935 | {'deseasonalizer__edges__X': 'scaler__realgdp_... | 48.5 |
| 188 | 1.329079 | 0.109997 | 0.040909 | {'deseasonalizer__edges__X': 'scaler__realgdp_... | 48.5 |
| 189 | 1.329079 | 0.100659 | 0.047549 | {'deseasonalizer__edges__X': 'scaler__realgdp_... | 48.5 |
| 190 | 1.329079 | 0.105045 | 0.055426 | {'deseasonalizer__edges__X': 'scaler__realgdp_... | 48.5 |


Using the fitted grid search to make a prediction with the best hyperparameters

In [20]:
result = gridcv.predict(X=None, fh=y_test.index)
result

| Period | infl |
|---|---|
| 2006Q4 | 2.188182 |
| 2007Q1 | 2.124281 |
| 2007Q2 | 1.04528 |
| 2007Q3 | 1.857716 |
| 2007Q4 | 1.790664 |
| 2008Q1 | 1.649457 |
| 2008Q2 | 1.874361 |
| 2008Q3 | 1.855627 |
| 2008Q4 | 1.858207 |
| 2009Q1 | 1.909693 |


### How to implement a slightly simpler version of the pipeline above by nesting sequential pipelines
* Simplification: The forecast of the unemployment rate does not depend on the GDP and DPI.
<img src="img/graphical_pipeline_simplified.png" width=900 />



Create sequential pipelines for forecasting the GDP, DPI and unemployment rate.

In [21]:
from sktime.forecasting.compose import ColumnEnsembleForecaster, ForecastX
from sktime.transformations.series.subset import ColumnSelect

forecasting_pipeline_gdp = (
    ColumnSelect(["realgdp"])  # To train the forecaster only on the realgdp column
    * Deseasonalizer()
    * MultiplexForecaster(
        [
            (
                "ridge",
                make_reduction(Ridge(), windows_identical=False, window_length=5),
            ),
            (
                "lasso",
                make_reduction(Lasso(), windows_identical=False, window_length=5),
            ),
        ]
    )
)
forecasting_pipeline_dpi = (
    ColumnSelect(["realdpi"])
    * Deseasonalizer()
    * MultiplexForecaster(
        [
            (
                "ridge",
                make_reduction(Ridge(), windows_identical=False, window_length=5),
            ),
            (
                "lasso",
                make_reduction(Lasso(), windows_identical=False, window_length=5),
            ),
        ]
    )
)

forecasting_pipeline_unemp = (
    ColumnSelect(["unemp"])
    * Deseasonalizer()
    * MultiplexForecaster(
        [
            (
                "ridge",
                make_reduction(Ridge(), windows_identical=False, window_length=5),
            ),
            (
                "lasso",
                make_reduction(Lasso(), windows_identical=False, window_length=5),
            ),
        ]
    )
)

Use ColumnEnsembleForecaster to combine the forecasts of the DPI, GDP, and unemployment rate (union of forecasts).

In [22]:
input_inflation_forecast = ColumnEnsembleForecaster(
    [
        ("realdpi", forecasting_pipeline_dpi, "realdpi"),
        ("realgdp", forecasting_pipeline_gdp, "realgdp"),
        ("unemp", forecasting_pipeline_unemp, "unemp"),
    ]
)

Create the inflation forecaster.

In [23]:
inflation_forecast = ForecastX(
    MultiplexForecaster(
        [
            (
                "ridge",
                make_reduction(Ridge(), windows_identical=False, window_length=5),
            ),
            (
                "lasso",
                make_reduction(Lasso(), windows_identical=False, window_length=5),
            ),
        ]
    ),
    input_inflation_forecast,
)

In [24]:
inflation_forecast.fit(y=y_train, X=X_train, fh=fh)

In [25]:
inflation_forecast.predict()

| | infl |
|---|---|
| 2006Q4 | 3.979318 |
| 2007Q1 | 2.347512 |
| 2007Q2 | 1.443598 |
| 2007Q3 | 3.914533 |
| 2007Q4 | 2.533117 |
| 2008Q1 | 3.27801 |
| 2008Q2 | 3.861517 |
| 2008Q3 | 3.48751 |
| 2008Q4 | 4.195074 |
| 2009Q1 | 4.294984 |


In [26]:
inflation_forecast.get_params(True)

{'behaviour': 'update',
 'columns': None,
 'fh_X': None,
 'fit_behaviour': 'use_actual',
 'forecaster_X': ColumnEnsembleForecaster(forecasters=[('realdpi',
                                        TransformedTargetForecaster(steps=[ColumnSelect(columns=['realdpi']),
                                                                           Deseasonalizer(),
                                                                           MultiplexForecaster(forecasters=[('ridge',
                                                                                                             RecursiveTabularRegressionForecaster(estimator=Ridge(),
                                                                                                                                                  window_length=5)),
                                                                                                            ('lasso',
                                                                           

# Comparison of graphical pipelines with nested sequential pipelines

### Advantages of graphical pipelines
* Enable an easy implementation of complex pipelines
    * By nesting sequential pipelines, even a simplified version of the graphical pipeline is very complicated to implement.
    * By nesting sequential pipelines, some graphical pipelines are not possible to implement (e.g., the example with coupled ForecastX).
* With nested sequential pipelines, preprocessing steps cannot be shared between the different forecasters.
* With nested sequential pipelines, the parameter structure can become very complex.
* With nested sequential pipelines, it is unclear how the edges could be tuned in a complex scenario.

### Advantages of sequential pipelines
* Constructing simple pipelines is very easy.
* Inverse operations are automatically applied.
* This is a mature feature compared to the experimental graphical pipeline.


### When to use what? 
* If your pipeline needs little nesting and is mainly sequential, you should probably stick with the standard pipeline implementation.
* If your pipeline represents a complex scenario with multiple forecasters that influence one another, you might want to use the graphical pipeline, since it makes the code easier to write.
