# pipeline

> Predefined sklearn pipelines to process financial data and create features from it

In [None]:
#| default_exp pipeline

In [None]:
#| hide
from eccore.ipython import nb_setup, pandas_nrows_ncols
from myquantlab.core import load_test_df
from nbdev import nbdev_export

In [None]:
#| hide
nb_setup()

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
Set autoreload mode


In [None]:
#| export
import re

import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import FeatureUnion, Pipeline
from sklearn.preprocessing import MinMaxScaler, StandardScaler

## Custom transforms

Transforms for preprocessing financial data and engineering features from it.

In [None]:
#| export
class MyBaseTransformer(BaseEstimator, TransformerMixin):
    """Base class for my custom transformers

    The base implementation returns the data unchanged and names every output feature
    `<input name>_<postfix>`. Subclasses set `postfix` and override `transform`.
    """
    def __init__(self):
        # Both name lists are filled in by `fit`; they stay None until then.
        self.input_features_ = None
        self.output_features_ = None
        # Suffix appended to each input feature name to build the output feature name.
        self.postfix = "transformed"

    def fit(self, X, y=None):
        """Record the input feature names and derive the output feature names.

        Column names are used when `X` exposes `columns`; otherwise positional names
        `f_0`, `f_1`, ... are generated from the number of columns. `y` is ignored.
        Returns `self`.
        """
        if hasattr(X, 'columns'):
            self.input_features_ = X.columns.tolist()
        else:
            # np.shape also handles nested lists, which have no `.shape` attribute.
            self.input_features_ = [f'f_{i}' for i in range(np.shape(X)[1])]
        self.output_features_ = [f"{feat}_{self.postfix}" for feat in self.input_features_]
        return self

    def transform(self, X) -> np.ndarray:
        """Return a copy of the values of `X` as a numpy array, unchanged."""
        # Same frame-vs-array test as in `fit`: plain arrays have no `.values`,
        # so they are copied directly instead of raising AttributeError.
        if hasattr(X, 'columns'):
            return X.copy().values
        return np.array(X, copy=True)

    def get_feature_names_out(self, input_features=None) -> list[str]|None:
        """Return the output feature names.

        When `input_features` is given, the names are built from it with the current
        postfix; otherwise the names recorded by `fit` are returned (None before `fit`).
        """
        if input_features is not None:
            return [f"{feat}_{self.postfix}" for feat in input_features]
        else:
            return self.output_features_

In [None]:
# Load the sample frame, then run it through a single-step pipeline.
df = load_test_df()
pipe = Pipeline(steps=[('base-transformer', MyBaseTransformer())])

# First five rows of the transformed array.
pipe.fit_transform(df)[:5, :]

array([[ 2759.02,  2779.27,  2747.27,  2754.48, 26562.  ],
       [ 2753.11,  2755.36,  2690.69,  2743.45, 38777.  ],
       [ 2744.83,  2748.58,  2651.23,  2672.8 , 41777.  ],
       [ 2670.8 ,  2722.9 ,  2657.93,  2680.71, 39034.  ],
       [ 2675.59,  2692.34,  2627.59,  2663.57, 61436.  ]])

In [None]:
# Wrap the transformed array back into a frame, using the transformer's feature names.
pd.DataFrame(
    data=pipe.fit_transform(df),
    columns=pipe.get_feature_names_out(),
    index=df.index,
).head(5)

Unnamed: 0_level_0,Open_transformed,High_transformed,Low_transformed,Close_transformed,Volume_transformed
dt,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2018-10-22,2759.02,2779.27,2747.27,2754.48,26562.0
2018-10-23,2753.11,2755.36,2690.69,2743.45,38777.0
2018-10-24,2744.83,2748.58,2651.23,2672.8,41777.0
2018-10-25,2670.8,2722.9,2657.93,2680.71,39034.0
2018-10-26,2675.59,2692.34,2627.59,2663.57,61436.0


In [None]:
#| export
class ReturnTransformer(MyBaseTransformer):
    """Percentage return of each column over one or more periods"""
    def __init__(self, periods:int=1) -> None:
        super().__init__()
        self.postfix = "ret"
        self.periods = periods

    def transform(self, X) -> np.ndarray:
        """Percentage change versus the bar `periods` rows earlier.

        Missing values in the result, including the leading rows that have no
        earlier bar to compare with, are replaced by 0.
        """
        copied = X.copy()
        frame = copied if isinstance(copied, pd.DataFrame) else pd.DataFrame(copied)
        returns = frame.pct_change(periods=self.periods)
        return returns.fillna(0.0).values

