In [0]:
# Load the source table and print its first rows for a quick visual check.
df_raw = spark.table("retail_agg_ff")
df_raw.show()


In [0]:
# Start from the existing table
# This reads the source data from the retail_agg_ff table, which contains raw retail transactions.
from pyspark.sql.functions import col, last, sum, count, monotonically_increasing_id, lit
from pyspark.sql.types import StringType, LongType

# Source table shared by both aggregations in this cell.
df_raw = spark.read.table("retail_agg_ff")

# Sorter node: STORE_ID then ITEM_DESC, both descending.
srt_storeid = df_raw.orderBy(
    *[col(sort_key).desc() for sort_key in ("STORE_ID", "ITEM_DESC")]
)

# Aggregator (Single Agg): one row per STORE_ID/ITEM_DESC carrying the last
# non-null QTY and PRICE seen in the group plus the summed QTY, tagged with a
# generated row id.
# NOTE(review): `last` depends on row order inside each group, and the sort
# above is not guaranteed to survive the groupBy shuffle -- confirm whether a
# deterministic "last" is required here.
aggtrans_single_agg = (
    srt_storeid
    .groupBy("STORE_ID", "ITEM_DESC")
    .agg(
        last("QTY", ignorenulls=True).alias("QTY"),
        last("PRICE", ignorenulls=True).alias("PRICE"),
        sum("QTY").alias("TOTAL_QTY"),
    )
    .withColumn("sys_row_id", monotonically_increasing_id())
)

# Shape the output: ITEM_DESC as string, an all-null COUNT_ITEM placeholder,
# and a fixed column order.
single_agg_final = (
    aggtrans_single_agg
    .withColumn("ITEM_DESC", col("ITEM_DESC").cast(StringType()))
    .withColumn("COUNT_ITEM", lit(None).cast(LongType()))
    .select(
        "STORE_ID",
        "ITEM_DESC",
        "QTY",
        "PRICE",
        "TOTAL_QTY",
        "COUNT_ITEM",
        "sys_row_id",
    )
)

# Overwrite the Delta table that holds the single-aggregation result.
(
    single_agg_final.write
    .format("delta")
    .option("mergeSchema", "true")
    .mode("overwrite")
    .saveAsTable("retail_agg_ff_single_agg")
)

# Aggregator (Nested Agg): one row per STORE_ID/ITEM_DESC carrying the last
# non-null QTY and PRICE seen in the group plus the count of non-null
# ITEM_DESC values, tagged with a generated row id.
# NOTE(review): this aggregation reads the unsorted `df_raw`, so which row
# `last` picks depends on whatever order Spark delivers -- confirm intent.
aggtrans_nested_agg = (
    df_raw
    .groupBy("STORE_ID", "ITEM_DESC")
    .agg(
        last("QTY", ignorenulls=True).alias("QTY"),
        last("PRICE", ignorenulls=True).alias("PRICE"),
        count("ITEM_DESC").alias("COUNT_ITEM"),
    )
    .withColumn("sys_row_id", monotonically_increasing_id())
)

# Shape the output: ITEM_DESC as string, an all-null TOTAL_QTY placeholder,
# and a fixed column order.
nested_agg_final = (
    aggtrans_nested_agg
    .withColumn("ITEM_DESC", col("ITEM_DESC").cast(StringType()))
    .withColumn("TOTAL_QTY", lit(None).cast(LongType()))
    .select(
        "STORE_ID",
        "ITEM_DESC",
        "QTY",
        "PRICE",
        "COUNT_ITEM",
        "TOTAL_QTY",
        "sys_row_id",
    )
)

# Overwrite the Delta table that holds the nested-aggregation result.
(
    nested_agg_final.write
    .format("delta")
    .option("mergeSchema", "true")
    .mode("overwrite")
    .saveAsTable("retail_agg_ff_nested_agg")
)

