In [None]:
#hide
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [None]:
# default_exp pipeline_classifier

In [None]:
# export
import shutil
from random import randint
from os import path,listdir
from shutil import rmtree
from tempfile import TemporaryDirectory
from itertools import product
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import re
import seaborn as sns
import tensorflow_hub as hub
import tensorflow as tf

ModuleNotFoundError: No module named 'matplotlib'

# Pipeline 
> Pipeline of the binary text classifier for job-offer email responses

The emails are classified as 'positive' or 'negative' using the Tensorflow `tf.estimator` module. 

In [None]:
#hide
# Silence TensorFlow's v1 logger: only ERROR-level messages are shown.
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)

## Data Processing

The dataset of email responses is loaded from a csv file.  Three steps make up the data processing pipeline:  
Load the data -->  Make train-test split  -->  Create the Tensorflow input functions

Load and split the data using the Pandas library.  

In [None]:
# export


def split_dataframe(df, **kwargs):
    '''Train-test split of *df* built on `DataFrame.sample`.

    Keyword arguments (e.g. `frac`, `n`, `random_state`) are forwarded to
    `df.sample`. The sampled rows form the train set; all remaining rows
    (matched by index label) form the test set.
    Returns `{'train': ..., 'test': ...}`.
    '''
    sampled = df.sample(**kwargs)
    return dict(train=sampled, test=df.drop(index=sampled.index))



def balanced_labels_in_split(df, **kwargs):
    '''Give a balanced train-test split in 'positive' and 'negative' labels.

    Rows with `sentiment == 1` and rows with `sentiment == 0` are split
    separately with `split_dataframe` (same *kwargs* for both), then the
    positive and negative parts are concatenated, positives first.
    Returns `{'train': ..., 'test': ...}`.
    '''
    per_label = [
        split_dataframe(df[df.sentiment == label], **kwargs) for label in (1, 0)
    ]
    return {
        part: pd.concat([split[part] for split in per_label])
        for part in ('train', 'test')
    }

Test these functions with the following example:

In [None]:
def sentiments(sentiment, n):
    '''Column dict holding *n* copies of the label *sentiment*.'''
    return {'sentiment': [sentiment] * n}

def df_example_f(sentiment, ni, nf):
    '''Shuffled one-column frame labelled *sentiment*, indexed ni .. nf-1.'''
    frame = pd.DataFrame(sentiments(sentiment, nf - ni), index=range(ni, nf))
    return frame.sample(nf - ni)

df_a = df_example_f(1, 0, 100)    # positive
df_b = df_example_f(0, 150, 200)  # negative


In [None]:
# `random_state` is passed on to numpy, whose seeds must lie in [0, 2**32 - 1];
# `randint` includes its upper bound, hence the `- 1`.
random_state = randint(0, 2**32 - 1)
split_aa = split_dataframe(df_a, random_state=random_state, frac=.55)
split_bb = split_dataframe(df_a, random_state=random_state, frac=.55)

# The same seed must reproduce exactly the same train and test frames.
assert all(
    np.all((split_aa[k] == split_bb[k]).values) for k in ('train', 'test')
)

In [None]:
# `train_df`/`test_df` avoid shadowing the exported `train` function on re-runs.
train_df, test_df = split_dataframe(df_a, frac=0.7).values()
assert len(train_df) == 70 and len(test_df) == 30

dataset = pd.concat([df_a, df_b]).sample(frac=1.0)
train_df, test_df = balanced_labels_in_split(dataset, frac=.8).values()

assert len(dataset) == len(train_df) + len(test_df)
# 80% of each label goes to train (80 of 100 positives + 40 of 50 negatives),
# so the overall train fraction is 0.8 as well.
assert abs(len(train_df) / len(dataset) - 0.8) < 1e-9

# Seeds passed to numpy must lie in [0, 2**32 - 1]; randint includes its upper bound.
random_state = randint(0, 2**32 - 1)
balanced1 = balanced_labels_in_split(dataset, random_state=random_state, frac=.65)
balanced2 = balanced_labels_in_split(dataset, random_state=random_state, frac=.65)
assert all(
    np.all((balanced1[k] == balanced2[k]).values) for k in ('train', 'test')
)

