<h2>Setting Up Pipelines </h2>

In [67]:
import os
import ml_metadata as mlmd
from tfx.proto import example_gen_pb2
import tensorflow_model_analysis as tfma

# Pipeline identifiers. Both names are currently the same string, so every
# SCHEMA_* location below resolves to the same path as its main-pipeline
# counterpart.
PIPELINE_NAME = "tfx_demo"
SCHEMA_PIPELINE_NAME = "tfx_demo"

# Root directories for pipeline artifacts: pipelines/<pipeline name>.
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)
SCHEMA_PIPELINE_ROOT = os.path.join('pipelines', SCHEMA_PIPELINE_NAME)

# SQLite ML Metadata files: metadata/<pipeline name>/metadata.db.
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')
SCHEMA_METADATA_PATH = os.path.join('metadata', SCHEMA_PIPELINE_NAME, 'metadata.db')

# Where the Pusher exports models, where the input CSV data lives, and the
# directory holding the predefined schema that the pipeline imports.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)
DATA_DIR = os.path.join('data', PIPELINE_NAME)
SCHEMA_PATH = 'schema'

# Route absl log messages at INFO level and above to the notebook output
# (the pipeline run below reports its progress as 'INFO:absl:' lines).
from absl import logging
logging.set_verbosity(logging.INFO)  # Set default logging level.

<h2>Import TFX </h2>

In [68]:
# Import TensorFlow and the TFX v1 public API, printing each library's
# version so the environment used for this run is recorded in the output.
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))

TensorFlow version: 2.8.2
TFX version: 1.8.0


<h2>Data Setup </h2>

In [8]:
import urllib.request
import tempfile

# Remote location of the processed penguins CSV and its local destination.
_data_url = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/labelled/penguins_processed.csv'
_data_filepath = os.path.join(DATA_DIR, "data.csv")
# urlretrieve opens the destination file directly and does not create missing
# parent directories, so make sure DATA_DIR exists first (a no-op when it
# already does). Without this, a fresh checkout fails with FileNotFoundError.
os.makedirs(DATA_DIR, exist_ok=True)
# NOTE(review): _create_pipeline configures CsvExampleGen with the split
# pattern 'input-{SPAN}/*' under the data root, whereas this file is written
# directly into DATA_DIR -- confirm that the span directories are populated
# elsewhere.
urllib.request.urlretrieve(_data_url, _data_filepath)

('data/tfx_demo/data.csv', <http.client.HTTPMessage at 0x7f730c983890>)

<h2> Trainer Code </h2>

In [32]:
# File name of the trainer module: written by the %%writefile cell below and
# later passed to the pipeline as `module_file`.
_trainer_module_file = 'tfx_demo_trainer.py'

In [33]:
%%writefile {_trainer_module_file}

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2

# Name of the label feature; _input_fn passes it as label_key and
# _build_keras_model excludes it from the model inputs.
_LABEL_KEY = 'species'

# Batch sizes that run_fn uses for the training and evaluation datasets.
_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int = 200) -> tf.data.Dataset:
    """Creates a batched dataset that repeats indefinitely.

    Args:
      file_pattern: File paths/patterns of the examples to read.
      data_accessor: Accessor whose `tf_dataset_factory` builds the dataset.
      schema: Schema forwarded to the dataset factory.
      batch_size: Number of examples per batch (default 200).

    Returns:
      The dataset produced by `data_accessor.tf_dataset_factory`, configured
      with `label_key=_LABEL_KEY`, with `.repeat()` applied.
    """
    dataset_options = tfxio.TensorFlowDatasetOptions(
        batch_size=batch_size, label_key=_LABEL_KEY)
    dataset = data_accessor.tf_dataset_factory(
        file_pattern, dataset_options, schema=schema)
    # Repeat without a count; callers bound iteration with explicit step counts.
    return dataset.repeat()


