<a href="https://colab.research.google.com/github/soerenml/colab/blob/master/TFX.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This tutorial uses the Titanic dataset to walk through a complete, end-to-end TFX pipeline.

# Libraries

In [0]:
# Install the TFX / TensorFlow / TensorBoard version ranges this notebook was written against.
# TODO(ccy): remove "pyzmq==17.0.0" pin after bug in Colab is fixed.
!pip install -q "tfx>=0.21.1,<0.22" "tensorflow>=2.1,<2.2" "tensorboard>=2.1,<2.3" "pyzmq==17.0.0"

In [0]:
import pandas as pd
import os
import shutil
import tensorflow as tf
import tfx

# Helper libraries (non core)
import pprint
pp = pprint.PrettyPrinter()

# Create Interactive content
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

# ExampleGen
from tfx.components import CsvExampleGen
from tfx.utils.dsl_utils import external_input

# StatisticsGen
from tfx.components import StatisticsGen

# SchemaGen
from tfx.components import SchemaGen

# ExampleValidator
from tfx.components import ExampleValidator

# Transform
from tfx.components import Transform

# Trainer
from tfx.components import Trainer
from tfx.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor
from tfx.proto import trainer_pb2

# Evaluator
import tensorflow_model_analysis as tfma
from tfx.components import ResolverNode
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
from tfx.components import Evaluator

# Pusher
from tfx.components import Pusher
from tfx.proto import pusher_pb2

In [0]:
# Report the installed library versions so results can be tied to an environment.
tf_version_msg = 'TensorFlow version: {}'.format(tf.__version__)
tfx_version_msg = 'TFX version: {}'.format(tfx.__version__)
print(tf_version_msg)
print(tfx_version_msg)

# Constants

In [0]:
%%writefile _titanic_constants.py

# Source URL of the raw Titanic CSV; the notebook downloads it into DATASET_PATH.
GITHUB_PATH = 'https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv'
# Local copy of the CSV. CsvExampleGen is pointed at the directory DATA_PATH,
# not at the file itself.
DATASET_PATH = './Data/dataset.csv'
DATA_PATH = './Data'
# Filesystem base directory the Pusher pushes the model to.
SERVING_MODEL_PATH = './Serving'

# Feature configuration shared by _titanic_transform.py and _titanic_trainer.py.
# NOTE: despite the leading underscore, these names are read by those modules.
_DENSE_FLOAT_FEATURE_KEYS = ['Fare']
_CATEGORICAL_FEATURE_KEYS = ['Sex']
_LABEL_KEY = 'Survived'

# Download Data

In [0]:
# Create (or recreate) the working directories so every run starts clean.
def _reset_dir(path):
  """Delete `path` if it exists, then create it again as an empty directory."""
  if os.path.exists(path):
    shutil.rmtree(path)
  os.makedirs(path)


# ./Data:     raw CSV input for CsvExampleGen.
# ./Metadata: pipeline root used by the InteractiveContext.
# ./Serving:  push destination for the Pusher.
for _dir in ('./Data', './Metadata', './Serving'):
  _reset_dir(_dir)

In [0]:
import _titanic_constants as constants

# Download the raw CSV and keep a local copy for CsvExampleGen to ingest.
# index_col=0 uses the first CSV column as the index; to_csv writes the index
# back out as the first column, so the saved file keeps the same columns.
df = pd.read_csv(constants.GITHUB_PATH, index_col=0)
df.to_csv(constants.DATASET_PATH)

In [0]:
# Have a quick look at the data.
!head {DATASET_PATH}

# TFX

## Create the interactive context