In [None]:
# Percentage returns for a subset of the columns.
pipe = ColumnTransformer(
    transformers=[('r', ReturnTransformer(), ['Open', 'Close', 'Volume'])]
)
pd.DataFrame(
    data=pipe.fit_transform(df),
    columns=pipe.get_feature_names_out(),
    index=df.index,
).head(5)

Unnamed: 0_level_0,r__Open_ret,r__Close_ret,r__Volume_ret
dt,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2018-10-22,0.0,0.0,0.0
2018-10-23,-0.002142,-0.004004,0.459867
2018-10-24,-0.003008,-0.025752,0.077365
2018-10-25,-0.026971,0.002959,-0.065658
2018-10-26,0.001793,-0.006394,0.57391


In [None]:
#| export
class StdTransformer(MyBaseTransformer):
    """Evaluate the rolling standard deviation over a window"""
    def __init__(self, window:int=5) -> None:
        super().__init__()
        self.window = window
        self.postfix = f"std{self.window}"

    def fit(self, X, y=None):
        """Rebuild the postfix from the current `window`, then record the feature names.

        `set_params(window=...)` only assigns the attribute and does not re-run
        `__init__`, so the postfix is refreshed here to keep the output feature
        names in sync with the window actually used by `transform`.
        """
        self.postfix = f"std{self.window}"
        return super().fit(X, y)

    def transform(self, X) -> np.ndarray:
        """Rolling standard deviation of each column over `window` rows.

        `min_periods=1` lets shorter leading windows produce a value; a window
        holding a single observation still yields NaN.
        """
        X = X.copy()
        if not isinstance(X, pd.DataFrame):
            X = pd.DataFrame(X)
        return X.rolling(window=self.window, min_periods=1).std().values

In [None]:
# 3-bar rolling standard deviation of Open and Close.
pipe = ColumnTransformer(
    transformers=[('returns', StdTransformer(3), ['Open', 'Close'])]
)
pd.DataFrame(
    data=pipe.fit_transform(df),
    columns=pipe.get_feature_names_out(),
    index=df.index,
).head(5)


Unnamed: 0_level_0,returns__Open_std3,returns__Close_std3
dt,Unnamed: 1_level_1,Unnamed: 2_level_1
2018-10-22,,
2018-10-23,4.179001,7.799388
2018-10-24,7.12791,44.318367
2018-10-25,45.320958,38.708953
2018-10-26,41.427774,8.578467


In [None]:
#| export
class MATransformer(MyBaseTransformer):
    """Evaluate the moving average over a window"""
    def __init__(self, window:int=5) -> None:
        super().__init__()
        self.window = window
        self.postfix = f"MA{window}"

    def fit(self, X, y=None):
        """Rebuild the postfix from the current `window`, then record the feature names.

        `set_params(window=...)` only assigns the attribute and does not re-run
        `__init__`, so the postfix is refreshed here to keep the output feature
        names in sync with the window actually used by `transform`.
        """
        self.postfix = f"MA{self.window}"
        return super().fit(X, y)

    def transform(self, X) -> np.ndarray:
        """Rolling mean of each column over `window` rows.

        `min_periods=1` means the leading rows are averaged over the bars
        available so far instead of being NaN.
        """
        X = X.copy()
        if not isinstance(X, pd.DataFrame):
            X = pd.DataFrame(X)
        return X.rolling(window=self.window, min_periods=1).mean().values

In [None]:
#| export
class EMATransformer(MyBaseTransformer):
    """Evaluate the exponential moving average over a window"""
    def __init__(self, window:int=5) -> None:
        super().__init__()
        self.window = window
        self.postfix = f"EMA{window}"

    def fit(self, X, y=None):
        """Rebuild the postfix from the current `window`, then record the feature names.

        `set_params(window=...)` only assigns the attribute and does not re-run
        `__init__`, so the postfix is refreshed here to keep the output feature
        names in sync with the window actually used by `transform`.
        """
        self.postfix = f"EMA{self.window}"
        return super().fit(X, y)

    def transform(self, X) -> np.ndarray:
        """Exponentially weighted mean of each column, with `span` set to `window`."""
        X = X.copy()
        if not isinstance(X, pd.DataFrame):
            X = pd.DataFrame(X)
        return X.ewm(span=self.window).mean().values

