<a href="https://colab.research.google.com/github/robertf99/tfx-e2e/blob/main/tfx_tfdf_e2e_run.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install -U tensorflow_decision_forests tfx==1.0.0 wurlitzer

Installing collected packages: requests, prompt-toolkit, packaging, ipython, grpcio-gcp, google-crc32c, pyarrow, proto-plus, hdfs, grpc-google-iam-v1, google-resumable-media, google-cloud-core, future, fasteners, fastavro, dill, avro-python3, google-cloud-vision, google-cloud-videointelligence, google-cloud-spanner, google-cloud-pubsub, google-cloud-profiler, google-cloud-language, google-cloud-dlp, google-cloud-bigtable, google-cloud-bigquery, google-apitools, apache-beam, websocket-client, tensorflow-serving-api, tensorflow-metadata, joblib, attrs, tfx-bsl, terminaltables, ml-metadata, google-cloud-storage, docker, colorama, tensorflow-transform, tensorflow-model-analysis, tensorflow-data-validation, ml-pipelines-sdk, kubernetes, keras-tuner, google-cloud-aiplatform, wurlitzer, tfx, tensorflow-decision-forests
  Attempting uninstall: requests
    Found existing installation: requests 2.23.0
    Uninstalling requests-2.23.0:
      Successfully uninstalled requests-2.23.0
  Attempting 

In [1]:
import tensorflow_decision_forests as tfdf

import os
import numpy as np
import pandas as pd
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import math

try:
  from wurlitzer import sys_pipes
except:
  from colabtools.googlelog import CaptureLog as sys_pipes

from IPython.core.magic import register_line_magic
from IPython.display import Javascript

TensorFlow version: 2.5.0
TFX version: 1.0.0


In [2]:
@register_line_magic
def set_cell_height(size):
  display(
      Javascript("google.colab.output.setIframeHeight(0, true, {maxHeight: " +
                 str(size) + "})"))

## 0. Configs

In [18]:
import os

# We will create two pipelines. One for schema generation and one for training.
SCHEMA_PIPELINE_NAME = "penguin-tfdv-schema"
PIPELINE_NAME = "penguin-tfdv"

# Output directory to store artifacts generated from the pipeline.
SCHEMA_PIPELINE_ROOT = os.path.join('pipelines', SCHEMA_PIPELINE_NAME)
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)
# Path to a SQLite DB file to use as an MLMD storage.
SCHEMA_METADATA_PATH = os.path.join('metadata', SCHEMA_PIPELINE_NAME,
                                    'metadata.db')
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')

# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)

SAVED_SCHEMA_NAME = "schema.pbtxt"
SAVED_SCHEMA_PATH = os.path.join("schema", SCHEMA_PIPELINE_NAME, SAVED_SCHEMA_NAME)
from absl import logging
logging.set_verbosity(logging.INFO)  # Set default logging level.

In [4]:
import os
import pprint
import tempfile
import urllib

import absl
import tensorflow as tf
import tensorflow_model_analysis as tfma
tf.get_logger().propagate = False
pp = pprint.PrettyPrinter()

from tfx import v1 as tfx
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

%load_ext tfx.orchestration.experimental.interactive.notebook_extensions.skip

## 1. Data Ready


In [5]:
import urllib.request
import tempfile

DATA_ROOT = tempfile.mkdtemp(prefix='tfx-data')  # Create a temporary directory.
_data_url = 'https://storage.googleapis.com/download.tensorflow.org/data/palmer_penguins/penguins.csv'
_data_filepath = os.path.join(DATA_ROOT, "data.csv")
urllib.request.urlretrieve(_data_url, _data_filepath)
!head {_data_filepath}

