# Feature Engineering & Aggregation


# Feature Engineering & Aggregation Overview

This notebook outlines the full pipeline for transforming raw credit data into a structured, feature-rich dataset for modeling. It includes data cleaning, feature engineering, aggregation across multiple sources, and final merging into a unified applicant-level view.


## Main Datasets

- **`application_train` / `application_test`**  
  Core datasets containing applicant-level information and the target variable.  
  These are merged into a single `application` DataFrame for unified processing.


## Supporting Datasets & Relationships

| Dataset                  | Key Column         | Linked To           | Description                             |
|--------------------------|--------------------|----------------------|-----------------------------------------|
| `bureau`                | `SK_ID_CURR`       | Application          | Credit history records per applicant    |
| `bureau_balance`        | `SK_ID_BUREAU`     | Bureau               | Monthly status updates for bureau loans |
| `previous_application`  | `SK_ID_CURR`       | Application          | Historical loan applications            |
| `credit_card_balance`   | `SK_ID_PREV`       | Previous Application | Monthly credit card usage               |
| `installments_payments`| `SK_ID_PREV`       | Previous Application | Installment payment behavior            |
| `POS_CASH_balance`      | `SK_ID_PREV`       | Previous Application | Point-of-sale loan activity             |

## Aggregation Functions

Each child table is aggregated to the applicant level (`SK_ID_CURR`) using statistical summaries and behavioral indicators.

### Bureau + Bureau Balance
- Total bureau records per applicant
- Mean/max of `DAYS_CREDIT`, `CREDIT_DAY_OVERDUE`, `AMT_CREDIT_SUM`
- Most frequent loan status (`STATUS`)
- Duration of bureau record (`MONTHS_BALANCE`)
- Delinquency ratios and closure timing

### Previous Applications
- Count of previous loans
- Approval/refusal rates
- Mean/max of `AMT_APPLICATION`, `AMT_CREDIT`, `DAYS_DECISION`
- Credit-to-application ratio and interest estimation

### Credit Card Balance
- Mean/max of `AMT_BALANCE`, `AMT_PAYMENT_CURRENT`
- Drawing and payment ratios
- Count of late payments (`SK_DPD` > 0)

### Installments Payments
- Mean/max of `AMT_INSTALMENT`, `AMT_PAYMENT`
- Payment-to-installment ratio
- Days past due and early payments
- Count of late payments

### POS Cash Balance
- Mean/max of `MONTHS_BALANCE`, `SK_DPD`, `SK_DPD_DEF`
- Total loan term (`CNT_INSTALMENT` + `CNT_INSTALMENT_FUTURE`)
- Delinquency ratios and behavioral flags


## Feature Engineering Functions

###  From `application`:
- `income_credit_ratio` = `AMT_INCOME_TOTAL` / `AMT_CREDIT`
- `employment_age_ratio` = `DAYS_EMPLOYED` / `DAYS_BIRTH`
- `external_sources_mean` = average of `EXT_SOURCE_1`, `EXT_SOURCE_2`, `EXT_SOURCE_3`
- Interaction features: EXT_SOURCE × EMPLOYED/BIRTH
- Credit and income comparisons: `CREDIT_DIV_ANNUITY`, `INCOME_MINUS_GOODS`
- Behavioral flags: `APP_MISSING_COUNT`, `GOODS_PRICE_POPULAR_TIER`

###  From Aggregated Tables:
- `credit_activity_intensity` = bureau record count / age
- `financial_stress_score` = sum of overdue amounts, late payments
- `application_behavior_score` = frequency and approval rate of previous loans


## 5 Final Merge & Output

- All aggregated features are merged into the `application` DataFrame using `SK_ID_CURR`
- Final dataset includes cleaned, encoded, and engineered features across all sources
- Saved as `data_appliecation_train.parquet`, `data_appliecation_test.parquet` for downstream modeling


##  1. Load Raw Data

In [18]:
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..')))

import pandas as pd
import polars as pl
import numpy as np
import joblib

from src.utils.eda_utils import (
    reduce_memory_usage_pl
)
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)



In [19]:
application_train = pl.read_csv("../data/application_train.csv")
application_train = reduce_memory_usage_pl(application_train)

application_test = pl.read_csv("../data/application_test.csv")
application_test = reduce_memory_usage_pl(application_test)

bureau_bal = pl.read_csv("../data/bureau_balance.csv")
bureau_bal = reduce_memory_usage_pl(bureau_bal)

bureau = pl.read_csv("../data/bureau.csv")
bureau = bureau.with_columns(
    pl.col("AMT_ANNUITY").cast(pl.Float64).alias("AMT_ANNUITY")
)
bureau = reduce_memory_usage_pl(bureau)

inst_pay_df= pl.read_csv("../data/installments_payments.csv")
inst_pay_df = reduce_memory_usage_pl(inst_pay_df)

prev_app_df= pl.read_csv("../data/previous_application.csv")
prev_app_df = reduce_memory_usage_pl(prev_app_df)


pos_cash= pl.read_csv("../data/POS_CASH_balance.csv")
pos_cash = reduce_memory_usage_pl(pos_cash)

inst_pay_df= pl.read_csv("../data/installments_payments.csv")
inst_pay_df = reduce_memory_usage_pl(inst_pay_df)

card_balance = pl.read_csv("../data/credit_card_balance.csv")
card_balance = reduce_memory_usage_pl(card_balance)

Size before reduction: 289.57 MB
Initial data types: Counter({Float64: 65, Int64: 41, String: 16})
Size after reduction: 111.85 MB
Final data types: Counter({Float32: 65, Int8: 37, Categorical(ordering='physical'): 16, Int32: 2, Int16: 2})
Size before reduction: 45.49 MB
Initial data types: Counter({Float64: 65, Int64: 40, String: 16})
Size after reduction: 17.67 MB
Final data types: Counter({Float32: 65, Int8: 36, Categorical(ordering='physical'): 16, Int32: 2, Int16: 2})
Size before reduction: 442.60 MB
Initial data types: Counter({Int64: 2, String: 1})
Size after reduction: 234.32 MB
Final data types: Counter({Int32: 1, Int8: 1, Categorical(ordering='physical'): 1})
Size before reduction: 233.43 MB
Initial data types: Counter({Float64: 8, Int64: 6, String: 3})
Size after reduction: 101.28 MB
Final data types: Counter({Float32: 8, Int32: 3, Categorical(ordering='physical'): 3, Int16: 2, Int8: 1})
Size before reduction: 830.48 MB
Initial data types: Counter({Float64: 5, Int64: 3})
Siz

## 2. Clean & Engineer Application Data

