# In [0]:
# ============================================================
# SILVER → GOLD ETL  (Spark Connect Safe, Optimized)
# ============================================================

from pyspark.sql.functions import (
    col, lit, current_timestamp, current_date,
    to_date, sum as F_sum, countDistinct, max as F_max, when
)
from pyspark.sql import DataFrame
import uuid, traceback

# ---------------- CONFIG ----------------
# Catalog and schema that hold every Silver source and Gold target.
CATALOG = "leelastestdata"
SCHEMA = "default"

# Silver tables keyed by logical source name; each one resolves to
# "<CATALOG>.<SCHEMA>.<name>_silver". Key order is preserved.
SILVER = {
    name: f"{CATALOG}.{SCHEMA}.{name}_silver"
    for name in ("loans", "payments", "customers", "bureau", "collections", "recoveries")
}

# Gold control tables: per-Gold-table watermark, and per-run audit log.
GOLD_META_TABLE = f"{CATALOG}.{SCHEMA}.gold_last_processed"
GOLD_RUNLOG_TABLE = f"{CATALOG}.{SCHEMA}.gold_run_log"

# Column every Gold table is partitioned by when first created.
GOLD_PARTITION_COL = "load_date"


# ---------------- HELPERS ----------------

def now_ts():
    """Return the Spark session's current timestamp as a Python value.

    The value is obtained by running ``current_timestamp()`` on the Spark side
    and collecting the single result row, rather than reading the local clock.
    """
    result_row = spark.sql("select current_timestamp() as ts").first()
    return result_row["ts"]


def ensure_gold_control_tables():
    """Switch the session to CATALOG.SCHEMA and create the Gold control tables.

    Both tables are Delta tables created with ``IF NOT EXISTS``, so calling this
    repeatedly is harmless:
      * GOLD_META_TABLE   - watermark (last processed timestamp) per Gold table.
      * GOLD_RUNLOG_TABLE - audit rows written by each ETL run.
    """
    for scope_stmt in (f"USE CATALOG {CATALOG}", f"USE SCHEMA {SCHEMA}"):
        spark.sql(scope_stmt)

    meta_ddl = f"""
    CREATE TABLE IF NOT EXISTS {GOLD_META_TABLE} (
        gold_table string,
        last_processed_timestamp timestamp,
        last_updated timestamp
    ) USING DELTA
    """

    runlog_ddl = f"""
    CREATE TABLE IF NOT EXISTS {GOLD_RUNLOG_TABLE} (
        run_id string,
        gold_table string,
        start_ts timestamp,
        end_ts timestamp,
        status string,
        rows_written long,
        message string
    ) USING DELTA
    """

    for ddl in (meta_ddl, runlog_ddl):
        spark.sql(ddl)


def write_gold_runlog(run_id, gold_table, start_ts, end_ts, status, rows_written, message):
    """Append a single audit row to GOLD_RUNLOG_TABLE.

    The arguments map one-to-one, in order, onto the run-log columns
    (run_id, gold_table, start_ts, end_ts, status, rows_written, message).
    """
    schema = """
      run_id string,
      gold_table string,
      start_ts timestamp,
      end_ts timestamp,
      status string,
      rows_written long,
      message string
    """
    record = (run_id, gold_table, start_ts, end_ts, status, rows_written, message)
    audit_df = spark.createDataFrame([record], schema=schema)
    (
        audit_df.write
        .format("delta")
        .mode("append")
        .saveAsTable(GOLD_RUNLOG_TABLE)
    )


def update_gold_meta(gold_table, ts):
    """Upsert the incremental watermark for ``gold_table`` into GOLD_META_TABLE.

    Args:
        gold_table: Gold table name used as the meta key (callers pass the
            fully-qualified name).
        ts: Last processed Silver ``load_timestamp``. May be None, in which case
            the stored watermark becomes NULL and the next run reads the whole
            Silver table.

    The row is written with a single MERGE (one Delta commit). The previous
    DELETE-then-append used two commits, so a failure between them dropped the
    watermark and silently forced a full reprocess on the next run.
    """
    schema = """
      gold_table string,
      last_processed_timestamp timestamp,
      last_updated timestamp
    """
    staged = spark.createDataFrame(
        [(gold_table, ts, now_ts())],
        schema=schema
    )
    staged.createOrReplaceTempView("gold_meta_stg")

    # The key is matched through the staged view rather than interpolated into
    # the SQL text, so quoting in gold_table cannot break the statement.
    spark.sql(f"""
        MERGE INTO {GOLD_META_TABLE} t
        USING gold_meta_stg s
        ON t.gold_table = s.gold_table
        WHEN MATCHED THEN UPDATE SET
            t.last_processed_timestamp = s.last_processed_timestamp,
            t.last_updated = s.last_updated
        WHEN NOT MATCHED THEN INSERT (gold_table, last_processed_timestamp, last_updated)
            VALUES (s.gold_table, s.last_processed_timestamp, s.last_updated)
    """)