species,island,bill_length_mm,bill_depth_mm,flipper_length_mm,body_mass_g,sex,year
Adelie,Torgersen,39.1,18.7,181,3750,male,2007
Adelie,Torgersen,39.5,17.4,186,3800,female,2007
Adelie,Torgersen,40.3,18,195,3250,female,2007
Adelie,Torgersen,NA,NA,NA,NA,NA,2007
Adelie,Torgersen,36.7,19.3,193,3450,female,2007
Adelie,Torgersen,39.3,20.6,190,3650,male,2007
Adelie,Torgersen,38.9,17.8,181,3625,female,2007
Adelie,Torgersen,39.2,19.6,195,4675,male,2007
Adelie,Torgersen,34.1,18.1,193,3475,NA,2007


### 1.1 Label predictor column

In [6]:
dataset_df = pd.read_csv(_data_filepath)
print(dataset_df.shape)
# Name of the label column.
label = "species"
classes = dataset_df[label].unique().tolist()
print(f"Label classes: {classes}")
dataset_df[label] = dataset_df[label].map(classes.index)
clean_df = dataset_df.dropna()
print(clean_df.shape)
clean_df.to_csv(_data_filepath, index=False)
!head {_data_filepath}

(344, 8)
Label classes: ['Adelie', 'Gentoo', 'Chinstrap']
(333, 8)
species,island,bill_length_mm,bill_depth_mm,flipper_length_mm,body_mass_g,sex,year
0,Torgersen,39.1,18.7,181.0,3750.0,male,2007
0,Torgersen,39.5,17.4,186.0,3800.0,female,2007
0,Torgersen,40.3,18.0,195.0,3250.0,female,2007
0,Torgersen,36.7,19.3,193.0,3450.0,female,2007
0,Torgersen,39.3,20.6,190.0,3650.0,male,2007
0,Torgersen,38.9,17.8,181.0,3625.0,female,2007
0,Torgersen,39.2,19.6,195.0,4675.0,male,2007
0,Torgersen,41.1,17.6,182.0,3200.0,female,2007
0,Torgersen,38.6,21.2,191.0,3800.0,male,2007


## 2. Define Pipeline

### schema pipeline

In [8]:
from tfx import v1 as tfx

