In [30]:
# Record library versions up front so the outputs below can be tied to an environment.
import tensorflow as tf
print(f'TensorFlow version: {tf.__version__}')
from tfx import v1 as tfx
print(f'TFX version: {tfx.__version__}')

TensorFlow version: 2.7.1
TFX version: 1.6.1


In [31]:
import os

# Two pipelines are defined in this notebook: one generates a schema, the other
# validates data against a schema.
SCHEMA_PIPELINE_NAME = "scooter-tfdv-schema"
PIPELINE_NAME = "scooter-tfdv"

# Each pipeline writes its output artifacts under pipelines/<name> and keeps its
# ML Metadata (MLMD) store in a SQLite file at metadata/<name>/metadata.db.
SCHEMA_PIPELINE_ROOT, PIPELINE_ROOT = [
    os.path.join('pipelines', name) for name in (SCHEMA_PIPELINE_NAME, PIPELINE_NAME)
]
SCHEMA_METADATA_PATH, METADATA_PATH = [
    os.path.join('metadata', name, 'metadata.db')
    for name in (SCHEMA_PIPELINE_NAME, PIPELINE_NAME)
]

from absl import logging
logging.set_verbosity(logging.INFO)  # Show INFO-level absl logs in cell output.

In [32]:
# Input CSVs live under ./data/<year>: 2019 is the training data, 2020 the test data.
DATA_ROOT, TEST_DATA_ROOT = (
    os.path.join(os.getcwd(), "data", year) for year in ("2019", "2020"))
# Path to the raw 2019 trips export.
data_filepath = os.path.join(DATA_ROOT, "E-Scooter_Trips_-_2019.csv")

In [33]:
# Peek at the header and first rows of the raw 2019 CSV.
!head {data_filepath}

Trip ID,Start Time,End Time,Trip Distance,Trip Duration,Accuracy,Start Census Tract,End Census Tract,Start Community Area Number,End Community Area Number,Start Community Area Name,End Community Area Name,Start Centroid Latitude,Start Centroid Longitude,Start Centroid Location,End Centroid Latitude,End Centroid Longitude,End Centroid Location
2b66b89f-9cd2-4f4f-a41b-5827bed5a9bb,09/17/2019 02:00:00 PM,09/17/2019 02:00:00 PM,0,30,0,,,,,,,,,,,,
2254578e-5ba6-41a6-9220-6c9f70e301b1,06/28/2019 09:00:00 PM,06/28/2019 10:00:00 PM,3000,945,3000,,,,,,,,,,,,
59284b84-71e5-451c-ac72-1316bdada872,07/13/2019 09:00:00 AM,07/13/2019 09:00:00 AM,2000,740,2000,,,,,,,,,,,,
701f3d5e-d18d-40b3-86aa-bb221fec66cf,08/29/2019 02:00:00 PM,08/29/2019 02:00:00 PM,0,447,0,,,,,,,,,,,,
7c423a03-3351-5425-b2f9-c6b227edbe69,06/21/2019 08:00:00 PM,06/21/2019 08:00:00 PM,2882,18,1,,,,,,,,,,,,
9dc1a7f4-4c7e-41d5-a688-422ddf59c0c9,07/13/2019 07:00:00 PM,07/13/2019 07:00:00 PM,0,446,0,,,,,,,,,,,,
12def531-1266-5326-a191-

In [34]:
from tfx.proto import example_gen_pb2
def _create_schema_pipeline(pipeline_name: str,
                            pipeline_root: str,
                            data_root: str,
                            metadata_path: str,
                            split_name: str = 'train') -> tfx.dsl.Pipeline:
    """Creates a pipeline that generates a schema from the CSV data in ``data_root``.

    The pipeline runs CsvExampleGen -> StatisticsGen -> SchemaGen and records its
    executions in a SQLite-backed ML Metadata store.

    Args:
      pipeline_name: Name of the TFX pipeline.
      pipeline_root: Directory where component output artifacts are written.
      data_root: Directory containing the input CSV file(s).
      metadata_path: Path of the SQLite file used as the MLMD store.
      split_name: Name of the single split that all examples are put into.

    Returns:
      A ``tfx.dsl.Pipeline`` that can be run with e.g. ``LocalDagRunner``.
    """
    # Route every example into one split (a single hash bucket): this pipeline only
    # computes statistics and a schema, so no train/eval partition is needed.
    output_config = tfx.proto.Output(
        split_config=tfx.proto.SplitConfig(splits=[
            tfx.proto.SplitConfig.Split(name=split_name, hash_buckets=1),
        ]))
    example_gen = tfx.components.CsvExampleGen(
        input_base=data_root, output_config=output_config)

    # Computes statistics over the data for visualization and schema generation.
    statistics_gen = tfx.components.StatisticsGen(
        examples=example_gen.outputs['examples'])

    # Generates a schema from the statistics; infer_feature_shape=True asks SchemaGen
    # to also record feature shapes where it can determine them.
    schema_gen = tfx.components.SchemaGen(
        statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)

    return tfx.dsl.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        metadata_connection_config=(
            tfx.orchestration.metadata.sqlite_metadata_connection_config(metadata_path)),
        components=[example_gen, statistics_gen, schema_gen])

