# Data validation using TFX Pipeline and TensorFlow Data Validation

In [1]:
try:
  import colab
  !pip install --upgrade pip
except:
  pass

In [None]:
# Install the tfx package with pip, upgrading it if already present (-U).
!pip install -U tfx

In [3]:
# Remove any installed shapely package without prompting (-y).
# NOTE(review): presumably a workaround for an import conflict with TFX;
# confirm it is still needed.
!pip uninstall shapely -y

Found existing installation: shapely 2.0.2
Uninstalling shapely-2.0.2:
  Successfully uninstalled shapely-2.0.2


In [4]:
# Import TensorFlow and the TFX v1 API, then report the versions in use.
import tensorflow as tf
from tfx import v1 as tfx

print('TensorFlow version: {}'.format(tf.__version__))
print('TFX version: {}'.format(tfx.__version__))

TensorFlow version: 2.13.1
TFX version: 1.14.0


### Set up variables

In [5]:
import os

from absl import logging

# Set default logging level.
logging.set_verbosity(logging.INFO)

# Two pipelines are created in this notebook: one that generates a schema
# and one that validates the data and trains a model.
SCHEMA_PIPELINE_NAME = "penguin-tfdv-schema"
PIPELINE_NAME = "penguin-tfdv"

# Output directories for the artifacts generated by each pipeline.
SCHEMA_PIPELINE_ROOT = os.path.join('pipelines', SCHEMA_PIPELINE_NAME)
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)

# SQLite DB files used as the MLMD storage of each pipeline.
SCHEMA_METADATA_PATH = os.path.join(
    'metadata', SCHEMA_PIPELINE_NAME, 'metadata.db')
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')

# Directory the trained model is exported to by the training pipeline.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)

### Prepare example data

In [6]:
import urllib.request
import tempfile

# Download the processed penguin CSV into a fresh temporary directory. That
# directory is later passed to the pipelines as `data_root`.
DATA_ROOT = tempfile.mkdtemp(prefix='tfx-data')  # Create a temporary directory.
_data_url = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/labelled/penguins_processed.csv'
_data_filepath = os.path.join(DATA_ROOT, "data.csv")
# The value returned by urlretrieve is shown as the cell output.
urllib.request.urlretrieve(_data_url, _data_filepath)

('/tmp/tfx-datagm2w9y7r/data.csv', <http.client.HTTPMessage at 0x7c211e25d510>)

In [7]:
# Preview the first lines of the downloaded CSV file.
!head {_data_filepath}

species,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g
0,0.2545454545454545,0.6666666666666666,0.15254237288135594,0.2916666666666667
0,0.26909090909090905,0.5119047619047618,0.23728813559322035,0.3055555555555556
0,0.29818181818181805,0.5833333333333334,0.3898305084745763,0.1527777777777778
0,0.16727272727272732,0.7380952380952381,0.3559322033898305,0.20833333333333334
0,0.26181818181818167,0.892857142857143,0.3050847457627119,0.2638888888888889
0,0.24727272727272717,0.5595238095238096,0.15254237288135594,0.2569444444444444
0,0.25818181818181823,0.773809523809524,0.3898305084745763,0.5486111111111112
0,0.32727272727272727,0.5357142857142859,0.1694915254237288,0.1388888888888889
0,0.23636363636363636,0.9642857142857142,0.3220338983050847,0.3055555555555556


## Generate a preliminary schema

*   StatisticsGen: calculates statistics for the dataset.
*   SchemaGen: examines the statistics and creates an initial data schema.

### Write a pipeline definition

