## TFDV Analysis on Baseline vs Corrupted Data

In [None]:
!pip install --upgrade pip

!pip install \
  tensorflow==2.15.1 \
  tensorflow-metadata==1.15.0 \
  tensorflow-data-validation==1.15.1

Collecting pip
  Downloading pip-26.0-py3-none-any.whl.metadata (4.7 kB)
Downloading pip-26.0-py3-none-any.whl (1.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 24.1.2
    Uninstalling pip-24.1.2:
      Successfully uninstalled pip-24.1.2
Successfully installed pip-26.0
Collecting tensorflow==2.15.1
  Downloading tensorflow-2.15.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.2 kB)
Collecting tensorflow-metadata==1.15.0
  Downloading tensorflow_metadata-1.15.0-py3-none-any.whl.metadata (2.4 kB)
Collecting tensorflow-data-validation==1.15.1
  Downloading tensorflow_data_validation-1.15.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (18 kB)
Collecting ml-dtypes~=0.3.1 (from tensorflow==2.15.1)
  Downloading ml_dtypes-0.3.2-cp311-cp311-manylinux_2_17_x86_64.manyli

In [None]:
import os

# Resolve the project root. When the notebook is executed from inside a
# "notebook" folder the project root is its parent; otherwise the working
# directory itself is used. Only the last path component is inspected so that
# an unrelated ancestor directory containing "notebook" does not shift the root.
current_dir = os.getcwd()
if 'notebook' in os.path.basename(current_dir):
    BASE_DIR = os.path.dirname(current_dir)
else:
    BASE_DIR = current_dir

# All data/result locations hang off BASE_DIR.
DATA_DIR = os.path.join(BASE_DIR, "data/")
RESULTS_DIR = os.path.join(BASE_DIR, "results/")
TFDV_RESULTS_DIR = os.path.join(RESULTS_DIR, "tfdv_eval/")

# Create the directories at the resolved locations (not relative to the
# current working directory, which may differ from BASE_DIR).
os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(RESULTS_DIR, exist_ok=True)
os.makedirs(TFDV_RESULTS_DIR, exist_ok=True)

### Imports and Directories

In [None]:
import json
import numpy as np
import pandas as pd

import tensorflow_data_validation as tfdv
from tensorflow_metadata.proto.v0 import anomalies_pb2

# Corrupted CSVs are read from the same directory that holds the baseline file.
CORRUPT_DIR = DATA_DIR
BASELINE_CSV = os.path.join(DATA_DIR, "baseline_data.csv")

### Helper Functions