In [35]:
# Build the schema-generation pipeline over the 2019 data and execute it locally.
tfx.orchestration.LocalDagRunner().run(
    _create_schema_pipeline(
        pipeline_name=SCHEMA_PIPELINE_NAME,
        pipeline_root=SCHEMA_PIPELINE_ROOT,
        data_root=DATA_ROOT,
        metadata_path=SCHEMA_METADATA_PATH,
    )
)

INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
    }
  }
}
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
      }
    }
  }
}
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
    }
  }
}
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "metada

In [36]:
from ml_metadata.proto import metadata_store_pb2
from tfx.orchestration.portable.mlmd import execution_lib

def get_latest_artifacts(metadata, pipeline_name, component_id):
    """Returns the output artifacts of the most recent run of a pipeline component.

    Args:
      metadata: An open TFX ``Metadata`` handle; its ``store`` is queried.
      pipeline_name: Name of the pipeline the component belongs to.
      component_id: Component id, e.g. 'StatisticsGen'.

    Returns:
      The output-artifact mapping from ``execution_lib.get_artifacts_dict`` for the
      latest execution (keyed by output name, e.g. ``STATISTICS_KEY``).

    Raises:
      ValueError: If no 'node' context exists for the component, or the component
        has no recorded executions (e.g. the pipeline was never run).
    """
    context_name = f'{pipeline_name}.{component_id}'
    context = metadata.store.get_context_by_type_and_name('node', context_name)
    if context is None:
        raise ValueError(
            f'No MLMD context named {context_name!r}; has the pipeline been run?')

    executions = metadata.store.get_executions_by_context(context.id)
    if not executions:
        raise ValueError(f'No executions recorded for {context_name!r}.')

    # "Latest" is judged by the MLMD last-update timestamp of each execution.
    latest_execution = max(executions, key=lambda e: e.last_update_time_since_epoch)
    return execution_lib.get_artifacts_dict(metadata, latest_execution.id,
                                            [metadata_store_pb2.Event.OUTPUT])

from tfx.orchestration.experimental.interactive import visualizations

def visualize_artifacts(artifacts):
    """Displays each artifact with the visualization registered for its type.

    Artifacts whose ``type_name`` has no registered visualization are skipped.
    An empty ``artifacts`` iterable displays nothing.

    Args:
      artifacts: Iterable of TFX artifacts (each must expose ``type_name``).
    """
    registry = visualizations.get_registry()
    for artifact in artifacts:
        visualization = registry.get_visualization(artifact.type_name)
        # Display inside the loop so every artifact is shown, not just the last one.
        if visualization:
            visualization.display(artifact)

from tfx.orchestration.experimental.interactive import standard_visualizations
# Register TFX's built-in artifact visualizations; visualize_artifacts looks them up
# via visualizations.get_registry(), presumably the registry populated here.
standard_visualizations.register_standard_visualizations()

In [37]:
# Non-public APIs, just for showcase.
from tfx.orchestration.metadata import Metadata
from tfx.types import standard_component_specs

metadata_connection_config = (
    tfx.orchestration.metadata.sqlite_metadata_connection_config(SCHEMA_METADATA_PATH))