In [8]:
def _create_schema_pipeline(pipeline_name: str,
                            pipeline_root: str,
                            data_root: str,
                            metadata_path: str) -> tfx.dsl.Pipeline:
  """Builds the three-component pipeline used for schema generation.

  Args:
    pipeline_name: Name given to the resulting pipeline.
    pipeline_root: Value passed through as the pipeline's `pipeline_root`.
    data_root: Directory handed to CsvExampleGen as `input_base`.
    metadata_path: Path used to build the SQLite metadata connection config.

  Returns:
    A `tfx.dsl.Pipeline` composed of CsvExampleGen, StatisticsGen and
    SchemaGen, in that order.
  """
  # Brings the CSV data into the pipeline.
  ingest = tfx.components.CsvExampleGen(input_base=data_root)

  # Computes statistics over the ingested examples.
  stats = tfx.components.StatisticsGen(examples=ingest.outputs['examples'])

  # Generates a schema from the statistics, inferring feature shapes too.
  schema = tfx.components.SchemaGen(
      statistics=stats.outputs['statistics'],
      infer_feature_shape=True)

  mlmd_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(
      metadata_path)
  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=mlmd_config,
      components=[ingest, stats, schema])

In [9]:
# Build the schema-generation pipeline and execute it with the local runner.
tfx.orchestration.LocalDagRunner().run(
    _create_schema_pipeline(
        pipeline_name=SCHEMA_PIPELINE_NAME,
        pipeline_root=SCHEMA_PIPELINE_ROOT,
        data_root=DATA_ROOT,
        metadata_path=SCHEMA_METADATA_PATH))

INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
    }
  }
}
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
      }
    }
  }
}
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
    }
  }
}
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "metada

