## Pipelines with Python and Scikit-learn 
#### A step towards more structured and cleaner data science projects

This notebook presents the concept of pipelines, with a practical implementation in Python using the scikit-learn (sklearn) library. My goal is to explain clearly what pipelines are and what benefits they bring to your project workflow.


##### Some initial steps:

We make the necessary imports and read the example dataset we will work on:

In [209]:
#Imports
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings('ignore')
from sklearn.model_selection import train_test_split
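# Note: `Imputer` was removed in scikit-learn 0.22 (its replacement is sklearn.impute.SimpleImputer),
# so the cells below, as written, need an older scikit-learn release.
from sklearn.preprocessing import LabelEncoder, StandardScaler, Imputer, FunctionTransformer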
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report
from sklearn.pipeline import Pipeline, make_pipeline, make_union, FeatureUnion
from sklearn.base import BaseEstimator, TransformerMixin

In [210]:
#Read data
data = pd.read_csv('../data/data_sample', sep=',')

In [211]:
data.head(3)

|   | NAME_INCOME_TYPE | NAME_CONTRACT_TYPE | CODE_GENDER | AMT_INCOME_TOTAL | AMT_CREDIT | AMT_REQ_CREDIT_BUREAU_MON | TARGET |
|---|---|---|---|---|---|---|---|
| 0 | Working | Cash loans | F | 72000.0 | 180000.0 | 0.0 | 0 |
| 1 | Commercial associate | Cash loans | M | 198000.0 | 1159515.0 | 0.0 | 1 |
| 2 | Working | Cash loans | F | 157500.0 | 668304.0 | 0.0 | 0 |


In [212]:
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 30000 entries, 0 to 29999
Data columns (total 7 columns):
NAME_INCOME_TYPE             30000 non-null object
NAME_CONTRACT_TYPE           30000 non-null object
CODE_GENDER                  30000 non-null object
AMT_INCOME_TOTAL             30000 non-null float64
AMT_CREDIT                   30000 non-null float64
AMT_REQ_CREDIT_BUREAU_MON    25597 non-null float64
TARGET                       30000 non-null int64
dtypes: float64(3), int64(1), object(3)
memory usage: 1.6+ MB


There are six features and a binary classification target. The dataset intentionally contains variables of various types: string, categorical, and continuous.

In [213]:
X_train, X_test, y_train, y_test = train_test_split(data.drop('TARGET', axis=1), data['TARGET'], test_size=0.30, stratify = data['TARGET'], random_state=42)

##### Without any pipelines yet:

There are a number of operations we perform in almost every data science project. Before we explain pipelines, let's walk through some of them by hand, because this will help us understand pipelines later on.

Handle missing values:

In [214]:
X_train.isnull().sum()

NAME_INCOME_TYPE                0
NAME_CONTRACT_TYPE              0
CODE_GENDER                     0
AMT_INCOME_TOTAL                0
AMT_CREDIT                      0
AMT_REQ_CREDIT_BUREAU_MON    3105
dtype: int64

In [215]:
X_train['AMT_REQ_CREDIT_BUREAU_MON'] = X_train['AMT_REQ_CREDIT_BUREAU_MON'].fillna(0)
X_test['AMT_REQ_CREDIT_BUREAU_MON'] = X_test['AMT_REQ_CREDIT_BUREAU_MON'].fillna(0)

In [216]:
X_train.isnull().sum()

NAME_INCOME_TYPE             0
NAME_CONTRACT_TYPE           0
CODE_GENDER                  0
AMT_INCOME_TOTAL             0
AMT_CREDIT                   0
AMT_REQ_CREDIT_BUREAU_MON    0
dtype: int64

Encode a categorical variable:

In [217]:
le = LabelEncoder()

In [218]:
le.fit(X_train['CODE_GENDER'])

LabelEncoder()

In [219]:
X_train['CODE_GENDER'] = le.transform(X_train['CODE_GENDER'])
X_test['CODE_GENDER'] = le.transform(X_test['CODE_GENDER'])

Log-transform a continuous variable:

In [220]:
X_train['AMT_INCOME_TOTAL'] = np.log(X_train['AMT_INCOME_TOTAL'])
X_test['AMT_INCOME_TOTAL'] = np.log(X_test['AMT_INCOME_TOTAL'])

In [221]:
X_train.head()

|   | NAME_INCOME_TYPE | NAME_CONTRACT_TYPE | CODE_GENDER | AMT_INCOME_TOTAL | AMT_CREDIT | AMT_REQ_CREDIT_BUREAU_MON |
|---|---|---|---|---|---|---|
| 3334 | Commercial associate | Cash loans | 0 | 12.196022 | 1671210.0 | 0.0 |
| 8434 | Pensioner | Revolving loans | 1 | 12.419166 | 630000.0 | 0.0 |
| 22997 | Commercial associate | Cash loans | 0 | 12.382125 | 182016.0 | 0.0 |
| 14401 | Commercial associate | Cash loans | 0 | 11.407565 | 101880.0 | 0.0 |
| 27944 | State servant | Cash loans | 0 | 11.967181 | 1113840.0 | 0.0 |


Standardize variables that have different scales:

In [222]:
scaler = StandardScaler()

In [223]:
scaler.fit(X_train[['AMT_INCOME_TOTAL','AMT_CREDIT']])

StandardScaler(copy=True, with_mean=True, with_std=True)

In [224]:
X_train[['AMT_INCOME_TOTAL','AMT_CREDIT']] = scaler.transform(X_train[['AMT_INCOME_TOTAL','AMT_CREDIT']])
X_test[['AMT_INCOME_TOTAL','AMT_CREDIT']] = scaler.transform(X_test[['AMT_INCOME_TOTAL','AMT_CREDIT']])

Encode text variables:

In [225]:
dv = DictVectorizer(sparse=False, separator='_')

In [226]:
X_train_dict = X_train[['NAME_INCOME_TYPE','NAME_CONTRACT_TYPE']].to_dict('records')
X_test_dict = X_test[['NAME_INCOME_TYPE','NAME_CONTRACT_TYPE']].to_dict('records')

In [227]:
#Fit
dv.fit(X_train_dict)

DictVectorizer(dtype=<class 'numpy.float64'>, separator='_', sort=True,
        sparse=False)

In [228]:
#Transform
X_train_encoded = dv.transform(X_train_dict)
X_test_encoded = dv.transform(X_test_dict)

In [229]:
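#Merge encoded columns with the remaining columns of X_train / X_test
# get_feature_names() was removed in scikit-learn 1.2; newer versions use dv.get_feature_names_out()
cols = dv.get_feature_names()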
X_train_encoded = pd.DataFrame(X_train_encoded, index=X_train.index, columns=cols)
X_test_encoded = pd.DataFrame(X_test_encoded, index=X_test.index, columns=cols)

X_train_trans = pd.concat([X_train_encoded, X_train.drop(['NAME_INCOME_TYPE', 'NAME_CONTRACT_TYPE'], axis=1)], axis=1)
X_test_trans = pd.concat([X_test_encoded, X_test.drop(['NAME_INCOME_TYPE', 'NAME_CONTRACT_TYPE'], axis=1)], axis=1)

Define and fit model:

In [230]:
model = LogisticRegression()

In [231]:
model.fit(X_train_trans, y_train)

LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
          intercept_scaling=1, max_iter=100, multi_class='ovr', n_jobs=1,
          penalty='l2', random_state=None, solver='liblinear', tol=0.0001,
          verbose=0, warm_start=False)

Predict on the training set (to check performance) and on the test set:

In [232]:
y_pred_train = model.predict(X_train_trans)

In [233]:
y_pred_test = model.predict(X_test_trans)
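The cells above only compute predictions. `classification_report`, imported at the top, is never actually called. Here is a minimal sketch of how it could compare training and test performance. It runs on synthetic data from `make_classification`, which stands in for the loan dataset, so the snippet is self-contained:

```python
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split

# Synthetic binary classification data (assumption: stands in for the loan data)
X, y = make_classification(n_samples=500, n_features=6, random_state=0)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.3, stratify=y, random_state=42)

model = LogisticRegression()
model.fit(X_tr, y_tr)

# Per-class precision / recall / F1 on train vs test; a large gap hints at overfitting
report_train = classification_report(y_tr, model.predict(X_tr))
report_test = classification_report(y_te, model.predict(X_te))
print("Train:\n", report_train)
print("Test:\n", report_test)
```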

### So what is a Pipeline?

`official scikit-learn documentation:`

Sequentially apply a list of transforms and a final estimator. 

`Stack Overflow:`

Pipeline is a useful tool for encapsulating multiple different transformers alongside an estimator into one object, so that you only have to call your methods on your raw data once.


In other words, think of a pipeline as a series of transformations applied to your data. You feed the pipeline raw data. That data goes into the first transformer, the output of that transformer becomes the input of the second transformer, and so on. The last element of a pipeline can be either a transformer or an estimator.
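To make the data flow concrete, here is a deliberately simplified sketch of what a pipeline does internally. `ToyPipeline`, `CenterTransformer` and `ThresholdClassifier` are hypothetical classes written for illustration only. This is not scikit-learn's implementation:

```python
class ToyPipeline:
    """Hypothetical, simplified pipeline: shows the data flow only."""

    def __init__(self, steps):
        self.steps = steps  # list of (name, object); the last one is the estimator

    def fit(self, X, y=None):
        Xt = X
        for _, step in self.steps[:-1]:
            # Each transformer is fitted on the output of the previous one
            Xt = step.fit(Xt, y).transform(Xt)
        # The final estimator is fitted on the fully transformed data
        self.steps[-1][1].fit(Xt, y)
        return self

    def predict(self, X):
        Xt = X
        for _, step in self.steps[:-1]:
            Xt = step.transform(Xt)  # only transform: nothing is refitted on new data
        return self.steps[-1][1].predict(Xt)


class CenterTransformer:
    """Hypothetical transformer: subtracts the mean learnt in fit."""

    def fit(self, X, y=None):
        self.mean_ = sum(X) / len(X)
        return self

    def transform(self, X):
        return [x - self.mean_ for x in X]


class ThresholdClassifier:
    """Hypothetical estimator: predicts 1 when the value is above 0."""

    def fit(self, X, y=None):
        return self

    def predict(self, X):
        return [int(x > 0) for x in X]


pipe = ToyPipeline([("center", CenterTransformer()), ("clf", ThresholdClassifier())])
pipe.fit([1.0, 2.0, 3.0])          # learns mean_ = 2.0 from the "training" data
print(pipe.predict([1.5, 2.5]))    # [0, 1]
```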



For example

```
Data -> [ Select variables ] -> [ Normalize ] -> [ Reduce dimensions ] -> [ Logistic Regression ] -> Output
```

`[ Select variables ]` - transformer for selecting variables

`[ Normalize ]` - normalization step

`[ Reduce dimensions ]` - dimension reduction

`[ Logistic Regression ]` - estimator
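Here is a sketch of one way to build the diagram above with scikit-learn. The concrete choices are assumptions: `SelectKBest` plays the role of "Select variables", `PCA` is "Reduce dimensions", and the data is synthetic:

```python
from sklearn.datasets import make_classification
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X, y = make_classification(n_samples=200, n_features=8, random_state=0)

pipe = Pipeline([
    ("select", SelectKBest(f_classif, k=4)),  # [ Select variables ]
    ("normalize", StandardScaler()),          # [ Normalize ]
    ("reduce", PCA(n_components=2)),          # [ Reduce dimensions ]
    ("logreg", LogisticRegression()),         # [ Logistic Regression ]
])
pipe.fit(X, y)
y_pred = pipe.predict(X)

# Slicing a pipeline gives the transformer part only (all steps but the estimator)
X_reduced = pipe[:-1].transform(X)
print(X_reduced.shape)  # (200, 2)
```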

**Transformers** are for data preparation. They have two methods, `fit` and `transform`.

`fit` learns parameters from the training data (if any are needed), and `transform` applies the defined transformation to training or test data.

**Estimators** are for modeling. You train your model with `fit` and make predictions with `predict`.

Example:


#### **Transformer - StandardScaler** 


• `fit` – find the mean and standard deviation of each feature

• `transform` – subtract the mean, then divide by the standard deviation


#### **Estimator - LogisticRegression**


• `fit` – find the coefficients of the logistic regression formula

• `predict` – plug new data into the formula and get the predicted class
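This sketch checks those descriptions on small made-up arrays. It assumes binary classification, where `LogisticRegression.predict` returns class 1 exactly when the linear score `X @ coef_.T + intercept_` is positive:

```python
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler

X_train = np.array([[1.0], [2.0], [3.0], [4.0]])
y_train = np.array([0, 0, 1, 1])

# Transformer: fit learns the statistics, transform applies them
scaler = StandardScaler().fit(X_train)
print(scaler.mean_, scaler.scale_)  # mean 2.5, population std sqrt(1.25)
X_scaled = scaler.transform(X_train)

# Estimator: fit learns coefficients, predict plugs data into the formula
model = LogisticRegression().fit(X_scaled, y_train)
scores = X_scaled @ model.coef_.T + model.intercept_
manual_pred = (scores.ravel() > 0).astype(int)
print(manual_pred, model.predict(X_scaled))
```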

Now, for simplicity, let's consider only one variable and build our first **pipeline**:

In [234]:
X_train_var1 = X_train.loc[:, ['AMT_INCOME_TOTAL']]
X_test_var1 = X_test.loc[:, ['AMT_INCOME_TOTAL']]

We will use two transformers from the sklearn library, `Imputer` and `StandardScaler`. The former fills in missing values. The latter standardizes features by removing the mean and scaling to unit variance.

In [235]:
#We put steps together in a Pipeline
pipe = Pipeline([
    ('imputer', Imputer()),
    ('standardizer', StandardScaler())
])

In [236]:
#And just write this:
X_train_final = pipe.fit_transform(X_train_var1)
X_test_final = pipe.transform(X_test_var1)

As simple as that! We apply a series of transformations and get the final output.
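As a sanity check, the pipeline should produce exactly what you get by calling each step by hand. This sketch swaps in `SimpleImputer`, the replacement for `Imputer` in scikit-learn 0.22 and later, and runs on a tiny made-up array:

```python
import numpy as np
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X_tr = np.array([[1.0], [np.nan], [3.0], [5.0]])
X_te = np.array([[np.nan], [2.0]])

pipe = Pipeline([
    ("imputer", SimpleImputer(strategy="mean")),
    ("standardizer", StandardScaler()),
])
X_tr_pipe = pipe.fit_transform(X_tr)
X_te_pipe = pipe.transform(X_te)

# The same two steps done manually, each fitted on training data only
imp = SimpleImputer(strategy="mean").fit(X_tr)
sc = StandardScaler().fit(imp.transform(X_tr))
X_te_manual = sc.transform(imp.transform(X_te))

print(np.allclose(X_te_pipe, X_te_manual))  # True
```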

#### Let's directly include estimator in pipeline:

In [237]:
pipe = Pipeline([
    ('imputer', Imputer()),
    ('standardizer', StandardScaler()),
    ('logReg', LogisticRegression())
])

In [238]:
pipe.fit(X_train_var1, y_train)
y_pred = pipe.predict(X_test_var1)

In [239]:
y_pred

array([0, 0, 0, ..., 0, 0, 0], dtype=int64)

We applied exactly the same transformations and made the prediction in one go.

**Notice!** Pipelines help us prevent **data leakage**. When we call `predict` on test data, the pipeline reuses the parameters it learnt on the training data and applies only the `transform` methods of its transformers to the test data.
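Here is a sketch of the leakage point on random data (an assumption made for illustration). After fitting, the scaler inside the pipeline holds statistics computed from the training rows only. In addition, `cross_val_score` refits the whole pipeline inside every fold:

```python
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

rng = np.random.RandomState(0)
X = rng.normal(size=(100, 3))
y = (X[:, 0] + rng.normal(scale=0.5, size=100) > 0).astype(int)
X_train, y_train = X[:70], y[:70]

pipe = make_pipeline(StandardScaler(), LogisticRegression())
pipe.fit(X_train, y_train)

# The scaler's mean comes from the training rows, not from all 100 rows
scaler = pipe.named_steps["standardscaler"]
print(np.allclose(scaler.mean_, X_train.mean(axis=0)))  # True

# Each fold refits scaler + model on its own training part,
# so the held-out fold never influences the scaling parameters
scores = cross_val_score(pipe, X, y, cv=5)
print(scores.mean())
```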

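You can also use the **make_pipeline** shortcut instead of `Pipeline`. It works exactly like the code above, except that step names are generated automatically from the lowercased class names (for example `standardscaler`).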

In [240]:
pipe = make_pipeline(
    Imputer(),
    StandardScaler(),
    LogisticRegression()
)
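Here is a short sketch of that naming rule (with `SimpleImputer` swapped in for the removed `Imputer`). The generated names are the prefixes you use when setting parameters:

```python
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

pipe = make_pipeline(SimpleImputer(), StandardScaler(), LogisticRegression())

# Step names are the lowercased class names
names = [name for name, _ in pipe.steps]
print(names)  # ['simpleimputer', 'standardscaler', 'logisticregression']

# ...and they are the prefixes in <step>__<parameter>
pipe.set_params(simpleimputer__strategy="median", logisticregression__C=0.5)
print(pipe.named_steps["simpleimputer"].strategy)  # median
```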

#### Set parameters in Pipelines

In [241]:
pipe.get_params()

{'memory': None,
 'steps': [('imputer',
   Imputer(axis=0, copy=True, missing_values='NaN', strategy='mean', verbose=0)),
  ('standardscaler', StandardScaler(copy=True, with_mean=True, with_std=True)),
  ('logisticregression',
   LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
             intercept_scaling=1, max_iter=100, multi_class='ovr', n_jobs=1,
             penalty='l2', random_state=None, solver='liblinear', tol=0.0001,
             verbose=0, warm_start=False))],
 'imputer': Imputer(axis=0, copy=True, missing_values='NaN', strategy='mean', verbose=0),
 'standardscaler': StandardScaler(copy=True, with_mean=True, with_std=True),
 'logisticregression': LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
           intercept_scaling=1, max_iter=100, multi_class='ovr', n_jobs=1,
           penalty='l2', random_state=None, solver='liblinear', tol=0.0001,
           verbose=0, warm_start=False),
 'imputer__axis': 0,
 'imputer__co

In [242]:
params = {'imputer__strategy':'most_frequent', 'standardscaler__with_mean':False}

pipe.set_params(**params)

Pipeline(memory=None,
     steps=[('imputer', Imputer(axis=0, copy=True, missing_values='NaN', strategy='most_frequent',
    verbose=0)), ('standardscaler', StandardScaler(copy=True, with_mean=False, with_std=True)), ('logisticregression', LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
          intercept_scaling=1, max_iter=100, multi_class='ovr', n_jobs=1,
          penalty='l2', random_state=None, solver='liblinear', tol=0.0001,
          verbose=0, warm_start=False))])

**Note:** You can also use a pipeline with **GridSearchCV** and tune the parameters of any step using the same `step__parameter` names.
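Here is a minimal sketch of a grid search over a pipeline, assuming synthetic data and an arbitrary example grid. The keys follow the same `<step>__<parameter>` convention as `set_params`, and each candidate refits the whole pipeline on every cross-validation fold:

```python
from sklearn.datasets import make_classification
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

X, y = make_classification(n_samples=200, n_features=5, random_state=0)

pipe = make_pipeline(SimpleImputer(), StandardScaler(), LogisticRegression())

param_grid = {
    "simpleimputer__strategy": ["mean", "median"],
    "logisticregression__C": [0.1, 1.0, 10.0],
}
search = GridSearchCV(pipe, param_grid, cv=3)
search.fit(X, y)
print(search.best_params_)
```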

___

### Transformers in Parallel

In a real project we often need to apply different, independent transformations to the same original variable. So far, our pipelines could be thought of as a single stream. Now we will use `FeatureUnion`, which applies several transformers in parallel to the same input and concatenates their outputs column-wise. Just as `make_pipeline` is a shortcut for `Pipeline`, `make_union` is a shortcut for `FeatureUnion`.

`sklearn.pipeline.make_union`

Creates a union of transformers:

```
         transformer 1
       /               \
      /                 \
input                     output
      \                 /
       \               /
         transformer 2
```

In [243]:
X_train_var1.head()

|   | AMT_INCOME_TOTAL |
|---|---|
| 3334 | 0.62029 |
| 8434 | 1.083136 |
| 22997 | 1.006305 |
| 14401 | -1.015134 |
| 27944 | 0.145626 |


In [244]:
pipe_paral = make_pipeline(
    Imputer(),    
    make_union(
        FunctionTransformer(np.sin),
        FunctionTransformer(lambda x: x+10),
        FunctionTransformer(lambda x: x+20)
    ),
    FunctionTransformer(lambda x: x+1)
)

Scheme:

```
                     transformer: np.sin
                   /                     \
                  /                       \
input -- Imputer  -- transformer: x+10   --  -- transformer: x+1 -- output
                  \                       /
                   \                     /
                     transformer: x+20
```

In [245]:
X_train = pipe_paral.fit_transform(X_train_var1)
X_test = pipe_paral.transform(X_test_var1)

In [246]:
X_train

array([[ 1.58127143, 11.62029032, 21.62029032],
       [ 1.88343168, 12.08313627, 22.08313627],
       [ 1.84486084, 12.006305  , 22.006305  ],
       ...,
       [ 0.15044899,  9.98486645, 19.98486645],
       [ 0.54627965, 10.52906428, 20.52906428],
       [ 1.77419559, 11.88544321, 21.88544321]])

As shown in the scheme above, we apply three parallel transformations to the imputed data and get three columns, each a differently transformed copy of the original variable. The final step then adds 1 to every value.
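Here is a quick sketch that checks this behaviour on a made-up array. Each branch of the union receives the same input, and `fit_transform` returns the branch outputs stacked side by side, just as `np.hstack` would:

```python
import numpy as np
from sklearn.pipeline import make_union
from sklearn.preprocessing import FunctionTransformer

X = np.array([[0.0], [1.0], [2.0]])

union = make_union(
    FunctionTransformer(np.sin),
    FunctionTransformer(np.square),
)
X_union = union.fit_transform(X)

# Same result as applying each branch separately and concatenating column-wise
expected = np.hstack([np.sin(X), np.square(X)])
print(X_union.shape)                   # (3, 2)
print(np.allclose(X_union, expected))  # True
```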

## Let's go deeper

In real-life projects, the situation is almost never as simple as in the examples presented so far. Datasets are usually a mix of:
- categorical features
- numerical features
- dates
- text data

and any of them may or may not contain missing values.

Possible transformations for **categorical features** include _one-hot encoding_ (converting each category into its own binary column), _conversion to numerical values_ by hashing the categorical variable, and _target averaging_ (replacing each category with the average of the target for that category).
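Here is a sketch of one-hot encoding and target averaging with pandas, using four made-up rows and a column name borrowed from our dataset. In a real project the target means should be learnt on training data only. Otherwise the target leaks into the features:

```python
import pandas as pd

# Made-up rows for illustration
df = pd.DataFrame({
    "NAME_CONTRACT_TYPE": ["Cash loans", "Revolving loans", "Cash loans", "Cash loans"],
    "TARGET": [1, 0, 0, 1],
})

# One-hot encoding: one binary column per category
one_hot = pd.get_dummies(df["NAME_CONTRACT_TYPE"], prefix="CONTRACT")
print(one_hot.columns.tolist())  # ['CONTRACT_Cash loans', 'CONTRACT_Revolving loans']

# Target averaging: replace each category with the mean target of that category
target_means = df.groupby("NAME_CONTRACT_TYPE")["TARGET"].mean()
df["CONTRACT_TARGET_MEAN"] = df["NAME_CONTRACT_TYPE"].map(target_means)
print(df["CONTRACT_TARGET_MEAN"].tolist())
```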
    
For **numerical features** this could be _filling missing values, binning into ranges, normalizing, scaling_.
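Here are the same ideas for a numerical column, sketched in pandas on made-up values. The fill strategy (median), bin edges and labels are arbitrary choices for illustration:

```python
import numpy as np
import pandas as pd

income = pd.Series([72000.0, np.nan, 157500.0, 198000.0])

# Fill missing values (here with the median of the observed values)
filled = income.fillna(income.median())

# Bin into ranges: (0, 100000] -> "low", (100000, 200000] -> "high"
bins = pd.cut(filled, bins=[0, 100000, 200000], labels=["low", "high"])

# Standardize: zero mean, unit variance (population std, as StandardScaler uses)
scaled = (filled - filled.mean()) / filled.std(ddof=0)
print(filled.tolist(), bins.tolist())
```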

For **text** some common transformations are _bag of words vectorization, word2vec, sentence2vec_. 

From **dates** we often extract years, months, days, days of week.
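With the pandas `.dt` accessor, each of these date features takes one line. The dates below are arbitrary examples:

```python
import pandas as pd

dates = pd.to_datetime(pd.Series(["2018-01-15", "2018-06-30"]))

features = pd.DataFrame({
    "year": dates.dt.year,
    "month": dates.dt.month,
    "day": dates.dt.day,
    "day_of_week": dates.dt.dayofweek,  # Monday=0 ... Sunday=6
})
print(features)
```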

Our goal is to preprocess all these types of data with **one pipeline**. Depending on the problem we solve, there will be specific cases that the built-in transformers do not cover, so we will need to build our own custom transformers.

As a reminder, our original example data looks like this:

In [247]:
data.head()

|   | NAME_INCOME_TYPE | NAME_CONTRACT_TYPE | CODE_GENDER | AMT_INCOME_TOTAL | AMT_CREDIT | AMT_REQ_CREDIT_BUREAU_MON | TARGET |
|---|---|---|---|---|---|---|---|
| 0 | Working | Cash loans | F | 72000.0 | 180000.0 | 0.0 | 0 |
| 1 | Commercial associate | Cash loans | M | 198000.0 | 1159515.0 | 0.0 | 1 |
| 2 | Working | Cash loans | F | 157500.0 | 668304.0 | 0.0 | 0 |
| 3 | Commercial associate | Cash loans | F | 171000.0 | 251280.0 | 0.0 | 1 |
| 4 | Pensioner | Cash loans | M | 135000.0 | 227520.0 | NaN | 0 |


We split the data once again, because `X_train` and `X_test` were overwritten in the cells above:

In [248]:
X_train, X_test, y_train, y_test = train_test_split(data.drop('TARGET', axis=1), data['TARGET'], test_size=0.30, stratify = data['TARGET'], random_state=42)

### Custom Transformer

Here are some simple custom transformers:

In [249]:
class DoNothingTransformer(BaseEstimator, TransformerMixin):
    """
    Template Transformer class, which does nothing.
    """
    def __init__(self):
        pass

    def fit(self, x, y = None):
        return self

    def transform(self, x):
        return x

In [250]:
class AdderTransformer(BaseEstimator, TransformerMixin):
    """
    Transformer that adds a constant value.
    """
    def __init__(self, add=0):
        self.add = add
        
    def fit(self, x, y = None):
        return self
    
    def transform(self, x):
        return x + self.add
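Inheriting from `BaseEstimator` and `TransformerMixin` is what lets these small classes plug into scikit-learn. `BaseEstimator` provides `get_params`/`set_params`, which it reads from the `__init__` arguments, and `TransformerMixin` provides `fit_transform`. Here is a quick sketch with a copy of `AdderTransformer`:

```python
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin, clone


class AdderTransformer(BaseEstimator, TransformerMixin):
    """Transformer that adds a constant value."""

    def __init__(self, add=0):
        self.add = add  # store the argument unchanged so get_params can find it

    def fit(self, x, y=None):
        return self

    def transform(self, x):
        return x + self.add


t = AdderTransformer(add=5)
print(t.get_params())                     # {'add': 5}, from BaseEstimator
out = t.fit_transform(np.array([1, 2]))   # fit_transform comes from TransformerMixin
print(out)                                # [6 7]

# clone + set_params are what tools like GridSearchCV rely on
t2 = clone(t).set_params(add=10)
print(t2.transform(np.array([1, 2])))     # [11 12]
```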

In [251]:
class Selector(BaseEstimator, TransformerMixin):
    """
    Transformer that selects the given column(s).
    """
    def __init__(self, column):
        self.column = column
        
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        return X[self.column]
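The type of `column` matters for `Selector`. A list keeps a 2-D DataFrame, which is what transformers such as `StandardScaler` expect, while a single string returns a 1-D Series. Here is a sketch on made-up values:

```python
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import StandardScaler


class Selector(BaseEstimator, TransformerMixin):
    """Transformer that selects the given column(s)."""

    def __init__(self, column):
        self.column = column

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return X[self.column]


df = pd.DataFrame({"AMT_INCOME_TOTAL": [72000.0, 198000.0],
                   "AMT_CREDIT": [180000.0, 1159515.0]})

as_frame = Selector(["AMT_INCOME_TOTAL"]).fit_transform(df)  # list -> 2-D DataFrame
as_series = Selector("AMT_INCOME_TOTAL").fit_transform(df)   # string -> 1-D Series
print(type(as_frame).__name__, as_frame.shape)    # DataFrame (2, 1)
print(type(as_series).__name__, as_series.shape)  # Series (2,)

# A 2-D selection can go straight into a scaler
scaled = StandardScaler().fit_transform(as_frame)
```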

In [252]:
#Check if our custom transformers work:

In [253]:
pipe = make_pipeline(
    Selector(['AMT_INCOME_TOTAL']),
    AdderTransformer(20)
)

In [254]:
pipe.fit_transform(X_train.head())

|   | AMT_INCOME_TOTAL |
|---|---|
| 3334 | 198020.0 |
| 8434 | 247520.0 |
| 22997 | 238520.0 |
| 14401 | 90020.0 |
| 27944 | 157520.0 |


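As a **final example**, we will build a pipeline to preprocess our example dataset. It uses transformations similar to those in the introduction of this notebook, so you can easily compare the two approaches. There is one difference. Because `FeatureUnion` branches run in parallel, the log of `AMT_INCOME_TOTAL` and the standardized raw `AMT_INCOME_TOTAL` end up as two separate features, whereas the introduction standardized the log-transformed value.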

As a reminder:

In [255]:
data.head(3)

|   | NAME_INCOME_TYPE | NAME_CONTRACT_TYPE | CODE_GENDER | AMT_INCOME_TOTAL | AMT_CREDIT | AMT_REQ_CREDIT_BUREAU_MON | TARGET |
|---|---|---|---|---|---|---|---|
| 0 | Working | Cash loans | F | 72000.0 | 180000.0 | 0.0 | 0 |
| 1 | Commercial associate | Cash loans | M | 198000.0 | 1159515.0 | 0.0 | 1 |
| 2 | Working | Cash loans | F | 157500.0 | 668304.0 | 0.0 | 0 |


In [256]:
class CleanData(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self

    def transform(self, X):
        Xt = X.copy()
        Xt["AMT_REQ_CREDIT_BUREAU_MON"] = Xt["AMT_REQ_CREDIT_BUREAU_MON"].fillna(0)
        
        return Xt

In [257]:
class ConcatText(BaseEstimator, TransformerMixin):

    def __init__(self, columns, concat_name):
        self.columns = columns
        self.concat_name = concat_name
    
    def fit(self, X, y=None):
        return self

    def transform(self, X):
        # Work on a copy so the caller's DataFrame is not modified in place
        Xt = X.copy()
        Xt[self.concat_name] = ''
        for col in self.columns:
            Xt[self.concat_name] += ' ' + Xt[col]
        return Xt

In [258]:
def to_records(df):
    return df.to_dict(orient='records')

In [259]:
pipeline = make_pipeline(
    CleanData(),
    ConcatText(['NAME_INCOME_TYPE', 'NAME_CONTRACT_TYPE'], 'concat_text'),
    FeatureUnion([
        ('GenderEncode', make_pipeline(Selector(['CODE_GENDER']), FunctionTransformer(to_records, validate=False), DictVectorizer(sparse=False))), 
        ('logTrans', make_pipeline(Selector(['AMT_INCOME_TOTAL']), FunctionTransformer(np.log))),
        ('Scaler', make_pipeline(Selector(['AMT_INCOME_TOTAL','AMT_CREDIT']), StandardScaler())),
        ('TextEncode', make_pipeline(Selector(['concat_text']), FunctionTransformer(to_records, validate=False), DictVectorizer(sparse=False))), 
    ])
)

In [260]:
X_train_final = pipeline.fit_transform(X_train)
X_test_final = pipeline.transform(X_test)

In [261]:
X_train_final.shape

(21000, 15)

#### Including an estimator:

In [262]:
pipeline = make_pipeline(
    CleanData(),
    ConcatText(['NAME_INCOME_TYPE', 'NAME_CONTRACT_TYPE'], 'concat_text'),
    FeatureUnion([
        ('GenderEncode', make_pipeline(Selector(['CODE_GENDER']), FunctionTransformer(to_records, validate=False), DictVectorizer(sparse=False))), 
        ('logTrans', make_pipeline(Selector(['AMT_INCOME_TOTAL']), FunctionTransformer(np.log))),
        ('Scaler', make_pipeline(Selector(['AMT_INCOME_TOTAL','AMT_CREDIT']), StandardScaler())),
        ('TextEncode', make_pipeline(Selector(['concat_text']), FunctionTransformer(to_records, validate=False), DictVectorizer(sparse=False))), 
    ]),
    LogisticRegression()
)

In [263]:
pipeline.fit(X_train, y_train)

Pipeline(memory=None,
     steps=[('cleandata', CleanData()), ('concattext', ConcatText(columns=['NAME_INCOME_TYPE', 'NAME_CONTRACT_TYPE'],
      concat_name='concat_text')), ('featureunion', FeatureUnion(n_jobs=1,
       transformer_list=[('GenderEncode', Pipeline(memory=None,
     steps=[('selector', Selector(column=['CODE_...ty='l2', random_state=None, solver='liblinear', tol=0.0001,
          verbose=0, warm_start=False))])

In [264]:
y_pred = pipeline.predict(X_test)

In [265]:
y_pred

array([0, 0, 0, ..., 0, 0, 0], dtype=int64)
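Since version 0.20, scikit-learn also ships `ColumnTransformer`, which combines the roles of `Selector` and `FeatureUnion`. What follows is a sketch of a comparable pipeline on a few made-up rows that use our column names. It is not an exact replica of the pipeline above. It one-hot encodes the two text columns separately rather than their concatenation, and it standardizes the log of income as the introduction did:

```python
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import FunctionTransformer, OneHotEncoder, StandardScaler

# Made-up rows with the notebook's column names
X = pd.DataFrame({
    "NAME_INCOME_TYPE": ["Working", "Commercial associate", "Working", "Pensioner"],
    "NAME_CONTRACT_TYPE": ["Cash loans", "Cash loans", "Revolving loans", "Cash loans"],
    "CODE_GENDER": ["F", "M", "F", "M"],
    "AMT_INCOME_TOTAL": [72000.0, 198000.0, 157500.0, 135000.0],
    "AMT_CREDIT": [180000.0, 1159515.0, 668304.0, 227520.0],
    "AMT_REQ_CREDIT_BUREAU_MON": [0.0, 0.0, np.nan, 1.0],
})
y = np.array([0, 1, 0, 1])

preprocess = ColumnTransformer([
    # Each entry: (name, transformer, columns it receives)
    ("categorical", OneHotEncoder(handle_unknown="ignore"),
     ["CODE_GENDER", "NAME_INCOME_TYPE", "NAME_CONTRACT_TYPE"]),
    ("log_income", make_pipeline(FunctionTransformer(np.log), StandardScaler()),
     ["AMT_INCOME_TOTAL"]),
    ("credit", StandardScaler(), ["AMT_CREDIT"]),
    ("bureau", SimpleImputer(strategy="constant", fill_value=0),
     ["AMT_REQ_CREDIT_BUREAU_MON"]),
])
model = make_pipeline(preprocess, LogisticRegression())
model.fit(X, y)

X_prepared = model[:-1].transform(X)  # 2 + 3 + 2 one-hot columns + 3 numeric columns
print(X_prepared.shape)               # (4, 10)
print(model.predict(X))
```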

## Summary

Advantages:
- **Pipelines make your project more structured and manageable.** This pays off especially over time, as the project grows and when it comes to hyperparameter tuning.
- **Your code is reusable.**

Disadvantages:
- **The initial investment is higher.**
- **Transformers were originally designed to output NumPy arrays.** Full integration with pandas DataFrames requires some custom modifications.

### Further resources:

Official documentation: http://scikit-learn.org/stable/modules/generated/sklearn.pipeline.Pipeline.html

Blog post: http://zacstewart.com/2014/08/05/pipelines-of-featureunions-of-pipelines.html

Lecture about pipelines: https://www.youtube.com/watch?v=BFaadIqWlAg