In [None]:
from dotenv import load_dotenv
import os
from pathlib import Path

# Working directory recorded by intitate_notebook(); stays None until that
# function has changed into the project base path for the first time.
CURRENT_DIRECTORY_NOTEBOOK = None


def intitate_notebook():
    """Load ``.env`` and switch the working directory to the project root once.

    The target directory is taken from the ``PROJECT_BASE_PATH`` environment
    variable, read after ``load_dotenv()`` has run. On the first call the
    process changes into that directory and the resulting path is stored in
    the module-level ``CURRENT_DIRECTORY_NOTEBOOK``. Subsequent calls (for
    example when the cell is re-run) only print the stored path and do not
    change directory again.

    Raises:
        RuntimeError: if ``PROJECT_BASE_PATH`` is unset or empty when the
            directory still has to be set.
    """
    load_dotenv()
    global CURRENT_DIRECTORY_NOTEBOOK

    # Already initialised: report and leave the working directory untouched.
    if CURRENT_DIRECTORY_NOTEBOOK is not None:
        print(
            "Current directory for notebook is already set: ",
            CURRENT_DIRECTORY_NOTEBOOK,
        )
        return

    base_path = os.getenv("PROJECT_BASE_PATH")
    if not base_path:
        # Without this check os.chdir(None) fails with an unhelpful TypeError.
        raise RuntimeError(
            "PROJECT_BASE_PATH is not set; define it in the environment or in "
            "the .env file before running this notebook."
        )

    os.chdir(base_path)
    CURRENT_DIRECTORY_NOTEBOOK = Path(os.getcwd())
    print("Current directory for notebook: ", CURRENT_DIRECTORY_NOTEBOOK)


# Run at cell execution so that relative paths used later in the notebook
# resolve from the project base directory.
intitate_notebook()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
from pyspark.sql.window import Window

In [None]:
# Local Spark session used for the whole notebook. The JDBC driver jar path
# is relative, so the working directory must already be the project root.
spark = (
    SparkSession.builder.appName("PostgresETL")
    .master("local[1]")  # Limit to 1 core
    .config("spark.jars", "setup_files/postgresql-42.7.5.jar")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.sql.shuffle.partitions", "100")
    .config("spark.sql.debug.maxToStringFields", 50)
    .getOrCreate()
)


# Source-database connection settings. Each value can be overridden through
# the environment (or the .env file loaded in the first cell) so credentials
# do not have to live in the notebook. The fallbacks are the previous
# hard-coded values, kept only so an existing local setup keeps working;
# set DATA_SOURCE_DB_PASSWORD explicitly for anything beyond local use.
username = os.getenv("DATA_SOURCE_DB_USER", "data_source_user")
password = os.getenv("DATA_SOURCE_DB_PASSWORD", "data_source_user_password")
host = os.getenv("DATA_SOURCE_DB_HOST", "172.17.0.1")
port = os.getenv("DATA_SOURCE_DB_PORT", "5435")
database = os.getenv("DATA_SOURCE_DB_NAME", "data_source_db")


# JDBC URL and connection properties passed to every spark.read.jdbc call.
jdbc_url = f"jdbc:postgresql://{host}:{port}/{database}"
properties = {"user": username, "password": password, "driver": "org.postgresql.Driver"}

In [None]:
# Source tables, listed in the same order as the DataFrames unpacked below.
_SOURCE_TABLES = (
    "previous_application",
    "pos_cash_balance",
    "credit_card_balance",
    "installments_payments",
)

# One JDBC read per table, all with the same URL, properties and reader option.
# NOTE(review): "failFast" is set as a reader option here; confirm it has any
# effect on JDBC reads.
prev_df, pos_df, cc_df, inst_df = (
    spark.read.option("failFast", "true").jdbc(
        url=jdbc_url, table=table_name, properties=properties
    )
    for table_name in _SOURCE_TABLES
)

In [None]:
# Quick look at the POS table: print its schema, then the first 5 rows
# without truncating cell contents.
pos_df.printSchema()
pos_df.show(5, truncate=False)

