# Merged Feature Engineering

## Task Checklist — ALL FEATURE ENGINEERING COMPLETE

1. **Aggregate depth=1 and depth=2 tables** - DONE
2. **Split data into Applicant vs others (num_group1)** - DONE (person_1)
3. **Active and closed contracts** - DONE
4. **Time-windowed aggregations** - DONE
5. **DPD-conditional aggregations** - DONE
6. **Static table features** - DONE
7. **Rolling time-split validation** - DONE
8. **Remove fluctuating features, rank importance** - DONE
9. **Credit bureau A (DANGEROUS tables)** - DONE
10. **Ratio / interaction features** - DONE
11. **other_1 bank account features** - DONE
12. **credit_bureau_b_2 payment DPD** - DONE
13. **tax_registry_b_1 + tax_registry_c_1** - DONE
14. **person_1 (applicant + others) + person_2** - DONE

---



## 1) Setup

In [None]:
from pathlib import Path
import polars as pl
import numpy as np
import pandas as pd
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.metrics import roc_auc_score

# UPDATE THIS PATH to your local data directory
DATA_DIR = Path("home_credit_data/csv_files/train")

TABLES = {
    "base": DATA_DIR / "train_base.csv",
    "static_0_0": DATA_DIR / "train_static_0_0.csv",
    "static_0_1": DATA_DIR / "train_static_0_1.csv",
    "static_cb_0": DATA_DIR / "train_static_cb_0.csv",
    "other_1": DATA_DIR / "train_other_1.csv",
    "person_1": DATA_DIR / "train_person_1.csv",
    "person_2": DATA_DIR / "train_person_2.csv",
    "applprev_1_0": DATA_DIR / "train_applprev_1_0.csv",
    "applprev_1_1": DATA_DIR / "train_applprev_1_1.csv",
    "applprev_2": DATA_DIR / "train_applprev_2.csv",
    "credit_bureau_b_1": DATA_DIR / "train_credit_bureau_b_1.csv",
    "credit_bureau_b_2": DATA_DIR / "train_credit_bureau_b_2.csv",
    "deposit_1": DATA_DIR / "train_deposit_1.csv",
    "tax_registry_a_1": DATA_DIR / "train_tax_registry_a_1.csv",
    "tax_registry_b_1": DATA_DIR / "train_tax_registry_b_1.csv",
    "tax_registry_c_1": DATA_DIR / "train_tax_registry_c_1.csv",
    # credit_bureau_a shards (DANGEROUS — high depth, largest data source)
    "credit_bureau_a_1_0": DATA_DIR / "train_credit_bureau_a_1_0.csv",
    "credit_bureau_a_1_1": DATA_DIR / "train_credit_bureau_a_1_1.csv",
    "credit_bureau_a_1_2": DATA_DIR / "train_credit_bureau_a_1_2.csv",
    "credit_bureau_a_1_3": DATA_DIR / "train_credit_bureau_a_1_3.csv",
    "credit_bureau_a_2_0": DATA_DIR / "train_credit_bureau_a_2_0.csv",
    "credit_bureau_a_2_1": DATA_DIR / "train_credit_bureau_a_2_1.csv",
    "credit_bureau_a_2_2": DATA_DIR / "train_credit_bureau_a_2_2.csv",
    "credit_bureau_a_2_3": DATA_DIR / "train_credit_bureau_a_2_3.csv",
    "credit_bureau_a_2_4": DATA_DIR / "train_credit_bureau_a_2_4.csv",
    "credit_bureau_a_2_5": DATA_DIR / "train_credit_bureau_a_2_5.csv",
    "credit_bureau_a_2_6": DATA_DIR / "train_credit_bureau_a_2_6.csv",
    "credit_bureau_a_2_7": DATA_DIR / "train_credit_bureau_a_2_7.csv",
    "credit_bureau_a_2_8": DATA_DIR / "train_credit_bureau_a_2_8.csv",
    "credit_bureau_a_2_9": DATA_DIR / "train_credit_bureau_a_2_9.csv",
    "credit_bureau_a_2_10": DATA_DIR / "train_credit_bureau_a_2_10.csv",
}

: 

## 2) Load base table

In [None]:
base = pl.read_csv(TABLES["base"])

base_dates = (
    base.select(["case_id", "date_decision"])
    .with_columns(pl.col("date_decision").str.strptime(pl.Date, strict=False))
)

print(f"Base shape: {base.shape}")
print(f"Target rate: {base['target'].mean():.4f}")
base.head()

## 3) Feature building functions

### Method 1: Leak-safe temporal filtering
- Every record has an **event date** (e.g., contract start, last update)
- We compute `age_years = (decision_date - event_date) / 365.25`
- Only records with `age_years >= 0` are used — this ensures we never use future data that wouldn't be available at decision time
- Without this, the model would cheat by seeing outcomes that haven't happened yet

### Method 2: Active vs closed contract classification
- **Active contracts:** maturity/end date is in the future relative to decision date, OR status column indicates active (e.g., status = "A")
- **Closed contracts:** maturity/end date is in the past, OR status indicates closed (e.g., status in "D", "K", "T")
- Separating these captures different risk signals — many active contracts = higher exposure, many closed = established history

### Method 3: DPD (Days Past Due) conditional aggregation
- DPD measures how late a borrower is on payments — the strongest credit risk signal
- We create binary flags: `dpd >= 30` (early delinquency) and `dpd >= 90` (serious delinquency)
- These are crossed with active/closed status to produce counts like "number of active contracts with 30+ days overdue"
- DPD rate features (e.g., `dpd30_count / active_count`) capture delinquency proportion

### Method 4: Time-windowed counts
- Instead of a single total count, we bucket records by recency: **<=1 year**, **1-3 years**, **>3 years**
- Recent events are more predictive than old ones — a DPD in the last year matters more than one 5 years ago
- Custom windows (e.g., 0-0.5y, 0.5-2y, >2y) are tested per table to find the best split
- Each window is crossed with active/closed for granularity

### Method 5: Static table processing (1:1 join)
- Static tables have exactly one row per applicant — no aggregation needed, just join
- **Numeric columns** (suffixes L, P, A, T): cast to float (some load as strings due to mixed types across shards)
- **Date columns** (suffix D): converted to **days since decision** — gives relative temporal features (e.g., "last delinquency was 120 days ago")
- **Categorical columns** (suffix M): dropped — high cardinality, low signal for tree models without encoding
- **Shard handling**: some static tables are split across files (e.g., static_0_0 + static_0_1) with schema mismatches — all columns are cast to String before concatenation, then properly typed

### Method 6: Rolling time-split validation
- Standard k-fold CV doesn't test temporal stability — the key metric for this competition
- We use **3 time cuts** at WEEK_NUM = 50, 60, 70: train on weeks before the cut, validate on weeks after
- This simulates deploying a model and testing if it holds up on future data
- We report mean, std, min, max AUC across cuts — low std = stable model

