## Set up

In [49]:
!pip install --user tfx tensorflow

You should consider upgrading via the '/usr/local/bin/python -m pip install --upgrade pip' command.[0m[33m
[0m

### Check tensorflow is correctly installed

In [50]:
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))

TensorFlow version: 2.11.1
TFX version: 1.12.0


### Set up variables

In [51]:
import os

PIPELINE_NAME = "penguin-simple"
SCHEMA_PIPELINE_NAME = "penguin-tfdv-schema"

# Output Dir for pipeline and schema
SCHEMA_PIPELINE_ROOT = os.path.join('pipelines', SCHEMA_PIPELINE_NAME)
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)

# Metadata / SQL dir
SCHEMA_METADATA_PATH = os.path.join('metadata', SCHEMA_PIPELINE_NAME,
                                    'metadata.db')
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')

# Model output dir
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)

from absl import logging
logging.set_verbosity(logging.INFO) # Set default logging level 

### Prepare example CSV data

In [52]:
import urllib.request
import pandas as pd

DATA_DIR = os.path.join('tfx-data')

if not (os.path.exists(DATA_DIR)):
  os.mkdir(DATA_DIR)

# We download dataset from this URL
_data_url = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/labelled/penguins_processed.csv'
_data_filepath = os.path.join(DATA_DIR, "data.csv")

urllib.request.urlretrieve(_data_url, _data_filepath)

('tfx-data/data.csv', <http.client.HTTPMessage at 0x7f3ff482ff10>)

In [53]:
!head {_data_filepath}

species,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g
0,0.2545454545454545,0.6666666666666666,0.15254237288135594,0.2916666666666667
0,0.26909090909090905,0.5119047619047618,0.23728813559322035,0.3055555555555556
0,0.29818181818181805,0.5833333333333334,0.3898305084745763,0.1527777777777778
0,0.16727272727272732,0.7380952380952381,0.3559322033898305,0.20833333333333334
0,0.26181818181818167,0.892857142857143,0.3050847457627119,0.2638888888888889
0,0.24727272727272717,0.5595238095238096,0.15254237288135594,0.2569444444444444
0,0.25818181818181823,0.773809523809524,0.3898305084745763,0.5486111111111112
0,0.32727272727272727,0.5357142857142859,0.1694915254237288,0.1388888888888889
0,0.23636363636363636,0.9642857142857142,0.3220338983050847,0.3055555555555556


### Create a pipeline

In [54]:
_trainer_module_file = 'penguin_trainer.py'

In [55]:
%%writefile {_trainer_module_file}

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
  'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]

_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}

def _input_fn(file_pattern: List[str], 
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int = 200) -> tf.data.Dataset:
  return data_accessor.tf_dataset_factory(
    file_pattern,
    tfxio.TensorFlowDatasetOptions(
        batch_size=batch_size, label_key=_LABEL_KEY),
    schema=schema).repeat()

def _build_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)
  
  print("Generated Schema: \n", schema)
  
  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _build_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')

Overwriting penguin_trainer.py


#### Create actual TFX pipeline now

In [56]:
from tfx.proto import example_gen_pb2
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
  """Creates a three component penguin pipeline with TFX."""
  output_config = example_gen_pb2.Output(
      split_config=example_gen_pb2.SplitConfig(splits=[
        example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=6),
        example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=2),
        example_gen_pb2.SplitConfig.Split(name='test', hash_buckets=2)
      ])
  )
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root, output_config=output_config)

  # Lets create some statistics
  statistics_gen = tfx.components.StatisticsGen(examples=example_gen.outputs['examples'])
  
  # Let's create a schema based on the statistics
  schema_gen = tfx.components.SchemaGen(statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)
  
  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a filesystem destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  # Following three components will be included in the pipeline.
  components = [
      example_gen,
      statistics_gen,
      schema_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(metadata_path),
      components=components)

### Run the pipeline