In [0]:
%python
# Cleanup cell: removes both aggregate tables written by the pipeline cell.
# NOTE(review): the validation cells in this notebook read
# retail_agg_ff_single_agg and retail_agg_ff_nested_agg. Running this cell
# before them (as in a top-to-bottom "Run All") leaves nothing to read, so the
# pipeline cell must be re-run first -- confirm whether this cell is meant to
# sit at the end of the notebook or be run only manually.

# Drop the single aggregation table if it exists
spark.sql("DROP TABLE IF EXISTS retail_agg_ff_single_agg")

# Drop the nested aggregation table if it exists
spark.sql("DROP TABLE IF EXISTS retail_agg_ff_nested_agg")

In [0]:
from pyspark.sql.functions import col, sum as _sum, count as _count

def validate_delta_tables(source_df, delta_total_df, delta_count_df):
    """Print PASS/FAIL reconciliation results for the two aggregate tables.

    Checks performed, in order:
      1. Both aggregate tables are non-empty.
      2. Column names (case-insensitive) match the expected names and order.
      3. TOTAL_QTY in ``delta_total_df`` equals the summed Qty per
         Store_ID/Item_Desc in ``source_df``.
      4. No Store_ID/Item_Desc pair appears more than once in either table.
      5. Store_ID, Item_Desc, Qty and Price contain no nulls in either table.

    Args:
        source_df: DataFrame of the source table (needs Store_ID, Item_Desc, Qty).
        delta_total_df: DataFrame of the single-aggregation (TOTAL_QTY) table.
        delta_count_df: DataFrame of the nested-aggregation (COUNT_ITEM) table.

    Returns:
        None. All results are printed.
    """
    # Local import: `lit` is not among the names imported by this cell.
    from pyspark.sql.functions import lit

    print("==== STARTING RECONCILIATION CHECKS ====\n Delta Total_Qty = Single Agg Table\n Delta Count_Item = Nested Agg Table")

    # CASE 1: Row Count Check
    source_count = source_df.count()
    total_count = delta_total_df.count()
    count_item_count = delta_count_df.count()

    if total_count > 0:
        print(f"PASS: Delta Total_Qty table has {total_count} rows (source had {source_count})")
    else:
        print("FAIL: Delta Total_Qty table is empty!")

    if count_item_count > 0:
        print(f"PASS: Delta Count_Item table has {count_item_count} rows (source had {source_count})")
    else:
        print("FAIL: Delta Count_Item table is empty!")

    # CASE 2: Schema Check (names and order, compared case-insensitively)
    expected_total_cols = ['STORE_ID','ITEM_DESC','QTY','PRICE', 'TOTAL_QTY','COUNT_ITEM','sys_row_id']
    expected_count_cols = ['STORE_ID','ITEM_DESC','QTY','PRICE','COUNT_ITEM', 'TOTAL_QTY', 'sys_row_id']

    delta_total_cols = [c.lower() for c in delta_total_df.columns]
    delta_count_cols = [c.lower() for c in delta_count_df.columns]
    expected_total_cols_lower = [c.lower() for c in expected_total_cols]
    expected_count_cols_lower = [c.lower() for c in expected_count_cols]

    if delta_total_cols == expected_total_cols_lower:
        print("PASS: Delta Total_Qty schema matches expected columns (case-insensitive)")
    else:
        print(f"FAIL: Delta Total_Qty schema mismatch. Found: {delta_total_cols}")

    if delta_count_cols == expected_count_cols_lower:
        print("PASS: Delta Count_Item schema matches expected columns (case-insensitive)")
    else:
        print(f"FAIL: Delta Count_Item schema mismatch. Found: {delta_count_cols}")

    # CASE 3: Aggregation correctness (Total_Qty)
    # One aggregation plus one join replaces a pair of Spark jobs per key.
    # Columns are aliased explicitly so the Python row lookups below do not
    # depend on how the tables spell their column names.
    source_totals = source_df.groupBy(
        col('Store_ID').alias('_src_store'),
        col('Item_Desc').alias('_src_item'),
    ).agg(_sum('Qty').alias('_src_sum'))

    delta_totals = delta_total_df.select(
        col('Store_ID').alias('_tgt_store'),
        col('Item_Desc').alias('_tgt_item'),
        col('Total_Qty').alias('_tgt_sum'),
        # Marker that stays NULL after the left join when no target row matched.
        lit(True).alias('_tgt_found'),
    )

    # Null-safe equality so keys containing NULL still pair up.
    same_key = (
        col('_src_store').eqNullSafe(col('_tgt_store'))
        & col('_src_item').eqNullSafe(col('_tgt_item'))
    )

    agg_errors = 0
    # If the target holds duplicate keys (reported by CASE 4), each duplicate
    # row is compared and printed separately.
    for row in source_totals.join(delta_totals, same_key, 'left').collect():
        s_id = row['_src_store']
        item = row['_src_item']
        source_sum = row['_src_sum']
        delta_sum = row['_tgt_sum']
        if row['_tgt_found'] is None:
            # Previously this case raised IndexError on collect()[0].
            print(f"FAIL: Total_Qty missing for Store {s_id}, Item {item}. Source={source_sum}, no row in Delta Total_Qty table")
            agg_errors += 1
        elif source_sum == delta_sum:
            print(f"PASS: Total_Qty correct for Store {s_id}, Item {item} ({delta_sum})")
        else:
            print(f"FAIL: Total_Qty mismatch for Store {s_id}, Item {item}. Source={source_sum}, Delta={delta_sum}")
            agg_errors += 1
    if agg_errors == 0:
        print("PASS: All Total_Qty aggregation checks passed")
    else:
        print(f"FAIL: {agg_errors} Total_Qty aggregation errors found")

    # CASE 4: Duplicate Check
    dup_total = delta_total_df.groupBy('Store_ID','Item_Desc').count().filter(col('count')>1).count()
    dup_count = delta_count_df.groupBy('Store_ID','Item_Desc').count().filter(col('count')>1).count()

    if dup_total == 0:
        print("PASS: No duplicates in Delta Total_Qty table")
    else:
        print(f"FAIL: {dup_total} duplicate rows in Delta Total_Qty table")

    if dup_count == 0:
        print("PASS: No duplicates in Delta Count_Item table")
    else:
        print(f"FAIL: {dup_count} duplicate rows in Delta Count_Item table")

    # CASE 5: Null / Invalid Values
    null_total = delta_total_df.filter(col('Store_ID').isNull() | col('Item_Desc').isNull() | col('Qty').isNull() | col('Price').isNull()).count()
    null_count = delta_count_df.filter(col('Store_ID').isNull() | col('Item_Desc').isNull() | col('Qty').isNull() | col('Price').isNull()).count()

    if null_total == 0:
        print("PASS: No nulls in key columns of Delta Total_Qty table")
    else:
        print(f"FAIL: {null_total} null rows in Delta Total_Qty table")

    if null_count == 0:
        print("PASS: No nulls in key columns of Delta Count_Item table")
    else:
        print(f"FAIL: {null_count} null rows in Delta Count_Item table")

    print("==== RECONCILIATION CHECKS COMPLETED ====")
    