### Method 7: Incremental block evaluation
- Features are added one block at a time, not all at once
- After adding a block, we re-run the rolling evaluation
- If AUC improves: keep the block. If it drops or stays flat: drop it
- This prevents noisy/redundant features from diluting signal

### Method 8: Credit bureau A processing (DANGEROUS tables)
- **16M rows** (depth-1, contract level) + **188M rows** (depth-2, payment level) across 15 shards
- Data comes from **two credit bureaus** filling alternate paired columns — coalesced to get best available value
- 69 schema mismatches across shards — all columns cast to String before concat, then typed
- **a_1 features (contract level):** active/closed contracts, DPD counts, time windows, PLUS numeric aggregations (max/mean of debt, overdue amounts, outstanding balances)
- **a_2 features (payment level):** per-payment DPD max/mean/counts, overdue amounts — processed shard-by-shard to avoid OOM on 188M rows

### Method 9: Ratio / interaction features
- Raw amounts (income, debt, credit) vary wildly across applicants — ratios normalize them
- **Debt-to-income:** current debt / income — measures leverage, higher = riskier
- **Annuity-to-income:** monthly payment / income — measures affordability
- **Credit utilization:** current debt / credit amount — measures how much of credit line is used
- **DPD installment fraction:** installments with DPD / total installments — measures payment discipline
- **DPD trend:** recent DPD / longer-term DPD — values >1 indicate worsening behavior
- Safe division: returns null when denominator is zero or null, avoiding inf/NaN pollution

In [None]:
def build_contract_features(
    table_path: Path,
    prefix: str,
    base_dates: pl.DataFrame,
    event_date_cols: list[str],
    dpd_col: str | None = None,
    active_flag_expr: pl.Expr | None = None,
    closed_flag_expr: pl.Expr | None = None,
):
    df = pl.read_csv(table_path).join(base_dates, on="case_id", how="left")

    parse_exprs = [pl.col(c).str.strptime(pl.Date, strict=False).alias(c) for c in event_date_cols if c in df.columns]
    cast_exprs = [pl.col(dpd_col).cast(pl.Float64, strict=False).alias(dpd_col)] if (dpd_col is not None and dpd_col in df.columns) else []
    df = df.with_columns(parse_exprs + cast_exprs)

    available_dates = [pl.col(c) for c in event_date_cols if c in df.columns]
    if len(available_dates) > 0:
        df = df.with_columns(pl.coalesce(available_dates).alias("event_date"))
        df = df.with_columns(((pl.col("date_decision") - pl.col("event_date")).dt.total_days() / 365.25).alias("age_years"))
        known_mask = (pl.col("age_years") >= 0)
    else:
        df = df.with_columns([
            pl.lit(None).cast(pl.Date).alias("event_date"),
            pl.lit(None).cast(pl.Float64).alias("age_years"),
        ])
        known_mask = pl.lit(True)

    if active_flag_expr is None:
        active_flag_expr = pl.lit(False)
    if closed_flag_expr is None:
        closed_flag_expr = pl.lit(False)

    df = df.with_columns([
        active_flag_expr.fill_null(False).alias("is_active"),
        closed_flag_expr.fill_null(False).alias("is_closed"),
    ])

    if dpd_col is not None and dpd_col in df.columns:
        df = df.with_columns([
            (pl.col(dpd_col) >= 30).fill_null(False).alias("dpd30"),
            (pl.col(dpd_col) >= 90).fill_null(False).alias("dpd90"),
        ])
    else:
        df = df.with_columns([
            pl.lit(False).alias("dpd30"),
            pl.lit(False).alias("dpd90"),
        ])

    agg = df.group_by("case_id").agg([
        pl.len().alias(f"{prefix}_row_count_all"),
        known_mask.sum().alias(f"{prefix}_known_count"),

        (known_mask & pl.col("is_active")).sum().alias(f"{prefix}_active_count_all"),
        (known_mask & pl.col("is_closed")).sum().alias(f"{prefix}_closed_count_all"),

        (known_mask & pl.col("is_active") & pl.col("dpd30")).sum().alias(f"{prefix}_active_dpd30_count_all"),
        (known_mask & pl.col("is_active") & pl.col("dpd90")).sum().alias(f"{prefix}_active_dpd90_count_all"),
        (known_mask & pl.col("is_closed") & pl.col("dpd30")).sum().alias(f"{prefix}_closed_dpd30_count_all"),
        (known_mask & pl.col("is_closed") & pl.col("dpd90")).sum().alias(f"{prefix}_closed_dpd90_count_all"),

        (known_mask & pl.col("is_active") & (pl.col("age_years") <= 1)).sum().alias(f"{prefix}_active_count_le1y"),
        (known_mask & pl.col("is_closed") & (pl.col("age_years") <= 1)).sum().alias(f"{prefix}_closed_count_le1y"),

        (known_mask & pl.col("is_active") & (pl.col("age_years") > 1) & (pl.col("age_years") <= 3)).sum().alias(f"{prefix}_active_count_1to3y"),
        (known_mask & pl.col("is_closed") & (pl.col("age_years") > 1) & (pl.col("age_years") <= 3)).sum().alias(f"{prefix}_closed_count_1to3y"),

        (known_mask & pl.col("is_active") & (pl.col("age_years") > 3)).sum().alias(f"{prefix}_active_count_gt3y"),
        (known_mask & pl.col("is_closed") & (pl.col("age_years") > 3)).sum().alias(f"{prefix}_closed_count_gt3y"),
    ])

    agg = agg.with_columns([
        pl.when(pl.col(f"{prefix}_active_count_all") > 0)
        .then(pl.col(f"{prefix}_active_dpd30_count_all") / pl.col(f"{prefix}_active_count_all"))
        .otherwise(0.0)
        .alias(f"{prefix}_active_dpd30_rate"),

        pl.when(pl.col(f"{prefix}_closed_count_all") > 0)
        .then(pl.col(f"{prefix}_closed_dpd30_count_all") / pl.col(f"{prefix}_closed_count_all"))
        .otherwise(0.0)
        .alias(f"{prefix}_closed_dpd30_rate"),
    ])

    return agg

