In [0]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

# Point unqualified table names in this session at dq_demo.core.
for _use_stmt in ("USE CATALOG dq_demo", "USE SCHEMA core"):
    spark.sql(_use_stmt)

In [0]:
def profile_table(table_name: str, date_col: str = "created_date"):
    """Build per-batch, per-column profiles for ``dq_demo.core.<table_name>``.

    A ``batch_date`` column is derived by casting ``date_col`` to DATE (any
    existing column named ``batch_date`` is replaced). For every other column,
    rows are grouped by ``batch_date`` and the following are computed:
    row_count, null_count, null_fraction (null_count / row_count),
    distinct_count, and min_value / max_value rendered as strings.

    Integer, long, double, float and decimal columns are emitted first, then all
    remaining columns, each group in schema order. This split only affects the
    order of the unioned result; every column gets the same statistics.

    Args:
        table_name: Table name inside ``dq_demo.core``.
        date_col: Column whose DATE cast defines a batch.

    Returns:
        A single DataFrame (union of the per-column profiles) with columns
        table_name, column_name, batch_date, row_count, null_count,
        null_fraction, distinct_count, min_value, max_value, profile_ts;
        or None when there is no column to profile.
    """
    numeric_types = (T.IntegerType, T.LongType, T.DoubleType, T.FloatType, T.DecimalType)

    # TODO(review): no dedicated batch column is read yet; date_col stands in for it.
    source_df = spark.table(f"dq_demo.core.{table_name}").withColumn(
        "batch_date", F.col(date_col).cast("date")
    )
    schema_fields = source_df.schema.fields

    numeric_names = [
        field.name for field in schema_fields
        if isinstance(field.dataType, numeric_types)
    ]
    remaining_names = [
        field.name for field in schema_fields
        if field.name not in numeric_names and field.name != "batch_date"
    ]

    result = None
    for column_name in numeric_names + remaining_names:
        target = F.col(column_name)
        stats = source_df.groupBy("batch_date").agg(
            F.count(F.lit(1)).alias("row_count"),
            F.count(F.when(target.isNull(), 1)).alias("null_count"),
            F.countDistinct(target).alias("distinct_count"),
            F.min(target).cast("string").alias("min_value"),
            F.max(target).cast("string").alias("max_value"),
        )
        column_profile = stats.select(
            F.lit(table_name).alias("table_name"),
            F.lit(column_name).alias("column_name"),
            "batch_date",
            "row_count",
            "null_count",
            (F.col("null_count") / F.col("row_count")).alias("null_fraction"),
            "distinct_count",
            "min_value",
            "max_value",
            F.current_timestamp().alias("profile_ts"),
        )
        # Each column contributes one aggregation branch; results are stacked by name.
        result = column_profile if result is None else result.unionByName(column_profile)

    return result


In [0]:
# Rebuild dq_profiles with fresh profiles for `transactions`.
# The (lazy) profile DataFrame is built and checked BEFORE the TRUNCATE, so an
# empty profiling result fails fast instead of leaving dq_profiles wiped.
profiles_tx = profile_table("transactions", date_col="created_date")
if profiles_tx is None:
    raise ValueError(
        "profile_table('transactions') produced no profiles; dq_profiles was not truncated"
    )

# NOTE: TRUNCATE and the append below are two separate operations; if the write
# fails, dq_profiles stays empty until this cell is re-run.
spark.sql("TRUNCATE TABLE dq_demo.core.dq_profiles")
profiles_tx.write.mode("append").format("delta").saveAsTable("dq_demo.core.dq_profiles")


In [0]:
# Spot-check a handful of the freshly written transactions profiles.
(
    spark.table("dq_demo.core.dq_profiles")
    .where("table_name = 'transactions'")
    .limit(10)
    .show(truncate=False)
)


In [0]:
# detect_anomalies: NULL_SPIKE baseline for transactions.customer_id
from pyspark.sql.window import Window

profiles = spark.table("dq_demo.core.dq_profiles").where(
    (F.col("table_name") == "transactions") &
    (F.col("column_name") == "customer_id")
)

# Crude baseline: approximate median null_fraction over all *earlier* batches.
# The current batch is excluded from its own baseline so an anomalous batch
# cannot bias the median toward itself. The first batch has no history, so its
# median and delta are NULL and it is never flagged downstream.
# No partitionBy: the frame is already restricted to one column's series.
w = Window.orderBy("batch_date").rowsBetween(Window.unboundedPreceding, -1)

profiles_with_baseline = (
    profiles
    .withColumn("median_null_fraction", F.percentile_approx("null_fraction", 0.5).over(w))
    .withColumn("delta_null_fraction", F.col("null_fraction") - F.col("median_null_fraction"))
)

profiles_with_baseline.orderBy("batch_date").show(30, truncate=False)


In [0]:
# A batch is a NULL_SPIKE candidate only when both conditions hold:
threshold_abs = 0.1    # more than 10% of customer_id values are null, and
threshold_delta = 0.15  # null_fraction exceeds the baseline median by more than 0.15 (absolute, i.e. 15 percentage points)

