# In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from datetime import datetime

# =====================================================
# Parameters
# =====================================================
# Current UTC calendar day formatted as YYYY-MM-DD.
run_date = datetime.utcnow().strftime("%Y-%m-%d")

# =====================================================
# Read Bronze tables
# =====================================================
# One DataFrame per bronze table, bound to short aliases in the same
# order as the table names listed below.
bc, bi, bo, boi, bp, br = map(
    spark.table,
    (
        "bronze_customers",
        "bronze_inventory",
        "bronze_orders",
        "bronze_order_items",
        "bronze_payments",
        "bronze_returns",
    ),
)

# =====================================================
# Helper: Data Quality Logger
# =====================================================
def log_dq_issue(run_date, layer, table_name, rule_name, issue_df,
                 sample_cols=None, sample_limit=5):
    """Summarise the rows violating one data-quality rule as a one-row DataFrame.

    Args:
        run_date: Value stored in the ``run_date`` column.
        layer: Value stored in the ``layer`` column.
        table_name: Value stored in the ``table_name`` column.
        rule_name: Value stored in the ``rule_name`` column.
        issue_df: DataFrame holding the offending rows.
        sample_cols: Columns to include in the JSON sample. When empty or
            ``None`` the first 10 columns of ``issue_df`` are used.
        sample_limit: Maximum number of sample rows serialised to JSON.

    Returns:
        ``None`` when ``issue_df`` has no rows; otherwise a single-row
        DataFrame with columns ``run_date``, ``layer``, ``table_name``,
        ``rule_name``, ``issue_count``, ``sample_json`` and
        ``logged_utc_ts``.
    """
    # Local import keeps this function self-contained; the file-level import
    # only brings in ``datetime``.
    from datetime import timezone

    issue_count = issue_df.count()
    if issue_count == 0:
        return None

    if not sample_cols:
        sample_cols = issue_df.columns[:10]

    sample_rows = (
        issue_df
        .select(*sample_cols)
        .limit(sample_limit)
        .toJSON()
        .collect()
    )

    # Each element is already a JSON object string; wrap them in a JSON array.
    sample_json = "[" + ",".join(sample_rows) + "]"

    # Use a timezone-aware UTC datetime: PySpark interprets a *naive* datetime
    # in the driver's local timezone, so a naive utcnow() value would be stored
    # as the wrong instant on any driver not running in UTC.
    logged_utc_ts = datetime.now(timezone.utc)

    return spark.createDataFrame([(
        run_date, layer, table_name, rule_name,
        int(issue_count), sample_json, logged_utc_ts
    )], [
        "run_date", "layer", "table_name",
        "rule_name", "issue_count",
        "sample_json", "logged_utc_ts"
    ])

# =====================================================
# SILVER: INVENTORY (Products master)
# =====================================================
# One row per non-null product_id with numeric columns cast to concrete types.
# NOTE(review): dropDuplicates keeps an arbitrary row per product_id; if
# bronze_inventory carries an ingest timestamp, a "latest row wins" window
# (as used for orders) would be deterministic — confirm against the schema.
silver_inventory_raw = (
    bi
    .dropDuplicates(["product_id"])
    .filter(F.col("product_id").isNotNull())
    .withColumn("unit_price", F.col("unit_price").cast("double"))
    .withColumn("current_qty", F.col("current_qty").cast("int"))
    .withColumn("reorder_level", F.col("reorder_level").cast("int"))
    .withColumn("silver_ingest_ts", F.current_timestamp())
)

# Bad inventory: invalid price or qty.
# Every comparison is paired with an isNull() test on the same column, so the
# predicate always evaluates to TRUE or FALSE (never NULL) and can be negated.
_inventory_is_bad = (
    F.col("unit_price").isNull() | (F.col("unit_price") <= 0) |
    F.col("current_qty").isNull() | (F.col("current_qty") < 0)
)

bad_inventory = silver_inventory_raw.filter(_inventory_is_bad)

# product_id is unique and non-null at this point, so the negated predicate
# selects exactly the rows an anti-join on bad product_ids would keep, without
# joining the DataFrame to itself.
silver_inventory_clean = silver_inventory_raw.filter(~_inventory_is_bad)


# =====================================================
# SILVER: ORDERS (latest per order)
# =====================================================
# Rank rows within each order_id so the most recently ingested one is rn = 1.
w_orders = Window.partitionBy("order_id") \
    .orderBy(F.col("bronze_ingest_ts").desc())

silver_orders_raw = (
    bo
    .withColumn("rn", F.row_number().over(w_orders))
    .filter("rn = 1")
    .drop("rn")
    .withColumn("order_ts", F.to_timestamp("order_ts"))
    .withColumn("order_date", F.to_date("order_ts"))
    .withColumn("silver_ingest_ts", F.current_timestamp())
)

# An order is bad when any mandatory field is missing. The predicate is built
# only from isNull() tests, so it is never NULL and can be negated safely.
_order_is_bad = (
    F.col("order_id").isNull() |
    F.col("customer_id").isNull() |
    F.col("order_ts").isNull()
)

