# Adapters in bigger pipelines

In this tutorial we show how to use adapters to create more complicated pipelines in Steps.

In [None]:
%load_ext autoreload
%autoreload 2

import numpy as np
import pandas as pd
import xgboost
import traceback

from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
import joblib  # sklearn.externals.joblib was removed in scikit-learn 0.23
from sklearn.metrics import log_loss

import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
from steps.base import Step, BaseTransformer, Dummy

## The problem

Let's recreate the pipeline for digits recognition from notebook #1.

We start off by fetching the data.

In [None]:
CACHE_DIR = './cache'
digits = load_digits()
X_digits, y_digits = digits.data, digits.target

X_train, X_test, y_train, y_test = train_test_split(X_digits, y_digits, test_size=0.2, stratify=y_digits, random_state=42)

print('{} samples for training'.format(len(y_train)))
print('{} samples for test'.format(len(y_test)))

data_train = {'input':
                {
                     'images': X_train,
                     'labels': y_train,
                }
            }

data_test = {'input':
                {
                     'images': X_test,
                     'labels': y_test,
                }
            }

We define `RandomForestTransformer` in a similar manner as before, with one difference: `transform` uses RandomForest's `predict_proba` instead of `predict`, which will be useful later in this notebook.

In [None]:
class RandomForestTransformer(BaseTransformer):
    def __init__(self):
        self.estimator = RandomForestClassifier(n_estimators=10, max_depth=10, random_state=12345)
        
    def fit(self, X, y):
        self.estimator.fit(X, y)
        return self

    def transform(self, X, **kwargs):
        y_proba  = self.estimator.predict_proba(X)
        return {'y_proba': y_proba}
    
    def save(self, filepath):
        joblib.dump(self.estimator, filepath)
        
    def load(self, filepath):
        self.estimator = joblib.load(filepath)
        return self

In [None]:
rf_step = Step(name='random_forest',
               transformer=RandomForestTransformer(),
               input_data=['input'],        
               cache_dirpath=CACHE_DIR,
               force_fitting=True)

In [None]:
rf_step

The graph looks just like in notebook #1. Let's try to execute it!

In [None]:
try:
    preds_train_rf = rf_step.fit_transform(data_train)
except Exception:
    traceback.print_exc()

As we can see, something went wrong. The problem is that the `input` dictionary in `data_train` contains the fields `images` and `labels`, whereas `RandomForestTransformer` expects the arguments `X` and `y`.
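The failure can be reproduced without Steps at all. The minimal sketch below assumes that a step without an adapter passes the `input` dictionary to the transformer as keyword arguments; `ToyTransformer` is a hypothetical stand-in and not part of the library.

```python
class ToyTransformer:
    """Hypothetical stand-in for a transformer whose fit expects X and y."""

    def fit(self, X, y):
        self.n_samples = len(X)
        return self


# Same layout as data_train['input'] in this notebook, with toy values.
node_output = {'images': [[0, 1], [1, 0]], 'labels': [0, 1]}

# Assumption: without an adapter, the dictionary is passed as keyword arguments.
try:
    ToyTransformer().fit(**node_output)
    error_message = None
except TypeError as exc:
    error_message = str(exc)

print(error_message)

# Renaming the keys to what fit expects makes the same call succeed.
renamed = {'X': node_output['images'], 'y': node_output['labels']}
fitted = ToyTransformer().fit(**renamed)
print(fitted.n_samples)
```

The renaming done by hand in the last lines is the job an adapter takes over.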

## The solution: adapter

To handle such issues, `Step`'s initializer has an `adapter` argument. The adapter describes how to reshape the data from the input nodes into the form expected by the transformer or by further steps.

The basic usage is as follows:

In [None]:
rename_step = Step(name='rename',
               transformer=Dummy(),
               input_data=['input'],
               adapter={'X': [('input', 'images')],
                        'y': [('input', 'labels')]},
               cache_dirpath=CACHE_DIR)

In [None]:
rename_step

We created a new step which gets its data from the `input` node.

When execution reaches `rename_step`, the adapter-related code runs first. Recall that `RandomForestTransformer`'s `fit_transform` and `transform` methods expect the arguments `X` and `y`. The adapter is essentially a dictionary that tells the step how to obtain each expected argument. For instance, `'X': [('input', 'images')]` tells the step that the value for `X` is stored under the `images` key in the dictionary returned by the `input` node.

The transformer inside this step is `Dummy`, which means that the step's result is simply the dictionary described by the adapter.
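To make the renaming concrete, here is a minimal sketch of what the `rename` step does conceptually. Both `lookup` and `ToyDummy` are hypothetical helpers written for this illustration; they follow the description above and are not the Steps implementation.

```python
class ToyDummy:
    """Hypothetical identity transformer modelled on the description of Dummy."""

    def fit(self, **kwargs):
        return self

    def transform(self, **kwargs):
        # Return the received arguments unchanged, as a dictionary.
        return kwargs


def lookup(adapter, node_outputs):
    """Resolve each [(node name, key)] entry to the value stored under it."""
    arguments = {}
    for arg_name, description in adapter.items():
        node_name, key = description[0]  # one-element list holding the pair
        arguments[arg_name] = node_outputs[node_name][key]
    return arguments


node_outputs = {'input': {'images': [[0, 1], [1, 0]], 'labels': [0, 1]}}
adapter = {'X': [('input', 'images')],
           'y': [('input', 'labels')]}

# The adapter runs first, then the transformer receives X and y.
arguments = lookup(adapter, node_outputs)
rename_output = ToyDummy().fit(**arguments).transform(**arguments)
print(rename_output)
```

The output dictionary has the keys `X` and `y`, which is exactly what a downstream transformer with a `fit(X, y)` signature needs.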

Let's try to fit Random Forest again!

In [None]:
rf_step = Step(name='random_forest',
               transformer=RandomForestTransformer(),
               input_steps=[rename_step],        
               cache_dirpath=CACHE_DIR,
               force_fitting=True)

In [None]:
rf_step

In [None]:
rf_step.fit_transform(data_train)

This time it worked like a charm: we see class probabilities for the training cases.

Note that instead of creating a new step `rename`, we could have put the `adapter` directly in the `random_forest` step like this:
```
rf_step = Step(name='random_forest',
               transformer=RandomForestTransformer(),
               input_data=['input'],
               adapter={'X': [('input', 'images')],
                        'y': [('input', 'labels')]},     
               cache_dirpath=CACHE_DIR)
```
and the result would be exactly the same. However, this renaming will be necessary in further steps, so we decided to go with a proxy step. Otherwise, we would have to copy this adapter in all other steps that expect `X` and `y` instead of `images` and `labels`.

## Pipeline with model ensembling

When we have several models that perform at a similar level, it often makes sense to combine them. The resulting ensemble tends to be more stable and can even improve the results a little.

### XGBoost steps

So let's train another model! This time we will use XGBoost. What we need to do is really analogous to what we did for Random Forests.

In [None]:
class XGBoostTransformer(BaseTransformer):
    def __init__(self, xgb_params, num_boost_round):
        self.estimator = None
        self.xgb_params = xgb_params
        self.num_boost_round = num_boost_round
        
    def fit(self, X, y):
        tr_mat = xgboost.DMatrix(X, label=y)
        evals = [(tr_mat, 'train')]
        self.estimator = xgboost.train(self.xgb_params,
                                       tr_mat,
                                       num_boost_round=self.num_boost_round,
                                       verbose_eval=False,
                                       evals=evals)
        return self

    def transform(self, X, **kwargs):
        test_mat = xgboost.DMatrix(X)
        y_proba  = self.estimator.predict(test_mat)
        return {'y_proba': y_proba}
    
    def save(self, filepath):
        joblib.dump({'estimator': self.estimator,
                     'xgb_params': self.xgb_params,
                     'num_boost_round': self.num_boost_round},
                    filepath)
        
    def load(self, filepath):
        d = joblib.load(filepath)
        self.estimator = d['estimator']
        self.xgb_params = d['xgb_params']
        self.num_boost_round = d['num_boost_round']
        return self
    
def get_xgb_params():
    return {
        'objective': 'multi:softprob',
        "num_class": 10,
        'eta': 0.5,
        'max_depth': 4,
        'silent': True,
        'nthread': -1,
        'lambda': 1.0,
        'eval_metric': ["mlogloss", "merror"]
    }
    

In [None]:
xgb_step = Step(name='xgboost',
               transformer=XGBoostTransformer(xgb_params=get_xgb_params(), num_boost_round=5),
               input_steps=[rename_step],
               cache_dirpath=CACHE_DIR,
               force_fitting=True)

In [None]:
xgb_step

In [None]:
xgb_step.fit_transform(data_train)

All right, so now we have two trained models, but so far we haven't checked how they perform. Let's do it now. We will use one of data scientists' favourite measures: log-loss.

In [None]:
rf_proba_train = rf_step.transform(data_train)['y_proba']
rf_proba_test = rf_step.transform(data_test)['y_proba']
xgb_proba_train = xgb_step.transform(data_train)['y_proba']
xgb_proba_test = xgb_step.transform(data_test)['y_proba']

print("RF train: {:.3f}, test: {:.3f}".format(log_loss(y_pred=rf_proba_train, y_true=y_train),
                                      log_loss(y_pred=rf_proba_test, y_true=y_test)))
print("XGB train: {:.3f}, test: {:.3f}".format(log_loss(y_pred=xgb_proba_train, y_true=y_train),
                                       log_loss(y_pred=xgb_proba_test, y_true=y_test)))
print("Averaged train: {:.3f}, test: {:.3f}".format(log_loss(y_pred=(rf_proba_train + xgb_proba_train) / 2, y_true=y_train),
                                            log_loss(y_pred=(rf_proba_test + xgb_proba_test) / 2, y_true=y_test)))

We see that we noticeably overfit, but that's OK: making a perfect model is not the goal of this notebook. We also see that we could benefit from a very simple form of model ensembling: averaging the models' predictions.
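To see why averaging can help, consider the following pure-Python sketch of multiclass log-loss. The function names are hypothetical, the probabilities are made up for illustration (they are not results from this notebook), and the code is not scikit-learn's implementation.

```python
import math


def multiclass_log_loss(y_true, y_proba, eps=1e-15):
    """Mean negative log of the probability assigned to the true class."""
    total = 0.0
    for label, row in zip(y_true, y_proba):
        p = min(max(row[label], eps), 1.0 - eps)  # clip to avoid log(0)
        total -= math.log(p)
    return total / len(y_true)


def average_predictions(*predictions):
    """Element-wise mean of several probability matrices (lists of rows)."""
    return [[sum(values) / len(values) for values in zip(*rows)]
            for rows in zip(*predictions)]


# Made-up probabilities for two samples and two classes.
y_true = [0, 1]
model_a = [[0.9, 0.1], [0.6, 0.4]]  # good on the first sample, poor on the second
model_b = [[0.4, 0.6], [0.1, 0.9]]  # the opposite pattern

averaged = average_predictions(model_a, model_b)
loss_a = multiclass_log_loss(y_true, model_a)
loss_b = multiclass_log_loss(y_true, model_b)
loss_avg = multiclass_log_loss(y_true, averaged)
print(loss_a, loss_b, loss_avg)
```

In this toy case the two models make complementary mistakes, so the averaged prediction has a lower log-loss than either model alone. Because the logarithm is concave, the log-loss of the averaged prediction can never exceed the mean of the individual log-losses.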

### Ensembling and evaluation steps

The next step incorporates averaging of model predictions into the pipeline.

In [None]:
ensemble_step = Step(name='ensembler',
                 transformer=Dummy(),
                 input_steps=[rf_step, xgb_step],                 
                 adapter={'y_proba': ([(rf_step.name, 'y_proba'),
                                      (xgb_step.name, 'y_proba')],
                                     lambda lst: np.array(lst).mean(axis=0))
                         },
                cache_dirpath=CACHE_DIR,
                force_fitting=True
                )

In [None]:
ensemble_step

We used a slightly different syntax in `adapter` this time. The recipe for `y_proba` consists of two things:
- a list of objects returned by input steps that should be used to build `y_proba`,
- a function which merges them into a final `y_proba` object.

So `[(rf_step.name, 'y_proba'), (xgb_step.name, 'y_proba')]` tells the adapter to extract `y_proba` arrays from dictionaries returned by `rf_step` and `xgb_step` and put them in a list. Then `lambda lst: np.array(lst).mean(axis=0)` will average these arrays.

An adapter is a description of how to build the arguments for `fit_transform` and `transform`. It is a dictionary, where:
- keys must match the names of the transformer's `fit_transform` and `transform` arguments,
- values must be either:
  1. a brick description (written in this notebook as a one-element list, e.g. `[('input', 'images')]`),
  2. a pair of:
    - a list of brick descriptions,
    - a function that merges the extracted results of previous steps,

where a _brick description_ is a pair consisting of a node name and a key in the dictionary returned by that node.

A step with an adapter proceeds like this:
1. It gathers results from the preceding nodes.
2. It builds a dictionary with the same keys as the adapter and with values built according to the descriptions:
   - if the key in the adapter maps to a single brick description, the corresponding object is extracted from the results of the input nodes,
   - otherwise, objects are extracted according to the brick descriptions and added to a list, which is then passed to the function that generates the final object.
3. The arguments of `fit_transform` and `transform` are filled using the above dictionary.
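The three steps above can be sketched in a few lines of plain Python. `resolve_adapter` and `elementwise_mean` are hypothetical names introduced for this illustration; the sketch mirrors the adapter syntax used in this notebook but makes no claim about how Steps implements it internally.

```python
def resolve_adapter(adapter, node_outputs):
    """Build keyword arguments for a transformer from an adapter description."""
    arguments = {}
    for arg_name, recipe in adapter.items():
        if isinstance(recipe, tuple):
            # Pair of (list of brick descriptions, merging function).
            bricks, merge = recipe
            extracted = [node_outputs[node][key] for node, key in bricks]
            arguments[arg_name] = merge(extracted)
        else:
            # Single brick description wrapped in a one-element list.
            node, key = recipe[0]
            arguments[arg_name] = node_outputs[node][key]
    return arguments


def elementwise_mean(matrices):
    """Pure-Python stand-in for np.array(lst).mean(axis=0)."""
    return [[sum(values) / len(values) for values in zip(*rows)]
            for rows in zip(*matrices)]


# Step 1: results gathered from the preceding nodes (toy values).
node_outputs = {
    'random_forest': {'y_proba': [[1.0, 0.0], [0.5, 0.5]]},
    'xgboost': {'y_proba': [[0.5, 0.5], [0.0, 1.0]]},
    'rename': {'y': [0, 1]},
}
adapter = {
    'y_proba': ([('random_forest', 'y_proba'), ('xgboost', 'y_proba')],
                elementwise_mean),
    'y_true': [('rename', 'y')],
}

# Steps 2 and 3: this dictionary is what would be passed on as **kwargs.
arguments = resolve_adapter(adapter, node_outputs)
print(arguments)
```

Distinguishing the two forms by type (tuple versus list) is a design choice of this sketch; it works here because the notebook always writes the merging form as a tuple and the single-brick form as a list.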

Let's check if our ensembling works.

In [None]:
ensemble_step.fit_transform(data_train)

Looks fine! However, often we are interested only in the class with the highest probability. Let's make a step that will find this class for us.

In [None]:
guesses_step = Step(name='guesses_maker',
                 transformer=Dummy(),
                 input_steps=[ensemble_step],                 
                 adapter={'y_pred': ([(ensemble_step.name, 'y_proba')],
                                     lambda lst: np.argmax(lst[0], axis=1))
                         },
                 cache_dirpath=CACHE_DIR,
                 force_fitting=True
                )

In [None]:
guesses_step

You should already be familiar with everything that happened here. The new step, `guesses_maker`, takes its input from `ensembler`. The adapter creates just one element: `y_pred`. The list of bricks used to build `y_pred` has only one element: `y_proba` found in `ensembler`'s result. The function `lambda lst: np.argmax(lst[0], axis=1)` takes this list and performs a row-wise `argmax` on its only element.
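For readers less used to `argmax` with `axis=1`, the sketch below shows the same operation in plain Python on made-up probabilities. `argmax_rows` is a hypothetical helper, used here only as a stand-in for the NumPy call.

```python
def argmax_rows(matrix):
    """Index of the largest value in each row (first index wins on ties)."""
    return [max(range(len(row)), key=row.__getitem__) for row in matrix]


# Toy averaged probabilities for three samples and three classes.
y_proba = [[0.1, 0.7, 0.2],
           [0.8, 0.1, 0.1],
           [0.2, 0.3, 0.5]]

# The adapter hands the merging function a one-element list, hence lst[0].
lst = [y_proba]
y_pred = argmax_rows(lst[0])
print(y_pred)  # [1, 0, 2]
```

Each entry of `y_pred` is the class with the highest probability for the corresponding sample.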

In [None]:
guesses_step.fit_transform(data_train)

In one of the previous cells we checked the quality of our models manually. Let's add a final step that will do it for us automatically!

In [None]:
class EvaluationTransformer(BaseTransformer):
    def __init__(self):
        pass
    
    def fit(self, y_true, y_proba, y_pred):
        return self

    def transform(self, y_true, y_proba, y_pred):
        #print(len(y_true), len(y_pred))
        #print(y_proba)
        return {'Log-loss': log_loss(y_pred=y_proba, y_true=y_true),
                'Acc': '{:.2f}'.format(sum(y_true == y_pred) / len(y_pred))
               }

evaluation_step = Step(name='evaluator',
                 transformer=EvaluationTransformer(),
                 input_steps=[ensemble_step, guesses_step, rename_step],                 
                 adapter={'y_proba': [(ensemble_step.name, 'y_proba')],
                          'y_pred':  [(guesses_step.name, 'y_pred')],
                          'y_true': [(rename_step.name, 'y')]
                         },
                 cache_dirpath=CACHE_DIR
                )

In [None]:
evaluation_step

In [None]:
evaluation_step.fit_transform(data_train)

In [None]:
evaluation_step.transform(data_test)

### Peek at pipeline predictions

It is always instructive to compare the images with the model's predictions. As a last example, we show a step that displays a few images together with the predicted probability distributions!

In [None]:
class PeekOnPredictions(BaseTransformer):
    def __init__(self):
        pass
    
    def fit(self, ens_proba, rf_proba, xgb_proba, images):
        return self

    def transform(self, ens_proba, rf_proba, xgb_proba, images): 
        pd.options.display.float_format = '{:6.3f}'.format
        for i in range(5):
            df = pd.DataFrame({'rf': rf_proba[i], 'xgb': xgb_proba[i], 'ens': ens_proba[i]}, index=list(range(10)))
            plt.figure(figsize=(6,2))
            left =  plt.subplot(1, 2, 1)
            right = plt.subplot(1, 2, 2)
            left.imshow(images[i].reshape(8, 8), cmap='gray')
            right.axis('off')
            right.text(0, 0.3, str(df.T), fontsize=16, fontname='monospace')
    

In [None]:
peek_step = Step(name='peek',
                 transformer=PeekOnPredictions(),
                 input_steps=[ensemble_step, rf_step, xgb_step],
                 input_data=['input'],
                 adapter={'ens_proba': [(ensemble_step.name, 'y_proba')],
                          'rf_proba':  [(rf_step.name, 'y_proba')],
                          'xgb_proba': [(xgb_step.name, 'y_proba')],
                          'images': [('input', 'images')]
                         },
                 cache_dirpath=CACHE_DIR
                )

In [None]:
peek_step

In [None]:
peek_step.fit_transform(data_train)

In [None]:
peek_step.transform(data_test)