def _build_keras_model(schema: schema_pb2.Schema) -> tf.keras.Model:
    """Builds and compiles a small feed-forward classifier from a schema.

    One scalar input layer (shape `(1,)`) is created per schema feature other
    than the label. The inputs are concatenated, passed through two
    8-unit ReLU layers, and projected to 3 output logits.

    Args:
      schema: Schema whose feature names define the model inputs.

    Returns:
      The compiled Keras model; its summary is written to the absl INFO log.
    """
    feature_inputs = []
    for feature in schema.feature:
        if feature.name == _LABEL_KEY:
            continue  # the label is a target, not a model input
        feature_inputs.append(
            keras.layers.Input(shape=(1,), name=feature.name))

    hidden = keras.layers.concatenate(feature_inputs)
    hidden = keras.layers.Dense(8, activation='relu')(hidden)
    hidden = keras.layers.Dense(8, activation='relu')(hidden)
    logits = keras.layers.Dense(3)(hidden)

    model = keras.Model(inputs=feature_inputs, outputs=logits)
    # The final layer has no activation, hence from_logits=True in the loss.
    model.compile(
        optimizer=keras.optimizers.Adam(1e-2),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[keras.metrics.SparseCategoricalAccuracy()])

    model.summary(print_fn=logging.info)
    return model


def run_fn(fn_args: tfx.components.FnArgs):
    """Trains the Keras model and saves it to the serving model directory.

    Args:
      fn_args: Trainer arguments; this function reads `schema_path`,
        `train_files`, `eval_files`, `data_accessor`, `train_steps`,
        `eval_steps` and `serving_model_dir` from it.
    """
    schema = tfx.utils.parse_pbtxt_file(fn_args.schema_path, schema_pb2.Schema())

    train_dataset = _input_fn(
        fn_args.train_files,
        fn_args.data_accessor,
        schema,
        batch_size=_TRAIN_BATCH_SIZE)
    eval_dataset = _input_fn(
        fn_args.eval_files,
        fn_args.data_accessor,
        schema,
        batch_size=_EVAL_BATCH_SIZE)

    model = _build_keras_model(schema)
    # Both datasets repeat forever, so training and validation are bounded by
    # the step counts supplied in fn_args.
    model.fit(
        train_dataset,
        steps_per_epoch=fn_args.train_steps,
        validation_data=eval_dataset,
        validation_steps=fn_args.eval_steps)

    model.save(fn_args.serving_model_dir, save_format='tf')

Overwriting tfx_demo_trainer.py


<h2> Pipeline Creation </h2>

In [69]:
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     schema_path: str, module_file: str, serving_model_dir: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
    """Creates a pipeline using predefined schema with TFX.

    Args:
      pipeline_name: Name given to the pipeline.
      pipeline_root: Root directory passed to the pipeline for its outputs.
      data_root: Base directory that CsvExampleGen reads from; inputs are
        matched with the split pattern 'input-{SPAN}/*'.
      schema_path: Location of the predefined schema imported as an artifact.
      module_file: Path of the trainer module given to the Trainer component.
      serving_model_dir: Filesystem directory the Pusher exports to.
      metadata_path: Path of the SQLite ML Metadata database.

    Returns:
      A `tfx.dsl.Pipeline` containing CsvExampleGen, StatisticsGen, SchemaGen,
      the schema Importer, ExampleValidator, Trainer, Evaluator and Pusher.
    """
    # Named `input_config` rather than `input` so the builtin is not shadowed.
    input_config = example_gen_pb2.Input(
        splits=[example_gen_pb2.Input.Split(pattern='input-{SPAN}/*')])
    # Read from the caller-supplied data_root; previously the notebook-global
    # DATA_DIR was used here and the data_root argument was silently ignored.
    example_gen = tfx.components.CsvExampleGen(
        input_base=data_root, input_config=input_config)

    statistics_gen = tfx.components.StatisticsGen(
        examples=example_gen.outputs['examples'])
    infer_schema = tfx.components.SchemaGen(
        statistics=statistics_gen.outputs['statistics'],
        infer_feature_shape=True)
    schema_importer = tfx.dsl.Importer(
        source_uri=schema_path,
        artifact_type=tfx.types.standard_artifacts.Schema).with_id('schema_importer')
    example_validator = tfx.components.ExampleValidator(
        statistics=statistics_gen.outputs['statistics'],
        schema=schema_importer.outputs['result'])

    trainer = tfx.components.Trainer(
        module_file=module_file,
        examples=example_gen.outputs['examples'],
        schema=schema_importer.outputs['result'],
        train_args=tfx.proto.TrainArgs(num_steps=100),
        eval_args=tfx.proto.EvalArgs(num_steps=5))

    # The accuracy threshold is configured with a lower bound of 0.6.
    accuracy_threshold = tfma.MetricThreshold(
        value_threshold=tfma.GenericValueThreshold(lower_bound={'value': 0.6}))
    eval_config = tfma.EvalConfig(
        model_specs=[tfma.ModelSpec(label_key='species', signature_name='serving_default')],
        metrics_specs=[tfma.MetricsSpec(metrics=[
            tfma.MetricConfig(class_name='SparseCategoricalAccuracy',
                              threshold=accuracy_threshold)])],
        slicing_specs=[tfma.SlicingSpec()])

    # NOTE(review): the Evaluator receives the schema inferred by SchemaGen,
    # while ExampleValidator and Trainer use the imported one -- confirm this
    # difference is intended.
    evaluator = tfx.components.Evaluator(
        examples=example_gen.outputs['examples'],
        model=trainer.outputs['model'],
        schema=infer_schema.outputs['schema'],
        eval_config=eval_config)
    pusher = tfx.components.Pusher(
        model=trainer.outputs['model'],
        model_blessing=evaluator.outputs['blessing'],
        push_destination=tfx.proto.PushDestination(
            filesystem=tfx.proto.PushDestination.Filesystem(
                base_directory=serving_model_dir)))

    components = [example_gen, statistics_gen, infer_schema, schema_importer,
                  example_validator, trainer, evaluator, pusher]

    return tfx.dsl.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(metadata_path),
        components=components)