In [None]:
def build_contract_features_custom_windows(
    table_path: Path,
    prefix: str,
    base_dates: pl.DataFrame,
    event_date_cols: list[str],
    windows=((0,1),(1,3),(3,999)),
    dpd_col: str | None = None,
    active_flag_expr: pl.Expr | None = None,
    closed_flag_expr: pl.Expr | None = None,
):
    df = pl.read_csv(table_path).join(base_dates, on="case_id", how="left")

    parse_exprs = [pl.col(c).str.strptime(pl.Date, strict=False).alias(c) for c in event_date_cols if c in df.columns]
    cast_exprs = [pl.col(dpd_col).cast(pl.Float64, strict=False).alias(dpd_col)] if (dpd_col is not None and dpd_col in df.columns) else []
    df = df.with_columns(parse_exprs + cast_exprs)

    dates = [pl.col(c) for c in event_date_cols if c in df.columns]
    if len(dates) > 0:
        df = df.with_columns([
            pl.coalesce(dates).alias("event_date"),
            ((pl.col("date_decision") - pl.coalesce(dates)).dt.total_days() / 365.25).alias("age_years"),
        ])
        known = (pl.col("age_years") >= 0)
    else:
        df = df.with_columns([pl.lit(None).cast(pl.Float64).alias("age_years")])
        known = pl.lit(True)

    if active_flag_expr is None:
        active_flag_expr = pl.lit(False)
    if closed_flag_expr is None:
        closed_flag_expr = pl.lit(False)

    df = df.with_columns([
        active_flag_expr.fill_null(False).alias("is_active"),
        closed_flag_expr.fill_null(False).alias("is_closed"),
    ])

    aggs = [
        pl.len().alias(f"{prefix}_row_count_all"),
        (known & pl.col("is_active")).sum().alias(f"{prefix}_active_count_all"),
        (known & pl.col("is_closed")).sum().alias(f"{prefix}_closed_count_all"),
    ]

    for lo, hi in windows:
        tag = f"{lo}to{hi}" if hi < 999 else f"gt{lo}"
        cond = (pl.col("age_years") > lo) & (pl.col("age_years") <= hi) if hi < 999 else (pl.col("age_years") > lo)
        aggs += [
            (known & pl.col("is_active") & cond).sum().alias(f"{prefix}_active_count_{tag}"),
            (known & pl.col("is_closed") & cond).sum().alias(f"{prefix}_closed_count_{tag}"),
        ]

    return df.group_by("case_id").agg(aggs)

In [None]:
def build_static_features(
    table_paths: list[Path],
    base_dates: pl.DataFrame,
    prefix: str = "s0",
):
    """
    Load and concat sharded static tables (1:1 with base).
    - Numeric columns (L, P, A, T suffixes): keep as float
    - Date columns (D suffix): convert to days-since-decision (relative time)
    - Categorical columns (M suffix): drop (high cardinality, low signal)

    Handles schema mismatches between shards by casting all non-case_id
    columns to String before concat, then applying proper type conversions.
    """
    dfs = []
    for p in table_paths:
        raw = pl.read_csv(p)
        # Cast all non-case_id columns to String to avoid schema conflicts between shards
        cast_to_str = [pl.col(c).cast(pl.String) for c in raw.columns if c != "case_id"]
        dfs.append(raw.with_columns(cast_to_str))
    df = pl.concat(dfs)

    # Join to get date_decision for relative date features
    df = df.join(base_dates, on="case_id", how="left")

    cols = [c for c in df.columns if c not in ["case_id", "date_decision"]]

    # Classify columns by suffix
    date_cols = [c for c in cols if c.endswith("D")]
    cat_cols = [c for c in cols if c.endswith("M")]
    num_cols = [c for c in cols if c not in date_cols and c not in cat_cols]

    # Cast numeric columns (currently String) to float
    cast_exprs = []
    for c in num_cols:
        cast_exprs.append(pl.col(c).cast(pl.Float64, strict=False).alias(f"{prefix}_{c}"))

    # Convert date columns (currently String) to days since decision
    date_exprs = []
    for c in date_cols:
        date_exprs.append(
            (
                (pl.col("date_decision") - pl.col(c).str.strptime(pl.Date, strict=False))
                .dt.total_days()
                .cast(pl.Float64)
            ).alias(f"{prefix}_{c}_days")
        )

    df = df.with_columns(cast_exprs + date_exprs)

    keep_cols = (
        ["case_id"]
        + [f"{prefix}_{c}" for c in num_cols]
        + [f"{prefix}_{c}_days" for c in date_cols]
    )

    return df.select(keep_cols)

### Method 8: Credit bureau A processing (DANGEROUS tables)
- **16M rows** (depth-1, contract level) + **188M rows** (depth-2, payment level) across 15 shards
- Data comes from **two credit bureaus** filling alternate paired columns — coalesced to get best available value
- 69 schema mismatches across shards — all columns cast to String before concat, then typed
- **a_1 features (contract level):** active/closed contracts, DPD counts, time windows, PLUS numeric aggregations (max/mean of debt, overdue amounts, outstanding balances)
- **a_2 features (payment level):** per-payment DPD max/mean/counts, overdue amounts — processed shard-by-shard to avoid OOM on 188M rows

