# Workshop on TFX - Components, Pipeline and Serving

TensorFlow Extended (TFX) is an end-to-end platform for deploying production ML pipelines.

A TFX pipeline is a sequence of components that implement an ML pipeline which is specifically designed for scalable, high-performance machine learning tasks. Components are built using TFX libraries which can also be used individually.



In this workshop, we will build a simple image classification TFX pipeline. After the workshop, you should:
1. Understand the role of TFX pipelines.
2. Understand the Transform, Trainer and Pusher TFX components.
3. Know how to convert your ML training code into a TFX pipeline and run it locally or on Apache Airflow.
4. Know how to deploy models using TensorFlow Serving.

This workshop is divided into three sections:
1. TFX components: understanding the basic TFX components in an interactive environment.
2. TFX pipeline orchestration: the local runner and Apache Airflow (optional).
3. Model deployment: Flask and TensorFlow Serving.

Reference: https://www.tensorflow.org/tfx

In [None]:
# Run this cell to set up TFX on Google Colab
try:
  import colab
  !pip install --upgrade pip
  !pip install -U tfx
except ImportError:
  # Not running on Colab; assume TFX is already installed.
  pass

Note: In Google Colab, because of package updates, the first time you run this cell you must restart the runtime (Runtime > Restart runtime ...).

# TFX Components: Interactive Orchestration

### Background
This notebook demonstrates how to use TFX in a Jupyter/Colab environment. Here, we walk through a CIFAR-10 image classification example in an interactive notebook.

Working in an interactive notebook is a useful way to become familiar with the structure of a TFX pipeline. It is also useful as a lightweight development environment for your own pipelines, but be aware that interactive notebooks differ from production deployments in how they are orchestrated and in how they access metadata artifacts.

### Orchestration
In a production deployment of TFX, you will use an orchestrator such as Apache Airflow, Kubeflow Pipelines, or Apache Beam to orchestrate a pre-defined pipeline graph of TFX components. In an interactive notebook, the notebook itself is the orchestrator, running each TFX component as you execute the notebook cells.
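
To make the idea of a "pre-defined pipeline graph" concrete, here is a toy sketch that derives an execution order from component dependencies. It is not how any TFX orchestrator is implemented; the `dependencies` dictionary and `run_order` helper are hypothetical names, and the graph simply mirrors the component wiring used later in this notebook. It relies on `graphlib`, which is in the standard library from Python 3.9.

```python
from graphlib import TopologicalSorter

# Each key is a component; its value is the set of upstream components
# whose outputs it consumes (hypothetical encoding, for illustration only).
dependencies = {
    "ImportExampleGen": set(),
    "StatisticsGen": {"ImportExampleGen"},
    "SchemaGen": {"StatisticsGen"},
    "Transform": {"ImportExampleGen", "SchemaGen"},
    "Trainer": {"Transform", "SchemaGen"},
    "Pusher": {"Trainer"},
}


def run_order(graph):
    """Return an order in which every component runs after its inputs."""
    return list(TopologicalSorter(graph).static_order())


order = run_order(dependencies)
for step, name in enumerate(order, start=1):
    print(step, name)
```

In the interactive notebook you play this role yourself: running the cells top to bottom executes the components in a valid order.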

### Metadata
In a production deployment of TFX, you will access metadata through the ML Metadata (MLMD) API. MLMD stores metadata properties in a database such as MySQL or SQLite, and stores the metadata payloads in a persistent store such as on your filesystem. In an interactive notebook, both properties and payloads are stored in an ephemeral SQLite database in the /tmp directory on the Jupyter notebook or Colab server.
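
The split between metadata properties and payloads can be pictured with a small standard-library sketch. This is not the MLMD API or its real schema: the `artifact` table, its columns, and the schema file content are all made up for illustration. The database row holds only the properties (type and URI), while the payload itself lives in a file on disk.

```python
import os
import sqlite3
import tempfile

# Payload: a file on the filesystem (content is a made-up placeholder).
workdir = tempfile.mkdtemp()
payload_path = os.path.join(workdir, "schema.pbtxt")
with open(payload_path, "w") as f:
    f.write("feature { name: 'image' }\n")

# Properties: a row in a SQLite database (hypothetical table layout).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE artifact (id INTEGER PRIMARY KEY, type TEXT, uri TEXT)")
conn.execute(
    "INSERT INTO artifact (type, uri) VALUES (?, ?)", ("Schema", payload_path))
conn.commit()

# Look up the artifact by type, then follow its URI to read the payload.
artifact_type, uri = conn.execute(
    "SELECT type, uri FROM artifact WHERE type = ?", ("Schema",)).fetchone()
with open(uri) as f:
    payload = f.read()

print(artifact_type, uri)
print(payload)
```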

Adapted from: https://www.tensorflow.org/tfx/tutorials/tfx/components_keras

<img src="./assets/tfx_workflow.png" alt="TFX Component" height="1080" width="1900px"/>


<img src="./assets/tfx_libraries.png" alt="TFX Libraries" height="900" width="1900px"/>


In [None]:
import os
import tensorflow as tf
from typing import List
from tfx import v1 as tfx

from tfx.components import ImportExampleGen
from tfx.components import Pusher
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.proto import example_gen_pb2
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

Check the TensorFlow and TFX versions.

In [None]:
print('TensorFlow version: {}'.format(tf.__version__))
print('TFX version: {}'.format(tfx.__version__))

Set up the pipeline paths for the artifacts, metadata, and output model. These are used later in the notebook.

In [None]:
_pipeline_name = "cifar10-tfx"

_pipeline_root = os.path.join('pipelines', _pipeline_name)
_data_root = "data"
_serving_model_dir = os.path.join('serving_model', _pipeline_name)
_labels_path = os.path.join(_data_root, 'labels.txt')

_trainer_module_file = "cifar10_trainer.py"
_transform_module_file = "cifar10_transform.py"

# Will be used later for TFX pipeline
_metadata_path = os.path.join('metadata', _pipeline_name, 'metadata.db')


Create an interactive context

In [None]:
context = InteractiveContext()

## Data Components

### ImportExampleGen Component

The ImportExampleGen component takes TFRecord files in the TF Example data format and generates train and eval examples for downstream components. This component provides consistent and configurable partitioning, and it also shuffles the dataset, which is an ML best practice.

More details: https://www.tensorflow.org/tfx/api_docs/python/tfx/v1/components/ImportExampleGen
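
When the data already comes in pre-defined splits, each split is described by a name and a file pattern relative to the input base directory. The sketch below mimics that lookup with the standard library only. It is not ImportExampleGen's actual implementation; the temporary directory, the file names, and the `files_for_split` helper are hypothetical.

```python
import glob
import os
import tempfile

# Build a throwaway directory that looks like the data folder:
# one sub-directory per pre-defined split (file names are made up).
input_base = tempfile.mkdtemp()
for split_dir, filename in [("train", "train.tfrecord"),
                            ("test", "test.tfrecord")]:
    os.makedirs(os.path.join(input_base, split_dir))
    open(os.path.join(input_base, split_dir, filename), "w").close()

# Split name -> pattern, the same pairs used in the input config below.
split_patterns = {"train": "train/*", "eval": "test/*"}


def files_for_split(base, pattern):
    """List files matching `pattern`, relative to `base`."""
    matches = glob.glob(os.path.join(base, pattern))
    return sorted(os.path.relpath(path, base) for path in matches)


resolved = {name: files_for_split(input_base, pattern)
            for name, pattern in split_patterns.items()}
print(resolved)
```

Note that the split name (`eval`) does not have to match the directory name (`test`); only the pattern decides which files feed a split.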

In [None]:
input_config = example_gen_pb2.Input(splits=[
    example_gen_pb2.Input.Split(name='train', pattern='train/*'),
    example_gen_pb2.Input.Split(name='eval', pattern='test/*')
])
example_gen = ImportExampleGen(input_base=_data_root,
                               input_config=input_config)

context.run(example_gen)

In [None]:
artifact = example_gen.outputs['examples'].get()[0]
print(artifact.split_names, artifact.uri)
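
The `split_names` property printed above is a JSON-encoded list stored as a string, and each split is written to its own sub-directory under the artifact URI. A minimal sketch of turning the two values into per-split paths follows. The `Split-<name>` directory naming is an assumption based on recent TFX releases, and the example URI is hypothetical.

```python
import json
import os


def split_dirs(artifact_uri, split_names_json):
    """Map each split name to its assumed 'Split-<name>' directory."""
    return {name: os.path.join(artifact_uri, f"Split-{name}")
            for name in json.loads(split_names_json)}


# Hypothetical values standing in for artifact.uri and artifact.split_names.
dirs = split_dirs("pipelines/cifar10-tfx/ImportExampleGen/examples/1",
                  '["train", "eval"]')
for name, path in dirs.items():
    print(name, path)
```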

### StatisticsGen Component

The StatisticsGen TFX pipeline component generates feature statistics over both training and serving data, which can be used by other pipeline components. StatisticsGen uses Beam to scale to large datasets.

Consumes: datasets created by an ExampleGen pipeline component.

Emits: Dataset statistics.

More details: https://www.tensorflow.org/tfx/guide/statsgen

In [None]:
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])

context.run(statistics_gen)

In [None]:
context.show(statistics_gen.outputs['statistics'])

### SchemaGen Component

In [None]:
schema_gen = SchemaGen(statistics=statistics_gen.outputs['statistics'],
                       infer_feature_shape=True)

context.run(schema_gen)

In [None]:
context.show(schema_gen.outputs['schema'])

## Transform Component

* Consumes: tf.Examples from an ExampleGen component, and a data schema from a SchemaGen component.
* Emits: A SavedModel to a Trainer component, pre-transform and post-transform statistics.

More details: https://www.tensorflow.org/tfx/guide/transform


In [None]:
transform = Transform(examples=example_gen.outputs['examples'],
                      schema=schema_gen.outputs['schema'],
                      module_file=_transform_module_file)

context.run(transform)

Let's examine the output artifacts of Transform. This component produces two types of outputs:

* transform_graph is the graph that can perform the preprocessing operations (this graph will be included in the serving and evaluation models).
* transformed_examples represents the preprocessed training and evaluation data.

In [None]:
# Directory holding the transform graph artifact.
transform_graph_uri = transform.outputs['transform_graph'].get()[0].uri
os.listdir(transform_graph_uri)

In [None]:
# Get the URI of the output artifact representing the transformed examples, which is a directory.
# 'Split-train' assumes the split directory naming used by recent TFX releases.
train_uri = os.path.join(transform.outputs['transformed_examples'].get()[0].uri, 'Split-train')

# Get the list of files in this directory (all compressed TFRecord files).
tfrecord_filenames = [os.path.join(train_uri, name)
                      for name in os.listdir(train_uri)]

# Create a TFRecordDataset to read these files.
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")

# Decode and print the first record.
for tfrecord in dataset.take(1):
  serialized_example = tfrecord.numpy()
  example = tf.train.Example()
  example.ParseFromString(serialized_example)
  print(example)

## Trainer Component

The Trainer TFX pipeline component trains a TensorFlow model.

Trainer takes:
* tf.Examples used for training and eval.
* A user provided module file that defines the trainer logic.
* Protobuf definition of train args and eval args.
* (Optional) A data schema created by a SchemaGen pipeline component and optionally altered by the developer.
* (Optional) transform graph produced by an upstream Transform component.
* (Optional) pre-trained models used for scenarios such as warmstart.
* (Optional) hyperparameters, which will be passed to the user module function. Details of the integration with Tuner are covered in the TFX Tuner guide.

Trainer emits: At least one model for inference/serving (typically in SavedModel format) and optionally another model for eval (typically an EvalSavedModel).

More details: https://www.tensorflow.org/tfx/guide/trainer

In [None]:
trainer = Trainer(module_file=_trainer_module_file,
                  examples=transform.outputs['transformed_examples'],
                  transform_graph=transform.outputs['transform_graph'],
                  schema=schema_gen.outputs['schema'],
                  train_args=trainer_pb2.TrainArgs(num_steps=160),
                  eval_args=trainer_pb2.EvalArgs(num_steps=4),
                  custom_config={'labels_path': _labels_path})
context.run(trainer)

In [None]:
model_artifact_dir = trainer.outputs['model'].get()[0].uri
print(os.listdir(model_artifact_dir))
model_dir = os.path.join(model_artifact_dir, 'Format-Serving')
print(os.listdir(model_dir))

In [None]:
model_run_artifact_dir = trainer.outputs['model_run'].get()[0].uri

%load_ext tensorboard
%tensorboard --logdir {model_run_artifact_dir}

## Pusher Component

The Pusher component is used to push a validated model to a deployment target during model training or re-training. Before the deployment, Pusher relies on one or more blessings from other validation components to decide whether to push the model or not.

A Pusher component consumes a trained model in SavedModel format, and produces the same SavedModel, along with versioning metadata.

More details: https://www.tensorflow.org/tfx/guide/pusher

In [None]:
pusher = Pusher(model=trainer.outputs['model'],
                push_destination=pusher_pb2.PushDestination(
                    filesystem=pusher_pb2.PushDestination.Filesystem(
                        base_directory=_serving_model_dir)))
context.run(pusher)
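
With a filesystem push destination, each pushed model lands in its own version sub-directory under the base directory, and TensorFlow Serving by default serves the highest numbered version it finds. The sketch below picks out that latest version. It assumes version directories have purely numeric names; the `latest_model_version` helper and the two example version numbers are hypothetical.

```python
import os
import tempfile


def latest_model_version(base_directory):
    """Return the path of the highest numbered version directory, or None."""
    versions = [
        int(name) for name in os.listdir(base_directory)
        if name.isdigit() and os.path.isdir(os.path.join(base_directory, name))
    ]
    if not versions:
        return None
    return os.path.join(base_directory, str(max(versions)))


# Simulate a serving directory with two pushed versions (made-up numbers).
serving_dir = tempfile.mkdtemp()
for version in ("1700000000", "1700000500"):
    os.makedirs(os.path.join(serving_dir, version))

latest = latest_model_version(serving_dir)
print(latest)
```

On the real output you would call `latest_model_version(_serving_model_dir)` after the Pusher cell has run.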

# TFX Pipeline Orchestration

## TFX Pipeline Stitching

In [None]:
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     transform_module_file: str, trainer_module_file: str,
                     serving_model_dir: str, metadata_path: str,
                     labels_path: str) -> pipeline.Pipeline:
    """Implements the CIFAR10 image classification pipeline using TFX."""
    # This is needed for datasets with pre-defined splits
    # Change the pattern argument to train_whole/* and test_whole/* to train
    # on the whole CIFAR-10 dataset
    input_config = example_gen_pb2.Input(splits=[
        example_gen_pb2.Input.Split(name='train', pattern='train/*'),
        example_gen_pb2.Input.Split(name='eval', pattern='test/*')
    ])

    # Brings data into the pipeline.
    example_gen = ImportExampleGen(input_base=data_root,
                                   input_config=input_config)

    # Computes statistics over data for visualization and example validation.
    statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])

    # Generates schema based on statistics files.
    schema_gen = SchemaGen(statistics=statistics_gen.outputs['statistics'],
                           infer_feature_shape=True)

    # Performs transformations and feature engineering in training and serving.
    transform = Transform(examples=example_gen.outputs['examples'],
                          schema=schema_gen.outputs['schema'],
                          module_file=transform_module_file)

    # Uses user-provided Python function that trains a model.
    # When training on the whole dataset, use 18744 for train steps, 156 for eval
    # steps. 18744 train steps correspond to 24 epochs on the whole train set, and
    # 156 eval steps correspond to 1 epoch on the whole test set. The
    # configuration below is for training on the dataset we provided in the data
    # folder, which has 128 train and 128 test samples. The 160 train steps
    # correspond to 40 epochs on this tiny train set, and 4 eval steps correspond
    # to 1 epoch on this tiny test set.
    trainer = Trainer(module_file=trainer_module_file,
                      examples=transform.outputs['transformed_examples'],
                      transform_graph=transform.outputs['transform_graph'],
                      schema=schema_gen.outputs['schema'],
                      train_args=trainer_pb2.TrainArgs(num_steps=160),
                      eval_args=trainer_pb2.EvalArgs(num_steps=4),
                      custom_config={'labels_path': labels_path})

    # Pushes the trained model to a filesystem destination. No validation
    # component is wired into this pipeline, so the push is not gated on a
    # model blessing.
    pusher = Pusher(model=trainer.outputs['model'],
                    push_destination=pusher_pb2.PushDestination(
                        filesystem=pusher_pb2.PushDestination.Filesystem(
                            base_directory=serving_model_dir)))

    components = [
        example_gen, statistics_gen, schema_gen, transform, trainer, pusher
    ]

    return pipeline.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        components=components,
        enable_cache=True,
        metadata_connection_config=metadata.sqlite_metadata_connection_config(
            metadata_path))
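
The step counts quoted in the Trainer comment above can be reproduced with a little arithmetic. The batch sizes are not stated in this notebook; the values below (32 for the tiny dataset, 64 for the whole dataset) are inferred from the quoted numbers and are an assumption, as is dropping partial batches. The real batch sizes are defined in the trainer module file. `steps_for` is a hypothetical helper.

```python
def steps_for(num_examples: int, batch_size: int, epochs: int) -> int:
    """Steps needed for `epochs` passes, dropping any partial batch."""
    return (num_examples // batch_size) * epochs


# Tiny dataset shipped in the data folder: 128 train and 128 test samples.
tiny_train_steps = steps_for(128, batch_size=32, epochs=40)
tiny_eval_steps = steps_for(128, batch_size=32, epochs=1)

# Whole CIFAR-10 dataset: 50,000 train and 10,000 test images.
full_train_steps = steps_for(50_000, batch_size=64, epochs=24)
full_eval_steps = steps_for(10_000, batch_size=64, epochs=1)

print(tiny_train_steps, tiny_eval_steps)   # 160 4
print(full_train_steps, full_eval_steps)   # 18744 156
```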


## TFX Orchestration

### Local Dag Runner

In [None]:
tfx.orchestration.LocalDagRunner().run(
  _create_pipeline(
      pipeline_name=_pipeline_name,
      pipeline_root=_pipeline_root,
      data_root=_data_root,
      transform_module_file=_transform_module_file,
      trainer_module_file=_trainer_module_file,
      serving_model_dir=_serving_model_dir,
      metadata_path=_metadata_path,
      labels_path=_labels_path))

### [Optional] Airflow Dag Runner

## Model Deployment

### Flask

### TF-Serving

## References

The various code snippets were borrowed from multiple