# Pipelines and Model Persistence - W07D1
### Instructor: Eithar Elbasheer
##### Notebook credit: Eric Elmoznino

## Overview - Pipelines
- Motivation and example
- Feature unions
- Column transformers
- Visualizing pipelines
- Hyperparameter tuning with pipelines
- Custom class in a pipeline
- Function transformers

---
## Motivation and example
Consider the following example of a diabetes vs. non-diabetes classification task in Sklearn.

```python
import pandas as pd

url = "https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv"
names = ['preg', 'plas', 'pres', 'skin', 'test', 'mass', 'pedi', 'age', 'class']
df = pd.read_csv(url, names=names)

df.head()
```

|   | preg | plas | pres | skin | test | mass | pedi | age | class |
|---|------|------|------|------|------|------|------|-----|-------|
| 0 | 6 | 148 | 72 | 35 | 0 | 33.6 | 0.627 | 50 | 1 |
| 1 | 1 | 85 | 66 | 29 | 0 | 26.6 | 0.351 | 31 | 0 |
| 2 | 8 | 183 | 64 | 0 | 0 | 23.3 | 0.672 | 32 | 1 |
| 3 | 1 | 89 | 66 | 23 | 94 | 28.1 | 0.167 | 21 | 0 |
| 4 | 0 | 137 | 40 | 35 | 168 | 43.1 | 2.288 | 33 | 1 |


```python
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

X = df.drop(columns='class')
y = df['class']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=27, stratify=y)

scaler = StandardScaler()
scaler.fit(X_train)
X_train_scaled = scaler.transform(X_train)

pca = PCA(n_components=3)
pca.fit(X_train_scaled)
X_train_pca = pca.transform(X_train_scaled)

model = LogisticRegression()
model.fit(X_train_pca, y_train)

X_test_scaled = scaler.transform(X_test)
X_test_pca = pca.transform(X_test_scaled)

y_pred = model.predict(X_test_pca)
acc = accuracy_score(y_test, y_pred)
print(f'Test set accuracy: {acc}')
```

```
Test set accuracy: 0.6948051948051948
```


There are several inconvenient things about this:
1. We have a lot of repetitive code. We keep calling `.fit()` and `.transform()` on different objects, and
we keep inventing new names for the transformed variables to avoid confusion later in our notebook.
2. Our preprocessing and modeling code is scattered and therefore error-prone. If we try running our model
somewhere else and forget to copy over a step (e.g. we don't apply the `StandardScaler` to the test set),
then our model will not work as expected.
3. Convenient Sklearn tools such as `GridSearchCV` can only tune the hyperparameters of a single *model class* (e.g. `LogisticRegression`). What if we also want to try different numbers of PCA components, or different scaling methods?

#### The solution: Sklearn Pipelines

```python
from sklearn.pipeline import Pipeline

pipeline = Pipeline(steps=[('scaling', StandardScaler()),
                           ('pca', PCA(n_components=3)),
                           ('classifier', LogisticRegression())])
pipeline.fit(X_train, y_train)

y_pred = pipeline.predict(X_test)
acc = accuracy_score(y_test, y_pred)
print(f'Test set accuracy: {acc}')
```

```
Test set accuracy: 0.6948051948051948
```


Notice how much cleaner this code is. The composite model created using `Pipeline`
can be used just like any other Sklearn model you have learned, which means that it
can also be passed to functions like `cross_val_score()`.
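Here is a minimal sketch of that last point: the snippet below hands a pipeline straight to `cross_val_score()`. It assumes Sklearn's bundled breast cancer dataset (`load_breast_cancer`) as a stand-in for the diabetes data, only so that it runs without a download. The scaler and PCA sit inside the pipeline, so they are re-fit on the training folds of every split, and the held-out fold never influences the preprocessing.

```python
from sklearn.datasets import load_breast_cancer
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Bundled stand-in dataset, used only so this sketch runs offline
X_demo, y_demo = load_breast_cancer(return_X_y=True)

demo_pipeline = Pipeline(steps=[('scaling', StandardScaler()),
                                ('pca', PCA(n_components=3)),
                                ('classifier', LogisticRegression())])

# For each of the 5 folds, a fresh copy of the whole pipeline is fit on the
# training part and scored (accuracy, by default) on the held-out part
scores = cross_val_score(demo_pipeline, X_demo, y_demo, cv=5)
print(f'Fold accuracies: {scores}')
print(f'Mean accuracy: {scores.mean():.3f}')
```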

To get a better understanding of what is happening under the hood,
let's try to build our own pipeline-like class that has some of the
same core functionality as the Sklearn one.

```python
# optional
class BasicPipeline:

    def __init__(self, steps):
        self.steps = steps

    def fit(self, X, y):
        print('Called .fit()')
        # Fit all preprocessing modules and sequentially transform input using them
        for name, estimator in self.steps[:-1]:
            print(f'Fitting {name}')
            estimator.fit(X)
            print(f'Transforming with {name}')
            X = estimator.transform(X)

        # Fit the final (prediction) module
        name, estimator = self.steps[-1]
        print(f'Fitting {name}\n')
        estimator.fit(X, y)

        # Return fitted self so that we can write things like "model = model.fit(X, y)",
        # in addition to just "model.fit(X, y)" on its own line
        return self

    def predict(self, X):
        print('Called .predict()')
        # Sequentially transform input using all the preprocessing modules
        for name, estimator in self.steps[:-1]:
            print(f'Transforming with {name}')
            X = estimator.transform(X)

        # Predict using the final module
        name, estimator = self.steps[-1]
        print(f'Predicting with {name}\n')
        y_pred = estimator.predict(X)

        return y_pred
```

```python
pipeline = BasicPipeline(steps=[('scaling', StandardScaler()),
                                ('pca', PCA(n_components=3)),
                                ('classifier', LogisticRegression())])
pipeline.fit(X_train, y_train)

y_pred = pipeline.predict(X_test)
acc = accuracy_score(y_test, y_pred)
print(f'Test set accuracy: {acc}')
```

```
Called .fit()
Fitting scaling
Transforming with scaling
Fitting pca
Transforming with pca
Fitting classifier

Called .predict()
Transforming with scaling
Transforming with pca
Predicting with classifier

Test set accuracy: 0.6948051948051948
```
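As a sanity check that Sklearn's `Pipeline` chains its steps the same way `BasicPipeline` does, the sketch below compares a pipeline's output with applying each step by hand. It assumes random stand-in data. The pipeline contains only transformers, which makes the pipeline itself a transformer. Fitted steps stay accessible through `named_steps`.

```python
import numpy as np
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

rng = np.random.default_rng(0)
X_demo = rng.normal(size=(100, 8))  # random stand-in data

# A pipeline made only of transformers is itself a transformer
transform_pipeline = Pipeline(steps=[('scaling', StandardScaler()),
                                     ('pca', PCA(n_components=3))])
transform_pipeline.fit(X_demo)
X_pipe = transform_pipeline.transform(X_demo)

# The same steps applied by hand, mirroring BasicPipeline.fit()/predict()
manual_scaler = StandardScaler().fit(X_demo)
X_scaled = manual_scaler.transform(X_demo)
manual_pca = PCA(n_components=3).fit(X_scaled)
X_manual = manual_pca.transform(X_scaled)

print(X_pipe.shape)                   # (100, 3)
print(np.allclose(X_pipe, X_manual))  # True

# Fitted steps can be retrieved by name
print(transform_pipeline.named_steps['pca'].explained_variance_ratio_)
```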


---
## Feature unions
`Pipeline` lets us specify a sequence of steps that are executed one after the other (i.e. in series),
but what if we want branches in our process? For instance, what if we want to create two different sets
of features and use both of them when fitting our model?

For this type of application, we can use a `FeatureUnion`. It is an Sklearn class that runs several steps on the same input and joins their
outputs through *concatenation* (i.e. in parallel). `FeatureUnion`s and `Pipeline`s can be nested inside each other as deeply as we want.

![](images/series_and_parallel.png)
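A small sketch of what a `FeatureUnion` produces, assuming random stand-in data with 8 columns. Every branch receives the same input, and the union stacks the branch outputs side by side, so `PCA(n_components=3)` plus `SelectKBest(k=6)` gives 9 columns. The last line checks this against concatenating the two outputs manually with `np.hstack`.

```python
import numpy as np
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest
from sklearn.pipeline import FeatureUnion

rng = np.random.default_rng(0)
X_demo = rng.normal(size=(50, 8))     # random stand-in features
y_demo = rng.integers(0, 2, size=50)  # random binary labels

union = FeatureUnion([('pca', PCA(n_components=3)),
                      ('select_best', SelectKBest(k=6))])
X_union = union.fit_transform(X_demo, y_demo)

# Both branches see all 8 input columns; their outputs are concatenated
print(X_union.shape)  # (50, 9): 3 PCA components + 6 selected columns

# Same result as running each branch separately and stacking the outputs
pca_part = PCA(n_components=3).fit_transform(X_demo)
kbest_part = SelectKBest(k=6).fit_transform(X_demo, y_demo)
print(np.allclose(X_union, np.hstack([pca_part, kbest_part])))  # True
```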

```python
from sklearn.pipeline import FeatureUnion
from sklearn.feature_selection import SelectKBest

feature_union = FeatureUnion([('pca', PCA(n_components=3)),
                              ('select_best', SelectKBest(k=6))])

pipeline = Pipeline(steps=[('scaling', StandardScaler()),
                           ('features', feature_union),
                           ('classifier', LogisticRegression())])
pipeline.fit(X_train, y_train)

y_pred = pipeline.predict(X_test)
acc = accuracy_score(y_test, y_pred)
print(f'Test set accuracy: {acc}')
```

```
Test set accuracy: 0.7337662337662337
```


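```python
X_train.shape
```

```
(614, 8)
```

The training set has 8 input features. Each branch of the union above sees all 8, and their outputs are concatenated, so the classifier receives 3 (PCA) + 6 (`SelectKBest`) = 9 features.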

---
## Column transformers
Often, our preprocessing and feature engineering are specific to individual columns of a
`DataFrame`. For instance, for a dataset with both numerical and categorical features,
we may want to do something like the following:
1. For *numeric* columns:
    1. Impute missing values with the *mean*
    2. Standard scale the values
2. For *categorical* columns:
    1. Impute missing values with the *mode*
    2. One-hot-encode the categories
3. Fit a model to the resulting features

Luckily, there is a straightforward way to perform steps 1 and 2
inside a larger pipeline using the `ColumnTransformer`. Initializing a `ColumnTransformer`
uses very similar syntax to a `FeatureUnion` or `Pipeline`, except that
we must also specify the *column names* that each transform applies to. As in a `FeatureUnion`,
the outputs of the individual transforms are concatenated.

```python
from seaborn import load_dataset

tips_df = load_dataset('tips')
X_tips, y_tips = tips_df.drop(columns='tip'), tips_df['tip'].values
X_train_tips, X_test_tips, y_train_tips, y_test_tips = train_test_split(X_tips, y_tips, random_state=27)

tips_df.head()
```

|   | total_bill | tip | sex | smoker | day | time | size |
|---|------------|-----|-----|--------|-----|------|------|
| 0 | 16.99 | 1.01 | Female | No | Sun | Dinner | 2 |
| 1 | 10.34 | 1.66 | Male | No | Sun | Dinner | 3 |
| 2 | 21.01 | 3.50 | Male | No | Sun | Dinner | 3 |
| 3 | 23.68 | 3.31 | Male | No | Sun | Dinner | 2 |
| 4 | 24.59 | 3.61 | Female | No | Sun | Dinner | 4 |


```python
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LinearRegression

numeric_transform = Pipeline([('impute_mean', SimpleImputer(strategy='mean')),
                              ('scaling', StandardScaler())])

# `sparse_output` requires scikit-learn >= 1.2; older versions call this argument `sparse`
categorical_transform = Pipeline([('impute_mode', SimpleImputer(strategy='most_frequent')),
                                  ('one-hot-encode', OneHotEncoder(sparse_output=False))])

preprocessing_tips = ColumnTransformer([('numeric', numeric_transform, ['total_bill', 'size']),
                                        ('categorical', categorical_transform, ['sex', 'smoker', 'day', 'time'])])

pipeline_tips = Pipeline([('preprocessing', preprocessing_tips),
                          ('model', LinearRegression())])

pipeline_tips.fit(X_train_tips, y_train_tips)

r2 = pipeline_tips.score(X_test_tips, y_test_tips)
print(f'Test set r^2: {r2}')
```

```
Test set r^2: 0.3492266984179321
```


What is the `ColumnTransformer` actually doing? We can get a better sense by looking at our data before and after
the transform it applies.

```python
# Initial data
X_tips.head()
```

|   | total_bill | sex | smoker | day | time | size |
|---|------------|-----|--------|-----|------|------|
| 0 | 16.99 | Female | No | Sun | Dinner | 2 |
| 1 | 10.34 | Male | No | Sun | Dinner | 3 |
| 2 | 21.01 | Male | No | Sun | Dinner | 3 |
| 3 | 23.68 | Male | No | Sun | Dinner | 2 |
| 4 | 24.59 | Female | No | Sun | Dinner | 4 |

```python
X_tips.day.unique()
```

```
['Sun', 'Sat', 'Thur', 'Fri']
Categories (4, object): ['Thur', 'Fri', 'Sat', 'Sun']
```

```python
X_tips.shape
```

```
(244, 6)
```

```python
# Preprocessed data
X_tips_preprocessed = preprocessing_tips.fit_transform(X_tips)
X_tips_preprocessed.shape
```

```
(244, 12)
```

---
## Visualizing pipelines
Another advantage of having these pipelines is that we can quickly visualize complex workflows used in our
modeling as HTML, which can be helpful for debugging purposes or presentations.

<sub>*Note: I highly recommend you use this in your own presentations as a substitute for (or in addition to) code.*</sub>

```python
# Display HTML representation in a jupyter context
from sklearn import set_config
set_config(display='diagram')
pipeline
```

Note that you can also click on the individual parts in the diagram (e.g. PCA) to see their arguments.

```python
# Or, save the HTML to a file (the images/ folder must already exist)
from sklearn.utils import estimator_html_repr

with open('images/model_pipeline.html', 'w') as f:
    f.write(estimator_html_repr(pipeline))
```

---
## Hyperparameter tuning with pipelines
Normally, if we want to tune hyperparameters using something like `GridSearchCV`, we need to pass it:
1. A model object.
2. A dictionary of (parameter name, list of values to try) pairs.

When not using pipelines, we can only tune hyperparameters for a single model (the one we pass as the
estimator to `GridSearchCV`). As we've seen, however, we can create composite models using `Pipeline`. We can
then pass this composite model to `GridSearchCV` and tune hyperparameters for multiple components at once.
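Inside a pipeline, hyperparameters are addressed with Sklearn's `<step>__<parameter>` naming convention, with one double underscore per level of nesting (e.g. `features__pca__n_components`). A quick way to list every name a composite model accepts is `get_params()`, and the same names also work with `set_params()`. The sketch below rebuilds the pipeline used in the next cell so that it runs on its own.

```python
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest
from sklearn.linear_model import RidgeClassifier
from sklearn.pipeline import FeatureUnion, Pipeline
from sklearn.preprocessing import StandardScaler

nested = Pipeline(steps=[('scaling', StandardScaler()),
                         ('features', FeatureUnion([('pca', PCA()),
                                                    ('select_best', SelectKBest())])),
                         ('classifier', RidgeClassifier())])

# Names follow "<step>__<sub-step>__<parameter>", one "__" per level of nesting
param_names = sorted(nested.get_params().keys())
print([name for name in param_names
       if name.startswith(('features__pca__', 'features__select_best__', 'classifier__'))])

# The same names work with set_params() for a one-off change
nested.set_params(features__pca__n_components=3, classifier__alpha=0.01)
print(nested.get_params()['features__pca__n_components'])  # 3
```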

```python
from sklearn.linear_model import RidgeClassifier
from sklearn.model_selection import GridSearchCV

feature_union = FeatureUnion([('pca', PCA()),
                              ('select_best', SelectKBest())])

pipeline = Pipeline(steps=[('scaling', StandardScaler()),
                           ('features', feature_union),
                           ('classifier', RidgeClassifier())])

# Find the best hyperparameters using GridSearchCV on the train set
param_grid = {'classifier__alpha': [0.001, 0.01, 0.1],
              'features__pca__n_components': [3, 5],
              'features__select_best__k': [1, 3, 6]}
grid = GridSearchCV(pipeline, param_grid=param_grid, cv=5)
grid.fit(X_train, y_train)

best_model = grid.best_estimator_
best_hyperparams = grid.best_params_
best_acc = grid.score(X_test, y_test)
print(f'Best test set accuracy: {best_acc}\nAchieved with hyperparameters: {best_hyperparams}')
```

```
Best test set accuracy: 0.7467532467532467
Achieved with hyperparameters: {'classifier__alpha': 0.001, 'features__pca__n_components': 3, 'features__select_best__k': 3}
```


In addition to trying out different hyperparameters for a given step in the pipeline, you can also try different classes altogether. For instance, what if we wanted to compare a Ridge classifier with an SVM for the "classifier" step?

```python
from sklearn.svm import SVC

pipeline = Pipeline(steps=[('scaling', StandardScaler()),
                           ('features', feature_union),
                           ('classifier', LogisticRegression())])  # Placeholder: the grid below replaces this step

# Find the best hyperparameters and model using GridSearchCV on the train set
param_grid = {'classifier': [RidgeClassifier(alpha=0.1), RidgeClassifier(alpha=0.01), SVC()],    # Which is better, a Ridge classifier or an SVM classifier?
              'features__pca__n_components': [3, 5],
              'features__select_best__k': [1, 3, 6]}
grid = GridSearchCV(pipeline, param_grid=param_grid, cv=5)
grid.fit(X_train, y_train)

best_model = grid.best_estimator_
best_hyperparams = grid.best_params_
best_acc = grid.score(X_test, y_test)
print(f'Best test set accuracy: {best_acc}\nAchieved with hyperparameters: {best_hyperparams}')
```

```
Best test set accuracy: 0.7272727272727273
Achieved with hyperparameters: {'classifier': SVC(), 'features__pca__n_components': 3, 'features__select_best__k': 3}
```
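Putting classes with different hyperparameters into one dictionary gets awkward, because an SVM has no `alpha`. `GridSearchCV` also accepts a *list* of dictionaries and searches each one separately, so every classifier is paired only with its own hyperparameters. The sketch below assumes synthetic data from `make_classification`, and its grid values are chosen purely for illustration.

```python
from sklearn.datasets import make_classification
from sklearn.linear_model import RidgeClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

# Synthetic stand-in data so the sketch runs on its own
X_demo, y_demo = make_classification(n_samples=200, n_features=8, random_state=0)

demo_pipeline = Pipeline(steps=[('scaling', StandardScaler()),
                                ('classifier', RidgeClassifier())])

# Each dict is searched separately: Ridge only gets alpha values, SVC only gets C values
demo_param_grid = [
    {'classifier': [RidgeClassifier()], 'classifier__alpha': [0.01, 0.1, 1.0]},
    {'classifier': [SVC()], 'classifier__C': [0.1, 1.0, 10.0]},
]
demo_grid = GridSearchCV(demo_pipeline, param_grid=demo_param_grid, cv=5)
demo_grid.fit(X_demo, y_demo)

print(demo_grid.best_params_)
print(len(demo_grid.cv_results_['params']))  # 6 candidate settings (3 + 3)
```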


---
## Custom class in a pipeline (optional)
In some scenarios, the standard Sklearn models and preprocessing classes may not be enough, and you will
have to write your own custom classes.
However, you'll still want the convenience of `Pipeline` and the advantages that come with it.

Here, we'll see how to embed your own custom class into an Sklearn `Pipeline`. We'll be working with the `tips` dataset from earlier in this notebook and we'll try a conditional imputation of the `total_bill` column using the mean bill for parties of the same `size` (e.g. if we're missing the total bill for a party of size 4, we impute it with the average bill for parties of size 4 in the training set).

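```python
X_tips.head()
```

|   | total_bill | sex | smoker | day | time | size |
|---|------------|-----|--------|-----|------|------|
| 0 | 16.99 | Female | No | Sun | Dinner | 2 |
| 1 | 10.34 | Male | No | Sun | Dinner | 3 |
| 2 | 21.01 | Male | No | Sun | Dinner | 3 |
| 3 | 23.68 | Male | No | Sun | Dinner | 2 |
| 4 | 24.59 | Female | No | Sun | Dinner | 4 |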


Let's randomly replace the `total_bill` value with a missing value in roughly 10% of the rows.

```python
import numpy as np
np.random.seed(4)  # For reproducibility

# Flip a biased coin len(X_tips) times and store the outcome of each toss as True/False
missing_rows = np.random.binomial(n=1, p=0.1, size=len(X_tips))
missing_rows = missing_rows.astype(bool)  # `np.bool` was deprecated in NumPy 1.20 and removed in 1.24

# Replace with missing values
X_tips_missing = X_tips.copy()
X_tips_missing.loc[missing_rows, 'total_bill'] = np.nan

# Make a train/test split
X_tips_train, X_tips_test, y_tips_train, y_tips_test = train_test_split(X_tips_missing, y_tips)

X_tips_missing.head(5)
```

|   | total_bill | sex | smoker | day | time | size |
|---|------------|-----|--------|-----|------|------|
| 0 | NaN | Female | No | Sun | Dinner | 2 |
| 1 | 10.34 | Male | No | Sun | Dinner | 3 |
| 2 | NaN | Male | No | Sun | Dinner | 3 |
| 3 | 23.68 | Male | No | Sun | Dinner | 2 |
| 4 | 24.59 | Female | No | Sun | Dinner | 4 |


```python
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin

class GroupMeanImputer(BaseEstimator, TransformerMixin):

    def __init__(self, impute_col, group_col):
        # Your __init__ function stores the arguments it receives.
        # Sklearn's conventions ask that it do no real work (no validation
        # or fitting), because tools like GridSearchCV re-create the object
        # from these arguments via get_params().

        # Store class arguments
        self.impute_col = impute_col
        self.group_col = group_col

        # Placeholder for the fitted group means (set in fit())
        self.group_means = None

    def fit(self, X, y=None):
        # Your fit() function takes in an X (and optionally a y)
        # and fits its parameters to the data. It then returns "self".

        # Compute the mean of `impute_col` for each unique value in `group_col`
        self.group_means = X.groupby(self.group_col)[self.impute_col].mean()

        return self

    def transform(self, X, y=None):
        # Your transform() function takes in an X (and optionally a y)
        # and returns the transformed output.

        # For each row, check if `impute_col` is missing (i.e. NaN).
        # If it is, fetch the mean for its group and use that to
        # fill the missing value.

        def replace_with_group(row):
            impute_val = row[self.impute_col]
            group_val = row[self.group_col]
            if np.isnan(impute_val):
                return self.group_means[group_val]
            else:
                return impute_val

        X = X.copy()  # Don't modify the original variable that was passed in
        X[self.impute_col] = X.apply(replace_with_group, axis='columns')

        return X
```

```python
from sklearn.metrics import r2_score

pipeline = Pipeline(steps=[('total_bill_imputation', GroupMeanImputer(impute_col='total_bill', group_col='size')),
                           ('preprocessing', preprocessing_tips),  # Defined above (does things like one-hot-encoding)
                           ('model', LinearRegression())])
pipeline.fit(X_tips_train, y_tips_train)

y_tips_pred = pipeline.predict(X_tips_test)
r2 = r2_score(y_tips_test, y_tips_pred)
print(f'\nTest set r^2: {r2}')
```

```

Test set r^2: 0.6244788177305094
```


Just to show you what our custom group imputer did, let's print out its input and output.

```python
group_imputer = GroupMeanImputer(impute_col='total_bill', group_col='size')

print('Before:')
print(X_tips_missing.head())

group_imputer.fit(X_tips_missing)
X_tips_filled = group_imputer.transform(X_tips_missing)

print('\nAfter:')
print(X_tips_filled.head())

print('\nFitted group means:')
print(group_imputer.group_means)
```

```
Before:
   total_bill     sex smoker  day    time  size
0         NaN  Female     No  Sun  Dinner     2
1       10.34    Male     No  Sun  Dinner     3
2         NaN    Male     No  Sun  Dinner     3
3       23.68    Male     No  Sun  Dinner     2
4       24.59  Female     No  Sun  Dinner     4

After:
   total_bill     sex smoker  day    time  size
0   16.240368  Female     No  Sun  Dinner     2
1   10.340000    Male     No  Sun  Dinner     3
2   23.395455    Male     No  Sun  Dinner     3
3   23.680000    Male     No  Sun  Dinner     2
4   24.590000  Female     No  Sun  Dinner     4

Fitted group means:
size
1     6.796667
2    16.240368
3    23.395455
4    28.430968
5    30.068000
6    37.423333
Name: total_bill, dtype: float64
```
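The row-by-row `apply` in `GroupMeanImputer.transform()` is easy to read but slow on large frames. Below is a sketch of a vectorized variant. The class name `MappedGroupMeanImputer` and the toy data are hypothetical. It looks up every row's group mean with `Series.map` and fills only the missing entries with `fillna`. It also follows Sklearn's convention of a trailing underscore for attributes learned in `fit()`. Finally, it shows why inheriting from `BaseEstimator` matters: `clone()`, which `GridSearchCV` uses internally, rebuilds an unfitted copy from the `__init__` arguments.

```python
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin, clone

class MappedGroupMeanImputer(BaseEstimator, TransformerMixin):
    """Hypothetical vectorized variant of GroupMeanImputer (illustration only)."""

    def __init__(self, impute_col, group_col):
        # Only store the constructor arguments
        self.impute_col = impute_col
        self.group_col = group_col

    def fit(self, X, y=None):
        # Trailing underscore marks an attribute learned during fit()
        self.group_means_ = X.groupby(self.group_col)[self.impute_col].mean()
        return self

    def transform(self, X, y=None):
        X = X.copy()  # Don't modify the caller's DataFrame
        # Look up each row's group mean, then use it only where the value is missing
        fill_values = X[self.group_col].map(self.group_means_)
        X[self.impute_col] = X[self.impute_col].fillna(fill_values)
        return X

toy = pd.DataFrame({'total_bill': [10.0, np.nan, 20.0, 30.0, np.nan],
                    'size': [2, 2, 4, 4, 4]})
imputer = MappedGroupMeanImputer(impute_col='total_bill', group_col='size')
# fit_transform() comes from TransformerMixin (fit, then transform)
filled = imputer.fit_transform(toy)
print(filled['total_bill'].tolist())  # [10.0, 10.0, 20.0, 30.0, 25.0]

# clone() builds a new, unfitted object from get_params()
fresh = clone(imputer)
print(fresh.get_params())
print(hasattr(fresh, 'group_means_'))  # False
```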


## Function transformers


Sometimes, the class you're trying to implement really only needs a `.transform()` step (i.e. `__init__()` has nothing to store and `fit()` has nothing to learn).
In these cases, you can use `FunctionTransformer` from the `sklearn.preprocessing` module.

We'll do an example of this where we just want to do some feature engineering by
computing the log of the `pedi` column in our diabetes classification dataset.

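```python
X.head()
```

|   | preg | plas | pres | skin | test | mass | pedi | age |
|---|------|------|------|------|------|------|------|-----|
| 0 | 6 | 148 | 72 | 35 | 0 | 33.6 | 0.627 | 50 |
| 1 | 1 | 85 | 66 | 29 | 0 | 26.6 | 0.351 | 31 |
| 2 | 8 | 183 | 64 | 0 | 0 | 23.3 | 0.672 | 32 |
| 3 | 1 | 89 | 66 | 23 | 94 | 28.1 | 0.167 | 21 |
| 4 | 0 | 137 | 40 | 35 | 168 | 43.1 | 2.288 | 33 |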


```python
from sklearn.preprocessing import FunctionTransformer

# Define the transformation we want as a function
def log_transform(X):
    X_log = np.log(X)
    return X_log

# Wrap the function inside a FunctionTransformer.
# This variable can now be used in a pipeline.
log_transform_object = FunctionTransformer(log_transform)

# We just want to log one specific column, leaving the others unchanged
log_pedi = ColumnTransformer([('log_pedi', log_transform_object, ['pedi'])],
                             remainder='passthrough')

pipeline = Pipeline(steps=[('df_transform', log_pedi),
                           ('scaling', StandardScaler()),
                           ('pca', PCA(n_components=3)),
                           ('classifier', LogisticRegression())])
pipeline.fit(X_train, y_train)

y_pred = pipeline.predict(X_test)
acc = accuracy_score(y_test, y_pred)
print(f'Test set accuracy: {acc}')
```

```
Test set accuracy: 0.6753246753246753
```
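One detail of the `log_pedi` step is worth knowing. `ColumnTransformer` places the outputs of its listed transformers first and appends the `remainder='passthrough'` columns afterwards, in their original order. As a result, the logged `pedi` becomes the first column instead of staying in its original position. That does not matter for the scaler and PCA above, but it does matter if a later step selects columns by position. The sketch below checks this on a three-column toy frame whose values are copied from the rows shown above. It also wraps `np.log` directly, since `FunctionTransformer` accepts any callable.

```python
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import FunctionTransformer

toy = pd.DataFrame({'preg': [6, 1, 8],
                    'pedi': [0.627, 0.351, 0.672],
                    'age': [50, 31, 32]})

# np.log can be wrapped directly; no helper function is needed
log_pedi_demo = ColumnTransformer([('log_pedi', FunctionTransformer(np.log), ['pedi'])],
                                  remainder='passthrough')
out = log_pedi_demo.fit_transform(toy)

# Transformed column first, then the passthrough columns (preg, age)
print(out.shape)                                                      # (3, 3)
print(np.allclose(out[:, 0], np.log(toy['pedi'])))                    # True
print(np.array_equal(out[:, 1:], toy[['preg', 'age']].to_numpy()))    # True
```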