In [None]:
# -----------------------------------------------------------------------------
# 1. previous_application ↦ per‐applicant features
# -----------------------------------------------------------------------------
def get_prev_app_features(prev_df: DataFrame) -> DataFrame:
    """
    Aggregate previous_application rows into one feature row per applicant.

    Input:
        prev_df: previous_application data with at least these columns:
            - sk_id_curr
            - name_contract_status
            - amt_application
            - amt_credit
            - amt_annuity
            - name_contract_type
            - channel_type
            - days_decision
    Output:
        DataFrame with one row per sk_id_curr holding counts by status and
        contract type, amt_application statistics, average credit ratios,
        days_decision statistics, the number of distinct channels, and three
        share-of-total ratios derived from the counts.

    Note:
        In the two averaged ratios, rows whose denominator is not > 0
        (including NULL) contribute 0 to the average rather than being skipped.
    """

    def count_where(condition: str):
        # Number of rows in the group matching the SQL predicate.
        return F.sum(F.expr(f"case when {condition} then 1 else 0 end"))

    def mean_guarded_ratio(numerator: str, denominator: str):
        # Average of numerator / denominator, using 0 where the denominator is not > 0.
        return F.avg(
            F.expr(
                f"case when {denominator} > 0 then {numerator} / {denominator} else 0 end"
            )
        )

    # 1) Per-applicant aggregates, declared in output column order.
    aggregates = [
        # ---- Counts & Totals ----
        F.count("*").alias("prev_app_count"),
        count_where("name_contract_status = 'Approved'").alias(
            "prev_app_approved_count"
        ),
        count_where("name_contract_status = 'Refused'").alias(
            "prev_app_refused_count"
        ),
        count_where(
            "name_contract_status in ('Canceled','Canceled by client')"
        ).alias("prev_app_canceled_count"),
        # ---- Amounts on Application ----
        F.sum("amt_application").alias("prev_app_sum_amt_application"),
        F.avg("amt_application").alias("prev_app_avg_amt_application"),
        F.max("amt_application").alias("prev_app_max_amt_application"),
        F.stddev("amt_application").alias("prev_app_stddev_amt_application"),
        # ---- Amount Ratios ----
        mean_guarded_ratio("amt_credit", "amt_application").alias(
            "prev_app_avg_credit_to_application_ratio"
        ),
        mean_guarded_ratio("amt_credit", "amt_annuity").alias(
            "prev_app_avg_loan_to_annuity_ratio"
        ),
        # ---- Contract Type Counts ----
        count_where("name_contract_type = 'Cash loans'").alias(
            "prev_app_cash_loans_count"
        ),
        count_where("name_contract_type = 'Revolving loans'").alias(
            "prev_app_revolving_loans_count"
        ),
        # ---- Days Decision Statistics ----
        F.avg("days_decision").alias("prev_app_avg_days_decision"),
        F.max("days_decision").alias("prev_app_max_days_decision"),
        F.min("days_decision").alias("prev_app_min_days_decision"),
        F.stddev("days_decision").alias("prev_app_stddev_days_decision"),
        # ---- Channel Type Diversity ----
        F.countDistinct("channel_type").alias("prev_app_distinct_channel_count"),
    ]
    features = prev_df.groupBy("sk_id_curr").agg(*aggregates)

    # 2) Shares of the applicant's total application count, appended after
    #    the aggregates in this order.
    share_columns = (
        ("prev_app_canceled_ratio", "prev_app_canceled_count"),
        ("prev_app_ratio_cash_to_total", "prev_app_cash_loans_count"),
        ("prev_app_ratio_revolving_to_total", "prev_app_revolving_loans_count"),
    )
    for share_name, count_name in share_columns:
        features = features.withColumn(
            share_name, F.expr(f"{count_name} / prev_app_count")
        )

    return features


