# Interactive ML with TensorFlow Extended (TFX) Pipelines
1. Extract the new training data from the source using the [ExampleGen](https://www.tensorflow.org/tfx/guide/examplegen) component.
2. Validate the new training data:
    * Generate statistics from the incoming data using the [StatisticsGen](https://www.tensorflow.org/tfx/guide/statsgen) component.
    * Import a fixed raw schema using the [ImporterNode](https://github.com/tensorflow/tfx/blob/master/tfx/components/common_nodes/importer_node.py) component.
    * Validate the data against the schema using the [ExampleValidator](https://www.tensorflow.org/tfx/guide/exampleval) component.
3. Transform the data for ML using the [Transform](https://www.tensorflow.org/tfx/guide/transform) component.
4. Train the model using the [Trainer](https://www.tensorflow.org/tfx/guide/trainer) component.
5. Evaluate the model using the [Evaluator](https://www.tensorflow.org/tfx/guide/evaluator) component.
6. Validate the model using a [Custom TFX](https://www.tensorflow.org/tfx/guide/custom_component) component.
7. Push the blessed model to the serving location using the [Pusher](https://www.tensorflow.org/tfx/guide/pusher) component.
8. Query the [ML Metadata](https://www.tensorflow.org/tfx/guide/mlmd) DB.

### Setup

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import os
import tfx
import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_model_analysis as tfma
import warnings
import logging

warnings.simplefilter(action='ignore', category=FutureWarning)
logger = tf.get_logger()

print("Tensorflow Version:", tf.__version__)
print("TFX Version:", tfx.__version__)

In [None]:
WORKSPACE = 'workspace'
DATA_DIR = WORKSPACE + '/data'
RAW_SCHEMA_DIR =  'ml_pipeline/raw_schema'
OUTPUT_DIR = WORKSPACE + '/artifacts'
MODEL_REGISTRY = WORKSPACE + '/model_registry'

REMOVE_ARTIFACTS = True
if REMOVE_ARTIFACTS:
    if tf.io.gfile.exists(OUTPUT_DIR):
        print("Removing previous artifacts...")
        tf.io.gfile.rmtree(OUTPUT_DIR)
    if tf.io.gfile.exists(MODEL_REGISTRY):
        print("Removing previous model registry...")
        tf.io.gfile.rmtree(MODEL_REGISTRY)
        
tf.io.gfile.mkdir(MODEL_REGISTRY)
print("Model registry directory created.")

## Create Interactive Context
This uses an ephemeral SQLite MLMD connection, stored in the pipeline_root directory in a file named "metadata.sqlite".

In [None]:
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

PIPELINE_NAME = 'tfx-census-classification'

context = InteractiveContext(
    pipeline_name=PIPELINE_NAME,
    pipeline_root=OUTPUT_DIR,
    metadata_connection_config=None
)

In [None]:
from pprint import pprint
pprint("Standard Artifact types:")
pprint([a for a in dir(tfx.types.standard_artifacts) if a[0].isupper()])

## 1. Data Ingestion (ExampleGen)
1. Read the CSV data files (expected to include headers).
2. Split the data into train and eval sets.
3. Write the data to TFRecords.


* **Inputs**: ExternalPath
* **Outputs**: Examples (TFRecords)
* **Properties**: split ratio
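
The split ratio can be pictured as hash bucketing: with bucket counts of 3 and 1 (the values used in the next cell), each record is hashed into one of 4 buckets, and the first 3 buckets go to `train`. The sketch below is a plain-Python illustration only. The `assign_split` helper is hypothetical, and `hashlib.md5` is a stand-in chosen for the example, not the fingerprint function that ExampleGen actually uses.

```python
import hashlib

def assign_split(record, splits):
    """Map a record to a split name by hashing it into weighted buckets."""
    total_buckets = sum(buckets for _, buckets in splits)
    digest = hashlib.md5(record.encode("utf-8")).hexdigest()
    bucket = int(digest, 16) % total_buckets
    upper = 0
    for name, buckets in splits:
        upper += buckets
        if bucket < upper:
            return name
    raise AssertionError("unreachable: bucket is always below total_buckets")

splits = [("train", 3), ("eval", 1)]
records = ["row-{}".format(i) for i in range(10000)]
counts = {"train": 0, "eval": 0}
for record in records:
    counts[assign_split(record, splits)] += 1

print(counts)  # roughly a 3:1 ratio between train and eval
```

Because the assignment depends only on the record content, the same record always lands in the same split across runs.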

In [None]:
from tfx.utils.dsl_utils import external_input
from tfx.proto import example_gen_pb2

output_config = example_gen_pb2.Output(
    split_config=example_gen_pb2.SplitConfig(splits=[
        example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=3),
        example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1)
    ]))


example_gen = tfx.components.CsvExampleGen(
    instance_name='Data_Extraction_Spliting',
    input=external_input(DATA_DIR),
    output_config=output_config
)

context.run(example_gen)

In [None]:
import tensorflow_data_validation as tfdv


### Read sample of the extracted data...

In [None]:
train_uri = example_gen.outputs.examples.get()[0].uri + "/train/*"

tfrecord_filenames = tf.data.Dataset.list_files(train_uri)

# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")

# Display some records
for tfrecord in dataset.shuffle(1000).take(1):
    serialized_example = tfrecord.numpy()
    print(tf.train.Example.FromString(serialized_example).features)
    print("")

## 2. Data Validation
1. Generate the **statistics** for the data to validate.
2. Import the **raw_schema** created in the Data Analysis phase.
3. Validate the **statistics** against the schema and generate **anomalies** (if any).
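
As a rough plain-Python analogue of these three steps (not the TFDV API), the sketch below computes per-feature statistics, compares them with a hand-written schema, and collects anomalies. The schema layout and the `compute_statistics` and `validate` helpers are hypothetical and exist only for illustration.

```python
def compute_statistics(rows):
    """Collect, per feature, the set of observed values and the count of missing ones."""
    stats = {}
    for row in rows:
        for name, value in row.items():
            entry = stats.setdefault(name, {"values": set(), "missing": 0})
            if value is None:
                entry["missing"] += 1
            else:
                entry["values"].add(value)
    return stats

def validate(stats, schema):
    """Return a dict mapping feature name to a list of anomaly descriptions."""
    anomalies = {}
    for name, spec in schema.items():
        if name not in stats:
            anomalies.setdefault(name, []).append("feature missing from data")
            continue
        entry = stats[name]
        if spec.get("required") and entry["missing"] > 0:
            anomalies.setdefault(name, []).append("unexpected missing values")
        domain = spec.get("domain")
        if domain is not None:
            unexpected = sorted(entry["values"] - set(domain))
            if unexpected:
                anomalies.setdefault(name, []).append(
                    "values outside domain: {}".format(unexpected))
    for name in stats:
        if name not in schema:
            anomalies.setdefault(name, []).append("feature not in schema")
    return anomalies

# Toy schema and data, made up for this example.
schema = {
    "age": {"required": True},
    "gender": {"required": True, "domain": ["Female", "Male"]},
}
rows = [
    {"age": 34, "gender": "Male"},
    {"age": None, "gender": "Unknown"},
]
anomalies = validate(compute_statistics(rows), schema)
print(anomalies)
```

An empty result would correspond to data that conforms to the schema.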

### 2.1. Generating statistics for the data to validate (StatisticsGen)
* **Inputs**: Examples
* **Outputs**: ExampleStatistics
* **Properties**: None

In [None]:
statistics_gen = tfx.components.StatisticsGen(
    instance_name='Statistics_Generation',
    examples=example_gen.outputs.examples)
context.run(statistics_gen)

In [None]:
#context.show(statistics_gen.outputs['statistics'])

### 2.2. Import the fixed raw schema (ImporterNode)
The **ImporterNode** allows you to import an external artifact to a component.
You need to specify:
1. Artifact Type
2. Artifact Location

In [None]:
schema_importer = tfx.components.common_nodes.importer_node.ImporterNode(
    instance_name='Schema_Importer',
    source_uri=RAW_SCHEMA_DIR,
    artifact_type=tfx.types.standard_artifacts.Schema,
    reimport=False
)

context.run(schema_importer)

In [None]:
context.show(schema_importer.outputs.result)

### 2.3. Validate the input data statistics (ExampleValidator)
* **Inputs**: ExampleStatistics, Schema
* **Outputs**: ExampleAnomalies (if any)
* **Properties**: None

In [None]:
example_validator = tfx.components.ExampleValidator(
    statistics=statistics_gen.outputs.statistics,
    schema=schema_importer.outputs.result,
    instance_name="Data_Validation"
)

context.run(example_validator)

In [None]:
context.show(example_validator.outputs.anomalies)

## 3. Data Preprocessing

### 3.1. Implement the preprocessing logic

We need to implement the preprocessing logic in a Python module: **transform.py**.

* This module is expected to have a **preprocessing_fn** function, which accepts a dictionary of the raw features and returns a dictionary of the transformed features.
* We use the **raw schema** to identify feature types and the required transformation.
* The function is implemented using [TensorFlow Transform](https://www.tensorflow.org/tfx/guide/tft).
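
The dictionary-in, dictionary-out contract can be illustrated without TensorFlow. The sketch below is a plain-Python analogue, not TensorFlow Transform code: the schema layout, the feature subset, and the chosen transformations are all hypothetical, and the real module would use `tft` operations on tensors instead of Python lists.

```python
import math

# Hypothetical schema: feature name -> declared type.
RAW_SCHEMA = {"age": "INT", "capital_gain": "INT", "occupation": "BYTES"}

def preprocessing_fn(inputs, schema=RAW_SCHEMA):
    """Map a dict of raw feature columns to a dict of transformed columns."""
    outputs = {}
    for name, values in inputs.items():
        if schema[name] in ("INT", "FLOAT"):
            # Numeric features: compress the range with log(1 + x).
            outputs[name] = [math.log1p(v) for v in values]
        else:
            # Categorical features: normalize the string form.
            outputs[name] = [v.strip().lower() for v in values]
    return outputs

raw = {
    "age": [0, 34],
    "capital_gain": [0, 0],
    "occupation": [" Sales ", "Tech-support"],
}
transformed = preprocessing_fn(raw)
print(transformed)
```

The schema decides which branch each feature takes, which is the role the raw schema plays in the module described above.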

### 3.2. Transform train and eval data (Transform)

The component uses the transform output produced while transforming the train data to transform the eval data.
That is, the train data is **analyzed** and **transformed**, while the eval data is **only transformed** using the output of the analyze phase (TransformGraph) on the train data.

* **Inputs**: train and eval data (Examples), raw schema (Schema), transformation module (file)
* **Outputs**: transformed train and eval data (Examples), transform output (TransformGraph)
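
The analyze-then-transform split can be sketched in plain Python. This is a conceptual analogue under stated assumptions, not the Transform component's implementation: the `analyze` and `transform` helpers, the feature names, and the choice of z-scoring and vocabulary lookup are hypothetical. The dictionary returned by `analyze` plays the role of the TransformGraph.

```python
import statistics

def analyze(train_rows):
    """Full pass over the training data: compute the constants the transform needs."""
    ages = [row["age"] for row in train_rows]
    vocab = sorted({row["occupation"] for row in train_rows})
    return {
        "age_mean": statistics.mean(ages),
        "age_std": statistics.pstdev(ages),
        "occupation_index": {term: i for i, term in enumerate(vocab)},
    }

def transform(rows, graph):
    """Apply the constants from `analyze` row by row; no statistics are recomputed."""
    out = []
    for row in rows:
        out.append({
            "age": (row["age"] - graph["age_mean"]) / graph["age_std"],
            # Terms unseen during analysis map to -1.
            "occupation": graph["occupation_index"].get(row["occupation"], -1),
        })
    return out

train = [{"age": 20, "occupation": "Sales"}, {"age": 40, "occupation": "Tech-support"}]
eval_rows = [{"age": 30, "occupation": "Armed-Forces"}]

graph = analyze(train)                            # analyze: train data only
transformed_train = transform(train, graph)       # transform train
transformed_eval = transform(eval_rows, graph)    # transform eval with train-derived constants
print(transformed_train)
print(transformed_eval)
```

Computing the constants from the train split only keeps information about the eval split out of the features the model is trained on.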

In [None]:
logger.setLevel(logging.ERROR)

_transform_module_file = 'ml_pipeline/modules/transform.py'

transform = tfx.components.Transform(
    examples=example_gen.outputs.examples,
    schema=schema_importer.outputs.result,
    module_file=_transform_module_file,
    instance_name="Data_Transformation"
)

context.run(transform)

In [None]:
uri = transform.outputs.transform_graph.get()[0].uri
os.listdir(uri)

## 4. Train the Model (Trainer)

### 4.1. Implement a train Python module

Create a Python module containing the following functions:

1. create_dataset: loads data for training and evaluation
2. create_feature_columns: defines the model interface
3. create_keras_model: defines the model architecture and optimization algorithm
4. create_model_signatures: defines the exported model interface

The entry point of the module is the **run_fn**, which trains, evaluates, and exports the model.
The function takes **params** as an argument, which includes the parameters required for creating and training the model.

In [None]:
from ml_pipeline.modules import train
transform_output = tft.TFTransformOutput(transform.outputs.transform_graph.get()[0].uri)

#### - Create Dataset

In [None]:
transformed_train_uri = transform.outputs.transformed_examples.get()[0].uri + "/train/*"

sample_dataset = train.create_dataset(
    transformed_train_uri, transform_output, batch_size=5, shuffle=True)

for input_features, target in sample_dataset.take(2):
    print("Features:")
    for key, values in input_features.items():
        print("- {}: {}".format(key, values.numpy().tolist()))
    print("Targets:", list(target.numpy().tolist()))
    print("")

#### - Feature Columns

In [None]:
feature_columns = train.create_feature_columns(transform_output)
feature_columns

#### - Keras Model

In [None]:
model = train.create_keras_model(None, feature_columns)
model(input_features)
model.summary()

#### - Model Signatures

In [None]:
model_signatures = train.create_model_signatures(model, transform_output)
serving_fn = model_signatures['serving_features']

print("Serving fn inputs:")
print("------------------")
pprint(serving_fn.structured_input_signature[0])

print("Serving fn outputs:")
print("-------------------")
pprint(serving_fn.structured_outputs)
print("")

eval_fn = model_signatures['serving_default']
print("Eval fn inputs:")
print("---------------")
pprint(eval_fn.structured_input_signature[0])

print("Eval fn outputs:")
print("----------------")
pprint(eval_fn.structured_outputs)

### 4.2. Train the model using the Trainer component
* **Inputs**: train module file with the **run_fn**, transformed examples (Examples), raw schema (Schema), and transform output (TransformGraph)
* **Outputs**: saved_model (Model)
* **Properties**: train and eval args


In [None]:
from tfx.components.base import executor_spec
from tfx.components.trainer import executor as trainer_executor

logger.setLevel(logging.INFO)

_train_module_file = 'ml_pipeline/modules/train.py'

trainer = tfx.components.Trainer(
    custom_executor_spec=executor_spec.ExecutorClassSpec(trainer_executor.GenericExecutor),
    module_file=_train_module_file,
    transformed_examples=transform.outputs.transformed_examples,
    schema=schema_importer.outputs.result,
    transform_graph=transform.outputs.transform_graph,
    train_args=tfx.proto.trainer_pb2.TrainArgs(num_steps=1000),
    eval_args=tfx.proto.trainer_pb2.EvalArgs(num_steps=None),
    instance_name='Census_Classifier_Trainer'
)

context.run(trainer)

In [None]:
train_uri = trainer.outputs.model.get()[0].uri
saved_model_dir = os.path.join(train_uri, 'serving_model_dir')
print(saved_model_dir)

In [None]:
!ls {saved_model_dir}

## 5. Evaluate the trained model (Evaluator)
* **Inputs**: eval data (Examples), trained model (Model)
* **Outputs**: eval metric (ModelEvaluation)
* **Properties**: Slicing Specs
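
To make the idea of slicing concrete, the sketch below computes accuracy overall and per value of one column in plain Python. It is an illustration only and does not use TFMA. The `accuracy_by_slice` helper and the toy examples are hypothetical. Slice keys are written as tuples of (column, value) pairs, with the empty tuple standing for the overall slice.

```python
from collections import defaultdict

def accuracy_by_slice(examples, slice_column):
    """Return {slice_key: (example_count, accuracy)}; the key () is the overall slice."""
    totals = defaultdict(lambda: [0, 0])  # slice_key -> [count, correct]
    for example in examples:
        correct = int(example["label"] == example["prediction"])
        # Every example counts toward the overall slice and toward its own slice.
        for key in ((), ((slice_column, example[slice_column]),)):
            totals[key][0] += 1
            totals[key][1] += correct
    return {key: (count, hits / count) for key, (count, hits) in totals.items()}

# Toy predictions, made up for this example.
examples = [
    {"occupation": "Sales", "label": 1, "prediction": 1},
    {"occupation": "Sales", "label": 0, "prediction": 1},
    {"occupation": "Tech-support", "label": 0, "prediction": 0},
    {"occupation": "Tech-support", "label": 1, "prediction": 1},
]
sliced = accuracy_by_slice(examples, "occupation")
for key, (count, accuracy) in sliced.items():
    print(key or "overall", count, accuracy)
```

A model can look acceptable on the overall slice while performing poorly on an individual slice, which is why per-slice metrics are inspected separately.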

In [None]:
from ml_pipeline.modules import helper
eval_config = helper.get_eval_config()

In [None]:
model_analyzer = tfx.components.Evaluator(
    instance_name="Occupation_based_Evaluator",
    examples=example_gen.outputs.examples,
    model=trainer.outputs.model,
    eval_config=eval_config
)

context.run(model_analyzer, enable_cache=False)

In [None]:
import tensorflow_model_analysis as tfma

results_uri = model_analyzer.outputs.output.get()[0].uri
results = tfma.load_eval_result(results_uri)

In [None]:
overall_metrics = results.slicing_metrics[0][1]['']['']
print('Total evaluation examples:', int(overall_metrics['example_count']['doubleValue']))
print('Overall evaluation accuracy: {}%'.format(round(overall_metrics['binary_accuracy']['doubleValue'] * 100, 2)))
print("")
for slicing_metric in results.slicing_metrics[1:]:
    label = "{}:{}".format(slicing_metric[0][0][0], slicing_metric[0][0][1])
    example_count = int(slicing_metric[1]['']['']['example_count']['doubleValue'])
    accuracy = round(slicing_metric[1]['']['']['binary_accuracy']['doubleValue'] * 100, 2)
    print('{} - example count: {}, accuracy: {}%'.format(label, example_count, accuracy))

## 6. Validate the Trained Model

We will create a Custom TFX Component that validates the trained model based on its evaluation metrics.

The custom validator will **bless** the model if:
1. Overall accuracy is greater than the accuracy threshold (for example, 85%).
2. Accuracy per **Occupation** slice is at most the slice tolerance (for example, 10%) below the overall accuracy.

* **Inputs**: Evaluation Metric (ModelEvaluation), trained model (Model)
* **Outputs**: blessing (ModelBlessing)
* **Properties**: accuracy_threshold, slice_accuracy_tolerance
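
The two blessing rules can be sketched as a small function. This is a minimal illustration, not the code of the custom component: the `is_blessed` helper is hypothetical, accuracies are assumed to be fractions in [0, 1], and the tolerance is interpreted as an absolute difference from the overall accuracy, which is an assumption.

```python
def is_blessed(overall_accuracy, slice_accuracies,
               accuracy_threshold, slice_accuracy_tolerance):
    """Apply the two blessing rules to already-computed accuracies."""
    # Rule 1: overall accuracy must exceed the threshold.
    if overall_accuracy <= accuracy_threshold:
        return False
    # Rule 2: no slice may fall more than the tolerance below the overall accuracy.
    lowest_allowed = overall_accuracy - slice_accuracy_tolerance
    return all(acc >= lowest_allowed for acc in slice_accuracies.values())

# Toy per-slice accuracies, made up for this example.
slices = {"Sales": 0.84, "Tech-support": 0.90, "Armed-Forces": 0.70}

strict = is_blessed(0.88, slices, accuracy_threshold=0.85, slice_accuracy_tolerance=0.10)
lenient = is_blessed(0.88, slices, accuracy_threshold=0.85, slice_accuracy_tolerance=0.20)
print(strict, lenient)
```

With a tolerance of 0.10 the weakest slice blocks the blessing, while a tolerance of 0.20 lets the same model through.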

In [None]:
from ml_pipeline.modules import custom_components

In [None]:
accuracy_model_validator = custom_components.AccuracyModelValidator(
    eval_results=model_analyzer.outputs.output,
    model=trainer.outputs.model,
    accuracy_threshold=0.5,
    slice_accuracy_tolerance=0.15,
    instance_name="Accuracy_Model_Validator"
)

context.run(accuracy_model_validator, enable_cache=False)

In [None]:
blessing_uri = accuracy_model_validator.outputs.blessing.get()[0].uri
!ls -l {blessing_uri}

## 7. Pushing the Blessed Model (Pusher)
This step pushes the validated and blessed model to its final destination. This could be:
1. Model Registry
2. Git Repository
3. API Serving Platform
4. Filesystem location
5. Other

### 7.1. Push the blessed model to model registry (filesystem location)

In [None]:
exported_model_location = os.path.join(MODEL_REGISTRY, 'census')

push_destination=tfx.proto.pusher_pb2.PushDestination(
    filesystem=tfx.proto.pusher_pb2.PushDestination.Filesystem(
        base_directory=exported_model_location)
)

pusher = tfx.components.Pusher(
    model=trainer.outputs.model,
    model_blessing=accuracy_model_validator.outputs.blessing,
    push_destination=push_destination
)

context.run(pusher)

### 7.2. Test the pushed model

In [None]:
latest_serving_model_path = os.path.join(exported_model_location, max(os.listdir(exported_model_location)))
print(latest_serving_model_path)
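
The cell above picks the latest version with `max` over directory names, which compares strings lexicographically. The sketch below shows a numeric comparison instead. It assumes the pushed versions are sub-directories with all-digit names, which is an assumption about the directory layout, and the `latest_version_dir` helper is hypothetical.

```python
import os
import tempfile

def latest_version_dir(base_dir):
    """Return the path of the sub-directory whose all-digit name is numerically largest."""
    versions = [
        name for name in os.listdir(base_dir)
        if name.isdigit() and os.path.isdir(os.path.join(base_dir, name))
    ]
    if not versions:
        raise FileNotFoundError("no version directories under {}".format(base_dir))
    return os.path.join(base_dir, max(versions, key=int))

# Demonstrate the difference on a temporary directory with two fake versions.
with tempfile.TemporaryDirectory() as base:
    for version in ("9", "10"):
        os.mkdir(os.path.join(base, version))
    lexicographic = max(os.listdir(base))
    numeric = os.path.basename(latest_version_dir(base))

print(lexicographic, numeric)
```

The two approaches agree when all version names have the same number of digits and differ only when the lengths vary.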

In [None]:
!saved_model_cli show --dir {latest_serving_model_path} --tag_set serve --signature_def serving_features

In [None]:
logger.setLevel(logging.ERROR)

predictor = tf.saved_model.load(
    latest_serving_model_path).signatures["serving_features"]

def create_tf_features(instance):
    new_instance = {}
    for key, value in instance.items():
        new_instance[key] = tf.constant(
            value, 
            dtype=serving_fn.structured_input_signature[0][0][key].dtype
        )
    return new_instance

def local_predict(instance):
    features = create_tf_features(instance)
    outputs = predictor(**features)
    return outputs 

In [None]:
instance = {
    'age': [34],
    'workclass': ['Private'],
    'education': ['Doctorate'],
    'education_num': [10],
    'marital_status': ['Married-civ-spouse'],
    'occupation': ['Prof-specialty'],
    'relationship': ['Husband'],
    'race': ['White'],
    'gender': ['Male'],
    'capital_gain': [0], 
    'capital_loss': [0], 
    'hours_per_week': [40],
    'native_country':['Egyptian']
}

In [None]:
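outputs = local_predict(instance)

# NOTE: LABEL_KEY and SCORE_KEY are not defined earlier in this notebook.
# Set them to the output keys of the 'serving_features' signature
# (see outputs.keys()) before running this cell.
predictions = list(
    zip(outputs[LABEL_KEY].numpy().tolist(), 
        outputs[SCORE_KEY].numpy().tolist()))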

for prediction in predictions:
    print("Predicted label: {} - Prediction confidence: {}".format(
        prediction[0], round(prediction[1], 3)))

## 8. Querying Metadata database

In [None]:
import sqlite3
connection = sqlite3.connect(os.path.join(OUTPUT_DIR, 'metadata.sqlite'))
cursor = connection.cursor()

### List tables

In [None]:
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
pprint(cursor.fetchall())

### Query Artifact table

In [None]:
cursor.execute("SELECT * FROM Artifact;")
for entry in cursor.fetchall():
    print(entry)
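
Raw rows from the Artifact table are easier to read when joined with their type names. The sketch below shows the pattern on an in-memory SQLite database. The `Type` and `Artifact` tables here are a simplified stand-in with an assumed subset of columns, and the rows are toy data, so the query may need adjusting against the real `metadata.sqlite` file.

```python
import sqlite3

# In-memory stand-in for the MLMD store; the columns are a simplified assumption.
connection = sqlite3.connect(":memory:")
connection.executescript("""
    CREATE TABLE Type (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE Artifact (id INTEGER PRIMARY KEY, type_id INTEGER, uri TEXT);
    INSERT INTO Type VALUES (1, 'Examples'), (2, 'Model');
    INSERT INTO Artifact VALUES
        (1, 1, 'artifacts/examples/1'),
        (2, 2, 'artifacts/model/7');
""")

cursor = connection.cursor()
cursor.execute("""
    SELECT Artifact.id AS artifact_id, Type.name AS type_name, Artifact.uri AS uri
    FROM Artifact JOIN Type ON Artifact.type_id = Type.id
    ORDER BY Artifact.id
""")
# cursor.description carries the column names of the result set.
columns = [description[0] for description in cursor.description]
rows = [dict(zip(columns, row)) for row in cursor.fetchall()]
connection.close()

for row in rows:
    print(row)
```

Turning each row into a dictionary keyed by column name avoids having to remember the positional order of the columns.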