bad_orders = silver_orders_raw.filter(_order_is_bad)

# Split with the negated predicate instead of anti-joining on order_id:
# NULL keys never compare equal in a join, so an anti-join would leave the
# NULL-order_id row in the clean set even though it is quarantined.
silver_orders_clean = silver_orders_raw.filter(~_order_is_bad)

# =====================================================
# SILVER: ORDER ITEMS (grain = order_item_id)
# =====================================================
# Rank rows within each order_item_id so the most recently ingested one is rn = 1.
w_items = Window.partitionBy("order_item_id") \
    .orderBy(F.col("bronze_ingest_ts").desc())

silver_items_raw = (
    boi
    .withColumn("rn", F.row_number().over(w_items))
    .filter("rn = 1")
    .drop("rn")
    .withColumn("quantity", F.col("quantity").cast("int"))
    .withColumn("unit_price", F.col("unit_price").cast("double"))
    .withColumn("line_total", F.col("line_total").cast("double"))
    .withColumn("silver_ingest_ts", F.current_timestamp())
)

# An item is bad when a key is missing or quantity / unit_price is missing or
# non-positive. The explicit isNull() tests matter: `NULL <= 0` evaluates to
# NULL, which filter() treats as "not matched", so without them a value that
# became NULL in the cast above would be accepted as clean. They also make
# the predicate strictly TRUE/FALSE so it can be negated.
_item_is_bad = (
    F.col("order_item_id").isNull() |
    F.col("order_id").isNull() |
    F.col("product_id").isNull() |
    F.col("quantity").isNull() | (F.col("quantity") <= 0) |
    F.col("unit_price").isNull() | (F.col("unit_price") <= 0)
)

bad_items = silver_items_raw.filter(_item_is_bad)

# Split with the negated predicate instead of anti-joining on order_item_id:
# NULL keys never compare equal in a join, so an anti-join would leave the
# NULL-order_item_id row in the clean set even though it is quarantined.
silver_items_clean = silver_items_raw.filter(~_item_is_bad)


# =====================================================
# SILVER: PAYMENTS
# =====================================================
# One row per non-null payment_id with amount / payment_ts cast to typed columns.
# NOTE(review): dropDuplicates keeps an arbitrary row per payment_id; if
# bronze_payments carries an ingest timestamp, a "latest row wins" window
# (as used for orders) would be deterministic — confirm against the schema.
silver_payments_raw = (
    bp
    .dropDuplicates(["payment_id"])
    .filter(F.col("payment_id").isNotNull())
    .withColumn("amount", F.col("amount").cast("double"))
    .withColumn("payment_ts", F.to_timestamp("payment_ts"))
    .withColumn("silver_ingest_ts", F.current_timestamp())
)

# A payment is bad when it has no order, no timestamp, or a missing /
# non-positive amount. The amount comparison is paired with an isNull() test,
# so the predicate is always TRUE or FALSE (never NULL) and can be negated.
_payment_is_bad = (
    F.col("order_id").isNull() |
    F.col("amount").isNull() | (F.col("amount") <= 0) |
    F.col("payment_ts").isNull()
)

bad_payments = silver_payments_raw.filter(_payment_is_bad)

# payment_id is unique and non-null at this point, so the negated predicate
# selects exactly the rows an anti-join on bad payment_ids would keep, without
# joining the DataFrame to itself.
silver_payments_clean = silver_payments_raw.filter(~_payment_is_bad)

# =====================================================
# SILVER: RETURNS (grain = return_id)
# =====================================================
# Rank rows within each return_id so the most recently ingested one is rn = 1.
w_returns = Window.partitionBy("return_id") \
    .orderBy(F.col("bronze_ingest_ts").desc())

silver_returns_raw = (
    br
    .withColumn("rn", F.row_number().over(w_returns))
    .filter("rn = 1")
    .drop("rn")
    .withColumn("refund_amount", F.col("refund_amount").cast("double"))
    .withColumn("return_ts", F.to_timestamp("return_ts"))
    .withColumn("silver_ingest_ts", F.current_timestamp())
)

# A return is bad when a key is missing or the refund is missing /
# non-positive. The comparison is paired with an isNull() test, so the
# predicate is always TRUE or FALSE (never NULL) and can be negated.
_return_is_bad = (
    F.col("return_id").isNull() |
    F.col("order_id").isNull() |
    F.col("refund_amount").isNull() |
    (F.col("refund_amount") <= 0)
)

bad_returns = silver_returns_raw.filter(_return_is_bad)

# Split with the negated predicate instead of anti-joining on return_id:
# NULL keys never compare equal in a join, so an anti-join would leave the
# NULL-return_id row in the clean set even though it is quarantined.
silver_returns_clean = silver_returns_raw.filter(~_return_is_bad)


# =====================================================
# SILVER: CUSTOMERS (CURRENT)
# =====================================================
def _unknown_if_missing(column_name):
    """Return a Column mapping NULL or blank values of ``column_name`` to "Unknown".

    Non-blank values are passed through unchanged (they are not trimmed).
    The NULL test is explicit because ``trim(NULL) == ""`` evaluates to NULL,
    which ``when`` treats as not matched and would leave the NULL in place.
    """
    value = F.col(column_name)
    is_missing = value.isNull() | (F.trim(value) == "")
    return F.when(is_missing, "Unknown").otherwise(value)