In [None]:
def build_cba1_features(shard_paths: list[Path], base_dates: pl.DataFrame, prefix: str = "cba1"):
    """
    credit_bureau_a depth-1: contract-level records from external credit bureaus.
    Two bureaus fill alternate paired columns — we coalesce to get best available value.
    69 schema mismatches across shards — cast all to String before concat.

    Produces:
      - Count features: total contracts, active/closed, DPD>0/30/90 counts, time-windowed
      - Numeric aggregations: max/mean of DPD, debt outstanding, overdue amounts
      - Rate features: DPD30 rate for active and closed contracts
    """
    # Load and concat shards (cast to String to handle schema mismatches)
    dfs = []
    for p in shard_paths:
        raw = pl.read_csv(p)
        cast_to_str = [pl.col(c).cast(pl.String) for c in raw.columns if c != "case_id"]
        dfs.append(raw.with_columns(cast_to_str))
    df = pl.concat(dfs)
    print(f"  {prefix} raw: {df.shape}")

    # Join base dates
    df = df.join(base_dates, on="case_id", how="left")

    # Coalesce paired columns (two credit bureaus fill alternates)
    df = df.with_columns([
        # DPD
        pl.coalesce([
            pl.col("dpdmax_139P").cast(pl.Float64, strict=False),
            pl.col("dpdmax_757P").cast(pl.Float64, strict=False),
        ]).alias("dpd_max"),
        # Credit end date (for active/closed)
        pl.coalesce([
            pl.col("dateofcredend_289D").str.strptime(pl.Date, strict=False),
            pl.col("dateofcredend_353D").str.strptime(pl.Date, strict=False),
        ]).alias("cred_end_date"),
        # Last update (for age/recency)
        pl.coalesce([
            pl.col("lastupdate_1112D").str.strptime(pl.Date, strict=False),
            pl.col("lastupdate_388D").str.strptime(pl.Date, strict=False),
        ]).alias("last_update"),
        # Amounts
        pl.col("debtoutstand_525A").cast(pl.Float64, strict=False).alias("debt_outstanding"),
        pl.col("debtoverdue_47A").cast(pl.Float64, strict=False).alias("debt_overdue"),
        pl.coalesce([
            pl.col("overdueamountmax_155A").cast(pl.Float64, strict=False),
            pl.col("overdueamountmax_35A").cast(pl.Float64, strict=False),
        ]).alias("overdue_max"),
        pl.coalesce([
            pl.col("totaldebtoverduevalue_178A").cast(pl.Float64, strict=False),
            pl.col("totaldebtoverduevalue_718A").cast(pl.Float64, strict=False),
        ]).alias("total_debt_overdue"),
        pl.coalesce([
            pl.col("outstandingamount_354A").cast(pl.Float64, strict=False),
            pl.col("outstandingamount_362A").cast(pl.Float64, strict=False),
        ]).alias("outstanding_amount"),
        pl.coalesce([
            pl.col("numberofoverdueinstlmax_1039L").cast(pl.Float64, strict=False),
            pl.col("numberofoverdueinstlmax_1151L").cast(pl.Float64, strict=False),
        ]).alias("num_overdue_instl_max"),
        pl.coalesce([
            pl.col("numberofoverdueinstls_725L").cast(pl.Float64, strict=False),
            pl.col("numberofoverdueinstls_834L").cast(pl.Float64, strict=False),
        ]).alias("num_overdue_instls"),
    ])

    # Compute age in years from last_update
    df = df.with_columns(
        ((pl.col("date_decision") - pl.col("last_update")).dt.total_days() / 365.25).alias("age_years")
    )
    known = pl.col("age_years") >= 0

    # Active/closed based on credit end date
    df = df.with_columns([
        (pl.col("cred_end_date") >= pl.col("date_decision")).fill_null(False).alias("is_active"),
        (pl.col("cred_end_date") < pl.col("date_decision")).fill_null(False).alias("is_closed"),
    ])

    # DPD flags
    df = df.with_columns([
        (pl.col("dpd_max") > 0).fill_null(False).alias("dpd_any"),
        (pl.col("dpd_max") >= 30).fill_null(False).alias("dpd30"),
        (pl.col("dpd_max") >= 90).fill_null(False).alias("dpd90"),
    ])

    # Aggregate
    agg = df.group_by("case_id").agg([
        # Count features
        pl.len().alias(f"{prefix}_contract_count"),
        known.sum().alias(f"{prefix}_known_count"),
        (known & pl.col("is_active")).sum().alias(f"{prefix}_active_count"),
        (known & pl.col("is_closed")).sum().alias(f"{prefix}_closed_count"),

        # DPD count features
        (known & pl.col("dpd_any")).sum().alias(f"{prefix}_dpd_any_count"),
        (known & pl.col("dpd30")).sum().alias(f"{prefix}_dpd30_count"),
        (known & pl.col("dpd90")).sum().alias(f"{prefix}_dpd90_count"),
        (known & pl.col("is_active") & pl.col("dpd30")).sum().alias(f"{prefix}_active_dpd30_count"),
        (known & pl.col("is_active") & pl.col("dpd90")).sum().alias(f"{prefix}_active_dpd90_count"),
        (known & pl.col("is_closed") & pl.col("dpd30")).sum().alias(f"{prefix}_closed_dpd30_count"),
        (known & pl.col("is_closed") & pl.col("dpd90")).sum().alias(f"{prefix}_closed_dpd90_count"),

        # Time-windowed counts
        (known & pl.col("is_active") & (pl.col("age_years") <= 1)).sum().alias(f"{prefix}_active_le1y"),
        (known & pl.col("is_closed") & (pl.col("age_years") <= 1)).sum().alias(f"{prefix}_closed_le1y"),
        (known & pl.col("is_active") & (pl.col("age_years") > 1) & (pl.col("age_years") <= 3)).sum().alias(f"{prefix}_active_1to3y"),
        (known & pl.col("is_closed") & (pl.col("age_years") > 1) & (pl.col("age_years") <= 3)).sum().alias(f"{prefix}_closed_1to3y"),
        (known & pl.col("is_active") & (pl.col("age_years") > 3)).sum().alias(f"{prefix}_active_gt3y"),
        (known & pl.col("is_closed") & (pl.col("age_years") > 3)).sum().alias(f"{prefix}_closed_gt3y"),

        # Numeric aggregations (max/mean of key amounts)
        pl.col("dpd_max").max().alias(f"{prefix}_dpd_max"),
        pl.col("dpd_max").mean().alias(f"{prefix}_dpd_mean"),
        pl.col("debt_outstanding").max().alias(f"{prefix}_debt_outstanding_max"),
        pl.col("debt_outstanding").mean().alias(f"{prefix}_debt_outstanding_mean"),
        pl.col("debt_overdue").max().alias(f"{prefix}_debt_overdue_max"),
        pl.col("debt_overdue").mean().alias(f"{prefix}_debt_overdue_mean"),
        pl.col("overdue_max").max().alias(f"{prefix}_overdue_max_max"),
        pl.col("overdue_max").mean().alias(f"{prefix}_overdue_max_mean"),
        pl.col("total_debt_overdue").max().alias(f"{prefix}_total_debt_overdue_max"),
        pl.col("total_debt_overdue").sum().alias(f"{prefix}_total_debt_overdue_sum"),
        pl.col("outstanding_amount").max().alias(f"{prefix}_outstanding_max"),
        pl.col("outstanding_amount").sum().alias(f"{prefix}_outstanding_sum"),
        pl.col("num_overdue_instl_max").max().alias(f"{prefix}_num_overdue_instl_max"),
        pl.col("num_overdue_instls").max().alias(f"{prefix}_num_overdue_instls_max"),
        pl.col("num_overdue_instls").sum().alias(f"{prefix}_num_overdue_instls_sum"),
    ])

    # Rate features
    agg = agg.with_columns([
        pl.when(pl.col(f"{prefix}_active_count") > 0)
            .then(pl.col(f"{prefix}_active_dpd30_count") / pl.col(f"{prefix}_active_count"))
            .otherwise(0.0).alias(f"{prefix}_active_dpd30_rate"),
        pl.when(pl.col(f"{prefix}_closed_count") > 0)
            .then(pl.col(f"{prefix}_closed_dpd30_count") / pl.col(f"{prefix}_closed_count"))
            .otherwise(0.0).alias(f"{prefix}_closed_dpd30_rate"),
        pl.when(pl.col(f"{prefix}_known_count") > 0)
            .then(pl.col(f"{prefix}_dpd_any_count") / pl.col(f"{prefix}_known_count"))
            .otherwise(0.0).alias(f"{prefix}_dpd_any_rate"),
    ])

    return agg

