# Adapters in bigger pipelines

In this tutorial we show how to use adapters to create more complicated pipelines in Steps.

In [None]:
%load_ext autoreload
%autoreload 2

import numpy as np
import pandas as pd
import xgboost
import traceback

from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
import joblib  # sklearn.externals.joblib was removed in scikit-learn 0.23
from sklearn.metrics import log_loss

import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
from steppy.base import Step, BaseTransformer, NoOperation, make_transformer
from steppy.adapter import Adapter, E

## The problem

Let's recreate the digit recognition pipeline from notebook #1.

We start off by fetching the data. Later in this notebook we will build a model ensemble, so this time we split the data into three parts: training, ensembling, and test.

In [None]:
CACHE_DIR = './cache'
digits = load_digits()
X_digits, y_digits = digits.data, digits.target

X_train, X_test, y_train, y_test = train_test_split(X_digits, y_digits, test_size=0.15, stratify=y_digits, random_state=643793)
X_train, X_ens, y_train, y_ens = train_test_split(X_train, y_train, test_size=0.35, stratify=y_train, random_state=976542)

print('{} samples for training'.format(len(y_train)))
print('{} samples for ensembling'.format(len(y_ens)))
print('{} samples for test'.format(len(y_test)))

data_train = {
    'input': {
        'images': X_train,
        'labels': y_train,
    }
}

data_ensembling = {
    'input': {
        'images': X_ens,
        'labels': y_ens
    }
}

data_test = {
    'input': {
        'images': X_test,
        'labels': y_test
    }
}

In [None]:
!rm -rf ./cache

We define `RandomForestTransformer` in a similar manner as before, with one difference: `transform` uses the random forest's `predict_proba` instead of `predict`, which will be useful later in this notebook.

In [None]:
class RandomForestTransformer(BaseTransformer):
    def __init__(self, random_state=None):
        self.estimator = RandomForestClassifier(random_state=random_state)
        
    def fit(self, X, y):
        self.estimator.fit(X, y)
        return self

    def transform(self, X, **kwargs):
        y_proba  = self.estimator.predict_proba(X)
        return {'y_proba': y_proba}
    
    def save(self, filepath):
        joblib.dump(self.estimator, filepath)
        
    def load(self, filepath):
        self.estimator = joblib.load(filepath)
        return self

In [None]:
rf_step = Step(name='random_forest',
               transformer=RandomForestTransformer(),
               input_data=['input'],        
               cache_dirpath=CACHE_DIR)

In [None]:
rf_step

The graph looks just like in notebook #1. Let's try to execute it!

In [None]:
try:
    preds_train_rf = rf_step.fit_transform(data_train)
except Exception:
    traceback.print_exc()

As we can see, something went wrong. The problem is that the `input` dictionary in `data_train` contains the fields `images` and `labels`, whereas `RandomForestTransformer` expects the arguments `X` and `y`.
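The mismatch can be reproduced without Steps at all. The sketch below assumes that the step forwards the entries of the `input` dictionary as keyword arguments; `fit` is a hypothetical stand-in for the transformer's method, not the real one.

```python
def fit(X, y):
    """Hypothetical stand-in for a transformer method that expects X and y."""
    return len(X), len(y)

# Toy data using the same field names as the `input` dictionary.
step_input = {'images': [[0.0, 1.0], [1.0, 0.0]], 'labels': [0, 1]}

try:
    fit(**step_input)
    error_message = None
except TypeError as exc:
    # Python rejects keyword arguments that the function does not declare.
    error_message = str(exc)

print(error_message)
```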

## The solution: adapter

To handle such issues, `Step`'s initializer has an `adapter` argument. An `Adapter` describes how to reshape the data from the input nodes into the form expected by the transformer or by further steps.

The basic usage is as follows:

In [None]:
rf_step = Step(name='random_forest',
               transformer=RandomForestTransformer(),
               input_data=['input'],
               adapter=Adapter({
                   'X': E('input', 'images'),
                   'y': E('input', 'labels')
               }),
               cache_dirpath=CACHE_DIR)

In [None]:
rf_step

We created a new step that gets its data from the `input` node.

When the program flow reaches `rf_step`, the adapter-related code is executed first. `RandomForestTransformer`'s `fit` method expects the arguments `X` and `y`, and `transform` expects `X`. The adapter is essentially a dictionary that tells the step how to obtain each expected argument. For instance, `'X': E('input', 'images')` tells the step that the value for `X` is stored under the `images` key in the dictionary returned by the `input` node.
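Written out by hand, the renaming that this adapter describes amounts to the following. This is only an illustration on toy data: `fit` is a hypothetical stand-in, the recipes are plain tuples rather than `E` objects, and the real lookup is done by Steps internally.

```python
def fit(X, y):
    """Hypothetical stand-in for a transformer method that expects X and y."""
    return {'n_samples': len(X), 'n_labels': len(y)}

data = {'input': {'images': [[0.0, 1.0], [1.0, 0.0]], 'labels': [0, 1]}}

# Each recipe is a (node name, key) pair, mirroring E('input', 'images').
recipes = {'X': ('input', 'images'), 'y': ('input', 'labels')}

# Look up every recipe and rename the value to the expected argument name.
kwargs = {arg: data[node][key] for arg, (node, key) in recipes.items()}
result = fit(**kwargs)
print(result)
```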

Let's try to fit Random Forest again!

In [None]:
rf_step.fit_transform(data_train)

This time it worked like a charm: we see class probabilities for the training cases.

## Pipeline with model ensembling

Very often, when we have multiple models that perform at a similar level, it makes sense to combine them. The resulting ensemble tends to be more stable and can even improve the results a little.

To take advantage of this, we will train several forests. Thanks to different random seeds, each forest will make somewhat different predictions, so combining them should improve the performance of the entire pipeline.

In [None]:
NR_OF_FORESTS = 4
random_seeds =  [np.random.randint(1000000) for _ in range(NR_OF_FORESTS)]

rf_steps = [Step(name='random_forest_{}'.format(i),
                 transformer=RandomForestTransformer(random_state=seed),
                 input_data=['input'],      
                 adapter=Adapter({
                     'X': E('input', 'images'),
                     'y': E('input', 'labels')
                 }),    
                 cache_dirpath=CACHE_DIR)
            for i, seed in enumerate(random_seeds)]

In [None]:
rf_steps[0]

For ensembling we will use boosted trees. First we need to create a transformer that wraps XGBoost. What we need to do is analogous to what we did for the random forests.

In [None]:
class XGBoostTransformer(BaseTransformer):
    def __init__(self, xgb_params, num_boost_round):
        self.estimator = None
        self.xgb_params = xgb_params
        self.num_boost_round = num_boost_round
        
    def fit(self, X, y):
        tr_mat = xgboost.DMatrix(X, label=y)
        evals = [(tr_mat, 'train')]
        self.estimator = xgboost.train(self.xgb_params,
                                       tr_mat,
                                       num_boost_round=self.num_boost_round,
                                       verbose_eval=False,
                                       evals=evals)
        return self

    def transform(self, X, **kwargs):
        test_mat = xgboost.DMatrix(X)
        y_proba  = self.estimator.predict(test_mat)
        return {'y_proba': y_proba}
    
    def save(self, filepath):
        joblib.dump({'estimator': self.estimator,
                     'xgb_params': self.xgb_params,
                     'num_boost_round': self.num_boost_round},
                    filepath)
        
    def load(self, filepath):
        d = joblib.load(filepath)
        self.estimator = d['estimator']
        self.xgb_params = d['xgb_params']
        self.num_boost_round = d['num_boost_round']
        return self
    
def get_xgb_params():
    return {
        'objective': 'multi:softprob',
        "num_class": 10,
        'eta': 0.5,
        'max_depth': 4,
        'silent': True,
        'nthread': -1,
        'lambda': 2.0,
        'eval_metric': ["mlogloss", "merror"]
    }
    

To connect the ensembling step to the random forests, we need somewhat more advanced adapting.

In [None]:
gather_step = Step(
    name='gather_step',
    transformer=make_transformer(lambda lst, y: {'X': np.hstack(lst), 'y': y}),
    input_steps=rf_steps,
    input_data=['input'],
    adapter=Adapter({
        'lst': [E(rf_step.name, 'y_proba') for rf_step in rf_steps],
        'y': E('input', 'labels')
    }),
    cache_dirpath=CACHE_DIR
)

ensemble_step = Step(name='ensembler',
                     transformer=XGBoostTransformer(xgb_params=get_xgb_params(), num_boost_round=10),
                     input_steps=[gather_step],
                     cache_dirpath=CACHE_DIR)

In [None]:
ensemble_step

We used a slightly different syntax in the `adapter` this time. Building `X` for the ensembler takes two things:
- a list of objects returned by the input steps that should be used to construct `X`,
- a function that merges them into the final `X` object.

So `[E(rf_step.name, 'y_proba') for rf_step in rf_steps]` tells the adapter to extract the `y_proba` arrays from the dictionaries returned by all the random forests. These arrays are collected in a list and passed as `lst` to the transformer of `gather_step`, which was created with `make_transformer` from a lambda that calls `np.hstack(lst)`. This function merges the outputs of all forests into one big array, which is returned under the key `X` and eventually passed to the `XGBoostTransformer`.
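To see what the merge does to the shapes, here is a small sketch with random stand-ins for the forests' outputs. The sizes and the seed are arbitrary assumptions chosen for the illustration.

```python
import numpy as np

n_samples, n_classes, n_forests = 3, 10, 4
rng = np.random.RandomState(0)

# Toy stand-ins for the `y_proba` arrays returned by the forests:
# each row is normalised so that it sums to one.
probas = []
for _ in range(n_forests):
    raw = rng.rand(n_samples, n_classes)
    probas.append(raw / raw.sum(axis=1, keepdims=True))

# np.hstack places the arrays side by side, one block of columns per forest.
stacked = np.hstack(probas)
print(stacked.shape)  # (3, 40)
```

Each sample thus gets one feature per forest and class, which is the input the ensembler is trained on.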

An adapter is actually a description of how to build the arguments for `fit_transform` and `transform`. Let _brick description_ mean a pair of a node name and a key in the dictionary returned by that node, written as `E(node_name, key)`. An adapter is a dictionary, where:
- keys must agree with the transformer's `fit_transform` and `transform` arguments,
- values, as used in this notebook, are either:
  1. a brick description,
  2. a list of brick descriptions.

Any further processing of the extracted objects, such as merging them, is left to the transformer itself, as in `gather_step` above.

A step with an adapter proceeds like this:
1. It gathers results from the preceding nodes.
2. It builds a dictionary with the same keys as the adapter and with values built according to the descriptions:
   - if the key in the adapter maps to a single brick description, the appropriate object is extracted from the results of the input nodes,
   - if a list of brick descriptions is given, the objects are extracted according to the brick descriptions and added to a list.
3. The arguments of `fit_transform` and `transform` are filled using the above dictionary.
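The procedure above can be sketched in plain Python. This is a minimal illustration covering a single brick description and a list of them, not the library's actual implementation: `Extract`, `resolve` and `adapt` are hypothetical names, and the node results are toy values.

```python
from collections import namedtuple

# Hypothetical stand-in for E: a (node name, key) placeholder.
Extract = namedtuple('Extract', ['node', 'key'])

def resolve(recipe, results):
    """Replace every Extract placeholder in `recipe` with the referenced value."""
    if isinstance(recipe, Extract):
        return results[recipe.node][recipe.key]
    if isinstance(recipe, list):
        return [resolve(item, results) for item in recipe]
    return recipe  # anything else is passed through unchanged

def adapt(recipes, results):
    """Build the keyword arguments for the transformer from the recipes."""
    return {arg: resolve(recipe, results) for arg, recipe in recipes.items()}

# Step 1: results gathered from the preceding nodes (toy values).
results = {
    'random_forest_0': {'y_proba': [[0.9, 0.1]]},
    'random_forest_1': {'y_proba': [[0.7, 0.3]]},
    'input': {'labels': [0]},
}

# Step 2: the adapter's recipes, one per expected argument.
recipes = {
    'lst': [Extract('random_forest_0', 'y_proba'),
            Extract('random_forest_1', 'y_proba')],
    'y': Extract('input', 'labels'),
}

# Step 3: the resulting dictionary is what would be unpacked into the call.
kwargs = adapt(recipes, results)
print(kwargs)
```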

Let's check whether our ensembling works. To fit the pipeline properly, we first have to fit the random forests on the training data and then fit the ensembler on the part of the data set aside for this purpose.

In [None]:
for rf_step in rf_steps:
    rf_step.fit_transform(data_train)

In [None]:
ensemble_step.fit_transform(data_ensembling)

Looks fine! However, often we are interested only in the class with the highest probability. Let's make a step that will find this class for us.

In [None]:
class GuessesTransformer(BaseTransformer):
    def transform(self, y_proba):
        return {'y_pred': np.argmax(y_proba, axis=1)}

guesses_step = Step(name='guesses_maker',
                 transformer=GuessesTransformer(),
                 input_steps=[ensemble_step],       
                 cache_dirpath=CACHE_DIR
                )

In [None]:
guesses_step

You should already be familiar with everything that happened here. The new step, `guesses_maker`, takes its input from `ensembler`. No adapter is needed this time: the `ensembler`'s result contains a single entry, `y_proba`, which matches the argument name of `GuessesTransformer.transform`. The transformer performs a row-wise `argmax` on it and returns the result as `y_pred`.
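As a quick illustration of the row-wise `argmax`, here is a toy probability matrix with three samples and three classes. The values are made up for the example.

```python
import numpy as np

# One row per sample, one column per class.
y_proba = np.array([[0.1, 0.7, 0.2],
                    [0.6, 0.3, 0.1],
                    [0.2, 0.2, 0.6]])

# axis=1 picks, for every row, the column index with the highest probability.
y_pred = np.argmax(y_proba, axis=1)
print(y_pred)  # [1 0 2]
```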

In [None]:
guesses_step.fit_transform(data_train)

We have created quite a complicated pipeline, so let's see how it performs. Our final step will carry out the evaluation.

In [None]:
class EvaluationTransformer(BaseTransformer):
    def transform(self, y_true, y_proba, y_pred):
        return {'Log-loss': log_loss(y_pred=y_proba, y_true=y_true),
                'Acc': '{:.2f}'.format(sum(y_true == y_pred) / len(y_pred))
               }

evaluation_step = Step(name='evaluator',
                 transformer=EvaluationTransformer(),
                 input_steps=[ensemble_step, guesses_step],
                 input_data=['input'],
                 adapter=Adapter({
                     'y_proba': E(ensemble_step.name, 'y_proba'),
                     'y_pred':  E(guesses_step.name, 'y_pred'),
                     'y_true': E('input', 'labels')
                 }),
                 cache_dirpath=CACHE_DIR
                )

In [None]:
evaluation_step

In [None]:
evaluation_step.fit_transform(data_train)

In [None]:
evaluation_step.transform(data_test)

As we can see, thanks to ensembling we improved in comparison to a single model.

### A peek at pipeline predictions

Comparing images with the model's predictions is always rewarding. As a last example, we show a step that displays a few images along with the predicted probability distributions.

In [None]:
model_names = [rf_step.name for rf_step in rf_steps] + [ensemble_step.name]
class LookAtPredictions(BaseTransformer):
    def transform(self, probas, images): 
        pd.options.display.float_format = '{:5.2f}'.format
        for img_nr in range(5):
            df = pd.DataFrame({model_names[j]: probas[j][img_nr]
                               for j in range(len(model_names))
                              },
                              index=list(range(10)))
            df = df[model_names]
            plt.figure(figsize=(6,2))
            left =  plt.subplot(1, 2, 1)
            right = plt.subplot(1, 2, 2)
            left.imshow(images[img_nr].reshape(8, 8), cmap='gray')
            right.axis('off')
            right.text(0, 0.3, str(df.T), fontsize=14, fontname='monospace')
    

In [None]:
display_step = Step(
    name='display',
    transformer=LookAtPredictions(),
    input_steps=[ensemble_step] + rf_steps,
    input_data=['input'],
    adapter=Adapter({
        'probas': [E(rf_step.name, 'y_proba') for rf_step in rf_steps] +
            [E(ensemble_step.name, 'y_proba')],
        'images': E('input', 'images')
    }),
    cache_dirpath=CACHE_DIR
)

In [None]:
display_step

In [None]:
display_step.fit_transform(data_train)

In [None]:
display_step.transform(data_test)