In [1]:
import os

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))

TensorFlow version: 2.7.0
TFX version: 1.6.0


In [27]:
# Filesystem layout for the demo. Every path below is derived from PROJECT_ROOT,
# so relocating the project only requires editing this one line.
# expanduser('~') is used instead of os.environ['HOME'] so the lookup does not
# raise KeyError on platforms/shells where HOME is not set.
PROJECT_ROOT = os.path.join(os.path.expanduser('~'), 'private_workspace', 'demo-tfx')

# Input CSV directory and the common parent of all generated output.
DATA_ROOT = os.path.join(PROJECT_ROOT, 'data', 'penguin-simple')
OUTPUT_ROOT = os.path.join(PROJECT_ROOT, 'output')

# Two pipelines are referenced in this notebook: one that produces the schema,
# and one whose ExampleValidator output is read further down.
SCHEMA_PIPELINE_NAME = "penguin-tfdv-schema"
PIPELINE_NAME = "penguin-tfdv"

# Each pipeline gets its own artifact root and its own SQLite metadata file.
SCHEMA_PIPELINE_ROOT = os.path.join(OUTPUT_ROOT, SCHEMA_PIPELINE_NAME, 'pipelines')
SCHEMA_METADATA_PATH = os.path.join(OUTPUT_ROOT, SCHEMA_PIPELINE_NAME, 'metadata', 'metadata.db')
PIPELINE_ROOT = os.path.join(OUTPUT_ROOT, PIPELINE_NAME, 'pipelines')
METADATA_PATH = os.path.join(OUTPUT_ROOT, PIPELINE_NAME, 'metadata', 'metadata.db')
SERVING_MODEL_DIR = os.path.join(OUTPUT_ROOT, PIPELINE_NAME, 'serving_model')

In [39]:
from tfx.proto import example_gen_pb2

def _create_schema_pipeline(
        pipeline_name: str,
        pipeline_root: str,
        data_root: str,
        metadata_path: str
        ) -> tfx.dsl.Pipeline:
    """Assembles the CsvExampleGen -> StatisticsGen -> SchemaGen pipeline.

    Args:
        pipeline_name: Name given to the resulting pipeline.
        pipeline_root: Passed through as the pipeline's root directory.
        data_root: Directory handed to CsvExampleGen as its `input_base`.
        metadata_path: Path used to build the SQLite metadata connection config.

    Returns:
        A `tfx.dsl.Pipeline` wired with the three components, in that order.
    """
    # Ingestion: CSV input is read from data_root; no explicit output config is supplied.
    csv_ingest = tfx.components.CsvExampleGen(input_base=data_root, output_config=None)

    # Statistics are computed over the examples emitted by the ingestion step.
    stats = tfx.components.StatisticsGen(examples=csv_ingest.outputs['examples'])

    # The schema is derived from those statistics, with feature-shape inference enabled.
    schema = tfx.components.SchemaGen(
        statistics=stats.outputs['statistics'],
        infer_feature_shape=True,
    )

    mlmd_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(metadata_path)

    return tfx.dsl.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        metadata_connection_config=mlmd_config,
        components=[csv_ingest, stats, schema],
    )


# Build the schema pipeline from the notebook-level settings, then execute it
# in-process with the local DAG runner.
pipeline = _create_schema_pipeline(
    data_root=DATA_ROOT,
    pipeline_name=SCHEMA_PIPELINE_NAME,
    pipeline_root=SCHEMA_PIPELINE_ROOT,
    metadata_path=SCHEMA_METADATA_PATH,
)
tfx.orchestration.LocalDagRunner().run(pipeline)

INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
    }
  }
}
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
      }
    }
  }
}
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
    }
  }
}
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "/Users

INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 16 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "/Users/lxh/private_workspace/demo-tfx/output/penguin-tfdv-schema/pipelines/CsvExampleGen/examples/16"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25647,xor_checksum:1644998806,sum_checksum:1644998806"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "penguin-tfdv-schema:2022-02-16T18:51:00.842004:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}

INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to /Users/lxh/private_workspace/demo-tfx/output/penguin-tfdv-schema/pipelines/StatisticsGen/statistics/17/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to /Users/lxh/private_workspace/demo-tfx/output/penguin-tfdv-schema/pipelines/StatisticsGen/statistics/17/Split-eval.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 17 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "/Users/lxh/private_workspace/demo-tfx/output/penguin-tfdv-schema/pipelines/StatisticsGen/statistics/17"
custom_properties {
  key: "name"
  value {
    string_value: "penguin-tfdv-schema:2022-02-16T18:51:00.842004:StatisticsGen:statistics:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component SchemaGen is finished.


In [41]:
from tfx import v1 as tfx
from tfx.types import standard_component_specs
from ml_metadata.proto import metadata_store_pb2
from tfx.orchestration.metadata import Metadata
from tfx.orchestration.portable.mlmd import execution_lib
from tfx.orchestration.experimental.interactive import visualizations
from tfx.orchestration.experimental.interactive import standard_visualizations

In [45]:
def get_latest_artifacts(metadata, pipeline_name, component_id):
    """Output artifacts of the latest run of the component.

    Args:
        metadata: Open metadata handle exposing a `.store` with
            `get_context_by_type_and_name` and `get_executions_by_context`.
        pipeline_name: Pipeline the component belongs to.
        component_id: Component (node) id within that pipeline.

    Returns:
        Whatever `execution_lib.get_artifacts_dict` yields for the OUTPUT events
        of the most recently updated execution; callers in this notebook index
        it by output key.

    Raises:
        ValueError: If no 'node' context named `<pipeline_name>.<component_id>`
            exists, or if that context has no executions (i.e. the component
            has not been run against this metadata store).
    """
    node_context_name = f'{pipeline_name}.{component_id}'
    context = metadata.store.get_context_by_type_and_name('node', node_context_name)
    if context is None:
        # Without this check the failure surfaces as an opaque AttributeError on `.id`.
        raise ValueError(
            f"No 'node' context named {node_context_name!r} found in the metadata store; "
            "has the pipeline been run with this metadata path?")

    executions = metadata.store.get_executions_by_context(context.id)
    if not executions:
        # max() on an empty sequence would raise a ValueError with no useful context.
        raise ValueError(f"No executions recorded for {node_context_name!r}.")

    # "Latest" is decided by the last-update timestamp of each execution.
    latest_execution = max(executions, key=lambda e: e.last_update_time_since_epoch)
    return execution_lib.get_artifacts_dict(metadata, latest_execution.id, [metadata_store_pb2.Event.OUTPUT])


def visualize_artifacts(artifacts):
    """Renders each artifact with the visualization registered for its type.

    Artifacts whose `type_name` has no registered visualization are skipped
    silently. Nothing is returned.
    """
    for item in artifacts:
        renderer = visualizations.get_registry().get_visualization(item.type_name)
        if not renderer:
            # No visualization registered for this artifact type.
            continue
        renderer.display(item)


# Register the standard visualizations once, up front; visualize_artifacts()
# looks renderers up in the visualization registry by artifact type name.
standard_visualizations.register_standard_visualizations()

In [46]:
# Connect to the metadata database written by the schema pipeline.
metadata_connection_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(
    SCHEMA_METADATA_PATH
)

with Metadata(metadata_connection_config) as metadata_handler:
    # Look up the latest outputs of both components in MLMD ...
    stat_gen_output = get_latest_artifacts(metadata_handler, SCHEMA_PIPELINE_NAME, 'StatisticsGen')
    schema_gen_output = get_latest_artifacts(metadata_handler, SCHEMA_PIPELINE_NAME, 'SchemaGen')

    # ... then pick the artifact lists stored under each component's standard output key.
    stats_artifacts = stat_gen_output[standard_component_specs.STATISTICS_KEY]
    schema_artifacts = schema_gen_output[standard_component_specs.SCHEMA_KEY]

INFO:absl:MetadataStore with DB connection initialized


In [47]:
# Display the statistics artifacts fetched from the schema pipeline's metadata store.
visualize_artifacts(stats_artifacts)

In [33]:
# Display the schema artifacts produced by SchemaGen.
visualize_artifacts(schema_artifacts)

Unnamed: 0_level_0,Type,Presence,Valency,Domain
Feature name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
'body_mass_g',FLOAT,required,,-
'culmen_depth_mm',FLOAT,required,,-
'culmen_length_mm',FLOAT,required,,-
'flipper_length_mm',FLOAT,required,,-
'species',INT,required,,-


In [34]:
# Show where the first schema artifact lives on disk. The URI is a directory of the
# form ${SCHEMA_PIPELINE_ROOT}/SchemaGen/schema/<execution id>; the schema file is
# presumably schema.pbtxt inside that directory (verify on disk).
schema_artifacts[0].uri

'/Users/lxh/private_workspace/demo-tfx/output/penguin-tfdv-schema/pipelines/SchemaGen/schema/15'

In [49]:
# NOTE(review): this reads the metadata store of the PIPELINE_NAME pipeline, but no
# cell visible in this notebook builds or runs that pipeline (or an ExampleValidator).
# Presumably it was executed elsewhere beforehand - confirm, otherwise this lookup
# cannot succeed on a fresh run.
metadata_connection_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(
    METADATA_PATH
)

with Metadata(metadata_connection_config) as metadata_handler:
    # Latest ExampleValidator outputs, then the artifacts stored under the anomalies key.
    ev_output = get_latest_artifacts(metadata_handler, PIPELINE_NAME, 'ExampleValidator')
    anomalies_artifacts = ev_output[standard_component_specs.ANOMALIES_KEY]


INFO:absl:MetadataStore with DB connection initialized


In [50]:
# Display the anomalies artifacts reported by ExampleValidator.
visualize_artifacts(anomalies_artifacts)