## Configuration

In [1]:
import numpy as np
import pandas as pd
import polars as pl
import os
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from datetime import datetime
# finds all the pathnames matching a specified pattern according to Unix shell rules
from glob import glob
# imports Python's garbage collector
import gc

In [2]:
# define a configuration class to hold directory paths
class CFG:
    """Namespace of dataset locations used by the notebook (never instantiated)."""

    # root directory where the dataset is stored
    root_dir = '/kaggle/input/home-credit-credit-risk-model-stability'

    # train and test parquet files live side by side under a common folder
    _parquet_dir = os.path.join(root_dir, 'parquet_files')
    # directory where training data files are located
    train_dir = os.path.join(_parquet_dir, 'train')
    # directory where test data files are located
    test_dir = os.path.join(_parquet_dir, 'test')
    # the shared prefix is only a build-time helper; keep it out of the class namespace
    del _parquet_dir
# load the feature definitions shipped with the dataset
feature_definitions_df = pd.read_csv(os.path.join(CFG.root_dir, "feature_definitions.csv"))
# show the relevant information about the table; DataFrame.info() prints its
# summary itself and returns None, so it is called directly instead of being
# wrapped in display() (which would render a stray "None")
feature_definitions_df.info()
# preview the first rows with the rich notebook renderer
display(feature_definitions_df.head())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 465 entries, 0 to 464
Data columns (total 2 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   Variable     465 non-null    object
 1   Description  465 non-null    object
dtypes: object(2)
memory usage: 7.4+ KB


None

Unnamed: 0,Variable,Description
0,actualdpd_943P,Days Past Due (DPD) of previous contract (actu...
1,actualdpdtolerance_344P,DPD of client with tolerance.
2,addres_district_368M,District of the person's address.
3,addres_role_871L,Role of person's address.
4,addres_zip_823M,Zip code of the address.


## Data Collection and Preprocessing

In [3]:
#### define data process pipeline
class Pipeline:
    """Stateless preprocessing steps for pandas DataFrames.

    Every step is a static method that takes a DataFrame and returns a
    DataFrame, so steps can be chained with ``DataFrame.pipe``. Column roles
    are read from the last character of the column name.
    """

    @staticmethod  # avoid initializing a class instance
    def set_table_dtypes(df):
        """Cast columns to a dtype chosen from the column name.

        * ``case_id``, ``WEEK_NUM``, ``num_group1``, ``num_group2`` -> ``Int64``
        * ``date_decision`` and names ending in ``D`` -> datetime
        * names ending in ``P`` or ``A`` -> ``float64``
        * names ending in ``M`` -> pandas ``string`` dtype

        Any other column is left as it is. The columns are reassigned on
        ``df`` itself, so the frame that was passed in is modified; the same
        frame is returned for chaining.
        """
        # iterate over each column in the DataFrame
        for col in df.columns:
            # the explicit key list is tested first so that e.g. 'WEEK_NUM'
            # is not caught by the trailing-'M' string rule below
            if col in ["case_id", "WEEK_NUM", "num_group1", "num_group2"]:
                df[col] = df[col].astype('Int64')
            # if column is 'date_decision', convert it to datetime
            elif col in ["date_decision"]:
                df[col] = pd.to_datetime(df[col])
            # if column name ends with 'P' or 'A', cast it to 'float64'
            elif col[-1] in ("P", "A"):
                df[col] = df[col].astype('float64')
            # if column name ends with 'M', cast it to 'string'
            elif col[-1] in ("M",):
                df[col] = df[col].astype('string')
            # if column name ends with 'D', convert it to datetime
            elif col[-1] in ("D",):
                df[col] = pd.to_datetime(df[col])

        return df

    @staticmethod
    def handle_dates(df):
        """Turn date columns into day offsets relative to ``date_decision``.

        Every column whose name ends in ``D`` is replaced by
        ``(column - date_decision)`` expressed in whole days. Afterwards the
        ``date_decision`` and ``MONTH`` columns are dropped, so both are
        expected to be present.

        The ``D`` columns are overwritten on the frame that was passed in;
        the returned frame is the result of the final ``drop``.
        """
        # iterate over each column in the DataFrame
        for col in df.columns:
            # if column name ends with 'D', perform date subtraction and convert to days
            if col[-1] in ("D",):
                df[col] = (df[col] - df["date_decision"]).dt.days

        # drop the 'date_decision' and 'MONTH' columns
        df = df.drop(columns=["date_decision", "MONTH"])

        return df

    @staticmethod
    def filter_cols(df, null_threshold=0.95, max_unique=200):
        """Drop columns that are mostly null or have unusable cardinality.

        ``target``, ``case_id`` and ``WEEK_NUM`` are never dropped.

        Args:
            df: input DataFrame (not modified; ``drop`` returns new frames).
            null_threshold: a column is dropped when its share of null values
                is strictly greater than this value.
            max_unique: a text column is dropped when it has exactly one
                distinct value or more than this many distinct values.

        Returns:
            A DataFrame without the dropped columns.
        """
        protected = ("target", "case_id", "WEEK_NUM")

        # pass 1: collect and drop columns with too many null values
        mostly_null = [
            col for col in df.columns
            if col not in protected and df[col].isnull().mean() > null_threshold
        ]
        df = df.drop(columns=mostly_null)

        # pass 2: collect text columns with a problematic number of unique values
        bad_cardinality = []
        for col in df.columns:
            if col in protected:
                continue
            dtype = df[col].dtype
            # set_table_dtypes casts 'M' columns to the pandas 'string'
            # extension dtype, which does not compare equal to `object`;
            # both representations of text must be accepted here, otherwise
            # the cardinality filter silently skips every such column
            if not (dtype == object or isinstance(dtype, pd.StringDtype)):
                continue
            n_unique = df[col].nunique()  # computed once per column
            if n_unique == 1 or n_unique > max_unique:
                bad_cardinality.append(col)

        # drop the identified columns from the DataFrame
        return df.drop(columns=bad_cardinality)

In [4]:
# define a class to perform aggregation operations on a DataFrame based on naming conventions of columns
class Aggregator:
    """Build aggregation specs from column-name suffixes and apply them.

    Columns are selected by the last character of their name: 'P'/'A' for
    numerical columns, 'D' for date columns and 'M' for string columns.
    """

    # aggregation functions used for numerical (and date) columns
    num_aggregators = ['max', 'min', 'mean']
    # aggregation functions used for string (categorical) columns
    str_aggregators = ['max', 'min']

    @staticmethod
    def _spec_for(df, suffixes, aggregators):
        """Map every column of ``df`` whose last character is in ``suffixes`` to ``aggregators``."""
        selected = (name for name in df.columns if name[-1] in suffixes)
        return dict.fromkeys(selected, aggregators)

    @staticmethod
    def num_expr(df):
        """Return ``{column: num_aggregators}`` for columns ending in 'P' or 'A'."""
        return Aggregator._spec_for(df, ("P", "A"), Aggregator.num_aggregators)

    @staticmethod
    def date_expr(df):
        """Return ``{column: num_aggregators}`` for columns ending in 'D'.

        Date columns reuse the numeric aggregation functions.
        """
        return Aggregator._spec_for(df, ("D",), Aggregator.num_aggregators)

    @staticmethod
    def str_expr(df):
        """Return ``{column: str_aggregators}`` for columns ending in 'M'."""
        return Aggregator._spec_for(df, ("M",), Aggregator.str_aggregators)

    @staticmethod
    def get_exprs(df):
        """Combine the numerical, date and string specs (in that order) into one dict."""
        combined = {}
        for build in (Aggregator.num_expr, Aggregator.date_expr, Aggregator.str_expr):
            combined.update(build(df))
        return combined

    @staticmethod
    def aggregate(df, groupby_cols):
        """Group ``df`` by ``groupby_cols`` and apply the combined aggregation spec.

        Returns whatever ``DataFrame.groupby(...).agg(spec)`` produces for the
        dict-of-lists spec built by ``get_exprs``.
        """
        spec = Aggregator.get_exprs(df)
        return df.groupby(groupby_cols).agg(spec)

## Feature Engineering

In [5]:
#### perform feature engineering on the given dataset
def feature_eng(df_base, depth_0, depth_1, depth_2):
    """Assemble the modelling frame from the base table and the depth tables.

    Steps:
      1. derive ``month_decision`` and ``weekday_decision`` from the
         ``date_decision`` column of ``df_base``;
      2. left-join every frame of ``depth_0 + depth_1 + depth_2`` on
         ``case_id``, in that order;
      3. run ``Pipeline.handle_dates`` on the joined frame.

    Args:
        df_base: base DataFrame; must contain ``case_id`` and a datetime
            ``date_decision`` column. It is not modified.
        depth_0, depth_1, depth_2: lists of DataFrames to join onto the base.

    Returns:
        The joined DataFrame after ``Pipeline.handle_dates``.
    """
    decision_dates = df_base['date_decision']
    # assign() returns a new frame, so the caller's df_base does not silently
    # gain the two derived columns as a side effect of calling this function
    df_base = df_base.assign(
        month_decision=decision_dates.dt.month,
        weekday_decision=decision_dates.dt.weekday,
    )

    # join each depth frame onto the base, one after another
    for i, df in enumerate(depth_0 + depth_1 + depth_2):
        # only right-hand columns that collide with an existing name receive
        # the positional suffix; left-hand names are kept unchanged
        df_base = df_base.merge(df, how="left", on="case_id", suffixes=('', f"_{i}"))

    # convert date columns to day offsets and drop the helper date columns
    return df_base.pipe(Pipeline.handle_dates)

## Prepare Data Frame

In [6]:
def prepare_df(data_dir, cat_cols=None, mode='train', display_store=False, train_cols=[]):
    print('Collecting data...')
    data_store = {
        'df_base': read_file(data_dir / f'{mode}_base.parquet'),
        'depth_0': [
            read_file(data_dir / f'{mode}_static_cb_0.parquet'),
            read_files(data_dir / f'{mode}_static_0_*.parquet')
        ],
        'depth_1': [
            read_files(data_dir / f'{mode}_applprev_1_*.parquet', 1),
            read_file(data_dir / f'{mode}_tax_registry_a_1.parquet', 1),
            read_file(data_dir / f'{mode}_tax_registry_b_1.parquet', 1),
            read_file(data_dir / f'{mode}_tax_registry_c_1.parquet', 1),
            read_file(data_dir / f'{mode}_credit_bureau_b_1.parquet', 1),
            read_file(data_dir / f'{mode}_other_1.parquet', 1),
            read_file(data_dir / f'{mode}_person_1.parquet')
        ]
    }