def get_gold_last_processed(gold_table):
    """Return the stored watermark for ``gold_table``, or None if there is none.

    Args:
        gold_table: Key in GOLD_META_TABLE (the Gold table name).

    Returns:
        The ``last_processed_timestamp`` of the first matching row, or None when
        no row exists (or the stored value is NULL).
    """
    # One job instead of count() + first(), and a column predicate instead of
    # interpolating the key into SQL text.
    row = (
        spark.table(GOLD_META_TABLE)
        .filter(col("gold_table") == gold_table)
        .select("last_processed_timestamp")
        .first()
    )
    return row.last_processed_timestamp if row is not None else None


def incremental_source_df(silver_table, last_proc_ts):
    """Read a Silver table, restricted to rows newer than a watermark.

    Args:
        silver_table: Fully-qualified Silver table name.
        last_proc_ts: Watermark; None means "no watermark yet" (full read).

    Returns:
        None if the table does not exist; otherwise a DataFrame. A ``load_date``
        column derived from ``load_timestamp`` is added when the table has none.
        With a watermark, only rows whose ``load_timestamp`` is strictly greater
        than it are kept.
    """
    if not spark.catalog.tableExists(silver_table):
        return None

    source = spark.table(silver_table)
    if "load_date" not in source.columns:
        source = source.withColumn("load_date", to_date(col("load_timestamp")))

    if last_proc_ts is not None:
        source = source.filter(col("load_timestamp") > lit(last_proc_ts))
    return source


# ---------------- GOLD MODEL TRANSFORMS ----------------

def transform_daily_loan_portfolio(loans_df, payments_df, recoveries_df):
    """Summarise the loan book into a single portfolio row.

    Args:
        loans_df: Loans with loan_id, principal, roi_annual, emi_amount, status.
        payments_df: Payments with loan_id and amount, or None.
        recoveries_df: Recoveries with loan_id and amount_recovered, or None.

    Returns:
        A one-row DataFrame with total/active/closed loan counts, total and
        outstanding principal, avg_roi (sum of roi_annual / distinct loans),
        total_emi, plus load_timestamp and load_date.
    """
    def _attach_loan_total(base, source, amount_col, alias):
        # Per-loan total of amount_col, or a constant 0 when there is no source.
        if source is None:
            return base.withColumn(alias, lit(0))
        per_loan = source.groupBy("loan_id").agg(F_sum(amount_col).alias(alias))
        return base.join(per_loan, "loan_id", "left")

    book = loans_df.select("loan_id", "principal", "roi_annual", "emi_amount", "status")
    book = _attach_loan_total(book, payments_df, "amount", "sum_payments")
    book = _attach_loan_total(book, recoveries_df, "amount_recovered", "sum_recoveries")

    # Zero-fill numeric nulls (e.g. loans with no payments after the left join)
    # before deriving the outstanding balance.
    book = book.fillna(0).withColumn(
        "outstanding",
        col("principal") - col("sum_payments") - col("sum_recoveries"),
    )

    distinct_loans = countDistinct("loan_id")
    summary = book.agg(
        distinct_loans.alias("total_loans"),
        F_sum(when(col("status") == "ACTIVE", 1).otherwise(0)).alias("active_loans"),
        F_sum(when(col("status") == "CLOSED", 1).otherwise(0)).alias("closed_loans"),
        F_sum("principal").alias("total_principal"),
        F_sum("outstanding").alias("outstanding_principal"),
        (F_sum("roi_annual") / distinct_loans).alias("avg_roi"),
        F_sum("emi_amount").alias("total_emi"),
    )
    return (
        summary
        .withColumn("load_timestamp", current_timestamp())
        .withColumn("load_date", current_date())
    )