In [None]:
def clean_applicans_data(df: pl.DataFrame) -> pl.DataFrame:
    """
    Cleans and transforms a Polars DataFrame with application data.
    Applies fixes to birth/employment days, flags anomalies, and removes invalid entries.
    """
    # Convert DAYS_BIRTH to age in years
    df = df.with_columns(
        (pl.col("DAYS_BIRTH") * -1 / 365).round().alias("DAYS_BIRTH")
    )

    df = df.with_columns(
        pl.col("DAYS_EMPLOYED").abs().alias("DAYS_EMPLOYED")
    )

    df = df.with_columns(
        pl.when(pl.col("DAYS_EMPLOYED") == 365243)
          .then(None)
          .otherwise(pl.col("DAYS_EMPLOYED"))
          .alias("DAYS_EMPLOYED")
    )

    df = df.with_columns(
        pl.col("DAYS_EMPLOYED").is_null().alias("YEAR_EMPLOYED_ANOM")
    )


    # Remove invalid gender entries
    df = df.filter(pl.col("CODE_GENDER") != "XNA")
    df = df.filter(pl.col("NAME_INCOME_TYPE") != "Maternity leave")
    df = df.filter(pl.col("NAME_FAMILY_STATUS") != "Unknown")


    return df


def create_new_features_application(df):
    """
    Create new engineered features for the application dataframe, including ratios, products, and missing value counts.
    Args:
        df: polars DataFrame containing application data.
    Returns:
        polars DataFrame with new features added.
    """

    df = df.with_columns(
            pl.mean_horizontal([
                pl.col('EXT_SOURCE_1'),
                pl.col('EXT_SOURCE_2'),
                pl.col('EXT_SOURCE_3')
            ]).alias('EXT_SOURCE_MEAN')
        )

    df = df.with_columns([
        # Missing value count
        pl.concat_list([pl.col(c).is_null().cast(pl.Int32) for c in df.columns]).list.sum().alias('APP_MISSING_COUNT'),

   
        (pl.col('EXT_SOURCE_1') * pl.col('EXT_SOURCE_2') * pl.col('EXT_SOURCE_3')).alias('EXT_SOURCE_PRODUCT'),
        (pl.col('EXT_SOURCE_1') * pl.col('EXT_SOURCE_2')).alias('EXT_SOURCE_1_X_2'),
        (pl.col('EXT_SOURCE_1') * pl.col('EXT_SOURCE_3')).alias('EXT_SOURCE_1_X_3'),
        (pl.col('EXT_SOURCE_2') * pl.col('EXT_SOURCE_3')).alias('EXT_SOURCE_2_X_3'),

        # EXT_SOURCE × EMPLOYED & BIRTH
        (pl.col('EXT_SOURCE_1') * pl.col('DAYS_EMPLOYED')).alias('EXT_SOURCE_1_EMPLOYED'),
        (pl.col('EXT_SOURCE_2') * pl.col('DAYS_EMPLOYED')).alias('EXT_SOURCE_2_EMPLOYED'),
        (pl.col('EXT_SOURCE_3') * pl.col('DAYS_EMPLOYED')).alias('EXT_SOURCE_3_EMPLOYED'),
        (pl.col('EXT_SOURCE_1') / pl.col('DAYS_BIRTH')).alias('EXT_SOURCE_1_BIRTH_RATIO'),
        (pl.col('EXT_SOURCE_2') / pl.col('DAYS_BIRTH')).alias('EXT_SOURCE_2_BIRTH_RATIO'),
        (pl.col('EXT_SOURCE_3') / pl.col('DAYS_BIRTH')).alias('EXT_SOURCE_3_BIRTH_RATIO'),

        # Credit comparisons
        (pl.col('AMT_CREDIT') - pl.col('AMT_GOODS_PRICE')).alias('CREDIT_MINUS_GOODS'),
        (pl.col('AMT_CREDIT') / pl.col('AMT_GOODS_PRICE')).alias('CREDIT_DIV_GOODS'),
        (pl.col('AMT_CREDIT') / pl.col('AMT_ANNUITY')).alias('CREDIT_DIV_ANNUITY'), 
        (pl.col('AMT_CREDIT') / pl.col('AMT_INCOME_TOTAL')).alias('CREDIT_DIV_INCOME'),

        # Income relationships
        (pl.col('AMT_INCOME_TOTAL') / 12.0 - pl.col('AMT_ANNUITY')).alias('INCOME_MONTHLY_MINUS_ANNUITY'),
        (pl.col('AMT_INCOME_TOTAL') / pl.col('AMT_ANNUITY')).alias('INCOME_DIV_ANNUITY'),
        (pl.col('AMT_INCOME_TOTAL') - pl.col('AMT_GOODS_PRICE')).alias('INCOME_MINUS_GOODS'),
        (pl.col('AMT_INCOME_TOTAL') / pl.col('CNT_FAM_MEMBERS')).alias('INCOME_DIV_FAM_SIZE'),

        # Price popularity
        pl.col('AMT_GOODS_PRICE').is_in([225000, 450000, 675000, 900000]).cast(pl.Int32).alias('GOODS_PRICE_POPULAR_TIER_1'),
        pl.col('AMT_GOODS_PRICE').is_in([1125000, 1350000, 1575000, 1800000, 2250000]).cast(pl.Int32).alias('GOODS_PRICE_POPULAR_TIER_2'),

        # Car age ratios
        (pl.col('OWN_CAR_AGE') / pl.col('DAYS_BIRTH')).alias('CAR_AGE_BIRTH_RATIO'),
        (pl.col('OWN_CAR_AGE') / pl.col('DAYS_EMPLOYED')).alias('CAR_AGE_EMPLOYED_RATIO'),

        # Phone change & employment timing
        (pl.col('DAYS_LAST_PHONE_CHANGE') / pl.col('DAYS_BIRTH')).alias('PHONE_CHANGE_BIRTH_RATIO'),
        (pl.col('DAYS_LAST_PHONE_CHANGE') / pl.col('DAYS_EMPLOYED')).alias('PHONE_CHANGE_EMPLOYED_RATIO'),
        (pl.col('DAYS_EMPLOYED') - pl.col('DAYS_BIRTH')).alias('EMPLOYED_MINUS_BIRTH'),
        (pl.col('DAYS_EMPLOYED') / pl.col('DAYS_BIRTH')).alias('EMPLOYED_BIRTH_RATIO'),
    ])
    
    return df


def one_hot_encoder(df, nan_as_category = True):
    """
    Perform one-hot encoding on categorical columns of a pandas DataFrame.
    Args:
        df: pandas DataFrame.
        nan_as_category: bool, whether to treat NaN as a separate category.
    Returns:
        Tuple of encoded DataFrame and list of new columns.
    """
 
    original_columns = list(df.columns)
    categorical_columns = [col for col in df.columns if df[col].dtype == 'category']
    df = pd.get_dummies(df, columns= categorical_columns, dummy_na= nan_as_category)
    new_columns = [c for c in df.columns if c not in original_columns]
    return df, new_columns