Build a pipeline applying these transforms to specific columns.

In [None]:
# Keep the raw columns and append return, moving-average and EMA features.
raw_columns = ['Open', 'High', 'Low', 'Close', 'Volume']
pipe = ColumnTransformer(
    transformers=[
        ('thru', 'passthrough', raw_columns),
        ('ret', ReturnTransformer(3), ['Close']),
        ('ma', MATransformer(3), ['Close']),
        ('ema', EMATransformer(3), ['Open', 'Close']),
    ]
)
pd.DataFrame(
    data=pipe.fit_transform(df),
    columns=pipe.get_feature_names_out(),
    index=df.index,
).head(5)


Unnamed: 0_level_0,thru__Open,thru__High,thru__Low,thru__Close,thru__Volume,ret__Close_ret,ma__Close_MA3,ema__Open_EMA3,ema__Close_EMA3
dt,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
2018-10-22,2759.02,2779.27,2747.27,2754.48,26562.0,0.0,2754.48,2759.02,2754.48
2018-10-23,2753.11,2755.36,2690.69,2743.45,38777.0,0.0,2748.965,2755.08,2747.126667
2018-10-24,2744.83,2748.58,2651.23,2672.8,41777.0,0.0,2723.576667,2749.222857,2704.654286
2018-10-25,2670.8,2722.9,2657.93,2680.71,39034.0,-0.026782,2698.986667,2707.397333,2691.884
2018-10-26,2675.59,2692.34,2627.59,2663.57,61436.0,-0.029117,2672.36,2690.980645,2677.270323


In [None]:
#| export
def simplify_colnames(cols)->list[str]:
    """Simplify the column names by removing the transformer prefix.

    A name such as `ret__Close_ret` becomes `Close_ret`: the leading run of word
    characters/hyphens and the double underscore that follows it are dropped, and
    everything after that is kept as is (including spaces or other punctuation).
    The prefix match is greedy, so `outer__inner__Close` becomes `Close`.
    Names that carry no such prefix are returned unchanged instead of raising.
    """
    # `.*` with DOTALL keeps the whole remainder; `\w*` would cut it at the first
    # non-word character (e.g. the space in "Adj Close").
    pat = re.compile(r"[\w\d\-]*_{2}(?P<end>.*)", re.DOTALL)
    simplified = []
    for c in cols:
        m = pat.match(c)
        # No prefix found: keep the original name rather than failing on None.
        simplified.append(m.group('end') if m else str(c))
    return simplified

In [None]:
# Feature names as produced by the ColumnTransformer: each carries a `<transformer name>__` prefix.
print(pipe.get_feature_names_out())

['thru__Open' 'thru__High' 'thru__Low' 'thru__Close' 'thru__Volume'
 'ret__Close_ret' 'ma__Close_MA3' 'ema__Open_EMA3' 'ema__Close_EMA3']


In [None]:
# Same names after `simplify_colnames` has stripped the `<transformer name>__` prefix.
print(simplify_colnames(pipe.get_feature_names_out()))

['Open', 'High', 'Low', 'Close', 'Volume', 'Close_ret', 'Close_MA3', 'Open_EMA3', 'Close_EMA3']


In [None]:
# Processed frame: transformed values, original index, prefix-free column names.
df_proc = pd.DataFrame(
    pipe.fit_transform(df),
    index=df.index,
    columns=simplify_colnames(pipe.get_feature_names_out()),
)
df_proc.head(5)

Unnamed: 0_level_0,Open,High,Low,Close,Volume,Close_ret,Close_MA3,Open_EMA3,Close_EMA3
dt,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
2018-10-22,2759.02,2779.27,2747.27,2754.48,26562.0,0.0,2754.48,2759.02,2754.48
2018-10-23,2753.11,2755.36,2690.69,2743.45,38777.0,0.0,2748.965,2755.08,2747.126667
2018-10-24,2744.83,2748.58,2651.23,2672.8,41777.0,0.0,2723.576667,2749.222857,2704.654286
2018-10-25,2670.8,2722.9,2657.93,2680.71,39034.0,-0.026782,2698.986667,2707.397333,2691.884
2018-10-26,2675.59,2692.34,2627.59,2663.57,61436.0,-0.029117,2672.36,2690.980645,2677.270323


In [None]:
#| hide
# `nbdev_export` is already imported in the first hidden cell; no second import needed.
nbdev_export()