In [None]:
# Derive simple numeric proxies from the text column so TFDV can validate
# text corruption structurally.
def add_text_proxies(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with numeric text-proxy columns added.

    Added columns:
      * ``text_len``        - number of characters in ``text`` (int64)
      * ``token_count``     - number of whitespace-separated tokens (int64)
      * ``has_empty_text``  - 1 if the text is empty or missing, else 0 (int64)
      * ``non_ascii_ratio`` - share of characters with code point > 127 (float)

    If ``text`` is absent it is created as an empty-string column. If ``label``
    is present it is coerced to numeric, with unparseable values becoming NaN.
    The input frame is not modified.
    """
    df = df.copy()
    if "text" not in df.columns:
        df["text"] = ""

    # Fill missing values BEFORE casting to str: astype(str) turns NaN/None
    # into the literal strings "nan"/"None", which would then be measured as
    # real text and never flagged as empty.
    s = df["text"].fillna("").astype(str)

    df["text_len"] = s.str.len().astype(np.int64)
    df["token_count"] = s.str.split().str.len().fillna(0).astype(np.int64)
    df["has_empty_text"] = (s.str.len() == 0).astype(np.int64)

    def non_ascii_ratio(text: str) -> float:
        """Fraction of characters outside the 7-bit ASCII range (0.0 for empty text)."""
        if not text:
            return 0.0
        return sum(1 for ch in text if ord(ch) > 127) / len(text)

    df["non_ascii_ratio"] = s.map(non_ascii_ratio).astype(float)

    if "label" in df.columns:
        df["label"] = pd.to_numeric(df["label"], errors="coerce")

    return df

def load_csv(path: str) -> pd.DataFrame:
    """Read the CSV at ``path`` and return its text/label columns plus text proxies.

    Only the ``text`` and ``label`` columns are retained (whichever of them the
    file contains); the selection is then passed through ``add_text_proxies``.
    """
    raw = pd.read_csv(path)
    wanted = ("text", "label")
    present = [name for name in wanted if name in raw.columns]
    selected = raw[present].copy()
    return add_text_proxies(selected)

def count_anomalies(anomalies: "anomalies_pb2.Anomalies") -> dict:
    """Tally anomaly reasons overall and per heuristic category.

    Every reason of every feature in ``anomalies.anomaly_info`` adds one to
    ``total``. The reason's short and long descriptions are lower-cased and
    searched for keywords; a reason may fall into several categories or none.

    The keyword lists are heuristics over TFDV's human-readable messages.
    Besides the generic terms they include wording that TFDV appears to use
    for partially missing features ("Column dropped", "... present in fewer
    examples than expected") and for comparator-based drift ("Jensen-Shannon
    divergence", "Linfty distance") - NOTE(review): confirm the exact phrasing
    against the installed TFDV version.

    Returns:
        dict with integer counts under "total", "type", "completeness", "drift".
    """
    type_keywords = ("type",)
    completeness_keywords = (
        "missing", "presence", "completeness", "dropped", "fewer examples",
    )
    drift_keywords = (
        "drift", "skew", "distribution", "divergence", "linfty", "l-infty",
    )

    counts = {
        "total": 0,
        "type": 0,
        "completeness": 0,
        "drift": 0,
    }

    for feat_anom in anomalies.anomaly_info.values():
        for reason in feat_anom.reason:
            counts["total"] += 1

            msg = ((reason.short_description or "") + " " + (reason.description or "")).lower()

            if any(k in msg for k in type_keywords):
                counts["type"] += 1
            if any(k in msg for k in completeness_keywords):
                counts["completeness"] += 1
            if any(k in msg for k in drift_keywords):
                counts["drift"] += 1

    return counts


# Heuristic severity score used to rank experiments.
def anomaly_severity_score(counts: dict) -> float:
    """Collapse per-category anomaly counts into a single score.

    The "type", "completeness" and "drift" counts are combined with weights
    0.30 / 0.30 / 0.40, and the weighted sum ``raw`` is mapped through
    ``1 - exp(-raw / 3)``: zero counts give 0.0 and the score approaches 1.0
    as the counts grow.
    """
    category_weights = (
        ("type", 0.30),
        ("completeness", 0.30),
        ("drift", 0.40),
    )

    weighted_total = 0.0
    for category, weight in category_weights:
        weighted_total = weighted_total + weight * counts[category]

    saturation_scale = 3.0
    score = 1.0 - np.exp(-weighted_total / saturation_scale)
    return float(score)

In [None]:
# --- Baseline statistics and schema ------------------------------------------
print("Loading baseline data...")
df_baseline = load_csv(BASELINE_CSV)

base_stats = tfdv.generate_statistics_from_dataframe(df_baseline)
schema = tfdv.infer_schema(base_stats)

# Attach a drift comparator to each numeric text proxy so that the comparison
# against `previous_statistics` below can actually report drift.
# The earlier loop tested `name in schema.feature`, which compares a string
# with Feature protos and therefore never matched, so nothing was configured.
DRIFT_JSD_THRESHOLD = 0.1  # heuristic cut-off; tune for the dataset at hand
schema_feature_names = {feat.name for feat in schema.feature}
for feature in ["text_len", "token_count", "non_ascii_ratio", "has_empty_text"]:
    if feature in schema_feature_names:
        comparator = tfdv.get_feature(schema, feature).drift_comparator
        comparator.jensen_shannon_divergence.threshold = DRIFT_JSD_THRESHOLD

schema_path = os.path.join(TFDV_RESULTS_DIR, "baseline_schema.pbtxt")
tfdv.write_schema_text(schema, schema_path)

# Share of empty texts in the baseline; constant across experiments.
base_empty = (df_baseline["has_empty_text"] == 1).mean()

# --- Validate corrupted datasets against the baseline ------------------------
# Explicit column list so the results frame has the expected columns even
# when no corrupted dataset is found.
RESULT_COLUMNS = [
    "experiment",
    "n_rows",
    "n_anomalies_total",
    "type_violations",
    "completeness_related",
    "drift_related",
    "tfdv_severity_score",
    "delta_empty_text_rate",
    "mean_text_len",
    "mean_token_count",
    "mean_non_ascii_ratio",
]
results = []

corrupt_files = sorted(
    f for f in os.listdir(CORRUPT_DIR)
    if f.endswith(".csv") and f != "baseline_data.csv"
)

print(f"Found {len(corrupt_files)} corrupted datasets")

df_corrupted = None  # defined up front so the cleanup below is safe with zero files
for fname in corrupt_files:
    # Strip only the trailing extension (str.replace would remove every ".csv").
    exp_name = os.path.splitext(fname)[0]
    path = os.path.join(CORRUPT_DIR, fname)

    df_corrupted = load_csv(path)
    cur_stats = tfdv.generate_statistics_from_dataframe(df_corrupted)

    anomalies = tfdv.validate_statistics(
        statistics=cur_stats,
        schema=schema,
        previous_statistics=base_stats
    )

    counts = count_anomalies(anomalies)
    score = anomaly_severity_score(counts)

    # additional data info (outside TFDV)
    cur_empty = (df_corrupted["has_empty_text"] == 1).mean()

    results.append({
        "experiment": exp_name,
        "n_rows": int(len(df_corrupted)),
        "n_anomalies_total": counts["total"],
        "type_violations": counts["type"],
        "completeness_related": counts["completeness"],
        "drift_related": counts["drift"],
        "tfdv_severity_score": score,
        "delta_empty_text_rate": float(cur_empty - base_empty),
        "mean_text_len": float(df_corrupted["text_len"].mean()),
        "mean_token_count": float(df_corrupted["token_count"].mean()),
        "mean_non_ascii_ratio": float(df_corrupted["non_ascii_ratio"].mean()),
    })

    anom_path = os.path.join(TFDV_RESULTS_DIR, f"anomalies_{exp_name}.pbtxt")
    tfdv.write_anomalies_text(anomalies, anom_path)

# --- Persist and display results ---------------------------------------------
df_results = pd.DataFrame(results, columns=RESULT_COLUMNS).sort_values(
    "tfdv_severity_score", ascending=False
)

out_csv = os.path.join(TFDV_RESULTS_DIR, "tfdv_analysis.csv")
df_results.to_csv(out_csv, index=False)

summary = {
    "n_experiments": int(len(df_results)),
    "mean_severity_score": float(df_results["tfdv_severity_score"].mean()) if len(df_results) else None,
    "median_severity_score": float(df_results["tfdv_severity_score"].median()) if len(df_results) else None,
    "top_3_by_score": df_results[
        ["experiment", "tfdv_severity_score", "n_anomalies_total"]
    ].head(3).to_dict(orient="records"),
    "schema_path": schema_path,
    "results_csv": out_csv,
}

with open(os.path.join(TFDV_RESULTS_DIR, "tfdv_analysis_summary.json"), "w") as f:
    json.dump(summary, f, indent=2)

print("Results saved.")
display(df_results)

# Drop the references to the large frames so their memory can be reclaimed.
del df_baseline
del df_corrupted

Loading baseline data...
Found 9 corrupted datasets
Results saved.


Unnamed: 0,experiment,n_rows,n_anomalies_total,type_violations,completeness_related,drift_related,tfdv_severity_score,delta_empty_text_rate,mean_text_len,mean_token_count,mean_non_ascii_ratio
0,01_missing_text_data,100000,1,1,0,0,0.095163,0.0,132.54091,25.35594,0.002267
1,02_broken_chars_data,100000,1,1,0,0,0.095163,0.0,188.56445,35.8854,0.06269
2,03_swapped_text_data,100000,1,1,0,0,0.095163,0.0,188.56446,35.8854,0.003194
4,05_swapped_labels_data,100000,1,1,0,0,0.095163,0.0,188.56446,35.8854,0.003194
5,06_combined_broken_chars_missing_text_data,100000,1,1,0,0,0.095163,0.0,173.90968,33.13095,0.025244
6,07_combined_swap_text_labels_data,100000,1,1,0,0,0.095163,0.0,188.56446,35.8854,0.003194
8,09_all_corruptions_data,100000,1,1,0,0,0.095163,0.0,179.27971,34.14421,0.020792
3,04_missing_labels_data,100000,2,0,0,0,0.0,0.0,188.56446,35.8854,0.003194
7,08_heavy_missing_data,100000,2,0,0,0,0.0,0.0,142.09817,27.14118,0.002348