def transform_customer_360(customers_df, loans_df, payments_df, bureau_df):
    """Build one row per customer combining profile, loan, payment and bureau data.

    Args:
        customers_df: Customer profiles with customer_id, full_name, pan, city,
            state, risk_segment. Required.
        loans_df: Loans with loan_id, customer_id, principal.
        payments_df: Payments with loan_id, payment_date, amount, or None.
        bureau_df: Bureau records with customer_id and score, or None.

    Returns:
        DataFrame keyed by customer_id (deduplicated) with num_loans,
        total_principal, last_payment_date, total_paid, bureau_score,
        load_timestamp and load_date.

    Raises:
        ValueError: If customers_df is None (e.g. the Silver customers table
            does not exist).
    """
    if customers_df is None:
        raise ValueError(
            "transform_customer_360 requires customers_df "
            "(Silver customers table missing?)"
        )

    loans_agg = loans_df.groupBy("customer_id").agg(
        countDistinct("loan_id").alias("num_loans"),
        F_sum("principal").alias("total_principal"),
    )

    df = (
        customers_df
        .select("customer_id", "full_name", "pan", "city", "state", "risk_segment")
        .dropDuplicates(["customer_id"])
        .join(loans_agg, "customer_id", "left")
    )

    if payments_df is not None:
        # Map each loan to exactly one customer. Duplicate loan rows would
        # otherwise fan out every payment and inflate total_paid. An inner join
        # is used because payments whose loan is absent from loans_df would get
        # a NULL customer_id, which can never join back to a customer anyway.
        loan_owner = loans_df.select("loan_id", "customer_id").dropDuplicates(["loan_id"])
        pay_agg = (
            payments_df.join(loan_owner, "loan_id", "inner")
            .groupBy("customer_id")
            .agg(
                F_max("payment_date").alias("last_payment_date"),
                F_sum("amount").alias("total_paid"),
            )
        )
        df = df.join(pay_agg, "customer_id", "left")
    else:
        df = df.withColumn("last_payment_date", lit(None)).withColumn("total_paid", lit(0))

    if bureau_df is not None:
        # NOTE(review): summing scores only makes sense if Silver holds at most
        # one bureau row per customer; with several pulls the sum is not a score.
        # Confirm whether latest/max was intended.
        bureau_agg = bureau_df.groupBy("customer_id").agg(F_sum("score").alias("bureau_score"))
        df = df.join(bureau_agg, "customer_id", "left")
    else:
        df = df.withColumn("bureau_score", lit(None))

    return df.withColumn("load_timestamp", current_timestamp()) \
             .withColumn("load_date", current_date())


def transform_delinquency_buckets(collections_df):
    """Count loans per days-past-due (DPD) bucket.

    Buckets are assigned in order:
      * NULL dpd -> "UNKNOWN"
      * dpd <= 30 -> "0-30" (this also catches zero/negative values)
      * dpd <= 60 -> "31-60"
      * dpd <= 90 -> "61-90"
      * otherwise -> "90+"

    Args:
        collections_df: Rows with at least loan_id and a numeric dpd column.

    Returns:
        One row per dpd_bucket with loan_count (distinct loan_id), dpd_sum,
        load_timestamp and load_date.
    """
    dpd = col("dpd")
    # when() branches are evaluated in order, so each upper bound alone is
    # enough. NULL is tested first: every comparison with NULL is NULL (not
    # true), so without this branch missing DPD values would fall through to
    # otherwise() and be reported in the most severe "90+" bucket.
    bucket = (
        when(dpd.isNull(), "UNKNOWN")
        .when(dpd <= 30, "0-30")
        .when(dpd <= 60, "31-60")
        .when(dpd <= 90, "61-90")
        .otherwise("90+")
    )
    return (
        collections_df.withColumn("dpd_bucket", bucket)
        .groupBy("dpd_bucket")
        .agg(
            countDistinct("loan_id").alias("loan_count"),
            F_sum("dpd").alias("dpd_sum"),
        )
        .withColumn("load_timestamp", current_timestamp())
        .withColumn("load_date", current_date())
    )


def transform_payments_summary(payments_df):
    """Aggregate payments per (payment_date, status).

    Returns:
        DataFrame with payment_date, status, total_amount (sum of amount),
        num_payments (distinct payment_id), load_timestamp and load_date.
    """
    metrics = [
        F_sum("amount").alias("total_amount"),
        countDistinct("payment_id").alias("num_payments"),
    ]
    daily = payments_df.groupBy("payment_date", "status").agg(*metrics)
    stamped = daily.withColumn("load_timestamp", current_timestamp())
    return stamped.withColumn("load_date", current_date())