def one_hot_encoder_appl(df, nan_as_category=True):
    """
    One-hot encode categorical columns in application data and build encoder map.
    Args:
        df: pandas DataFrame.
        nan_as_category: bool, whether to treat NaN as a separate category.
    Returns:
        Tuple of encoded DataFrame, new columns, and encoder map.
    """
    df = df.copy()
    original_columns = list(df.columns)
    
    # Identify categorical columns
    categorical_columns = [col for col in df.columns if df[col].dtype.name == 'category']
    print("Categorical columns:", categorical_columns)

    # Remove invalid gender entries
    encoder_map = {}
    for col in categorical_columns:
        categories = df[col].cat.categories.tolist()
        if nan_as_category:
            categories = categories + ['NaN']
        encoder_map[col] = categories


    df = pd.get_dummies(df, columns=categorical_columns, dummy_na=nan_as_category)

    new_columns = [c for c in df.columns if c not in original_columns]

    return df, new_columns, encoder_map    

def remove_features(df, remove_features):
    """
    Remove specified features from the DataFrame.
    Args:
        df: DataFrame.
        remove_features: list of column names to remove.
    Returns:
        DataFrame with specified columns dropped.
    """
    return df.drop(remove_features)

def apply_saved_encoding(df, encoder_map, nan_as_category=True):
    """
    Apply saved encoding to DataFrame using encoder map.
    Args:
        df: pandas DataFrame.
        encoder_map: dict mapping columns to categories.
        nan_as_category: bool, whether to treat NaN as a separate category.
    Returns:
        Encoded DataFrame with numerical features.
    """
    df = df.copy()
    encoded_columns = []

    for col, categories in encoder_map.items():
        col_values = df[col].astype(str).fillna('NaN') if nan_as_category else df[col].astype(str)

        for cat in categories:
            encoded_col = f"{col}_{cat}"
            encoded_series = (col_values == cat).astype(int)
            encoded_columns.append(pd.DataFrame({encoded_col: encoded_series}))


    encoded_df = pd.concat(encoded_columns, axis=1)

    numerical_df = df.drop(columns=encoder_map.keys()).select_dtypes(include=['number'])

    final_df = pd.concat([encoded_df, numerical_df], axis=1)

    return final_df    


def application_agg(df, nan_as_category = False):
    """
    Aggregate and encode application data, clean and engineer features, and save encoder map.
    Args:
        df: polars DataFrame.
        nan_as_category: bool, whether to treat NaN as a separate category.
    Returns:
        polars DataFrame with aggregated and encoded features.
    """
    
    df = clean_applicans_data(df)
    df = create_new_features_application(df)

    df_application_pd = df.to_pandas()

    df_application_pd["CODE_GENDER"] = df_application_pd["CODE_GENDER"].replace({'XNA': np.nan})
    df_application_pd["NAME_INCOME_TYPE"] = df_application_pd["NAME_INCOME_TYPE"].replace({'Maternity leave': np.nan})
    df_application_pd['NAME_FAMILY_STATUS'] = df_application_pd['NAME_FAMILY_STATUS'].replace({'Unknown': np.nan})

    df_application, app_cat, encoder_map = one_hot_encoder_appl(df_application_pd, nan_as_category)

    joblib.dump(encoder_map, "../data/encoders/appl_encoder_map_v2.pkl")

    return pl.from_pandas(df_application)


def application_agg_encoding(df:pl, encoder_map):
    """
    Aggregate and encode application data using a provided encoder map.
    Args:
        df: polars DataFrame.
        encoder_map: dict mapping columns to categories.
    Returns:
        polars DataFrame with encoded features.
    """

    df = clean_applicans_data(df)
    df = create_new_features_application(df)

    df_application_pd = df.to_pandas()
    df_application_pd["CODE_GENDER"] = df_application_pd["CODE_GENDER"].replace({'XNA': np.nan})
    df_application_pd["NAME_INCOME_TYPE"] = df_application_pd["NAME_INCOME_TYPE"].replace({'Maternity leave': np.nan})
    df_application_pd['NAME_FAMILY_STATUS'] = df_application_pd['NAME_FAMILY_STATUS'].replace({'Unknown': np.nan})

    df_application = apply_saved_encoding(df_application_pd, encoder_map, nan_as_category=True)

    return pl.from_pandas(df_application)



In [None]:
low_variance = [
        'REGION_POPULATION_RELATIVE','BASEMENTAREA_AVG','YEARS_BEGINEXPLUATATION_AVG','COMMONAREA_AVG','LANDAREA_AVG','LIVINGAPARTMENTS_AVG',
        'NONLIVINGAPARTMENTS_AVG','NONLIVINGAREA_AVG','BASEMENTAREA_MODE','YEARS_BEGINEXPLUATATION_MODE','COMMONAREA_MODE','LANDAREA_MODE','NONLIVINGAPARTMENTS_MODE',
        'NONLIVINGAREA_MODE','BASEMENTAREA_MEDI','YEARS_BEGINEXPLUATATION_MEDI','COMMONAREA_MEDI','LANDAREA_MEDI','LIVINGAPARTMENTS_MEDI','NONLIVINGAPARTMENTS_MEDI',
        'NONLIVINGAREA_MEDI', 'AMT_REQ_CREDIT_BUREAU_HOUR','AMT_REQ_CREDIT_BUREAU_DAY', 'AMT_REQ_CREDIT_BUREAU_WEEK'
       ]
    
weak_correlated =[
        'OBS_30_CNT_SOCIAL_CIRCLE', 'OBS_60_CNT_SOCIAL_CIRCLE','AMT_REQ_CREDIT_BUREAU_QRT',
        'FLAG_DOCUMENT_15','FLAG_OWN_REALTY','NAME_TYPE_SUITE','REG_REGION_NOT_LIVE_REGION','HOUSETYPE_MODE',
        'FLAG_DOCUMENT_2','FLAG_DOCUMENT_9','FLAG_DOCUMENT_11','WEEKDAY_APPR_PROCESS_START','FLAG_DOCUMENT_21',
        'FLAG_DOCUMENT_17','LIVE_REGION_NOT_WORK_REGION','FLAG_DOCUMENT_4','FLAG_MOBIL','FLAG_EMAIL','FLAG_CONT_MOBILE',
        'FLAG_DOCUMENT_5','FLAG_DOCUMENT_7','FLAG_DOCUMENT_12','FLAG_DOCUMENT_10','FLAG_DOCUMENT_19','FLAG_DOCUMENT_20',
         'FLAG_EMP_PHONE'
    ]
REMOVE_APPLICATION_FEATURE =  low_variance + weak_correlated