# Read back the latest StatisticsGen and SchemaGen outputs from the schema
# pipeline's MLMD store.
with Metadata(metadata_connection_config) as metadata_handler:
    # StatisticsGen -> dataset statistics artifacts.
    stat_gen_output = get_latest_artifacts(
        metadata_handler, SCHEMA_PIPELINE_NAME, 'StatisticsGen')
    stats_artifacts = stat_gen_output[standard_component_specs.STATISTICS_KEY]

    # SchemaGen -> inferred schema artifacts.
    schema_gen_output = get_latest_artifacts(
        metadata_handler, SCHEMA_PIPELINE_NAME, 'SchemaGen')
    schema_artifacts = schema_gen_output[standard_component_specs.SCHEMA_KEY]

INFO:absl:MetadataStore with DB connection initialized


In [38]:
# Render the dataset statistics produced by the schema pipeline's StatisticsGen.
visualize_artifacts(stats_artifacts)

In [39]:
# Render the schema produced by the schema pipeline's SchemaGen.
visualize_artifacts(schema_artifacts)

Unnamed: 0_level_0,Type,Presence,Valency,Domain
Feature name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
'End Time',BYTES,required,,-
'Start Time',BYTES,required,,-
'Trip ID',BYTES,required,,-
'End Centroid Location',BYTES,required,,-
'End Community Area Name',STRING,required,,'End Community Area Name'
'Start Centroid Location',BYTES,required,,-
'Start Community Area Name',STRING,required,,'Start Community Area Name'
'Accuracy',INT,required,,-
'End Census Tract',INT,required,,-
'End Centroid Latitude',FLOAT,required,,-


Unnamed: 0_level_0,Values
Domain,Unnamed: 1_level_1
'End Community Area Name',"'ALBANY PARK', 'ARCHER HEIGHTS', 'ARMOUR SQUARE', 'AUSTIN', 'AVONDALE', 'BELMONT CRAGIN', 'BRIDGEPORT', 'BRIGHTON PARK', 'CHICAGO LAWN', 'DUNNING', 'EAST GARFIELD PARK', 'GARFIELD RIDGE', 'HERMOSA', 'HUMBOLDT PARK', 'IRVING PARK', 'JEFFERSON PARK', 'KENWOOD', 'LAKE VIEW', 'LINCOLN PARK', 'LINCOLN SQUARE', 'LOGAN SQUARE', 'LOOP', 'LOWER WEST SIDE', 'MCKINLEY PARK', 'MONTCLARE', 'NEAR NORTH SIDE', 'NEAR SOUTH SIDE', 'NEAR WEST SIDE', 'NEW CITY', 'NORTH CENTER', 'NORTH LAWNDALE', 'NORWOOD PARK', 'PORTAGE PARK', 'SOUTH LAWNDALE', 'UPTOWN', 'WEST ENGLEWOOD', 'WEST GARFIELD PARK', 'WEST RIDGE', 'WEST TOWN', 'WOODLAWN'"
'Start Community Area Name',"'ARMOUR SQUARE', 'AUSTIN', 'AVONDALE', 'BELMONT CRAGIN', 'BRIDGEPORT', 'BRIGHTON PARK', 'DUNNING', 'EAST GARFIELD PARK', 'GAGE PARK', 'HERMOSA', 'HUMBOLDT PARK', 'IRVING PARK', 'LINCOLN PARK', 'LOGAN SQUARE', 'LOOP', 'LOWER WEST SIDE', 'MONTCLARE', 'NEAR NORTH SIDE', 'NEAR SOUTH SIDE', 'NEAR WEST SIDE', 'NEW CITY', 'NORTH CENTER', 'NORTH LAWNDALE', 'OHARE', 'PORTAGE PARK', 'SOUTH LAWNDALE', 'UPTOWN', 'WEST GARFIELD PARK', 'WEST TOWN', 'WOODLAWN'"


In [53]:
import shutil

_schema_filename = 'schema.pbtxt'
SCHEMA_PATH = 'schema'

# Copy the schema generated by SchemaGen to a predefined path.
os.makedirs(SCHEMA_PATH, exist_ok=True)
_generated_path = os.path.join(schema_artifacts[0].uri, _schema_filename)
shutil.copy(_generated_path, SCHEMA_PATH)