# -----------------------------------------------------------------------------
# 2. POS_CASH_balance ↦ per‐applicant features
# -----------------------------------------------------------------------------
def get_pos_features(prev_df: DataFrame, pos_df: DataFrame) -> DataFrame:
    """
    Aggregate POS_CASH_balance rows into one feature row per applicant.

    Input:
        prev_df: previous_application data; only sk_id_prev and sk_id_curr are used.
        pos_df: POS_CASH_balance data with columns including:
            - sk_id_curr (required: the joins below use both keys)
            - sk_id_prev
            - months_balance
            - name_contract_status (compared numerically below, e.g. "> 0";
              assumes a numeric encoding -- TODO confirm against the table schema)
            - cnt_instalment
            - cnt_instalment_future
            - sk_dpd
            - sk_dpd_def
    Output:
        DataFrame with one row per sk_id_curr found in prev_df, containing
        aggregated POS metrics. Column order: sk_id_curr, the basic aggregates,
        the two ratios, the per-loan late-ratio features, then the
        latest-status counts.

    Notes:
        pos_record_count and pos_distinct_prev_count only count real POS rows.
        Previous loans without any POS history are kept by the left join as a
        single all-NULL row; they no longer inflate these counts or the ratio
        denominators. Applicants with no POS rows get 0 for both ratios.
    """
    loan_keys = prev_df.select("sk_id_prev", "sk_id_curr")

    # 1) LEFT join the loans to their POS rows on both keys so every previous
    #    loan is kept. The join keys in the result come from the left side and
    #    are never NULL, so a marker column from the POS side is needed to tell
    #    real POS rows apart from loans that had no match.
    pos_marked = pos_df.withColumn("_pos_row", F.lit(1))
    pos_joined = loan_keys.join(
        pos_marked, on=["sk_id_curr", "sk_id_prev"], how="left"
    )
    has_pos_row = F.col("_pos_row").isNotNull()

    # 2) Basic counts & sums & averages on POS columns
    agg_pos_basic = pos_joined.groupBy("sk_id_curr").agg(
        # ---- Record Counts (real POS rows only) ----
        F.count("_pos_row").alias("pos_record_count"),
        F.countDistinct(F.when(has_pos_row, F.col("sk_id_prev"))).alias(
            "pos_distinct_prev_count"
        ),
        # ---- Late Status Counts ----
        F.sum(F.expr("case when name_contract_status > 0 then 1 else 0 end")).alias(
            "pos_late_status_count"
        ),
        F.sum(F.expr("case when name_contract_status >= 2 then 1 else 0 end")).alias(
            "pos_severe_late_count"
        ),
        # ---- Months Balance Stats ----
        F.avg("months_balance").alias("pos_avg_months_balance"),
        F.max("months_balance").alias("pos_max_months_balance"),
        F.min("months_balance").alias("pos_min_months_balance"),
        F.stddev("months_balance").alias("pos_stddev_months_balance"),
        # ---- Status Stats ----
        F.avg("name_contract_status").alias("pos_avg_status"),
        F.stddev("name_contract_status").alias("pos_stddev_status"),
        F.countDistinct("name_contract_status").alias("pos_distinct_status_count"),
        # ---- Instalment Counts ----
        F.avg("cnt_instalment").alias("pos_avg_cnt_instalment"),
        F.max("cnt_instalment").alias("pos_max_cnt_instalment"),
        F.sum("cnt_instalment").alias("pos_sum_cnt_instalment"),
        F.avg("cnt_instalment_future").alias("pos_avg_cnt_instalment_future"),
        F.max("cnt_instalment_future").alias("pos_max_cnt_instalment_future"),
        F.sum(F.expr("case when cnt_instalment = 0 then 1 else 0 end")).alias(
            "pos_count_no_instalment"
        ),
        # ---- DPD & DPD_DEF Stats ----
        F.avg("sk_dpd").alias("pos_avg_sk_dpd"),
        F.max("sk_dpd").alias("pos_max_sk_dpd"),
        F.sum(F.expr("case when sk_dpd > 0 then 1 else 0 end")).alias(
            "pos_sk_dpd_nonzero_count"
        ),
        F.avg("sk_dpd_def").alias("pos_avg_sk_dpd_def"),
        F.max("sk_dpd_def").alias("pos_max_sk_dpd_def"),
        F.sum(F.expr("case when sk_dpd_def > 0 then 1 else 0 end")).alias(
            "pos_sk_dpd_def_nonzero_count"
        ),
    )

    # 3) Ratios of late rows to real POS rows. The guard is needed because
    #    pos_record_count can now be 0; 0 matches the value such applicants
    #    had before and the convention used for the instalment ratios.
    agg_pos = agg_pos_basic.withColumn(
        "pos_ratio_nonzero_status",
        F.expr(
            "case when pos_record_count > 0 then pos_late_status_count / pos_record_count else 0 end"
        ),
    ).withColumn(
        "pos_ratio_severe_late",
        F.expr(
            "case when pos_record_count > 0 then pos_severe_late_count / pos_record_count else 0 end"
        ),
    )

    # 4) Per-loan late ratio:
    #    a) Share of late rows for each previous loan, from the raw POS rows
    pos_per_prev = (
        pos_df.groupBy("sk_id_prev")
        .agg(
            F.count("*").alias("pos_total_cnt_per_prev"),
            F.sum(F.expr("case when name_contract_status > 0 then 1 else 0 end")).alias(
                "pos_late_cnt_per_prev"
            ),
        )
        .withColumn(
            "pos_late_ratio_per_prev",
            F.expr(
                "case when pos_total_cnt_per_prev > 0 then pos_late_cnt_per_prev / pos_total_cnt_per_prev else 0 end"
            ),
        )
    )

    #    b) Attach sk_id_curr; loans without POS rows get a NULL ratio
    pos_prev_joined = loan_keys.join(
        pos_per_prev.select("sk_id_prev", "pos_late_ratio_per_prev"),
        on="sk_id_prev",
        how="left",
    )

    #    c) Aggregate per sk_id_curr (avg/max skip the NULL ratios)
    agg_pos_per_prev = pos_prev_joined.groupBy("sk_id_curr").agg(
        F.avg("pos_late_ratio_per_prev").alias("pos_avg_late_ratio_per_prev"),
        F.max("pos_late_ratio_per_prev").alias("pos_max_late_ratio_per_prev"),
        F.sum(
            F.expr("case when pos_late_ratio_per_prev = 1.0 then 1 else 0 end")
        ).alias("pos_full_late_loans_count"),
    )

    # 5) Latest status per previous loan: the row with the highest
    #    months_balance. Ties are broken arbitrarily by row_number().
    w_prev = Window.partitionBy("sk_id_prev").orderBy(F.desc("months_balance"))
    pos_latest = (
        pos_df.withColumn("rn", F.row_number().over(w_prev))
        .filter("rn = 1")
        .select(
            "sk_id_curr",
            "sk_id_prev",
            F.col("name_contract_status").alias("pos_latest_status_per_prev"),
        )
    )

    pos_latest_joined = loan_keys.join(
        pos_latest, on=["sk_id_curr", "sk_id_prev"], how="left"
    )

    agg_pos_latest_status = pos_latest_joined.groupBy("sk_id_curr").agg(
        F.sum(
            F.expr("case when pos_latest_status_per_prev > 0 then 1 else 0 end")
        ).alias("pos_loans_latest_late_count"),
        F.sum(
            F.expr("case when pos_latest_status_per_prev = 0 then 1 else 0 end")
        ).alias("pos_loans_latest_not_late_count"),
    )

    # 6) Join everything together on sk_id_curr. Joining on the column name
    #    keeps a single sk_id_curr followed by each frame's remaining columns,
    #    so no further select is needed to fix the column order.
    pos_features = agg_pos.join(agg_pos_per_prev, on="sk_id_curr", how="left").join(
        agg_pos_latest_status, on="sk_id_curr", how="left"
    )

    return pos_features