def create_schema_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
  """Creates a three component penguin pipeline with TFX."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)
  
  # Computes statistics over data for visualization and schema generation.
  statistics_gen = tfx.components.StatisticsGen(
      examples=example_gen.outputs['examples'])

  # Generates schema based on the generated statistics.
  schema_gen = tfx.components.SchemaGen(
      statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)

  components = [
      example_gen,
      statistics_gen,
      schema_gen
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(metadata_path),
      components=components)

In [10]:
tfx.orchestration.LocalDagRunner().run(
  create_schema_pipeline(
      pipeline_name=SCHEMA_PIPELINE_NAME,
      pipeline_root=SCHEMA_PIPELINE_ROOT,
      data_root=DATA_ROOT,
      metadata_path=SCHEMA_METADATA_PATH))

INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Running pipeline:
 pipeline_info {
  id: "penguin-tfdv-schema"
}
nodes {
  pipeline_node {
    node_info {
      type {
        name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
      }
      id: "CsvExampleGen"
    }
    contexts {
      contexts {
        type {
          name: "pipeline"
        }
        name {
          field_value {
            string_value: "penguin-tfdv-schema"
          }
        }
      }
      contexts {
        type {
          name: "pipeline_run"
        }
        name {
          field_value {
            string_value: "2021-08-03T11:31:15.238388"
          }
        }
      }
      contexts {
        type {
          name: "node"
        }
        name {
          field_value {
            string_value: "penguin-tfdv-schema.CsvExampleGen"
          }
        }
      }
    }
    outputs {
   

INFO:absl:Processing input csv data /tmp/tfx-datar21onp4k/* to TFExample.
INFO:absl:Examples generated.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 1 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/penguin-tfdv-schema/CsvExampleGen/examples/1"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:14417,xor_checksum:1627990185,sum_checksum:1627990185"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "penguin-tfdv-schema:2021-08-03T11:31:15.238388:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.0.0"
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
propert

In [21]:
from tfx.orchestration.metadata import Metadata
from ml_metadata.proto import metadata_store_pb2
# Non-public APIs, just for showcase.
from tfx.orchestration.portable.mlmd import execution_lib
from tfx.types import standard_component_specs
import shutil


# TODO(b/171447278): Move these functions into the TFX library.

def get_latest_artifacts(metadata, pipeline_name, component_id):
  """Output artifacts of the latest run of the component."""
  context = metadata.store.get_context_by_type_and_name(
      'node', f'{pipeline_name}.{component_id}')
  executions = metadata.store.get_executions_by_context(context.id)
  latest_execution = max(executions,
                         key=lambda e:e.last_update_time_since_epoch)
  return execution_lib.get_artifacts_dict(metadata, latest_execution.id, 
                                          metadata_store_pb2.Event.OUTPUT)

metadata_connection_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(
    SCHEMA_METADATA_PATH)

with Metadata(metadata_connection_config) as metadata_handler:
  # Find output artifacts from MLMD.
  stat_gen_output = get_latest_artifacts(metadata_handler,SCHEMA_PIPELINE_NAME,
                                         'StatisticsGen')
  stats_artifacts = stat_gen_output[standard_component_specs.STATISTICS_KEY]

  schema_gen_output = get_latest_artifacts(metadata_handler,
                                           SCHEMA_PIPELINE_NAME, 'SchemaGen')
  schema_artifacts = schema_gen_output[standard_component_specs.SCHEMA_KEY]


os.makedirs(SAVED_SCHEMA_PATH, exist_ok=True)
_generated_path = os.path.join(schema_artifacts[0].uri, SAVED_SCHEMA_NAME)

# Copy the 'schema.pbtxt' file from the artifact uri to a predefined path.
shutil.copy(_generated_path, SAVED_SCHEMA_PATH)

INFO:absl:MetadataStore with DB connection initialized


'schema/penguin-tfdv-schema/schema.pbtxt/schema.pbtxt'

### e2e piepline

In [24]:
TRAINER_MODULE_PATH = 'penguin_trainer.py'

In [38]:
%%writefile {TRAINER_MODULE_PATH}


import tensorflow_decision_forests as tfdf
from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras

from tfx import v1 as tfx

# from tfx_bsl.public import tfxio
from tfx_bsl.tfxio import dataset_options
from tensorflow_metadata.proto.v0 import schema_pb2

from wurlitzer import sys_pipes

_LABEL_KEY = "species"

_TRAIN_BATCH_SIZE = 238
_EVAL_BATCH_SIZE = 95


def _input_fn(
    file_pattern: List[str],
    data_accessor: tfx.components.DataAccessor,
    schema: schema_pb2.Schema,
    batch_size: int = 200,
) -> tf.data.Dataset:
    """Generates features and label for training.

    Args:
      file_pattern: List of paths or patterns of input tfrecord files.
      data_accessor: DataAccessor for converting input to RecordBatch.
      schema: schema of the input data.
      batch_size: representing the number of consecutive elements of returned
        dataset to combine in a single batch

    Returns:
      A dataset that contains (features, indices) tuple where features is a
        dictionary of Tensors, and indices is a single Tensor of label indices.
    """

    dataset = data_accessor.tf_dataset_factory(
        file_pattern,
        dataset_options.TensorFlowDatasetOptions(
            batch_size=batch_size, label_key=_LABEL_KEY, num_epochs=1
        ),
        schema,
    )

    # def prepare_label(feature_dict):
    #     # label_dict = tf.sparse.to_dense(
    #     #     feature_dict.pop(_LABEL_KEY),
    #     #     default_value=None,
    #     #     validate_indices=True,
    #     #     name=None,
    #     # )
    #     label_dict = feature_dict.pop(_LABEL_KEY)

    #     return feature_dict, label_dict

    # dataset = dataset.map(prepare_label)
    return dataset


def _build_tfdf_model():
    model = tfdf.keras.RandomForestModel()
    model.compile(metrics=["accuracy"])
    return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
    """Train the model based on given args.

    Args:
      fn_args: Holds args used to train the model as name/value pairs.
    """

    schema = tfx.utils.parse_pbtxt_file(fn_args.schema_path, schema_pb2.Schema())
    train_dataset = _input_fn(
        fn_args.train_files, fn_args.data_accessor, schema, batch_size=_TRAIN_BATCH_SIZE
    )
    #   for features, label in train_dataset.take(1):  # only take first element of dataset
    #     print('*************')
    #     print(features)
    #     print(label)
    eval_dataset = _input_fn(
        fn_args.eval_files, fn_args.data_accessor, schema, batch_size=_EVAL_BATCH_SIZE
    )

    model = _build_tfdf_model()
    with sys_pipes():
        model.fit(
            train_dataset,
            # steps_per_epoch=fn_args.train_steps,
            validation_data=eval_dataset,
        )
        # validation_steps=fn_args.eval_steps)
        print(model.summary())

    model.make_inspector().export_to_tensorboard(fn_args.model_run_dir)
    # The result of the training should be saved in `fn_args.serving_model_dir`
    # directory.
    model.save(fn_args.serving_model_dir, save_format="tf")


Overwriting penguin_trainer.py


In [39]:
from tfx import v1 as tfx
import tensorflow_model_analysis as tfma


def create_schema_pipeline(
    pipeline_name: str,
    pipeline_root: str,
    data_root: str,
    metadata_path: str,
    schema_path: str,
    trainer_module_file: str,
    serving_model_dir: str,
) -> tfx.dsl.Pipeline:
    """Creates a three component penguin pipeline with TFX."""
    # Split data
    example_gen = tfx.components.CsvExampleGen(input_base=data_root)

    # Generate current data statistics
    statistics_gen = tfx.components.StatisticsGen(
        examples=example_gen.outputs["examples"]
    )

    # Import saved schema
    schema_importer = tfx.dsl.Importer(
        source_uri=schema_path, artifact_type=tfx.types.standard_artifacts.Schema
    ).with_id("schema_importer")

    # Validate Schema
    example_validator = tfx.components.ExampleValidator(
        statistics=statistics_gen.outputs["statistics"],
        schema=schema_importer.outputs["result"],
    )

    # Trainer
    trainer = tfx.components.Trainer(
        module_file=trainer_module_file,
        examples=example_gen.outputs["examples"],
        schema=schema_importer.outputs["result"],  # Pass the imported schema.
        train_args=tfx.proto.TrainArgs(),
        eval_args=tfx.proto.EvalArgs(),
    )

    # Evaluation
    eval_config = tfma.EvalConfig(
        model_specs=[
            # This assumes a serving model with signature 'serving_default'. If
            # using estimator based EvalSavedModel, add signature_name: 'eval' and
            # remove the label_key.
            tfma.ModelSpec(label_key="species")
        ],
        metrics_specs=[
            tfma.MetricsSpec(
                metrics=[
                    tfma.MetricConfig(class_name="ExampleCount"),
                    tfma.MetricConfig(
                        class_name="SparseCategoricalAccuracy",
                        threshold=tfma.MetricThreshold(
                            value_threshold=tfma.GenericValueThreshold(
                                lower_bound={"value": 0.9}
                            ),
                            # Change threshold will be ignored if there is no
                            # baseline model resolved from MLMD (first run).
                            change_threshold=tfma.GenericChangeThreshold(
                                direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                                absolute={"value": -1e-10},
                            ),
                        ),
                    ),
                ]
            )
        ],
        slicing_specs=[
            # An empty slice spec means the overall slice, i.e. the whole dataset.
            tfma.SlicingSpec(),
            # Data can be sliced along a feature column. In this case, data is
            # sliced along feature column trip_start_hour.
            tfma.SlicingSpec(feature_keys=["sex"]),
        ],
    )

    model_resolver = tfx.dsl.Resolver(
        strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy,
        model=tfx.dsl.Channel(type=tfx.types.standard_artifacts.Model),
        model_blessing=tfx.dsl.Channel(type=tfx.types.standard_artifacts.ModelBlessing),
    ).with_id("latest_blessed_model_resolver")

    evaluator = tfx.components.Evaluator(
        examples=example_gen.outputs["examples"],
        model=trainer.outputs["model"],
        baseline_model=model_resolver.outputs["model"],
        eval_config=eval_config,
    )

    # Pusher
    pusher = tfx.components.Pusher(
        model=trainer.outputs["model"],
        model_blessing=evaluator.outputs["blessing"],
        push_destination=tfx.proto.PushDestination(
            filesystem=tfx.proto.PushDestination.Filesystem(
                base_directory=serving_model_dir
            )
        ),
    )

    components = [
        example_gen,
        statistics_gen,
        schema_importer,
        example_validator,
        trainer,
        evaluator,
        pusher,
    ]

    return tfx.dsl.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(
            metadata_path
        ),
        components=components,
    )


In [44]:
tfx.orchestration.LocalDagRunner().run(
    create_schema_pipeline(
        pipeline_name=SCHEMA_PIPELINE_NAME,
        pipeline_root=SCHEMA_PIPELINE_ROOT,
        data_root=DATA_ROOT,
        metadata_path=SCHEMA_METADATA_PATH,
        schema_path=SAVED_SCHEMA_PATH,
        trainer_module_file=TRAINER_MODULE_PATH,
        serving_model_dir=SERVING_MODEL_DIR,
    ),
)

INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Generating ephemeral wheel package for '/content/penguin_trainer.py' (including modules: ['penguin_trainer']).
INFO:absl:User module package has hash fingerprint version 3aa89e3eb08eebb43a41f8e85df3868ac3b273cc746a5cdcbc2eacd516acf9ce.
INFO:absl:Executing: ['/usr/bin/python3', '/tmp/tmp5ud5vvlk/_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', '/tmp/tmpypmkxsb4', '--dist-dir', '/tmp/tmpuimw8i0n']
INFO:absl:Successfully built user code wheel distribution at 'pipelines/penguin-tfdv-schema/_wheels/tfx_user_code_Trainer-0.0+3aa89e3eb08eebb43a41f8e85df3868ac3b273cc746a5cdcbc2eacd516acf9ce-py3-none-any.whl'; target user module is 'penguin_trainer'.
INFO:absl:Full user module path is 'penguin_trainer@pipelines/penguin-tfdv-schema/_wheels/tfx_user_code_Trainer-0.0+3aa89e3eb08eebb43a41f8e85df3868ac3b273cc746a5cdcbc2eacd516acf9ce-py3-none-any.whl'
INFO

      1/Unknown - 0s 339ms/step

[INFO kernel.cc:746] Start Yggdrasil model training
[INFO kernel.cc:747] Collect training examples
[INFO kernel.cc:392] Number of batches: 1
[INFO kernel.cc:393] Number of examples: 238
[INFO kernel.cc:769] Dataset:
Number of records: 238
Number of columns: 8

Number of columns by type:
	NUMERICAL: 5 (62.5%)
	CATEGORICAL: 3 (37.5%)

Columns:

NUMERICAL: 5 (62.5%)
	0: "bill_depth_mm" NUMERICAL mean:17.0895 min:13.2 max:21.5 sd:1.90863
	1: "bill_length_mm" NUMERICAL mean:44.3345 min:32.1 max:59.6 sd:5.4891
	2: "body_mass_g" NUMERICAL mean:4230.78 min:2700 max:6300 sd:816.615
	3: "flipper_length_mm" NUMERICAL mean:201.466 min:172 max:231 sd:14.0735
	6: "year" NUMERICAL mean:2008.07 min:2007 max:2009 sd:0.814227

CATEGORICAL: 3 (37.5%)
	4: "island" CATEGORICAL has-dict vocab-size:4 zero-ood-items most-frequent:"Biscoe" 122 (51.2605%)
	5: "sex" CATEGORICAL has-dict vocab-size:3 zero-ood-items most-frequent:"male" 120 (50.4202%)
	7: "__LABEL" CATEGORICAL integerized vocab-size:4 no-ood-item


Model: "random_forest_model_5"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
Total params: 1
Trainable params: 0
Non-trainable params: 1
_________________________________________________________________
Type: "RANDOM_FOREST"
Task: CLASSIFICATION
Label: "__LABEL"

Input Features (7):
	bill_depth_mm
	bill_length_mm
	body_mass_g
	flipper_length_mm
	island
	sex
	year

No weights

Variable Importance: MEAN_MIN_DEPTH:
    1.           "__LABEL"  3.276547 ################
    2.              "year"  3.261754 ###############
    3.               "sex"  3.235646 ###############
    4.       "body_mass_g"  2.701349 ###########
    5.            "island"  2.102374 ######
    6.     "bill_depth_mm"  2.053149 #####
    7.    "bill_length_mm"  1.476428 #
    8. "flipper_length_mm"  1.353696 

Variable Importance: NUM_AS_ROOT:
    1. "flipper_length_mm" 154.000000 ################
    2.     "bill_depth_mm" 68.00000

INFO:absl:Training complete. Model written to pipelines/penguin-tfdv-schema/Trainer/model/42/Format-Serving. ModelRun written to pipelines/penguin-tfdv-schema/Trainer/model_run/42
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 42 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'model_run': [Artifact(artifact: uri: "pipelines/penguin-tfdv-schema/Trainer/model_run/42"
custom_properties {
  key: "name"
  value {
    string_value: "penguin-tfdv-schema:2021-08-03T12:41:46.314475:Trainer:model_run:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.0.0"
  }
}
, artifact_type: name: "ModelRun"
)], 'model': [Artifact(artifact: uri: "pipelines/penguin-tfdv-schema/Trainer/model/42"
custom_properties {
  key: "name"
  value {
    string_value: "penguin-tfdv-schema:2021-08-03T12:41:46.314475:Trainer:model:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    stri

ValueError: ignored

In [None]:
context = InteractiveContext()

In [None]:
example_gen = tfx.components.CsvExampleGen(input_base=DATA_ROOT)
context.run(example_gen)

In [None]:
artifact = example_gen.outputs['examples'].get()[0]
# Get the URI of the output artifact representing the training examples, which is a directory
train_uri = os.path.join(example_gen.outputs['examples'].get()[0].uri, 'Split-train')

# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name)
                      for name in os.listdir(train_uri)]

# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")

# Iterate over the first 3 records and decode them.
for tfrecord in dataset.take(2):
  serialized_example = tfrecord.numpy()
  example = tf.train.Example()
  example.ParseFromString(serialized_example)
  pp.pprint(example)


In [None]:
tfrecord_filenames

## 2. Schema Generation

In [None]:
statistics_gen = tfx.components.StatisticsGen(
    examples=example_gen.outputs['examples'])
context.run(statistics_gen)

schema_gen = tfx.components.SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=False)
context.run(schema_gen)

In [None]:
def _create_schema_pipeline(pipeline_name: str, 
                            pipeline_root: str, 
                            data_root: str,
                            metadata_path: str) -> tfx.dsl.Pipeline:
  """Creates a three component penguin pipeline with TFX."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)
  # NEW: Computes statistics over data for visualization and schema generation.
  statistics_gen = tfx.components.StatisticsGen(
      examples=example_gen.outputs['examples'])

  # NEW: Generates schema based on the generated statistics.
  schema_gen = tfx.components.SchemaGen(
      statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)
  

  # Following three components will be included in the pipeline.
  components = [
      example_gen,
      statistics_gen,
      schema_gen,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(metadata_path),
      components=components)