above_absolute = F.col("null_fraction") > threshold_abs
above_baseline = F.col("delta_null_fraction") > threshold_delta

candidates = profiles_with_baseline.where(above_absolute & above_baseline)

candidates.show(truncate=False)


In [0]:
# Persist NULL_SPIKE candidates that coincide with an injected NULL_SPIKE anomaly.
# Joining on batch_date (rather than collect()[0] on a limit(1) ground-truth row)
# avoids an IndexError when no ground truth exists and does not silently pick one
# arbitrary row when several exist.
# NOTE(review): candidates on dates without a matching ground-truth row are dropped,
# so false positives never reach dq_incidents and precision cannot be measured from it.
gt_null_spike = (
    spark.table("dq_demo.core.dq_ground_truth")
    .where(
        (F.col("table_name") == "transactions") &
        (F.col("column_name") == "customer_id") &
        (F.col("anomaly_type") == "NULL_SPIKE")
    )
    .select(F.col("id").alias("ground_truth_id"), "batch_date")
)

null_spike_incidents = (
    candidates
    .join(gt_null_spike, on="batch_date", how="inner")
    .withColumn("id", F.expr("uuid()"))
    .withColumn("incident_type", F.lit("NULL_SPIKE"))
    .withColumn("severity", F.lit("HIGH"))
    .withColumn(
        "details",
        F.concat(
            F.lit("Null spike detected for transactions.customer_id: "),
            F.lit("null_fraction="), F.col("null_fraction").cast("string"),
            F.lit(", median_null_fraction="), F.col("median_null_fraction").cast("string")
        )
    )
    .withColumn("detection_ts", F.current_timestamp())
    .select(
        "id", "table_name", "column_name", "batch_date",
        "ground_truth_id", "incident_type", "severity",
        "details", "detection_ts"
    )
)

# Append mode: re-running this cell inserts the incidents again.
null_spike_incidents.write.mode("append").format("delta").saveAsTable("dq_demo.core.dq_incidents")


In [0]:
# Inspect every incident persisted so far.
(
    spark.table("dq_demo.core.dq_incidents")
    .show(truncate=False)
)


In [0]:
# AMOUNT_DRIFT: null_fraction says nothing useful about `amount`, so compare each
# batch's max(amount) with the median of earlier batches' maxima, then link the
# flagged batches to injected AMOUNT_DRIFT ground truth.

amount_profiles = spark.table("dq_demo.core.dq_profiles").where(
    (F.col("table_name") == "transactions") &
    (F.col("column_name") == "amount")
)

# Baseline over strictly earlier batches so a drifted batch cannot pull its own
# median toward itself; the first batch gets a NULL baseline and is never flagged.
w_amt = Window.orderBy("batch_date").rowsBetween(Window.unboundedPreceding, -1)

# profile_table stores min_value/max_value as strings, so cast them to double
# explicitly before taking percentiles instead of relying on implicit coercion.
amount_with_baseline = (
    amount_profiles
    .withColumn("median_min", F.percentile_approx(F.col("min_value").cast("double"), 0.5).over(w_amt))
    .withColumn("median_max", F.percentile_approx(F.col("max_value").cast("double"), 0.5).over(w_amt))
    .withColumn("delta_max", F.col("max_value").cast("double") - F.col("median_max"))
)

amount_with_baseline.orderBy("batch_date").show(40, truncate=False)

# Absolute increase of max(amount) over the baseline median, in the units of `amount`; tune per dataset.
drift_threshold = 100.0

amount_candidates = amount_with_baseline.where(F.col("delta_max") > drift_threshold)

amount_candidates.show(truncate=False)

# Link candidates to ground truth by batch_date (handles zero or several GT rows).
# NOTE(review): candidates without a matching ground-truth date are not persisted.
gt_amount_drift = (
    spark.table("dq_demo.core.dq_ground_truth")
    .where(
        (F.col("table_name") == "transactions") &
        (F.col("column_name") == "amount") &
        (F.col("anomaly_type") == "AMOUNT_DRIFT")
    )
    .select(F.col("id").alias("ground_truth_id"), "batch_date")
)

amount_incidents = (
    amount_candidates
    .join(gt_amount_drift, on="batch_date", how="inner")
    .withColumn("id", F.expr("uuid()"))
    .withColumn("incident_type", F.lit("AMOUNT_DRIFT"))
    .withColumn("severity", F.lit("MEDIUM"))
    .withColumn(
        "details",
        F.concat(
            F.lit("Amount drift detected for transactions.amount: "),
            F.lit("max_value="), F.col("max_value").cast("string"),
            F.lit(", median_max="), F.col("median_max").cast("string")
        )
    )
    .withColumn("detection_ts", F.current_timestamp())
    .select(
        "id", "table_name", "column_name", "batch_date",
        "ground_truth_id", "incident_type", "severity",
        "details", "detection_ts"
    )
)

# Append mode: re-running this cell inserts the incidents again.
amount_incidents.write.mode("append").format("delta").saveAsTable("dq_demo.core.dq_incidents")


