In [0]:
# Point the session at the `yogurt` catalog and make `quality` (created on first run)
# the default schema, so later unqualified DDL/views land in yogurt.quality.
for setup_stmt in (
    "USE CATALOG yogurt",
    "CREATE SCHEMA IF NOT EXISTS quality",
    "USE SCHEMA quality",
):
    spark.sql(setup_stmt)

# Echo the active catalog/schema so the notebook output shows where objects are created.
current_ctx_sql = "SELECT current_catalog() AS catalog, current_database() AS schema"
display(spark.sql(current_ctx_sql))


In [0]:
%sql
-- DQ rule configuration and result tables, created in the current schema
-- (the first cell selects yogurt.quality).

-- Primary-key rules: one row per table. pk_cols is a comma-separated column list
-- (the DQ run cell splits it on ",", e.g. 'ps_partkey,ps_suppkey').
CREATE TABLE IF NOT EXISTS dq_pk_config (
  table_name STRING,
  pk_cols    STRING      -- comma-separated key column(s)
) USING DELTA;

-- Foreign-key rules: child_cols[i] must match parent_cols[i] (lists are paired
-- positionally by the DQ run cell).
CREATE TABLE IF NOT EXISTS dq_fk_config (
  child_table  STRING,
  child_cols   STRING,   -- comma-separated child column(s)
  parent_table STRING,
  parent_cols  STRING    -- comma-separated parent column(s), same order as child_cols
) USING DELTA;

-- One row per DQ run; the DQ run cell sets run_id to the epoch-seconds start time.
CREATE TABLE IF NOT EXISTS dq_runs (
  run_id     BIGINT,
  run_ts     TIMESTAMP,
  triggered_by STRING
) USING DELTA;

-- One row per individual check result, keyed by run_id.
CREATE TABLE IF NOT EXISTS dq_checks (
  run_id      BIGINT,
  layer       STRING,        -- medallion layer checked (the DQ run cell writes 'silver')
  table_name  STRING,
  check_name  STRING,
  status      STRING,        -- 'PASS' or 'FAIL'
  measured_value STRING,     -- observed value, stored as text
  expected_expr  STRING,     -- human-readable description of the expectation
  details     STRING,
  checked_ts  TIMESTAMP
) USING DELTA;

-- Per-table size/freshness metrics captured on each run.
CREATE TABLE IF NOT EXISTS dq_table_metrics (
  run_id     BIGINT,
  layer      STRING,         -- medallion layer measured (the DQ run cell writes 'silver')
  table_name STRING,
  row_count  BIGINT,
  max_date   DATE,           -- MAX of the table's configured date column; NULL when none is configured
  collected_ts TIMESTAMP
) USING DELTA;


In [0]:

# (Re)load the DQ rule configuration for the TPC-H style silver tables.
# INSERT OVERWRITE replaces each table's contents in a single Delta commit. A separate
# TRUNCATE + INSERT exposes an empty config between the two commits, and it leaves the
# table empty if the INSERT fails.

# Primary keys: pk_cols is a comma-separated list for composite keys.
spark.sql("""INSERT OVERWRITE TABLE quality.dq_pk_config VALUES
  ('region'  ,'r_regionkey'),
  ('nation'  ,'n_nationkey'),
  ('part'    ,'p_partkey'),
  ('supplier','s_suppkey'),
  ('partsupp','ps_partkey,ps_suppkey'),
  ('customer','c_custkey'),
  ('orders'  ,'o_orderkey'),
  ('lineitem','l_orderkey,l_linenumber')
""")

# Foreign keys: (child_table, child_cols, parent_table, parent_cols).
spark.sql("""INSERT OVERWRITE TABLE quality.dq_fk_config VALUES
  ('nation'  ,'n_regionkey'      ,'region'  ,'r_regionkey'),
  ('supplier','s_nationkey'      ,'nation'  ,'n_nationkey'),
  ('customer','c_nationkey'      ,'nation'  ,'n_nationkey'),
  ('orders'  ,'o_custkey'        ,'customer','c_custkey'),
  ('lineitem','l_orderkey'       ,'orders'  ,'o_orderkey'),
  ('lineitem','l_partkey'        ,'part'    ,'p_partkey'),
  ('lineitem','l_suppkey'        ,'supplier','s_suppkey'),
  ('partsupp','ps_partkey'       ,'part'    ,'p_partkey'),
  ('partsupp','ps_suppkey'       ,'supplier','s_suppkey')
""")
print("Config loaded.")


In [0]:

import time

