In [0]:
# 35_risk_eval_build — Build per-loan risk outputs from features + rules

# ─────────────────────────────────────────────────────────────────────────────
# 1) Parameters
# ─────────────────────────────────────────────────────────────────────────────
# Declare the notebook widgets as (name, default) pairs, then read back the
# values supplied for this run. RUN_TAG is any free-form string used for
# traceability.
for widget_name, widget_default in (
    ("CATALOG", "reporting_factory_risk_profile"),
    ("RUN_TAG", "rules@current"),
):
    dbutils.widgets.text(widget_name, widget_default)

catalog, run_tag = (dbutils.widgets.get(key) for key in ("CATALOG", "RUN_TAG"))

# Switch to the requested catalog, make sure both schemas exist, and make
# gold the current schema.
spark.sql(f"USE CATALOG {catalog}")
for schema_name in ("control", "gold"):
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")
spark.sql("USE SCHEMA gold")

# ─────────────────────────────────────────────────────────────────────────────
# 2) Guardrails: required inputs exist & non-empty
# ─────────────────────────────────────────────────────────────────────────────
def fail(msg):
    """Report a fatal guardrail failure and abort the run.

    Prints ``msg`` prefixed with a failure marker, then raises a plain
    ``Exception`` carrying the same message.
    """
    banner = f"❌ {msg}"
    print(banner)
    raise Exception(msg)

# gold.features must exist and have rows.
# The public Catalog API is used instead of the private py4j handle
# (spark._jsparkSession), which Spark Connect sessions do not expose.
if not spark.catalog.tableExists(f"{catalog}.gold.features"):
    fail(f"Missing table: {catalog}.gold.features")

n_feat = spark.sql("SELECT COUNT(*) c FROM gold.features").first().c
if n_feat == 0:
    fail("gold.features is empty.")

# The rules table is optional; without it every loan gets the defaults.
rules_exists = spark.catalog.tableExists(f"{catalog}.control.risk_rules")
if not rules_exists:
    print("⚠️ control.risk_rules not found — will default all loans to Medium / 0 points.")
else:
    # current_date in the view below is evaluated in the session time zone
    # every time the view is queried. The script pins the session to UTC
    # before the rules are read for evaluation, so pin it here as well:
    # otherwise the count printed below could be computed for a different
    # "today" than the rules that are actually applied.
    spark.conf.set("spark.sql.session.timeZone", "UTC")

    # Convenience view of the rules that are enabled and effective today.
    # An open-ended rule (effective_to IS NULL) is treated as never expiring.
    spark.sql("""
    CREATE OR REPLACE TEMP VIEW tmp_enabled_rules AS
    SELECT rule_id, name, segment, condition_sql, impact_column, impact_value, priority
    FROM control.risk_rules
    WHERE enabled = TRUE
      AND current_date BETWEEN effective_from AND COALESCE(effective_to, DATE '2999-12-31')
    ORDER BY priority
    """)
    n_rules = spark.sql("SELECT COUNT(*) c FROM tmp_enabled_rules").first().c
    print(f"Enabled rules (effective today): {n_rules}")

# ─────────────────────────────────────────────────────────────────────────────
# 3) Build a unified matches SQL (union of all rules against gold.features)
#    Each rule’s condition_sql is written against alias 'f' (as designed earlier).
# ─────────────────────────────────────────────────────────────────────────────
from datetime import datetime, timezone

# datetime.utcnow() is deprecated since Python 3.12; an aware UTC timestamp
# formats to exactly the same string.
run_id = f"RE_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}"  # Risk Eval run id
spark.conf.set("spark.sql.session.timeZone", "UTC")

# Zero-row relation with the same column layout as a real match set; used
# when there is no rules table or no rule is currently enabled.
_EMPTY_MATCHES_SQL = (
    "SELECT NULL AS loan_id, NULL AS rule_id, NULL AS impact_column, "
    "NULL AS impact_value, 0 AS priority WHERE 1=0"
)


def _sql_string_literal(value):
    """Render ``value`` as a Spark SQL string literal.

    ``None`` becomes a typed SQL NULL instead of the text 'None'. Backslashes
    and single quotes are backslash-escaped, which is what Spark's SQL parser
    expects with its default setting
    (spark.sql.parser.escapedStringLiterals = false).
    """
    if value is None:
        return "CAST(NULL AS STRING)"
    escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def _rule_match_sql(rule, features_table):
    """Build the SELECT that lists the loans matched by one rule.

    ``rule`` is a row of tmp_enabled_rules; its condition_sql is written
    against the alias ``f`` of ``features_table``.
    """
    raw_priority = rule["priority"]
    priority = int(raw_priority) if raw_priority is not None else 0
    # A missing or blank condition must match nothing rather than produce
    # invalid SQL.
    condition = (rule["condition_sql"] or "").strip() or "1=0"
    # The condition sits on its own line so that a trailing "--" comment in
    # it cannot swallow the closing parenthesis.
    return f"""
            SELECT f.loan_id,
                   {_sql_string_literal(rule["rule_id"])} AS rule_id,
                   {_sql_string_literal(rule["impact_column"])} AS impact_column,
                   {_sql_string_literal(rule["impact_value"])} AS impact_value,
                   {priority} AS priority
            FROM {features_table} f
            WHERE (
                {condition}
            )
        """.strip()