def transform_recoveries_summary(recoveries_df):
    """Aggregate recoveries per (event_date, channel).

    Returns:
        DataFrame with event_date, channel, total_recovered (sum of
        amount_recovered), num_recoveries (distinct recovery_id),
        load_timestamp and load_date.
    """
    metrics = [
        F_sum("amount_recovered").alias("total_recovered"),
        countDistinct("recovery_id").alias("num_recoveries"),
    ]
    per_channel = recoveries_df.groupBy("event_date", "channel").agg(*metrics)
    stamped = per_channel.withColumn("load_timestamp", current_timestamp())
    return stamped.withColumn("load_date", current_date())


# ---------------- UPSERT WITH OPTIMIZE + ZORDER ----------------
def _optimize_gold(gold_table_full, business_keys):
    """Best-effort OPTIMIZE of a Gold table, Z-ordered by its non-partition keys.

    Delta rejects ZORDER BY on a partition column, so GOLD_PARTITION_COL is
    dropped from the Z-order list; if no key remains, a plain OPTIMIZE runs.
    Failures are printed and swallowed because compaction does not affect the
    correctness of the data already written.
    """
    zorder_cols = [k for k in business_keys if k != GOLD_PARTITION_COL]
    stmt = f"OPTIMIZE {gold_table_full}"
    if zorder_cols:
        stmt += " ZORDER BY (" + ", ".join(f"`{k}`" for k in zorder_cols) + ")"
    try:
        print(stmt)
        spark.sql(stmt)
    except Exception as e:
        print("Optimize skipped:", e)


def upsert_gold(gold_df, gold_table_full, business_keys):
    """Create or MERGE ``gold_df`` into a Delta Gold table, then OPTIMIZE it.

    Args:
        gold_df: Rows to write, or None.
        gold_table_full: Fully-qualified target table name.
        business_keys: Columns identifying a row; used as the MERGE condition
            and (minus the partition column) as Z-order columns.

    Returns:
        Number of rows in gold_df. Returns 0 without writing anything when
        gold_df is None or empty.

    Raises:
        ValueError: If the table exists (MERGE path) and business_keys is empty.
    """
    if gold_df is None:
        return 0
    rows = gold_df.count()
    if rows == 0:
        return 0

    # First load: create the partitioned table directly from the DataFrame.
    if not spark.catalog.tableExists(gold_table_full):
        print(f"Creating Gold table {gold_table_full}")
        (
            gold_df.write.format("delta")
            .mode("overwrite")
            .option("mergeSchema", "true")
            .partitionBy(GOLD_PARTITION_COL)
            .saveAsTable(gold_table_full)
        )
        _optimize_gold(gold_table_full, business_keys)
        return rows

    if not business_keys:
        raise ValueError(f"business_keys must not be empty to MERGE into {gold_table_full}")

    gold_df.createOrReplaceTempView("gold_stg")
    cols = gold_df.columns

    # Null-safe equality (<=>): aggregates grouped on nullable columns (e.g. a
    # NULL payment status or recovery channel) produce NULL keys, which never
    # satisfy `=` and would be re-inserted as duplicates on every run.
    on_cond = " AND ".join(f"t.`{k}` <=> s.`{k}`" for k in business_keys)
    non_key_cols = [c for c in cols if c not in business_keys]
    insert_cols = ", ".join(f"`{c}`" for c in cols)
    insert_vals = ", ".join(f"s.`{c}`" for c in cols)

    clauses = []
    if non_key_cols:  # an empty UPDATE SET list is invalid SQL
        set_clause = ", ".join(f"t.`{c}` = s.`{c}`" for c in non_key_cols)
        clauses.append(f"WHEN MATCHED THEN UPDATE SET {set_clause}")
    clauses.append(f"WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})")

    merge_sql = "\n".join(
        [
            f"MERGE INTO {gold_table_full} t",
            "USING gold_stg s",
            f"ON {on_cond}",
            *clauses,
        ]
    )
    spark.sql(merge_sql)

    _optimize_gold(gold_table_full, business_keys)
    return rows


