# Bronze Parity Validation  
**Notebook:** nb_01_bronze_parity_validation  

## Purpose
This notebook validates that the Sprint 2 Bronze tables in `lh_olist_bronze`  
are an **exact parity copy** of the Sprint 1 baseline tables in `lh_olist_shared`.

This ensures:
- Bronze ingestion pipelines are configured correctly
- No data loss, duplication, or schema drift occurred during copying
- Bronze can be safely treated as the **source of truth** for Silver development

---

## Sprint 2 Bronze Scope

Only the following 9 Bronze tables are **in scope** for Sprint 2:

- br_customers  
- br_geolocation  
- br_order_items  
- br_orders  
- br_payments  
- br_product_category_translation  
- br_products  
- br_reviews  
- br_sellers  

All other tables in the Bronze lakehouse (for example, `*_DO_NOT_USE`)  
are intentionally excluded from Sprint 2 validation.

---

In [1]:
# Configuration
from pyspark.sql import functions as F

BRONZE_LH = "lh_olist_bronze"
SHARED_LH = "lh_olist_shared"
SCHEMA = "dbo"

TABLES = [
    "br_customers",
    "br_geolocation",
    "br_order_items",
    "br_orders",
    "br_payments",
    "br_product_category_translation",
    "br_products",
    "br_reviews",
    "br_sellers"
]

# Rules used for extra checks (distinct key, null key, min/max date)
# Adjust column names only if your actual columns differ.
TABLE_RULES = {
    "br_customers": {"key_col": "customer_id", "date_col": None},
    "br_geolocation": {"key_col": "geolocation_zip_code_prefix", "date_col": None},
    "br_order_items": {"key_col": "order_id", "date_col": "shipping_limit_date"},
    "br_orders": {"key_col": "order_id", "date_col": "order_purchase_timestamp"},
    "br_payments": {"key_col": "order_id", "date_col": None},
    "br_product_category_translation": {"key_col": "product_category_name", "date_col": None},
    "br_products": {"key_col": "product_id", "date_col": None},
    "br_reviews": {"key_col": "review_id", "date_col": "review_creation_date"},
    "br_sellers": {"key_col": "seller_id", "date_col": None},
}

In [2]:
# Helpers
def tbl(lakehouse: str, schema: str, table: str) -> str:
    return f"{lakehouse}.{schema}.{table}"

def safe_collect_one(df):
    rows = df.collect()
    return rows[0] if rows else None

def get_schema_map(full_table_name: str):
    df = spark.table(full_table_name)
    # map: col -> (type, nullable)
    return {f.name: (f.dataType.simpleString(), f.nullable) for f in df.schema.fields}

In [3]:
# Quick existence check - Bronze and Shared have the 9 tables
def list_tables(lakehouse: str, schema: str):
    return {r["tableName"] for r in spark.sql(f"SHOW TABLES IN {lakehouse}.{schema}").collect()}

bronze_set = list_tables(BRONZE_LH, SCHEMA)
shared_set = list_tables(SHARED_LH, SCHEMA)

missing_bronze = [t for t in TABLES if t not in bronze_set]
missing_shared = [t for t in TABLES if t not in shared_set]

print("Missing in Bronze:", missing_bronze)
print("Missing in Shared:", missing_shared)

assert len(missing_bronze) == 0, f"Bronze is missing tables: {missing_bronze}"
assert len(missing_shared) == 0, f"Shared is missing tables: {missing_shared}"

print("✅ All 9 Sprint 2 Bronze tables exist in both lakehouses.")

Missing in Bronze: []
Missing in Shared: []
✅ All 9 Sprint 2 Bronze tables exist in both lakehouses.


In [5]:
# Schema parity (column name, type, nullable)
schema_diffs = []

for t in TABLES:
    b_name = tbl(BRONZE_LH, SCHEMA, t)
    s_name = tbl(SHARED_LH, SCHEMA, t)

    b_map = get_schema_map(b_name)
    s_map = get_schema_map(s_name)

    all_cols = sorted(set(b_map.keys()) | set(s_map.keys()))
    for c in all_cols:
        b_val = b_map.get(c)
        s_val = s_map.get(c)

        if b_val != s_val:
            schema_diffs.append((t, c, str(b_val), str(s_val)))

from pyspark.sql import types as T

schema = T.StructType([
    T.StructField("table", T.StringType(), True),
    T.StructField("column", T.StringType(), True),
    T.StructField("bronze(type,nullable)", T.StringType(), True),
    T.StructField("shared(type,nullable)", T.StringType(), True),
])

schema_diff_df = spark.createDataFrame(schema_diffs, schema=schema)

if len(schema_diffs) == 0:
    print("✅ Schema parity passed for all 9 tables.")
else:
    print("⚠️ Schema differences found. Review below.")
    display(schema_diff_df.orderBy("table", "column"))

✅ Schema parity passed for all 9 tables.


In [6]:
# Row count parity
rows = []

for t in TABLES:
    b = spark.table(tbl(BRONZE_LH, SCHEMA, t)).count()
    s = spark.table(tbl(SHARED_LH, SCHEMA, t)).count()
    status = "PASS" if b == s else "FAIL"
    rows.append((t, b, s, b - s, status))

count_df = spark.createDataFrame(rows, ["table", "bronze_count", "shared_count", "delta", "status"])
# Sort ascending on status so FAIL rows appear before PASS rows, largest delta first
display(count_df.orderBy(F.col("status").asc(), F.abs(F.col("delta")).desc()))

fail_n = count_df.filter(F.col("status") == "FAIL").count()
if fail_n == 0:
    print("✅ GO: Row counts match for all 9 tables.")
else:
    raise Exception(f"❌ NO-GO: {fail_n} tables failed row count parity. Fix copy pipeline before proceeding.")

✅ GO: Row counts match for all 9 tables.


In [9]:
# Key and date sanity checks
sanity_rows = []

for t in TABLES:
    rules = TABLE_RULES.get(t, {})
    key_col = rules.get("key_col")
    date_col = rules.get("date_col")

    bdf = spark.table(tbl(BRONZE_LH, SCHEMA, t))
    sdf = spark.table(tbl(SHARED_LH, SCHEMA, t))

    # key checks
    b_distinct = b_nulls = s_distinct = s_nulls = None
    if key_col and key_col in bdf.columns and key_col in sdf.columns:
        b_distinct = bdf.select(F.col(key_col)).distinct().count()
        b_nulls = bdf.filter(F.col(key_col).isNull()).count()
        s_distinct = sdf.select(F.col(key_col)).distinct().count()
        s_nulls = sdf.filter(F.col(key_col).isNull()).count()

    # date checks
    b_min = b_max = s_min = s_max = None
    if date_col and date_col in bdf.columns and date_col in sdf.columns:
        b_agg = safe_collect_one(bdf.select(F.min(date_col).alias("mn"), F.max(date_col).alias("mx")))
        s_agg = safe_collect_one(sdf.select(F.min(date_col).alias("mn"), F.max(date_col).alias("mx")))
        b_min, b_max = b_agg["mn"], b_agg["mx"]
        s_min, s_max = s_agg["mn"], s_agg["mx"]

    sanity_rows.append((
        t, key_col, b_distinct, s_distinct, b_nulls, s_nulls,
        date_col, str(b_min) if b_min is not None else None, str(s_min) if s_min is not None else None,
        str(b_max) if b_max is not None else None, str(s_max) if s_max is not None else None
    ))

sanity_df = spark.createDataFrame(
    sanity_rows,
    [
        "table", "key_col", "bronze_distinct_key", "shared_distinct_key", "bronze_null_key", "shared_null_key",
        "date_col", "bronze_min_date", "shared_min_date", "bronze_max_date", "shared_max_date"
    ]
)

display(sanity_df.orderBy("table"))

In [10]:
print("✅ Bronze parity validation COMPLETE")
print("Status: APPROVED")
print("Scope: 9 Sprint 2 Bronze tables")
print("Next: Proceed to Silver dev using lh_olist_bronze as source of truth")

✅ Bronze parity validation COMPLETE
Status: APPROVED
Scope: 9 Sprint 2 Bronze tables
Next: Proceed to Silver dev using lh_olist_bronze as source of truth


---

## Lakehouse References

| Layer   | Lakehouse Name       | Role |
|--------|----------------------|------|
| Shared | lh_olist_shared      | Sprint 1 baseline (read-only) |
| Bronze | lh_olist_bronze      | Sprint 2 Bronze source of truth |

---

## Result Interpretation Rules

- ✅ PASS  
  Bronze is a faithful copy of Shared and can be used for Silver development.

- ⚠️ WARN  
  Minor differences detected (for example nullable flags or fingerprint variance).  
  Requires review but does not automatically block progression.

- ❌ FAIL  
  Row count mismatch or missing tables.  
  Bronze ingestion pipeline must be fixed before proceeding.
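
The three outcomes above can be expressed as a small decision function. The following is a minimal sketch, not the notebook's own logic: `classify_table` and its parameters are hypothetical names, and it assumes, as the schema parity cell does, that any schema difference is a review item (WARN) rather than a blocker.

```python
from typing import Dict, Tuple

# column name -> (type string, nullable flag), the shape returned by get_schema_map
SchemaMap = Dict[str, Tuple[str, bool]]


def classify_table(
    exists_in_both: bool,
    bronze_count: int,
    shared_count: int,
    bronze_schema: SchemaMap,
    shared_schema: SchemaMap,
) -> str:
    """Return 'FAIL', 'WARN', or 'PASS' for a single table (hypothetical helper)."""
    # Missing tables or a row count mismatch block progression.
    if not exists_in_both or bronze_count != shared_count:
        return "FAIL"
    # Assumption: any schema difference (for example a nullable flag) needs review only.
    if bronze_schema != shared_schema:
        return "WARN"
    return "PASS"
```

Keeping the decision in one function makes it easier to change the policy later, for instance if type mismatches should be promoted from WARN to FAIL.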

---

## Notes for Reviewers

- This notebook intentionally avoids deep row-level comparison  
  to remain fast, repeatable, and demo-friendly.
- All checks use fully qualified table names to avoid lakehouse attachment ambiguity.
- Validation logic is designed to be extensible for future incremental pipelines.
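
Where a cheap content check is wanted without a row-by-row join, an order-independent fingerprint is one option. The sketch below illustrates the idea on plain Python tuples. `table_fingerprint` is a hypothetical helper, not part of this notebook, and it assumes rows contain simple values whose `repr` is stable (strings, numbers, `None`). Against lakehouse tables, the same idea would need a per-row hash plus an aggregate computed in Spark rather than collecting rows to the driver.

```python
import hashlib
from typing import Iterable, Tuple


def table_fingerprint(rows: Iterable[Tuple]) -> Tuple[int, int]:
    """Return (row_count, checksum); the result does not depend on row order."""
    count = 0
    total = 0
    for row in rows:
        # Hash a stable text form of the row, then keep the first 8 bytes as an integer.
        digest = hashlib.sha256(repr(tuple(row)).encode("utf-8")).digest()
        # Summing modulo 2**64 is order-independent and, unlike XOR,
        # does not let two identical rows cancel each other out.
        total = (total + int.from_bytes(digest[:8], "big")) % (1 << 64)
        count += 1
    return count, total
```

Two tables with equal fingerprints very likely hold the same rows. A mismatch only says that something differs, not which rows.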

---

## Bronze Validation Conclusion

All Sprint 2 Bronze tables in `lh_olist_bronze` have been validated against the
Sprint 1 baseline in `lh_olist_shared`.

Validation covered:
- Table existence
- Schema parity
- Row count parity
- Distinct-key and null-key counts
- Date range consistency
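
The key and date sanity cell displays Bronze and Shared metrics side by side but does not assert on them. A minimal sketch of turning that comparison into an explicit check follows. `sanity_mismatches` is a hypothetical helper that assumes the 11-field tuple layout of `sanity_rows` built in that cell, and the sample values are illustrative only, not real results.

```python
from typing import List, Tuple


def sanity_mismatches(sanity_rows: List[Tuple]) -> List[Tuple]:
    """Return (table, metric, bronze_value, shared_value) for every metric that differs."""
    mismatches = []
    for (table, _key_col, b_dist, s_dist, b_null, s_null,
         _date_col, b_min, s_min, b_max, s_max) in sanity_rows:
        pairs = {
            "distinct_key": (b_dist, s_dist),
            "null_key": (b_null, s_null),
            "min_date": (b_min, s_min),
            "max_date": (b_max, s_max),
        }
        for metric, (bronze_value, shared_value) in pairs.items():
            # None on both sides (check not applicable to this table) counts as a match.
            if bronze_value != shared_value:
                mismatches.append((table, metric, bronze_value, shared_value))
    return mismatches


# Illustrative rows only: the second row has a deliberate distinct-key difference.
example = [
    ("br_orders", "order_id", 100, 100, 0, 0,
     "order_purchase_timestamp", "2017-01-01", "2017-01-01", "2018-01-01", "2018-01-01"),
    ("br_reviews", "review_id", 50, 49, 0, 0,
     "review_creation_date", "2017-01-01", "2017-01-01", "2018-01-01", "2018-01-01"),
]
print(sanity_mismatches(example))
```

An empty result could then gate the approval message, in the same way the row count cell raises on failure.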

Bronze is approved as the **source of truth** for Silver development.

---