def main_application_aggragation(df:pl,train=True):
    """
    Main aggregation pipeline for application data, switching between train and test encoding.
    Args:
        df: polars DataFrame.
        train: bool, whether to use training pipeline.
    Returns:
        polars DataFrame with aggregated features.
    """
    df = remove_features(df, REMOVE_APPLICATION_FEATURE)
    if train:
        appl_agg = application_agg(df, nan_as_category=True)
    else:
        encoder_map = joblib.load("../data/encoders/appl_encoder_map.pkl")
        appl_agg = application_agg_encoding(df, encoder_map)

    return  appl_agg

df_application_train = main_application_aggragation(application_train, train=False)
df_application_test = main_application_aggragation(application_test,train=False)


## 3. Aggregate External Datasets

 ### 3.1 Bureau and Bureau_balance

In [22]:
def merge_bureu_balance(
        df_bureau_b,
        df_bureau,
        nan_as_category = True):

        """
        Merge bureau balance and bureau data, perform feature engineering and aggregation for modeling.
        Args:
            df_bureau_b: polars DataFrame for bureau balance.
            df_bureau: polars DataFrame for bureau.
            nan_as_category: bool, whether to treat NaN as a separate category.
        Returns:
            polars DataFrame with aggregated bureau features.
        """
        
        status_grouped = df_bureau_b.group_by('SK_ID_BUREAU').agg([
        pl.col('STATUS').first().alias('Last_status'),
        pl.col('STATUS').last().alias('First_status')
        ])
        df_bureau_b = df_bureau_b.join(status_grouped, on='SK_ID_BUREAU', how='left')

        tmp_month = df_bureau_b.group_by('SK_ID_BUREAU').agg(
        pl.col('MONTHS_BALANCE').last().abs().alias('Month')
        )
        df_bureau_b = df_bureau_b.join(tmp_month, on='SK_ID_BUREAU', how='left')

        tmp_closed = df_bureau_b.filter(pl.col('STATUS') == 'C').group_by('SK_ID_BUREAU').agg(
        pl.col('MONTHS_BALANCE').last().abs().alias('When_closed')
        )
        df_bureau_b = df_bureau_b.join(tmp_closed, on='SK_ID_BUREAU', how='left')

        del tmp_closed, tmp_month

        # Calculate Month_closed_to_end
        df_bureau_b = df_bureau_b.with_columns(
        (pl.col('Month') - pl.col('When_closed')).alias('Month_closed_to_end')
        )
        # DPD counts
        for c in range(6):
            tmp_dpd = df_bureau_b.filter(pl.col('STATUS') == str(c)).group_by('SK_ID_BUREAU').agg(
                  pl.count('MONTHS_BALANCE').alias(f'DPD_{c}_cnt'))  
            df_bureau_b = df_bureau_b.join(tmp_dpd, on='SK_ID_BUREAU', how='left')
            df_bureau_b = df_bureau_b.with_columns(
                  (pl.col(f'DPD_{c}_cnt') / pl.col('Month')).alias(f'DPD_{c} / Month')
                  )
        df_bureau_b = df_bureau_b.with_columns(
              (pl.col('DPD_1_cnt') + pl.col('DPD_2_cnt') + pl.col('DPD_3_cnt') +
              pl.col('DPD_4_cnt') + pl.col('DPD_5_cnt')).alias('Non_zero_DPD_cnt')
        )
        df_bureau_b_pd = df_bureau_b.to_pandas()
        df_bureau_b, bureau_b_cat = one_hot_encoder(df_bureau_b_pd, nan_as_category)
        df_bureau_b = pl.from_pandas(df_bureau_b)

        aggregations = []
        for col in df_bureau_b.columns:
            if col == 'SK_ID_BUREAU':
                   continue
            if col in bureau_b_cat:
                   aggregations.append(pl.col(col).mean().alias(f"{col}_MEAN"))
            else:
                aggregations.extend([
                    pl.col(col).min().alias(f"{col}_MIN"),
                    pl.col(col).max().alias(f"{col}_MAX"),
                    pl.len().alias(f"{col}_SIZE")
              ])

        agg_df = df_bureau_b.group_by('SK_ID_BUREAU').agg(aggregations)
        df_bureau_b_agg = agg_df.rename({col: col.upper() for col in agg_df.columns})

        df_bureau = df_bureau.with_columns([
            pl.when(pl.col('AMT_ANNUITY') > 0.8e8).then(None).otherwise(pl.col('AMT_ANNUITY')).alias('AMT_ANNUITY'),
            pl.when(pl.col('AMT_CREDIT_SUM') > 3e8).then(None).otherwise(pl.col('AMT_CREDIT_SUM')).alias('AMT_CREDIT_SUM'),
            pl.when(pl.col('AMT_CREDIT_SUM_DEBT') > 1e8).then(None).otherwise(pl.col('AMT_CREDIT_SUM_DEBT')).alias('AMT_CREDIT_SUM_DEBT'),
            pl.when(pl.col('AMT_CREDIT_MAX_OVERDUE') > 0.8e8).then(None).otherwise(pl.col('AMT_CREDIT_MAX_OVERDUE')).alias('AMT_CREDIT_MAX_OVERDUE'),
            pl.when(pl.col('DAYS_ENDDATE_FACT') < -10000).then(None).otherwise(pl.col('DAYS_ENDDATE_FACT')).alias('DAYS_ENDDATE_FACT'),
            pl.when(
                  (pl.col('DAYS_CREDIT_UPDATE') > 0) | (pl.col('DAYS_CREDIT_UPDATE') < -40000)
            ).then(None).otherwise(pl.col('DAYS_CREDIT_UPDATE')).alias('DAYS_CREDIT_UPDATE'),
            pl.when(pl.col('DAYS_CREDIT_ENDDATE') < -10000).then(None).otherwise(pl.col('DAYS_CREDIT_ENDDATE')).alias('DAYS_CREDIT_ENDDATE'),
            ])
    
        df_bureau = df_bureau.filter(
             pl.col('DAYS_ENDDATE_FACT') >= pl.col('DAYS_CREDIT')
             )
        drop_cols = [
             'CREDIT_CURRENCY'
            ]
        df_bureau = df_bureau.drop(drop_cols)
        
        past_loan = (
            df_bureau.group_by('SK_ID_CURR').agg(pl.count('SK_ID_BUREAU'))
            .rename({'SK_ID_BUREAU': 'LOAN_COUNT_BUREAU'})
            )
            
        df_bureau = df_bureau.with_columns(
              (pl.col('AMT_CREDIT_SUM') - pl.col('AMT_CREDIT_SUM_DEBT')).alias('CREDIT_SUM_DIFF_DEBT'),
              (pl.col('AMT_CREDIT_SUM') - pl.col('AMT_CREDIT_SUM_LIMIT')).alias('CREDIT_SUM_DIFF_LIMIT'),
              (pl.col('AMT_CREDIT_SUM') - pl.col('AMT_CREDIT_SUM_OVERDUE')).alias('CREDIT_SUM_DIFF_OVERDUE'),
              (pl.col('DAYS_CREDIT') - pl.col('CREDIT_DAY_OVERDUE')).alias('DAYS_DIFF_OVERDUE'),
              (pl.col('DAYS_CREDIT') - pl.col('DAYS_CREDIT_ENDDATE')).alias('DAYS_DIFF_ENDDATE'),
              (pl.col('DAYS_CREDIT') - pl.col('DAYS_ENDDATE_FACT')).alias('DAYS_DIFF_ENDDATE_FACT'),
              (pl.col('DAYS_CREDIT_ENDDATE') - pl.col('DAYS_ENDDATE_FACT')).alias('ENDDATE_DIFF_FACT'),
              (pl.col('DAYS_CREDIT_UPDATE') - pl.col('DAYS_CREDIT_ENDDATE')).alias('UPDATE_DIFF_ENDDATE')
        )
    
        df_bureau_pd = df_bureau.to_pandas()
        df_bureau, bureau_cat = one_hot_encoder(df_bureau_pd, nan_as_category)
        df_bureau = pl.from_pandas(df_bureau)

        df_bureau = df_bureau.join(df_bureau_b_agg, on='SK_ID_BUREAU', how='left')
        df_bureau = df_bureau.drop('SK_ID_BUREAU')
 
        categorical = bureau_cat + bureau_b_cat
 
        aggregations_b = []
        for col in df_bureau.columns:
            if col == 'SK_ID_CURR':
                continue
            if col in categorical:
                aggregations_b.append(pl.col(col).mean().alias(f"{col}_MEAN_"))
        
        aggregations_b.extend( [
            pl.col('DAYS_CREDIT').min().alias('DAYS_CREDIT_MIN'),
            pl.col('DAYS_CREDIT').max().alias('DAYS_CREDIT_MAX'),
            pl.col('DAYS_CREDIT').mean().alias('DAYS_CREDIT_MEAN'),
            pl.col('DAYS_CREDIT').var().alias('DAYS_CREDIT_VAR'),
            pl.col('DAYS_CREDIT_ENDDATE').min().alias('DAYS_CREDIT_ENDDATE_MIN'),
            pl.col('DAYS_CREDIT_ENDDATE').max().alias('DAYS_CREDIT_ENDDATE_MAX'),
            pl.col('DAYS_CREDIT_ENDDATE').mean().alias('DAYS_CREDIT_ENDDATE_MEAN'),
            pl.col('AMT_CREDIT_MAX_OVERDUE').mean().alias('B_AMT_CREDIT_MAX_OVERDUE_MEAN'),
            pl.col('AMT_CREDIT_SUM_DEBT').mean().alias('AMT_CREDIT_SUM_DEBT_MEAN'),
            pl.col('AMT_CREDIT_SUM_DEBT').sum().alias('AMT_CREDIT_SUM_DEBT_SUM'),
            pl.col('AMT_CREDIT_SUM_OVERDUE').mean().alias('AMT_CREDIT_SUM_OVERDUE_MEAN'),
            pl.col('AMT_CREDIT_SUM_LIMIT').mean().alias('AMT_CREDIT_SUM_LIMIT_MEAN'),
            pl.col('AMT_CREDIT_SUM_LIMIT').sum().alias('AMT_CREDIT_SUM_LIMIT_SUM'),
            pl.col('MONTHS_BALANCE_MIN').min().alias('B_MONTHS_BALANCE_MIN_MIN'),
            pl.col('MONTHS_BALANCE_MAX').max().alias('B_MONTHS_BALANCE_MAX_MAX'),
            pl.col('MONTHS_BALANCE_SIZE').mean().alias('MONTHS_BALANCE_SIZE_MEAN'),
            pl.col('MONTHS_BALANCE_SIZE').sum().alias('MONTHS_BALANCE_SIZE_SUM'),
            pl.col('CREDIT_SUM_DIFF_DEBT').mean().alias('CREDIT_SUM_DIFF_DEBT_MEAN'),
            pl.col('CREDIT_SUM_DIFF_LIMIT').mean().alias('CREDIT_SUM_DIFF_LIMIT_MEAN'),
            pl.col('CREDIT_SUM_DIFF_OVERDUE').mean().alias('CREDIT_SUM_DIFF_OVERDUE_MEAN'),
            pl.col('DAYS_DIFF_OVERDUE').mean().alias('DAYS_DIFF_OVERDUE_MEAN'),
            pl.col('DAYS_DIFF_ENDDATE').mean().alias('DAYS_DIFF_ENDDATE_MEAN'),
            pl.col('DAYS_DIFF_ENDDATE_FACT').mean().alias('DAYS_DIFF_ENDDATE_FACT_MEAN'),
            pl.col('ENDDATE_DIFF_FACT').mean().alias('ENDDATE_DIFF_FACT_MEAN'),
            pl.col('UPDATE_DIFF_ENDDATE').mean().alias('UPDATE_DIFF_ENDDATE_MEAN')
  
        ])        
        df_bureau_agg = df_bureau.group_by('SK_ID_CURR').agg(aggregations_b)
        df_bureau_agg = df_bureau_agg.join(past_loan, on='SK_ID_CURR', how='left')
        df_bureau_agg = df_bureau_agg.rename({col: col.upper() for col in df_bureau_agg.columns})
        
        return  df_bureau_agg