# Example usage: load the source table and both aggregate tables, then run
# the reconciliation checks against them.
source_df = spark.table("retail_agg_ff")
delta_total_df = spark.table("retail_agg_ff_single_agg")
delta_count_df = spark.table("retail_agg_ff_nested_agg")

validate_delta_tables(
    source_df=source_df,
    delta_total_df=delta_total_df,
    delta_count_df=delta_count_df,
)

In [0]:
from pyspark.sql.functions import col, sum as _sum, count as _count, countDistinct
from pyspark.sql.types import IntegerType

def recon_case(case_id, description, condition, fail_msg, severity="FAIL"):
    """Print the outcome of one reconciliation test case.

    Args:
        case_id: Identifier shown in the report line (e.g. "TC_01").
        description: Short text describing what the case checks.
        condition: Failure condition -- a truthy value means the case failed.
        fail_msg: Detail line printed only when the case failed.
        severity: Label printed for a failed case; defaults to "FAIL".

    Returns:
        None. The result is printed.
    """
    # Guard clause: a falsy failure condition means the case passed.
    if not condition:
        print(f"✅ PASS | {case_id} | {description}\n")
        return

    print(f"❌ {severity} | {case_id} | {description}")
    print(f"    ➜ {fail_msg}\n")

def validate_pipeline(source_df, delta_single_df, delta_nested_df):
    """Run the full reconciliation suite (TC_01..TC_11) and print each result.

    Every case is reported through ``recon_case``; nothing is returned.

    Args:
        source_df: DataFrame of the source table (needs Store_ID, Item_Desc, Qty).
        delta_single_df: DataFrame of the single-aggregation (TOTAL_QTY) table.
        delta_nested_df: DataFrame of the nested-aggregation (COUNT_ITEM) table.

    Returns:
        None.
    """
    # Local import: `lit` is not among the names imported by this cell.
    from pyspark.sql.functions import lit

    def _count_mismatched_keys(src_stats, target_df, target_value_col, src_value_col):
        """Count source Store/Item keys whose target value is absent or different."""
        target_vals = target_df.select(
            col("Store_ID").alias("_tgt_store"),
            col("Item_Desc").alias("_tgt_item"),
            col(target_value_col).alias("_tgt_val"),
            # Marker that stays NULL after the left join when no target row matched.
            lit(True).alias("_tgt_found"),
        )
        # Null-safe equality so keys containing NULL still pair up.
        same_key = (
            col("_src_store").eqNullSafe(col("_tgt_store"))
            & col("_src_item").eqNullSafe(col("_tgt_item"))
        )
        # A key is wrong when the target has no row for it, or the values differ
        # (two NULL values are treated as equal).
        is_wrong = col("_tgt_found").isNull() | (
            ~col(src_value_col).eqNullSafe(col("_tgt_val"))
        )
        return (
            src_stats.join(target_vals, same_key, "left")
            .filter(is_wrong)
            # distinct keys: duplicate target rows must not inflate the count
            .select("_src_store", "_src_item")
            .distinct()
            .count()
        )

    print("="*90)
    print("STARTING FULL AUTOMATED RECONCILIATION")
    print("Single Agg = TOTAL_QTY table | Nested Agg = COUNT_ITEM table")
    print("="*90)

    # -------------------------------
    # CASE 1: Empty Load Check
    # -------------------------------
    recon_case(
        "TC_01",
        "Delta tables should not be empty",
        delta_single_df.count() == 0 or delta_nested_df.count() == 0,
        "One or more Delta tables are empty"
    )

    # -------------------------------
    # CASE 2: Schema Validation
    # -------------------------------
    expected_single_cols = ['store_id','item_desc','qty','price','total_qty','count_item','sys_row_id']
    expected_nested_cols = ['store_id','item_desc','qty','price','count_item','total_qty','sys_row_id']

    # Names are compared sorted, so column order is not part of this check.
    recon_case(
        "TC_02",
        "Single Agg schema validation",
        sorted([c.lower() for c in delta_single_df.columns]) != sorted(expected_single_cols),
        f"Expected {expected_single_cols}, Found {delta_single_df.columns}"
    )

    recon_case(
        "TC_03",
        "Nested Agg schema validation",
        sorted([c.lower() for c in delta_nested_df.columns]) != sorted(expected_nested_cols),
        f"Expected {expected_nested_cols}, Found {delta_nested_df.columns}"
    )

    # -------------------------------
    # CASE 3: Duplicate Detection
    # -------------------------------
    dup_single = delta_single_df.groupBy("Store_ID","Item_Desc").count().filter(col("count") > 1).count()
    dup_nested = delta_nested_df.groupBy("Store_ID","Item_Desc").count().filter(col("count") > 1).count()

    recon_case(
        "TC_04",
        "Duplicate rows in Single Agg",
        dup_single > 0,
        f"{dup_single} duplicate Store_ID + Item_Desc rows found"
    )

    recon_case(
        "TC_05",
        "Duplicate rows in Nested Agg",
        dup_nested > 0,
        f"{dup_nested} duplicate Store_ID + Item_Desc rows found"
    )

    # -------------------------------
    # CASE 4: Null Critical Columns
    # -------------------------------
    null_single = delta_single_df.filter(
        col("Store_ID").isNull() |
        col("Item_Desc").isNull() |
        col("Qty").isNull() |
        col("Price").isNull()
    ).count()

    recon_case(
        "TC_06",
        "Nulls in critical columns (Single Agg)",
        null_single > 0,
        f"{null_single} rows have NULL critical values"
    )

    # Per-key expected values, computed once for Cases 5 and 6 instead of
    # running separate Spark jobs for every Store/Item combination.
    src_stats = source_df.groupBy(
        col("Store_ID").alias("_src_store"),
        col("Item_Desc").alias("_src_item"),
    ).agg(
        _sum("Qty").alias("_src_sum"),
        # Row count per key, matching the original per-key `.count()`.
        _count(lit(1)).alias("_src_cnt"),
    )

    # -------------------------------
    # CASE 5: Aggregation Integrity (TOTAL_QTY)
    # -------------------------------
    # A key missing from the target now counts as a mismatch; previously it
    # raised IndexError on collect()[0].
    agg_mismatch = _count_mismatched_keys(src_stats, delta_single_df, "Total_Qty", "_src_sum")

    recon_case(
        "TC_07",
        "TOTAL_QTY aggregation correctness",
        agg_mismatch > 0,
        f"{agg_mismatch} Store/Item combinations have incorrect TOTAL_QTY"
    )

    # -------------------------------
    # CASE 6: COUNT_ITEM correctness (Nested Agg)
    # -------------------------------
    count_mismatch = _count_mismatched_keys(src_stats, delta_nested_df, "Count_Item", "_src_cnt")

    recon_case(
        "TC_08",
        "COUNT_ITEM aggregation correctness",
        count_mismatch > 0,
        f"{count_mismatch} Store/Item combinations have incorrect COUNT_ITEM"
    )

    # -------------------------------
    # CASE 7: Completeness (Missing Stores)
    # -------------------------------
    missing_stores = source_df.select("Store_ID").distinct() \
        .subtract(delta_single_df.select("Store_ID").distinct()).count()

    recon_case(
        "TC_09",
        "All Store_IDs must exist in Delta",
        missing_stores > 0,
        f"{missing_stores} Store_IDs missing in Delta tables"
    )

    # -------------------------------
    # CASE 8: Qty Sanity Rule
    # -------------------------------
    # NOTE(review): the cast test flags values that become NULL when cast to
    # integer; confirm whether fractional Qty values should also be rejected.
    invalid_qty = delta_single_df.filter(
        (col("Qty") <= 0) | (col("Qty") > 1000) | (col("Qty").cast(IntegerType()).isNull())
    ).count()

    recon_case(
        "TC_10",
        "Qty sanity rule (1–1000, integer)",
        invalid_qty > 0,
        f"{invalid_qty} rows have invalid Qty values",
        severity="WARN"
    )

    # -------------------------------
    # CASE 9: Price Sanity Rule
    # -------------------------------
    invalid_price = delta_single_df.filter(
        (col("Price") <= 0) | (col("Price") > 100)
    ).count()

    recon_case(
        "TC_11",
        "Price sanity rule (0 < Price ≤ 100)",
        invalid_price > 0,
        f"{invalid_price} rows have unrealistic Price values",
        severity="WARN"
    )

    print("="*90)
    print("RECONCILIATION COMPLETED")
    print("="*90)


# Load the source table and both aggregate tables, then run the full
# reconciliation suite against them.
source_df = spark.read.table("retail_agg_ff")
delta_single_df = spark.read.table("retail_agg_ff_single_agg")
delta_nested_df = spark.read.table("retail_agg_ff_nested_agg")

validate_pipeline(
    source_df=source_df,
    delta_single_df=delta_single_df,
    delta_nested_df=delta_nested_df,
)