<h2>Local Run </h2>

In [70]:
# Build the pipeline from the notebook-level configuration constants and
# execute it with the local DAG runner.
tfx.orchestration.LocalDagRunner().run(
  _create_pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      data_root=DATA_DIR,
      schema_path=SCHEMA_PATH,
      module_file=_trainer_module_file,
      serving_model_dir=SERVING_MODEL_DIR,
      metadata_path=METADATA_PATH))

INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Generating ephemeral wheel package for '/home/ec2-user/raahul_works/tfx_pipelines/tfx_demo_trainer.py' (including modules: ['penguin_trainer_sklearn', 'tfx_demo_trainer']).
INFO:absl:User module package has hash fingerprint version 0b8b809b95aa6362f4fdaec4f989327d854ca186e0f8d8240d606e15bb7bb41b.
INFO:absl:Executing: ['/usr/bin/python3', '/tmp/tmppa234m6g/_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', '/tmp/tmp9amafi_h', '--dist-dir', '/tmp/tmppsxarhlr']


running bdist_wheel
running build
running build_py
creating build
creating build/lib
copying penguin_trainer_sklearn.py -> build/lib
copying tfx_demo_trainer.py -> build/lib
installing to /tmp/tmp9amafi_h
running install
running install_lib
copying build/lib/penguin_trainer_sklearn.py -> /tmp/tmp9amafi_h
copying build/lib/tfx_demo_trainer.py -> /tmp/tmp9amafi_h
running install_egg_info
running egg_info
creating tfx_user_code_Trainer.egg-info
writing tfx_user_code_Trainer.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_Trainer.egg-info/dependency_links.txt
writing top-level names to tfx_user_code_Trainer.egg-info/top_level.txt
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
reading manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
Copying tfx_user_code_Trainer.egg-info to /tmp/tmp9amafi_h/tfx_user_code_Trainer-0.0+0b8b809b95aa6362f4fdaec4f989327d854ca186e0f8d8240d606e15bb7bb41b-py3

INFO:absl:Successfully built user code wheel distribution at 'pipelines/tfx_demo/_wheels/tfx_user_code_Trainer-0.0+0b8b809b95aa6362f4fdaec4f989327d854ca186e0f8d8240d606e15bb7bb41b-py3-none-any.whl'; target user module is 'tfx_demo_trainer'.
INFO:absl:Full user module path is 'tfx_demo_trainer@pipelines/tfx_demo/_wheels/tfx_user_code_Trainer-0.0+0b8b809b95aa6362f4fdaec4f989327d854ca186e0f8d8240d606e15bb7bb41b-py3-none-any.whl'
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "Evaluator"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.evaluator.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "ExampleValidator"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_validato

Processing ./pipelines/tfx_demo/_wheels/tfx_user_code_Trainer-0.0+0b8b809b95aa6362f4fdaec4f989327d854ca186e0f8d8240d606e15bb7bb41b-py3-none-any.whl
Installing collected packages: tfx-user-code-Trainer
Successfully installed tfx-user-code-Trainer-0.0+0b8b809b95aa6362f4fdaec4f989327d854ca186e0f8d8240d606e15bb7bb41b


INFO:absl:Successfully installed 'pipelines/tfx_demo/_wheels/tfx_user_code_Trainer-0.0+0b8b809b95aa6362f4fdaec4f989327d854ca186e0f8d8240d606e15bb7bb41b-py3-none-any.whl'.
INFO:absl:Training model.
INFO:absl:Feature body_mass_g has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_depth_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature flipper_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature species has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature body_mass_g has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_depth_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature flipper_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INF

