In [0]:
from pyspark.sql.functions import col, lit, count, when
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from datetime import datetime

# LOAD SILVER TABLE
# Every check below reads this customer-profile table.
df = spark.table("churn_catalog.processed.customer_profiles")

# Collected problems, one (column_name, issue_type, count) tuple per finding.
issues = []

# CHECK NULL VALUES
# One aggregate per column counting its NULL rows, evaluated together in a
# single select so one Spark job covers every column.
# NOTE(review): isNull() does not match NaN in float/double columns; add an
# isnan() check if NaN values can occur in this table.
null_count_exprs = [count(when(col(c).isNull(), c)).alias(c) for c in df.columns]
null_counts = df.select(null_count_exprs).collect()[0].asDict()

# Record one NULL_VALUES issue for each column that has at least one NULL.
issues.extend(
    (column, "NULL_VALUES", null_value)
    for column, null_value in null_counts.items()
    if null_value > 0
)

# CHECK DUPLICATE CUSTOMER IDs
# Number of distinct customer_id values that occur on more than one row
# (not the number of surplus rows).
duplicates = (
    df.groupBy("customer_id")
    .count()
    .where(col("count") > 1)
    .count()
)
if duplicates:
    issues.append(("customer_id", "DUPLICATES_FOUND", duplicates))

# CHECK INVALID AGE
# An age is valid only strictly between these two bounds: rows with
# age <= AGE_MIN_EXCLUSIVE or age >= AGE_MAX_EXCLUSIVE are flagged, so an age of
# exactly 0 or exactly 80 counts as invalid.
# NULL ages are not counted here: both comparisons evaluate to NULL and filter()
# drops the row. They are reported by the NULL check instead.
# NOTE(review): the upper bound of 80 rejects any older customers that exist —
# confirm this business rule.
AGE_MIN_EXCLUSIVE = 0
AGE_MAX_EXCLUSIVE = 80

invalid_age = df.filter(
    (col("age") <= AGE_MIN_EXCLUSIVE) | (col("age") >= AGE_MAX_EXCLUSIVE)
).count()
if invalid_age > 0:
    issues.append(("age", "INVALID_RANGE", invalid_age))

# CHECK NEGATIVE BALANCE & SALARY
# Both monetary columns must be non-negative. Count the violations for each
# column in a single aggregation pass instead of running one Spark job per column.
# when() yields NULL for non-matching rows (including NULL inputs) and count()
# skips NULLs, so each count equals df.filter(col(c) < 0).count().
negative_counts = df.agg(
    count(when(col("balance") < 0, True)).alias("balance"),
    count(when(col("estimated_salary") < 0, True)).alias("estimated_salary"),
).collect()[0]

# Kept as separate variables because the summary cell prints them.
negative_balance = negative_counts["balance"]
negative_salary = negative_counts["estimated_salary"]

# Same order as before: balance first, then estimated_salary.
for checked_column, negative_rows in (
    ("balance", negative_balance),
    ("estimated_salary", negative_salary),
):
    if negative_rows > 0:
        issues.append((checked_column, "NEGATIVE_VALUES", negative_rows))

# LOG TABLE SCHEMA
# Explicit schema for rows appended to the data-quality log table. Supplying it
# up front lets spark.createDataFrame build a correctly typed DataFrame even when
# `issues` is empty, where there is nothing to infer a schema from.
# NOTE(review): "count" is a 32-bit IntegerType while Spark counts are 64-bit
# Python ints; a count above 2**31 - 1 would presumably be rejected when the
# DataFrame is built. Widening to LongType needs a matching change to the
# existing Delta table — confirm before changing.
log_schema = (
    StructType()
    .add("timestamp", TimestampType(), nullable=True)
    .add("column_name", StringType(), nullable=True)
    .add("issue_type", StringType(), nullable=True)
    .add("count", IntegerType(), nullable=True)
)

# CREATE LOG DATAFRAME
# Capture one timestamp for the whole run so every issue found by this execution
# shares it and can be grouped in the log table. Calling datetime.now() per row
# gave each issue a slightly different timestamp. datetime.now() returns naive
# local time on the driver.
run_timestamp = datetime.now()

# With an explicit schema, an empty `issues` list produces an empty DataFrame,
# so a clean run needs no special case. The tuple element is named issue_count
# so it does not shadow pyspark.sql.functions.count imported above.
log_df = spark.createDataFrame(
    [
        (run_timestamp, column_name, issue_type, issue_count)
        for column_name, issue_type, issue_count in issues
    ],
    log_schema,
)

# Render the issues recorded by this run.
display(log_df)

# WRITE LOG TABLE
# Append rather than overwrite, so the Delta table accumulates one batch of rows
# per run. An empty log_df appends no rows.
log_df.write.saveAsTable(
    "churn_catalog.logs.data_quality",
    format="delta",
    mode="append",
)


# DATA QUALITY SUMMARY

In [0]:
# Plain-text recap of this run. Uses df, issues and the per-check counters
# defined in the previous cell, so that cell must run first.
print("DATA QUALITY SUMMARY")

# Number of NULL_VALUES issues, i.e. how many columns contain at least one NULL.
null_issue_count = sum(1 for issue in issues if issue[1] == 'NULL_VALUES')

summary_rows = [
    ("Total rows checked", df.count()),
    ("Null issues found", null_issue_count),
    ("Duplicate customer_id count", duplicates),
    ("Invalid age rows", invalid_age),
    ("Negative balance rows", negative_balance),
    ("Negative salary rows", negative_salary),
]
for label, value in summary_rows:
    print(f"{label}: {value}")