# -----------------------------------------------------------------------------
# 3. credit_card_balance ↦ per‐applicant features
# -----------------------------------------------------------------------------
def get_cc_features(prev_df: DataFrame, cc_df: DataFrame) -> DataFrame:
    """
    Aggregate credit_card_balance rows into one feature row per applicant.

    Input:
        prev_df: previous_application data; only sk_id_prev and sk_id_curr are used.
        cc_df: credit_card_balance data with columns including:
            - sk_id_curr (required: the join below uses both keys)
            - sk_id_prev
            - months_balance
            - amt_balance
            - amt_credit_limit_actual
            - amt_drawings_current
            - amt_drawings_atm_current
            - amt_payment_current
            - amt_inst_min_regularity
            - cnt_drawings_current
            - sk_dpd
            - sk_dpd_def
    Output:
        DataFrame with one row per sk_id_curr found in prev_df, containing
        aggregated credit-card metrics. Column order: sk_id_curr, the basic
        aggregates, the per-loan utilization features, then the
        latest-utilization features.

    Notes:
        - The utilization ratio is computed on the credit-card rows before
          the join. Previous loans with no credit-card history therefore have
          a NULL utilization and are skipped by avg/max/stddev, instead of
          contributing a utilization of 0.
        - cc_record_count and cc_distinct_prev_count only count real
          credit-card rows.
        - For a real credit-card row whose limit is not > 0 (including NULL),
          the utilization ratio is 0.
    """
    loan_keys = prev_df.select("sk_id_prev", "sk_id_curr")

    # 1) Utilization ratio (guarding divide-by-zero) plus a marker column,
    #    both added before the join so that they are NULL on the placeholder
    #    rows the left join creates for loans without credit-card rows.
    cc_prepared = cc_df.withColumn(
        "cc_utilization_ratio",
        F.expr("""
            case
                when amt_credit_limit_actual > 0
                then amt_balance / amt_credit_limit_actual
                else 0
            end
        """),
    ).withColumn("_cc_row", F.lit(1))

    # 2) LEFT join the loans to their credit-card rows on both keys so every
    #    previous loan (and hence every applicant in prev_df) is kept.
    cc_with_util = loan_keys.join(
        cc_prepared, on=["sk_id_curr", "sk_id_prev"], how="left"
    )
    has_cc_row = F.col("_cc_row").isNotNull()
    # Only the rows that correspond to an actual credit-card record.
    cc_rows_only = cc_with_util.filter(has_cc_row)

    # 3) Basic aggregates
    agg_cc_basic = cc_with_util.groupBy("sk_id_curr").agg(
        # ---- Record Counts (real credit-card rows only) ----
        F.count("_cc_row").alias("cc_record_count"),
        F.countDistinct(F.when(has_cc_row, F.col("sk_id_prev"))).alias(
            "cc_distinct_prev_count"
        ),
        # ---- Balance Stats ----
        F.avg("amt_balance").alias("cc_avg_amt_balance"),
        F.max("amt_balance").alias("cc_max_amt_balance"),
        F.min("amt_balance").alias("cc_min_amt_balance"),
        F.stddev("amt_balance").alias("cc_stddev_amt_balance"),
        # ---- Credit Limit Stats ----
        F.avg("amt_credit_limit_actual").alias("cc_avg_amt_credit_limit"),
        F.max("amt_credit_limit_actual").alias("cc_max_amt_credit_limit"),
        F.min("amt_credit_limit_actual").alias("cc_min_amt_credit_limit"),
        F.stddev("amt_credit_limit_actual").alias("cc_stddev_amt_credit_limit"),
        # ---- Drawings Current ----
        F.avg("amt_drawings_current").alias("cc_avg_amt_drawings_current"),
        F.max("amt_drawings_current").alias("cc_max_amt_drawings_current"),
        # ---- Drawings ATM Current ----
        F.avg("amt_drawings_atm_current").alias("cc_avg_amt_drawings_atm_current"),
        F.max("amt_drawings_atm_current").alias("cc_max_amt_drawings_atm_current"),
        # ---- Count Drawings Current ----
        F.avg("cnt_drawings_current").alias("cc_avg_cnt_drawings_current"),
        F.max("cnt_drawings_current").alias("cc_max_cnt_drawings_current"),
        # ---- Utilization Ratio Stats ----
        F.avg("cc_utilization_ratio").alias("cc_avg_utilization_ratio"),
        F.max("cc_utilization_ratio").alias("cc_max_utilization_ratio"),
        F.stddev("cc_utilization_ratio").alias("cc_stddev_utilization_ratio"),
        # ---- Underpayment Count ----
        F.sum(
            F.expr(
                "case when amt_payment_current < amt_inst_min_regularity then 1 else 0 end"
            )
        ).alias("cc_underpayment_months_count"),
        # ---- DPD & DPD_DEF Stats ----
        F.avg("sk_dpd").alias("cc_avg_sk_dpd"),
        F.max("sk_dpd").alias("cc_max_sk_dpd"),
        F.sum(F.expr("case when sk_dpd > 0 then 1 else 0 end")).alias(
            "cc_sk_dpd_nonzero_count"
        ),
        F.avg("sk_dpd_def").alias("cc_avg_sk_dpd_def"),
        F.max("sk_dpd_def").alias("cc_max_sk_dpd_def"),
        F.sum(F.expr("case when sk_dpd_def > 0 then 1 else 0 end")).alias(
            "cc_sk_dpd_def_nonzero_count"
        ),
        # ---- Months Balance Stats ----
        F.avg("months_balance").alias("cc_avg_months_balance"),
        F.max("months_balance").alias("cc_max_months_balance"),
        F.min("months_balance").alias("cc_min_months_balance"),
        F.stddev("months_balance").alias("cc_stddev_months_balance"),
        # ---- Nonzero Balance Count ----
        F.sum(F.expr("case when amt_balance > 0 then 1 else 0 end")).alias(
            "cc_nonzero_balance_count"
        ),
    )

    # 4) Per-loan max/avg utilization, from real credit-card rows only
    cc_per_prev = cc_rows_only.groupBy("sk_id_prev").agg(
        F.max("cc_utilization_ratio").alias("prev_cc_max_util"),
        F.avg("cc_utilization_ratio").alias("prev_cc_avg_util"),
        F.sum(
            F.expr(
                "case when amt_balance > 0 and amt_payment_current = 0 then 1 else 0 end"
            )
        ).alias("prev_cc_zero_payment_months"),
    )

    # Attach sk_id_curr; loans without credit-card rows get NULL per-loan values
    cc_prev_joined = loan_keys.join(cc_per_prev, on="sk_id_prev", how="left")

    agg_cc_per_prev = cc_prev_joined.groupBy("sk_id_curr").agg(
        F.avg("prev_cc_max_util").alias("cc_avg_of_prev_max_util"),
        F.max("prev_cc_max_util").alias("cc_max_of_prev_max_util"),
        F.avg("prev_cc_avg_util").alias("cc_avg_of_prev_avg_util"),
        # These two are counts: keep 0 (not NULL) for applicants with no
        # credit-card loans, as before.
        F.coalesce(F.sum("prev_cc_zero_payment_months"), F.lit(0)).alias(
            "cc_sum_prev_zero_payment_months"
        ),
        F.coalesce(F.max("prev_cc_zero_payment_months"), F.lit(0)).alias(
            "cc_max_prev_zero_payment_months"
        ),
    )

    # 5) Latest utilization per previous loan: the real credit-card row with
    #    the highest months_balance. Ties are broken arbitrarily by row_number().
    w_prev_cc = Window.partitionBy("sk_id_prev").orderBy(F.desc("months_balance"))
    cc_latest = (
        cc_rows_only.withColumn("rn", F.row_number().over(w_prev_cc))
        .filter("rn = 1")
        .select(
            "sk_id_prev", F.col("cc_utilization_ratio").alias("cc_latest_util_per_prev")
        )
    )

    cc_latest_joined = loan_keys.join(cc_latest, on="sk_id_prev", how="left")

    agg_cc_latest = cc_latest_joined.groupBy("sk_id_curr").agg(
        F.avg("cc_latest_util_per_prev").alias("cc_avg_latest_util_per_prev"),
        F.max("cc_latest_util_per_prev").alias("cc_max_latest_util_per_prev"),
        # count how many prev loans have latest util > 0.8 (example threshold)
        F.sum(
            F.expr("case when cc_latest_util_per_prev > 0.8 then 1 else 0 end")
        ).alias("cc_loans_latest_high_util_count"),
    )

    # 6) Join all credit_card aggregates. Joining on the column name keeps a
    #    single sk_id_curr followed by each frame's remaining columns, so no
    #    further select is needed to fix the column order.
    cc_features = agg_cc_basic.join(
        agg_cc_per_prev, on="sk_id_curr", how="left"
    ).join(agg_cc_latest, on="sk_id_curr", how="left")

    return cc_features