# ---- Run configuration ------------------------------------------------------
# A run is identified by its start time in epoch seconds.
run_id = int(time.time())
TRIGGERED_BY = 'Person C'

bronze = "yogurt.bronze"
silver = "yogurt.silver"

tables = ["region", "nation", "part", "supplier", "partsupp", "customer", "orders", "lineitem"]

# Column whose MAX() is recorded as max_date in dq_table_metrics; other tables get NULL.
date_cols = {'orders': 'o_orderdate', 'lineitem': 'l_shipdate'}

# Column layouts of quality.dq_checks / quality.dq_table_metrics without their trailing
# timestamp column, which is stamped with current_timestamp() at write time.
DQ_CHECKS_SCHEMA = (
    "run_id BIGINT, layer STRING, table_name STRING, check_name STRING, status STRING, "
    "measured_value STRING, expected_expr STRING, details STRING"
)
DQ_METRICS_SCHEMA = "run_id BIGINT, layer STRING, table_name STRING, row_count BIGINT, max_date DATE"


def _scalar(query: str):
    """Execute `query` and return the first column of its first row."""
    return spark.sql(query).collect()[0][0]


def _check_row(table: str, check_name: str, passed: bool, measured, expected_expr: str, details: str) -> tuple:
    """Build one silver-layer dq_checks row (without checked_ts) for the current run."""
    return (run_id, 'silver', table, check_name, "PASS" if passed else "FAIL",
            str(measured), expected_expr, details)


def _append(rows: list, schema: str, ts_col: str, target: str) -> None:
    """Append `rows` to `target` in one commit, adding `ts_col` = current_timestamp().

    saveAsTable in append mode matches columns by name, so the appended timestamp
    column lands in the right place regardless of position.
    """
    if not rows:
        return
    (spark.createDataFrame(rows, schema)
          .selectExpr("*", f"current_timestamp() AS {ts_col}")
          .write.mode("append")
          .saveAsTable(target))


# Results are collected in memory and written in one batch per table at the end.
# This avoids one Delta commit per check and avoids persisting half a run if a check fails.
check_rows = []
metric_rows = []

# ---- Row counts: silver must match bronze -----------------------------------
silver_counts = {}
for t in tables:
    bc = _scalar(f"SELECT COUNT(*) c FROM {bronze}.{t}")
    sc = _scalar(f"SELECT COUNT(*) c FROM {silver}.{t}")
    silver_counts[t] = sc  # reused for dq_table_metrics below instead of re-counting
    check_rows.append(_check_row(t, 'row_count_matches_bronze', bc == sc, sc,
                                 'silver_count == bronze_count', f'bronze={bc}, silver={sc}'))

# ---- Primary keys: not null and unique --------------------------------------
pk_cfg = {r['table_name']: r['pk_cols'] for r in spark.table("quality.dq_pk_config").collect()}

for t, pk in pk_cfg.items():
    cols = [c.strip() for c in pk.split(",")]

    null_pred = " OR ".join(f"{c} IS NULL" for c in cols)
    nulls = _scalar(f"SELECT COUNT(*) c FROM {silver}.{t} WHERE {null_pred}")
    check_rows.append(_check_row(t, 'pk_not_null', nulls == 0, nulls,
                                 f'{pk} IS NOT NULL', f'null_pk={nulls}'))

    # Number of key values that occur more than once.
    group_cols = ", ".join(cols)
    dup = _scalar(f"""
      SELECT COUNT(*) c FROM (
        SELECT {group_cols}, COUNT(*) cnt
        FROM {silver}.{t}
        GROUP BY {group_cols}
        HAVING COUNT(*) > 1
      )
    """)
    check_rows.append(_check_row(t, 'pk_unique', dup == 0, dup,
                                 f'COUNT(GROUP BY {pk}) <= 1', f'duplicate_pk_groups={dup}'))

# ---- Foreign keys: no orphan child rows -------------------------------------
fk_rows = spark.table("quality.dq_fk_config").collect()
for r in fk_rows:
    child, child_cols, parent, parent_cols = r['child_table'], r['child_cols'], r['parent_table'], r['parent_cols']
    c_cols = [c.strip() for c in child_cols.split(",")]
    p_cols = [c.strip() for c in parent_cols.split(",")]
    on_cond = " AND ".join(f"c.{cc} = p.{pp}" for cc, pp in zip(c_cols, p_cols))
    # LEFT ANTI JOIN keeps child rows with no matching parent. A NULL child key never
    # matches, so such rows are counted as orphans too.
    orphans = _scalar(f"""
      SELECT COUNT(*) c FROM {silver}.{child} c
      LEFT ANTI JOIN {silver}.{parent} p ON {on_cond}
    """)
    check_rows.append(_check_row(child, f'fk_{child}_{parent}', orphans == 0, orphans,
                                 f'no orphans vs {parent}', f'orphans={orphans}'))