In [None]:
tfx.orchestration.LocalDagRunner().run(
  _create_schema_pipeline(
      pipeline_name=SCHEMA_PIPELINE_NAME,
      pipeline_root=SCHEMA_PIPELINE_ROOT,
      data_root=DATA_ROOT,
      metadata_path=SCHEMA_METADATA_PATH))

### 2.1 Check output

In [None]:
from ml_metadata.proto import metadata_store_pb2
# Non-public APIs, just for showcase.
from tfx.orchestration.portable.mlmd import execution_lib

# TODO(b/171447278): Move these functions into the TFX library.

def get_latest_artifacts(metadata, pipeline_name, component_id):
  """Output artifacts of the latest run of the component."""
  context = metadata.store.get_context_by_type_and_name(
      'node', f'{pipeline_name}.{component_id}')
  executions = metadata.store.get_executions_by_context(context.id)
  latest_execution = max(executions,
                         key=lambda e:e.last_update_time_since_epoch)
  return execution_lib.get_artifacts_dict(metadata, latest_execution.id, 
                                          metadata_store_pb2.Event.OUTPUT)

# Non-public APIs, just for showcase.
from tfx.orchestration.experimental.interactive import visualizations

def visualize_artifacts(artifacts):
  """Visualizes artifacts using standard visualization modules."""
  for artifact in artifacts:
    visualization = visualizations.get_registry().get_visualization(
        artifact.type_name)
    if visualization:
      visualization.display(artifact)