In [0]:
# VOLUME_DROP: compare each batch's row count with the median row count of earlier
# batches and link flagged batches to injected VOLUME_DROP ground truth.
# row_count is count(*) per batch_date and is identical for every profiled column,
# so the transaction_id profile serves as the table-level volume series.
volume_profiles = spark.table("dq_demo.core.dq_profiles").where(
    (F.col("table_name") == "transactions") &
    (F.col("column_name") == "transaction_id")
)

# Baseline over strictly earlier batches. Including the current batch lets a drop
# mask itself: percentile_approx returns an actual input value, so the "median" of
# [1000, 100] is 100 and drop_ratio would be 1.0. The first batch gets a NULL
# baseline and is never flagged.
w_vol = Window.orderBy("batch_date").rowsBetween(Window.unboundedPreceding, -1)

volume_with_baseline = (
    volume_profiles
    .withColumn("median_row_count", F.percentile_approx("row_count", 0.5).over(w_vol))
    .withColumn("drop_ratio", F.col("row_count") / F.col("median_row_count"))
)

volume_with_baseline.orderBy("batch_date").show(40, truncate=False)

drop_threshold = 0.5  # flag batches with less than half the baseline volume

volume_candidates = volume_with_baseline.where(F.col("drop_ratio") < drop_threshold)

volume_candidates.show(truncate=False)

# Link candidates to ground truth by batch_date (handles zero or several GT rows).
# NOTE(review): candidates without a matching ground-truth date are not persisted.
gt_volume_drop = (
    spark.table("dq_demo.core.dq_ground_truth")
    .where(
        (F.col("table_name") == "transactions") &
        (F.col("column_name") == "*ROW_COUNT*") &
        (F.col("anomaly_type") == "VOLUME_DROP")
    )
    .select(F.col("id").alias("ground_truth_id"), "batch_date")
)

volume_incidents = (
    volume_candidates
    .join(gt_volume_drop, on="batch_date", how="inner")
    .withColumn("id", F.expr("uuid()"))
    .withColumn("incident_type", F.lit("VOLUME_DROP"))
    .withColumn("severity", F.lit("MEDIUM"))
    .withColumn(
        "details",
        F.concat(
            F.lit("Volume drop detected for transactions row_count: "),
            F.lit("row_count="), F.col("row_count").cast("string"),
            F.lit(", median_row_count="), F.col("median_row_count").cast("string"),
            F.lit(", drop_ratio="), F.col("drop_ratio").cast("string")
        )
    )
    .withColumn("detection_ts", F.current_timestamp())
    .select(
        "id", "table_name", "column_name", "batch_date",
        "ground_truth_id", "incident_type", "severity",
        "details", "detection_ts"
    )
)

# Append mode: re-running this cell inserts the incidents again.
volume_incidents.write.mode("append").format("delta").saveAsTable("dq_demo.core.dq_incidents")


In [0]:

# Final incident log, chronologically and then by incident type.
(
    spark.table("dq_demo.core.dq_incidents")
    .orderBy("batch_date", "incident_type")
    .show(truncate=False)
)

In [0]:
%sql
USE CATALOG dq_demo;
USE SCHEMA core;

-- Detection coverage: share of ground-truth anomalies that have at least one
-- incident linked to them through dq_incidents.ground_truth_id.
WITH gt AS (
  SELECT id FROM dq_ground_truth
),
detected AS (
  -- Count only ids present in dq_ground_truth, so dangling ground_truth_id
  -- values in dq_incidents cannot push coverage above 100%.
  SELECT DISTINCT id
  FROM gt
  WHERE id IN (
    SELECT ground_truth_id
    FROM dq_incidents
    WHERE ground_truth_id IS NOT NULL
  )
),
agg AS (
  SELECT
    (SELECT COUNT(*) FROM gt) AS total_gt,
    (SELECT COUNT(*) FROM detected) AS detected_gt
)
SELECT
  total_gt,
  detected_gt,
  -- NULLIF yields NULL instead of a divide-by-zero (an error under ANSI mode)
  -- when dq_ground_truth is empty.
  detected_gt * 1.0 / NULLIF(total_gt, 0) AS detection_coverage,
  1.0 - detected_gt * 1.0 / NULLIF(total_gt, 0) AS silent_failure_rate
FROM agg;


In [0]:
%sql
-- Detection lag per linked incident: seconds from gt.batch_date (presumably a
-- DATE, i.e. midnight of that day — confirm) to the incident's detection_ts.
-- Tables are fully qualified so this cell does not depend on a USE CATALOG /
-- USE SCHEMA executed by another cell.
SELECT
  i.ground_truth_id,
  gt.table_name,
  gt.column_name,
  gt.batch_date,
  i.incident_type,
  i.detection_ts,
  datediff(SECOND, gt.batch_date, i.detection_ts) AS detection_lag_seconds
FROM dq_demo.core.dq_incidents i
JOIN dq_demo.core.dq_ground_truth gt
  ON i.ground_truth_id = gt.id;