# One row per non-null customer_id with cleaned country / segment attributes.
# NOTE(review): dropDuplicates keeps an arbitrary row per customer_id; if
# bronze_customers carries an ingest timestamp, a "latest row wins" window
# would be deterministic — confirm against the schema.
silver_customers_current = (
    bc
    .dropDuplicates(["customer_id"])
    .filter(F.col("customer_id").isNotNull())
    .withColumn("country_clean", _unknown_if_missing("country"))
    .withColumn("segment_clean", _unknown_if_missing("segment"))
    .withColumn("silver_ingest_ts", F.current_timestamp())
)

# =====================================================
# SILVER: CUSTOMERS SCD2 (INITIAL)
# =====================================================
# Start from the current-customer snapshot, exposing the cleaned attributes
# under their plain names.
_scd2_attributes = silver_customers_current.select(
    "customer_id",
    F.col("country_clean").alias("country"),
    F.col("segment_clean").alias("segment"),
)

# Append the change-detection hash and the validity bookkeeping columns:
# every row starts now, has no end timestamp, and is flagged current.
# NOTE(review): concat_ws skips NULL inputs, so rows that differ only in
# which attribute is NULL would produce the same attr_hash — confirm whether
# NULL attributes can reach this point.
silver_customers_scd2 = _scd2_attributes.select(
    "*",
    F.sha2(
        F.concat_ws("||", F.col("country"), F.col("segment")),
        256,
    ).alias("attr_hash"),
    F.current_timestamp().alias("effective_start_ts"),
    F.lit(None).cast("timestamp").alias("effective_end_ts"),
    F.lit(True).alias("is_current"),
    F.current_timestamp().alias("silver_ingest_ts"),
)

# =====================================================
# BUILD DQ ISSUES LOG (SAFE WHEN NO ISSUES)
# =====================================================
# log_dq_issue returns None for a rule with no offending rows; keep only the
# rules that produced a summary row. Compare against None explicitly rather
# than relying on the truthiness of a DataFrame object.
dq_entries = [
    entry
    for entry in (
        log_dq_issue(run_date, "silver", "inventory", "invalid_inventory", bad_inventory),
        log_dq_issue(run_date, "silver", "orders", "invalid_orders", bad_orders),
        log_dq_issue(run_date, "silver", "order_items", "invalid_order_items", bad_items),
        log_dq_issue(run_date, "silver", "payments", "invalid_payments", bad_payments),
        log_dq_issue(run_date, "silver", "returns", "invalid_refunds", bad_returns),
    )
    if entry is not None
]

if dq_entries:
    # Stack the per-rule summary rows into one DataFrame.
    silver_dq_issues = dq_entries[0]
    for extra_entry in dq_entries[1:]:
        silver_dq_issues = silver_dq_issues.unionByName(extra_entry)
else:
    # Empty log with the same column types the populated path produces.
    # issue_count is bigint because log_dq_issue passes a Python int without
    # an explicit schema, which Spark infers as LongType; declaring int here
    # would make the table schema depend on whether any issue was found.
    silver_dq_issues = spark.createDataFrame(
        [],
        schema="""
            run_date string,
            layer string,
            table_name string,
            rule_name string,
            issue_count bigint,
            sample_json string,
            logged_utc_ts timestamp
        """
    )

# =====================================================
# WRITE SILVER TABLES (OVERWRITE)
# =====================================================
# (DataFrame, target table) pairs, written in this order. Every target is
# fully replaced on each run.
_silver_outputs = [
    (silver_inventory_clean, "silver_inventory_clean"),
    (silver_orders_clean, "silver_orders_clean"),
    (silver_items_clean, "silver_order_items_clean"),
    (silver_payments_clean, "silver_payments_clean"),
    (silver_returns_clean, "silver_returns_clean"),
    (silver_customers_current, "silver_customers_current"),
    (silver_customers_scd2, "silver_customers_scd2"),
]

# Quarantine: the rows rejected by each table's validity rule.
_quarantine_outputs = [
    (bad_inventory, "silver_inventory_quarantine"),
    (bad_orders, "silver_orders_quarantine"),
    (bad_items, "silver_order_items_quarantine"),
    (bad_payments, "silver_payments_quarantine"),
    (bad_returns, "silver_returns_quarantine"),
]

for _output_df, _target_table in _silver_outputs + _quarantine_outputs:
    _output_df.write.mode("overwrite").format("delta").saveAsTable(_target_table)

# The DQ log additionally allows the stored schema to be replaced on overwrite.
silver_dq_issues.write.mode("overwrite").format("delta") \
    .option("overwriteSchema", "true") \
    .saveAsTable("silver_dq_issues")


# The check-mark was previously stored as mojibake (UTF-8 bytes decoded as
# cp1252); restored to the intended character.
print("✅ Silver layer complete for ALL bronze tables")