# ---------------- MAIN ETL ----------------
def _run_gold_step(run_id, start_ts, gold_name, source_names, driver_name,
                   build_gold_df, business_keys, success_msg, no_data_msg):
    """Incrementally refresh one Gold table and record the outcome in the run log.

    Args:
        run_id: Identifier shared by all steps of this ETL run.
        start_ts: Run start timestamp written to every run-log row.
        gold_name: Unqualified Gold table name; qualified with CATALOG.SCHEMA.
        source_names: SILVER keys to read, in order, using the table's watermark.
        driver_name: Source that decides whether there is new data and whose
            max load_timestamp becomes the new watermark.
        build_gold_df: Callable taking {source_name: DataFrame-or-None} and
            returning the Gold DataFrame.
        business_keys: MERGE keys passed to upsert_gold.
        success_msg / no_data_msg: Run-log messages for those outcomes.

    Any exception is printed with its stack trace and logged as FAILED, so the
    remaining steps still run.
    """
    gold = f"{CATALOG}.{SCHEMA}.{gold_name}"
    try:
        last = get_gold_last_processed(gold)
        sources = {name: incremental_source_df(SILVER[name], last) for name in source_names}
        driver = sources[driver_name]

        if driver is None or driver.count() == 0:
            write_gold_runlog(run_id, gold, start_ts, now_ts(), "NO_DATA", 0, no_data_msg)
            return

        rows = upsert_gold(build_gold_df(sources), gold, business_keys)
        max_ts = driver.agg(F_max("load_timestamp")).first()[0]
        update_gold_meta(gold, max_ts)
        write_gold_runlog(run_id, gold, start_ts, now_ts(), "SUCCESS", rows, success_msg)
    except Exception as e:
        # The run log only keeps str(e); print the full trace for debugging.
        traceback.print_exc()
        try:
            # Fully-qualified name, consistent with SUCCESS/NO_DATA rows.
            write_gold_runlog(run_id, gold, start_ts, now_ts(), "FAILED", 0, str(e))
        except Exception as log_err:
            # Do not let a broken run log abort the remaining Gold steps.
            print(f"Could not write FAILED run-log row for {gold}:", log_err)


def gold_etl_run():
    """Run every Silver -> Gold refresh step in sequence under one run_id.

    Ensures the control tables exist, then refreshes, in order: daily loan
    portfolio, customer 360, delinquency buckets, payments summary and
    recoveries summary. Each step is isolated; a failure is logged and the
    next step still runs.

    NOTE(review): each step aggregates only the Silver rows newer than its
    watermark, and upsert_gold's MERGE overwrites existing Gold aggregates with
    those slice-only values (e.g. customer_360 totals, per-bucket counts).
    Confirm that replacing rather than accumulating is the intended semantics.
    """
    ensure_gold_control_tables()
    run_id = str(uuid.uuid4())
    start_ts = now_ts()

    print(f"\n===== Silver → Gold ETL START (run_id={run_id}) =====\n")

    # 1. Daily Loan Portfolio
    _run_gold_step(
        run_id, start_ts, "daily_loan_portfolio_gold",
        source_names=("loans", "payments", "recoveries"),
        driver_name="loans",
        build_gold_df=lambda s: transform_daily_loan_portfolio(
            s["loans"], s["payments"], s["recoveries"]),
        business_keys=["load_date"],
        success_msg="Portfolio refreshed",
        no_data_msg="No new loans",
    )

    # 2. Customer 360
    _run_gold_step(
        run_id, start_ts, "customer_360_gold",
        source_names=("loans", "customers", "payments", "bureau"),
        driver_name="loans",
        build_gold_df=lambda s: transform_customer_360(
            s["customers"], s["loans"], s["payments"], s["bureau"]),
        business_keys=["customer_id"],
        success_msg="Customer 360 refreshed",
        no_data_msg="No new loans",
    )

    # 3. Delinquency Buckets
    _run_gold_step(
        run_id, start_ts, "delinquency_buckets_gold",
        source_names=("collections",),
        driver_name="collections",
        build_gold_df=lambda s: transform_delinquency_buckets(s["collections"]),
        business_keys=["dpd_bucket"],
        success_msg="Delinquency refreshed",
        no_data_msg="No collections",
    )

    # 4. Payments Summary
    _run_gold_step(
        run_id, start_ts, "payments_summary_gold",
        source_names=("payments",),
        driver_name="payments",
        build_gold_df=lambda s: transform_payments_summary(s["payments"]),
        business_keys=["payment_date", "status"],
        success_msg="Payments summary refreshed",
        no_data_msg="No payments",
    )

    # 5. Recoveries Summary
    _run_gold_step(
        run_id, start_ts, "recoveries_summary_gold",
        source_names=("recoveries",),
        driver_name="recoveries",
        build_gold_df=lambda s: transform_recoveries_summary(s["recoveries"]),
        business_keys=["event_date", "channel"],
        success_msg="Recoveries summary refreshed",
        no_data_msg="No recoveries",
    )

    print(f"\n===== Silver → Gold ETL END (run_id={run_id}) =====")


# Execute the ETL as soon as this notebook/script runs. There is no
# `if __name__ == "__main__"` guard, so importing this file also triggers a run.
gold_etl_run()
