## Simple TFX Pipeline Tutorial using Penguin dataset

### A short tutorial to run a simple TFX pipeline

This notebook follows the TFX tutorial at https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple

Modifications to the notebook were made to enable running it locally.

In [3]:
# Check the TensorFlow and TFX versions
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))

TensorFlow version: 2.8.1
TFX version: 1.7.1


### Set up variables

Set up the variables used to define the pipeline.

In [4]:
import os

PIPELINE_NAME='penguin-simple'

# Output directory to store artifacts generated from the pipeline
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)
# Path to a SQLITE DB file to use as an MLMD (ML Metadata) storage.
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME,'metadata.db')
# Output directory where created models from the pipeline will be exported
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)

from absl import logging
logging.set_verbosity(logging.INFO) # set default logging level.

### Prepare example data

We will download the example Palmer Penguins dataset.

There are 4 numeric features in this dataset:

    * culmen_length_mm    
    * culmen_depth_mm    
    * flipper_length_mm    
    * body_mass_g
    
All features have already been normalized to the range [0, 1]. We will build a classification model that predicts the species of penguins.
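The CSV used here is already scaled, so no preprocessing step is needed in this tutorial. For reference, one common way to bring a numeric column into [0, 1] is min-max scaling. The sketch below is an illustration only: the raw values are made up, and whether the tutorial's CSV was produced exactly this way is an assumption.

```python
from typing import List


def min_max_scale(values: List[float]) -> List[float]:
    """Linearly rescales values so the minimum maps to 0 and the maximum to 1."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # A constant column has no spread to rescale; map everything to 0.
        return [0.0 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]


# Hypothetical raw flipper lengths in millimetres (illustrative values only).
raw_flipper_mm = [172.0, 181.0, 195.0, 210.0, 231.0]
scaled = min_max_scale(raw_flipper_mm)
print(scaled)
```

After scaling, the smallest raw value becomes 0 and the largest becomes 1, which matches the range of the feature columns in the downloaded file.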

Because TFX ExampleGen reads inputs from a directory, we need to create a directory and copy the dataset to it.

In [5]:
import urllib.request
import tempfile

DATA_ROOT = tempfile.mkdtemp(prefix='tfx-data')  # Create a temporary directory.
_data_url = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/labelled/penguins_processed.csv'
_data_filepath = os.path.join(DATA_ROOT, "data.csv")
urllib.request.urlretrieve(_data_url, _data_filepath)

('/tmp/tfx-data07z2rmq8/data.csv', <http.client.HTTPMessage at 0x7fa5b3a4b8e0>)

Take a quick look at the CSV file.

In [6]:
!head {_data_filepath}

species,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g
0,0.2545454545454545,0.6666666666666666,0.15254237288135594,0.2916666666666667
0,0.26909090909090905,0.5119047619047618,0.23728813559322035,0.3055555555555556
0,0.29818181818181805,0.5833333333333334,0.3898305084745763,0.1527777777777778
0,0.16727272727272732,0.7380952380952381,0.3559322033898305,0.20833333333333334
0,0.26181818181818167,0.892857142857143,0.3050847457627119,0.2638888888888889
0,0.24727272727272717,0.5595238095238096,0.15254237288135594,0.2569444444444444
0,0.25818181818181823,0.773809523809524,0.3898305084745763,0.5486111111111112
0,0.32727272727272727,0.5357142857142859,0.1694915254237288,0.1388888888888889
0,0.23636363636363636,0.9642857142857142,0.3220338983050847,0.3055555555555556


You should see five columns. **species** is one of 0, 1, or 2, and all other features should have values between 0 and 1.
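The same check can be automated instead of done by eye. The following is a minimal sketch using only the standard library; the helper name find_invalid_rows is hypothetical and not part of TFX, and the third sample row is deliberately invalid to show what a failure looks like.

```python
import csv
import io
from typing import Iterable, List

EXPECTED_COLUMNS = [
    'species', 'culmen_length_mm', 'culmen_depth_mm',
    'flipper_length_mm', 'body_mass_g'
]


def find_invalid_rows(lines: Iterable[str]) -> List[int]:
    """Returns 1-based data-row numbers that violate the expected ranges."""
    reader = csv.DictReader(lines)
    if reader.fieldnames != EXPECTED_COLUMNS:
        raise ValueError(f'Unexpected header: {reader.fieldnames}')
    bad_rows = []
    for row_number, row in enumerate(reader, start=1):
        label_ok = row['species'] in ('0', '1', '2')
        features_ok = all(
            0.0 <= float(row[name]) <= 1.0 for name in EXPECTED_COLUMNS[1:])
        if not (label_ok and features_ok):
            bad_rows.append(row_number)
    return bad_rows


# Two rows copied from the `head` output above, plus one deliberately bad row.
sample = io.StringIO(
    'species,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g\n'
    '0,0.2545454545454545,0.6666666666666666,0.15254237288135594,0.2916666666666667\n'
    '0,0.26909090909090905,0.5119047619047618,0.23728813559322035,0.3055555555555556\n'
    '3,0.5,1.5,0.5,0.5\n')
print(find_invalid_rows(sample))  # -> [3]
```

To run it against the downloaded file, open _data_filepath with `open(_data_filepath, newline='')` and pass the file object to the function; an empty list means every row passed.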

### Create a pipeline

TFX pipelines are defined using Python APIs. We will define a pipeline that consists of the following three components.

    - CsvExampleGen: Reads in data files and converts them to the TFX internal format for further processing. There are multiple ExampleGens for various formats. In this tutorial, we will use CsvExampleGen, which takes CSV file input.

    - Trainer: Trains an ML model. The Trainer component requires model definition code from users. You can use TensorFlow APIs to specify how to train a model and save it in the SavedModel format.

    - Pusher: Copies the trained model outside of the TFX pipeline. The Pusher component can be thought of as the deployment process for the trained ML model.

Before defining the pipeline, we need to write the model code for the Trainer component.

### Write model training code

We will create a simple DNN model for classification using the TensorFlow Keras API. This model training code will be saved to a separate file.

In this tutorial we will use the Generic Trainer of TFX, which supports Keras-based models. You need to write a Python file containing a run_fn function, which is the entry point for the Trainer component.

In [7]:
_trainer_module_file = 'penguin_trainer.py'

In [12]:
%%writefile {_trainer_module_file}

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int = 200) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _build_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
    
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by the pipeline author. A schema can also be derived from
  # the TFT graph if a Transform component is used. When neither is available,
  # `schema_from_feature_spec` can be used to generate a schema from a very
  # simple feature_spec, but the schema returned will be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _build_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')

Overwriting penguin_trainer.py
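The last layer in _build_keras_model is Dense(3) with no activation, and the loss is configured with from_logits=True, so the trained model returns raw logits rather than probabilities. The pure-Python sketch below shows how one row of logits could be turned into class probabilities with softmax. The logit values are made up for illustration, and the helper is not part of the trainer module.

```python
import math
from typing import List


def softmax(logits: List[float]) -> List[float]:
    """Converts raw logits into probabilities that sum to 1."""
    # Subtracting the maximum keeps exp() from overflowing on large logits.
    shift = max(logits)
    exps = [math.exp(x - shift) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


# Hypothetical logits for one penguin, one entry per species (0, 1, 2).
example_logits = [2.0, 0.5, -1.0]
probabilities = softmax(example_logits)
predicted_species = probabilities.index(max(probabilities))
print(probabilities, predicted_species)
```

The predicted species is simply the index of the largest logit; softmax only matters when calibrated-looking probabilities are wanted.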


### Write a pipeline definition

We define a function to create a TFX pipeline. A Pipeline object represents a TFX pipeline, which can be run using one of the pipeline orchestration systems that TFX supports.

In [13]:
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
  """Creates a three component penguin pipeline with TFX."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a filesystem destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  # The following three components will be included in the pipeline.
  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(metadata_path),
      components=components)
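Because _input_fn calls .repeat() on the dataset, the amount of data the model sees is governed by the step counts rather than by the size of the CSV file. The arithmetic below uses the batch sizes from penguin_trainer.py and the num_steps values passed to the Trainer; it assumes the default of a single epoch, since run_fn does not pass an epochs argument to model.fit.

```python
# Values copied from penguin_trainer.py and _create_pipeline.
TRAIN_BATCH_SIZE = 20
EVAL_BATCH_SIZE = 10
TRAIN_STEPS = 100
EVAL_STEPS = 5

# Each step consumes one batch, so examples seen = steps * batch size.
train_examples_seen = TRAIN_STEPS * TRAIN_BATCH_SIZE
eval_examples_seen = EVAL_STEPS * EVAL_BATCH_SIZE
print(train_examples_seen, eval_examples_seen)  # -> 2000 50
```

If the training split holds fewer rows than the number of examples seen, some rows are visited more than once, which is expected with a repeated dataset.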

### Run the pipeline

TFX supports multiple orchestrators to run pipelines. In this tutorial we will use LocalDagRunner, which is included in the TFX Python package and runs pipelines in a local environment. We often call TFX pipelines "DAGs", which stands for directed acyclic graphs.

LocalDagRunner provides fast iterations for development and debugging. TFX also supports other orchestrators, including Kubeflow Pipelines and Apache Airflow, which are suitable for production use cases.

See TFX on Cloud AI Platform Pipelines or TFX Airflow Tutorial to learn more about other orchestration systems.
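To make the "DAG" idea concrete, the sketch below orders the three components of this tutorial so that each one runs only after the components it depends on. It is a generic topological sort written for illustration; it is not how LocalDagRunner is implemented, and the dictionary of dependencies is a hand-written stand-in for what TFX infers from component inputs and outputs.

```python
from collections import deque
from typing import Dict, List


def topological_order(upstream: Dict[str, List[str]]) -> List[str]:
    """Orders nodes so every node comes after all of its upstream nodes."""
    remaining = {node: len(deps) for node, deps in upstream.items()}
    downstream: Dict[str, List[str]] = {node: [] for node in upstream}
    for node, deps in upstream.items():
        for dep in deps:
            downstream[dep].append(node)
    # Start with the nodes that have no upstream dependencies.
    ready = deque(node for node, count in remaining.items() if count == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for child in downstream[node]:
            remaining[child] -= 1
            if remaining[child] == 0:
                ready.append(child)
    if len(order) != len(upstream):
        raise ValueError('Graph has a cycle, so it is not a DAG.')
    return order


# Dependencies of the three components in this tutorial.
penguin_dag = {
    'CsvExampleGen': [],
    'Trainer': ['CsvExampleGen'],
    'Pusher': ['Trainer'],
}
print(topological_order(penguin_dag))
# -> ['CsvExampleGen', 'Trainer', 'Pusher']
```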

Now we create a LocalDagRunner and pass a Pipeline object created from the function we already defined.

The pipeline runs directly and you can see logs for the progress of the pipeline including ML model training.

In [14]:
tfx.orchestration.LocalDagRunner().run(
  _create_pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      data_root=DATA_ROOT,
      module_file=_trainer_module_file,
      serving_model_dir=SERVING_MODEL_DIR,
      metadata_path=METADATA_PATH))

INFO:absl:Generating ephemeral wheel package for '/home/onwunalu/codelib/python/tfx-tutorials/starter-pipeline/penguin_trainer.py' (including modules: ['penguin_trainer']).
INFO:absl:User module package has hash fingerprint version 7c94cab67ddd76748eaf05a7c82530bac6a95fdda2027e20ae09a5b57b7d7cd9.
INFO:absl:Executing: ['/home/onwunalu/.pyenv/versions/tfx/bin/python', '/tmp/tmp4jtunxm8/_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', '/tmp/tmpveo7br41', '--dist-dir', '/tmp/tmp5nc60xj2']
INFO:absl:Successfully built user code wheel distribution at 'pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+7c94cab67ddd76748eaf05a7c82530bac6a95fdda2027e20ae09a5b57b7d7cd9-py3-none-any.whl'; target user module is 'penguin_trainer'.
INFO:absl:Full user module path is 'penguin_trainer@pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+7c94cab67ddd76748eaf05a7c82530bac6a95fdda2027e20ae09a5b57b7d7cd9-py3-none-any.whl'
INFO:absl:Using deployment config:
 executor_specs {
  key: "Cs

running bdist_wheel
running build
running build_py
creating build
creating build/lib
copying penguin_trainer.py -> build/lib
installing to /tmp/tmpveo7br41
running install
running install_lib
copying build/lib/penguin_trainer.py -> /tmp/tmpveo7br41
running install_egg_info
running egg_info
creating tfx_user_code_Trainer.egg-info
writing tfx_user_code_Trainer.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_Trainer.egg-info/dependency_links.txt
writing top-level names to tfx_user_code_Trainer.egg-info/top_level.txt
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
reading manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
Copying tfx_user_code_Trainer.egg-info to /tmp/tmpveo7br41/tfx_user_code_Trainer-0.0+7c94cab67ddd76748eaf05a7c82530bac6a95fdda2027e20ae09a5b57b7d7cd9-py3.8.egg-info
running install_scripts
creating /tmp/tmpveo7br41/tfx_user_code_Trainer-0.0+7c94cab67ddd76748eaf05a7c82

INFO:absl:Processing input csv data /tmp/tfx-data07z2rmq8/* to TFExample.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 14 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/penguin-simple/CsvExampleGen/examples/14"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1653633279,sum_checksum:1653633279"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "penguin-simple:2022-05-27T01:36:55.948907:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}


INFO:absl:Train on the 'train' split when train_args.splits is not set.
INFO:absl:Evaluate on the 'eval' split when eval_args.splits is not set.
INFO:absl:udf_utils.get_fn {'module_path': 'penguin_trainer@pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+7c94cab67ddd76748eaf05a7c82530bac6a95fdda2027e20ae09a5b57b7d7cd9-py3-none-any.whl', 'custom_config': 'null', 'train_args': '{\n  "num_steps": 100\n}', 'eval_args': '{\n  "num_steps": 5\n}'} 'run_fn'
INFO:absl:Installing 'pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+7c94cab67ddd76748eaf05a7c82530bac6a95fdda2027e20ae09a5b57b7d7cd9-py3-none-any.whl' to a temporary directory.
INFO:absl:Executing: ['/home/onwunalu/.pyenv/versions/tfx/bin/python', '-m', 'pip', 'install', '--target', '/tmp/tmpw72qib_d', 'pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+7c94cab67ddd76748eaf05a7c82530bac6a95fdda2027e20ae09a5b57b7d7cd9-py3-none-any.whl']
E0527 01:36:57.390652433   31736 fork_posix.cc:76]           Other threads a

Processing ./pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+7c94cab67ddd76748eaf05a7c82530bac6a95fdda2027e20ae09a5b57b7d7cd9-py3-none-any.whl


INFO:absl:Successfully installed 'pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+7c94cab67ddd76748eaf05a7c82530bac6a95fdda2027e20ae09a5b57b7d7cd9-py3-none-any.whl'.
INFO:absl:Training model.
INFO:absl:Feature body_mass_g has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_depth_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature flipper_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature species has a shape dim {
  size: 1
}
. Setting to DenseTensor.


Installing collected packages: tfx-user-code-Trainer
Successfully installed tfx-user-code-Trainer-0.0+7c94cab67ddd76748eaf05a7c82530bac6a95fdda2027e20ae09a5b57b7d7cd9


2022-05-27 01:36:58.590998: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:936] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-05-27 01:36:58.595941: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudnn.so.8'; dlerror: libcudnn.so.8: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /home/onwunalu/codelib/res-sim-utils/libecl/build/lib64:/home/onwunalu/codelib/res-sim-utils/libecl/build/lib64::/home/onwunalu/GPRS/GPRS/GPRS08_WORKING/GPRS08/LIBS:/home/onwunalu/GPRS/GPRS/GPRS08_WORKING/GPRS08/LIBS/SAMG:/usr/local/cuda/bin:/usr/local/cuda/include:/usr/local/lib64:/usr/lib:/usr/lib64:/opt/intel/mkl/lib/intel64:/opt/intel/mkl/lib/mic:/opt/intel/mkl/lib/intel64:/home/onwunalu/GPRS/GPRS/GPRS08_WORKING/GPRS08/LIBS/SAMG:/home/onwunalu/GPRS/GPRS08_WORKING/GPRS08/LIBS:/home/onwunalu/GPRS/GPRS08_WORKING/GPRS08/LIBS/SAMG/ifor

[<KerasTensor: shape=(None, 1) dtype=float32 (created by layer 'culmen_length_mm')>, <KerasTensor: shape=(None, 1) dtype=float32 (created by layer 'culmen_depth_mm')>, <KerasTensor: shape=(None, 1) dtype=float32 (created by layer 'flipper_length_mm')>, <KerasTensor: shape=(None, 1) dtype=float32 (created by layer 'body_mass_g')>]


2022-05-27 01:36:59.489053: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.


INFO:tensorflow:Assets written to: pipelines/penguin-simple/Trainer/model/15/Format-Serving/assets


INFO:tensorflow:Assets written to: pipelines/penguin-simple/Trainer/model/15/Format-Serving/assets
INFO:absl:Training complete. Model written to pipelines/penguin-simple/Trainer/model/15/Format-Serving. ModelRun written to pipelines/penguin-simple/Trainer/model_run/15
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 15 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'model': [Artifact(artifact: uri: "pipelines/penguin-simple/Trainer/model/15"
custom_properties {
  key: "name"
  value {
    string_value: "penguin-simple:2022-05-27T01:36:55.948907:Trainer:model:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.7.1"
  }
}
, artifact_type: name: "Model"
base_type: MODEL
)], 'model_run': [Artifact(artifact: uri: "pipelines/penguin-simple/Trainer/model_run/15"
custom_properties {
  key: "name"
  value {
    string_value: "penguin-simple:2022-05-27T01:36:55.948907:Tra

Because Pusher is the last component of the pipeline, you should see "INFO:absl:Component Pusher is finished." at the end of the logs if the pipeline finished successfully.
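METADATA_PATH was described earlier as a SQLite file used for MLMD storage, so after a run it can be inspected with Python's built-in sqlite3 module. The sketch below only lists table names and makes no assumption about the MLMD schema; the helper name is hypothetical, and the demo runs on a throwaway database so that it works without a pipeline run.

```python
import os
import sqlite3
import tempfile
from typing import List


def list_sqlite_tables(db_path: str) -> List[str]:
    """Returns the names of all tables in a SQLite database file."""
    connection = sqlite3.connect(db_path)
    try:
        rows = connection.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
        ).fetchall()
    finally:
        connection.close()
    return [name for (name,) in rows]


# Self-contained demo on a throwaway database; the table name is hypothetical.
demo_path = os.path.join(tempfile.mkdtemp(), 'demo.db')
demo = sqlite3.connect(demo_path)
demo.execute('CREATE TABLE example_table (id INTEGER PRIMARY KEY)')
demo.commit()
demo.close()
print(list_sqlite_tables(demo_path))  # -> ['example_table']
```

Calling list_sqlite_tables(METADATA_PATH) after the pipeline has run would show the tables MLMD created. Note that sqlite3.connect creates an empty file if the path does not exist yet, so check the path first when in doubt.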

The Pusher component pushes the trained model to SERVING_MODEL_DIR, which is the serving_model/penguin-simple directory if you did not change the variables in the previous steps. You can see the result in the file browser in the left-side panel in Colab, or by using the following command:

In [None]:
# List files in created model directory.
!find {SERVING_MODEL_DIR}
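The find command is not available on every platform when running locally. A portable alternative, sketched here with the standard library, walks the directory from Python. The helper name is hypothetical, it lists files only (find also prints directories), and the demo uses a temporary directory with a stand-in file rather than a real exported model.

```python
import os
import tempfile
from typing import List


def list_files(root_dir: str) -> List[str]:
    """Returns every file path under root_dir, sorted for stable output."""
    paths = []
    for current_dir, _, file_names in os.walk(root_dir):
        for file_name in file_names:
            paths.append(os.path.join(current_dir, file_name))
    return sorted(paths)


# Demo on a temporary directory with a stand-in layout (not a real model).
demo_root = tempfile.mkdtemp()
os.makedirs(os.path.join(demo_root, 'variables'))
open(os.path.join(demo_root, 'saved_model.pb'), 'w').close()
for path in list_files(demo_root):
    print(os.path.relpath(path, demo_root))
```

In the notebook, list_files(SERVING_MODEL_DIR) would return the files that Pusher copied.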