The TensorFlow input functions feed the data into the TensorFlow estimators.
They are created with the *version 1* `tf.compat.v1.estimator.inputs` module.

In [None]:
# export
def tf_input_fn(df, **kwargs):
    ''' Wrap a DataFrame in a TF v1 pandas input function.

    The whole frame is passed as the features and its "sentiment" column as
    the labels; *kwargs* (e.g. `shuffle`, `num_epochs`) are forwarded to
    `pandas_input_fn`.
    '''
    labels = df["sentiment"]
    make_input_fn = tf.compat.v1.estimator.inputs.pandas_input_fn
    return make_input_fn(df, labels, **kwargs)

def train_input_fs(**kwargs):
    '''Build training input functions, one per named DataFrame.

    Each function shuffles its rows and is created with `num_epochs=None`
    (no epoch limit), so training length is governed by the step count.
    Returns a dict mapping each keyword name to its input function.
    '''
    input_fns = {}
    for name, frame in kwargs.items():
        input_fns[name] = tf_input_fn(frame, shuffle=True, num_epochs=None)
    return input_fns

def predict_input_fs(**kwargs):
    '''Build evaluation/prediction input functions, one per named DataFrame.

    Rows are not shuffled, so predictions come out in DataFrame order.
    Returns a dict mapping each keyword name to its input function.
    '''
    return dict(
        (name, tf_input_fn(frame, shuffle=False)) for name, frame in kwargs.items()
    )

**Load and split the dataset**

In [None]:
# Load the labelled email payloads and split each label 60% train / 40% test.
df = pd.read_csv('../data/interim/payloads.csv')
dfs = balanced_labels_in_split(df,frac=0.6)

Fraction of positive and negative sentiments in the full dataset and in each split

In [None]:
def sentiment_rate(df):
    '''Fraction of rows labelled negative (0) and positive (1) in *df*.

    Divides by the number of rows (`len(df)`), not by `df.size`, which counts
    every cell and would scale the rates down by the number of columns.
    '''
    n_rows = len(df)
    return 'sentiment: {negative: %s, positive: %s}' % (
        (df.sentiment == 0).sum() / n_rows,
        (df.sentiment == 1).sum() / n_rows
    )

print('dataset -->',sentiment_rate(df),sep='\n')
print('train -->',sentiment_rate(dfs['train']),sep='\n')
print('test -->',sentiment_rate(dfs['test']),sep='\n')

Sample of the train dataset

In [None]:
# Two random rows of the training split.
dfs['train'].sample(2)

**Load Tensorflow input functions**

In [None]:
# One shuffling input function for training, plus non-shuffling input
# functions for both splits (used for evaluation and prediction).
train_input_fn = train_input_fs(train = dfs['train'])
predict_input_fn =  predict_input_fs(**dfs)

## Set up the Model

***Feature columns***