# The validation pipeline imports its schema from UPDATED_SCHEMA_PATH, which is
# presumably meant to hold a reviewed/edited copy of the generated schema (TODO
# confirm). No cell creates it, so on a fresh checkout the schema import would
# fail. Seed it with the generated schema in that case; never overwrite an existing copy.
UPDATED_SCHEMA_PATH = 'updated_schema'
if not os.path.isdir(UPDATED_SCHEMA_PATH) or not os.listdir(UPDATED_SCHEMA_PATH):
    os.makedirs(UPDATED_SCHEMA_PATH, exist_ok=True)
    shutil.copy(_generated_path, UPDATED_SCHEMA_PATH)
    print(f'Seeded {UPDATED_SCHEMA_PATH}/ with the generated schema; edit it to tune validation.')

In [54]:
# Print every file under UPDATED_SCHEMA_PATH, the schema the validation pipeline imports.
print(f'Schema at {UPDATED_SCHEMA_PATH}-----')
!cat {UPDATED_SCHEMA_PATH}/*

Schema at updated_schema-----
feature {
  name: "End Time"
  type: BYTES
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "Start Time"
  type: BYTES
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "Trip ID"
  type: BYTES
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "End Centroid Location"
  type: BYTES
  presence {
    min_fraction: 1.0
    min_count: 1
  }
}
feature {
  name: "End Community Area Name"
  type: BYTES
  domain: "End Community Area Name"
  presence {
    min_fraction: 1.0
    min_count: 1
  }
}
feature {
  name: "Start Centroid Location"
  type: BYTES
  presence {
    min_fraction: 1.0
    min_count: 1
  }
}
feature {
  name: "Start Community Area Name"
  type: BYTES
  domain: "Start Community Area Name"
  presence {
    min_fraction: 1.0
    min_count: 1
  

In [73]:
# TEST_DATA_ROOT is already defined in the data-path cell; redefining it here would
# silently override any change made there. Just confirm it points at existing data
# before the validation pipeline tries to read it.
assert os.path.isdir(TEST_DATA_ROOT), f'Test data directory not found: {TEST_DATA_ROOT}'

In [74]:
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     schema_path: str, metadata_path: str,
                     split_name: str = 'test') -> tfx.dsl.Pipeline:
    """Creates a pipeline that validates the CSV data in ``data_root`` against a given schema.

    The pipeline runs CsvExampleGen -> StatisticsGen, imports the schema found at
    ``schema_path`` with an Importer, and feeds both into ExampleValidator. The
    validator reports anomalies such as dropped columns, out-of-range values or
    unexpected string values.

    Args:
      pipeline_name: Name of the TFX pipeline.
      pipeline_root: Directory where component output artifacts are written.
      data_root: Directory containing the input CSV file(s) to validate.
      schema_path: URI of the schema artifact to import (a directory containing it).
      metadata_path: Path of the SQLite file used as the MLMD store.
      split_name: Name of the single split that all examples are put into.

    Returns:
      A ``tfx.dsl.Pipeline`` that can be run with e.g. ``LocalDagRunner``.
    """
    # Route every example into one split (a single hash bucket): the whole dataset
    # is validated as-is, so no partitioning is needed.
    output_config = tfx.proto.Output(
        split_config=tfx.proto.SplitConfig(splits=[
            tfx.proto.SplitConfig.Split(name=split_name, hash_buckets=1),
        ]))
    example_gen = tfx.components.CsvExampleGen(
        input_base=data_root, output_config=output_config)

    # Computes statistics over the data for visualization and example validation.
    statistics_gen = tfx.components.StatisticsGen(
        examples=example_gen.outputs['examples'])

    # Imports the schema from disk instead of inferring it from this data.
    schema_importer = tfx.dsl.Importer(
        source_uri=schema_path,
        artifact_type=tfx.types.standard_artifacts.Schema).with_id('schema_importer')

    # Performs anomaly detection based on statistics and schema.
    example_validator = tfx.components.ExampleValidator(
        statistics=statistics_gen.outputs['statistics'],
        schema=schema_importer.outputs['result'])

    return tfx.dsl.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        metadata_connection_config=(
            tfx.orchestration.metadata.sqlite_metadata_connection_config(metadata_path)),
        components=[example_gen, statistics_gen, schema_importer, example_validator])

In [75]:
# Validate the 2020 data against the schema in UPDATED_SCHEMA_PATH, running locally.
tfx.orchestration.LocalDagRunner().run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=TEST_DATA_ROOT,
        schema_path=UPDATED_SCHEMA_PATH,
        metadata_path=METADATA_PATH,
    )
)

INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "ExampleValidator"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_validator.executor.Executor"
    }
  }
}
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
      }
    }
  }
}
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
    }
  }
}
metadata_connection_config {
  database_connection_config {
    sqlite {
      filenam

In [76]:
# Connect to the validation pipeline's MLMD store and fetch the latest anomalies.
metadata_connection_config = (
    tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH))

with Metadata(metadata_connection_config) as metadata_handler:
    # ExampleValidator -> anomalies artifacts.
    ev_output = get_latest_artifacts(
        metadata_handler, PIPELINE_NAME, 'ExampleValidator')
    anomalies_artifacts = ev_output[standard_component_specs.ANOMALIES_KEY]

INFO:absl:MetadataStore with DB connection initialized


In [77]:
# Render the anomalies ExampleValidator found in the 2020 data relative to the imported schema.
visualize_artifacts(anomalies_artifacts)

Unnamed: 0_level_0,Anomaly short description,Anomaly long description
Feature name,Unnamed: 1_level_1,Unnamed: 2_level_1
'Accuracy',Column dropped,Column is completely missing
'Trip Distance',Out-of-range values,Unexpectedly large value: 49997.
'End Community Area Name',Unexpected string values,"Examples contain values missing from the schema: ASHBURN (<1%), AUBURN GRESHAM (<1%), AVALON PARK (<1%), BEVERLY (<1%), BURNSIDE (<1%), CALUMET HEIGHTS (<1%), CHATHAM (<1%), CLEARING (<1%), DOUGLAS (<1%), EAST SIDE (<1%), EDGEWATER (~2%), EDISON PARK (<1%), ENGLEWOOD (<1%), FOREST GLEN (<1%), FULLER PARK (<1%), GAGE PARK (<1%), GRAND BOULEVARD (<1%), GREATER GRAND CROSSING (<1%), HEGEWISCH (<1%), HYDE PARK (~2%), MORGAN PARK (<1%), MOUNT GREENWOOD (<1%), NORTH PARK (<1%), OAKLAND (<1%), OHARE (<1%), PULLMAN (<1%), RIVERDALE (<1%), ROGERS PARK (~1%), ROSELAND (<1%), SOUTH CHICAGO (<1%), SOUTH DEERING (<1%), SOUTH SHORE (<1%), WASHINGTON HEIGHTS (<1%), WASHINGTON PARK (<1%), WEST ELSDON (<1%), WEST LAWN (<1%), WEST PULLMAN (<1%)."
'Start Census Tract',Column dropped,Column is completely missing
'End Census Tract',Column dropped,Column is completely missing
'Start Community Area Name',Unexpected string values,"Examples contain values missing from the schema: ALBANY PARK (<1%), ARCHER HEIGHTS (<1%), ASHBURN (<1%), AUBURN GRESHAM (<1%), AVALON PARK (<1%), BEVERLY (<1%), BURNSIDE (<1%), CALUMET HEIGHTS (<1%), CHATHAM (~1%), CHICAGO LAWN (<1%), CLEARING (<1%), DOUGLAS (<1%), EAST SIDE (<1%), EDGEWATER (~2%), EDISON PARK (<1%), ENGLEWOOD (<1%), FOREST GLEN (<1%), FULLER PARK (<1%), GARFIELD RIDGE (<1%), GRAND BOULEVARD (<1%), GREATER GRAND CROSSING (<1%), HEGEWISCH (<1%), HYDE PARK (~2%), JEFFERSON PARK (<1%), KENWOOD (<1%), LAKE VIEW (~15%), LINCOLN SQUARE (~1%), MCKINLEY PARK (<1%), MORGAN PARK (<1%), MOUNT GREENWOOD (<1%), NORTH PARK (<1%), NORWOOD PARK (<1%), OAKLAND (<1%), PULLMAN (<1%), RIVERDALE (<1%), ROGERS PARK (~1%), ROSELAND (<1%), SOUTH CHICAGO (<1%), SOUTH DEERING (<1%), SOUTH SHORE (<1%), WASHINGTON HEIGHTS (<1%), WASHINGTON PARK (<1%), WEST ELSDON (<1%), WEST ENGLEWOOD (<1%), WEST LAWN (<1%), WEST PULLMAN (<1%), WEST RIDGE (<1%)."
'Vendor',New column,New column (column in data but not in schema)