For this demo, the notebook itself acts as the pipeline orchestrator via `InteractiveContext` ([Source](https://www.tensorflow.org/tfx/api_docs/python/tfx/orchestration/experimental/interactive/interactive_context/InteractiveContext)).

In [0]:
# Components are run one at a time from the notebook; their artifacts are
# written under the pipeline root './Metadata'.
context = InteractiveContext(pipeline_name='Titanic_Pipeline',
                             pipeline_root='./Metadata')

## ExampleGen

ExampleGen ingests data from BigQuery, CSV files, or TFRecord files. It automatically converts the input into TFRecords and splits it into training and evaluation sets.

In [0]:
import _titanic_constants as constants

# Point CsvExampleGen at the directory that holds the downloaded CSV.
csv_input = external_input(constants.DATA_PATH)
example_gen = CsvExampleGen(input=csv_input)
context.run(example_gen)

In [0]:
#@title

# Decode and print the first few training examples produced by ExampleGen.

# The training split lives in the 'train' sub-directory of the artifact URI.
train_uri = os.path.join(example_gen.outputs['examples'].get()[0].uri, 'train')

# Every file in the split directory is a GZIP-compressed TFRecord file.
tfrecord_filenames = [os.path.join(train_uri, name)
                      for name in os.listdir(train_uri)]
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")

# Parse the first 3 records into tf.train.Example protos and show them
# (previously they were parsed and then discarded, so the cell showed nothing).
for tfrecord in dataset.take(3):
  example = tf.train.Example()
  example.ParseFromString(tfrecord.numpy())
  pp.pprint(example)

## StatisticsGen

In [0]:
# Compute dataset statistics over the ExampleGen output.
statistics_gen = StatisticsGen(
    examples=example_gen.outputs['examples'])
context.run(statistics_gen)

In [0]:
# Visualize the computed statistics.
statistics_channel = statistics_gen.outputs['statistics']
context.show(statistics_channel)

## SchemaGen

In [0]:
# Infer a schema from the statistics. infer_feature_shape=False presumably
# keeps features variable-length, matching the transform module's
# _fill_in_missing, which expects SparseTensor inputs.
schema_gen = SchemaGen(statistics=statistics_gen.outputs['statistics'],
                       infer_feature_shape=False)
context.run(schema_gen)

In [0]:
schema_channel = schema_gen.outputs['schema']
context.show(schema_channel)

## ExampleValidator

In [0]:
# Check the statistics against the inferred schema and report anomalies.
example_validator = ExampleValidator(
    schema=schema_gen.outputs['schema'],
    statistics=statistics_gen.outputs['statistics'])
context.run(example_validator)

In [0]:
anomalies_channel = example_validator.outputs['anomalies']
context.show(anomalies_channel)

## Transform

In [0]:
%%writefile _titanic_transform.py

import tensorflow as tf
import tensorflow_transform as tft
import _titanic_constants as constants

# Local aliases for the shared feature configuration in _titanic_constants.py.
_DENSE_FLOAT_FEATURE_KEYS = constants._DENSE_FLOAT_FEATURE_KEYS
_CATEGORICAL_FEATURE_KEYS = constants._CATEGORICAL_FEATURE_KEYS
_LABEL_KEY = constants._LABEL_KEY

def _transformed_name(key):
  """Return the name under which Transform emits feature `key` (adds '_xf')."""
  suffix = '_xf'
  return key + suffix

def preprocessing_fn(inputs):
  """tf.Transform entry point: map raw input features to transformed features.

  Dense float features are z-score normalized. Categorical features and the
  label are only densified, with missing values filled in.

  Args:
    inputs: dict mapping raw feature names to (presumably sparse) tensors.

  Returns:
    dict mapping `_transformed_name(raw_name)` to the transformed tensor.
  """
  outputs = {
      _transformed_name(name):
          tft.scale_to_z_score(_fill_in_missing(inputs[name]))
      for name in _DENSE_FLOAT_FEATURE_KEYS
  }
  outputs.update(
      (_transformed_name(name), _fill_in_missing(inputs[name]))
      for name in _CATEGORICAL_FEATURE_KEYS)

  label_name = _transformed_name(_LABEL_KEY)
  outputs[label_name] = _fill_in_missing(inputs[_LABEL_KEY])
  return outputs


def _fill_in_missing(x):
  """Densify a rank-2 SparseTensor into a rank-1 tensor, filling gaps.

  Missing entries become '' for string tensors and 0 otherwise.

  Args:
    x: A `SparseTensor` of rank 2 whose dense shape has size at most 1 in the
      second dimension.

  Returns:
    A rank-1 dense tensor in which the missing values of `x` are filled in.
  """
  if x.dtype == tf.string:
    fill_value = ''
  else:
    fill_value = 0
  # Pin the second dimension to 1 so squeezing axis 1 yields a rank-1 tensor.
  padded = tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1])
  dense = tf.sparse.to_dense(padded, fill_value)
  return tf.squeeze(dense, axis=1)

In [0]:
# Run preprocessing_fn from the _titanic_transform.py module file.
transform_module = os.path.abspath('_titanic_transform.py')
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=transform_module)
context.run(transform)

In [0]:
#@title

# Decode and print the first 3 transformed training examples.
transformed_uri = transform.outputs['transformed_examples'].get()[0].uri
train_uri = os.path.join(transformed_uri, 'train')

# The split directory holds GZIP-compressed TFRecord files.
tfrecord_filenames = [os.path.join(train_uri, fname)
                      for fname in os.listdir(train_uri)]
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")