In [23]:
df_bureau_agg = reduce_memory_usage_pl(merge_bureu_balance(bureau_bal, bureau))

Size before reduction: 73.62 MB
Initial data types: Counter({Float64: 26, Float32: 15, Int16: 2, Int8: 2, UInt32: 2, Int32: 1})
Size after reduction: 47.02 MB
Final data types: Counter({Float32: 41, Int16: 2, Int8: 2, UInt32: 2, Int32: 1})


- ### 3.2 Previous Appplication

In [24]:

def previous_application(df_prev, nan_as_category= True):
    """
    Aggregate previous application data, engineer new features, and encode categorical columns.
    Args:
        df_prev: polars DataFrame for previous applications.
        nan_as_category: bool, whether to treat NaN as a separate category.
    Returns:
        polars DataFrame with aggregated previous application features.
    """
    df_prev = df_prev.with_columns(
          pl.when(pl.col('AMT_CREDIT') > 6_000_000).then(None).otherwise(pl.col('AMT_CREDIT')).alias('AMT_CREDIT')
          )

    replace_365243 = [
        'DAYS_FIRST_DRAWING', 'DAYS_FIRST_DUE', 'DAYS_LAST_DUE_1ST_VERSION',
        'DAYS_LAST_DUE', 'DAYS_TERMINATION'
    ]
    drop_columns = [
        'SELLERPLACE_AREA',
        'RATE_INTEREST_PRIMARY',
        'NFLAG_INSURED_ON_APPROVAL',
        'NAME_SELLER_INDUSTRY'
    ]

    df_prev = df_prev.drop(drop_columns)

    for col in replace_365243:
        df_prev = df_prev.with_columns(
            pl.when(pl.col(col) == 365243).then(None).otherwise(pl.col(col)).alias(col)
        )
    
    df_prev = df_prev.with_columns([
        (pl.col('AMT_APPLICATION') / pl.col('AMT_CREDIT')).alias('PREV_APP_CREDIT_PERC'),
        (pl.col('AMT_APPLICATION') - pl.col('AMT_CREDIT')).alias('PREV_AMT_DECLINED'),
        (pl.col('AMT_CREDIT') - pl.col('AMT_GOODS_PRICE')).alias('PREV_AMT_CREDIT_GOODS_DIF'),
        (pl.col('CNT_PAYMENT') *  pl.col('AMT_ANNUITY') - pl.col('AMT_CREDIT')).alias('AMT_INTEREST'),
        ])

    df_prev_pd = df_prev.to_pandas()
    df_prev, prev_cat = one_hot_encoder(df_prev_pd, nan_as_category)
    df_prev = pl.from_pandas(df_prev)

    cat_agg = [pl.col(col).mean().alias(f"{col}_MEAN") for col in df_prev[prev_cat].columns]            
    
    num_agg= [
        pl.col('AMT_ANNUITY').min().alias('AMT_ANNUITY_MIN'),
        pl.col('AMT_ANNUITY').max().alias('AMT_ANNUITY_MAX'),
        pl.col('AMT_ANNUITY').mean().alias('AMT_ANNUITY_MEAN'),
        pl.col('AMT_CREDIT').mean().alias('AMT_CREDIT_MEAN'),
        pl.col('PREV_APP_CREDIT_PERC').min().alias('APP_CREDIT_PERC_MIN'),
        pl.col('PREV_APP_CREDIT_PERC').max().alias('APP_CREDIT_PERC_MAX'),
        pl.col('PREV_APP_CREDIT_PERC').mean().alias('APP_CREDIT_PERC_MEAN'),
        pl.col('PREV_APP_CREDIT_PERC').var().alias('APP_CREDIT_PERC_VAR'),
        pl.col('PREV_AMT_DECLINED').min().alias('PREV_AMT_DECLINED_MIN'),
        pl.col('PREV_AMT_DECLINED').max().alias('PREV_AMT_DECLINED_MAX'),
        pl.col('PREV_AMT_DECLINED').mean().alias('PREV_AMT_DECLINED_MEAN'),
        pl.col('AMT_GOODS_PRICE').mean().alias('PREV_AMT_GOODS_PRICE_MEAN'), 
        pl.col('HOUR_APPR_PROCESS_START').min().alias('HOUR_APPR_PROCESS_START_MIN'),
        pl.col('HOUR_APPR_PROCESS_START').max().alias('HOUR_APPR_PROCESS_START_MAX'),
        pl.col('HOUR_APPR_PROCESS_START').mean().alias('HOUR_APPR_PROCESS_START_MEAN'),
        pl.col('RATE_DOWN_PAYMENT').min().alias('RATE_DOWN_PAYMENT_MIN'),
        pl.col('RATE_DOWN_PAYMENT').max().alias('RATE_DOWN_PAYMENT_MAX'),
        pl.col('RATE_DOWN_PAYMENT').mean().alias('RATE_DOWN_PAYMENT_MEAN'),
        pl.col('DAYS_DECISION').min().alias('DAYS_DECISION_MIN'),
        pl.col('DAYS_DECISION').max().alias('DAYS_DECISION_MAX'),
        pl.col('DAYS_DECISION').mean().alias('DAYS_DECISION_MEAN'),
        pl.col('CNT_PAYMENT').mean().alias('PREV_CNT_PAYMENT_MEAN'),
        pl.col('CNT_PAYMENT').sum().alias('PREV_CNT_PAYMENT_SUM'),
    ]
    df_prev_agg = df_prev.group_by('SK_ID_CURR').agg(cat_agg + num_agg)
    df_prev_agg = df_prev_agg.rename({col: col.upper() for col in df_prev_agg.columns})

    return df_prev_agg