In [None]:
def build_cba2_features(shard_paths: list[Path], prefix: str = "cba2"):
    """
    credit_bureau_a depth-2: payment-level records. 188M rows across 11 shards.
    Processed shard-by-shard to avoid OOM: aggregate each shard to case_id level,
    then re-aggregate across shards.

    Key columns: pmts_dpd_1073P (payment DPD), pmts_overdue_1140A (overdue amount).
    """
    shard_aggs = []
    for i, p in enumerate(shard_paths):
        df = pl.scan_csv(p).select([
            "case_id",
            pl.col("pmts_dpd_1073P").cast(pl.Float64, strict=False).alias("pmt_dpd"),
            pl.col("pmts_overdue_1140A").cast(pl.Float64, strict=False).alias("pmt_overdue"),
        ])

        shard_agg = df.group_by("case_id").agg([
            pl.len().alias("_n"),
            pl.col("pmt_dpd").max().alias("_dpd_max"),
            pl.col("pmt_dpd").sum().alias("_dpd_sum"),
            pl.col("pmt_dpd").drop_nulls().len().alias("_dpd_nonnull"),
            (pl.col("pmt_dpd") > 0).sum().alias("_dpd_gt0"),
            (pl.col("pmt_dpd") >= 30).sum().alias("_dpd_ge30"),
            (pl.col("pmt_dpd") >= 90).sum().alias("_dpd_ge90"),
            pl.col("pmt_overdue").max().alias("_overdue_max"),
            pl.col("pmt_overdue").sum().alias("_overdue_sum"),
            pl.col("pmt_overdue").drop_nulls().len().alias("_overdue_nonnull"),
        ]).collect()

        shard_aggs.append(shard_agg)
        print(f"  shard {i}: {shard_agg.height} case_ids aggregated")

    # Merge all shard aggregations
    combined = pl.concat(shard_aggs)
    print(f"  Combined: {combined.shape}")

    # Re-aggregate across shards (same case_id may appear in multiple shards)
    final = combined.group_by("case_id").agg([
        pl.col("_n").sum().alias(f"{prefix}_payment_count"),
        pl.col("_dpd_max").max().alias(f"{prefix}_dpd_max"),
        pl.col("_dpd_sum").sum().alias(f"{prefix}_dpd_sum"),
        pl.col("_dpd_nonnull").sum().alias(f"{prefix}_dpd_nonnull"),
        pl.col("_dpd_gt0").sum().alias(f"{prefix}_dpd_gt0_count"),
        pl.col("_dpd_ge30").sum().alias(f"{prefix}_dpd_ge30_count"),
        pl.col("_dpd_ge90").sum().alias(f"{prefix}_dpd_ge90_count"),
        pl.col("_overdue_max").max().alias(f"{prefix}_overdue_max"),
        pl.col("_overdue_sum").sum().alias(f"{prefix}_overdue_sum"),
        pl.col("_overdue_nonnull").sum().alias(f"{prefix}_overdue_nonnull"),
    ])

    # Derived features
    final = final.with_columns([
        # Mean DPD across all payments
        pl.when(pl.col(f"{prefix}_dpd_nonnull") > 0)
            .then(pl.col(f"{prefix}_dpd_sum") / pl.col(f"{prefix}_dpd_nonnull"))
            .otherwise(None).alias(f"{prefix}_dpd_mean"),
        # Mean overdue amount
        pl.when(pl.col(f"{prefix}_overdue_nonnull") > 0)
            .then(pl.col(f"{prefix}_overdue_sum") / pl.col(f"{prefix}_overdue_nonnull"))
            .otherwise(None).alias(f"{prefix}_overdue_mean"),
        # DPD rate (fraction of payments with any DPD)
        pl.when(pl.col(f"{prefix}_dpd_nonnull") > 0)
            .then(pl.col(f"{prefix}_dpd_gt0_count") / pl.col(f"{prefix}_dpd_nonnull"))
            .otherwise(0.0).alias(f"{prefix}_dpd_rate"),
        # DPD30 rate
        pl.when(pl.col(f"{prefix}_dpd_nonnull") > 0)
            .then(pl.col(f"{prefix}_dpd_ge30_count") / pl.col(f"{prefix}_dpd_nonnull"))
            .otherwise(0.0).alias(f"{prefix}_dpd30_rate"),
    ])

    # Drop intermediate columns
    final = final.drop([f"{prefix}_dpd_sum", f"{prefix}_dpd_nonnull", f"{prefix}_overdue_nonnull"])

    return final

## 4) Build feature blocks

### Block 1: cb1 (credit_bureau_b_1) — 16 features
External credit bureau records. Active/closed by maturity date, DPD from `dpd_550P`.

### Block 2: ap1w (applprev_1_1) — 9 features
Previous applications with custom time windows (0-0.5y, 0.5-2y, 2+y).

### Block 3: dep (deposit_1) — 16 features
Deposit accounts. Active/closed by contract end date.

### Block 4: taxa (tax_registry_a_1) — 16 features
Tax records. All flagged active.

### Block 5: ap2 (applprev_2) — 16 features
Application counts only.

### Block 6: s0 (static_0_0 + static_0_1) — ~159 features
Applicant-level static data. 1:1 join. Numeric + date features.

### Block 7: scb (static_cb_0) — ~47 features
Credit bureau static data. 1:1 join.

### Block 8: cba1 (credit_bureau_a depth-1) — ~36 features
16M rows, 4 shards, paired columns coalesced. DPD counts, amounts, rates.

### Block 9: cba2 (credit_bureau_a depth-2) — ~11 features
188M rows, 11 shards. Per-payment DPD and overdue amounts.

### Block 10: ratio (derived from s0) — 12 features
Income ratios, credit utilization, payment discipline, DPD trends.

### Block 11: oth (other_1) — 7 features
Bank account activity. 51k rows, 1:1.

### Block 12: cbb2 (credit_bureau_b_2) — 11 features
1.3M payment-level records. DPD counts and overdue amounts.

### Block 13: taxb (tax_registry_b_1) — 6 features
1.1M rows. Deduction amounts (sum/max/mean), recency, presence flag.

### Block 14: taxc (tax_registry_c_1) — 6 features
3.3M rows. Payment amounts (sum/max/mean), recency, presence flag.

### Block 15: p1 (person_1) — 12 features
3M rows. **Applicant (num_group1=0):** income, age, children, employment, gender, housing.
**Others:** count of associated persons.

### Block 16: p2 (person_2) — 4 features
1.6M rows, depth-2. Record count, distinct persons, unique roles, employment count.

---

### Feature Summary

| Block | Source | Features | Key Signal |
|-------|--------|----------|------------|
| base | train_base | 1 | Temporal position (WEEK_NUM) |
| cb1 | credit_bureau_b_1 | 16 | DPD delinquency |
| ap1w | applprev_1_1 | 9 | Application velocity |
| dep | deposit_1 | 16 | Financial stability |
| taxa | tax_registry_a_1 | 16 | Income presence |
| ap2 | applprev_2 | 16 | Application count |
| s0 | static_0_0 + static_0_1 | ~159 | Income, debt, DPD history |
| scb | static_cb_0 | ~47 | Credit queries, risk scores |
| cba1 | credit_bureau_a_1_* | ~36 | External DPD, debt, overdue |
| cba2 | credit_bureau_a_2_* | ~11 | Per-payment DPD |
| ratio | derived from s0 | 12 | Financial health ratios |
| oth | other_1 | 7 | Bank account activity |
| cbb2 | credit_bureau_b_2 | 11 | Per-payment DPD (bureau B) |
| taxb | tax_registry_b_1 | 6 | Tax deductions |
| taxc | tax_registry_c_1 | 6 | Tax payments |
| p1 | person_1 | 12 | Demographics, applicant vs others |
| p2 | person_2 | 4 | Person record counts |
| **Total** | | **~385** | |

