# Data persistence and data caching

This notebook presents data persistence and data caching features in steps.
* Persistence helps to avoid re-running early steps of a pipeline when subsequent steps are changed
* Caching makes it possible to run complex, multi-path pipelines without re-computing the results of early steps

Note that the features presented here are different from *model persistence*, which saves the transformers as the steps are trained.

In [None]:
%load_ext autoreload
%autoreload 2

import numpy as np
import pandas as pd

from sklearn.externals import joblib
from sklearn.metrics import log_loss

import matplotlib.pyplot as plt
%matplotlib inline

import os

In [None]:
from steppy.base import Step, BaseTransformer
from steppy.adapter import Adapter, E
EXPERIMENT_DIR_A = './ex4a'
EXPERIMENT_DIR_B = './ex4b'

In [None]:
import shutil

# By default pipelines will try to load previously trained models, so we delete
# both experiment directories to be sure we're starting from scratch.
# ignore_errors=True makes this a no-op when a directory does not exist yet.
shutil.rmtree(EXPERIMENT_DIR_A, ignore_errors=True)
shutil.rmtree(EXPERIMENT_DIR_B, ignore_errors=True)

## Data

This time we'll have a look at text classification. We'll use the classic 20newsgroups dataset, but without the headers, footers or quotes which would make the task too easy.

In [None]:
from sklearn.datasets import fetch_20newsgroups

newsgroups_train = fetch_20newsgroups(subset='train', remove=('headers', 'footers', 'quotes'))
newsgroups_test = fetch_20newsgroups(subset='test', remove=('headers', 'footers', 'quotes'))

In [None]:
from sklearn.model_selection import train_test_split

X_train, y_train = newsgroups_train.data, newsgroups_train.target

X_fit, X_val, y_fit, y_val = train_test_split(X_train, y_train, test_size=0.1, stratify=y_train, random_state=42)

Let's use a label encoder to ensure our labels are well-behaved

In [None]:
from sklearn.preprocessing import LabelEncoder
input_label_enc = LabelEncoder().fit(newsgroups_train.target)

This time we have pre-defined training and test sets but we would like to have a hold-out set of training data available for ensembling

In [None]:
# Each dataset is a dict with a single 'input' entry holding the raw documents
# ('text') and their encoded labels ('label'). The steps defined later address
# these fields through their adapters, e.g. E('input', 'text').

# Data used to fit the models (90% of the training set).
data_fit = {'input':
                {
                     'text': X_fit,
                     'label': input_label_enc.transform(y_fit),
                }
            }

# Hold-out part of the training set (10%).
data_val = {'input':
                {
                     'text': X_val,
                     'label': input_label_enc.transform(y_val),
                }
            }

# The pre-defined 20newsgroups test subset.
data_test = {'input':
                {
                     'text': newsgroups_test.data,
                     'label': input_label_enc.transform(newsgroups_test.target),
                }
            }

def print_data_summary(data, title):
    """Print ``title`` followed by the size of the dataset ``data``.

    ``data`` is a nested dict exposing ``data['input']['text']`` (the documents)
    and ``data['input']['label']`` (one label per document). The number of
    categories reported is the number of distinct label values.
    """
    print(title)
    inputs = data['input']
    n_docs = len(inputs['text'])
    print('  Num. documents: {}'.format(n_docs))
    # np.unique returns a flat array of the distinct values, so .size is their count
    n_categories = np.unique(inputs['label']).size
    print('  Num. categories: {}'.format(n_categories))

for data, title in [(data_fit, 'Model fitting data'), (data_val, 'Validation data'), (data_test, 'Testing data')]:
    print_data_summary(data, title)

## Text processing transformers

We define a transformer that does count vectorization on our documents - again, we can just wrap the one from sklearn:

In [None]:
from sklearn.feature_extraction.text import CountVectorizer

class CountVecTransformer(BaseTransformer):
    """Steps transformer that wraps scikit-learn's ``CountVectorizer``."""

    def __init__(self, max_features):
        """Create the wrapped vectorizer, forwarding ``max_features`` to it."""
        self.estimator = CountVectorizer(max_features=max_features)

    def fit(self, X):
        """Fit the wrapped vectorizer on the documents ``X`` and return ``self``."""
        self.estimator.fit(X)
        return self

    def transform(self, X, **kwargs):
        """Vectorize ``X`` and return the result under the ``'X'`` key.

        Extra keyword arguments are accepted but not used.
        """
        return {'X': self.estimator.transform(X)}

    def persist(self, filepath):
        """Dump the wrapped vectorizer to ``filepath`` with joblib."""
        joblib.dump(self.estimator, filepath)

    def load(self, filepath):
        """Replace the wrapped vectorizer with the one stored at ``filepath``; return ``self``."""
        self.estimator = joblib.load(filepath)
        return self

Similarly for the IDFs in our TF-IDF model:

In [None]:
from sklearn.feature_extraction.text import TfidfTransformer

class StepsTfidfTransformer(BaseTransformer):
    """Steps transformer that wraps scikit-learn's ``TfidfTransformer``."""

    def __init__(self):
        """Create the wrapped ``TfidfTransformer`` with its default settings."""
        self.estimator = TfidfTransformer()

    def fit(self, X):
        """Fit the wrapped transformer on ``X`` and return ``self``."""
        self.estimator.fit(X)
        return self

    def transform(self, X, **kwargs):
        """Apply the fitted transformer to ``X`` and return the result under ``'X'``.

        Extra keyword arguments are accepted but not used.
        """
        return {'X': self.estimator.transform(X)}

    def persist(self, filepath):
        """Dump the wrapped transformer to ``filepath`` with joblib."""
        joblib.dump(self.estimator, filepath)

    def load(self, filepath):
        """Replace the wrapped transformer with the one stored at ``filepath``; return ``self``."""
        self.estimator = joblib.load(filepath)
        return self

This will give us a bunch of features to train on.

## Linear model

As a first attempt, we'll try to do our predictions with (sparse) logistic regression

In [None]:
from sklearn.linear_model import LogisticRegression

class SparseLogRegProbaTransformer(BaseTransformer):
    """Steps transformer producing class probabilities from L1-penalised logistic regression."""

    def __init__(self):
        """Create the wrapped ``LogisticRegression`` estimator."""
        # The solver is pinned explicitly. scikit-learn 0.22 changed the default
        # solver from 'liblinear' to 'lbfgs', and 'lbfgs' does not support
        # penalty='l1', so relying on the default fails on newer versions.
        # 'liblinear' was the previous default, so older versions behave as before.
        self.estimator = LogisticRegression(penalty='l1', solver='liblinear')

    def fit(self, X, y):
        """Fit the classifier on features ``X`` and labels ``y``; return ``self``."""
        self.estimator.fit(X, y)
        return self

    def transform(self, X, **kwargs):
        """Return the predicted class probabilities for ``X`` under the ``'y_proba'`` key.

        Extra keyword arguments (e.g. the labels passed by the adapter) are
        accepted but not used.
        """
        y_proba = self.estimator.predict_proba(X)
        return {'y_proba': y_proba}

    def persist(self, filepath):
        """Dump the wrapped classifier to ``filepath`` with joblib."""
        joblib.dump(self.estimator, filepath)

    def load(self, filepath):
        """Replace the wrapped classifier with the one stored at ``filepath``; return ``self``."""
        self.estimator = joblib.load(filepath)
        return self

In [None]:
# Pipeline A: CountVec -> TF-IDF -> SparseLogReg, all stored under EXPERIMENT_DIR_A.

# Reads the raw documents from the 'input' data entry.
count_vec_step = Step(name='CountVec',
                      transformer=CountVecTransformer(max_features=1000),
                      input_data=['input'],
                      adapter=Adapter({'X': E('input', 'text')}),
                      experiment_directory=EXPERIMENT_DIR_A)

# Consumes the CountVec output; its own output is persisted and re-loaded.
tfidf_step = Step(name='TF-IDF',
                  transformer=StepsTfidfTransformer(),
                  input_steps=[count_vec_step],        
                  experiment_directory=EXPERIMENT_DIR_A,
                  persist_output=True,
                  # NOTE: loading the saved output breaks when switching from training
                  # data to val data or test data - the saved file has to be deleted
                  # manually first (see the "Bug workaround" cells below).
                  load_persisted_output=True
                  )

# Features come from the TF-IDF step, labels from the 'input' data entry.
logreg_step = Step(name='SparseLogReg',
                   transformer=SparseLogRegProbaTransformer(),
                   input_steps=[tfidf_step],
                   input_data=['input'],
                   adapter=Adapter({'X': E('TF-IDF', 'X'),
                                    'y': E('input', 'label')
                                   }),
                   experiment_directory=EXPERIMENT_DIR_A)

Note that we have passed `persist_output=True` to the `tfidf_step` constructor. This makes the step save its output so that, once computed, it can later just be loaded from disk. Therefore, we will be able to work on the logistic regression classifier without having to re-compute the outputs of its ancestor nodes. Additionally, we have set `load_persisted_output=True`, which tells this step to load the previously computed and saved outputs instead of processing the data.

In [None]:
logreg_step

In [None]:
preds_linear_fit = logreg_step.fit_transform(data_fit)

In [None]:
from sklearn.metrics import accuracy_score

acc_linear_fit = accuracy_score(y_true=data_fit['input']['label'], y_pred=np.argmax(preds_linear_fit['y_proba'], axis=1))
print('Model fitting accuracy: {:.4f}'.format(acc_linear_fit))

In [None]:
# Bug workaround: TF-IDF is set to load its persisted output, so the file saved
# for the fitting data must be deleted manually before switching to the validation data
os.remove(os.path.join(EXPERIMENT_DIR_A, 'outputs', 'TF-IDF'))
preds_linear_val = logreg_step.transform(data_val)

In [None]:
acc_linear_val = accuracy_score(y_true=data_val['input']['label'], y_pred=np.argmax(preds_linear_val['y_proba'], axis=1))
print('Validation accuracy: {:.4f}'.format(acc_linear_val))

## Random forest model

As an alternative, we'll also build a random forest model on top of the same TF-IDF features. We'll use the `RandomForestClassifier` which is available in Scikit-learn.

In [None]:
from sklearn.ensemble import RandomForestClassifier

class RfClfTransformer(BaseTransformer):
    """Steps transformer producing class probabilities from a random forest classifier."""

    def __init__(self, n_estimators, max_depth, random_state=None):
        """Create the wrapped ``RandomForestClassifier``.

        ``n_estimators`` and ``max_depth`` are forwarded to the classifier.
        ``random_state`` is optional: pass an int to make the forest (and hence
        the reported accuracies) reproducible across runs. The default ``None``
        keeps the previous, unseeded behaviour.
        """
        self.estimator = RandomForestClassifier(n_estimators=n_estimators,
                                                max_depth=max_depth,
                                                random_state=random_state)

    def fit(self, X, y):
        """Fit the forest on features ``X`` and labels ``y``; return ``self``."""
        self.estimator.fit(X, y)
        return self

    def transform(self, X, **kwargs):
        """Return the predicted class probabilities for ``X`` under the ``'y_proba'`` key.

        Extra keyword arguments (e.g. the labels passed by the adapter) are
        accepted but not used.
        """
        y_proba = self.estimator.predict_proba(X)
        return {'y_proba': y_proba}

    def persist(self, filepath):
        """Dump the wrapped classifier to ``filepath`` with joblib."""
        joblib.dump(self.estimator, filepath)

    def load(self, filepath):
        """Replace the wrapped classifier with the one stored at ``filepath``; return ``self``."""
        self.estimator = joblib.load(filepath)
        return self

In [None]:
# Second model on top of the same TF-IDF step used by SparseLogReg:
# features come from TF-IDF, labels from the 'input' data entry.
rf_step = Step(name='RF',
               transformer=RfClfTransformer(n_estimators=200, max_depth=8),
               input_steps=[tfidf_step],
               input_data=['input'],
               adapter=Adapter({'X': E('TF-IDF', 'X'),
                                'y': E('input', 'label')
                               }),
               experiment_directory=EXPERIMENT_DIR_A)

In [None]:
rf_step

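OK, so it was easy to add a different model on top of TF-IDF features. With output persistence we can use the **saved** TF-IDF output and get straight to fitting the random forest - as long as the saved output was computed on the same data. The last run used the validation data, so the cell below first deletes the saved output before fitting on the model fitting data.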

In [None]:
# Bug workaround: the TF-IDF output currently on disk was saved for the validation
# data, so delete it manually before switching back to the fitting data
os.remove(os.path.join(EXPERIMENT_DIR_A, 'outputs', 'TF-IDF'))

preds_rf_fit = rf_step.fit_transform(data_fit)

In [None]:
acc_rf_fit = accuracy_score(y_true=data_fit['input']['label'], y_pred=np.argmax(preds_rf_fit['y_proba'], axis=1))
print('Model fitting accuracy: {:.4f}'.format(acc_rf_fit))

In [None]:
# Bug workaround: the TF-IDF output currently on disk was saved for the fitting
# data, so delete it manually before switching to the validation data
os.remove(os.path.join(EXPERIMENT_DIR_A, 'outputs', 'TF-IDF'))

preds_rf_val = rf_step.transform(data_val)

In [None]:
acc_rf_val = accuracy_score(y_true=data_val['input']['label'], 
                            y_pred=np.argmax(preds_rf_val['y_proba'], axis=1))
print('Validation accuracy: {:.4f}'.format(acc_rf_val))

## Ensembling

We'll do simple ensembling by averaging predictions:

In [None]:
class AvgTransformer(BaseTransformer):
    """Steps transformer that ensembles two probability predictions by averaging them."""

    def __init__(self):
        """Nothing to set up: the averaging has no parameters."""
        pass

    def fit(self, y_proba_1, y_proba_2):
        """No-op fit; the inputs are ignored and ``self`` is returned."""
        return self

    def transform(self, y_proba_1, y_proba_2, **kwargs):
        """Return the element-wise mean of the two inputs under the ``'y_proba'`` key.

        Extra keyword arguments are accepted but not used.
        """
        return {'y_proba': (y_proba_1 + y_proba_2) / 2}

    def persist(self, filepath):
        """Dump an empty dict to ``filepath``, since there is no fitted state to save."""
        joblib.dump({}, filepath)

    def load(self, filepath):
        """Store whatever was dumped at ``filepath`` in ``self.estimator``; return ``self``."""
        # With persist() above this is just the empty placeholder dict.
        self.estimator = joblib.load(filepath)
        return self

In [None]:
# Joins both branches of pipeline A: averages the probabilities predicted by
# the SparseLogReg and RF steps.
ens_step = Step(name='Ensembler',
                transformer=AvgTransformer(),
                input_steps=[logreg_step, rf_step],
                adapter=Adapter({'y_proba_1': E('SparseLogReg', 'y_proba'),
                                 'y_proba_2': E('RF', 'y_proba'),
                                }),
                experiment_directory=EXPERIMENT_DIR_A)

In [None]:
ens_step

Note that for the TF-IDF step we set `persist_output` and `load_persisted_output` to `True`. Why does this matter here? The output of the TF-IDF step is used both by RF and SparseLogReg. This means that when we run the Ensembler node on some data, it will in turn call RF and SparseLogReg, which will both call TF-IDF. Without a saved output, this would mean we're computing the output of the TF-IDF step twice, which is definitely a waste of precious compute time and could possibly lead to some inconsistencies in the data (e.g. if the TF-IDF step was randomized in some way). Saving the output solves both problems without keeping anything in memory - the output is stored on disk, not in RAM.

In [None]:
# Bug workaround: TF-IDF is set to load its persisted output, so the saved file
# must be deleted manually whenever we switch datasets
os.remove(os.path.join(EXPERIMENT_DIR_A, 'outputs', 'TF-IDF'))
preds_ens_val = ens_step.fit_transform(data_val)

# Same workaround again before switching from the validation data to the test data
os.remove(os.path.join(EXPERIMENT_DIR_A, 'outputs', 'TF-IDF'))
preds_ens_test = ens_step.transform(data_test)

In [None]:
acc_ens_val = accuracy_score(y_true=data_val['input']['label'], y_pred=np.argmax(preds_ens_val['y_proba'], axis=1))
print('Validation accuracy: {:.4f}'.format(acc_ens_val))

acc_ens_test = accuracy_score(y_true=data_test['input']['label'], y_pred=np.argmax(preds_ens_test['y_proba'], axis=1))
print('Test accuracy: {:.4f}'.format(acc_ens_test))

## Caching: saving output within one run only

Sometimes you want to keep your output within one run of your pipeline but discard it at the end. This use case is handled by **caching**. Let's build a new pipeline that uses caching instead of saving to avoid re-computing results:

In [None]:
# Pipeline B: same topology as pipeline A, but stored under EXPERIMENT_DIR_B and
# with the TF-IDF output cached (cache_output=True) instead of persisted.

# Reads the raw documents from the 'input' data entry.
new_count_vec_step = Step(name='CountVec',
                          transformer=CountVecTransformer(max_features=1000),
                          input_data=['input'],
                          adapter=Adapter({'X': E('input', 'text')}),
                          experiment_directory=EXPERIMENT_DIR_B)

# Shared by both model branches below, hence the cached output.
new_tfidf_step = Step(name='TF-IDF',
                      transformer=StepsTfidfTransformer(),
                      input_steps=[new_count_vec_step],        
                      experiment_directory=EXPERIMENT_DIR_B,
                      cache_output=True)

# Branch 1: sparse logistic regression on the TF-IDF features.
new_logreg_step = Step(name='SparseLogReg',
                   transformer=SparseLogRegProbaTransformer(),
                   input_steps=[new_tfidf_step],
                   input_data=['input'],
                   adapter=Adapter({'X': E('TF-IDF', 'X'),
                                    'y': E('input', 'label')
                                   }),
                   experiment_directory=EXPERIMENT_DIR_B)

# Branch 2: random forest on the same TF-IDF features.
new_rf_step = Step(name='RF',
               transformer=RfClfTransformer(n_estimators=200, max_depth=8),
               input_steps=[new_tfidf_step],
               input_data=['input'],
               adapter=Adapter({'X': E('TF-IDF', 'X'),
                                'y': E('input', 'label')
                               }),
               experiment_directory=EXPERIMENT_DIR_B)

# Averages the probabilities predicted by the two branches.
new_ens_step = Step(name='Ensembler',
                transformer=AvgTransformer(),
                input_steps=[new_logreg_step, new_rf_step],
                adapter=Adapter({'y_proba_1': E('SparseLogReg', 'y_proba'),
                                 'y_proba_2': E('RF', 'y_proba')
                                }),
                experiment_directory=EXPERIMENT_DIR_B)

In [None]:
new_ens_step

In [None]:
# Cached outputs are meant to live for a single run only, so the cache is
# cleaned before the run (in case anything was left over) and again after it.
new_ens_step.clean_cache()
new_preds_ens_fit = new_ens_step.fit_transform(data_fit)
new_ens_step.clean_cache()

If you look carefully at the training log above, you should see that when training the second branch, TF-IDF just loaded outputs instead of re-computing them.

In [None]:
# Clean the cache around the run on the validation data as well
# (presumably so that outputs cached for another dataset are never reused - TODO confirm).
new_ens_step.clean_cache()
new_preds_ens_val = new_ens_step.transform(data_val)
new_ens_step.clean_cache()

In [None]:
# Clean the cache around the run on the test data as well
# (presumably so that outputs cached for another dataset are never reused - TODO confirm).
new_ens_step.clean_cache()
new_preds_ens_test = new_ens_step.transform(data_test)
new_ens_step.clean_cache()

In [None]:
new_acc_ens_fit = accuracy_score(y_true=data_fit['input']['label'], y_pred=np.argmax(new_preds_ens_fit['y_proba'], axis=1))
print('New fitting accuracy: {:.4f}'.format(new_acc_ens_fit))

new_acc_ens_val = accuracy_score(y_true=data_val['input']['label'], y_pred=np.argmax(new_preds_ens_val['y_proba'], axis=1))
print('New validation accuracy: {:.4f}'.format(new_acc_ens_val))

new_acc_ens_test = accuracy_score(y_true=data_test['input']['label'], y_pred=np.argmax(new_preds_ens_test['y_proba'], axis=1))
print('New test accuracy: {:.4f}'.format(new_acc_ens_test))

Now you should be familiar with data persistence features. The next few notebooks will focus on building deep learning pipelines with steps.