In [25]:
df_previous_agg =  reduce_memory_usage_pl(previous_application(prev_app_df))

Size before reduction: 410.89 MB
Initial data types: Counter({Float64: 149, Float32: 17, Int8: 2, Int16: 2, Int32: 1})
Size after reduction: 218.29 MB
Final data types: Counter({Float32: 166, Int8: 2, Int16: 2, Int32: 1})


- ### 3.3 Installments aggregation

In [None]:
def installments_payments_agg(df_ins, nan_as_category = True):
    """
    Aggregate installment payments data, engineer new features, and encode categorical columns.
    Args:
        df_ins: polars DataFrame for installment payments.
        nan_as_category: bool, whether to treat NaN as a separate category.
    Returns:
        polars DataFrame with aggregated installment payment features.
    """
  
    df_ins = df_ins.with_columns(
        pl.when(pl.col('NUM_INSTALMENT_VERSION') > 60).then(None).otherwise(pl.col('NUM_INSTALMENT_VERSION')).alias('NUM_INSTALMENT_VERSION')
        )
    
    df_ins = df_ins.with_columns(
          pl.when(pl.col('DAYS_ENTRY_PAYMENT') < -4000).then(None).otherwise(pl.col('DAYS_ENTRY_PAYMENT')).alias('DAYS_ENTRY_PAYMENT')
          )
    df_ins = df_ins.with_columns([
        (pl.col('DAYS_ENTRY_PAYMENT') - pl.col('AMT_INSTALMENT')).alias('DAYS_PAST_DUE'),
        (pl.col('AMT_INSTALMENT') - pl.col('DAYS_ENTRY_PAYMENT')).alias('DAYS_BEFORE_DUE'),
        (pl.col('AMT_PAYMENT') / pl.col('AMT_INSTALMENT')).alias('PAYMENT_RATIO'),
        (pl.col('DAYS_INSTALMENT') / pl.col('DAYS_ENTRY_PAYMENT')).alias('DAYS_PAYMENT_RATIO'),
        (pl.col('AMT_INSTALMENT') - pl.col('AMT_PAYMENT')).alias('PAYMENT_DIFF')
        ])
    # Ensure only positive values (replace negatives with 0)
    df_ins = df_ins.with_columns([
        pl.when(pl.col('DAYS_PAST_DUE') > 0).then(pl.col('DAYS_PAST_DUE')).otherwise(0).alias('DAYS_PAST_DUE'),
        pl.when(pl.col('DAYS_BEFORE_DUE') > 0).then(pl.col('DAYS_BEFORE_DUE')).otherwise(0).alias('DAYS_BEFORE_DUE')
        ])

    num_agg = [
        pl.col('DAYS_PAST_DUE').max().alias('DAYS_PAST_DUE_MAX'),
        pl.col('DAYS_PAST_DUE').mean().alias('DAYS_PAST_DUE_MEAN'),
        pl.col('DAYS_PAST_DUE').sum().alias('DAYS_PAST_DUE_SUM'),

        pl.col('DAYS_BEFORE_DUE').max().alias('DAYS_BEFORE_DUE_MAX'),
        pl.col('DAYS_BEFORE_DUE').mean().alias('DAYS_BEFORE_DUE_MEAN'),
        pl.col('DAYS_BEFORE_DUE').sum().alias('DAYS_BEFORE_DUE_SUM'),

        pl.col('PAYMENT_RATIO').max().alias('PAYMENT_RATIO_MAX'),
        pl.col('PAYMENT_RATIO').mean().alias('PAYMENT_RATIO_MEAN'),
        pl.col('PAYMENT_RATIO').sum().alias('PAYMENT_RATIO_SUM'),
        pl.col('PAYMENT_RATIO').var().alias('PAYMENT_RATIO_VAR'),

        pl.col('DAYS_PAYMENT_RATIO').max().alias('DAYS_PAYMENT_RATIO_MAX'),
        pl.col('DAYS_PAYMENT_RATIO').mean().alias('DAYS_PAYMENT_RATIO_MEAN'),
        pl.col('DAYS_PAYMENT_RATIO').sum().alias('DAYS_PAYMENT_RATIO_SUM'),
        pl.col('DAYS_PAYMENT_RATIO').var().alias('DAYS_PAYMENT_RATIO_VAR'),

        pl.col('PAYMENT_DIFF').max().alias('PAYMENT_DIFF_MAX'),
        pl.col('PAYMENT_DIFF').mean().alias('PAYMENT_DIFF_MEAN'),
        pl.col('PAYMENT_DIFF').sum().alias('PAYMENT_DIFF_SUM'),
        pl.col('PAYMENT_DIFF').var().alias('PAYMENT_DIFF_VAR'),

        pl.col('AMT_INSTALMENT').mean().alias('AMT_INSTALMENT_MEAN'),
        pl.col('AMT_PAYMENT').mean().alias('AMT_PAYMENT_MEAN'),
     
        pl.col('DAYS_ENTRY_PAYMENT').max().alias('DAYS_ENTRY_PAYMENT_MAX'),
        pl.col('DAYS_ENTRY_PAYMENT').mean().alias('DAYS_ENTRY_PAYMENT_MEAN'),
        pl.col('DAYS_ENTRY_PAYMENT').sum().alias('DAYS_ENTRY_PAYMENT_SUM'),
        ]
    
    df_ins_pd = df_ins.to_pandas()
    df_ins, ins_cat = one_hot_encoder(df_ins_pd, nan_as_category)
    df_ins = pl.from_pandas(df_ins)

    cat_agg = [pl.col(col).mean().alias(f"{col}_MEAN") for col in df_ins[ins_cat].columns]   

    df_ins_agg = df_ins.group_by('SK_ID_CURR').agg(num_agg) 
    df_ins_agg = df_ins_agg.rename({col: col.upper() for col in df_ins_agg.columns})

    return df_ins_agg 