# ---- Table metrics: row count and freshness ---------------------------------
for t in tables:
    mx = _scalar(f"SELECT MAX({date_cols[t]}) d FROM {silver}.{t}") if t in date_cols else None
    metric_rows.append((run_id, 'silver', t, silver_counts[t], mx))

# ---- Persist ------------------------------------------------------------------
_append(check_rows, DQ_CHECKS_SCHEMA, "checked_ts", "quality.dq_checks")
_append(metric_rows, DQ_METRICS_SCHEMA, "collected_ts", "quality.dq_table_metrics")

# Register the run only after its results are written. The "latest run" views key off
# dq_runs, so they never point at a run that crashed part-way through.
spark.sql(f"INSERT INTO quality.dq_runs VALUES ({run_id}, current_timestamp(), '{TRIGGERED_BY}')")

print(f"Completed DQ run {run_id}.")


In [0]:
%sql
-- Reporting views over the DQ result tables.

-- Most recent run that actually produced check results. Requiring dq_checks rows keeps
-- the "latest" views from pointing at a run that was registered but wrote no checks
-- (e.g. it failed part-way). run_id breaks ties on identical run_ts.
CREATE OR REPLACE VIEW vw_dq_latest_run AS
SELECT r.run_id
FROM quality.dq_runs r
WHERE EXISTS (SELECT 1 FROM quality.dq_checks c WHERE c.run_id = r.run_id)
ORDER BY r.run_ts DESC, r.run_id DESC
LIMIT 1;

-- All check rows belonging to the latest run.
CREATE OR REPLACE VIEW vw_dq_checks_latest AS
SELECT c.*
FROM quality.dq_checks c
JOIN vw_dq_latest_run r ON c.run_id = r.run_id;

-- Only the failing checks of the latest run.
CREATE OR REPLACE VIEW vw_dq_failures_latest AS
SELECT *
FROM vw_dq_checks_latest
WHERE status = 'FAIL';

-- Pass/fail counts per table for the latest run, worst tables first.
CREATE OR REPLACE VIEW vw_dq_summary_latest AS
SELECT table_name,
       SUM(CASE WHEN status='PASS' THEN 1 ELSE 0 END) AS pass_cnt,
       SUM(CASE WHEN status='FAIL' THEN 1 ELSE 0 END) AS fail_cnt
FROM vw_dq_checks_latest
GROUP BY table_name
ORDER BY fail_cnt DESC, table_name;

-- Pass/fail totals and pass rate per run. dq_runs is collapsed to one row per run_id
-- before joining, so a duplicated run_id (ids are epoch seconds) cannot multiply counts.
CREATE OR REPLACE VIEW vw_dq_kpi_trend AS
SELECT c.run_id,
       SUM(CASE WHEN c.status='PASS' THEN 1 ELSE 0 END) AS passes,
       SUM(CASE WHEN c.status='FAIL' THEN 1 ELSE 0 END) AS fails,
       ROUND(100.0 * SUM(CASE WHEN c.status='PASS' THEN 1 ELSE 0 END) / COUNT(*), 2) AS pass_rate_pct,
       MAX(r.run_ts) AS run_ts
FROM quality.dq_checks c
JOIN (SELECT run_id, MAX(run_ts) AS run_ts
      FROM quality.dq_runs
      GROUP BY run_id) r
  ON c.run_id = r.run_id
GROUP BY c.run_id
ORDER BY run_ts DESC;

-- Per-table row counts and max dates across all runs.
CREATE OR REPLACE VIEW vw_rowcount_trend AS
SELECT run_id, table_name, row_count, max_date, collected_ts
FROM quality.dq_table_metrics
ORDER BY collected_ts DESC, table_name;


In [0]:
# Render each DQ report explicitly. A %sql cell with several SELECT statements may
# show only the last statement's result, depending on the notebook UI.
# The views are unqualified, so this relies on the current schema being `quality`
# (set in the first cell).
display(spark.sql("SELECT * FROM vw_dq_summary_latest"))

display(spark.sql("SELECT * FROM vw_dq_failures_latest"))

display(spark.sql("SELECT * FROM vw_dq_kpi_trend ORDER BY run_ts DESC LIMIT 10"))

display(spark.sql("SELECT * FROM vw_rowcount_trend ORDER BY collected_ts DESC LIMIT 80"))
