In [0]:
%run ./00_Setup_Environment

In [0]:
import pyspark.sql.functions as F

In [0]:
%sql
-- Metrics sink for the silver null-profiling run: the Python cells below append
-- one row per (table, column) that contained at least one null, stamped with check_date.
-- NOTE(review): `metadata` resolves against whatever catalog is current at this point,
-- i.e. before `USE adventureworks.silver` runs later in this notebook; confirm it is the
-- same catalog the later saveAsTable("metadata.data_quality_metrics") writes to.
CREATE TABLE IF NOT EXISTS metadata.data_quality_metrics (
    table_name STRING,
    column_name STRING,
    total_rows BIGINT,
    null_count BIGINT,
    null_percentage DOUBLE,  -- null_count / total_rows * 100, rounded to 2 decimals
    check_date TIMESTAMP
)

In [0]:
def run_data_quality(table, table_name):
    """Profile null counts for every column of a Spark DataFrame.

    The row count and all per-column null counts are computed in a single
    aggregation pass over ``table``, instead of one Spark job per column.

    Args:
        table: Spark DataFrame to profile.
        table_name: Name recorded as the first field of each metric tuple.

    Returns:
        A list of ``(table_name, column_name, total_rows, null_count,
        null_percentage)`` tuples, one per column containing at least one
        null, in ``table.columns`` order. ``null_percentage`` is
        ``null_count / total_rows * 100`` rounded to two decimals.
        Returns an empty list when the table has no rows or no columns.
    """
    import builtins

    columns = table.columns
    if not columns:
        return []

    def _quoted(name):
        # Backtick-quote so names containing dots/spaces are taken literally
        # instead of being parsed as nested struct-field references.
        return "`" + name.replace("`", "``") + "`"

    # Position 0 holds the row count; position i + 1 the null count of columns[i].
    # F.count skips nulls, so counting F.when(isNull, 1) counts exactly the null rows.
    counts = table.agg(
        F.count(F.lit(1)).alias("_total_rows"),
        *[
            F.count(F.when(F.col(_quoted(c)).isNull(), 1)).alias(f"_nulls_{i}")
            for i, c in enumerate(columns)
        ],
    ).first()

    total_rows = counts[0]
    if total_rows == 0:
        return []

    metrics = []
    for c, null_count in zip(columns, counts[1:]):
        if null_count > 0:
            metrics.append(
                (
                    table_name,
                    c,
                    total_rows,
                    null_count,
                    # builtins.round guards against `round` being shadowed by
                    # pyspark.sql.functions.round under a star import.
                    # NOTE(review): bare col()/current_timestamp() elsewhere in
                    # this notebook suggest such an import may be in effect.
                    builtins.round(null_count / total_rows * 100, 2),
                )
            )
    return metrics

In [0]:
# Point unqualified names at the adventureworks silver layer and collect the
# names of the persistent tables/views it contains.
spark.sql("USE adventureworks.silver")
schema = 'silver'
# Catalog.listTables() also returns session temporary views, which do not live
# in `schema`; querying them later as `{schema}.{name}` would fail, so skip them.
silver_table = [
    entry.name
    for entry in spark.catalog.listTables(schema)
    if not entry.isTemporary
]

In [0]:
# Profile each silver table in turn and pool every per-column null metric.
all_metrics = []
for t in silver_table:
    df = spark.table(f"{schema}.{t}")  # same rows/columns as SELECT * FROM <schema>.<t>
    metrics = run_data_quality(df, t)
    all_metrics += metrics

In [0]:
# Explicit schema: createDataFrame cannot infer column types from an empty list
# (the case where no silver column had any nulls). It also pins the types to
# those declared for metadata.data_quality_metrics instead of relying on inference.
DQ_METRICS_SCHEMA = (
    "table_name STRING, column_name STRING, total_rows BIGINT, "
    "null_count BIGINT, null_percentage DOUBLE"
)

dq_df = spark.createDataFrame(all_metrics, DQ_METRICS_SCHEMA).withColumn(
    "check_date", F.current_timestamp()
)

In [0]:
# Persist this run's metrics next to earlier runs (existing rows are kept).
dq_df.write.saveAsTable("metadata.data_quality_metrics", mode="append")

In [0]:
# Columns whose null share (in percent) exceeds this value fail the quality gate.
NULL_PCT_THRESHOLD = 5

threshold_violation = dq_df.filter(F.col("null_percentage") > NULL_PCT_THRESHOLD)
violation_count = threshold_violation.count()

if violation_count > 0:
    display(threshold_violation)
    # Record the failure before aborting: the raise below stops the notebook, so
    # the pipeline-status cell further down would otherwise never log FAILED.
    # Columns are selected in the metadata.pipeline_status DDL order so that, if
    # this append creates the table first, its layout matches that DDL.
    (
        spark.createDataFrame(
            [(
                "silver_quality_check",
                "FAILED",
                f"{violation_count} column(s) above {NULL_PCT_THRESHOLD}% nulls",
            )],
            "pipeline_name STRING, status STRING, message STRING",
        )
        .withColumn("execution_date", F.current_timestamp())
        .select("pipeline_name", "status", "execution_date", "message")
        .write.mode("append")
        .saveAsTable("metadata.pipeline_status")
    )
    raise Exception("Data Quality FAILED: Null percentage above threshold")

In [0]:
%sql
-- Run log for this notebook: each run appends a row from Python via
-- saveAsTable("metadata.pipeline_status").
CREATE TABLE IF NOT EXISTS metadata.pipeline_status (
    pipeline_name STRING,
    status STRING,            -- "SUCCESS" or "FAILED" as written by this notebook
    execution_date TIMESTAMP,
    message STRING            -- optional detail; this notebook may write NULL
)

In [0]:
# Log the run outcome. When the notebook runs top to bottom this is SUCCESS: the
# threshold cell above raises before reaching here whenever violations exist.
status = "FAILED" if threshold_violation.count() > 0 else "SUCCESS"

# Explicit schema: with message=None, type inference cannot determine that
# column's type and createDataFrame fails.
(
    spark.createDataFrame(
        [("silver_quality_check", status, None)],
        "pipeline_name STRING, status STRING, message STRING",
    )
    .withColumn("execution_date", F.current_timestamp())
    .write.mode("append")
    .saveAsTable("metadata.pipeline_status")
)