In [57]:
tfx.orchestration.LocalDagRunner().run(
  _create_pipeline(
      pipeline_name=SCHEMA_PIPELINE_NAME,
      pipeline_root=SCHEMA_PIPELINE_ROOT,
      data_root=DATA_DIR,
      module_file=_trainer_module_file,
      serving_model_dir=SERVING_MODEL_DIR,
      metadata_path=SCHEMA_METADATA_PATH))

INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Generating ephemeral wheel package for '/workspaces/data-centric_ai_bsc_thesis/penguin_trainer.py' (including modules: ['penguin_trainer']).
INFO:absl:User module package has hash fingerprint version 708e8ddbcd9f327c140b552c729957b7701bd70fd339539000385419a51de4be.
INFO:absl:Executing: ['/usr/local/bin/python', '/tmp/tmpzm8qfvdi/_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', '/tmp/tmp8xxuua7m', '--dist-dir', '/tmp/tmpawnfhnao']
INFO:absl:Successfully built user code wheel distribution at 'pipelines/penguin-tfdv-schema/_wheels/tfx_user_code_Trainer-0.0+708e8ddbcd9f327c140b552c729957b7701bd70fd339539000385419a51de4be-py3-none-any.whl'; target user module is 'penguin_trainer'.
INFO:absl:Full user module path is 'penguin_trainer@pipelines/penguin-tfdv-schema/_wheels/tfx_user_code_Trainer-0.0+708e8ddbcd9f327c140b552c729957b7701bd70fd33953900038

running bdist_wheel
running build
running build_py
creating build
creating build/lib
copying penguin_trainer.py -> build/lib
installing to /tmp/tmp8xxuua7m
running install
running install_lib
copying build/lib/penguin_trainer.py -> /tmp/tmp8xxuua7m
running install_egg_info
running egg_info
creating tfx_user_code_Trainer.egg-info
writing tfx_user_code_Trainer.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_Trainer.egg-info/dependency_links.txt
writing top-level names to tfx_user_code_Trainer.egg-info/top_level.txt
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
reading manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
Copying tfx_user_code_Trainer.egg-info to /tmp/tmp8xxuua7m/tfx_user_code_Trainer-0.0+708e8ddbcd9f327c140b552c729957b7701bd70fd339539000385419a51de4be-py3.8.egg-info
running install_scripts
creating /tmp/tmp8xxuua7m/tfx_user_code_Trainer-0.0+708e8ddbcd9f327c140b552c729