In [None]:
# cb1: credit_bureau_b_1
cb1_agg = build_contract_features(
    table_path=TABLES["credit_bureau_b_1"],
    prefix="cb1",
    base_dates=base_dates,
    event_date_cols=["lastupdate_260D", "contractdate_551D"],
    dpd_col="dpd_550P",
    active_flag_expr=(pl.col("contractmaturitydate_151D").str.strptime(pl.Date, strict=False) >= pl.col("date_decision")),
    closed_flag_expr=(pl.col("contractmaturitydate_151D").str.strptime(pl.Date, strict=False) < pl.col("date_decision")),
)
print(f"cb1: {cb1_agg.shape}")

In [None]:
# ap1w: applprev_1_1 with best window scheme (<=0.5y, 0.5-2y, >2y)
ap1w_agg = build_contract_features_custom_windows(
    table_path=TABLES["applprev_1_1"],
    prefix="ap1w",
    base_dates=base_dates,
    event_date_cols=["dateactivated_425D", "approvaldate_319D", "creationdate_885D"],
    windows=((0, 0.5), (0.5, 2), (2, 999)),
    dpd_col=None,
    active_flag_expr=pl.col("status_219L").is_in(["A"]),
    closed_flag_expr=pl.col("status_219L").is_in(["D", "K", "T"]),
)
print(f"ap1w: {ap1w_agg.shape}")

In [None]:
# dep: deposit_1
dep_agg = build_contract_features(
    table_path=TABLES["deposit_1"],
    prefix="dep",
    base_dates=base_dates,
    event_date_cols=["openingdate_313D", "contractenddate_991D"],
    dpd_col=None,
    active_flag_expr=(pl.col("contractenddate_991D") >= pl.col("date_decision")),
    closed_flag_expr=(pl.col("contractenddate_991D") < pl.col("date_decision")),
)
print(f"dep: {dep_agg.shape}")

In [None]:
# taxa: tax_registry_a_1
taxa_agg = build_contract_features(
    table_path=TABLES["tax_registry_a_1"],
    prefix="taxa",
    base_dates=base_dates,
    event_date_cols=["recorddate_4527225D"],
    dpd_col=None,
    active_flag_expr=pl.lit(True),
    closed_flag_expr=pl.lit(False),
)
print(f"taxa: {taxa_agg.shape}")

In [None]:
# ap2: applprev_2 (no event dates or DPD, just row counts)
ap2 = pl.read_csv(TABLES["applprev_2"]).join(base_dates, on="case_id", how="left")

ap2_agg = (
    ap2.group_by("case_id").agg([
        pl.len().alias("ap2_row_count_all"),
        pl.len().alias("ap2_known_count"),
        pl.len().alias("ap2_active_count_all"),
        pl.lit(0).sum().cast(pl.Int64).alias("ap2_closed_count_all"),
        pl.lit(0).sum().cast(pl.Int64).alias("ap2_active_dpd30_count_all"),
        pl.lit(0).sum().cast(pl.Int64).alias("ap2_active_dpd90_count_all"),
        pl.lit(0).sum().cast(pl.Int64).alias("ap2_closed_dpd30_count_all"),
        pl.lit(0).sum().cast(pl.Int64).alias("ap2_closed_dpd90_count_all"),
        pl.lit(0).sum().cast(pl.Int64).alias("ap2_active_count_le1y"),
        pl.lit(0).sum().cast(pl.Int64).alias("ap2_closed_count_le1y"),
        pl.lit(0).sum().cast(pl.Int64).alias("ap2_active_count_1to3y"),
        pl.lit(0).sum().cast(pl.Int64).alias("ap2_closed_count_1to3y"),
        pl.lit(0).sum().cast(pl.Int64).alias("ap2_active_count_gt3y"),
        pl.lit(0).sum().cast(pl.Int64).alias("ap2_closed_count_gt3y"),
    ])
    .with_columns([
        pl.lit(0.0).alias("ap2_active_dpd30_rate"),
        pl.lit(0.0).alias("ap2_closed_dpd30_rate"),
    ])
)
print(f"ap2: {ap2_agg.shape}")

In [None]:
# s0: static_0 (concat shards static_0_0 + static_0_1) — applicant-level features
# 1:1 join, no aggregation. 167 cols: 91 numeric(L), 31 amounts(A), 22 DPD(P), 15 dates(D), 8 categorical(M dropped)
s0_features = build_static_features(
    table_paths=[TABLES["static_0_0"], TABLES["static_0_1"]],
    base_dates=base_dates,
    prefix="s0",
)
print(f"s0: {s0_features.shape}")

In [None]:
# scb: static_cb_0 — credit bureau static features
# 1:1 join, 52 cols: 32 numeric(L), 4 amounts(A), 2 scores(T), 9 dates(D), 5 categorical(M dropped)
scb_features = build_static_features(
    table_paths=[TABLES["static_cb_0"]],
    base_dates=base_dates,
    prefix="scb",
)
print(f"scb: {scb_features.shape}")

In [None]:
# cba1: credit_bureau_a depth-1 (contract level) — 16M rows, 4 shards, 79 cols
# Coalesces paired columns from two credit bureaus, handles 69 schema mismatches
# Features: contract counts, active/closed, DPD counts/rates, time windows, numeric aggregations
print("Building cba1...")
cba1_paths = [TABLES[f"credit_bureau_a_1_{i}"] for i in range(4)]
cba1_features = build_cba1_features(cba1_paths, base_dates, prefix="cba1")
print(f"cba1: {cba1_features.shape}")

In [None]:
# cba2: credit_bureau_a depth-2 (payment level) — 188M rows, 11 shards
# Processed shard-by-shard to avoid OOM. Aggregates per-payment DPD and overdue amounts.
# Features: payment count, DPD max/mean/rates, overdue max/mean/sum
print("Building cba2...")
cba2_paths = [TABLES[f"credit_bureau_a_2_{i}"] for i in range(11)]
cba2_features = build_cba2_features(cba2_paths, prefix="cba2")
print(f"cba2: {cba2_features.shape}")

In [None]:
# oth: other_1 — 1:1 table, 51k rows, 5 financial amount columns + 2 derived
# Debit/deposit incoming/outgoing/balance — only ~3.3% of applicants have data
oth_raw = pl.read_csv(TABLES["other_1"]).select([
    "case_id",
    pl.col("amtdebitincoming_4809443A").alias("oth_debit_in"),
    pl.col("amtdebitoutgoing_4809440A").alias("oth_debit_out"),
    pl.col("amtdepositbalance_4809441A").alias("oth_deposit_bal"),
    pl.col("amtdepositincoming_4809444A").alias("oth_deposit_in"),
    pl.col("amtdepositoutgoing_4809442A").alias("oth_deposit_out"),
]).with_columns([
    # Net debit flow (incoming - outgoing): positive = more money coming in
    (pl.col("oth_debit_in") - pl.col("oth_debit_out")).alias("oth_debit_net"),
    # Net deposit flow
    (pl.col("oth_deposit_in") - pl.col("oth_deposit_out")).alias("oth_deposit_net"),
])
print(f"oth: {oth_raw.shape}")