for tfrecord in dataset.take(3):
  serialized_example = tfrecord.numpy()
  example = tf.train.Example()
  example.ParseFromString(serialized_example)
  pp.pprint(example)

## Trainer

In [0]:
%%writefile _titanic_trainer.py

import os
import absl
import datetime
import tensorflow as tf
import tensorflow_transform as tft
from tfx.components.trainer.executor import TrainerFnArgs
import _titanic_constants as constants

# Local aliases for the shared feature configuration in _titanic_constants.py.
_DENSE_FLOAT_FEATURE_KEYS = constants._DENSE_FLOAT_FEATURE_KEYS
_CATEGORICAL_FEATURE_KEYS = constants._CATEGORICAL_FEATURE_KEYS
_LABEL_KEY = constants._LABEL_KEY

def _gzip_reader_fn(filenames):
  """Build a TFRecordDataset over GZIP-compressed files.

  Used as the `reader` for make_batched_features_dataset in _input_fn.
  """
  return tf.data.TFRecordDataset(filenames, compression_type='GZIP')

def _get_serve_tf_examples_fn(model, tf_transform_output):
  """Return a tf.function that serves batches of serialized tf.Examples.

  The returned function parses raw serialized tf.Examples, applies the
  Transform graph, drops the transformed label and runs `model` on the
  remaining features. The TFT layer is stored on `model.tft_layer`, presumably
  so Keras tracks it and exports it together with the SavedModel.

  Args:
    model: The trained Keras model.
    tf_transform_output: `tft.TFTransformOutput` of the Transform component.

  Returns:
    A `tf.function` taking a 1-D tf.string tensor of serialized tf.Examples.
  """
  # Derive the transformed label name from the shared constant (the transform
  # module appends '_xf') instead of hard-coding it.
  transformed_label_key = _LABEL_KEY + '_xf'
  model.tft_layer = tf_transform_output.transform_features_layer()

  @tf.function
  def serve_tf_examples_fn(serialized_tf_examples):
    """Parse, transform and score a batch of serialized tf.Examples."""
    feature_spec = tf_transform_output.raw_feature_spec()
    # The label is not a serving input, so do not try to parse it.
    feature_spec.pop(_LABEL_KEY)
    parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
    transformed_features = model.tft_layer(parsed_features)
    # The model's inputs do not include the label; remove it before scoring.
    transformed_features.pop(transformed_label_key)
    return model(transformed_features)

  return serve_tf_examples_fn


def _input_fn(file_pattern, tf_transform_output, batch_size):
  """Build a batched (features, label) dataset from transformed examples.

  Args:
    file_pattern: File path(s) or glob(s) of the GZIP TFRecord files written
      by Transform.
    tf_transform_output: `tft.TFTransformOutput` supplying the feature spec.
    batch_size: Number of examples per batch.

  Returns:
    A `tf.data.Dataset` of `(features_dict, label)` pairs, where the label is
    the transformed label column.
  """
  transformed_feature_spec = (
      tf_transform_output.transformed_feature_spec().copy())

  return tf.data.experimental.make_batched_features_dataset(
      file_pattern=file_pattern,
      batch_size=batch_size,
      features=transformed_feature_spec,
      reader=_gzip_reader_fn,
      # Transformed label name derived from the shared constant (the transform
      # module appends '_xf') rather than hard-coded.
      label_key=_LABEL_KEY + '_xf')


"""BUILD MODEL"""
def _build_keras_model():
  # Define feature columns.
  real_valued_columns = [tf.feature_column.numeric_column('Fare_xf')]

  indicator_column = [tf.feature_column.indicator_column(
      tf.feature_column.categorical_column_with_vocabulary_list(
          'Sex_xf', ["male", "female"]))]

  # Defining the input layers.
  input_layers = {
      'Fare_xf': tf.keras.layers.Input(name='Fare_xf', shape=(), dtype=tf.float32)
      
  }
  input_layers.update({
      'Sex_xf': tf.keras.layers.Input(name='Sex_xf', shape=(), dtype='string')
  })

  # Build Keras model.
  deep = tf.keras.layers.DenseFeatures(real_valued_columns)(input_layers)
  wide = tf.keras.layers.DenseFeatures(indicator_column)(input_layers)
  merged_layers = tf.keras.layers.concatenate([deep, wide])
  output = tf.keras.layers.Dense(1, activation='sigmoid')(merged_layers)
  model = tf.keras.Model(input_layers, output)

  # Compile model.
  model.compile(
      loss='binary_crossentropy',
      optimizer=tf.keras.optimizers.Adam(lr=0.001),
      metrics=[tf.keras.metrics.BinaryAccuracy()])
  model.summary(print_fn=absl.logging.info)
  return model