INFO:absl:Processing input csv data tfx-data/* to TFExample.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 6 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/penguin-tfdv-schema/CsvExampleGen/examples/6"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1680617863,sum_checksum:1680617863"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key:

Processing ./pipelines/penguin-tfdv-schema/_wheels/tfx_user_code_Trainer-0.0+708e8ddbcd9f327c140b552c729957b7701bd70fd339539000385419a51de4be-py3-none-any.whl


You should consider upgrading via the '/usr/local/bin/python -m pip install --upgrade pip' command.
INFO:absl:Successfully installed 'pipelines/penguin-tfdv-schema/_wheels/tfx_user_code_Trainer-0.0+708e8ddbcd9f327c140b552c729957b7701bd70fd339539000385419a51de4be-py3-none-any.whl'.
INFO:absl:Training model.
INFO:absl:Feature body_mass_g has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_depth_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature flipper_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature species has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature body_mass_g has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_depth_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_length_mm has a shape dim {
  size: 1
}
. Setting 

Installing collected packages: tfx-user-code-Trainer
Successfully installed tfx-user-code-Trainer-0.0+708e8ddbcd9f327c140b552c729957b7701bd70fd339539000385419a51de4be
Generated Schema: 
 feature {
  name: "body_mass_g"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "culmen_depth_mm"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "culmen_length_mm"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "flipper_length_mm"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "species"
  type: INT
  presence {
    min_fraction: 1.0
  }
  shape {
    dim {
      size: 1
    }
  }
}



INFO:absl:Feature body_mass_g has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_depth_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature flipper_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature species has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Model: "model_3"
INFO:absl:__________________________________________________________________________________________________
INFO:absl: Layer (type)                   Output Shape         Param #     Connected to                     
INFO:absl: culmen_length_mm (InputLayer)  [(None, 1)]          0           []                               
INFO:absl:                                                                                                  
INFO:absl: culmen_depth_mm (InputLayer)   [(None, 1)]          0           []                   





INFO:tensorflow:Assets written to: pipelines/penguin-tfdv-schema/Trainer/model/8/Format-Serving/assets


INFO:tensorflow:Assets written to: pipelines/penguin-tfdv-schema/Trainer/model/8/Format-Serving/assets
INFO:absl:Training complete. Model written to pipelines/penguin-tfdv-schema/Trainer/model/8/Format-Serving. ModelRun written to pipelines/penguin-tfdv-schema/Trainer/model_run/8
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 8 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'model_run': [Artifact(artifact: uri: "pipelines/penguin-tfdv-schema/Trainer/model_run/8"
, artifact_type: name: "ModelRun"
)], 'model': [Artifact(artifact: uri: "pipelines/penguin-tfdv-schema/Trainer/model/8"
, artifact_type: name: "Model"
base_type: MODEL
)]}) for execution 8
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component Trainer is finished.
INFO:absl:Component Pusher is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.pusher.component.Pusher"
    bas

### Review output of the pipeline

In [58]:
from ml_metadata.proto import metadata_store_pb2
# Non-public APIs, just for showcase.
from tfx.orchestration.portable.mlmd import execution_lib

# TODO(b/171447278): Move these functions into the TFX library.

def get_latest_artifacts(metadata, pipeline_name, component_id):
  """Output artifacts of the latest run of the component."""
  context = metadata.store.get_context_by_type_and_name(
      'node', f'{pipeline_name}.{component_id}')
  executions = metadata.store.get_executions_by_context(context.id)
  latest_execution = max(executions,
                         key=lambda e:e.last_update_time_since_epoch)
  return execution_lib.get_output_artifacts(metadata, latest_execution.id)

# Non-public APIs, just for showcase.
from tfx.orchestration.experimental.interactive import visualizations

def visualize_artifacts(artifacts):
  """Visualizes artifacts using standard visualization modules."""
  for artifact in artifacts:
    visualization = visualizations.get_registry().get_visualization(
        artifact.type_name)
    if visualization:
      visualization.display(artifact)

from tfx.orchestration.experimental.interactive import standard_visualizations
standard_visualizations.register_standard_visualizations()


In [59]:
# Non-public APIs, just for showcase.
from tfx.orchestration.metadata import Metadata
from tfx.types import standard_component_specs

metadata_connection_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(
    SCHEMA_METADATA_PATH)
with Metadata(metadata_connection_config) as metadata_handler:
  # Find output artifacts from MLMD.
  stat_gen_output = get_latest_artifacts(metadata_handler, SCHEMA_PIPELINE_NAME,
                                         'StatisticsGen')
  stats_artifacts = stat_gen_output[standard_component_specs.STATISTICS_KEY]

  schema_gen_output = get_latest_artifacts(metadata_handler,
                                           SCHEMA_PIPELINE_NAME, 'SchemaGen')
  schema_artifacts = schema_gen_output[standard_component_specs.SCHEMA_KEY]

INFO:absl:MetadataStore with DB connection initialized


#### Print Stats

In [60]:
# docs-infra: no-execute
visualize_artifacts(stats_artifacts)

#### Print Schema

In [61]:
visualize_artifacts(schema_artifacts)

Unnamed: 0_level_0,Type,Presence,Valency,Domain
Feature name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
'body_mass_g',FLOAT,required,,-
'culmen_depth_mm',FLOAT,required,,-
'culmen_length_mm',FLOAT,required,,-
'flipper_length_mm',FLOAT,required,,-
'species',INT,required,,-