From TF-Hub [feature column](https://github.com/tensorflow/hub/blob/master/docs/api_docs/python/hub/text_embedding_column.md)
we use [nnlm-en-dim128 module](https://tfhub.dev/google/nnlm-en-dim128/1). Some important facts are:

* The module takes **a batch of sentences in a 1-D tensor of strings** as input.
* The module is responsible for **preprocessing of sentences** (e.g. removal of punctuation and splitting on spaces).
* The module works with any input (e.g. **nnlm-en-dim128** hashes words not present in the vocabulary into ~20,000 buckets).

In [None]:
#export
def embedded_text_feature_column_f(module_spec="https://tfhub.dev/google/nnlm-en-dim128/1"):
    ''' Build the TF-Hub text-embedding feature column for the "payload" column.

    *module_spec* is the TF-Hub module handle (URL) used for the embedding.
    '''
    feature_key = "payload"
    return hub.text_embedding_column(key=feature_key, module_spec=module_spec)

Load the embedded text feature column. The first call may take a while, because the module has to be downloaded (it weighs around 500 MB).

In [None]:
# Text-embedding feature column over "payload" (the module is downloaded on first use).
embedded_text_feature_column = embedded_text_feature_column_f()

***Premade Estimators***

For classification, we use a premade [DNN Classifier](https://www.tensorflow.org/api_docs/python/tf/estimator/DNNClassifier)

In [None]:
# export
def load_estimator(
    embedded_text_feature_column, estimator_f=tf.estimator.DNNClassifier,model_dir = None,
    hidden_units=(500, 100), learning_rate=0.003
):
    ''' Load the TF `estimator`.

    Args:
        embedded_text_feature_column: feature column fed to the estimator
            (e.g. the result of `embedded_text_feature_column_f`).
        estimator_f: estimator constructor; called with `model_dir`,
            `hidden_units`, `feature_columns`, `n_classes` and `optimizer`.
        model_dir: checkpoint directory; when None the estimator stores its
            model in a temporary directory.
        hidden_units: sizes of the hidden layers (default 500 and 100).
        learning_rate: learning rate of the Adagrad optimizer.
    Returns:
        The estimator built by *estimator_f* for 2 classes.
    '''
    return estimator_f(
        model_dir = model_dir,
        hidden_units=list(hidden_units),
        feature_columns=[embedded_text_feature_column],
        n_classes=2,
        # `learning_rate` is the supported keyword; `lr` is a deprecated
        # alias that recent Keras releases no longer accept.
        optimizer=tf.keras.optimizers.Adagrad(learning_rate=learning_rate)
    )

In [None]:
# No model_dir given: the estimator keeps its checkpoints in a temporary directory.
estimator = load_estimator(embedded_text_feature_column)

When no *model_dir* is provided, the `estimator` is stored in a temporary directory. Deleting an `estimator` (either explicitly or through the garbage collector) does not erase its directory from the disk.

In [None]:
# Deleting the Python object leaves its model directory on disk...
model_dir = estimator.model_dir
del estimator 
assert path.isdir(model_dir)
# ...so remove it by hand and build a fresh estimator.
rmtree(model_dir)
estimator = load_estimator(embedded_text_feature_column)

## Train, Evaluate and Predict

Train the estimator for a reasonable number of steps.  
Training for 5,000 steps means 640,000 training examples with the default
batch size (128).  
This is roughly equivalent to 640,000/363 ≈ 1,760 epochs, since the training dataset
contains 363 examples.

In [None]:
#export
def train(estimator, train_input_fn,steps=5000):
    '''Train with TF `estimator.train`, once per named input function.

    Args:
        estimator: object exposing `train(input_fn=..., steps=...)`.
        train_input_fn: dict mapping names to input functions.
        steps: number of training steps for each call.
    Returns:
        dict mapping each name to whatever `estimator.train` returned for it.
    '''
    return {
        name: estimator.train(input_fn=input_fn, steps=steps)
        for name, input_fn in train_input_fn.items()
    }

In [None]:
# Train on the "train" input function; the output maps that name to what `estimator.train` returned.
train(estimator,train_input_fn,steps=5000)

The F1 score is not included in the metrics returned by `estimator.evaluate`. The function `f1_score` calculates the F1 score from the precision and recall. The function `evaluate` runs `estimator.evaluate` on each input function and adds the `f1_score` to its results.

In [None]:
# export
def f1_score(estimations):
    ''' F1 score from the 'precision' and 'recall' entries of *estimations*:

    *f1_score(precision,recall) =  (2 x precision x recall) / (precision + recall)*

    Returns 0.0 when precision + recall is below 1e-12, to avoid dividing by ~0.
    '''
    precision, recall = estimations['precision'], estimations['recall']
    denominator = precision + recall
    return 0.0 if denominator < 10**(-12) else 2.0 * precision * recall / denominator

In [None]:
# Near-zero precision and recall fall back to 0.0 instead of dividing by ~0.
assert f1_score({'precision':10**(-13),'recall':10**(-13)}) == 0.0

# Perfect precision and recall give a perfect F1 score.
assert f1_score({'precision':1,'recall':1})  == 1.

In [None]:
# export
def evaluate(estimator, **args):
    '''Evaluate with TF `estimator.evaluate` and add the F1 score.

    Args:
        estimator: object exposing `evaluate(input_fn=...)`, returning a dict
            that contains 'precision' and 'recall'.
        **args: named input functions (e.g. the output of `predict_input_fs`).
            They were previously annotated with `tf_input_fn`, a function
            object rather than a type, so the misleading hint was dropped.
    Returns:
        dict mapping each name to its evaluation dict, extended with 'f1_score'.
    '''
    results = {}
    for name, input_fn in args.items():
        scores = estimator.evaluate(input_fn=input_fn)
        scores['f1_score'] = f1_score(scores)
        results[name] = scores
    return results

In [None]:
#export
def predict(estimator, df_examples):
    ''' Predict with TF `estimator.predict` from a dataframe of payloads.

    Returns a numpy array with the predicted class id of every row of
    *df_examples*, in row order (the prediction input function does not shuffle).
    '''
    input_fn = predict_input_fs(examples=df_examples)['examples']
    class_ids = [
        prediction["class_ids"][0]
        for prediction in estimator.predict(input_fn=input_fn)
    ]
    return np.array(class_ids)

Evaluate:

In [None]:
# Evaluate on every split in predict_input_fn ('train' and 'test').
eval_results = evaluate(estimator,**predict_input_fn)

In [None]:
# Accuracy on each split, taken from the evaluation dicts.
print("Training set accuracy: {accuracy}".format(**eval_results['train']))
print("Test set accuracy: {accuracy}".format(**eval_results['test']))

Predict:

In [None]:
# Predicted class ids (0 = negative, 1 = positive) for 5 random test rows.
predict(estimator, dfs['test'].sample(5)) 

It is convenient to have a predictor that takes a raw string as input. The function `sentiment` covers this role.

In [None]:
# export
def sentiment(estimator, doc):
    ''' Gets the sentiment of the *doc* string.

    The string is wrapped in a one-row dataframe (with a placeholder
    sentiment of 0, which the input function requires) and classified.
    Returns "negative" or "positive".
    '''
    frame = pd.DataFrame({'payload': [doc], 'sentiment': [0]})
    label_id = predict(estimator, frame)[0]
    return ("negative", "positive")[label_id]

In [None]:
# An acceptance email: expected to be classified as "positive".
example_str = ''' thank you for offering me the position of Merchandiser with Thomas Ltd.
i am thankful to accept this job offer and look ahead to starting my career with your company
on June 27, 2000.'''
sentiment(estimator,example_str)

***Confusion matrix***  
The function `plot_confusion_matrix` plots the confusion matrix of a dataframe using its corresponding TensorFlow input function. The confusion matrix helps to understand the distribution of misclassifications.

In [None]:
# export
def plot_confusion_matrix(df_data, estimator, input_fn, header,fig_file=None):
    ''' Plot the row-normalized Confusion Matrix of *estimator* on *df_data*.

    Rows are the true labels and columns the predicted labels, so before
    normalization the layout is $[[TN, FP], [FN, TP]]$. Each row is then
    divided by its sum.

    Args:
        df_data: dataframe whose "sentiment" column holds the true 0/1 labels,
            in the same order as the rows produced by *input_fn*.
        estimator: estimator whose `predict` yields dicts with "class_ids".
        input_fn: non-shuffling input function over *df_data*.
        header: title of the plot.
        fig_file: when given, the figure is also saved to this path.
    '''
    LABELS = ["negative", "positive"]

    predictions = [x["class_ids"][0] for x in estimator.predict(input_fn=input_fn)]

    # num_classes keeps the matrix 2x2 (matching LABELS) even when the
    # labels or the predictions contain a single class only.
    cm = tf.math.confusion_matrix(
        df_data["sentiment"], predictions, num_classes=len(LABELS)
    ).numpy().astype(np.float32)

    # Normalize the confusion matrix so that each row sums to 1; a row with
    # no true examples stays at 0 instead of becoming NaN.
    row_sums = cm.sum(axis=1, keepdims=True)
    cm = np.divide(cm, row_sums, out=np.zeros_like(cm), where=row_sums > 0)

    # A dedicated figure so that repeated calls do not draw over each other.
    fig, ax = plt.subplots()
    sns.heatmap(cm, annot=True, xticklabels=LABELS, yticklabels=LABELS, ax=ax)
    ax.set_title(header)
    ax.set_xlabel("Predicted")
    ax.set_ylabel("True")
    if fig_file is not None:
        fig.savefig(fig_file)

The Confusion Matrix for the train set case

In [None]:
# Row-normalized confusion matrix on the training split.
plot_confusion_matrix(dfs['train'],estimator,predict_input_fn['train'],header='Train data')

The Confusion Matrix for the test case

In [None]:
# Row-normalized confusion matrix on the test split.
plot_confusion_matrix(dfs['test'],estimator,predict_input_fn['test'],header='Test data')

## Save and Load an Estimator

The `export_estimator` function copies the estimator's directory, given by `estimator.model_dir`, to *dst_estimator*.

In [None]:
# export
def export_estimator(estimator, dst_estimator):
    ''' Copy the estimator's model directory to *dst_estimator*.

    Uses `shutil.copytree`, so *dst_estimator* is created by the copy.
    '''
    source_dir = estimator.model_dir
    shutil.copytree(source_dir, dst_estimator)

Load the saved estimator using `load_estimator` and check that the evaluation results match those of the latest checkpoint:

In [None]:
# Export the estimator to a scratch directory, reload it from there and
# evaluate it on the same input functions; the directory is removed on exit.
with TemporaryDirectory() as tmpdirname:
    tmp_dir = path.join(tmpdirname, 'tmp')
    export_estimator(estimator, dst_estimator=tmp_dir)
    loaded_estimator_results = evaluate(
        load_estimator(embedded_text_feature_column, model_dir=tmp_dir),
        **predict_input_fn
    )

In [None]:
# The reloaded estimator must reproduce the in-memory evaluation exactly.
assert loaded_estimator_results == eval_results

Erase all the variables defined so far

In [None]:
# Drop the notebook-level objects before the Pipeline section re-creates them.
del df,dfs, eval_results, embedded_text_feature_column, loaded_estimator_results

In [None]:
# NOTE(review): leftover scratch cell - it only echoes a string literal and can be deleted.
str('hola this is an string')

In [None]:
# export
class Pipeline:
    '''Implements the workflow: Load -> Train -> Evaluate.

    Typical use: `Pipeline(src_file=...).pipeline()`, then inspect
    `evaluation`, draw `plot_confusion_matrix`, or classify with
    `predict` / `sentiment`.
    '''
    def __init__(
        self,
        src_file=None,
        estimator_dir=None,
        frac=0.60,
        train_steps=5000,
        random_state=None
    ):
        '''Pass the arguments to class attributes.
           Load and split the data when *src_file* is given.

        Args:
            src_file: CSV file with the labelled payloads (optional).
            estimator_dir: model directory to load from / train into; when
                None the estimator uses a temporary directory that `__del__`
                removes.
            frac: fraction of each label assigned to the train split.
            train_steps: number of training steps.
            random_state: seed for the train-test split.
        '''
        self.frac = frac
        self.estimator_dir = estimator_dir
        # A directory supplied by the caller belongs to the caller and must
        # survive `__del__`, whether or not it already exists; only the
        # estimator's own temporary directory (estimator_dir=None) is removed.
        self._is_estimator_dir = estimator_dir is not None
        self.random_state = random_state
        self.train_steps = train_steps
        self.module_spec = "https://tfhub.dev/google/nnlm-en-dim128/1"

        if src_file is not None:
            self.data = pd.read_csv(src_file)
            self.split_dataset()

    def __del__(self):
        ''' Removes the estimator's temporary model directory.

        Nothing is removed when *estimator_dir* was supplied by the caller,
        or when no estimator directory is known yet.
        '''
        # getattr: __del__ also runs on instances whose __init__ failed early.
        if getattr(self, '_is_estimator_dir', True):
            return
        estimator_dir = getattr(self, 'estimator_dir', None)
        if estimator_dir is not None:
            rmtree(estimator_dir, ignore_errors=True)

    def split_dataset(self):
        ''' Train-test splits. Deletes empty dataframes.
        '''
        self.dfs = balanced_labels_in_split(
            self.data, random_state=self.random_state, frac=self.frac
        )
        if self.dfs['test'].shape[0] == 0:
            del self.dfs['test']
        if self.dfs['train'].shape[0] == 0:
            del self.dfs['train']

    def input_fns(self):
        '''Build `input['train']` (only if a train split exists) and `input['predict']`.'''
        self.input = {}
        if 'train' in self.dfs.keys():
            self.input['train'] = train_input_fs(train=self.dfs['train'])

        self.input['predict'] = predict_input_fs(**self.dfs)

    def load_estimator(self):
        '''Create the feature column and the estimator; record its model directory.'''
        self.embedded_text_feature_column = embedded_text_feature_column_f(
            module_spec=self.module_spec
        )
        self.estimator = load_estimator(
            self.embedded_text_feature_column, model_dir=self.estimator_dir
        )
        self.estimator_dir = self.estimator.model_dir

    def train(self):
        '''Train the estimator; a no-op when there is no train split.'''
        # `input_fns` omits the 'train' key when the train split is empty.
        train_input = self.input.get('train')
        if train_input is not None:
            train(self.estimator, train_input, steps=self.train_steps)

    def evaluate(self):
        '''Evaluate every available split and store the result in `evaluation`.'''
        self.evaluation = evaluate(self.estimator, **self.input['predict'])

    def plot_confusion_matrix(self, label, fig_file=None):
        '''Plot the confusion matrix of split *label* ('train' or 'test').'''
        plot_confusion_matrix(
            self.dfs[label],
            self.estimator,
            self.input['predict'][label],
            header=label + ' data',
            fig_file=fig_file
        )

    def export_estimator(self, dst_dir):
        '''Copy the estimator's model directory to *dst_dir*, loading it first if needed.'''
        if not hasattr(self, 'estimator'):
            self.load_estimator()
        export_estimator(self.estimator, dst_dir)

    def predict(self, df_examples):
        '''Predict from dataframe'''
        return predict(self.estimator, df_examples)

    def sentiment(self, doc):
        '''Return "negative" or "positive" for the string *doc*.'''
        return sentiment(self.estimator, doc)

    def pipeline(self):
        ''' The pipeline flow is:
            input_fns --> load_estimator --> train --> evaluate
        '''
        self.input_fns()
        self.load_estimator()
        self.train()
        self.evaluate()

In [None]:
# Import only what this cell uses instead of `import *`.
from nbdev.showdoc import show_doc
show_doc(Pipeline.__del__)

When only the estimator is deleted, its temporary directory is kept on disk:

In [None]:
# Deleting just the estimator attribute leaves its temporary directory behind.
pl = Pipeline(src_file ='../data/interim/payloads.csv')
pl.load_estimator()
model_dir  = pl.estimator.model_dir

assert path.isdir(model_dir)
del pl.estimator
assert path.isdir(model_dir)
# Clean up by hand.
rmtree(model_dir)

The `__del__` method of the `Pipeline` class fixes this:

In [None]:
# Deleting the whole Pipeline removes the estimator's temporary directory.
pl = Pipeline()
pl.load_estimator()
model_dir  = pl.estimator.model_dir

assert path.isdir(model_dir)
del pl
assert not path.isdir(model_dir)

The deletion of the `Pipeline` instance does not cause the deletion of the estimator directory when the estimator has been previously loaded (specifying it in *estimator_dir*).

In [None]:
with TemporaryDirectory() as tmpdirname:
    tmp_dir = path.join(tmpdirname, 'tmp')
    # A pipeline with a temporary estimator, exported to tmp_dir.
    pl = Pipeline()
    pl.load_estimator()
    model_dir = pl.estimator.model_dir
    pl.export_estimator(dst_dir=tmp_dir)
    del pl
    
    # The temporary directory is gone; the exported copy remains.
    assert tmp_dir != model_dir
    assert not path.isdir(model_dir) 
    assert path.isdir(tmp_dir)
    
    # A pipeline given estimator_dir does not remove it on deletion.
    pl = Pipeline(estimator_dir=tmp_dir)
    del pl 
    
    assert path.isdir(tmp_dir)
    

The train TF input function is not generated when no train dataframe is provided:

In [None]:
# NOTE(review): these two helpers duplicate the ones defined in the data-processing section.
sentiments = lambda sentiment, n: {'sentiment': [sentiment for _ in range(n)]}
df_example_f = lambda sentiment, ni, nf: pd.DataFrame(
    sentiments(sentiment, nf - ni), index=range(ni, nf)
).sample(nf - ni)

pl = Pipeline()
pl.load_estimator()
pl.dfs = {'test': df_example_f(1,0,100)} # only a 'test' split, no 'train' split
pl.input_fns()
assert 'train' not in pl.input.keys()
assert 'test' in pl.input['predict'].keys()

In [None]:
# Import only what this cell uses instead of `import *`.
from nbdev.showdoc import show_doc
show_doc(Pipeline.pipeline)

The `pipeline` method calls the functions that prepare the input data, train the model and store the results of the evaluation.

**After running the pipeline, check that all methods are implemented correctly**

In [None]:
# Full run: load + split, input functions, estimator, training, evaluation.
pipeline = Pipeline(src_file= '../data/interim/payloads.csv')
pipeline.pipeline()

The resulting evaluation is stored in `pipeline.evaluation`

In [None]:
# One evaluation dict per split.
pipeline.evaluation.keys()

In [None]:
# Test-split metrics, including the added 'f1_score'.
pipeline.evaluation['test']    

The confusion matrix for train set

In [None]:
# Confusion matrix of the training split.
pipeline.plot_confusion_matrix('train')

The confusion matrix for the test set

In [None]:
# Confusion matrix of the test split.
pipeline.plot_confusion_matrix('test')

Check the option of `plot_confusion_matrix` that saves the figure to a file

In [None]:
from os import makedirs

tmp_dir = './this_is_temporary_file_a4gdknlkhlalhfkjdadkl234235253'
rmtree(tmp_dir,ignore_errors=True)
# Create the directory the figure is saved into. (The former `!mkdir -p {tmp_file}`
# interpolated the not-yet-defined `tmp_file`, so `tmp_dir` was never created.)
makedirs(tmp_dir)

In [None]:
# Saving the test-split confusion matrix must produce the image file.
tmp_file = path.join(tmp_dir,'fig.png')
pipeline.plot_confusion_matrix('test', fig_file=tmp_file)
assert path.isfile(tmp_file)

![](./this_is_temporary_file_a4gdknlkhlalhfkjdadkl234235253/fig.png)

In [None]:
# Remove the scratch figure directory.
!rm -r {tmp_dir}

In [None]:
# NOTE(review): this cell only builds a path and never uses it; it looks like an unfinished test and can be removed.
with TemporaryDirectory() as tmpdirname:
    tmp_file = path.join(tmpdirname, 'tmp.png')
    

Check that the `export_estimator` is rightly copying the model files

In [None]:
with TemporaryDirectory() as tmpdirname:
    tmp_dir = path.join(tmpdirname,'tmp')
    pipeline.export_estimator(tmp_dir)
    
    # `listdir` returns entries in arbitrary order, so compare the sorted listings.
    assert sorted(listdir(tmp_dir)) == sorted(listdir(pipeline.estimator.model_dir))

Predict the examples of a dataframe (classes returned as 0/1)

In [None]:
# Predicted class ids (0 = negative, 1 = positive) for two random rows of the full dataset.
pipeline.predict(pipeline.data.sample(2))

Predict from string

In [None]:
# A rejection email: expected to be classified as "negative".
doc_str='''thank you very much for offering me the opportunity to work at Norris, Wilson and Kelley as Clinical embryologist.\nafter much deliberation, i will not be accepting the position,
as it isn’t the right fit for my long-term career goals.\ni sincerely appreciate the offer and give you
my best wishes in finding a suitable candidate for the position.\ni wish you and the company well 
in all future endeavours.'''
print(pipeline.sentiment(doc_str))

Create an instance of a `Pipeline` and run it (call `pipeline` method). Export the `estimator`. 
Create a new `Pipeline` instance with the exported `estimator`. Evaluate in this pipeline without training.
The resulting evaluations of the two pipelines must coincide, provided that the `evaluation` is carried out in the same test set \[CAVEAT: They must coincide up to a factor (see below)\]. 

In [None]:
# Same seed -> same split in both pipelines, so the evaluations are comparable.
random_state = randint(0,2**32-1)
with TemporaryDirectory() as tmpdirname:
    tmp_dir = path.join(tmpdirname, 'tmp')
    # Train pl1 and export its estimator.
    pl1 = Pipeline(src_file='../data/interim/payloads.csv',random_state=random_state)
    pl1.pipeline()
    pl1.export_estimator(dst_dir=tmp_dir)
    # pl2 reuses the exported estimator and only evaluates (no training).
    pl2 = Pipeline(src_file='../data/interim/payloads.csv',estimator_dir=tmp_dir,random_state=random_state)
    pl2.input_fns()
    pl2.load_estimator()
    pl2.evaluate()
    
assert  pl1.evaluation ==  pl2.evaluation

Instantiate the Pipeline class twice with the same seed

In [None]:
# Two independently trained pipelines sharing the same seed and data file.
kwargs = {
    'random_state': randint(0, 2**32 - 1),
    'src_file': '../data/interim/payloads.csv'
}
pl1, pl2 = Pipeline(**kwargs), Pipeline(**kwargs)
pl1.pipeline()
pl2.pipeline()


NameError: name 'Pipeline' is not defined

Since the seed is the same from both pipelines, the train/test split is the same in both cases

In [None]:
# Identical seeds must yield identical train and test frames.
assert np.all((pl1.dfs['train'] ==pl2.dfs['train']).values)
assert np.all((pl1.dfs['test'] ==pl2.dfs['test']).values)

The Tensorflow training algorithms are not completely [*deterministic*](https://github.com/NVIDIA/tensorflow-determinism). Therefore, the scores in the two pipelines are equal up to a factor. Calculate that factor.

In [None]:
def is_equal_pipe_evals(m, n, eval1=None, eval2=None):
    ''' Check that two evaluations agree within $error = m 10^{-n}$.

    *eval1* / *eval2* default to `pl1.evaluation` / `pl2.evaluation`, so the
    function can also be reused for any pair of evaluation dicts. Every
    score listed for the 'train' split of *eval1* is compared on both the
    'train' and 'test' splits; all absolute differences must be strictly
    below the tolerance.
    '''
    if eval1 is None:
        eval1 = pl1.evaluation
    if eval2 is None:
        eval2 = pl2.evaluation
    tolerance = m * 10**(-n)
    return all(
        abs(eval1[dataname][scorename] - eval2[dataname][scorename]) < tolerance
        for dataname in ('train', 'test')
        for scorename in eval1['train']
    )

# Search the tightest tolerance first (n = 3, m = 1..9), then looser ones.
# If none matches, m and n keep their last values (9, 1) and the assert fails.
for n,m in product(reversed(range(1, 4)),range(1, 10) ):
    if is_equal_pipe_evals(m, n):
        break
        
print(f'The scores with different instances of the pipeline class are equal up to a factor of {m} x 10^({-n})') 

assert is_equal_pipe_evals(m,n)

In [None]:
# hide
from nbdev.export import notebook2script
notebook2script()