TensorFlow Extended (TFX) is used to create machine learning pipelines. It is open-source and is used by many large companies to put their machine learning model into production.

![im](https://i.imgur.com/Npjr3NK.png)


**Pipeline :**
A TFX pipeline is used to implement an ML pipeline which remains intact for the entire lifetime of the product. They are built using tfx components. 

![](https://i.imgur.com/z3mMdts.png)

**Metadata store**-

It is a single place to manage all the ML metadata about experiments, artifacts, models, and pipelines.
A store consists of - 
Definitions of artifacts and their properties
Execution records of components
Lineage tracking across all executions

![](https://i.imgur.com/DwDEvHq.png)

**TFX Orchestrator**-
An Orchestrator is a system where you can execute pipeline runs. Apache airflow, kubeflow are used as orchestrators. Dagrunner is used to refer to an implementation that supports orchestrator.

**Components**-
A component is used to implement a part of the ML pipeline. ML pipeline consists of many such components. Components are composed of:

component specification- Defines the component's input, output and required parameters.
executor- Implements the code in that component
component interface- Contains component specification and executor
 
Explaining each component here-

**ExampleGen**- This component ingests data into the pipelines. It takes external files (in formats such as CSV, TFRecord, BigQuery etc),partitions and shuffles it, to generate the Examples file. 

![](https://i.imgur.com/hrb82Ev.png)

**StatisticsGen**-
It generates features statistics over Example data and releases data statistics for other pipeline components.

![](https://i.imgur.com/Ym9pOZd.png)

**SchemaGen**
Schema provides a description of input data. SchemaGen automatically generates a schema from the training data and provides detail about the features, their allowed values and data types for feature values.

![](https://i.imgur.com/MySRzxs.png)

**ExampleValidator**
It finds anomalies in training and serving data by comparing data statistics computed by the StatisticsGen pipeline component against a schema. For example: 

![](https://i.imgur.com/FMi2ZWn.png)

**Transform**
It performs feature engineering on Example data and releases both SavedModel as well as statistics on both pre-transform and post-transform data.

![](https://i.imgur.com/R1OYqkV.png)

**Trainer**
Trainer is used to train models using tensorflow.
It takes Examples data, logic for training the data and protobuf definition for trainargs and eval argos. It releases a model for inference and maybe another one for evaluation

![](https://i.imgur.com/u6NqxPJ.png)

**Evaluator**
It performs analysis on the training results for the models to access overall model quality and track performance over time. It validates if the model can be pushed for production.
If the new model's metrics meet the baseline model requirements, the model is said to be "blessed", and is pushed to production.

![](https://i.imgur.com/9dXiq4y.png)

**Pusher**
IT is used to push a validated model to a deployment. 
 
The Pusher component pushes a validated model to a deployment target during model training . Pusher decides to push or not based on the blessings from the other components.
Evaluator blesses the model if it is "good enough" to be pushed to production.
InfraValidator blesses the model if the model is mechanically servable in a production environment.
A Pusher component consumes a trained model in SavedModel format, and outputs  the same SavedModel, along with versioning metadata.
 
 ![](https://i.imgur.com/rqyuCgC.png)·        
 

The ExampleGen TFX Pipeline component ingests data into TFX pipelines by consuming external data sources such as CSV, TFRecord, Avro, Parquet and BigQuery to generate Examples( tf.Example records, tf.SequenceExample records, or proto format) which will be read by other TFX components. 
ExampleGen and Other Components
ExampleGen provides data to components that make use of the TensorFlow Data Validation library, such as SchemaGen, StatisticsGen, and Example Validator. It also provides data to Transform, which makes use of the TensorFlow Transform library, and ultimately to deployment targets during inference.
 

In [None]:
try:
  import colab
  !pip install --upgrade pip
except:
  pass

In [None]:
!pip install -U tfx

Restart runtime before running the next cell

In [None]:
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))

TensorFlow version: 2.8.0
TFX version: 1.7.1


set up variables

In [None]:
import os

PIPELINE_NAME = "spaceship-tfma"

# Output directory to store artifacts generated from the pipeline.
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)
# Path to a SQLite DB file to use as an MLMD storage.
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')
# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)

from absl import logging
logging.set_verbosity(logging.INFO)  # Set default logging level.

setup data

In [None]:
import urllib.request
import tempfile

DATA_ROOT = tempfile.mkdtemp(prefix='tfx-data')  # Create a temporary directory.
_data_url = 'https://raw.githubusercontent.com/harsshh14/kaggle-spaceship-dataset/main/modified_data.csv'
_data_filepath = os.path.join(DATA_ROOT, "data.csv")
urllib.request.urlretrieve(_data_url, _data_filepath)

('/tmp/tfx-datazbvfbia0/data.csv', <http.client.HTTPMessage at 0x7f581bf4af90>)

In [None]:
!head {_data_filepath}

,PassengerId,HomePlanet,CryoSleep,Cabin,Destination,Age,VIP,RoomService,FoodCourt,ShoppingMall,Spa,VRDeck,Name,Transported
0,0001_01,Europa,False,B/0/P,TRAPPIST-1e,39.0,False,0.0,0.0,0.0,0.0,0.0,Maham Ofracculy,0
1,0002_01,Earth,False,F/0/S,TRAPPIST-1e,24.0,False,109.0,9.0,25.0,549.0,44.0,Juanna Vines,1
2,0003_01,Europa,False,A/0/S,TRAPPIST-1e,58.0,True,43.0,3576.0,0.0,6715.0,49.0,Altark Susent,0
3,0003_02,Europa,False,A/0/S,TRAPPIST-1e,33.0,False,0.0,1283.0,371.0,3329.0,193.0,Solam Susent,0
4,0004_01,Earth,False,F/1/S,TRAPPIST-1e,16.0,False,303.0,70.0,151.0,565.0,2.0,Willy Santantines,1
5,0005_01,Earth,False,F/0/P,PSO J318.5-22,44.0,False,0.0,483.0,0.0,291.0,0.0,Sandie Hinetthews,1
6,0006_01,Earth,False,F/2/S,TRAPPIST-1e,26.0,False,42.0,1539.0,3.0,0.0,0.0,Billex Jacostaffey,1
8,0007_01,Earth,False,F/3/S,TRAPPIST-1e,35.0,False,0.0,785.0,17.0,216.0,0.0,Andona Beston,1
9,0008_01,Europa,True,B/1/P,55 Cancri e,14.0,False,0.0,0.0,0.0,0.0,0.0,Erraiam Flatic,1


#Create pipeline

In [None]:
_trainer_module_file = 'spaceship_trainer.py'

In [None]:
%%writefile {_trainer_module_file}

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx.components.trainer.executor import TrainerFnArgs
from tfx.components.trainer.fn_args_utils import DataAccessor
from tfx_bsl.tfxio import dataset_options
from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'Age','RoomService','FoodCourt','ShoppingMall','Spa','VRDeck'
]
_LABEL_KEY = 'Transported'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int = 200) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      dataset_options.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _build_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying spaceship data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3, activation='sigmoid')(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: TrainerFnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _build_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')

Writing spaceship_trainer.py


pipeline definiton

In [None]:
import tensorflow_model_analysis as tfma

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
  """Creates a three component spaceship pipeline with TFX."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # NEW: Get the latest blessed model for Evaluator.
  model_resolver = tfx.dsl.Resolver(
      strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy,
      model=tfx.dsl.Channel(type=tfx.types.standard_artifacts.Model),
      model_blessing=tfx.dsl.Channel(
          type=tfx.types.standard_artifacts.ModelBlessing)).with_id(
              'latest_blessed_model_resolver')

  # NEW: Uses TFMA to compute evaluation statistics over features of a model and
  #   perform quality validation of a candidate model (compared to a baseline).

  eval_config = tfma.EvalConfig(
      model_specs=[tfma.ModelSpec(label_key='Transported')],
      slicing_specs=[
          # An empty slice spec means the overall slice, i.e. the whole dataset.
          tfma.SlicingSpec(),
          # Calculate metrics for each spaceship species.
          tfma.SlicingSpec(feature_keys=['Transported']),
          ],
      metrics_specs=[
          tfma.MetricsSpec(per_slice_thresholds={
              'sparse_categorical_accuracy':
                  tfma.PerSliceMetricThresholds(thresholds=[
                      tfma.PerSliceMetricThreshold(
                          slicing_specs=[tfma.SlicingSpec()],
                          threshold=tfma.MetricThreshold(
                              value_threshold=tfma.GenericValueThreshold(
                                   lower_bound={'value': 0.6}),
                              # Change threshold will be ignored if there is no
                              # baseline model resolved from MLMD (first run).
                              change_threshold=tfma.GenericChangeThreshold(
                                  direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                                  absolute={'value': -1e-10}))
                       )]),
          })],
      )
  evaluator = tfx.components.Evaluator(
      examples=example_gen.outputs['examples'],
      model=trainer.outputs['model'],
      baseline_model=model_resolver.outputs['model'],
      eval_config=eval_config)

  # Checks whether the model passed the validation steps and pushes the model
  # to a file destination if check passed.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      model_blessing=evaluator.outputs['blessing'], # Pass an evaluation result.
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  components = [
      example_gen,
      trainer,

      # Following two components were added to the pipeline.
      model_resolver,
      evaluator,

      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(metadata_path),
      components=components)

run pipeline

In [None]:
tfx.orchestration.LocalDagRunner().run(
  _create_pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      data_root=DATA_ROOT,
      module_file=_trainer_module_file,
      serving_model_dir=SERVING_MODEL_DIR,
      metadata_path=METADATA_PATH))

INFO:absl:Generating ephemeral wheel package for '/content/spaceship_trainer.py' (including modules: ['spaceship_trainer']).
INFO:absl:User module package has hash fingerprint version ed3dd78995d7bb293f8d038ea449f9e334026ef63a52179c54ecafc208436e87.
INFO:absl:Executing: ['/usr/bin/python3', '/tmp/tmp0pv0_him/_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', '/tmp/tmp16pigtdd', '--dist-dir', '/tmp/tmpy3j8t25f']
INFO:absl:Successfully built user code wheel distribution at 'pipelines/spaceship-tfma/_wheels/tfx_user_code_Trainer-0.0+ed3dd78995d7bb293f8d038ea449f9e334026ef63a52179c54ecafc208436e87-py3-none-any.whl'; target user module is 'spaceship_trainer'.
INFO:absl:Full user module path is 'spaceship_trainer@pipelines/spaceship-tfma/_wheels/tfx_user_code_Trainer-0.0+ed3dd78995d7bb293f8d038ea449f9e334026ef63a52179c54ecafc208436e87-py3-none-any.whl'
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_s

INFO:absl:Processing input csv data /tmp/tfx-datazbvfbia0/* to TFExample.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 1 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/spaceship-tfma/CsvExampleGen/examples/1"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:631489,xor_checksum:1653297433,sum_checksum:1653297433"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "spaceship-tfma:2022-05-23T09:17:14.709245:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
c





INFO:tensorflow:Assets written to: pipelines/spaceship-tfma/Trainer/model/3/Format-Serving/assets


INFO:tensorflow:Assets written to: pipelines/spaceship-tfma/Trainer/model/3/Format-Serving/assets
INFO:absl:Training complete. Model written to pipelines/spaceship-tfma/Trainer/model/3/Format-Serving. ModelRun written to pipelines/spaceship-tfma/Trainer/model_run/3
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 3 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'model': [Artifact(artifact: uri: "pipelines/spaceship-tfma/Trainer/model/3"
custom_properties {
  key: "name"
  value {
    string_value: "spaceship-tfma:2022-05-23T09:17:14.709245:Trainer:model:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.7.1"
  }
}
, artifact_type: name: "Model"
base_type: MODEL
)], 'model_run': [Artifact(artifact: uri: "pipelines/spaceship-tfma/Trainer/model_run/3"
custom_properties {
  key: "name"
  value {
    string_value: "spaceship-tfma:2022-05-23T09:17:14.709245:Trainer:m

Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`


Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`
INFO:absl:Blessing result True written to pipelines/spaceship-tfma/Evaluator/blessing/4.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 4 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'evaluation': [Artifact(artifact: uri: "pipelines/spaceship-tfma/Evaluator/evaluation/4"
custom_properties {
  key: "name"
  value {
    string_value: "spaceship-tfma:2022-05-23T09:17:14.709245:Evaluator:evaluation:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.7.1"
  }
}
, artifact_type: name: "ModelEvaluation"
)], 'blessing': [Artifact(artifact: uri: "pipelines/spaceship-tfma/Evaluator/blessing/4"
custom_properties {
  key: "name"
  value {
    string_value: "spaceship-tfma:2022-05-23T09:17:14.709245:Evaluator:blessing:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string

examine outputs of pipeline

In [None]:
from ml_metadata.proto import metadata_store_pb2
# Non-public APIs, just for showcase.
from tfx.orchestration.portable.mlmd import execution_lib

# TODO(b/171447278): Move these functions into the TFX library.

def get_latest_artifacts(metadata, pipeline_name, component_id):
  """Output artifacts of the latest run of the component."""
  context = metadata.store.get_context_by_type_and_name(
      'node', f'{pipeline_name}.{component_id}')
  executions = metadata.store.get_executions_by_context(context.id)
  latest_execution = max(executions,
                         key=lambda e:e.last_update_time_since_epoch)
  return execution_lib.get_artifacts_dict(metadata, latest_execution.id,
                                          [metadata_store_pb2.Event.OUTPUT])

In [None]:
# Non-public APIs, just for showcase.
from tfx.orchestration.metadata import Metadata
from tfx.types import standard_component_specs

metadata_connection_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(
    METADATA_PATH)

with Metadata(metadata_connection_config) as metadata_handler:
  # Find output artifacts from MLMD.
  evaluator_output = get_latest_artifacts(metadata_handler, PIPELINE_NAME,
                                          'Evaluator')
  eval_artifact = evaluator_output[standard_component_specs.EVALUATION_KEY][0]

INFO:absl:MetadataStore with DB connection initialized


In [None]:
import tensorflow_model_analysis as tfma

eval_result = tfma.load_eval_result(eval_artifact.uri)
tfma.view.render_slicing_metrics(eval_result, slicing_column='Transported')