# -----------------------------------------------------------------------------
# 4. installments_payments ↦ per‐applicant features
# -----------------------------------------------------------------------------
def get_inst_features(prev_df: DataFrame, inst_df: DataFrame) -> DataFrame:
    """
    Input:
        prev_df: DataFrame from previous_application.csv (sk_id_prev → sk_id_curr)
        inst_df: DataFrame loaded from installments_payments.csv, with columns including:
            - sk_id_prev (int)
            - num_instalment_version (int)
            - num_instalment_number (int)
            - days_instalment (int)
            - days_entry_payment (int)
            - amt_instalment (double)
            - amt_payment (double)
    Output:
        DataFrame with one row per sk_id_curr, containing aggregated installment‐behavior metrics.
    """
    # 1) Join previous_application to installments_payments on sk_id_prev
    inst_joined = prev_df.select("sk_id_prev", "sk_id_curr").join(
        inst_df, on=["sk_id_curr", "sk_id_prev"], how="left"
    )

    # 2) Compute amount difference (amt_payment - amt_instalment) once
    inst_with_diff = (
        inst_joined.withColumn(
            "inst_amount_diff", F.col("amt_payment") - F.col("amt_instalment")
        )
        .withColumn(
            "inst_overdue_days",
            F.expr(
                "case when days_entry_payment > days_instalment then days_entry_payment - days_instalment else 0 end"
            ),
        )
        .withColumn(
            "inst_early_days",
            F.expr(
                "case when days_entry_payment < days_instalment then days_instalment - days_entry_payment else 0 end"
            ),
        )
        .withColumn(
            "inst_is_late",
            F.expr("case when days_entry_payment > days_instalment then 1 else 0 end"),
        )
        .withColumn(
            "inst_is_early",
            F.expr("case when days_entry_payment < days_instalment then 1 else 0 end"),
        )
        .withColumn(
            "inst_is_on_time",
            F.expr("case when days_entry_payment = days_instalment then 1 else 0 end"),
        )
        .withColumn(
            "inst_large_underpayment",
            F.expr("case when amt_payment < 0.5 * amt_instalment then 1 else 0 end"),
        )
    )

    # 3) Basic aggregates on inst_with_diff
    agg_inst_basic = inst_with_diff.groupBy("sk_id_curr").agg(
        # ---- Record Counts ----
        F.count("sk_id_prev").alias("inst_record_count"),
        F.countDistinct("sk_id_prev").alias("inst_distinct_prev_count"),
        # ---- Num Instalment Stats ----
        F.avg("num_instalment_number").alias("inst_avg_num_instalment_number"),
        F.max("num_instalment_number").alias("inst_max_num_instalment_number"),
        F.min("num_instalment_number").alias("inst_min_num_instalment_number"),
        F.stddev("num_instalment_number").alias("inst_stddev_num_instalment_number"),
        F.avg("num_instalment_version").alias("inst_avg_num_instalment_version"),
        F.max("num_instalment_version").alias("inst_max_num_instalment_version"),
        F.min("num_instalment_version").alias("inst_min_num_instalment_version"),
        F.stddev("num_instalment_version").alias("inst_stddev_num_instalment_version"),
        # ---- Days Stats ----
        F.avg("days_instalment").alias("inst_avg_due_day"),
        F.max("days_instalment").alias("inst_max_due_day"),
        F.min("days_instalment").alias("inst_min_due_day"),
        F.stddev("days_instalment").alias("inst_stddev_due_day"),
        F.avg("days_entry_payment").alias("inst_avg_pay_day"),
        F.max("days_entry_payment").alias("inst_max_pay_day"),
        F.min("days_entry_payment").alias("inst_min_pay_day"),
        F.stddev("days_entry_payment").alias("inst_stddev_pay_day"),
        # ---- Payment & Due Amounts ----
        F.sum("amt_payment").alias("inst_sum_amt_payment"),
        F.sum("amt_instalment").alias("inst_sum_amt_instalment"),
        F.avg("inst_amount_diff").alias("inst_avg_amount_diff"),
        F.max("inst_amount_diff").alias("inst_max_amount_diff"),
        F.min("inst_amount_diff").alias("inst_min_amount_diff"),
        F.stddev("inst_amount_diff").alias("inst_stddev_amount_diff"),
        F.sum(
            F.when(F.col("inst_amount_diff") < 0, F.col("inst_amount_diff")).otherwise(
                0
            )
        ).alias("inst_sum_amount_diff_when_negative"),
        F.sum(
            F.when(F.col("inst_amount_diff") > 0, F.col("inst_amount_diff")).otherwise(
                0
            )
        ).alias("inst_sum_amount_diff_when_positive"),
        # ---- Late/Early/On‐Time Counts ----
        F.sum("inst_is_late").alias("inst_count_late_payment"),
        F.sum("inst_is_early").alias("inst_count_early_payment"),
        F.sum("inst_is_on_time").alias("inst_count_on_time_payment"),
        F.sum("inst_large_underpayment").alias("inst_count_large_underpayment"),
        # ---- Overdue/Early Days Stats ----
        F.sum("inst_overdue_days").alias("inst_sum_overdue_days"),
        F.avg("inst_overdue_days").alias("inst_avg_overdue_days"),
        F.max("inst_overdue_days").alias("inst_max_overdue_days"),
        F.sum("inst_early_days").alias("inst_sum_early_days"),
        F.avg("inst_early_days").alias("inst_avg_early_days"),
        F.max("inst_early_days").alias("inst_max_early_days"),
    )

    # 4) Compute ratios from above counts
    agg_inst = agg_inst_basic.select(
        "sk_id_curr",
        "inst_record_count",
        "inst_distinct_prev_count",
        "inst_avg_num_instalment_number",
        "inst_max_num_instalment_number",
        "inst_min_num_instalment_number",
        "inst_stddev_num_instalment_number",
        "inst_avg_num_instalment_version",
        "inst_max_num_instalment_version",
        "inst_min_num_instalment_version",
        "inst_stddev_num_instalment_version",
        "inst_avg_due_day",
        "inst_max_due_day",
        "inst_min_due_day",
        "inst_stddev_due_day",
        "inst_avg_pay_day",
        "inst_max_pay_day",
        "inst_min_pay_day",
        "inst_stddev_pay_day",
        "inst_sum_amt_payment",
        "inst_sum_amt_instalment",
        "inst_avg_amount_diff",
        "inst_max_amount_diff",
        "inst_min_amount_diff",
        "inst_stddev_amount_diff",
        "inst_sum_amount_diff_when_negative",
        "inst_sum_amount_diff_when_positive",
        "inst_count_late_payment",
        "inst_count_early_payment",
        "inst_count_on_time_payment",
        "inst_count_large_underpayment",
        "inst_sum_overdue_days",
        "inst_avg_overdue_days",
        "inst_max_overdue_days",
        "inst_sum_early_days",
        "inst_avg_early_days",
        "inst_max_early_days",
        # Derived ratios:
        (
            F.expr(
                "case when inst_record_count > 0 then inst_count_late_payment / inst_record_count else 0 end"
            )
        ).alias("inst_ratio_late_payment"),
        (
            F.expr(
                "case when inst_record_count > 0 then inst_count_early_payment / inst_record_count else 0 end"
            )
        ).alias("inst_ratio_early_payment"),
        (
            F.expr(
                "case when inst_record_count > 0 then inst_count_on_time_payment / inst_record_count else 0 end"
            )
        ).alias("inst_ratio_on_time_payment"),
        (
            F.expr(
                "case when inst_sum_amt_instalment > 0 then inst_sum_amt_payment / inst_sum_amt_instalment else 0 end"
            )
        ).alias("inst_ratio_paid_to_due"),
    )

    # 5) Per‐previous‐loan late/overdue aggregation
    inst_per_prev = inst_with_diff.groupBy("sk_id_prev").agg(
        F.sum("inst_is_late").alias("prev_inst_late_count"),
        F.sum("inst_overdue_days").alias("prev_inst_sum_overdue_days"),
        F.sum("inst_is_early").alias("prev_inst_early_count"),
        F.sum("inst_early_days").alias("prev_inst_sum_early_days"),
        F.sum(
            F.expr("case when inst_amount_diff < 0 then inst_amount_diff else 0 end")
        ).alias("prev_inst_sum_amount_diff_negative"),
        F.sum(
            F.expr("case when inst_amount_diff > 0 then inst_amount_diff else 0 end")
        ).alias("prev_inst_sum_amount_diff_positive"),
        F.avg("inst_amount_diff").alias("prev_inst_avg_amount_diff"),
    )

    inst_prev_joined = prev_df.select("sk_id_prev", "sk_id_curr").join(
        inst_per_prev, on="sk_id_prev", how="left"
    )

    agg_inst_per_prev = inst_prev_joined.groupBy("sk_id_curr").agg(
        F.avg("prev_inst_late_count").alias("inst_avg_late_count_per_prev"),
        F.max("prev_inst_late_count").alias("inst_max_late_count_per_prev"),
        F.sum("prev_inst_late_count").alias("inst_sum_late_count_all_prev"),
        F.avg("prev_inst_sum_overdue_days").alias("inst_avg_sum_overdue_per_prev"),
        F.max("prev_inst_sum_overdue_days").alias("inst_max_sum_overdue_per_prev"),
        F.sum("prev_inst_sum_overdue_days").alias("inst_sum_overdue_all_prev"),
        F.avg("prev_inst_sum_amount_diff_negative").alias(
            "inst_avg_sum_amount_diff_negative_per_prev"
        ),
        F.max("prev_inst_sum_amount_diff_negative").alias(
            "inst_max_sum_amount_diff_negative_per_prev"
        ),
        F.avg("prev_inst_sum_amount_diff_positive").alias(
            "inst_avg_sum_amount_diff_positive_per_prev"
        ),
        F.max("prev_inst_sum_amount_diff_positive").alias(
            "inst_max_sum_amount_diff_positive_per_prev"
        ),
        F.avg("prev_inst_avg_amount_diff").alias("inst_avg_amount_diff_per_prev"),
    )

    agg_inst_per_prev = agg_inst_per_prev.select(
        "sk_id_curr",
        "inst_avg_late_count_per_prev",
        "inst_max_late_count_per_prev",
        "inst_sum_late_count_all_prev",
        "inst_avg_sum_overdue_per_prev",
        "inst_max_sum_overdue_per_prev",
        "inst_sum_overdue_all_prev",
        "inst_avg_sum_amount_diff_negative_per_prev",
        "inst_max_sum_amount_diff_negative_per_prev",
        "inst_avg_sum_amount_diff_positive_per_prev",
        "inst_max_sum_amount_diff_positive_per_prev",
        "inst_avg_amount_diff_per_prev",
    )

    # 6) Join everything together
    inst_features = (
        agg_inst.alias("a")
        .join(agg_inst_per_prev.alias("b"), on="sk_id_curr", how="left")
        .select(
            "sk_id_curr",
            # from agg_inst
            "inst_record_count",
            "inst_distinct_prev_count",
            "inst_avg_num_instalment_number",
            "inst_max_num_instalment_number",
            "inst_min_num_instalment_number",
            "inst_stddev_num_instalment_number",
            "inst_avg_num_instalment_version",
            "inst_max_num_instalment_version",
            "inst_min_num_instalment_version",
            "inst_stddev_num_instalment_version",
            "inst_avg_due_day",
            "inst_max_due_day",
            "inst_min_due_day",
            "inst_stddev_due_day",
            "inst_avg_pay_day",
            "inst_max_pay_day",
            "inst_min_pay_day",
            "inst_stddev_pay_day",
            "inst_sum_amt_payment",
            "inst_sum_amt_instalment",
            "inst_avg_amount_diff",
            "inst_max_amount_diff",
            "inst_min_amount_diff",
            "inst_stddev_amount_diff",
            "inst_count_late_payment",
            "inst_count_early_payment",
            "inst_count_on_time_payment",
            "inst_count_large_underpayment",
            "inst_sum_overdue_days",
            "inst_avg_overdue_days",
            "inst_max_overdue_days",
            "inst_sum_early_days",
            "inst_avg_early_days",
            "inst_max_early_days",
            "inst_ratio_late_payment",
            "inst_ratio_early_payment",
            "inst_ratio_on_time_payment",
            "inst_ratio_paid_to_due",
            "inst_sum_amount_diff_when_negative",
            "inst_sum_amount_diff_when_positive",
            # from agg_inst_per_prev
            "inst_avg_late_count_per_prev",
            "inst_max_late_count_per_prev",
            "inst_sum_late_count_all_prev",
            "inst_avg_sum_overdue_per_prev",
            "inst_max_sum_overdue_per_prev",
            "inst_sum_overdue_all_prev",
            "inst_avg_sum_amount_diff_negative_per_prev",
            "inst_max_sum_amount_diff_negative_per_prev",
            "inst_avg_sum_amount_diff_positive_per_prev",
            "inst_max_sum_amount_diff_positive_per_prev",
            "inst_avg_amount_diff_per_prev",
        )
    )

    return inst_features