In [None]:
# cbb2: credit_bureau_b_2 — depth-2, 1.3M payment-level records
# Columns: pmts_dpdvalue_108P (payment DPD), pmts_pmtsoverdue_635A (overdue amount)
# Same pattern as cba2 but single file, no shard handling needed
cbb2_raw = pl.read_csv(TABLES["credit_bureau_b_2"]).select([
    "case_id",
    pl.col("pmts_dpdvalue_108P").cast(pl.Float64, strict=False).alias("pmt_dpd"),
    pl.col("pmts_pmtsoverdue_635A").cast(pl.Float64, strict=False).alias("pmt_overdue"),
])

cbb2_features = cbb2_raw.group_by("case_id").agg([
    pl.len().alias("cbb2_payment_count"),
    pl.col("pmt_dpd").max().alias("cbb2_dpd_max"),
    pl.col("pmt_dpd").mean().alias("cbb2_dpd_mean"),
    (pl.col("pmt_dpd") > 0).sum().alias("cbb2_dpd_gt0_count"),
    (pl.col("pmt_dpd") >= 30).sum().alias("cbb2_dpd_ge30_count"),
    (pl.col("pmt_dpd") >= 90).sum().alias("cbb2_dpd_ge90_count"),
    pl.col("pmt_overdue").max().alias("cbb2_overdue_max"),
    pl.col("pmt_overdue").mean().alias("cbb2_overdue_mean"),
    pl.col("pmt_overdue").sum().alias("cbb2_overdue_sum"),
]).with_columns([
    # DPD rate
    pl.when(pl.col("cbb2_payment_count") > 0)
        .then(pl.col("cbb2_dpd_gt0_count") / pl.col("cbb2_payment_count"))
        .otherwise(0.0).alias("cbb2_dpd_rate"),
    # DPD30 rate
    pl.when(pl.col("cbb2_payment_count") > 0)
        .then(pl.col("cbb2_dpd_ge30_count") / pl.col("cbb2_payment_count"))
        .otherwise(0.0).alias("cbb2_dpd30_rate"),
])

print(f"cbb2: {cbb2_features.shape}")

In [None]:
# taxb: tax_registry_b_1 — 1.1M rows, deduction amounts + dates
taxb_raw = pl.read_csv(TABLES["tax_registry_b_1"]).join(base_dates, on="case_id", how="left")
taxb_raw = taxb_raw.with_columns([
    pl.col("amount_4917619A").cast(pl.Float64, strict=False).alias("amt"),
    ((pl.col("date_decision") - pl.col("deductiondate_4917603D").str.strptime(pl.Date, strict=False))
     .dt.total_days().cast(pl.Float64)).alias("days_since"),
])
taxb_features = taxb_raw.group_by("case_id").agg([
    pl.len().alias("taxb_count"),
    pl.col("amt").sum().alias("taxb_amt_sum"),
    pl.col("amt").max().alias("taxb_amt_max"),
    pl.col("amt").mean().alias("taxb_amt_mean"),
    pl.col("days_since").min().alias("taxb_most_recent_days"),  # smallest = most recent
]).with_columns([
    # Flag: has any tax_b records
    (pl.col("taxb_count") > 0).cast(pl.Int8).alias("taxb_has_records"),
])
print(f"taxb: {taxb_features.shape}")

In [None]:
# taxc: tax_registry_c_1 — 3.3M rows, payment amounts + processing dates
taxc_raw = pl.read_csv(TABLES["tax_registry_c_1"]).join(base_dates, on="case_id", how="left")
taxc_raw = taxc_raw.with_columns([
    pl.col("pmtamount_36A").cast(pl.Float64, strict=False).alias("pmt_amt"),
    ((pl.col("date_decision") - pl.col("processingdate_168D").str.strptime(pl.Date, strict=False))
     .dt.total_days().cast(pl.Float64)).alias("days_since"),
])
taxc_features = taxc_raw.group_by("case_id").agg([
    pl.len().alias("taxc_count"),
    pl.col("pmt_amt").sum().alias("taxc_pmt_sum"),
    pl.col("pmt_amt").max().alias("taxc_pmt_max"),
    pl.col("pmt_amt").mean().alias("taxc_pmt_mean"),
    pl.col("days_since").min().alias("taxc_most_recent_days"),
]).with_columns([
    (pl.col("taxc_count") > 0).cast(pl.Int8).alias("taxc_has_records"),
])
print(f"taxc: {taxc_features.shape}")

In [None]:
# p1: person_1 — 3M rows, 37 cols, depth-1
# Split: applicant (num_group1=0) gets direct features, others get counted
p1_raw = pl.read_csv(TABLES["person_1"]).join(base_dates, on="case_id", how="left")

# Applicant features (num_group1 == 0)
p1_applicant = (
    p1_raw.filter(pl.col("num_group1") == 0)
    .with_columns([
        pl.col("mainoccupationinc_384A").cast(pl.Float64, strict=False).alias("p1_applicant_income"),
        pl.col("childnum_185L").cast(pl.Float64, strict=False).alias("p1_childnum"),
        pl.col("empl_employedtotal_800L").cast(pl.Float64, strict=False).alias("p1_employed_total"),
        pl.col("gender_992L").cast(pl.Float64, strict=False).alias("p1_gender"),
        pl.col("familystate_447L").cast(pl.Float64, strict=False).alias("p1_family_state"),
        pl.col("housetype_905L").cast(pl.Float64, strict=False).alias("p1_house_type"),
        pl.col("incometype_1044T").cast(pl.Float64, strict=False).alias("p1_income_type"),
        pl.col("isreference_387L").cast(pl.Float64, strict=False).alias("p1_is_reference"),
        # Age from birth date
        ((pl.col("date_decision") - pl.col("birth_259D").str.strptime(pl.Date, strict=False))
         .dt.total_days() / 365.25).alias("p1_age_years"),
        # Employment start (days since)
        ((pl.col("date_decision") - pl.col("empl_employedfrom_271D").str.strptime(pl.Date, strict=False))
         .dt.total_days().cast(pl.Float64)).alias("p1_employed_since_days"),
    ])
    .select(["case_id", "p1_applicant_income", "p1_childnum", "p1_employed_total",
             "p1_gender", "p1_family_state", "p1_house_type", "p1_income_type",
             "p1_is_reference", "p1_age_years", "p1_employed_since_days"])
)

# Other persons count (num_group1 > 0)
p1_others = (
    p1_raw.filter(pl.col("num_group1") > 0)
    .group_by("case_id").agg([
        pl.len().alias("p1_other_person_count"),
    ])
)

# Merge applicant + others
p1_features = p1_applicant.join(p1_others, on="case_id", how="left").with_columns(
    pl.col("p1_other_person_count").fill_null(0),
)
print(f"p1: {p1_features.shape}")