In [37]:
df_installments_agg = reduce_memory_usage_pl(installments_payments_agg(inst_pay_df))

Size before reduction: 31.24 MB
Initial data types: Counter({Float32: 23, Int32: 1})
Size after reduction: 31.24 MB
Final data types: Counter({Float32: 23, Int32: 1})


- ### 3.4 POS_CASH aggregation

In [38]:
def pos_cash_agg(df_pos, nan_as_category = True):
    """
    Aggregate POS cash balance data, engineer new features, and encode categorical columns.
    Args:
        df_pos: polars DataFrame for POS cash balance.
        nan_as_category: bool, whether to treat NaN as a separate category.
    Returns:
        polars DataFrame with aggregated POS cash features.
    """

    df_pos = df_pos.with_columns(
          pl.when(pl.col('CNT_INSTALMENT_FUTURE') > 60).then(None).otherwise(pl.col('CNT_INSTALMENT_FUTURE')).alias('CNT_INSTALMENT_FUTURE')
          )
    df_pos = df_pos.with_columns([
        (pl.col('SK_DPD') / pl.col('SK_DPD_DEF')).alias('APP_CREDIT_PERC'),
        (pl.col('CNT_INSTALMENT') + pl.col('CNT_INSTALMENT_FUTURE')).alias('TOTAL_TERM'),
        (pl.col('CNT_INSTALMENT') > pl.col('CNT_INSTALMENT_FUTURE')).alias('pos_CMT_INSTALMEN_MORE_FUTURE')
        ])

    df_pos_pd = df_pos.to_pandas()
    df_pos, pos_cat = one_hot_encoder(df_pos_pd, nan_as_category)
    df_pos = pl.from_pandas(df_pos)
  
    num_agg = [
        pl.col('MONTHS_BALANCE').max().alias('MONTHS_BALANCE_MAX'),
        pl.col('MONTHS_BALANCE').mean().alias('MONTHS_BALANCE_MEAN'),
        pl.col('MONTHS_BALANCE').len().alias('MONTHS_BALANCE_SIZE'),

        pl.col('SK_DPD').max().alias('SK_DPD_MAX'),
        pl.col('SK_DPD').mean().alias('SK_DPD_MEAN'),
        
        pl.col('SK_DPD_DEF').max().alias('SK_DPD_DEF_MAX'),
        pl.col('SK_DPD_DEF').mean().alias('SK_DPD_DEF_MEAN'),
    ]

    cat_agg = [pl.col(col).mean().alias(f"{col}_MEAN") for col in df_pos[pos_cat].columns]   

    df_pos_agg = df_pos.group_by('SK_ID_CURR').agg(cat_agg + num_agg) 
    df_pos_agg = df_pos_agg.rename({col: col.upper() for col in df_pos_agg.columns})

    pos_cash_count = df_pos_agg.group_by('SK_ID_CURR').len()
    df_pos_agg = df_pos_agg.join(pos_cash_count, on='SK_ID_CURR', how='left')
    
    return  df_pos_agg

In [39]:
df_pos_cash_agg = reduce_memory_usage_pl(pos_cash_agg(pos_cash))

Size before reduction: 38.92 MB
Initial data types: Counter({Float64: 13, UInt32: 2, Int16: 2, Int32: 1, Int8: 1})
Size after reduction: 22.19 MB
Final data types: Counter({Float32: 13, UInt32: 2, Int16: 2, Int32: 1, Int8: 1})


- ### 3.5  Credit Card Balance Aggregation

