# Hands-on Exercise Part 1: TFX Pipeline components

In this notebook we will define, run and inspect the individual components of a TFX pipeline. All the code should run without any modifications.

### Install and import libraries

Before getting started, we need to pip install tfx. This will take a minute. Make sure to *RESTART RUNTIME* when the installation is complete.

In [48]:
! pip install -U tfx

In [31]:
import os
import tfx
from tfx import v1 as tfx_v1
from tfx.components import CsvExampleGen
from tfx.components import Evaluator
from tfx.components import ExampleValidator
from tfx.components import Pusher
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Tuner
from tfx.components import Transform
from tfx.dsl.components.common import resolver
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.proto import example_gen_pb2
import tensorflow as tf
import tensorflow_model_analysis as tfma
from google.protobuf import text_format
import pandas as pd
from time import time
import numpy as np

import logging
logging.disable(logging.WARNING)

## Set up directories and load the data

For this exercise we'll be using the [Laptop Prices Prediction dataset](https://www.kaggle.com/danielbethell/laptop-prices-prediction). The goal is to build a workflow for a model that predicts a laptop's price from descriptive attributes. The dataset is fairly small (1303 records, 12 features), which keeps loading, transformation and training fast.

In [3]:
# Set up the directories for your pipeline
DATA_ROOT = 'data'                                       # location of the raw CSV data set
SERVING_MODEL_DIR = 'serving_model'                      # where we will save the final model for deployment with TF serving
PIPELINE_NAME = 'laptop_pipeline'
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME) # to store all the artifacts that our pipeline components will create

! mkdir {DATA_ROOT}
! mkdir {SERVING_MODEL_DIR}

In [49]:
# download the raw dataset
! gdown https://drive.google.com/uc?id=1NzyT0YLV9xk71jDR_lde6rKifLB7UT9p

Let's save the data in the correct root folder and take a quick look at a few records:

In [50]:
file_dir = 'laptop_price.csv'
ds = pd.read_csv(file_dir, encoding="iso-8859-1")
ds.to_csv(os.path.join(DATA_ROOT, 'train.csv'), index=False, encoding="utf-8")

ds.head()

## InteractiveContext - Our pipeline orchestrator for development
The key difference between running TFX in production and in this experimental notebook setting is the orchestrator. While in production your pipeline will be orchestrated by Apache Beam, Apache Airflow or Kubeflow, here we create an **InteractiveContext** that manages component execution in the notebook and allows us to inspect the created artifacts.

It also creates an ephemeral ML Metadata (MLMD) store backed by SQLite.

In [7]:
context = InteractiveContext(pipeline_root=PIPELINE_ROOT)

## ExampleGen
The ExampleGen component is responsible for ingesting data into our pipeline as [tf.Examples](https://www.tensorflow.org/tutorials/load_data/tfrecord). We also split the data into separate train and eval (validation) sets.
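The `hash_buckets` values in the next cell control the split ratio: each record is hashed into one of 3 + 1 = 4 buckets, buckets 0-2 go to `train` and bucket 3 goes to `eval`, so roughly 75% / 25% of the records end up in each split, and the assignment is the same on every run. The plain-Python sketch below illustrates that idea. It is not TFX's code: `assign_split` is a hypothetical helper, and `hashlib.md5` is only a stand-in for the fingerprint TFX computes internally.

```python
import hashlib
from bisect import bisect
from typing import List, Tuple


def assign_split(record: str, splits: List[Tuple[str, int]]) -> str:
    """Map a record deterministically to a split name (hypothetical helper).

    `splits` mirrors the SplitConfig, e.g. [("train", 3), ("eval", 1)].
    """
    # Cumulative bucket boundaries, e.g. [3, 4] for the 3:1 config
    boundaries, total = [], 0
    for _, n_buckets in splits:
        total += n_buckets
        boundaries.append(total)
    # Stable fingerprint of the record (stand-in for TFX's own fingerprint)
    digest = hashlib.md5(record.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "little") % total
    # Buckets 0-2 fall into "train", bucket 3 into "eval"
    return splits[bisect(boundaries, bucket)][0]


rows = [f"laptop-{i}" for i in range(10_000)]
counts = {"train": 0, "eval": 0}
for row in rows:
    counts[assign_split(row, [("train", 3), ("eval", 1)])] += 1
print(counts)  # roughly a 3:1 ratio
```

Because the split depends only on the record's hash, re-running ExampleGen on the same data reproduces the same train/eval assignment.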

In [57]:
# Specify a config for the output with a 3:1 split for the train and eval set:
output = example_gen_pb2.Output(
    split_config=example_gen_pb2.SplitConfig(splits=[
        example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=3),
        example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1),
    ]))

# Define the component:
example_gen = CsvExampleGen(input_base=DATA_ROOT, output_config=output)

# Run the component:
start = time()
context.run(example_gen)
print(f"time to run component: {np.round(time()-start)}s")

Let's take a look at the created artifacts. Note that the resulting files are gzipped TFRecord files.
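It can help to know what a TFRecord file looks like on disk: each record is stored as an 8-byte little-endian length, a 4-byte masked CRC32C of that length, the payload (here a serialized tf.Example) and a 4-byte masked CRC32C of the payload. The standard-library sketch below walks that framing in a gzipped file. The function name is hypothetical and, as a simplification, it does not verify the CRCs, so use `tf.data.TFRecordDataset` for anything beyond inspection.

```python
import gzip
import struct
from typing import Iterator


def read_gzip_tfrecords(path: str) -> Iterator[bytes]:
    """Yield raw record payloads from a gzipped TFRecord file.

    Simplified sketch: the CRC fields are read but NOT verified.
    """
    with gzip.open(path, "rb") as f:
        while True:
            header = f.read(8)
            if not header:
                break  # clean end of file
            if len(header) < 8:
                raise ValueError("truncated record header")
            (length,) = struct.unpack("<Q", header)  # uint64 little-endian length
            f.read(4)  # masked CRC32C of the length (skipped)
            data = f.read(length)
            if len(data) < length:
                raise ValueError("truncated record payload")
            f.read(4)  # masked CRC32C of the payload (skipped)
            yield data
```

Each yielded payload is one serialized example that could then be parsed with `tf.train.Example().ParseFromString(...)`.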

In [58]:
artifact_uri = example_gen.outputs['examples'].get()[0].uri

print('artifact location:', artifact_uri)
print('created artifacts:', os.listdir(artifact_uri))
print('created training data files:', os.listdir(os.path.join(artifact_uri, 'Split-train')))
print('created validation data files:', os.listdir(os.path.join(artifact_uri, 'Split-eval')))

## StatisticsGen
With the StatisticsGen component we compute descriptive statistics for our dataset. These stats can be visualized for review and are used by the SchemaGen component to infer the schema.
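StatisticsGen relies on TensorFlow Data Validation (TFDV), which computes its statistics over each whole split with Apache Beam. To make some of the numbers in the visualization concrete, here is a small plain-Python sketch of per-column statistics. `describe_column` is a hypothetical helper and covers only a few of the statistics TFDV reports.

```python
import statistics
from collections import Counter
from typing import Any, Dict, List, Optional


def describe_column(values: List[Optional[Any]]) -> Dict[str, Any]:
    """Compute a few StatisticsGen-style statistics for one column (sketch)."""
    present = [v for v in values if v is not None]
    stats: Dict[str, Any] = {
        "count": len(present),
        "num_missing": len(values) - len(present),
    }
    if present and all(isinstance(v, (int, float)) for v in present):
        # Numeric feature: distribution statistics
        stats.update(
            mean=statistics.fmean(present),
            std_dev=statistics.pstdev(present),
            min=min(present),
            max=max(present),
            num_zeros=sum(1 for v in present if v == 0),
        )
    else:
        # Categorical feature: number of distinct values and most frequent ones
        counts = Counter(present)
        stats.update(unique=len(counts), top_values=counts.most_common(3))
    return stats


print(describe_column([13.3, 15.6, None, 15.6]))
print(describe_column(["Apple", "HP", "HP", None]))
```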


In [54]:
# Create StatisticsGen and run component
statistics_gen = StatisticsGen(
    examples=example_gen.outputs['examples'])

start = time()
context.run(statistics_gen)
print(f"time to run component: {np.round(time()-start)}s")

Let's take a look at the calculated statistics for our train and eval splits:

In [60]:
context.show(statistics_gen.outputs['statistics'])

## SchemaGen
The SchemaGen component infers the schema (the data type of each feature, whether it is required, and the domain of allowed values for categorical features) from the statistics computed by StatisticsGen.

In [61]:
# Create SchemaGen and run the component
schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)

start = time()
context.run(schema_gen)
print(f"time to run component: {np.round(time()-start)}s")

Let's take a look at the outputs produced by SchemaGen. Note that the data type STRING, together with all of its values listed under "Domain", is only assigned to some of the categorical features (e.g. "Company", "Memory"), while the others are shown as "BYTES". The distinction is based on the number of distinct values a categorical feature takes. However, since both STRING and BYTES are stored as a BytesList in the tf.Example protobuf format, this distinction does not matter for the subsequent data transformation.
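The STRING-vs-BYTES behaviour can be pictured as: infer a type per feature, and attach an explicit domain only when the number of distinct string values is small. The sketch below is a simplified illustration, not TFDV's inference code. `infer_feature` and its `max_domain_size` parameter are hypothetical; the actual cut-off TFDV uses is internal to the library.

```python
from typing import Any, Dict, List


def infer_feature(name: str, values: List[Any], max_domain_size: int) -> Dict[str, Any]:
    """Infer a simplified schema entry for one feature (illustrative only)."""
    present = [v for v in values if v is not None]
    if all(isinstance(v, int) for v in present):
        ftype = "INT"
    elif all(isinstance(v, (int, float)) for v in present):
        ftype = "FLOAT"
    else:
        ftype = "BYTES"  # strings are stored as bytes (BytesList) in tf.Example
    entry: Dict[str, Any] = {
        "name": name,
        "type": ftype,
        "required": len(present) == len(values),
    }
    if ftype == "BYTES":
        domain = sorted({str(v) for v in present})
        # Only features with few distinct values get an explicit domain,
        # which the schema visualization displays as type STRING.
        if len(domain) <= max_domain_size:
            entry["domain"] = domain
    return entry


print(infer_feature("Company", ["HP", "Dell", "HP"], max_domain_size=5))
print(infer_feature("Product", [f"model-{i}" for i in range(50)], max_domain_size=5))
```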

In [62]:
context.show(schema_gen.outputs['schema'])

## ExampleValidator

The ExampleValidator component uses dataset statistics and validates them against the data schema. It detects anomalies such as missing values and identifies data-skew and data drift by comparing training and validation data.
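To illustrate the kind of checks involved, the sketch below validates data against a simplified schema and reports missing required values and values outside a feature's domain. Note the simplification: the real ExampleValidator compares the computed *statistics* against the schema, whereas this hypothetical `find_anomalies` helper looks at the rows directly, and it does not cover skew or drift detection.

```python
from typing import Any, Dict, List


def find_anomalies(rows: List[Dict[str, Any]], schema: List[Dict[str, Any]]) -> List[str]:
    """Check rows against a simplified schema and return readable anomalies."""
    anomalies = []
    for feature in schema:
        name = feature["name"]
        values = [row.get(name) for row in rows]
        # Required features must be present in every row
        missing = sum(v is None for v in values)
        if feature.get("required") and missing:
            anomalies.append(f"{name}: {missing} missing value(s)")
        # Categorical features must only take values from their domain
        domain = feature.get("domain")
        if domain is not None:
            unexpected = sorted({str(v) for v in values if v is not None and str(v) not in domain})
            if unexpected:
                anomalies.append(f"{name}: values not in domain {unexpected}")
    return anomalies


schema = [
    {"name": "Company", "required": True, "domain": ["Apple", "HP"]},
    {"name": "Inches", "required": True},
]
rows = [{"Company": "HP", "Inches": 15.6}, {"Company": "Razer"}, {"Company": "Apple", "Inches": 13.3}]
print(find_anomalies(rows, schema))
```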

In [63]:
# Create ExampleValidator and run the component:
example_validator = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema'])

start = time()
context.run(example_validator)
print(f"time to run component: {np.round(time()-start)}s")

Let's take a look at the output to verify that there are no anomalies.

In [64]:
context.show(example_validator.outputs['anomalies'])

## Transform
Next we create a Transform component to perform feature engineering. This will result in a TensorFlow graph which will be used to transform tf.Examples for training and serving. To enable this transformation for later inference requests without a Transform component, this graph will be included in the TensorFlow model (SavedModel) that is the result of model training. 

Unlike the previous components, the transform component requires a custom Python module with a `preprocessing_fn` that defines the transformation using the tensorflow_transform library.
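The key idea behind tf.Transform is a two-phase process: an *analyze* phase makes a full pass over the training data to compute constants (such as a minimum and maximum), and a *transform* phase applies a function with those constants baked in. Because the constants are frozen into the transform graph, training, evaluation and serving all use exactly the same transformation. The plain-Python sketch below mimics this for min-max scaling. It is a loose illustration of what `tft.scale_to_0_1` does, not its implementation; `analyze_min_max` is hypothetical, and returning 0.0 for a constant feature is this sketch's own choice.

```python
from typing import Callable, List


def analyze_min_max(train_values: List[float]) -> Callable[[float], float]:
    """'Analyze' phase: one full pass over the training data to compute constants.

    Returns a 'transform' function with min/max baked in.
    """
    lo, hi = min(train_values), max(train_values)
    span = hi - lo

    def transform(x: float) -> float:
        # The same constants are used for training, eval and serving requests
        return (x - lo) / span if span else 0.0

    return transform


scale_inches = analyze_min_max([10.1, 13.3, 17.3])
print(scale_inches(14.0))  # a serving request is scaled with the training min/max
```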

In [65]:
# Let's inspect the number of unique values for each feature. This helps us define the number of categories
# for the one-hot encoding of the categorical features.
for col in ds.columns:
  print(f'{col}: {ds[col].unique().shape[0]}')

Before writing the transform module, let's save some constants that we need for the transformation and, later, for training in a separate `laptop_constants.py` file. Here we specify the numerical and categorical feature keys, along with the number of categories to use for the latter. We also specify the label key, which TFX needs to be aware of for training.
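To see how the category counts in `VOCAB_FEATURE_DICT` and `NUM_OOV_BUCKETS` fit together, here is a plain-Python sketch of a vocabulary limited to the `top_k` most frequent values, with all other values hashed into out-of-vocabulary (OOV) buckets, followed by a one-hot encoding of length `top_k + num_oov_buckets`. The helper names are hypothetical, ties are broken alphabetically in this sketch, and `zlib.crc32` is only a stand-in for the hash tf.Transform uses for OOV buckets.

```python
import zlib
from collections import Counter
from typing import Dict, List


def build_vocab(values: List[str], top_k: int) -> Dict[str, int]:
    """Map the top_k most frequent values to indices 0..top_k-1."""
    counts = Counter(values)
    ranked = sorted(counts, key=lambda v: (-counts[v], v))[:top_k]
    return {v: i for i, v in enumerate(ranked)}


def one_hot(value: str, vocab: Dict[str, int], top_k: int, num_oov_buckets: int) -> List[float]:
    """One-hot vector of length top_k + num_oov_buckets."""
    if value in vocab:
        index = vocab[value]
    else:
        # Unknown or rare values go to one of the OOV buckets after the vocabulary
        index = len(vocab) + zlib.crc32(value.encode("utf-8")) % num_oov_buckets
    vec = [0.0] * (top_k + num_oov_buckets)
    vec[index] = 1.0
    return vec


vocab = build_vocab(["HP", "Dell", "HP", "Apple", "Dell", "HP"], top_k=2)
print(vocab)
print(one_hot("Apple", vocab, top_k=2, num_oov_buckets=3))  # lands in an OOV slot
```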

In [18]:
_laptop_constants_module_file = 'laptop_constants.py'

In [66]:
%%writefile {_laptop_constants_module_file}

MAX_VOCAB_LEN = 25
NUMERIC_FEATURE_KEYS = ['Inches', 'ScreenResolution', 'Weight']
VOCAB_FEATURE_DICT = {
    'Company': 19, 
    'Product': MAX_VOCAB_LEN, 
    'TypeName': 6, 
    'Cpu': MAX_VOCAB_LEN, 
    'Ram': 9,
    'Memory': MAX_VOCAB_LEN, 
    'Gpu': MAX_VOCAB_LEN, 
    'OpSys': 9,
}
NUM_OOV_BUCKETS = 3
LABEL_KEY = 'Price_euros'

Now we write the `laptop_transform.py` module with the preprocessing function for feature engineering. Take a look at how we transform the strings for ScreenResolution and Weight into a numerical value and how we create one-hot vectors from the categorical features.
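The string parsing is easiest to reason about outside of TensorFlow. The plain-Python sketch below uses Python's `re` in place of `tf.strings.regex_replace` and hypothetical helper names to show the intended conversions, plus one pitfall: stripping every non-digit from the text before the "x" turns "4K Ultra HD / Touchscreen 3840x2160" into 43840, whereas capturing only the digits directly in front of the "x" yields 3840.

```python
import re


def parse_weight(raw: str) -> float:
    """'1.37kg' -> 1.37: drop everything except digits and the decimal point."""
    return float(re.sub(r"[^0-9.]", "", raw))


def naive_screen_width(raw: str) -> float:
    """Keep all digits before the first 'x' (breaks on prefixes such as '4K')."""
    return float(re.sub(r"[^0-9.]", "", raw.split("x")[0]))


def screen_width(raw: str) -> float:
    """Capture only the digits directly in front of 'x' in '<width>x<height>'."""
    match = re.search(r"(\d+)x\d+", raw)
    if match is None:
        raise ValueError(f"no resolution found in {raw!r}")
    return float(match.group(1))


for res in ["IPS Panel Retina Display 2560x1600", "4K Ultra HD / Touchscreen 3840x2160"]:
    print(res, "->", naive_screen_width(res), "vs", screen_width(res))
```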

In [21]:
_transform_module_file = 'laptop_transform.py'

In [67]:
%%writefile {_transform_module_file}

import tensorflow as tf
import tensorflow_transform as tft

import laptop_constants

# Unpack the contents of the constants module
_NUMERIC_FEATURE_KEYS = laptop_constants.NUMERIC_FEATURE_KEYS
_VOCAB_FEATURE_DICT = laptop_constants.VOCAB_FEATURE_DICT
_NUM_OOV_BUCKETS = laptop_constants.NUM_OOV_BUCKETS
_LABEL_KEY = laptop_constants.LABEL_KEY


def _screen_res_width(screen_res):
  """Extracts the width from strings such as '4K Ultra HD 3840x2160' -> '3840'.

  Only the digits directly in front of the 'x' are captured; stripping all
  non-digits before the 'x' would also keep the '4' of '4K' ('43840').
  """
  return tf.strings.regex_replace(screen_res, r'^.*?(\d+)x\d+.*$', r'\1')

# Define the transformations
def preprocessing_fn(inputs):
    """tf.transform's callback function for preprocessing inputs.
    Args:
        inputs: map from feature keys to raw not-yet-transformed features.
    Returns:
        Map from string feature key to transformed feature operations.
    """

    # Initialize outputs dictionary
    outputs = {}

    # Strip the unit from strings such as '1.37kg' and convert to float
    weight = tf.strings.to_number(
        tf.strings.regex_replace(inputs['Weight'], '[^0-9.]', ''),
        out_type=tf.dtypes.float32)

    # Extract the horizontal screen resolution (e.g. '1920x1080' -> 1920) as float
    screen_resolution = tf.strings.to_number(
        _screen_res_width(inputs['ScreenResolution']),
        out_type=tf.dtypes.float32)
    
    inches = inputs['Inches']

    numeric_features_preprocessed = {
        'Inches': inches,
        'ScreenResolution': screen_resolution,
        'Weight': weight
    }
    
    for key, value in numeric_features_preprocessed.items():
        scaled = tft.scale_to_0_1(value)
        outputs[key] = tf.reshape(scaled, [-1])

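    # Convert strings to indices and convert to one-hot vectors. top_k keeps the
    # vocab_size most frequent values; all other values are hashed into the OOV
    # buckets, so every index fits into the one-hot depth below.
    for key, vocab_size in _VOCAB_FEATURE_DICT.items():
        indices = tft.compute_and_apply_vocabulary(
            inputs[key], top_k=vocab_size, num_oov_buckets=_NUM_OOV_BUCKETS)
        one_hot = tf.one_hot(indices, vocab_size + _NUM_OOV_BUCKETS)
        outputs[key] = tf.reshape(one_hot, [-1, vocab_size + _NUM_OOV_BUCKETS])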

    # Cast label to float
    outputs[_LABEL_KEY] = tf.cast(inputs[_LABEL_KEY], tf.float32)

    return outputs

Now let's create and run the Transform component. Notice that, in addition to the artifacts from the ExampleGen and SchemaGen components, we pass the `laptop_transform.py` module file as a parameter.

Running this component might take ~50 seconds.

In [68]:
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=_transform_module_file)

start = time()
context.run(transform)
print(f"time to run component: {np.round(time()-start)}s")

Let's inspect how we transformed our input data by looking at one example from our dataset. Note that we use the `tf.data.TFRecordDataset` API to load and decompress the serialized records, and `tf.train.Example` to parse them.

In [69]:
import pprint as pp
# Get the URI of the output artifact representing the transformed examples
train_uri = os.path.join(transform.outputs['transformed_examples'].get()[0].uri, 'Split-train')

# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name)
                      for name in os.listdir(train_uri)]

# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")

# Decode the first record and print output
for tfrecord in dataset.take(1):
  serialized_example = tfrecord.numpy()
  example = tf.train.Example()
  example.ParseFromString(serialized_example)
  pp.pprint(example)

## Trainer
Now we come to the training part. Note that, due to time constraints, we skip the Tuner component in this tutorial; it would help us find the best hyperparameters for our model.

Similar to the Transform component, we define a `trainer.py` module file that we supply as a parameter to the Trainer component. Note the purpose of the functions we define in the trainer module:

*   `_gzip_reader_fn`: to read the compressed data
*   `_input_fn`: to create batches of data for training
*   `_get_serve_tf_examples_fn`: to parse a serialized tf.Example and apply the transformation from the transform graph obtained from the Transform component
*   `model_builder`: create and compile the TensorFlow Keras model
*   `run_fn`: build and train the model, then save it with a serving signature
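The batching done in `_input_fn` can be pictured with a plain-Python sketch: records are grouped into batches, and the label column is split off so that each batch becomes a `(features, labels)` pair, which is the shape Keras `model.fit` expects. This loosely mimics `make_batched_features_dataset(..., label_key=...)` for a single epoch; the helper names are hypothetical, and unlike the real function (which shuffles by default) this sketch keeps the input order.

```python
from typing import Any, Dict, Iterable, Iterator, List, Tuple

Batch = Tuple[Dict[str, List[Any]], List[Any]]


def _collate(batch: List[Dict[str, Any]], label_key: str) -> Batch:
    """Turn a list of records into column-wise features plus a label list."""
    labels = [r[label_key] for r in batch]
    features = {k: [r[k] for r in batch] for k in batch[0] if k != label_key}
    return features, labels


def batched_features(records: Iterable[Dict[str, Any]], batch_size: int,
                     label_key: str) -> Iterator[Batch]:
    """Group records into (features, labels) batches for one epoch."""
    batch: List[Dict[str, Any]] = []
    for record in records:
        batch.append(record)
        if len(batch) == batch_size:
            yield _collate(batch, label_key)
            batch = []
    if batch:  # the final, smaller batch is kept
        yield _collate(batch, label_key)


records = [{"Inches": 13.3 + i, "Price_euros": 1000.0 + 100 * i} for i in range(5)]
for features, labels in batched_features(records, batch_size=2, label_key="Price_euros"):
    print(features, labels)
```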



In [27]:
_trainer_module_file = 'trainer.py'

In [70]:
%%writefile {_trainer_module_file}

from typing import NamedTuple, Dict, Text, Any, List
from tfx.components.trainer.fn_args_utils import FnArgs, DataAccessor
import tensorflow as tf
import tensorflow_transform as tft

import laptop_constants

_NUMERIC_FEATURE_KEYS = laptop_constants.NUMERIC_FEATURE_KEYS
_VOCAB_FEATURE_DICT = laptop_constants.VOCAB_FEATURE_DICT
_NUM_OOV_BUCKETS = laptop_constants.NUM_OOV_BUCKETS
_LABEL_KEY = laptop_constants.LABEL_KEY

N_EPOCHS = 15

def _gzip_reader_fn(filenames):
  '''Load compressed dataset
  Args: filenames - filenames of TFRecords to load
  Returns: TFRecordDataset loaded from the filenames
  '''
  return tf.data.TFRecordDataset(filenames, compression_type='GZIP')
  

def _input_fn(file_pattern,
              tf_transform_output,
              num_epochs=3,
              batch_size=32) -> tf.data.Dataset:
  '''Create batches of features and labels from TF Records

  Args:
    file_pattern - List of files or patterns of file paths containing Example records.
    tf_transform_output - transform output graph
    num_epochs - Integer specifying the number of times to read through the dataset. 
            If None, cycles through the dataset forever.
    batch_size - An int representing the number of records to combine in a single batch.

  Returns:
    A dataset of dict elements, (or a tuple of dict elements and label). 
    Each dict maps feature keys to Tensor or SparseTensor objects.
  '''
  transformed_feature_spec = (
      tf_transform_output.transformed_feature_spec().copy())
  
  dataset = tf.data.experimental.make_batched_features_dataset(
      file_pattern=file_pattern,
      batch_size=batch_size,
      features=transformed_feature_spec,
      reader=_gzip_reader_fn,
      num_epochs=num_epochs,
      label_key=_LABEL_KEY)
  
  return dataset


def _get_serve_tf_examples_fn(model, tf_transform_output):
  """Returns a function that parses a serialized tf.Example and applies TFT."""

  # Get transformation graph
  model.tft_layer = tf_transform_output.transform_features_layer()

  @tf.function
  def serve_tf_examples_fn(serialized_tf_examples):
    """Returns the output to be used in the serving signature."""
    # Get pre-transform feature spec
    feature_spec = tf_transform_output.raw_feature_spec()

    # Pop label since serving inputs do not include the label
    feature_spec.pop(_LABEL_KEY)

    # Parse raw examples into a dictionary of tensors matching the feature spec
    parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)

    # Transform the raw examples using the transform graph
    transformed_features = model.tft_layer(parsed_features)

    # Get predictions using the transformed features
    return model(transformed_features)

  return serve_tf_examples_fn


def model_builder():
  '''
  Returns: compiled tf.keras model
  '''
  # Hyperparameters. Note: these would typically be obtained from the Tuner component
  hp_units = 20
  hp_learning_rate = 1e-3

  # Define input layers for numeric keys
  input_tensors_numeric = [
      tf.keras.layers.Input(name=colname, shape=(1,), dtype=tf.float32)
      for colname in _NUMERIC_FEATURE_KEYS
  ]

  # Define input layers for vocab keys
  input_tensors_categorical = [
      tf.keras.layers.Input(name=colname, shape=(vocab_size + _NUM_OOV_BUCKETS,), dtype=tf.float32)
      for colname, vocab_size in _VOCAB_FEATURE_DICT.items()
  ]

  input_tensors = input_tensors_numeric + input_tensors_categorical

  # Define the tf.keras model using the Functional API. Start by concatenating the input tensors
  x = tf.keras.layers.concatenate(input_tensors)
  x = tf.keras.layers.Dense(128, activation='relu')(x)
  x = tf.keras.layers.Dense(64, activation='relu')(x)
  x = tf.keras.layers.Dense(1, activation='relu')(x)
  output = tf.keras.layers.Lambda(lambda x: x * 2000.0)(x)
  model = tf.keras.Model(input_tensors, output)

  model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=hp_learning_rate),
                loss=tf.keras.losses.MeanSquaredError(),
                metrics=['mean_absolute_error'])

  model.summary()
  return model


def run_fn(fn_args: FnArgs) -> None:
  """Defines and trains the model.
  Args:
    fn_args: Holds args as name/value pairs. Refer here for the complete attributes: 
    https://www.tensorflow.org/tfx/api_docs/python/tfx/components/trainer/fn_args_utils/FnArgs#attributes
  """

  # Log metrics after every batch ('epoch' would log once per epoch instead)
  tensorboard_callback = tf.keras.callbacks.TensorBoard(
      log_dir=fn_args.model_run_dir, update_freq='batch')
  
  tf_transform_output = tft.TFTransformOutput(fn_args.transform_graph_path)
  
  train_set = _input_fn(fn_args.train_files[0], tf_transform_output, 1)
  val_set = _input_fn(fn_args.eval_files[0], tf_transform_output, 1)

  model = model_builder()

  model.fit(
      x=train_set,
      validation_data=val_set,
      callbacks=[tensorboard_callback],
      epochs=N_EPOCHS
      )
  
  # Define default serving signature for inference requests
  signatures = {
      'serving_default':
          _get_serve_tf_examples_fn(model,
                                    tf_transform_output).get_concrete_function(
                                        tf.TensorSpec(
                                            shape=[None],
                                            dtype=tf.string,
                                            name='examples')),
  }
  
  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

Finally, let's create and run the Trainer component:

In [71]:
trainer = Trainer(
    module_file=_trainer_module_file,
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    train_args=trainer_pb2.TrainArgs(splits=['train']),
    eval_args=trainer_pb2.EvalArgs(splits=['eval']))

start = time()
context.run(trainer, enable_cache=False)
print(f"\ntime to run component: {np.round(time()-start)}s")

Take a look at the training progress and how the performance improves over the 15 epochs. Since this is a regression problem (remember, we predict laptop prices), we measure performance with the mean absolute error and use the mean squared error as the loss function. Note how the mean absolute error decreases from > 300 to < 200 during training.
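The two metrics weigh errors differently. The mean absolute error is in the same unit as the label (euros) and is easy to read, while the mean squared error squares each error, so a few large misses dominate it. The sketch below computes both on made-up prices and predictions, purely for illustration.

```python
from typing import Sequence


def mean_absolute_error(y_true: Sequence[float], y_pred: Sequence[float]) -> float:
    """Average absolute difference, in the unit of the label (euros)."""
    return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)


def mean_squared_error(y_true: Sequence[float], y_pred: Sequence[float]) -> float:
    """Average squared difference; large errors are penalized disproportionately."""
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)


prices = [1000.0, 1500.0, 2000.0]  # made-up example values
preds = [1100.0, 1400.0, 2300.0]
print(mean_absolute_error(prices, preds))
print(mean_squared_error(prices, preds))
```

Here the single 300-euro miss contributes 60% of the absolute error but over 80% of the squared error.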

Let's take a look at the created artifacts.

In [72]:
# Get artifact uri of trainer model output
model_artifact_dir = trainer.outputs['model'].get()[0].uri
print(f'contents of model artifact directory: {os.listdir(model_artifact_dir)}')

model_dir = os.path.join(model_artifact_dir, 'Format-Serving')
print(f'contents of model directory: {os.listdir(model_dir)}')

(Optional) To inspect training performance in more detail, load TensorBoard:

In [73]:
model_run_artifact_dir = trainer.outputs['model_run'].get()[0].uri

%reload_ext tensorboard
%tensorboard --logdir {model_run_artifact_dir}

## Evaluator

With the Evaluator component we verify whether the model performance is good enough to push the model to deployment. The Evaluator runs the trained model on validation data (the raw eval split from ExampleGen) and “blesses” the model if it passes the criteria specified in the eval config.

Let's first specify the eval config. Notice that we specify an upper bound of 300 as threshold for the MeanAbsoluteError. You can play around with this value: if you set it too low (e.g. 100), the model will likely not pass the validation and will consequently not be blessed.
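The eval config below combines two thresholds on a LOWER_IS_BETTER metric: an absolute upper bound on the candidate's MAE, and a change threshold relative to the baseline model. The sketch below spells out that decision in plain Python. It is not TFMA's implementation: `passes_thresholds` is hypothetical, how values exactly on a bound are treated is this sketch's choice, and it assumes the change threshold is simply skipped when no baseline model exists yet.

```python
from typing import Optional


def passes_thresholds(candidate_mae: float, baseline_mae: Optional[float],
                      upper_bound: float = 300.0, max_increase: float = 1e4) -> bool:
    """Simplified blessing decision for a LOWER_IS_BETTER metric (illustrative)."""
    # Value threshold: the candidate's MAE must not exceed the upper bound
    if candidate_mae > upper_bound:
        return False
    # Change threshold: the candidate may be worse than the baseline by at most
    # max_increase (assumption: skipped when there is no baseline yet)
    if baseline_mae is not None and candidate_mae - baseline_mae > max_increase:
        return False
    return True


print(passes_thresholds(180.0, None))                     # first run, no baseline
print(passes_thresholds(250.0, 200.0, max_increase=10))   # clearly worse than baseline
```

With an allowed increase of 1e4 the change threshold is very permissive; a small value would require each new model to be nearly as good as the previous blessed one.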

In [39]:
eval_config = text_format.Parse("""
  ## Model information
  model_specs {
    # This assumes a serving model with signature 'serving_default'.
    signature_name: "serving_default",
    label_key: "Price_euros"
  }

  ## Post training metric information
  metrics_specs {
    metrics { class_name: "ExampleCount" }
    metrics {
      class_name: "MeanAbsoluteError"
      threshold {
        # Value threshold: the MAE must stay below 300
        value_threshold {
          upper_bound { value: 300 }
        }
        # Change threshold: with LOWER_IS_BETTER, the candidate's MAE may exceed
        # the baseline's by no more than about 1e4 (a very loose bound)
        change_threshold {
          direction: LOWER_IS_BETTER
          absolute { value: 1e4 }
        }
      }
    }
    metrics { class_name: "MeanSquaredError" }
  }

  ## Slicing information
  slicing_specs {}  # overall slice

""", tfma.EvalConfig())

Here we define the model resolver, which is responsible for selecting the baseline model. We use the `LatestBlessedModelStrategy` which, as the name suggests, picks the latest trained model among all models that passed the Evaluator's validation.

Since we haven't run the Evaluator component yet, the output artifact will show empty results.
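Conceptually, the strategy filters the model artifacts down to those with a positive blessing and returns the most recent one. The sketch below illustrates this with hypothetical data classes instead of ML Metadata artifacts, treating a larger `artifact_id` as "more recent". It also shows why the result is empty on the first run: no model has been blessed yet.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class ModelArtifact:
    artifact_id: int  # hypothetical stand-in for an MLMD id; grows with each run
    uri: str


def latest_blessed_model(models: List[ModelArtifact],
                         blessed: Dict[int, bool]) -> Optional[ModelArtifact]:
    """Return the most recent model whose blessing is True, or None."""
    candidates = [m for m in models if blessed.get(m.artifact_id, False)]
    return max(candidates, key=lambda m: m.artifact_id, default=None)


models = [ModelArtifact(1, "model/1"), ModelArtifact(2, "model/2"), ModelArtifact(3, "model/3")]
print(latest_blessed_model(models, {1: True, 2: True, 3: False}))
print(latest_blessed_model(models, {}))  # nothing blessed yet -> None
```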

In [74]:
# Setup the Resolver node to find the latest blessed model
model_resolver = tfx_v1.dsl.Resolver(
      strategy_class=tfx_v1.dsl.experimental.LatestBlessedModelStrategy,
      model=tfx_v1.dsl.Channel(type=tfx_v1.types.standard_artifacts.Model),
      model_blessing=tfx_v1.dsl.Channel(
          type=tfx_v1.types.standard_artifacts.ModelBlessing)).with_id(
              'latest_blessed_model_resolver')

# Run the Resolver node
context.run(model_resolver)

In [75]:
# Load Resolver outputs
model_resolver.outputs['model']

Now we run the Evaluator. Note that we pass the model selected by the model resolver to the component as the "baseline" model. The Evaluator compares the model produced by the training run (the candidate) with the latest blessed model (there is none in our case, because we run the component for the first time) and blesses the candidate only if it meets the thresholds in the eval config, including the change threshold relative to the baseline. This ensures that a newly trained model only gets pushed if it does not perform worse than the baseline by more than the allowed margin.

In [76]:
# Setup and run the Evaluator component
evaluator = tfx.components.Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    baseline_model=model_resolver.outputs['model'],
    eval_config=eval_config)

context.run(evaluator, enable_cache=False)

Let's verify that the Evaluator blessed the model. The blessing directory contains a file named `BLESSED` if the model passed validation, or `NOT_BLESSED` otherwise.

In [77]:
# Get `Evaluator` blessing output URI
blessing_uri = evaluator.outputs['blessing'].get()[0].uri
os.listdir(blessing_uri)

We can also visualize the Evaluation results.

In [78]:
# Visualize the evaluation results
context.show(evaluator.outputs['evaluation'])

## Pusher

Finally, we push our model to the serving model directory. Note that the Pusher does not serve the model; it only copies the model to the push destination, and only if the Evaluator blessed it.

To serve the model for inference requests we use TensorFlow Serving. This will be covered in the second exercise.
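The core of the Pusher's job can be sketched in a few lines: check the blessing, then copy the model into a new numeric version subdirectory, which is the layout TF Serving expects under a model's base path. `push_if_blessed` is hypothetical, using a Unix timestamp as the version is this sketch's choice, and the real Pusher additionally supports other push destinations.

```python
import os
import shutil
import time
from typing import Optional


def push_if_blessed(model_dir: str, blessing_dir: str, base_directory: str) -> Optional[str]:
    """Copy model_dir to base_directory/<version> if it was blessed.

    Returns the destination path, or None when the model was not blessed.
    """
    # The Evaluator marks a blessed model with a 'BLESSED' file
    if not os.path.exists(os.path.join(blessing_dir, "BLESSED")):
        return None
    # TF Serving expects numeric version subdirectories
    version = str(int(time.time()))
    destination = os.path.join(base_directory, version)
    shutil.copytree(model_dir, destination)
    return destination
```

Because each push lands in a new version directory, TF Serving can pick up the newer version while older ones remain available for rollback.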

In [46]:
# Setup serving path
import tempfile

_serving_model_dir = os.path.join(tempfile.mkdtemp(),
                                  'serving_model')

In [79]:
pusher = Pusher(
      model=trainer.outputs['model'],
      model_blessing=evaluator.outputs['blessing'],
      push_destination=pusher_pb2.PushDestination(
          filesystem=pusher_pb2.PushDestination.Filesystem(
              base_directory=_serving_model_dir)))

context.run(pusher)

Congrats! You explored all TFX components (well, except for the Tuner) and ran them successfully. You were able to run and inspect them individually using the InteractiveContext.

In the next exercise, we will run this identical pipeline end to end and inspect the ML Metadata store as well as TF Serving.