if rules_exists:
    # collect() keeps SQL NULLs as None. Going through pandas would turn a
    # NULL priority into NaN, which passes an "is not None" check and then
    # makes int() raise.
    rule_rows = spark.sql("SELECT * FROM tmp_enabled_rules ORDER BY priority").collect()
    union_parts = [
        _rule_match_sql(rule, f"{catalog}.gold.features") for rule in rule_rows
    ]
    matches_sql = " UNION ALL ".join(union_parts) if union_parts else _EMPTY_MATCHES_SQL
else:
    # No rules — empty matches
    matches_sql = _EMPTY_MATCHES_SQL

# ─────────────────────────────────────────────────────────────────────────────
# 4) Resolve risk_band and risk_points per loan and publish gold.risk_eval
# ─────────────────────────────────────────────────────────────────────────────
# RUN_TAG is a free-form widget value, so escape it before embedding it in a
# SQL string literal; an apostrophe would otherwise break the statement.
run_tag_sql = run_tag.replace("\\", "\\\\").replace("'", "\\'")

# Publishes gold.risk_eval with one row per loan in gold.features:
#   * risk_band   — impact_value of the highest-priority matching
#                   'risk_band' rule; 'Medium' when no such rule matched.
#   * risk_points — sum of the integers found in the impact_value of every
#                   matching 'risk_points' rule; 0 when none matched.
#
# Notes on the SQL:
#   * The digit pattern is written as [0-9] rather than \d. Spark's SQL
#     parser treats backslash as an escape character inside string literals
#     and drops it for unrecognised escapes, so a pattern that reaches the
#     parser as '\d+' is applied as 'd+' and never matches a number.
#   * regexp_extract returns '' when nothing matches; NULLIF turns that into
#     NULL so the CAST cannot fail on an empty string.
#   * rule_id is a tie-breaker, so loans matched by several rules of equal
#     priority get a deterministic band.
resolved_sql = f"""
CREATE OR REPLACE TABLE gold.risk_eval
USING DELTA
AS
WITH matches AS (
  {matches_sql}
),
band AS (
  SELECT loan_id, impact_value AS risk_band
  FROM (
    SELECT loan_id, impact_value, priority,
           ROW_NUMBER() OVER (PARTITION BY loan_id ORDER BY priority DESC, rule_id) rn
    FROM matches
    WHERE lower(impact_column)='risk_band'
  ) x
  WHERE rn=1
),
points AS (
  SELECT loan_id,
         COALESCE(SUM(CAST(NULLIF(regexp_extract(impact_value,'[-+]?[0-9]+',0),'') AS INT)),0) AS risk_points
  FROM matches
  WHERE lower(impact_column)='risk_points'
  GROUP BY loan_id
)
SELECT
  f.loan_id,
  f.borrower_id,
  f.dti,
  f.fico_score,
  f.utilization,
  f.grade,
  f.loan_amount,
  f.interest_rate,
  f.term_months,
  f.issue_date,
  COALESCE(b.risk_band, 'Medium') AS risk_band,
  COALESCE(p.risk_points, 0)      AS risk_points,
  '{run_tag_sql}'                  AS rules_version,
  current_timestamp()              AS evaluated_at
FROM gold.features f
LEFT JOIN band   b ON f.loan_id = b.loan_id
LEFT JOIN points p ON f.loan_id = p.loan_id
"""
spark.sql(resolved_sql)

# ─────────────────────────────────────────────────────────────────────────────
# 5) Lightweight audit log
# ─────────────────────────────────────────────────────────────────────────────
# Audit table: one row per risk-eval run, created on first use.
spark.sql("""
CREATE TABLE IF NOT EXISTS control.rule_runs (
  risk_eval_run_id STRING,
  rules_version STRING,
  rule_count INT,
  features_count BIGINT,
  produced_at TIMESTAMP
) USING DELTA
""")

# Number of rules enabled and effective today; 0 when there is no rules table.
rule_count = spark.sql("SELECT COUNT(*) c FROM tmp_enabled_rules").first().c if rules_exists else 0

# RUN_TAG is a free-form widget value, so escape it before embedding it in a
# SQL string literal; an apostrophe would otherwise break the INSERT.
run_tag_sql = run_tag.replace("\\", "\\\\").replace("'", "\\'")

# Values are positional and follow the column order of the table above.
spark.sql(f"""
INSERT INTO control.rule_runs
VALUES ('{run_id}', '{run_tag_sql}', {rule_count}, {n_feat}, current_timestamp())
""")

# ─────────────────────────────────────────────────────────────────────────────
# 6) Quick stats & finish
# ─────────────────────────────────────────────────────────────────────────────
# Headline numbers for the table that was just published. A loan counts as
# high risk when its band is 'High' or it has 20 or more risk points.
stats_sql = """
SELECT
  COUNT(*) AS loans_total,
  SUM(CASE WHEN risk_band='High' OR risk_points>=20 THEN 1 ELSE 0 END) AS high_risk_loans,
  ROUND(AVG(dti),1)  AS avg_dti,
  ROUND(AVG(fico_score),0) AS avg_fico
FROM gold.risk_eval
"""
out = spark.sql(stats_sql).first()

summary_lines = (
    f"✅ gold.risk_eval built (run_id={run_id}, rules_version={run_tag})",
    f"   loans_total={out.loans_total}, high_risk_loans={out.high_risk_loans}, avg_dti={out.avg_dti}, avg_fico={out.avg_fico}",
)
for summary_line in summary_lines:
    print(summary_line)

# Expose the run id to downstream tasks of the same job.
dbutils.jobs.taskValues.set(key="risk_eval_run_id", value=run_id)