INFO:tensorflow:Assets written to: pipelines/tfx_demo/Trainer/model/31/Format-Serving/assets


INFO:tensorflow:Assets written to: pipelines/tfx_demo/Trainer/model/31/Format-Serving/assets
INFO:absl:Training complete. Model written to pipelines/tfx_demo/Trainer/model/31/Format-Serving. ModelRun written to pipelines/tfx_demo/Trainer/model_run/31
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 31 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'model_run': [Artifact(artifact: uri: "pipelines/tfx_demo/Trainer/model_run/31"
custom_properties {
  key: "name"
  value {
    string_value: "tfx_demo:2022-06-28T13:04:36.241723:Trainer:model_run:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.8.0"
  }
}
name: "tfx_demo:2022-06-28T13:04:36.241723:Trainer:model_run:0"
, artifact_type: name: "ModelRun"
)], 'model': [Artifact(artifact: uri: "pipelines/tfx_demo/Trainer/model/31"
custom_properties {
  key: "name"
  value {
    string_value: "tfx_demo:2022-06-28T13:04:3

<h2> MLMD Call </h2>

In [71]:
# Connect to the SQLite-backed ML Metadata store written by the pipeline run.
metadata_connection_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
print('MLMD version: {}'.format(mlmd.__version__))
store = mlmd.MetadataStore(metadata_connection_config)
# Directory containing the metadata DB, with a trailing separator;
# display_artifacts replaces this prefix in artifact URIs with './'.
# The previous split('metadata.sqlite') never matched the configured file name
# ('metadata.db'), which left base_dir equal to the full DB path. Falling back
# to './' keeps the later str.replace from receiving an empty pattern, which
# would insert './' between every character.
_metadata_db_dir = os.path.dirname(metadata_connection_config.sqlite.filename_uri)
base_dir = os.path.join(_metadata_db_dir, '') if _metadata_db_dir else './'

INFO:absl:MetadataStore with DB connection initialized


MLMD version: 1.8.0


<h2>Data Visualisation Utils </h2>

In [72]:
import urllib
import pandas as pd

def display_types(types):
    """Tabulates the given types as a DataFrame.

    Args:
      types: Iterable of objects exposing `id` and `name` attributes.

    Returns:
      A DataFrame with columns 'id' and 'name', one row per type.
    """
    # Single pass over `types` so one-shot iterables are supported too.
    id_name_pairs = [(entry.id, entry.name) for entry in types]
    columns = {
        'id': [type_id for type_id, _ in id_name_pairs],
        'name': [type_name for _, type_name in id_name_pairs],
    }
    return pd.DataFrame(data=columns)


def display_artifacts(store, artifacts):
  """Renders the given artifacts as a DataFrame.

  Args:
    store: Metadata store used to look up each artifact's type by id.
    artifacts: Iterable of artifacts exposing `id`, `type_id` and `uri`.

  Returns:
    A DataFrame with columns 'artifact id', 'type' and 'uri'. Occurrences of
    the module-level `base_dir` inside each URI are replaced with './'.
  """
  artifact_ids, type_names, uris = [], [], []
  for artifact in artifacts:
    artifact_ids.append(artifact.id)
    # One type lookup per artifact; the first returned entry is its type.
    matching_types = store.get_artifact_types_by_id([artifact.type_id])
    type_names.append(matching_types[0].name)
    uris.append(artifact.uri.replace(base_dir, './'))
  return pd.DataFrame(
      data={'artifact id': artifact_ids, 'type': type_names, 'uri': uris})

def display_properties(store, node):
  """Renders a node's properties and custom properties as a DataFrame.

  Args:
    store: Unused; kept so the helper has the same call shape as
      display_artifacts.
    node: Artifact or execution exposing `properties` and `custom_properties`
      mappings from name to an MLMD `Value` message.

  Returns:
    A DataFrame with columns 'property' and 'value'; regular properties come
    first, followed by custom properties.
  """
  def _unwrap(value):
    # Report whichever member of the `value` oneof is set. The earlier
    # implementation fell back to int_value for everything that was not a
    # string, so e.g. a double-valued property was displayed as 0.
    populated_field = value.WhichOneof('value')
    if populated_field is None:
      return value.int_value  # nothing set: keep the previous default
    return getattr(value, populated_field)

  table = {'property': [], 'value': []}
  for property_map in (node.properties, node.custom_properties):
    for key, value in property_map.items():
      table['property'].append(key)
      table['value'].append(_unwrap(value))
  return pd.DataFrame(data=table)

In [73]:
# List every artifact type registered in the metadata store.
display_types(store.get_artifact_types())

Unnamed: 0,id,name
0,15,Examples
1,17,Schema
2,19,ExampleStatistics
3,21,ModelRun
4,22,Model
5,24,ExampleAnomalies
6,26,PushedModel
7,29,ModelBlessing
8,30,ModelEvaluation


<h2> Pushed Model Details </h2>

In [74]:
# Fetch all artifacts of type "PushedModel" and show them as a table.
pushed_models = store.get_artifacts_by_type("PushedModel")
display_artifacts(store, pushed_models)

Unnamed: 0,artifact id,type,uri
0,7,PushedModel,pipelines/tfx_demo/Pusher/pushed_model/6
1,13,PushedModel,pipelines/tfx_demo/Pusher/pushed_model/13
2,19,PushedModel,pipelines/tfx_demo/Pusher/pushed_model/19
3,28,PushedModel,pipelines/tfx_demo/Pusher/pushed_model/27
4,37,PushedModel,pipelines/tfx_demo/Pusher/pushed_model/35


In [75]:
# Inspect the last entry of pushed_models.
# NOTE(review): presumably the most recent push -- the ordering returned by
# get_artifacts_by_type is not established here; confirm.
pushed_model = pushed_models[-1]
display_properties(store, pushed_model)

Unnamed: 0,property,value
0,pushed_destination,serving_model/tfx_demo/1656421512
1,name,tfx_demo:2022-06-28T13:04:36.241723:Pusher:pus...
2,pushed,1
3,pushed_version,1656421512
4,tfx_version,1.8.0


In [76]:
# The lineage query for `pushed_model` is run after the definition of
# get_one_hop_parent_artifacts further down. Calling the helper from this cell
# raised NameError on a fresh kernel (Restart & Run All) because the function
# did not exist yet.

In [77]:
def get_one_hop_parent_artifacts(store, artifacts):
  """Returns the artifacts one hop upstream of the given artifacts.

  First collects the executions that have an OUTPUT event for any of
  `artifacts`, then collects the artifacts that have an INPUT event on
  those executions.

  Args:
    store: Metadata store queried for events and artifacts.
    artifacts: Artifacts of interest; only their `id` is read.

  Returns:
    A list of the artifacts fetched from `store` for the collected input ids.
  """
  queried_ids = [artifact.id for artifact in artifacts]

  # Executions that produced any of the queried artifacts.
  producing_execution_ids = set()
  for event in store.get_events_by_artifact_ids(queried_ids):
    if event.type == mlmd.proto.Event.OUTPUT:
      producing_execution_ids.add(event.execution_id)

  # Artifacts those executions consumed as inputs.
  input_artifact_ids = set()
  for event in store.get_events_by_execution_ids(producing_execution_ids):
    if event.type == mlmd.proto.Event.INPUT:
      input_artifact_ids.add(event.artifact_id)

  return list(store.get_artifacts_by_id(input_artifact_ids))

In [78]:
# Artifacts that were inputs to the execution which produced pushed_model.
parent_artifacts = get_one_hop_parent_artifacts(store, [pushed_model])
display_artifacts(store, parent_artifacts)

Unnamed: 0,artifact id,type,uri
0,32,Model,pipelines/tfx_demo/Trainer/model/31
1,35,ModelBlessing,pipelines/tfx_demo/Evaluator/blessing/34


In [79]:
# Pick the Model artifact among the pusher's inputs by its type instead of by
# position: parent_artifacts is fetched from a set of ids, so index 0 is not
# guaranteed to be the model rather than the ModelBlessing.
model_type_id = store.get_artifact_type('Model').id
exported_model = next(
    artifact for artifact in parent_artifacts
    if artifact.type_id == model_type_id)
display_properties(store, exported_model)

Unnamed: 0,property,value
0,tfx_version,1.8.0
1,name,tfx_demo:2022-06-28T13:04:36.241723:Trainer:mo...


In [80]:
# Artifacts that were inputs to the execution which produced exported_model.
model_parents = get_one_hop_parent_artifacts(store, [exported_model])
display_artifacts(store, model_parents)

Unnamed: 0,artifact id,type,uri
0,2,Schema,schema
1,29,Examples,pipelines/tfx_demo/CsvExampleGen/examples/28