from tfx.orchestration.experimental.interactive import standard_visualizations
standard_visualizations.register_standard_visualizations()

In [None]:
# Non-public APIs, just for showcase.
from tfx.orchestration.metadata import Metadata
from tfx.types import standard_component_specs

metadata_connection_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(
    SCHEMA_METADATA_PATH)

with Metadata(metadata_connection_config) as metadata_handler:
  # Find output artifacts from MLMD.
  stat_gen_output = get_latest_artifacts(metadata_handler, SCHEMA_PIPELINE_NAME,
                                         'StatisticsGen')
  stats_artifacts = stat_gen_output[standard_component_specs.STATISTICS_KEY]

  schema_gen_output = get_latest_artifacts(metadata_handler,
                                           SCHEMA_PIPELINE_NAME, 'SchemaGen')
  schema_artifacts = schema_gen_output[standard_component_specs.SCHEMA_KEY]

In [None]:
visualize_artifacts(stats_artifacts)

In [None]:
visualize_artifacts(schema_artifacts)

In [None]:
import shutil

_schema_filename = 'schema.pbtxt'
SCHEMA_PATH = 'schema'

os.makedirs(SCHEMA_PATH, exist_ok=True)
_generated_path = os.path.join(schema_artifacts[0].uri, _schema_filename)