# -----------------------------------------------------------------------------
# 5. Helper: assemble all features into one wide table
# -----------------------------------------------------------------------------
def assemble_all_features(
    prev_df: DataFrame,
    pos_df: DataFrame,
    cc_df: DataFrame,
    inst_df: DataFrame,
    fill_value=None,
) -> DataFrame:
    """
    Build the wide feature table by joining every per-module feature DataFrame.

    Each feature-extraction function above is called once, and the POS-cash,
    credit-card and installment features are left-joined, in that order, onto
    the previous-application features using sk_id_curr. Because every join is a
    left join, an sk_id_curr that is missing from a module keeps nulls in that
    module's columns (unless ``fill_value`` is given).

    Args:
        prev_df: DataFrame handed to all four feature extractors.
        pos_df: DataFrame handed to get_pos_features.
        cc_df: DataFrame handed to get_cc_features.
        inst_df: DataFrame handed to get_inst_features.
        fill_value: Optional replacement forwarded to ``DataFrame.na.fill``.
            With the default ``None`` nulls are left untouched, which is the
            behaviour callers had before this parameter existed. Passing ``0``
            replaces nulls in numeric columns only.

    Returns:
        final_df: DataFrame keyed by sk_id_curr containing the columns of all
        four feature sets. One row per sk_id_curr is expected, assuming each
        extractor returns one row per sk_id_curr (not verified here).
    """
    # Left side of every join: applicants come from the previous-application
    # features, so an sk_id_curr absent there will not appear in the result.
    final_df = get_prev_app_features(prev_df).alias("prev_app")

    # (alias, features) pairs, evaluated and joined in the original order.
    module_features = (
        ("pos_cash", get_pos_features(prev_df, pos_df)),
        ("cc", get_cc_features(prev_df, cc_df)),
        ("inst", get_inst_features(prev_df, inst_df)),
    )
    for module_alias, module_feat in module_features:
        # Joining on the column name keeps a single sk_id_curr column.
        final_df = final_df.join(
            module_feat.alias(module_alias), on="sk_id_curr", how="left"
        )

    if fill_value is not None:
        final_df = final_df.na.fill(fill_value)

    return final_df

In [None]:
# Assemble the wide feature table and mark it for caching in one expression,
# so later actions on final_features_df can reuse the computed result.
final_features_df = assemble_all_features(prev_df, pos_df, cc_df, inst_df).cache()

In [None]:
# Print the column names and data types of the assembled feature table as a
# quick check that every module's columns made it through the joins.
final_features_df.printSchema()

In [None]:
# Preview the first 5 rows of the assembled feature table.
final_features_df.show(5)