INFO:absl:Processing input csv data /tmp/tfx-datagm2w9y7r/* to TFExample.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 1 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/penguin-tfdv-schema/CsvExampleGen/examples/1"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1704095070,sum_checksum:1704095070"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
proper

### Review outputs of the pipeline

As explained in the previous tutorial, a TFX pipeline produces two kinds of
outputs, artifacts and a
[metadata DB(MLMD)](https://www.tensorflow.org/tfx/guide/mlmd) which contains
metadata of artifacts and pipeline executions. We defined the location of
these outputs in the above cells. By default, artifacts are stored under
the `pipelines` directory and metadata is stored as a sqlite database
under the `metadata` directory.

You can use MLMD APIs to locate these outputs programmatically. First, we will
define some utility functions to search for the output artifacts that were just
produced.


In [10]:
from ml_metadata.proto import metadata_store_pb2
from tfx.orchestration.portable.mlmd import execution_lib

def get_latest_artifacts(metadata, pipeline_name, component_id):
  """Output artifacts of the latest run of the component.

  Args:
    metadata: Metadata handler whose `store` attribute is queried.
    pipeline_name: Name of the pipeline the component belongs to.
    component_id: Id of the component within that pipeline.

  Returns:
    The result of `execution_lib.get_output_artifacts` for the execution with
    the greatest `last_update_time_since_epoch`.

  Raises:
    ValueError: If no 'node' context exists for the component, or if the
      context has no executions (e.g. the pipeline has not been run yet).
  """
  node_context_name = f'{pipeline_name}.{component_id}'
  context = metadata.store.get_context_by_type_and_name('node',
                                                        node_context_name)
  # The store returns None when no matching context exists; fail with a
  # readable message instead of an AttributeError on `context.id`.
  if context is None:
    raise ValueError(
        f"No 'node' context named '{node_context_name}' was found in the "
        'metadata store. Check the pipeline name and component id.')

  executions = metadata.store.get_executions_by_context(context.id)
  # max() on an empty sequence raises an unhelpful ValueError; be explicit.
  if not executions:
    raise ValueError(
        f"No executions were found for node '{node_context_name}'.")

  latest_execution = max(executions,
                         key=lambda e: e.last_update_time_since_epoch)
  return execution_lib.get_output_artifacts(metadata, latest_execution.id)


from tfx.orchestration.experimental.interactive import visualizations

def visualize_artifacts(artifacts):
  """Displays each artifact that has a registered visualization.

  Args:
    artifacts: Iterable of artifacts; each one's `type_name` is used to look
      up a visualization in the registry.
  """
  for item in artifacts:
    renderer = visualizations.get_registry().get_visualization(item.type_name)
    # Artifacts without a registered visualization are skipped silently.
    if not renderer:
      continue
    renderer.display(item)

# Register TFX's standard visualizations; visualize_artifacts looks
# visualizations up in the registry by artifact type name.
from tfx.orchestration.experimental.interactive import standard_visualizations
standard_visualizations.register_standard_visualizations()

Now we can examine the outputs from the pipeline execution.

In [11]:
from tfx.orchestration.metadata import Metadata
from tfx.types import standard_component_specs

# Connection settings for the SQLite MLMD store written by the schema pipeline.
metadata_connection_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(SCHEMA_METADATA_PATH)

with Metadata(metadata_connection_config) as metadata_handler:
  # Find output artifacts from MLMD.
  stat_gen_output = get_latest_artifacts(metadata_handler, SCHEMA_PIPELINE_NAME, 'StatisticsGen')
  # Select the statistics artifacts from the StatisticsGen outputs.
  stats_artifacts = stat_gen_output[standard_component_specs.STATISTICS_KEY]
  schema_gen_output = get_latest_artifacts(metadata_handler, SCHEMA_PIPELINE_NAME, 'SchemaGen')
  # Select the schema artifacts from the SchemaGen outputs.
  schema_artifacts = schema_gen_output[standard_component_specs.SCHEMA_KEY]

INFO:absl:MetadataStore with DB connection initialized


#### Examine the output from StatisticsGen


In [12]:
# Display the statistics artifacts produced by StatisticsGen.
visualize_artifacts(stats_artifacts)

<!-- <img class="tfo-display-only-on-site"
src="images/penguin_tfdv/penguin_tfdv_statistics.png"/> -->

You can see various stats for the input data. These statistics are supplied to
`SchemaGen` to construct an initial schema of data automatically.


#### Examine the output from SchemaGen



In [13]:
# Display the schema artifacts produced by SchemaGen.
visualize_artifacts(schema_artifacts)

Unnamed: 0_level_0,Type,Presence,Valency,Domain
Feature name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
'body_mass_g',FLOAT,required,,-
'culmen_depth_mm',FLOAT,required,,-
'culmen_length_mm',FLOAT,required,,-
'flipper_length_mm',FLOAT,required,,-
'species',INT,required,,-


### Export the schema for future use




In [14]:
import shutil

# Name of the schema file inside the artifact directory, and the directory
# the schema is exported to for later use.
_schema_filename = 'schema.pbtxt'
SCHEMA_PATH = 'schema'

# Create the destination directory; exist_ok avoids an error if it exists.
os.makedirs(SCHEMA_PATH, exist_ok=True)
# Only the first schema artifact is used as the source.
_generated_path = os.path.join(schema_artifacts[0].uri, _schema_filename)

# Copy the 'schema.pbtxt' file from the artifact uri to a predefined path.
shutil.copy(_generated_path, SCHEMA_PATH)

'schema/schema.pbtxt'

In [15]:
# Print a header, then dump every file in the exported schema directory.
print(f'Schema at {SCHEMA_PATH}-----')
!cat {SCHEMA_PATH}/*

Schema at schema-----
feature {
  name: "body_mass_g"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "culmen_depth_mm"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "culmen_length_mm"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "flipper_length_mm"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "species"
  type: INT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}


## Validate input examples and train an ML model


In [16]:
# File the trainer code is written to by the next cell; the same name is
# later passed to the pipeline as `module_file`.
_trainer_module_file = 'penguin_trainer.py'

In [17]:
%%writefile {_trainer_module_file}

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2

# We don't need to specify _FEATURE_KEYS and _FEATURE_SPEC any more.
# That information can be read from the given schema file.

# Name of the feature used as the label; it is excluded from model inputs.
_LABEL_KEY = 'species'

# Batch sizes used by run_fn for the training and evaluation datasets.
_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int = 200) -> tf.data.Dataset:
  """Builds a repeating dataset of features and labels.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor whose `tf_dataset_factory` creates the dataset.
    schema: Schema of the input data.
    batch_size: Number of consecutive elements of the returned dataset to
      combine in a single batch.

  Returns:
    The dataset produced by the accessor with `_LABEL_KEY` as the label key,
    repeated indefinitely.
  """
  dataset_options = tfxio.TensorFlowDatasetOptions(
      batch_size=batch_size, label_key=_LABEL_KEY)
  dataset = data_accessor.tf_dataset_factory(
      file_pattern, dataset_options, schema=schema)
  # repeat() with no count: callers bound iteration with explicit step counts.
  return dataset.repeat()


def _build_keras_model(schema: schema_pb2.Schema) -> tf.keras.Model:
  """Builds and compiles a small DNN classifier for the penguin data.

  Args:
    schema: Schema whose features, except the label, become model inputs.

  Returns:
    A compiled Keras Model.
  """
  # The model is built with the Functional API; see
  # https://www.tensorflow.org/guide/keras/overview for all API options.

  # One scalar input per schema feature; the label feature is skipped.
  input_layers = []
  for feature in schema.feature:
    if feature.name == _LABEL_KEY:
      continue
    input_layers.append(keras.layers.Input(shape=(1,), name=feature.name))

  # Two hidden Dense(8, relu) layers followed by a 3-unit logits layer.
  hidden = keras.layers.concatenate(input_layers)
  for _ in range(2):
    hidden = keras.layers.Dense(8, activation='relu')(hidden)
  logits = keras.layers.Dense(3)(hidden)

  classifier = keras.Model(inputs=input_layers, outputs=logits)
  classifier.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  # Send the model summary to absl logging rather than stdout.
  classifier.summary(print_fn=logging.info)
  return classifier


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Trains the model with the given args and saves it for serving.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """
  # Read the schema file that was passed to the Trainer component.
  schema = tfx.utils.parse_pbtxt_file(fn_args.schema_path, schema_pb2.Schema())

  train_ds = _input_fn(
      file_pattern=fn_args.train_files,
      data_accessor=fn_args.data_accessor,
      schema=schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_ds = _input_fn(
      file_pattern=fn_args.eval_files,
      data_accessor=fn_args.data_accessor,
      schema=schema,
      batch_size=_EVAL_BATCH_SIZE)

  classifier = _build_keras_model(schema)
  classifier.fit(
      train_ds,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_ds,
      validation_steps=fn_args.eval_steps)

  # The trained model is saved under `fn_args.serving_model_dir`.
  classifier.save(fn_args.serving_model_dir, save_format='tf')

Writing penguin_trainer.py


### Write a pipeline definition

We will add two new components, `Importer` and `ExampleValidator`. Importer
brings an external file into the TFX pipeline. In this case, it is a file
containing the schema definition. ExampleValidator will examine
the input data and validate whether all input data conforms to the data schema
we provided.


In [18]:
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     schema_path: str, module_file: str, serving_model_dir: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
  """Builds the validation-and-training pipeline around a predefined schema.

  Args:
    pipeline_name: Name given to the resulting pipeline.
    pipeline_root: Value passed through as the pipeline's `pipeline_root`.
    data_root: Directory handed to CsvExampleGen as `input_base`.
    schema_path: Location the Importer reads the schema from.
    module_file: File with the user-provided training code for the Trainer.
    serving_model_dir: Base directory the Pusher pushes the model to.
    metadata_path: Path used to build the SQLite metadata connection config.

  Returns:
    A `tfx.dsl.Pipeline` containing CsvExampleGen, StatisticsGen, the schema
    Importer, ExampleValidator, Trainer and Pusher.
  """
  # Brings the CSV data into the pipeline.
  ingest = tfx.components.CsvExampleGen(input_base=data_root)

  # Computes statistics over the data for visualization and validation.
  stats = tfx.components.StatisticsGen(examples=ingest.outputs['examples'])

  # Imports the schema as a Schema artifact.
  imported_schema = tfx.dsl.Importer(
      source_uri=schema_path,
      artifact_type=tfx.types.standard_artifacts.Schema,
  ).with_id('schema_importer')

  # Performs anomaly detection based on the statistics and the schema.
  validator = tfx.components.ExampleValidator(
      statistics=stats.outputs['statistics'],
      schema=imported_schema.outputs['result'])

  # Trains a model with the user-provided module; the imported schema is
  # passed along so the training code can read it.
  model_trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=ingest.outputs['examples'],
      schema=imported_schema.outputs['result'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a filesystem destination.
  filesystem_target = tfx.proto.PushDestination.Filesystem(
      base_directory=serving_model_dir)
  model_pusher = tfx.components.Pusher(
      model=model_trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(filesystem=filesystem_target))

  mlmd_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(
      metadata_path)
  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=mlmd_config,
      components=[
          ingest,
          stats,
          imported_schema,
          validator,
          model_trainer,
          model_pusher,
      ])

### Run the pipeline


In [19]:
# Build the validation-and-training pipeline and run it with the local runner.
tfx.orchestration.LocalDagRunner().run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        schema_path=SCHEMA_PATH,
        module_file=_trainer_module_file,
        serving_model_dir=SERVING_MODEL_DIR,
        metadata_path=METADATA_PATH))

INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Generating ephemeral wheel package for '/content/penguin_trainer.py' (including modules: ['penguin_trainer']).
INFO:absl:User module package has hash fingerprint version 000876a22093ec764e3751d5a3ed939f1b107d1d6ade133f954ea2a767b8dfb2.
INFO:absl:Executing: ['/usr/bin/python3', '/tmp/tmpzy31rg8k/_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', '/tmp/tmp69dtwdcm', '--dist-dir', '/tmp/tmpif22wutq']
INFO:absl:Successfully built user code wheel distribution at 'pipelines/penguin-tfdv/_wheels/tfx_user_code_Trainer-0.0+000876a22093ec764e3751d5a3ed939f1b107d1d6ade133f954ea2a767b8dfb2-py3-none-any.whl'; target user module is 'penguin_trainer'.
INFO:absl:Full user module path is 'penguin_trainer@pipelines/penguin-tfdv/_wheels/tfx_user_code_Trainer-0.0+000876a22093ec764e3751d5a3ed939f1b107d1d6ade133f954ea2a767b8dfb2-py3-none-any.whl'
INFO:absl:Using de



INFO:absl:Function `_wrapped_model` contains input name(s) resource with unsupported characters which will be renamed to model_dense_2_biasadd_readvariableop_resource in the SavedModel.
INFO:absl:Found untraced functions such as _update_step_xla while saving (showing 1 of 1). These functions will not be directly callable after loading.
INFO:absl:Writing fingerprint to pipelines/penguin-tfdv/Trainer/model/4/Format-Serving/fingerprint.pb
INFO:absl:Training complete. Model written to pipelines/penguin-tfdv/Trainer/model/4/Format-Serving. ModelRun written to pipelines/penguin-tfdv/Trainer/model_run/4
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 4 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'model': [Artifact(artifact: uri: "pipelines/penguin-tfdv/Trainer/model/4"
, artifact_type: name: "Model"
base_type: MODEL
)], 'model_run': [Artifact(artifact: uri: "pipelines/penguin-tfdv/Trainer/mode

You should see "INFO:absl:Component Pusher is finished." if the pipeline
finished successfully.

### Examine outputs of the pipeline

You should see "No anomalies found" for each split of examples. Because we
used the same data which was used for the schema generation in this pipeline,
no anomaly is expected here. If you run this pipeline repeatedly with new
incoming data, ExampleValidator should be able to find any discrepancies
between the new data and the existing schema.

If any anomalies were found, you may review your data to check to see if any
examples do not follow your assumptions. Outputs from other components like
StatisticsGen might be useful. However, any anomalies which are found will
NOT block further pipeline executions.

In [20]:
# Connect to the MLMD store of the training pipeline (METADATA_PATH), not the
# schema pipeline's store used earlier.
metadata_connection_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)

with Metadata(metadata_connection_config) as metadata_handler:
  # Look up the outputs of the latest ExampleValidator execution.
  ev_output = get_latest_artifacts(metadata_handler, PIPELINE_NAME, 'ExampleValidator')
  # Select the anomalies artifacts from those outputs.
  anomalies_artifacts = ev_output[standard_component_specs.ANOMALIES_KEY]

INFO:absl:MetadataStore with DB connection initialized


In [22]:
# Display the anomalies artifacts produced by ExampleValidator.
visualize_artifacts(anomalies_artifacts)