# Copy the 'schema.pbtxt' file from the artifact uri to a predefined path.
shutil.copy(_generated_path, SCHEMA_PATH)

## 3. Validate schema

In [None]:
example_validator = tfx.components.ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema'])
context.run(example_validator)

In [None]:
context.show(example_validator.outputs['anomalies'])

## 4. Trainer

In [None]:
_trainer_module_file = 'penguin_trainer.py'

In [None]:
%%writefile {_trainer_module_file}

import tensorflow_decision_forests as tfdf
from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
# from tfx_bsl.public import tfxio
from tfx_bsl.tfxio import dataset_options
from tensorflow_metadata.proto.v0 import schema_pb2

from wurlitzer import sys_pipes

_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 238
_EVAL_BATCH_SIZE = 95


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int = 200) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
    
  dataset = data_accessor.tf_dataset_factory(
      file_pattern,
      dataset_options.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=None, num_epochs=1),
      schema)
  
  def prepare_label(feature_dict):
    print(type(feature_dict))
    print(feature_dict)
    label_dict = tf.sparse.to_dense(
        feature_dict.pop(_LABEL_KEY), default_value=None, validate_indices=True, name=None
    )
    return feature_dict, label_dict

  dataset = dataset.map(prepare_label)
  return dataset

def _build_tfdf_model():
    model = tfdf.keras.RandomForestModel()
    model.compile(metrics=["accuracy"])
    return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  schema = tfx.utils.parse_pbtxt_file(fn_args.schema_path, schema_pb2.Schema())
  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
