In this notebook-based tutorial, we will create and run a TFX pipeline that trains a model to predict whether a patient is septic based on biological markers. The pipeline consists of three essential TFX components: ExampleGen, Trainer and Pusher. Together they cover a minimal ML workflow: importing data, training a model and exporting the trained model.

# Project Goal
    
Build a basic TensorFlow Extended (TFX) pipeline that automatically executes tasks from ingestion to serving. The scenario uses synthetically generated patient data (Heart Rate, Temperature, Respiratory Rate, White Blood Cell Count), where each record is labelled 1 (Septic) or 0 (Not-Septic).

## Overview of steps
1. Install required software
1. Configure pipeline variables
1. Prepare the raw data 
1. Write the training pipeline (data ingestion, model training, model pushing)
1. Run the training pipeline
1. Push the model

# Import required software

In [1]:
# commented out due to ODH notebook image error on direct pip commands. Use %horus error.
# run below commands in terminal
#!pip install --upgrade pip
#!pip install -r requirements.txt -q

In [2]:
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))

TensorFlow version: 2.9.1
TFX version: 1.9.0


# Configure Pipeline Variables

A handful of variables define the pipeline, and you can customize them as needed. With the values below, pipeline artifacts are written under ../pipeline and exported models under ../models, relative to the notebook's working directory. Instead of using the SchemaGen component to generate a schema, this tutorial hardcodes a feature spec in the training code.

In [3]:
import os

PIPELINE_NAME = "sepsis_pipeline"

# Output directory to store artifacts generated from the pipeline.
PIPELINE_ROOT = os.path.join('../pipeline', PIPELINE_NAME)
# Path to a SQLite DB file to use as an MLMD storage.
METADATA_PATH = os.path.join('../pipeline/metadata', PIPELINE_NAME, 'metadata.db')
# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('../models', PIPELINE_NAME)
# Path to the training data
DATA_TRAIN = '../data/training_data'
# File name
FILENAME='septic_data_labelled.csv'
RAW_DATA = os.path.join(DATA_TRAIN,FILENAME)

from absl import logging
logging.set_verbosity(logging.INFO)  # Set default logging level

In [4]:
print(PIPELINE_NAME)
print(PIPELINE_ROOT)
print(METADATA_PATH)
print(SERVING_MODEL_DIR)
print(DATA_TRAIN)
print(RAW_DATA)

sepsis_pipeline
../pipeline/sepsis_pipeline
../pipeline/metadata/sepsis_pipeline/metadata.db
../models/sepsis_pipeline
../data/training_data
../data/training_data/septic_data_labelled.csv


# Prepare and examine the training data

There are four numeric features in this dataset:

1. Heart Rate (HR)
1. Temperature (Temp)
1. Respiratory Rate (Resp)
1. White Blood Cell Count (WBC)

Each record also carries a label, isSeptic, which is 1 for Septic and 0 for Not-Septic.

We will build a classification model which predicts sepsis from these features.

## View the data

You should see five comma-separated values per row: the four features followed by the label. For example, the first row ends in 0, which means that patient is not septic.

In [5]:
!head {RAW_DATA}

HR,Temp,Resp,WBC,isSeptic
40.0,110.0,12.0,4.54,0
41.5,79.0,26.0,4.23,0
41.9,61.0,14.0,18.13,0
40.1,89.0,20.0,3.40,0
35.6,136.0,13.0,9.21,0
35.8,83.0,26.0,8.95,0
35.6,83.0,18.0,21.70,0
35.4,85.0,14.0,3.92,0
38.3,238.0,25.0,6.96,0
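
Beyond eyeballing the first rows, it can help to check that every feature parses as a number and to see how the labels are distributed. The following is a minimal sketch using only the standard library. The `summarize` helper and the inline `SAMPLE` string are illustrative assumptions (the sample is copied from the output above), and in the notebook you would pass `open(RAW_DATA)` instead.

```python
import csv
import io
from collections import Counter

# Inline sample copied from the `head` output above; in the notebook you
# would open RAW_DATA instead of wrapping a string in io.StringIO.
SAMPLE = """HR,Temp,Resp,WBC,isSeptic
40.0,110.0,12.0,4.54,0
41.5,79.0,26.0,4.23,0
41.9,61.0,14.0,18.13,0
"""

FEATURES = ("HR", "Temp", "Resp", "WBC")
LABEL = "isSeptic"


def summarize(csv_file):
    """Return (row_count, label_counts, column_names) for a labelled CSV."""
    reader = csv.DictReader(csv_file)
    label_counts = Counter()
    rows = 0
    for row in reader:
        # Every feature must parse as a float; a ValueError means bad data.
        for name in FEATURES:
            float(row[name])
        label_counts[int(row[LABEL])] += 1
        rows += 1
    return rows, dict(label_counts), reader.fieldnames


rows, label_counts, columns = summarize(io.StringIO(SAMPLE))
print(rows, label_counts, columns)
```

A heavily skewed label count on the full file would be worth knowing before interpreting the accuracy metric reported during training.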


# Create a pipeline

TFX pipelines are defined using Python APIs. We will define a pipeline which consists of the following three components.

1. CsvExampleGen: Reads in data files and converts them to the TFX internal format for further processing. There are multiple ExampleGens for various formats. In this tutorial, we will use CsvExampleGen, which takes CSV file input.
1. Trainer: Trains an ML model. The Trainer component requires model definition code from users. You can use TensorFlow APIs to specify how to train a model and save it in the saved_model format.
1. Pusher: Copies the trained model outside of the TFX pipeline. The Pusher component can be thought of as the deployment step for the trained ML model.
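
By default, ExampleGen also splits its input into train and eval sets at roughly a 2:1 ratio using hash buckets, which is where the train and eval files consumed by the Trainer come from. The sketch below illustrates the general idea of hash-bucket splitting with the standard library. It is not TFX's actual hashing code, and `assign_split` and the synthetic `records` are hypothetical names and data made up for this illustration.

```python
import hashlib


def assign_split(record: str, train_buckets: int = 2, eval_buckets: int = 1) -> str:
    """Deterministically assign a record to 'train' or 'eval' by hashing it."""
    total = train_buckets + eval_buckets
    digest = hashlib.sha256(record.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") % total
    return "train" if bucket < train_buckets else "eval"


# 300 made-up CSV rows, only used to exercise the function.
records = [
    f"{36 + i * 0.1:.1f},{60 + i},{12 + i % 15},{4 + i * 0.05:.2f},0"
    for i in range(300)
]
splits = [assign_split(r) for r in records]
train_count = splits.count("train")
eval_count = splits.count("eval")
print(train_count, eval_count)
```

Because the assignment depends only on the record's content, rerunning the split on the same data yields the same train and eval membership.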

Before actually defining the pipeline, we need to write the model code for the Trainer component.

## Write model training code

We will build a simple DNN classifier to predict whether a patient is septic. This model training code will be saved to a separate file.

In this tutorial we will use the Generic Trainer of TFX, which supports Keras-based models. You need to write a Python file containing a run_fn function, which is the entrypoint for the Trainer component.
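
To make "entrypoint" concrete: the Trainer is given the path to a Python file, imports it, and calls its run_fn with an object holding the training arguments. The following standard-library sketch simulates that hand-off. It is not TFX code; `call_run_fn`, the stand-in `MODULE_SOURCE`, and the use of `SimpleNamespace` in place of the real `FnArgs` are all assumptions made for illustration.

```python
import importlib.util
import os
import tempfile
from types import SimpleNamespace

# A stand-in module file. The real one defines run_fn(fn_args) that trains
# and saves a Keras model; this one only records what it received.
MODULE_SOURCE = '''
import os

def run_fn(fn_args):
    marker = os.path.join(fn_args.serving_model_dir, "marker.txt")
    with open(marker, "w") as f:
        f.write("trained for %d steps" % fn_args.train_steps)
'''


def call_run_fn(module_path, fn_args):
    """Import a Python file by path and invoke its run_fn (hypothetical helper)."""
    spec = importlib.util.spec_from_file_location("user_module", module_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    module.run_fn(fn_args)


with tempfile.TemporaryDirectory() as tmp:
    module_path = os.path.join(tmp, "training_code.py")
    with open(module_path, "w") as f:
        f.write(MODULE_SOURCE)
    # SimpleNamespace mimics the attribute access used on fn_args.
    fn_args = SimpleNamespace(serving_model_dir=tmp, train_steps=100)
    call_run_fn(module_path, fn_args)
    with open(os.path.join(tmp, "marker.txt")) as f:
        result = f.read()

print(result)
```

The only contract the module file has to honor is the function name and the single argument; everything else about how the model is built and saved is up to the author.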

In [6]:
_trainer_module_file = '../src/training_code.py'

In [7]:
%%writefile {_trainer_module_file}

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'HR', 'Temp', 'Resp', 'WBC'
]

_LABEL_KEY = 'isSeptic'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int = 200) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _build_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying patient data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  # Two output logits, one per class: 0 (Not-Septic) and 1 (Septic).
  outputs = keras.layers.Dense(2)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also be derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _build_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')

Writing ../src/training_code.py
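
The final Dense layer in the training code has no activation and the loss is configured with from_logits=True, so the saved model returns raw logits rather than probabilities. A caller that wants a probability per class has to apply a softmax. The sketch below shows that conversion in plain Python. The `example_logits` values are made up for illustration, and the real vector has one entry per unit in the final Dense layer.

```python
import math
from typing import List


def softmax(logits: List[float]) -> List[float]:
    """Convert raw logits to probabilities that sum to 1."""
    peak = max(logits)  # subtract the max for numerical stability
    exps = [math.exp(x - peak) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


# Hypothetical logits for a single patient (not real model output).
example_logits = [0.3, 1.9]
probs = softmax(example_logits)
# The predicted class is the index of the largest probability.
predicted_class = probs.index(max(probs))
print(probs, predicted_class)
```

With the label encoding used in this dataset, index 1 corresponds to Septic and index 0 to Not-Septic.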


## Write a pipeline definition

We define a function to create a TFX pipeline. A Pipeline object represents a TFX pipeline which can be run using one of pipeline orchestration systems that TFX supports.
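
The three components form a dependency chain: the Trainer consumes the examples produced by CsvExampleGen, and the Pusher consumes the model produced by the Trainer. An orchestrator runs components in an order that respects those dependencies. The sketch below works out such an order with a small topological sort. It only illustrates the idea and is not how TFX is implemented; the `UPSTREAM` mapping and `execution_order` helper are hypothetical.

```python
from collections import deque
from typing import Dict, List

# Each component maps to the upstream components whose outputs it consumes.
UPSTREAM: Dict[str, List[str]] = {
    "Pusher": ["Trainer"],
    "Trainer": ["CsvExampleGen"],
    "CsvExampleGen": [],
}


def execution_order(upstream: Dict[str, List[str]]) -> List[str]:
    """Return a topological order of components (Kahn's algorithm)."""
    remaining = {name: len(deps) for name, deps in upstream.items()}
    ready = deque(name for name, count in remaining.items() if count == 0)
    order = []
    while ready:
        current = ready.popleft()
        order.append(current)
        # Unblock every component that was waiting on `current`.
        for name, deps in upstream.items():
            if current in deps:
                remaining[name] -= 1
                if remaining[name] == 0:
                    ready.append(name)
    if len(order) != len(upstream):
        raise ValueError("dependency cycle detected")
    return order


order = execution_order(UPSTREAM)
print(order)
```

This is why the order in which components appear in the components list does not have to match the order in which they run.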

In [8]:
_pipeline_file = '../src/pipeline.py'

In [9]:
# %%writefile is commented out: with it, the cell is only written to disk and
# _create_pipeline is not defined in the notebook session.
#%%writefile {_pipeline_file}

def _create_pipeline(pipeline_name: str,
                     pipeline_root: str,
                     data_root: str,
                     module_file: str,
                     serving_model_dir: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
  """Creates a three component patient pipeline with TFX."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a filesystem destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  # The following three components will be included in the pipeline.
  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(metadata_path),
      components=components)
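
The Trainer above is configured with num_steps=100 for training and num_steps=5 for evaluation, and the training code uses batch sizes of 20 and 10. Because _input_fn repeats the dataset indefinitely, these step counts (rather than the size of the CSV file) determine how many examples are consumed. The arithmetic can be sketched as follows. The `passes_over_data` helper and the dataset size of 500 are hypothetical, since the actual number of rows is not stated here.

```python
# Values taken from the training code and the pipeline definition.
TRAIN_STEPS, TRAIN_BATCH_SIZE = 100, 20
EVAL_STEPS, EVAL_BATCH_SIZE = 5, 10

# One step consumes one batch.
train_examples_seen = TRAIN_STEPS * TRAIN_BATCH_SIZE
eval_examples_seen = EVAL_STEPS * EVAL_BATCH_SIZE


def passes_over_data(examples_seen: int, dataset_size: int) -> float:
    """How many times a repeated dataset of dataset_size rows is traversed."""
    return examples_seen / dataset_size


# 500 is a made-up split size used only to show the calculation.
hypothetical_passes = passes_over_data(train_examples_seen, 500)
print(train_examples_seen, eval_examples_seen, hypothetical_passes)
```

If the training split is much smaller than the number of examples seen, the model revisits the same rows several times, which is worth keeping in mind when tuning num_steps.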

# Run the pipeline

In [10]:
# If .ipynb_checkpoints exists, the pipeline will fail with a split header mismatch.
!rm -rf ../data/{training_data,serving_data}/.ipynb_checkpoints
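
If shell commands are unavailable in your notebook image, the same cleanup can be done in Python. The following is a minimal sketch using the standard library; `remove_checkpoint_dirs` is a hypothetical helper, and the demo runs against a temporary directory rather than the real ../data folder.

```python
import shutil
import tempfile
from pathlib import Path


def remove_checkpoint_dirs(data_root: Path) -> int:
    """Delete every .ipynb_checkpoints directory under data_root; return the count."""
    removed = 0
    # Materialize the matches first so we do not delete while iterating.
    for path in list(data_root.rglob(".ipynb_checkpoints")):
        if path.is_dir():
            shutil.rmtree(path)
            removed += 1
    return removed


# Demo on a throwaway directory tree that mimics the data folder layout.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "training_data" / ".ipynb_checkpoints").mkdir(parents=True)
    (root / "training_data" / "septic_data_labelled.csv").write_text(
        "HR,Temp,Resp,WBC,isSeptic\n"
    )
    removed = remove_checkpoint_dirs(root)
    leftover = sorted(p.name for p in (root / "training_data").iterdir())

print(removed, leftover)
```

In the notebook, the call would be something like remove_checkpoint_dirs(Path('../data')), run before the pipeline is started.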

In [14]:
tfx.orchestration.LocalDagRunner().run(
  _create_pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      data_root=DATA_TRAIN,
      module_file=_trainer_module_file,
      serving_model_dir=SERVING_MODEL_DIR,
      metadata_path=METADATA_PATH))

INFO:absl:Generating ephemeral wheel package for '/opt/app-root/src/mlops-basic/src/training_code.py' (including modules: ['training_code']).
INFO:absl:User module package has hash fingerprint version 1fc48ff96f5bb66bd831416e8c248ab2506c605c9f8f3a5ab67db644fa6a8faa.
INFO:absl:Executing: ['/opt/app-root/bin/python3.8', '/tmp/tmp7kmj1vyi/_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', '/tmp/tmp8e0qmtwl', '--dist-dir', '/tmp/tmpcit0tlsl']
INFO:absl:Successfully built user code wheel distribution at '../pipeline/sepsis_pipeline/_wheels/tfx_user_code_Trainer-0.0+1fc48ff96f5bb66bd831416e8c248ab2506c605c9f8f3a5ab67db644fa6a8faa-py3-none-any.whl'; target user module is 'training_code'.
INFO:absl:Full user module path is 'training_code@../pipeline/sepsis_pipeline/_wheels/tfx_user_code_Trainer-0.0+1fc48ff96f5bb66bd831416e8c248ab2506c605c9f8f3a5ab67db644fa6a8faa-py3-none-any.whl'
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec





INFO:tensorflow:Assets written to: ../pipeline/sepsis_pipeline/Trainer/model/8/Format-Serving/assets


INFO:tensorflow:Assets written to: ../pipeline/sepsis_pipeline/Trainer/model/8/Format-Serving/assets
INFO:absl:Training complete. Model written to ../pipeline/sepsis_pipeline/Trainer/model/8/Format-Serving. ModelRun written to ../pipeline/sepsis_pipeline/Trainer/model_run/8
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 8 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'model': [Artifact(artifact: uri: "../pipeline/sepsis_pipeline/Trainer/model/8"
custom_properties {
  key: "name"
  value {
    string_value: "sepsis_pipeline:2022-07-20T14:19:58.631508:Trainer:model:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.9.0"
  }
}
name: "sepsis_pipeline:2022-07-20T14:19:58.631508:Trainer:model:0"
, artifact_type: name: "Model"
base_type: MODEL
)], 'model_run': [Artifact(artifact: uri: "../pipeline/sepsis_pipeline/Trainer/model_run/8"
custom_properties {
  key: "nam

In [12]:
# List files in created model directory.
!ls -R {SERVING_MODEL_DIR}

../models/sepsis_pipeline:
1658326344

../models/sepsis_pipeline/1658326344:
assets	keras_metadata.pb  saved_model.pb  variables

../models/sepsis_pipeline/1658326344/assets:

../models/sepsis_pipeline/1658326344/variables:
variables.data-00000-of-00001  variables.index
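
Each push lands in its own numerically named subdirectory (1658326344 in the listing above, which appears to be a Unix timestamp), so repeated pipeline runs accumulate several versions side by side. A consumer that wants the newest model can pick the highest number. The sketch below does that with the standard library; `latest_version_dir` is a hypothetical helper, and the demo uses a temporary directory with made-up version names next to the one from the listing.

```python
import os
import tempfile
from typing import Optional


def latest_version_dir(serving_model_dir: str) -> Optional[str]:
    """Return the path of the highest-numbered version directory, if any."""
    versions = [
        name
        for name in os.listdir(serving_model_dir)
        if name.isdigit() and os.path.isdir(os.path.join(serving_model_dir, name))
    ]
    if not versions:
        return None
    # Compare numerically, not as strings, so "999" < "1658326344".
    return os.path.join(serving_model_dir, max(versions, key=int))


# Demo on a throwaway directory; only 1658326344 comes from the listing above.
with tempfile.TemporaryDirectory() as tmp:
    for version in ("1658326344", "1658320000", "999"):
        os.makedirs(os.path.join(tmp, version))
    os.makedirs(os.path.join(tmp, "not_a_version"))
    latest = os.path.basename(latest_version_dir(tmp))

print(latest)
```

In the notebook, latest_version_dir(SERVING_MODEL_DIR) would give the directory to hand to a SavedModel loader or a serving system.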