In [40]:
def credit_card_balance_agg(df_card, nan_as_category = True):
    """
    Aggregate credit card balance data, engineer new features, and encode categorical columns.
    Args:
        df_card: polars DataFrame for credit card balance.
        nan_as_category: bool, whether to treat NaN as a separate category.
    Returns:
        polars DataFrame with aggregated credit card balance features.
    """

    df_card = df_card.with_columns([
        pl.when(pl.col('AMT_PAYMENT_CURRENT') > 4_000_000)
        .then(None)
        .otherwise(pl.col('AMT_PAYMENT_CURRENT'))
        .alias('AMT_PAYMENT_CURRENT'),

        pl.when(pl.col('AMT_CREDIT_LIMIT_ACTUAL') > 1_000_000)
        .then(None)
        .otherwise(pl.col('AMT_CREDIT_LIMIT_ACTUAL'))
        .alias('AMT_CREDIT_LIMIT_ACTUAL')
    ])
                                    
    df_card = df_card.with_columns([pl.concat_list([pl.col(col).is_null().cast(pl.Int32) for col in df_card.columns]).list.sum().alias('card missing')])

    df_card = df_card.with_columns([
        (pl.col('AMT_DRAWINGS_ATM_CURRENT') +
        pl.col('AMT_DRAWINGS_CURRENT') +
        pl.col('AMT_DRAWINGS_OTHER_CURRENT') +
        pl.col('AMT_DRAWINGS_POS_CURRENT')).alias('CARD_AMT_DRAWING_SUM'),

        # Balance-to-limit ratio
        (pl.col('AMT_BALANCE') / (pl.col('AMT_CREDIT_LIMIT_ACTUAL') + 0.00001)).alias('CARD_BALANCE_LIMIT_RATIO'),

        # Total drawing count
        (pl.col('CNT_DRAWINGS_ATM_CURRENT') +
        pl.col('CNT_DRAWINGS_CURRENT') +
        pl.col('CNT_DRAWINGS_OTHER_CURRENT') +
        pl.col('CNT_DRAWINGS_POS_CURRENT') +
        pl.col('CNT_INSTALMENT_MATURE_CUM')).alias('CARD_CNT_DRAWING_SUM'),

        (pl.col('AMT_PAYMENT_CURRENT') / (pl.col('AMT_INST_MIN_REGULARITY') + 0.0001)).alias('CARD_MIN_PAYMENT_RATIO'),
        (pl.col('AMT_PAYMENT_CURRENT') - pl.col('AMT_INST_MIN_REGULARITY')).alias('CARD_PAYMENT_MIN_DIFF'),
        (pl.col('AMT_PAYMENT_TOTAL_CURRENT') / (pl.col('AMT_INST_MIN_REGULARITY') + 0.00001)).alias('CARD_MIN_PAYMENT_TOTAL_RATIO'),
        (pl.col('AMT_PAYMENT_TOTAL_CURRENT') - pl.col('AMT_INST_MIN_REGULARITY')).alias('CARD_PAYMENT_MIN_TOTAL_DIFF'),
        (pl.col('AMT_TOTAL_RECEIVABLE') - pl.col('AMT_RECEIVABLE_PRINCIPAL')).alias('CARD_INTEREST_RECEIVABLE'),
        (pl.col('SK_DPD') / (pl.col('SK_DPD_DEF') + 0.00001)).alias('CARD_DPD_RATIO')
    ])

    
    drop_columns = [
       'AMT_TOTAL_RECEIVABLE',
       'AMT_PAYMENT_TOTAL_CURRENT',
       'AMT_RECEIVABLE_PRINCIPAL',
       'AMT_RECIVABLE',
       'AMT_DRAWINGS_ATM_CURRENT',
       'AMT_DRAWINGS_CURRENT',
    ]

    df_card= df_card.drop(drop_columns)

    df_card_pd = df_card.to_pandas()
    df_card, card_cat = one_hot_encoder(df_card_pd, nan_as_category)
    df_card = pl.from_pandas(df_card)

    aggregations = []
    for col in df_card.columns:
        if col == 'SK_ID_PREV' or col == 'SK_ID_CURR':
                continue
        if col in card_cat:
                aggregations.append(pl.col(col).mean().alias(f"card_{col}_MEAN+"))
        else:
            aggregations.extend([
                pl.col(col).min().alias(f"card_{col}_MIN"),
                pl.col(col).max().alias(f"card_{col}_MAX"),
                pl.col(col).len().alias(f"card_{col}_SIZE"),
                pl.col(col).mean().alias(f"card_{col}_MEAN_")
        ])

    df_card_agg = df_card.group_by('SK_ID_CURR').agg(aggregations)
    df_card_agg = df_card_agg.rename({col: col.upper() for col in df_card_agg.columns})
  
    return df_card_agg

In [41]:
df_credit_card_agg = reduce_memory_usage_pl(credit_card_balance_agg(card_balance))

Size before reduction: 47.61 MB
Initial data types: Counter({Float32: 51, UInt32: 24, Float64: 19, Int16: 6, Int32: 3, Int8: 2})
Size after reduction: 39.41 MB
Final data types: Counter({Float32: 70, UInt32: 24, Int8: 5, Int16: 5, Int32: 1})


## 4. Merge All Aggregated Features

In [32]:
# Merge on SK_ID_CURR
df_merged_train = df_application_train \
    .join(df_bureau_agg, on="SK_ID_CURR", how="left") \
    .join(df_previous_agg, on="SK_ID_CURR", how="left") \
    .join(df_installments_agg, on="SK_ID_CURR", how="left") \
    .join(df_credit_card_agg, on="SK_ID_CURR", how="left") \
    .join(df_pos_cash_agg, on="SK_ID_CURR", how="left")


In [33]:
df_merged_test = df_application_test \
    .join(df_bureau_agg, on="SK_ID_CURR", how="left") \
    .join(df_previous_agg, on="SK_ID_CURR", how="left") \
    .join(df_installments_agg, on="SK_ID_CURR", how="left") \
    .join(df_credit_card_agg, on="SK_ID_CURR", how="left") \
    .join(df_pos_cash_agg, on="SK_ID_CURR", how="left")

## 5. Final Checks & Export

In [34]:
# df_bureau_agg.write_parquet("../data/cache/bureau_balance_agg.parquet")
# df_previous_agg.write_parquet("../data/cache/previous_application_agg.parquet")
# df_installments_agg.write_parquet("../data/cache/installments_agg.parquet")
# df_pos_cash_agg.write_parquet("../data/cache/pos_cash_agg.parquet")
# df_credit_card_agg.write_parquet("../data/cache/credit_card_agg.parquet")

In [35]:
print(df_merged_train.shape)
print(df_merged_test.shape)
df_merged_train.write_parquet("../data/processed/data_aggregated_train.parquet")
df_merged_test.write_parquet("../data/processed/data_aggregated_test_v1.parquet")

(307500, 580)
(48744, 579)