#   for features, label in train_dataset.take(1):  # only take first element of dataset
#     print('*************')
#     print(features)
#     print(label)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _build_tfdf_model()
  with sys_pipes():
    model.fit(
        train_dataset,
        # steps_per_epoch=fn_args.train_steps,
        validation_data=eval_dataset)
        # validation_steps=fn_args.eval_steps)
    print(model.summary())

  model.make_inspector().export_to_tensorboard(fn_args.model_run_dir)
  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')


In [None]:
trainer = tfx.components.Trainer(
    module_file=os.path.abspath(_trainer_module_file),
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    train_args=tfx.proto.TrainArgs(),
    eval_args=tfx.proto.EvalArgs())
context.run(trainer)

In [None]:
model_artifact_dir = trainer.outputs['model'].get()[0].uri
pp.pprint(os.listdir(model_artifact_dir))
model_dir = os.path.join(model_artifact_dir, 'Format-Serving')
pp.pprint(os.listdir(model_dir))

In [None]:
model_run_artifact_dir = trainer.outputs['model_run'].get()[0].uri
print(model_run_artifact_dir)
os.listdir(model_run_artifact_dir)

In [None]:
%load_ext tensorboard
%tensorboard --logdir {model_run_artifact_dir}

## 5. Evaluation

In [None]:
eval_config = tfma.EvalConfig(
    model_specs=[
        # This assumes a serving model with signature 'serving_default'. If
        # using estimator based EvalSavedModel, add signature_name: 'eval' and 
        # remove the label_key.
        tfma.ModelSpec(label_key='species')
    ],
    metrics_specs=[
        tfma.MetricsSpec(
            metrics=[
                tfma.MetricConfig(class_name='ExampleCount'),
                tfma.MetricConfig(class_name='SparseCategoricalAccuracy',
                  threshold=tfma.MetricThreshold(
                    value_threshold=tfma.GenericValueThreshold(
                        lower_bound={'value': 0.5}),
                    # Change threshold will be ignored if there is no
                    # baseline model resolved from MLMD (first run).
                    change_threshold=tfma.GenericChangeThreshold(
                        direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                        absolute={'value': -1e-10})))
            ]
        )
    ],
    slicing_specs=[
        # An empty slice spec means the overall slice, i.e. the whole dataset.
        tfma.SlicingSpec(),
        # Data can be sliced along a feature column. In this case, data is
        # sliced along feature column trip_start_hour.
        tfma.SlicingSpec(feature_keys=['sex'])
    ])