def run_fn(fn_args: TrainerFnArgs):
  """Train the Keras model and export it with a tf.Example serving signature.

  Entry point called by the TFX Trainer's GenericExecutor.

  Args:
    fn_args: TrainerFnArgs carrying the train/eval file patterns, step
      counts, Transform output location and serving model directory.
  """
  batch_size = 40
  tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)
  train_dataset = _input_fn(fn_args.train_files, tf_transform_output, batch_size)
  eval_dataset = _input_fn(fn_args.eval_files, tf_transform_output, batch_size)

  model = _build_keras_model()

  # TensorBoard logs go to '<parent of serving_model_dir>/logs'. This log path
  # might change in the future.
  log_dir = os.path.join(os.path.dirname(fn_args.serving_model_dir), 'logs')
  tensorboard_callback = tf.keras.callbacks.TensorBoard(
      log_dir=log_dir, update_freq='batch')

  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps,
      callbacks=[tensorboard_callback])

  # Serving signature: callers send a 1-D batch of serialized tf.Example
  # strings under the input name 'examples'. The function built by
  # _get_serve_tf_examples_fn parses them, applies the Transform graph and
  # runs the model, so the exported model accepts raw, untransformed data.
  serve_fn = _get_serve_tf_examples_fn(model, tf_transform_output)
  serving_signature = serve_fn.get_concrete_function(
      tf.TensorSpec(shape=[None], dtype=tf.string, name='examples'))
  model.save(fn_args.serving_model_dir, save_format='tf',
             signatures={'serving_default': serving_signature})

In [0]:
# Train with the generic (native Keras) executor, which calls run_fn from the
# module file; the step counts reach run_fn via fn_args.
trainer_module = os.path.abspath('_titanic_trainer.py')
train_args = trainer_pb2.TrainArgs(num_steps=10000)
eval_args = trainer_pb2.EvalArgs(num_steps=6000)

trainer = Trainer(
    module_file=trainer_module,
    custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    train_args=train_args,
    eval_args=eval_args)
context.run(trainer)

In [0]:
# Open TensorBoard on the training logs. run_fn writes them to
# '<parent of serving_model_dir>/logs'.
# NOTE(review): assumes that parent is the model artifact URI; confirm if
# TensorBoard shows no runs.
model_artifact_dir = trainer.outputs['model'].get()[0].uri
log_dir = os.path.join(model_artifact_dir, 'logs')

%load_ext tensorboard
%tensorboard --logdir {log_dir}

## Evaluator

The evaluation configuration below follows the binary-classification example from the [TFMA metrics guide](https://www.tensorflow.org/tfx/model_analysis/metrics).

In [0]:
# TFMA configuration: which label to use, which metrics to compute and how to
# slice the data.
eval_config = tfma.EvalConfig(
    model_specs=[
        # The Evaluator reads the raw ExampleGen output, so the label is the
        # untransformed 'Survived' column.
        tfma.ModelSpec(label_key='Survived'),
    ],
    metrics_specs=tfma.metrics.specs_from_metrics([
        tfma.metrics.ExampleCount(name='example_count'),
        tfma.metrics.WeightedExampleCount(name='weighted_example_count'),
        tf.keras.metrics.BinaryCrossentropy(name='binary_crossentropy'),
        tf.keras.metrics.BinaryAccuracy(name='accuracy'),
        tf.keras.metrics.AUC(name='auc', num_thresholds=10000),
        tf.keras.metrics.AUC(
            name='auc_precision_recall', curve='PR', num_thresholds=10000),
        tf.keras.metrics.Precision(name='precision'),
        tf.keras.metrics.Recall(name='recall'),
        tfma.metrics.MeanLabel(name='mean_label'),
        tfma.metrics.MeanPrediction(name='mean_prediction'),
        tfma.metrics.Calibration(name='calibration'),
        tfma.metrics.ConfusionMatrixPlot(name='confusion_matrix_plot'),
        tfma.metrics.CalibrationPlot(name='calibration_plot'),
    ]),
    slicing_specs=[
        # An empty slice spec means the overall slice, i.e. the whole dataset.
        tfma.SlicingSpec(),
        # Additionally slice the metrics along the raw 'Sex' feature column.
        tfma.SlicingSpec(feature_keys=['Sex']),
    ])

In [0]:
# NOTE: this part is still experimental.
# Resolve the latest blessed model (if any) to serve as the baseline.
model_resolver = ResolverNode(
    instance_name='latest_blessed_model_resolver',
    resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
    model=Channel(type=Model),
    model_blessing=Channel(type=ModelBlessing))
context.run(model_resolver)

# Evaluate the new model on the raw examples against the resolved baseline.
# Change thresholds are ignored when there is no baseline (first run).
evaluator = Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    baseline_model=model_resolver.outputs['model'],
    eval_config=eval_config)
