# TFX Workflow
---

## TL;DR

TFX is an amazing tool for production, as many Google products can attest. It can also be used before production, during the development phase, to shorten the path from development to production.
This tutorial shows how to leverage TFX to improve the development experience.

## First Things First 

### - Watch this video

In [1]:
%%HTML
<iframe width="824" height="463" src="https://www.youtube.com/embed/drYM04t57tU" frameborder="0" allow="accelerometer; autoplay; encrypted-media; gyroscope; picture-in-picture" allowfullscreen></iframe>

### - Read the [TFX User Guide](https://www.tensorflow.org/tfx/guide) up to and including the [Deployment Targets](https://www.tensorflow.org/tfx/guide#deployment_targets) section

## Some imports

In [2]:
import os
import sys

sys.path.append("../../utils")
from pipeline_utils import PipelineStep, create_pipeline_file, write_pipeline_to_dags
import tfx_utils

import tensorflow_data_validation as tfdv
import tensorflow_model_analysis as tfma

from tfx.utils import io_utils
from tensorflow_metadata.proto.v0 import schema_pb2
from tensorflow_transform.tf_metadata import schema_utils
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import dataset_schema

%matplotlib notebook

## Setup

In [3]:
project = "titanic"
pipeline_name = project + "_pipeline"
metadata_uri = os.path.join(os.environ['METADATA_DB_DIR'], project, 'metadata.db')
store = tfx_utils.TFXReadonlyMetadataStore.from_sqlite_db(metadata_uri)

## Workflow

A data science workflow is an iterative process. We start with the data: we inspect them, clean them, create some features and inspect them again. Then we build a model and analyze it. We go back to any earlier stage if needed, and eventually we deploy the model. TFX provides key components for each of these steps. Let's see how to iterate over the workflow to build our model.
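Each pipeline generated in this notebook seems to extend the previous one with the components of the next stage. The sketch below shows that cumulative layout. The `STAGES` table and the `components_for` helper are hypothetical illustrations. They are not taken from `pipeline_utils`, whose real logic may differ.

```python
from collections import OrderedDict

# Hypothetical mapping: which TFX components each workflow stage adds.
# Assumption: stages are cumulative, so a later stage reruns earlier components.
STAGES = OrderedDict([
    ("DATA_VALIDATION", ["CsvExampleGen", "StatisticsGen", "SchemaGen", "ExampleValidator"]),
    ("PREPROCESSING", ["Transform"]),
    ("MODELING", ["Trainer", "Evaluator"]),
    ("DEPLOY", ["ModelValidator", "Pusher"]),
])


def components_for(stage):
    """Return every component a pipeline for `stage` would contain."""
    if stage not in STAGES:
        raise ValueError("unknown stage: %s" % stage)
    components = []
    for name, added in STAGES.items():
        components.extend(added)
        if name == stage:
            break
    return components


print(components_for("PREPROCESSING"))
# ['CsvExampleGen', 'StatisticsGen', 'SchemaGen', 'ExampleValidator', 'Transform']
```

Because each stage only appends components, you can rerun the whole pipeline after changing one module and the earlier artifacts are regenerated consistently.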

### Data Validation

The first stage of the workflow is to load the data and inspect them.
To do so, we first load the data ([CsvExampleGen](https://www.tensorflow.org/tfx/guide/examplegen)). Next we compute statistics over them ([StatisticsGen](https://www.tensorflow.org/tfx/guide/statsgen)) and infer a schema from those statistics ([SchemaGen](https://www.tensorflow.org/tfx/guide/schemagen)). Finally we use that schema to validate the data and detect anomalies or changes ([ExampleValidator](https://www.tensorflow.org/tfx/guide/exampleval)).
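The chain above works like this: statistics feed a schema, and the schema is then used to flag anomalies in new data. Below is a toy, plain-Python analogue of that chain.

It is only an illustration of the idea. TFDV's real inference and validation rules are richer and different. The rows are invented, and `infer_schema` and `validate` are hypothetical helpers.

```python
def infer_schema(rows, max_domain_size=5):
    """Toy stand-in for StatisticsGen + SchemaGen: type, presence and domain per feature."""
    names = sorted({name for row in rows for name in row})
    schema = {}
    for name in names:
        values = [row[name] for row in rows if row.get(name) is not None]
        is_string = all(isinstance(v, str) for v in values)
        distinct = set(values)
        schema[name] = {
            "type": "STRING" if is_string else "NUMERIC",
            # A feature is "required" only if every training row has a value.
            "required": len(values) == len(rows),
            # Small string vocabularies become a closed domain, like 'Sex' or 'Embarked'.
            "domain": distinct if is_string and len(distinct) <= max_domain_size else None,
        }
    return schema


def validate(rows, schema):
    """Toy stand-in for ExampleValidator: report anomalies in new rows."""
    anomalies = []
    for i, row in enumerate(rows):
        for name, spec in schema.items():
            value = row.get(name)
            if value is None:
                if spec["required"]:
                    anomalies.append((i, name, "missing required value"))
            elif spec["domain"] is not None and value not in spec["domain"]:
                anomalies.append((i, name, "value %r outside domain" % (value,)))
        for name in row:
            if name not in schema:
                anomalies.append((i, name, "unexpected feature"))
    return anomalies


train = [
    {"Sex": "male", "Age": 22.0, "Embarked": "S"},
    {"Sex": "female", "Age": None, "Embarked": "C"},
    {"Sex": "female", "Age": 26.0, "Embarked": "S"},
]
schema = infer_schema(train)
new_rows = [
    {"Sex": "unknown", "Age": 30.0, "Embarked": "S"},
    {"Age": 40.0, "Embarked": "C"},
]
print(validate(new_rows, schema))
# [(0, 'Sex', "value 'unknown' outside domain"), (1, 'Sex', 'missing required value')]
```

Because one training row lacks `Age`, the toy schema marks it optional. The same mechanism makes `Age` and `Embarked` optional in the real schema shown later.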

#### Pipeline

We use the *create_pipeline_file* helper function to generate a TFX pipeline for the **Data Validation** phase. Then *write_pipeline_to_dags* writes it to the Airflow DAGs folder.

In [4]:
pipeline_file = create_pipeline_file(PipelineStep.DATA_VALIDATION, project)
write_pipeline_to_dags(pipeline_file, pipeline_name)
print(pipeline_file)


import os
import logging
import datetime

from tfx.orchestration.airflow.airflow_runner import AirflowDAGRunner
from tfx.orchestration.pipeline import PipelineDecorator
from tfx.utils.dsl_utils import csv_input
from tfx.proto import trainer_pb2, evaluator_pb2, pusher_pb2

from tfx.components.example_gen.csv_example_gen.component import CsvExampleGen
from tfx.components.statistics_gen.component import StatisticsGen
from tfx.components.schema_gen.component import SchemaGen
from tfx.components.example_validator.component import ExampleValidator
from tfx.components.transform.component import Transform
from tfx.components.trainer.component import Trainer
from tfx.components.evaluator.component import Evaluator
from tfx.components.model_validator.component import ModelValidator
from tfx.components.pusher.component import Pusher


data_dir = os.path.join(os.environ['DATA_DIR'], 'titanic')
log_dir = os.path.join(os.environ['TFX_DIR'], 'logs')
serving_model_dir = os.path.join(os.environ['SERVI

> We can now go to [Airflow](http://localhost:8080/admin/airflow/graph?dag_id=titanic), click **Refresh** to update the DAG, then **Trigger DAG** to run it.

#### Visualization

As described in the [TFX User Guide](https://www.tensorflow.org/tfx/guide), each component's publisher registers the component's outputs (artifacts) in the metadata store. We can therefore query the store to get information about these artifacts.

In [5]:
artifacts_infos = store.get_artifacts_of_type_df(tfx_utils.TFXArtifactTypes.EXAMPLES).reset_index()
artifacts_infos



In [13]:
artifact_id = 2
store.display_stats_for_examples(artifact_id)
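Hard-coding `artifact_id = 2` works for a single run, but the IDs change once the pipeline runs again. The sketch below picks the newest artifact for a given producer and split instead.

- Plain dictionaries stand in for the DataFrame. Their rows mirror the EXAMPLES artifacts table from the Features Check step, with URIs truncated exactly as in that output.
- `latest_artifact_id` is a hypothetical helper.
- It assumes that the producer's name appears in the URI and that IDs grow with each run.

```python
# Rows mirroring the EXAMPLES artifacts table of the "Features Check" step
# (URIs are truncated exactly as in that output).
ARTIFACTS = [
    {"ID": 2, "URI": "/root/airflow/tfx/pipelines/titanic/CsvExample...", "SPLIT": "train"},
    {"ID": 3, "URI": "/root/airflow/tfx/pipelines/titanic/CsvExample...", "SPLIT": "eval"},
    {"ID": 8, "URI": "/root/airflow/tfx/pipelines/titanic/Transform/...", "SPLIT": "train"},
    {"ID": 9, "URI": "/root/airflow/tfx/pipelines/titanic/Transform/...", "SPLIT": "eval"},
]


def latest_artifact_id(artifacts, producer, split):
    """Return the highest (most recent) ID produced by `producer` for `split`."""
    # Assumption: the producing component's name appears in the artifact URI,
    # and IDs grow with each new run of the pipeline.
    ids = [a["ID"] for a in artifacts
           if "/%s" % producer in a["URI"] and a["SPLIT"] == split]
    if not ids:
        raise LookupError("no %s artifact for split %r" % (producer, split))
    return max(ids)


print(latest_artifact_id(ARTIFACTS, "CsvExample", "train"))  # 2
print(latest_artifact_id(ARTIFACTS, "Transform", "eval"))    # 9
```

With the real DataFrame, the same filter reads `artifacts_infos[(artifacts_infos.SPLIT == 'train') & artifacts_infos.URI.str.contains('CsvExample')].ID.max()`.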

In [15]:
schemas = store.get_artifacts_of_type_df(tfx_utils.TFXArtifactTypes.SCHEMA)
schema_uri = os.path.join(schemas.URI.iloc[0], 'schema.pbtxt')  # works whether or not the URI ends with '/'
schema_proto = io_utils.parse_pbtxt_file(file_name=schema_uri, message=schema_pb2.Schema())
tfdv.display_schema(schema_proto)

| Feature name | Type | Presence | Valency | Domain |
|---|---|---|---|---|
| 'Fare' | FLOAT | required | single | - |
| 'Name' | BYTES | required | single | - |
| 'Embarked' | STRING | optional | single | 'Embarked' |
| 'Age' | FLOAT | optional | single | - |
| 'Parch' | INT | required | single | - |
| 'Pclass' | INT | required | single | - |
| 'Sex' | STRING | required | single | 'Sex' |
| 'Survived' | INT | required | single | - |
| 'SibSp' | INT | required | single | - |
| 'PassengerId' | INT | required | single | - |

| Domain | Values |
|---|---|
| 'Embarked' | 'C', 'Q', 'S' |
| 'Sex' | 'female', 'male' |


In [16]:
feature_spec, domains = schema_utils.schema_as_feature_spec(schema_proto)
feature_spec

{u'Age': VarLenFeature(dtype=tf.float32),
 u'Cabin': VarLenFeature(dtype=tf.string),
 u'Embarked': VarLenFeature(dtype=tf.string),
 u'Fare': VarLenFeature(dtype=tf.float32),
 u'Name': VarLenFeature(dtype=tf.string),
 u'Parch': VarLenFeature(dtype=tf.int64),
 u'PassengerId': VarLenFeature(dtype=tf.int64),
 u'Pclass': VarLenFeature(dtype=tf.int64),
 u'Sex': VarLenFeature(dtype=tf.string),
 u'SibSp': VarLenFeature(dtype=tf.int64),
 u'Survived': VarLenFeature(dtype=tf.int64),
 u'Ticket': VarLenFeature(dtype=tf.string)}

### Preprocessing & Feature Engineering

The previous phase revealed missing values: for instance, `Age` and `Embarked` are marked optional in the schema. Let's fill them in and process the features for the model.
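Every feature comes back from the schema as a `VarLenFeature`, so Transform receives sparse tensors in which missing values are simply absent. The `_to_dense` helper in the preprocessing module turns each one into one value per row and fills the gaps with a default. For `Age`, that default is the dataset mean from `tft.mean`.

Below is a plain-Python analogue of that step. The values are invented, and `to_dense` is a hypothetical stand-in, not the TensorFlow code.

```python
def to_dense(indices, values, num_rows, default_value):
    """Plain-Python analogue of `_to_dense`: one slot per row, gaps get `default_value`."""
    dense = [default_value] * num_rows
    for (row, _col), value in zip(indices, values):
        dense[row] = value
    return dense


# Toy sparse 'Age' column for 4 passengers (invented values); row 2 has no value.
age_indices = [(0, 0), (1, 0), (3, 0)]
age_values = [22.0, 38.0, 26.0]

# tft.mean computes this over the whole dataset in Transform's analysis pass;
# here it is simply the mean of the values that are present.
age_mean = sum(age_values) / len(age_values)
ages = to_dense(age_indices, age_values, num_rows=4, default_value=age_mean)

# Categorical gaps get a constant instead, as the module does for 'Embarked' ("S").
embarked = to_dense([(0, 0), (2, 0)], ["C", "Q"], num_rows=3, default_value="S")
print(embarked)  # ['C', 'S', 'Q']
```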

#### Pipeline

In [18]:
pipeline_file = create_pipeline_file(
    PipelineStep.PREPROCESSING,
    project
)
write_pipeline_to_dags(pipeline_file, pipeline_name)
print(pipeline_file)


import os
import logging
import datetime

from tfx.orchestration.airflow.airflow_runner import AirflowDAGRunner
from tfx.orchestration.pipeline import PipelineDecorator
from tfx.utils.dsl_utils import csv_input
from tfx.proto import trainer_pb2, evaluator_pb2

from tfx.components.example_gen.csv_example_gen.component import CsvExampleGen
from tfx.components.statistics_gen.component import StatisticsGen
from tfx.components.schema_gen.component import SchemaGen
from tfx.components.example_validator.component import ExampleValidator
from tfx.components.transform.component import Transform
from tfx.components.trainer.component import Trainer

from tfx.components.evaluator.component import Evaluator


data_dir = os.path.join(os.environ['DATA_DIR'], 'titanic')
log_dir = os.path.join(os.environ['TFX_DIR'], 'logs')
project_preprocessing_file = os.path.join(os.environ['DAGS_DIR'], 'titanic_preprocessing.py')
project_training_file = os.path.join(os.environ['DAGS_DIR'], 'titanic_modeling.py')




#### Preprocessing Module

Let's implement the *preprocessing_fn*, which the [Transform component](https://www.tensorflow.org/tfx/guide/transform) uses to process the data.

In [6]:
%%writefile /root/airflow/dags/titanic_preprocessing.py

import tensorflow as tf
import tensorflow_transform as tft


def _to_dense(x, default_value=None):
    default_value = ('' if x.dtype == tf.string else 0) if default_value is None else default_value    
    return tf.squeeze(tf.sparse_to_dense(x.indices, [x.dense_shape[0], 1], x.values, default_value), axis=1)
    

def preprocessing_fn(inputs):
    """tf.transform's callback function for preprocessing inputs.

    Args:
    inputs: map from feature keys to raw not-yet-transformed features.

    Returns:
    Map from string feature key to transformed feature operations.
    """
    
    outputs = {}
        
    # Missing values ==========================================
    age = inputs["Age"]                                       
    age_mean = tft.mean(age)
    age_without_na = _to_dense(age, age_mean)
    
    cabin_without_na = _to_dense(inputs["Cabin"], "X")
    embarked_without_na = _to_dense(inputs["Embarked"], "S")
    # =========================================================
    
    
    # Feature Engineering ============================================================        
    outputs["Age"] = tft.bucketize(age_without_na, num_buckets=5)
    
    outputs["Fare"] = tft.scale_to_z_score(_to_dense(inputs["Fare"]))    
    outputs["Parch"] = tft.scale_to_z_score(_to_dense(inputs['Parch']))

    outputs["Sex"] = tft.compute_and_apply_vocabulary(_to_dense(inputs["Sex"]))
    outputs["Cabin"] = tft.compute_and_apply_vocabulary(cabin_without_na, top_k=100) 
    outputs["Embarked"] = tft.compute_and_apply_vocabulary(embarked_without_na) 
    # ================================================================================
    
    
    # Label =============================================
    outputs["Survived"] = _to_dense(inputs["Survived"])
    # ===================================================
    
    
    return outputs

Writing /root/airflow/dags/titanic_preprocessing.py
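`tft.scale_to_z_score` and `tft.compute_and_apply_vocabulary` both depend on statistics gathered over the full dataset before any row is transformed. The sketch below reproduces what they compute in plain Python.

- It is an approximation for intuition. Tie-breaking and the constant-column case are this sketch's own choices.
- The `default_value=-1` for out-of-vocabulary values matches the `tft` default.
- With `top_k=100` on `Cabin`, rarer cabins therefore become `-1`. Per the `categorical_column_with_identity` documentation, negative ids are dropped when no `default_value` is set.

```python
import math
from collections import Counter


def scale_to_z_score(values):
    """Analogue of tft.scale_to_z_score: (x - mean) / std, stats taken over all values."""
    mu = sum(values) / float(len(values))
    std = math.sqrt(sum((v - mu) ** 2 for v in values) / float(len(values)))
    # Returning 0.0 for a constant column is this sketch's choice, not tft's documented behavior.
    return [(v - mu) / std if std > 0 else 0.0 for v in values]


def compute_and_apply_vocabulary(values, top_k=None, default_value=-1):
    """Analogue of tft.compute_and_apply_vocabulary: frequent values get small ids."""
    counts = Counter(values)
    # Descending frequency; ties broken alphabetically (this sketch's choice).
    vocab = sorted(counts, key=lambda v: (-counts[v], v))
    if top_k is not None:
        vocab = vocab[:top_k]
    index = {v: i for i, v in enumerate(vocab)}
    # Values outside the (possibly truncated) vocabulary map to default_value.
    return [index.get(v, default_value) for v in values]


embarked = ["S", "C", "S", "Q", "S", "C"]
print(compute_and_apply_vocabulary(embarked))           # [0, 1, 0, 2, 0, 1]
print(compute_and_apply_vocabulary(embarked, top_k=2))  # [0, 1, 0, -1, 0, 1]
print(scale_to_z_score([1.0, 2.0, 3.0]))
```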


> Then go to [Airflow](http://localhost:8080/admin/airflow/graph?dag_id=titanic), click **Refresh** to update the DAG, then **Trigger DAG** to run it.

#### Features Check

In [19]:
artifacts_infos = store.get_artifacts_of_type_df(tfx_utils.TFXArtifactTypes.EXAMPLES).reset_index()
artifacts_infos

| | ID | STATE | TYPE_NAME | SPAN | URI | SPLIT |
|---|---|---|---|---|---|---|
| 0 | 2 | published | ExamplesPath | 1 | /root/airflow/tfx/pipelines/titanic/CsvExample... | train |
| 1 | 3 | published | ExamplesPath | 1 | /root/airflow/tfx/pipelines/titanic/CsvExample... | eval |
| 2 | 8 | published | ExamplesPath | 1 | /root/airflow/tfx/pipelines/titanic/Transform/... | train |
| 3 | 9 | published | ExamplesPath | 1 | /root/airflow/tfx/pipelines/titanic/Transform/... | eval |


In [21]:
path = artifacts_infos[artifacts_infos.ID == 9].URI.iloc[0]
preprocessed_stats = tfdv.generate_statistics_from_tfrecord(path)

tfdv.visualize_statistics(preprocessed_stats)

### Modeling

#### Pipeline

In [7]:
columns_for_slicing = ['Sex', 'Fare', 'Age', 'Embarked']

In [8]:
pipeline_file = create_pipeline_file(
    PipelineStep.MODELING,
    project,
    columns_for_slicing=columns_for_slicing,
    train_steps=1000,
    eval_steps=500,
)
write_pipeline_to_dags(pipeline_file, pipeline_name)
print(pipeline_file)


import os
import logging
import datetime

from tfx.orchestration.airflow.airflow_runner import AirflowDAGRunner
from tfx.orchestration.pipeline import PipelineDecorator
from tfx.utils.dsl_utils import csv_input
from tfx.proto import trainer_pb2, evaluator_pb2, pusher_pb2

from tfx.components.example_gen.csv_example_gen.component import CsvExampleGen
from tfx.components.statistics_gen.component import StatisticsGen
from tfx.components.schema_gen.component import SchemaGen
from tfx.components.example_validator.component import ExampleValidator
from tfx.components.transform.component import Transform
from tfx.components.trainer.component import Trainer
from tfx.components.evaluator.component import Evaluator
from tfx.components.model_validator.component import ModelValidator
from tfx.components.pusher.component import Pusher


data_dir = os.path.join(os.environ['DATA_DIR'], 'titanic')
log_dir = os.path.join(os.environ['TFX_DIR'], 'logs')
serving_model_dir = os.path.join(os.environ['SERVI

#### Modeling Module

Let's implement the *trainer_fn*, which the [Trainer component](https://www.tensorflow.org/tfx/guide/trainer) uses to build the model.

In [9]:
%%writefile /root/airflow/dags/titanic_modeling.py

import os
import tensorflow as tf
import tensorflow_model_analysis as tfma

from tfx.proto import trainer_pb2
from tfx.components.trainer.component import Trainer
from tensorflow_transform.tf_metadata import schema_utils
from tensorflow_transform.beam.tft_beam_io import transform_fn_io
from tensorflow_transform.tf_metadata import metadata_io
from tensorflow_transform.saved import saved_transform_io


def _build_estimator(transform_output, config):
    """Build an estimator for predicting the survivors.

    Args:
        transform_output: directory in which the tf-transform model was written
          during the preprocessing step (unused here).
        config: tf.estimator.RunConfig defining the runtime environment for
          the estimator (including model_dir).

    Returns:
        A tf.estimator.LinearClassifier built on the transformed feature columns.
    """
    
    feature_columns = [
        tf.feature_column.numeric_column("Fare", shape=()),
        tf.feature_column.numeric_column("Parch", shape=()),
        tf.feature_column.categorical_column_with_identity("Age", num_buckets=10),
        tf.feature_column.categorical_column_with_identity("Sex", num_buckets=2),
        tf.feature_column.categorical_column_with_identity("Cabin", num_buckets=100),
        tf.feature_column.categorical_column_with_identity("Embarked", num_buckets=3),
    ]
    
    return tf.estimator.LinearClassifier(feature_columns=feature_columns, config=config)


def _get_raw_feature_spec(schema):
    return schema_utils.schema_as_feature_spec(schema).feature_spec


def _example_serving_receiver_fn(transform_output, schema):
    """Build the serving inputs.

    Args:
        transform_output: directory in which the tf-transform model was written
          during the preprocessing step.
        schema: the schema of the input data.

    Returns:
        Tensorflow graph which parses examples, applying tf-transform to them.
    """
  
    raw_feature_spec = _get_raw_feature_spec(schema)
    raw_feature_spec.pop("Survived")

    raw_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(raw_feature_spec)
    
    serving_input_receiver = raw_input_fn()

    _, transformed_features = (
        saved_transform_io.partially_apply_saved_transform(
            os.path.join(transform_output, transform_fn_io.TRANSFORM_FN_DIR),
            serving_input_receiver.features
        )
    )

    return tf.estimator.export.ServingInputReceiver(
        transformed_features,
        serving_input_receiver.receiver_tensors
    )


def _eval_input_receiver_fn(transform_output, schema):
    """Build everything needed for the tf-model-analysis to run the model.

    Args:
        transform_output: directory in which the tf-transform model was written
          during the preprocessing step.
        schema: the schema of the input data.

    Returns:
        EvalInputReceiver function, which contains:
          - Tensorflow graph which parses raw untransformed features, applies the
            tf-transform preprocessing operators.
          - Set of raw, untransformed features.
          - Label against which predictions will be compared.
    """
  
    raw_feature_spec = _get_raw_feature_spec(schema)

    serialized_tf_example = tf.placeholder(dtype=tf.string, shape=[None], name='input_example_tensor')

    # Add a parse_example operator to the tensorflow graph, which will parse raw, untransformed, tf examples.
    features = tf.parse_example(serialized_tf_example, raw_feature_spec)

    # Now that we have our raw examples, process them through the tf-transform
    # function computed during the preprocessing step.
    _, transformed_features = (
        saved_transform_io.partially_apply_saved_transform(
          os.path.join(transform_output, transform_fn_io.TRANSFORM_FN_DIR),
          features
        )
    )

    # The key name MUST be 'examples'.
    receiver_tensors = {'examples': serialized_tf_example}

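    # NOTE: the model is driven by transformed features (training works on the
    # materialized output of TFT). Slicing would normally use raw features, but
    # the transformed features here reuse the raw names, so update() overwrites
    # them and TFMA slices on the transformed values for those keys.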
    features.update(transformed_features)

    return tfma.export.EvalInputReceiver(
        features=features,
        receiver_tensors=receiver_tensors,
        labels=transformed_features["Survived"]
    )


def _gzip_reader_fn():
    """Small utility returning a record reader that can read gzip'ed files."""
    return tf.TFRecordReader(
        options=tf.python_io.TFRecordOptions(
            compression_type=tf.python_io.TFRecordCompressionType.GZIP
        )
    )


def _input_fn(filenames, transform_output, batch_size=200):
    """Generates features and labels for training or evaluation.

    Args:
        filenames: [str] list of gzip-compressed TFRecord files holding the
          transformed examples written by the Transform component.
        transform_output: directory in which the tf-transform model was written
          during the preprocessing step.
        batch_size: int First dimension size of the Tensors returned by input_fn.

    Returns:
        A (features, labels) tuple where features is a dictionary of
          Tensors, and labels is a single Tensor of label indices.
    """

    metadata_dir = os.path.join(transform_output, transform_fn_io.TRANSFORMED_METADATA_DIR)
    transformed_metadata = metadata_io.read_metadata(metadata_dir)
    transformed_feature_spec = transformed_metadata.schema.as_feature_spec()

    transformed_features = tf.contrib.learn.io.read_batch_features(
        filenames, batch_size, transformed_feature_spec, reader=_gzip_reader_fn
    )

    # We pop the label because we do not want to use it as a feature while we're training.
    return transformed_features, transformed_features.pop("Survived")


# TFX will call this function
def trainer_fn(hparams, schema):
    """Build the estimator using the high level API.

    Args:
        hparams: Holds hyperparameters used to train the model as name/value pairs
        schema: Holds the schema of the training examples.

    Returns:
        A dict of the following:
          - estimator: The estimator that will be used for training and eval.
          - train_spec: Spec for training.
          - eval_spec: Spec for eval.
          - eval_input_receiver_fn: Input function for eval.
    """
    
    train_batch_size = 40
    eval_batch_size = 40

    train_input_fn = lambda: _input_fn(
        hparams.train_files,
        hparams.transform_output,
        batch_size=train_batch_size
    )

    eval_input_fn = lambda: _input_fn(
        hparams.eval_files,
        hparams.transform_output,
        batch_size=eval_batch_size
    )

    train_spec = tf.estimator.TrainSpec(
        train_input_fn,
        max_steps=hparams.train_steps
    )


    serving_receiver_fn = lambda: _example_serving_receiver_fn(hparams.transform_output, schema)

    exporter = tf.estimator.FinalExporter('titanic', serving_receiver_fn)
    
    eval_spec = tf.estimator.EvalSpec(
      eval_input_fn,
      steps=hparams.eval_steps,
      exporters=[exporter],
      name='titanic-eval'
    )

    run_config = tf.estimator.RunConfig(
        model_dir=hparams.serving_model_dir,
        save_summary_steps=10,
        save_checkpoints_steps=10,
        
    )

    estimator = _build_estimator(
        transform_output=hparams.transform_output,
        config=run_config,
    )

    # Create an input receiver for TFMA processing
    receiver_fn = lambda: _eval_input_receiver_fn(hparams.transform_output, schema)

    return {
        'estimator': estimator,
        'train_spec': train_spec,
        'eval_spec': eval_spec,
        'eval_input_receiver_fn': receiver_fn
    }

Writing /root/airflow/dags/titanic_modeling.py
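`_example_serving_receiver_fn` and `_eval_input_receiver_fn` both apply the saved Transform graph through `partially_apply_saved_transform`. As a result, evaluation and serving see exactly the preprocessing used in training, which avoids training/serving skew.

The plain-Python sketch below illustrates the idea. Analyzer results are computed once and frozen, then the same function is applied to training rows and to serving requests. `FittedTransform` is hypothetical and only mimics the `Fare` and `Sex` transforms. Its data are invented.

```python
class FittedTransform(object):
    """Hypothetical stand-in for the Transform graph: analyzer results frozen as constants."""

    def __init__(self, training_rows):
        fares = [r["Fare"] for r in training_rows]
        self.fare_mean = sum(fares) / float(len(fares))
        self.fare_std = (sum((f - self.fare_mean) ** 2 for f in fares) / len(fares)) ** 0.5
        # Alphabetical vocabulary for simplicity (tft orders by frequency).
        self.sex_vocab = {s: i for i, s in enumerate(sorted({r["Sex"] for r in training_rows}))}

    def __call__(self, raw):
        """Apply the frozen preprocessing to one raw example (train, eval or serving)."""
        out = {
            "Fare": (raw["Fare"] - self.fare_mean) / self.fare_std,
            "Sex": self.sex_vocab.get(raw["Sex"], -1),
        }
        if "Survived" in raw:  # the label is absent from serving requests
            out["Survived"] = raw["Survived"]
        return out


train = [
    {"Fare": 7.25, "Sex": "male", "Survived": 0},
    {"Fare": 71.28, "Sex": "female", "Survived": 1},
]
transform = FittedTransform(train)                  # "analysis" happens once
training_features = [transform(r) for r in train]   # used by the input_fn
served = transform({"Fare": 7.25, "Sex": "male"})   # used by the serving receiver
print(served == {k: v for k, v in training_features[0].items() if k != "Survived"})  # True
```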


> Then go to [Airflow](http://localhost:8080/admin/airflow/graph?dag_id=titanic), click **Refresh** to update the DAG, then **Trigger DAG** to run it.

#### TensorBoard

In [19]:
tensorboard_logdir = os.path.join(os.environ['TFX_DIR'], 'pipelines', 'titanic', 'Trainer/output')
os.environ['TENSORBOARD_LOGDIR'] = tensorboard_logdir

In [20]:
%%bash --bg -s
nohup tensorboard --logdir="${TENSORBOARD_LOGDIR}" 2>&1

Starting job # 0 in a separate thread.


> Open [TensorBoard](http://localhost:6006).

### Model Analysis

Once training has completed, we can inspect the metrics along slices of the data to check for performance drops on specific subsets.
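Slicing means grouping the evaluation examples by the value of one column and computing the metrics separately for each group. An overall score can then hide a weak subgroup.

Below is a plain-Python illustration using accuracy only. TFMA computes many more metrics, plus example counts, per slice. The four examples and predictions are invented, and `accuracy_by_slice` is a hypothetical helper.

```python
from collections import defaultdict


def accuracy_by_slice(examples, predictions, column, label="Survived"):
    """Group examples by the value of `column` and compute accuracy per group."""
    hits = defaultdict(int)
    totals = defaultdict(int)
    for example, predicted in zip(examples, predictions):
        key = example[column]
        totals[key] += 1
        hits[key] += int(predicted == example[label])
    return {key: hits[key] / float(totals[key]) for key in totals}


examples = [
    {"Sex": "female", "Survived": 1},
    {"Sex": "female", "Survived": 1},
    {"Sex": "male", "Survived": 0},
    {"Sex": "male", "Survived": 1},
]
predictions = [1, 1, 0, 0]
print(accuracy_by_slice(examples, predictions, "Sex"))  # {'female': 1.0, 'male': 0.5}
```

The overall accuracy here is 0.75, which hides the fact that the `male` slice is at 0.5. Continuous columns such as `Fare` produce one slice per distinct value, so bucketized columns usually give more readable slices.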

In [15]:
artifacts_infos = store.get_artifacts_of_type_df(tfx_utils.TFXArtifactTypes.MODEL).reset_index()
artifacts_infos

| | ID | STATE | TYPE_NAME | SPAN | URI | SPLIT |
|---|---|---|---|---|---|---|
| 0 | 11 | published | ModelExportPath | 1 | /root/airflow/tfx/pipelines/titanic/Trainer/ou... | |


In [16]:
columns_for_slicing

['Sex', 'Fare', 'Age', 'Embarked']

In [18]:
store.display_tfma_analysis(11, columns_for_slicing[2])



### Deploy

#### Pipeline

In [12]:
pipeline_file = create_pipeline_file(
    PipelineStep.DEPLOY,
    project,
    columns_for_slicing=columns_for_slicing,
    train_steps=1000,
    eval_steps=500,
)
write_pipeline_to_dags(pipeline_file, pipeline_name)
print(pipeline_file)


import os
import logging
import datetime

from tfx.orchestration.airflow.airflow_runner import AirflowDAGRunner
from tfx.orchestration.pipeline import PipelineDecorator
from tfx.utils.dsl_utils import csv_input
from tfx.proto import trainer_pb2, evaluator_pb2, pusher_pb2

from tfx.components.example_gen.csv_example_gen.component import CsvExampleGen
from tfx.components.statistics_gen.component import StatisticsGen
from tfx.components.schema_gen.component import SchemaGen
from tfx.components.example_validator.component import ExampleValidator
from tfx.components.transform.component import Transform
from tfx.components.trainer.component import Trainer
from tfx.components.evaluator.component import Evaluator
from tfx.components.model_validator.component import ModelValidator
from tfx.components.pusher.component import Pusher


data_dir = os.path.join(os.environ['DATA_DIR'], 'titanic')
log_dir = os.path.join(os.environ['TFX_DIR'], 'logs')
serving_model_dir = os.path.join(os.environ['SERVI

> Then go to [Airflow](http://localhost:8080/admin/airflow/graph?dag_id=titanic), click **Refresh** to update the DAG, then **Trigger DAG** to run it.

#### Output SavedModel for Serving

In [13]:
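saved_model_dir = os.path.join(os.environ["SERVING_DIR"], "serving_model", project)
# Each push creates a new version sub-directory (assumed to be named by a numeric
# timestamp), so pick the newest rather than whatever os.listdir returns first.
saved_model_dir = os.path.join(saved_model_dir, max(os.listdir(saved_model_dir)))
os.listdir(saved_model_dir)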

['variables', 'assets', 'saved_model.pb']
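Once pushed, the SavedModel can be served by TensorFlow Serving. Its REST API accepts `POST /v1/models/<name>:classify` requests, on port 8501 by default. The sketch below builds a request body for the `classification` signature that `saved_model_cli` reports for this model.

It assumes the model is served under the hypothetical name `titanic`. The passenger values are invented, and nothing is sent over the network.

```python
import json

# Hypothetical endpoint: TensorFlow Serving's REST API listens on port 8501 by
# default; the model name "titanic" is an assumption about how it would be served.
CLASSIFY_URL = "http://localhost:8501/v1/models/titanic:classify"


def classify_request(passengers):
    """Build a JSON body for the 'classification' signature from raw features."""
    return json.dumps({
        "signature_name": "classification",
        # Raw, untransformed features: the serving receiver applies the Transform
        # graph itself. 'Survived' is omitted because it is the label.
        "examples": passengers,
    })


body = classify_request([
    {"Pclass": 3, "Sex": "male", "Age": 22.0, "SibSp": 1, "Parch": 0,
     "Fare": 7.25, "Embarked": "S"},
])
print(CLASSIFY_URL)
print(body)
```

Because the serving receiver parses raw `tf.Example`s and applies the transform graph, the client never needs to know about z-scores or vocabularies. Sending the request is a single HTTP POST of `body` to `CLASSIFY_URL`.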

In [14]:
!saved_model_cli show --dir $saved_model_dir --all


MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs:

signature_def['classification']:
  The given SavedModel SignatureDef contains the following input(s):
    inputs['inputs'] tensor_info:
        dtype: DT_STRING
        shape: (-1)
        name: input_example_tensor:0
  The given SavedModel SignatureDef contains the following output(s):
    outputs['classes'] tensor_info:
        dtype: DT_STRING
        shape: (-1, 2)
        name: linear/head/Tile:0
    outputs['scores'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 2)
        name: linear/head/predictions/probabilities:0
  Method name is: tensorflow/serving/classify

signature_def['predict']:
  The given SavedModel SignatureDef contains the following input(s):
    inputs['examples'] tensor_info:
        dtype: DT_STRING
        shape: (-1)
        name: input_example_tensor:0
  The given SavedModel SignatureDef contains the following output(s):
    outputs['class_ids'] tensor_info:
        dtype: D