model_resolver = tfx.dsl.Resolver(
      strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy,
      model=tfx.dsl.Channel(type=tfx.types.standard_artifacts.Model),
      model_blessing=tfx.dsl.Channel(
          type=tfx.types.standard_artifacts.ModelBlessing)).with_id(
              'latest_blessed_model_resolver')
context.run(model_resolver)

In [None]:
evaluator = tfx.components.Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    baseline_model=model_resolver.outputs['model'],
    eval_config=eval_config)
context.run(evaluator)

In [None]:
context.show(evaluator.outputs['evaluation'])

In [None]:
# Get the TFMA output result path and load the result.
PATH_TO_RESULT = evaluator.outputs['evaluation'].get()[0].uri
tfma_result = tfma.load_eval_result(PATH_TO_RESULT)

# Show data sliced along feature column trip_start_hour.
tfma.view.render_slicing_metrics(
    tfma_result, slicing_column='sex')

In [None]:
blessing_uri = evaluator.outputs.blessing.get()[0].uri
!ls -l {blessing_uri}

In [None]:
PATH_TO_RESULT = evaluator.outputs['evaluation'].get()[0].uri
print(tfma.load_validation_result(PATH_TO_RESULT))

## 6. Serving

In [None]:
pusher = tfx.components.Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=tfx.proto.PushDestination(
        filesystem=tfx.proto.PushDestination.Filesystem(
            base_directory=SERVING_MODEL_DIR)))
context.run(pusher)

In [None]:
push_uri = pusher.outputs.pushed_model.get()[0].uri
model = tf.saved_model.load(push_uri)

for item in model.signatures.items():
  print(item)