In [None]:
# p2: person_2 — 1.6M rows, depth-2, mostly categorical
# Low signal — just count records and unique roles per case_id
p2_raw = pl.read_csv(TABLES["person_2"])
p2_features = p2_raw.group_by("case_id").agg([
    pl.len().alias("p2_record_count"),
    pl.col("num_group1").n_unique().alias("p2_num_persons"),  # distinct persons
    pl.col("relatedpersons_role_762T").cast(pl.String).n_unique().alias("p2_unique_roles"),
    pl.col("empls_employedfrom_796D").drop_nulls().len().alias("p2_has_employment_count"),
])
print(f"p2: {p2_features.shape}")

## 5) Merge all blocks into base

In [None]:
model_df = (
    base
    .join(cb1_agg, on="case_id", how="left")
    .join(ap1w_agg, on="case_id", how="left")
    .join(dep_agg, on="case_id", how="left")
    .join(taxa_agg, on="case_id", how="left")
    .join(ap2_agg, on="case_id", how="left")
    .join(s0_features, on="case_id", how="left")
    .join(scb_features, on="case_id", how="left")
    .join(cba1_features, on="case_id", how="left")
    .join(cba2_features, on="case_id", how="left")
    .join(oth_raw, on="case_id", how="left")
    .join(cbb2_features, on="case_id", how="left")
    .join(taxb_features, on="case_id", how="left")
    .join(taxc_features, on="case_id", how="left")
    .join(p1_features, on="case_id", how="left")
    .join(p2_features, on="case_id", how="left")
    .with_columns([
        pl.col("^cb1_.*$").fill_null(0),
        pl.col("^ap1w_.*$").fill_null(0),
        pl.col("^dep_.*$").fill_null(0),
        pl.col("^taxa_.*$").fill_null(0),
        pl.col("^ap2_.*$").fill_null(0),
        pl.col("^cba1_.*$").fill_null(0),
        pl.col("^cba2_.*$").fill_null(0),
        pl.col("^cbb2_.*$").fill_null(0),
        pl.col("^p2_.*$").fill_null(0),
    ])
)

feature_cols = [c for c in model_df.columns if c not in ["case_id", "date_decision", "MONTH", "target"]]
print(f"Final shape: {model_df.shape}")
print(f"Feature count: {len(feature_cols)}")
print(f"Features: {feature_cols[:20]}...")

In [None]:
# Block 10: Ratio / interaction features (computed from existing s0 columns)
# No data loading — these are derived from static table features already joined

def _safe_ratio(num_col, den_col, alias):
    """Division that returns null when denominator is zero/null."""
    return (
        pl.when(pl.col(den_col).is_not_null() & (pl.col(den_col) > 0))
        .then(pl.col(num_col) / pl.col(den_col))
        .otherwise(None)
        .alias(alias)
    )

model_df = model_df.with_columns([
    # --- Income ratios (leverage & affordability) ---
    _safe_ratio("s0_currdebt_22A", "s0_maininc_215A", "ratio_debt_to_income"),
    _safe_ratio("s0_annuity_780A", "s0_maininc_215A", "ratio_annuity_to_income"),
    _safe_ratio("s0_credamount_770A", "s0_maininc_215A", "ratio_credit_to_income"),
    _safe_ratio("s0_totaldebt_9A", "s0_maininc_215A", "ratio_total_debt_to_income"),

    # --- Credit utilization ---
    _safe_ratio("s0_currdebt_22A", "s0_credamount_770A", "ratio_credit_utilization"),
    _safe_ratio("s0_sumoutstandtotal_3546847A", "s0_credamount_770A", "ratio_outstanding_to_credit"),
    _safe_ratio("s0_downpmt_116A", "s0_credamount_770A", "ratio_downpayment"),

    # --- Payment discipline ---
    # Fraction of installments with DPD >= 10 days
    pl.when(
        (pl.col("s0_numinstlswithdpd10_728L") + pl.col("s0_numinstlswithoutdpd_562L")) > 0
    ).then(
        pl.col("s0_numinstlswithdpd10_728L") /
        (pl.col("s0_numinstlswithdpd10_728L") + pl.col("s0_numinstlswithoutdpd_562L"))
    ).otherwise(None).alias("ratio_dpd_installment_frac"),

    # DPD trend: recent (3m) vs long-term (24m) — >1 means worsening
    pl.when(pl.col("s0_maxdpdlast24m_143P") > 0)
    .then(pl.col("s0_maxdpdlast3m_392P") / pl.col("s0_maxdpdlast24m_143P"))
    .otherwise(0.0).alias("ratio_dpd_trend_3m_vs_24m"),

    # --- Payment consistency ---
    _safe_ratio("s0_avgpmtlast12m_4525200A", "s0_annuity_780A", "ratio_pmt_to_annuity"),
    _safe_ratio("s0_maxinstallast24m_3658928A", "s0_avginstallast24m_3658937A", "ratio_install_volatility"),
    _safe_ratio("s0_avgoutstandbalancel6m_4187114A", "s0_maxoutstandbalancel12m_4187113A", "ratio_outstand_6m_vs_12m"),
])

ratio_cols = [c for c in model_df.columns if c.startswith("ratio_")]
print(f"Ratio features added: {len(ratio_cols)}")
print(f"  {ratio_cols}")

# Redefine feature_cols to include ratios
feature_cols = [c for c in model_df.columns if c not in ["case_id", "date_decision", "MONTH", "target"]]
print(f"\nTotal feature count: {len(feature_cols)}")

## 6) Evaluation helpers

In [None]:
def run_auc_for_cut(df, feature_cols, cut_week):
    tr = df.filter(pl.col("WEEK_NUM") <= cut_week)
    va = df.filter(pl.col("WEEK_NUM") > cut_week)

    Xtr = tr.select(feature_cols).to_pandas()
    ytr = tr["target"].to_pandas()
    Xva = va.select(feature_cols).to_pandas()
    yva = va["target"].to_pandas()

    clf = HistGradientBoostingClassifier(max_depth=6, learning_rate=0.05, max_iter=200, random_state=42)
    clf.fit(Xtr, ytr)
    p = clf.predict_proba(Xva)[:, 1]
    return roc_auc_score(yva, p)


def eval_stability(df, feature_cols, cuts=(50, 60, 70), label="model"):
    rows = []
    for c in cuts:
        auc = run_auc_for_cut(df, feature_cols, c)
        rows.append({"model": label, "cut_week": c, "auc": auc})
    out = pd.DataFrame(rows)
    summary = {
        "model": label,
        "mean_auc": out["auc"].mean(),
        "std_auc": out["auc"].std(ddof=0),
        "min_auc": out["auc"].min(),
        "max_auc": out["auc"].max(),
    }
    return out, pd.DataFrame([summary])

## 7) Validate

In [None]:
detail, summary = eval_stability(model_df, feature_cols, cuts=(50, 60, 70), label="all_features_with_static")
print(detail)
print()
print(summary)