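# Pipeline

This notebook inspects the artifacts written by the TFX pipeline: dataset statistics, the schema, validation anomalies, and post-transform statistics.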

In [14]:
import os
import glob
import pandas as pd
import tensorflow_data_validation as tfdv
from tensorflow_metadata.proto.v0 import anomalies_pb2, schema_pb2, statistics_pb2
from google.protobuf import text_format

In [3]:
def get_latest_subdir(path):
    """Return the most recently modified immediate subdirectory of ``path``.

    Args:
        path: Directory whose direct children are inspected.

    Returns:
        The joined path (``path`` + child name) of the child directory with
        the largest modification time as reported by ``os.path.getmtime``.

    Raises:
        FileNotFoundError: If ``path`` contains no subdirectories.
    """
    candidates = []
    for entry in os.listdir(path):
        full_path = os.path.join(path, entry)
        # Plain files are ignored; only directories are candidates.
        if os.path.isdir(full_path):
            candidates.append(full_path)

    if len(candidates) == 0:
        raise FileNotFoundError(f"No subdirectories found in {path}")

    # "Latest" is decided by modification time, not by directory name.
    return max(candidates, key=os.path.getmtime)

## ExampleGen Outputs

In [4]:
# The project root is taken to be the grandparent of the current working
# directory, so the result depends on where the kernel was started.
_cwd_parent = os.path.split(os.getcwd())[0]
project_root = os.path.split(_cwd_parent)[0]

# Folder that holds the pipeline artifacts; every section below looks up its
# own component directory underneath this root.
_artifacts_parts = ("Milestone 3", "tfx_pipeline", "artifacts")
artifacts_root = os.path.join(project_root, *_artifacts_parts)

## Statistics

In [20]:
def load_feature_stats(stats_path):
    """Read a serialized ``DatasetFeatureStatisticsList`` proto from disk.

    Args:
        stats_path: Path to a binary ``FeatureStats.pb`` file.

    Returns:
        The parsed ``statistics_pb2.DatasetFeatureStatisticsList`` message.
    """
    stats = statistics_pb2.DatasetFeatureStatisticsList()
    with open(stats_path, "rb") as stats_file:
        stats.ParseFromString(stats_file.read())
    return stats


# Most recently modified StatisticsGen output directory; each split keeps its
# own FeatureStats.pb underneath it.
statisticsgen_dir = get_latest_subdir(os.path.join(artifacts_root, "StatisticsGen/statistics"))

print("Train folder")
statistics_train_pb = load_feature_stats(
    os.path.join(statisticsgen_dir, "Split-train", "FeatureStats.pb")
)
tfdv.visualize_statistics(statistics_train_pb)

print("Eval folder")
statistics_eval_pb = load_feature_stats(
    os.path.join(statisticsgen_dir, "Split-eval", "FeatureStats.pb")
)
tfdv.visualize_statistics(statistics_eval_pb)

Train folder


Eval folder


## Schema

In [17]:
# Most recently modified SchemaGen output directory and the text-format schema
# file stored inside it.
schemagen_dir = get_latest_subdir(os.path.join(artifacts_root, "SchemaGen/schema"))
schema_path = os.path.join(schemagen_dir, "schema.pbtxt")

schema = tfdv.load_schema_text(schema_path)

print("✅ Loaded Schema")
tfdv.display_schema(schema)

✅ Loaded Schema


Unnamed: 0_level_0,Type,Presence,Valency,Domain
Feature name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
'event_timestamp',STRING,required,,'event_timestamp'
'f0',FLOAT,required,,-
'f1',FLOAT,required,,-
'f10',FLOAT,required,,-
'f100',FLOAT,required,,-
...,...,...,...,...
'f96',FLOAT,required,,-
'f97',FLOAT,required,,-
'f98',FLOAT,required,,-
'f99',FLOAT,required,,-


Unnamed: 0_level_0,Values
Domain,Unnamed: 1_level_1
'event_timestamp','2025-03-27 07:01:04.332726+00:00'


## Check for Anomalies

In [18]:
# Most recently modified ExampleValidator output directory.
examplevalidator_dir = get_latest_subdir(os.path.join(artifacts_root, "ExampleValidator/anomalies"))

# Only the train split's anomaly report is loaded here.
schema_diff_path = os.path.join(examplevalidator_dir, "Split-train", "SchemaDiff.pb")

anomalies = anomalies_pb2.Anomalies()
with open(schema_diff_path, "rb") as schema_diff_file:
    serialized_anomalies = schema_diff_file.read()
anomalies.ParseFromString(serialized_anomalies)

tfdv.display_anomalies(anomalies)

## Transform

In [31]:
# Most recently modified post-transform statistics directory written by the
# Transform component.
transform_stats_dir = get_latest_subdir(os.path.join(artifacts_root, "Transform/post_transform_stats"))
transformed_stats_path = os.path.join(transform_stats_dir, "FeatureStats.pb")

transformed_stats = statistics_pb2.DatasetFeatureStatisticsList()
with open(transformed_stats_path, "rb") as transformed_stats_file:
    serialized_stats = transformed_stats_file.read()
transformed_stats.ParseFromString(serialized_stats)

tfdv.visualize_statistics(transformed_stats)