context.run(evaluator)

In [0]:
# Overall model evaluation.
evaluation_channel = evaluator.outputs['evaluation']
context.show(evaluation_channel)

In [0]:
# Model evaluation broken down by slice.

# Get the TFMA output result path and load the result.
PATH_TO_RESULT = evaluator.outputs['evaluation'].get()[0].uri
tfma_result = tfma.load_eval_result(PATH_TO_RESULT)

# Show the metrics sliced along the 'Sex' feature column.
tfma.view.render_slicing_metrics(tfma_result, slicing_column='Sex')

In [0]:
# Print TFMA's validation result for the candidate model.
evaluation_uri = evaluator.outputs['evaluation'].get()[0].uri
PATH_TO_RESULT = evaluation_uri
validation_result = tfma.load_validation_result(PATH_TO_RESULT)
print(validation_result)

## Pusher

In [0]:
import _titanic_constants as constants

# Push the trained model to the local serving directory, gated on the
# Evaluator's blessing.
serving_destination = pusher_pb2.PushDestination(
    filesystem=pusher_pb2.PushDestination.Filesystem(
        base_directory=constants.SERVING_MODEL_PATH))
pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=serving_destination)
context.run(pusher)

# Export the pipeline

In [0]:
import sys

# When running in Colab, mount Google Drive; the export step reads the
# notebook from '/content/drive/...'.
in_colab = 'google.colab' in sys.modules
if in_colab:
  from google.colab import drive
  drive.mount('/content/drive')

In [0]:
# Show the working directory (where the %%writefile module files were written).
!pwd

In [0]:
# Settings for exporting this notebook as a standalone pipeline script.
_runner_type = 'beam'
# Name the pipeline after this (Titanic) pipeline; it also names the export
# file 'export_<pipeline name>.py'.
_pipeline_name = 'titanic_%s' % _runner_type

# Location of this notebook on the mounted Google Drive.
_notebook_filepath = ('/content/drive/My Drive/Colab Notebooks/TFX.ipynb')

_tfx_root = os.path.join(os.environ['HOME'], 'tfx')
_titanic_root = os.path.join(os.environ['HOME'], 'titanic')
_serving_model_dir = os.path.join(_titanic_root, 'serving_model')
_data_root = os.path.join(_titanic_root, 'data', 'simple')
# NOTE(review): _pipeline_root and _metadata_path are presumably read by the
# export template; confirm before renaming them.
_pipeline_root = os.path.join(_tfx_root, 'pipelines', _pipeline_name)
_metadata_path = os.path.join(_tfx_root, 'metadata', _pipeline_name,
                              'metadata.db')

# TODO(USER): Specify components to be included in the exported pipeline.
# model_resolver must be included: the evaluator consumes
# model_resolver.outputs['model'] as its baseline_model.
components = [
    example_gen, statistics_gen, schema_gen, example_validator, transform,
    trainer, model_resolver, evaluator, pusher
]

if get_ipython().magics_manager.auto_magic:
  print('Warning: %automagic is ON. Line magics specified without the % prefix '
        'will not be scrubbed during export to pipeline.')

_pipeline_export_filepath = 'export_%s.py' % _pipeline_name
context.export_to_pipeline(notebook_filepath=_notebook_filepath,
                           export_filepath=_pipeline_export_filepath,
                           runner_type=_runner_type)

In [0]:
import tempfile

# Bundle the exported pipeline with the module files it depends on and
# download the archive (Colab only).
if 'google.colab' in sys.modules:
  from google.colab import files
  import zipfile

  zip_export_path = os.path.join(tempfile.mkdtemp(), 'export.zip')
  # Transform and Trainer load their module files by path, and both modules
  # import _titanic_constants, so ship all three next to the pipeline script.
  # (The original passed an undefined name, _titanic_constants -> NameError.)
  _export_files = [
      _pipeline_export_filepath,
      '_titanic_constants.py',
      '_titanic_transform.py',
      '_titanic_trainer.py',
  ]
  with zipfile.ZipFile(zip_export_path, mode='w') as export_zip:
    for _path in _export_files:
      export_zip.write(_path)

  files.download(zip_export_path)