## Libraries

In [2]:
import os
import gc
import time
import numpy as np
import pandas as pd
from contextlib import contextmanager
import multiprocessing as mp
from functools import partial
from scipy.stats import kurtosis, iqr, skew
from lightgbm import LGBMClassifier
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import KFold, StratifiedKFold
from sklearn.metrics import roc_auc_score
from glob import glob
from pathlib import Path
from datetime import datetime
import polars as pl
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import StratifiedGroupKFold
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.metrics import roc_auc_score 
from sklearn.metrics import roc_curve, auc
from tqdm.notebook import tqdm
import joblib
import lightgbm as lgb
import warnings

warnings.simplefilter(action='ignore', category=FutureWarning)


In [31]:
# GENERAL CONFIGURATIONS
NUM_THREADS = 4
# DATA_DIRECTORY = "/kaggle/input/home-credit-credit-risk-model-stability/parquet_files/"
DATA_DIRECTORY = "./parquet_files/"

SUBMISSION_SUFIX = "_model2_0"
# LIGHTGBM CONFIGURATION AND HYPER-PARAMETERS
GENERATE_SUBMISSION_FILES = True
STRATIFIED_KFOLD = False
RANDOM_SEED = 737851
NUM_FOLDS = 10
EARLY_STOPPING = 100
ROOT            = Path("/kaggle/input/home-credit-credit-risk-model-stability")

LIGHTGBM_PARAMS = {
    'boosting_type': 'goss',
    'n_estimators': 10000,
    'learning_rate': 0.005134,
    'num_leaves': 54,
    'max_depth': 10,
    'subsample_for_bin': 240000,
    'reg_alpha': 0.436193,
    'reg_lambda': 0.479169,
    'colsample_bytree': 0.508716,
    'min_split_gain': 0.024766,
    'subsample': 1,
    'is_unbalance': False,
    'silent':-1,
    'verbose':-1,
    
}

## Main function

In [10]:
def main(debug= False):
    num_rows = 30000 if debug else None
    with timer("Test base"):
        df = get_base(DATA_DIRECTORY, num_rows=num_rows)
        print("Test base dataframe shape:", df.shape)

    with timer("Test static"):
        df_static = get_static(DATA_DIRECTORY, num_rows=num_rows)
        df = df.join(df_static, on='case_id', how='left', suffix='_static')
        print("Test static dataframe shape:", df_static.shape)
        del df_static
        gc.collect()

    with timer("Test static_cb"):
        df_static_cb = get_static_cb(DATA_DIRECTORY, num_rows=num_rows)
        df = df.join(df_static_cb, on='case_id', how='left', suffix='_static_cb')
        print("Test static cb dataframe shape:", df_static_cb.shape)
        del df_static_cb
        gc.collect()

    with timer("Previous applications depth 1 test"):
        df_applprev1 = get_applprev1(DATA_DIRECTORY, num_rows=num_rows)
        df = df.join(df_applprev1, on='case_id', how='left', suffix='_applprev1')
        print("Previous applications depth 1 test dataframe shape:", df_applprev1.shape)
        del df_applprev1
        gc.collect()

    with timer("Previous applications depth 2 test"):
        df_applprev2 = get_applprev2(DATA_DIRECTORY, num_rows=num_rows)
        df = df.join(df_applprev2, on='case_id', how='left', suffix='_applprev2')
        print("Previous applications depth 2 test dataframe shape:", df_applprev2.shape)
        del df_applprev2
        gc.collect()

    with timer("Person depth 1 test"):
        df_person1 = get_person1(DATA_DIRECTORY, num_rows=num_rows)
        df = df.join(df_person1, on='case_id', how='left', suffix='_person1')
        print("Person depth 1 test dataframe shape:", df_person1.shape)
        del df_person1
        gc.collect()

    with timer("Person depth 2 test"):
        df_person2 = get_person2(DATA_DIRECTORY, num_rows=num_rows)
        df = df.join(df_person2, on='case_id', how='left', suffix='_person2')
        print("Person depth 2 test dataframe shape:", df_person2.shape)
        del df_person2
        gc.collect()

    with timer("Other test"):
        df_other = get_other(DATA_DIRECTORY, num_rows=num_rows)
        df = df.join(df_other, on='case_id', how='left', suffix='_other')
        print("Other test dataframe shape:", df_other.shape)
        del df_other
        gc.collect()

    with timer("Debit card test"):
        df_debitcard = get_debitcard(DATA_DIRECTORY, num_rows=num_rows)
        df = df.join(df_debitcard, on='case_id', how='left', suffix='_debitcard')
        print("Debit card test dataframe shape:", df_debitcard.shape)
        del df_debitcard
        gc.collect()

    with timer("Tax registry a test"):
        df_tax_registry_a = get_tax_registry_a(DATA_DIRECTORY, num_rows=num_rows)
        df = df.join(df_tax_registry_a, on='case_id', how='left', suffix='_tax_registry_a')
        print("Tax registry a test dataframe shape:", df_tax_registry_a.shape)
        del df_tax_registry_a
        gc.collect()

    with timer("Tax registry b test"):
        df_tax_registry_b = get_tax_registry_b(DATA_DIRECTORY, num_rows=num_rows)
        df = df.join(df_tax_registry_b, on='case_id', how='left', suffix='_tax_registry_b')
        print("Tax registry b test dataframe shape:", df_tax_registry_b.shape)
        del df_tax_registry_b
        gc.collect()

    with timer("Tax registry c test"):
        df_tax_registry_c = get_tax_registry_c(DATA_DIRECTORY, num_rows=num_rows)
        df = df.join(df_tax_registry_c, on='case_id', how='left', suffix='_tax_registry_c')
        print("Tax registry c test dataframe shape:", df_tax_registry_c.shape)
        del df_tax_registry_c
        gc.collect()
    '''
    with timer("Credit bureau a 1 test"):
        df_credit_bureau_a_1 = get_credit_bureau_a_1(DATA_DIRECTORY, num_rows=num_rows)
        df = df.join(df_credit_bureau_a_1, on='case_id', how='left', suffix='_cb_a_1')
        print("Credit bureau a 1 test dataframe shape:", df_credit_bureau_a_1.shape)
        del df_credit_bureau_a_1
        gc.collect()
        '''

    with timer("Credit bureau b 1 test"):
        df_credit_bureau_b_1 = get_credit_bureau_b_1(DATA_DIRECTORY, num_rows=num_rows)
        df = df.join(df_credit_bureau_b_1, on='case_id', how='left', suffix='_cb_b_1')
        print("Credit bureau b 1 test dataframe shape:", df_credit_bureau_b_1.shape)
        del df_credit_bureau_b_1
        gc.collect()

    '''
    with timer("Credit bureau a 2 test"):
        df_credit_bureau_a_2 = get_credit_bureau_a_2(DATA_DIRECTORY, num_rows=num_rows)
        df = df.join(df_credit_bureau_a_2, on='case_id', how='left', suffix='_cb_a_2')
        print("Credit bureau a 2 test dataframe shape:", df_credit_bureau_a_2.shape)

        # Free memory
        del df_credit_bureau_a_2
        gc.collect()
'''   
    with timer("Credit bureau b 2 test"):
        df_credit_bureau_b_2 = get_credit_bureau_b_2(DATA_DIRECTORY, num_rows=num_rows)
        df = df.join(df_credit_bureau_b_2, on='case_id', how='left', suffix='_cb_b_2')

    df = df.pipe(Pipeline.handle_dates) 
        

    #with timer("more preprocessing"):
        #df = add_ratios_features(df)
        #df = add_ratios_features(df)
    print(df.shape)
    get_info(df)
    with timer("Run LightGBM"):
        df, cat_cols = to_pandas(df)
        feat_importance = kfold_lightgbm_sklearn(df, cat_cols)
        print(feat_importance)

   

### Pipeline

In [11]:
class Pipeline:
    @staticmethod
    
    
    # Sets datatypes accordingly
    def set_table_dtypes(df):
        for col in df.columns:
            if col in ["case_id", "WEEK_NUM", "num_group1", "num_group2"]:
                df = df.with_columns(pl.col(col).cast(pl.Int64))
            elif col in ["date_decision"]:
                df = df.with_columns(pl.col(col).cast(pl.Date))
            elif col[-1] in ("P", "A"):
                df = df.with_columns(pl.col(col).cast(pl.Float64))
            elif col[-1] in ("M",):
                df = df.with_columns(pl.col(col).cast(pl.String))
            elif col[-1] in ("D",):
                df = df.with_columns(pl.col(col).cast(pl.Date))            

        return df
    
    
    # Changes the values of all date columns. The result will not be a date but number of days since date_decision.
    @staticmethod
    def handle_dates(df):
        for col in df.columns:
            if col[-1] in ("D",):
                df = df.with_columns(pl.col(col) - pl.col("date_decision"))
                df = df.with_columns(pl.col(col).dt.total_days())
                
        df = df.drop("date_decision", "MONTH")

        return df
    
    # It drops columns with a lot of NaN values.
    @staticmethod
    def filter_cols(df):
        for col in df.columns:
            if col not in ["target", "case_id", "WEEK_NUM"]:
                isnull = df[col].is_null().mean()

                if isnull > 0.95:
                    df = df.drop(col)

        for col in df.columns:
            if (col not in ["target", "case_id", "WEEK_NUM"]) & (df[col].dtype == pl.String):
                freq = df[col].n_unique()

                if (freq == 1) | (freq > 200):
                    df = df.drop(col)

        return df

In [12]:
def to_pandas(df_data, cat_cols=None):
    df_data = df_data.to_pandas()
    
    if cat_cols is None:
        cat_cols = list(df_data.select_dtypes("object").columns)
    
    df_data[cat_cols] = df_data[cat_cols].astype("category")
    
    return df_data, cat_cols

In [13]:
def reduce_mem_usage(df, verbose=True):
    numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64']
    start_mem = df.memory_usage().sum() / 1024**2
    for col in df.columns:
        col_type = df[col].dtypes
        if col_type in numerics:
            c_min = df[col].min()
            c_max = df[col].max()
            if str(col_type)[:3] == 'int':
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                    df[col] = df[col].astype(np.int64)
            else:
                if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                    df[col] = df[col].astype(np.float16)
                elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float64)

    end_mem = df.memory_usage().sum() / 1024**2
    print('Memory usage after optimization is: {:.2f} MB'.format(end_mem))
    print('Decreased by {:.1f}%'.format(100 * (start_mem - end_mem) / start_mem))

    return df

### Model

In [14]:
def kfold_lightgbm_sklearn(data, categorical_feature = None):
    
    df = data[data['target'].notnull()]
    test = data[data['target'].isnull()]
    print("Train/valid shape: {}, test shape: {}".format(df.shape, test.shape))
    del_features = ['target', 'case_id']
    predictors = list(filter(lambda v: v not in del_features, df.columns))

    if not STRATIFIED_KFOLD:
        folds = KFold(n_splits= NUM_FOLDS, shuffle=True, random_state= RANDOM_SEED)
    else:
        folds = StratifiedKFold(n_splits= NUM_FOLDS, shuffle=True, random_state= RANDOM_SEED)
    
        # Hold oof predictions, test predictions, feature importance and training/valid auc
    oof_preds = np.zeros(df.shape[0])
    sub_preds = np.zeros(test.shape[0])
    importance_df = pd.DataFrame()
    eval_results = dict()
    
    
    for n_fold, (train_idx, valid_idx) in enumerate(folds.split(df[predictors], df['target'])):
        train_x, train_y = df[predictors].iloc[train_idx], df['target'].iloc[train_idx]
        valid_x, valid_y = df[predictors].iloc[valid_idx], df['target'].iloc[valid_idx]
        
        params = {'random_state': RANDOM_SEED, 'nthread': NUM_THREADS}
        clf = LGBMClassifier(**{**params, **LIGHTGBM_PARAMS})


        if not categorical_feature:
                clf.fit(train_x, train_y, eval_set=[(train_x, train_y), (valid_x, valid_y)],
                        eval_metric='auc' )
        else:
            clf.fit(train_x, train_y, eval_set=[(train_x, train_y), (valid_x, valid_y)],eval_metric='auc',
                    feature_name= list(df[predictors].columns), categorical_feature= categorical_feature)


        oof_preds[valid_idx] = clf.predict_proba(valid_x, num_iteration=clf.best_iteration_)[:, 1]
        sub_preds += clf.predict_proba(test[predictors], num_iteration=clf.best_iteration_)[:, 1] / folds.n_splits

            # Feature importance by GAIN and SPLIT
        fold_importance = pd.DataFrame()
        fold_importance["feature"] = predictors
        fold_importance["gain"] = clf.booster_.feature_importance(importance_type='gain')
        fold_importance["split"] = clf.booster_.feature_importance(importance_type='split')
        importance_df = pd.concat([importance_df, fold_importance], axis=0)
        eval_results['train_{}'.format(n_fold+1)]  = clf.evals_result_['training']['auc']
        eval_results['valid_{}'.format(n_fold+1)] = clf.evals_result_['valid_1']['auc']

        print('Fold %2d AUC : %.6f' % (n_fold + 1, roc_auc_score(valid_y, oof_preds[valid_idx])))
        del clf, train_x, train_y, valid_x, valid_y
        gc.collect()

    print('Full AUC score %.6f' % roc_auc_score(df['target'], oof_preds))
    test['target'] = sub_preds.copy()
    # Get the average feature importance between folds
    mean_importance = importance_df.groupby('feature').mean().reset_index()
    mean_importance.sort_values(by= 'gain', ascending=False, inplace=True)
    # Save feature importance, test predictions and oof predictions as csv
    if GENERATE_SUBMISSION_FILES:

        # Generate oof csv
        oof = pd.DataFrame()
        oof['case_id'] = df['case_id'].copy()
        df['PREDICTIONS'] = oof_preds.copy()
        df['target'] = df['target'].copy()
        df.to_csv('oof{}.csv'.format(SUBMISSION_SUFIX), index=False)
        # Save submission (test data) and feature importance
        df_subm = pd.read_csv(ROOT / "sample_submission.csv")
        df_subm = df_subm.set_index("case_id")
        df_subm["score"] = sub_pred
        df_subm.to_csv("submission.csv")
        mean_importance.to_csv('feature_importance{}.csv'.format(SUBMISSION_SUFIX), index=False)
    return mean_importance

### Df info function

In [15]:
def get_info(dataframe):
    """
    View data types, shape, and calculate the percentage of NaN (missing) values in each column
    of a Polars DataFrame simultaneously.
    
    Parameters:
    dataframe (polars.DataFrame): The DataFrame to analyze.
    
    Returns:
    None
    """
    # Print DataFrame shape
    print("DataFrame Shape:", dataframe.shape)
    print("-" * 60)
    
    # Print column information
    print("{:<50} {:<30} {:<20}".format("Column Name", "Data Type", "NaN Percentage"))
    print("-" * 60)
    
    # Total number of rows in the DataFrame
    total_rows = len(dataframe)
    
    # Iterate over each column
    for column in dataframe.columns:
        # Get the data type of the column
        dtype = str(dataframe[column].dtype)
        
        # Count the number of NaN values in the column
        nan_count = dataframe[column].null_count()
        
        # Calculate the percentage of NaN values
        nan_percentage = (nan_count / total_rows) * 100
        
        # Print the information
        print("{:<50} {:<30} {:.2f}%".format(column, dtype, nan_percentage))


### get_base()

In [16]:
def get_base(path, num_rows=None):
    """
    Function to read base data.

    Parameters:
        path (str): Path to the directory containing the data files.
        num_rows (int, optional): Number of rows to read from the training data. Default is None,
            meaning the entire training dataset is read.

    Returns:
        DataFrame: Concatenated DataFrame containing base data from both train and test datasets.
    """
    # Initialize empty dictionaries to store train and test data
    train = {}
    test = {}
    
    # If num_rows is not specified, read the entire training data
    if num_rows is None:
        train = pl.read_parquet(os.path.join(path, 'train/train_base.parquet'))
    # If num_rows is specified, read only the specified number of rows from the training data
    else:
        train = pl.read_parquet(os.path.join(path, 'train/train_base.parquet')).limit(num_rows) 
        
    # Read the test data
    test = pl.read_parquet(os.path.join(path, 'test/test_base.parquet'))    
    
    # Create a series filled with None values to serve as the 'target' column for the test data
    length = len(test)
    nan_series = pl.Series([None] * length)
    
    # Add the 'target' column to the test data
    test = test.select(pl.col("*"), nan_series.alias("target"))
    
    # Concatenate train and test data
    df = pl.concat([train, test])
    
    # Convert 'date_decision' column to Date type
    df = df.with_column(pl.col('date_decision').cast(pl.Date))
    
    return df


In [None]:
A=get_base(DATA_DIRECTORY)
get_info(A)
del A

### get_static()

In [17]:
def get_static(path, num_rows=None):
    """
    Function to read static data.

    Parameters:
        path (str): Path to the directory containing the data files.
        num_rows (int, optional): Number of rows to read from the training data. Default is None,
            meaning the entire training dataset is read.

    Returns:
        DataFrame: Concatenated DataFrame containing static data from both train and test datasets.
    """
    # Read training data
    chunks = []
    for path in glob(DATA_DIRECTORY + str('train/train_static_0_*.parquet')):
        chunks.append(pl.read_parquet(path).pipe(Pipeline.set_table_dtypes))
    train = pl.concat(chunks, how="vertical_relaxed").pipe(Pipeline.filter_cols)
    
    # If num_rows is specified, limit the number of rows in the training data
    if num_rows is not None:
        df1 = train.slice(0, num_rows)
        df2 = train.slice(num_rows, len(train))
        train = df1
        del df2
    
    # Read test data
    chunks = []
    for path in glob(DATA_DIRECTORY + str('test/test_static_0_*.parquet')):
        chunks.append(pl.read_parquet(path).pipe(Pipeline.set_table_dtypes))
    test = pl.concat(chunks, how="vertical_relaxed")
    
    # Filter columns of the training data
    columns_to_keep = train.columns

    # Find columns in 'test' that are not in 'train'
    columns_to_remove = [column for column in test.columns if column not in columns_to_keep]

    # Drop columns from 'test' that are not in 'train'
    test = test.drop(columns_to_remove)
    
    # Concatenate train and test data
    df = pl.concat([train, test])
    
    return df


In [None]:
A=get_static(DATA_DIRECTORY, 100)
get_info(A)
del A

### get_static_cb()

In [18]:
def get_static_cb(path, num_rows=None):
    """
    Function to read static credit bureau data.

    Parameters:
        path (str): Path to the directory containing the data files.
        num_rows (int, optional): Number of rows to read from the training data. Default is None,
            meaning the entire training dataset is read.

    Returns:
        DataFrame: Concatenated DataFrame containing static credit bureau data from both train and test datasets.
    """
    # If num_rows is not specified, read the entire training data
    if num_rows is None:
        train = pl.read_parquet(os.path.join(path, 'train/train_static_cb_0.parquet')).pipe(Pipeline.set_table_dtypes)
    # If num_rows is specified, read only the specified number of rows from the training data
    else:
        train = pl.read_parquet(os.path.join(path, 'train/train_static_cb_0.parquet')).limit(num_rows).pipe(Pipeline.set_table_dtypes)
    
    # Read the test data
    test = pl.read_parquet(os.path.join(path, 'test/test_static_cb_0.parquet')).pipe(Pipeline.set_table_dtypes)
    
    # Filter columns of the training data
    train = train.pipe(Pipeline.filter_cols)
    
    # Ensure consistency of columns between train and test data
    columns_to_keep = train.columns
    columns_to_remove = [column for column in test.columns if column not in columns_to_keep]
    test = test.drop(columns_to_remove)

    # Concatenate train and test data
    df = pl.concat([train, test])
    
    return df

In [None]:
A=get_static_cb(DATA_DIRECTORY, 100)
get_info(A)
del A

### get_applprev1(DATA_DIRECTORY, num_rows=num_rows)

In [19]:
def get_applprev1(path, num_rows = None):
    chunks = []
    for path in glob(DATA_DIRECTORY + str('train/train_applprev_1_*.parquet')):
        chunks.append(pl.read_parquet(path).pipe(Pipeline.set_table_dtypes))
    train = pl.concat(chunks, how="vertical_relaxed").pipe(Pipeline.filter_cols)

    if num_rows is not None:
        df1 = train.slice(0, num_rows)
        df2 = train.slice(num_rows, len(train))
        train = df1
        del df2

    chunks = []
    for path in glob(DATA_DIRECTORY + str('test/test_applprev_1_*.parquet')):
        chunks.append(pl.read_parquet(path).pipe(Pipeline.set_table_dtypes))
    test = pl.concat(chunks, how="vertical_relaxed")
    columns_to_keep = train.columns
    columns_to_remove = [column for column in test.columns if column not in columns_to_keep]
    test = test.drop(columns_to_remove)
    df = pl.concat([train, test])

    cols = ['annuity_853A', 'currdebt_94A', 'mainoccupationinc_437A']

    # Perform aggregations
    expr_max = [pl.max(col).alias(f"max_{col}") for col in cols]
    expr_min = [pl.min(col).alias(f"min_{col}") for col in cols]
    expr_mean = [pl.mean(col).alias(f"mean_{col}") for col in cols]

    agg_max_df = df.group_by("case_id").agg(expr_max)
    agg_min_df = df.group_by("case_id").agg(expr_min)
    agg_mean_df = df.group_by("case_id").agg(expr_mean)

    # Merge all aggregated dataframes
    agg_df = agg_max_df.join(agg_min_df, on="case_id", how="inner")
    agg_df = agg_df.join(agg_mean_df, on="case_id", how="inner")

    del df

    return agg_df

In [None]:
A=get_applprev1(DATA_DIRECTORY, 100)
print(A)
get_info(A)
del A

### get_applprev2(DATA_DIRECTORY, num_rows=num_rows)

In [20]:
def get_applprev2(path, num_rows = None):
    train={}
    if num_rows == None:
        train = pl.read_parquet(os.path.join(path, 'train/train_applprev_2.parquet')).pipe(Pipeline.set_table_dtypes)
        
     
    else:
        train = pl.read_parquet(os.path.join(path, 'train/train_applprev_2.parquet')).limit(num_rows).pipe(Pipeline.set_table_dtypes)
       
    
    
    test = pl.read_parquet(os.path.join(path, 'test/test_applprev_2.parquet')).pipe(Pipeline.set_table_dtypes)
    train = train.pipe(Pipeline.filter_cols)
   
    columns_to_keep = train.columns
    columns_to_remove = [column for column in test.columns if column not in columns_to_keep]
    test = test.drop(columns_to_remove)

    df=pl.concat([train, test])
    
    cols=['num_group1','num_group2']

    agg_df = df.groupby("case_id").agg(*[pl.sum(col).alias(f'count_{col}') for col in cols])


# Rename the columns to include "count_" prefix
    agg_df.columns = ['case_id'] + [f'count_applprev2_{col}' for col in cols]
    
    
    del df
    
    return agg_df

In [None]:
A=get_applprev2(DATA_DIRECTORY, 1000)
get_info(A)
print(A.head(100))
del A

### get_person1

In [21]:
def get_person1(path, num_rows = None):
    if num_rows == None:
        train = pl.read_parquet(os.path.join(path, 'train/train_person_1.parquet')).pipe(Pipeline.set_table_dtypes)
        
    
    else:
        train = pl.read_parquet(os.path.join(path, 'train/train_person_1.parquet')).limit(num_rows).pipe(Pipeline.set_table_dtypes)
      
    
    
    test = pl.read_parquet(os.path.join(path, 'test/test_person_1.parquet')).pipe(Pipeline.set_table_dtypes)
    train = train.pipe(Pipeline.filter_cols)
   
    columns_to_keep = train.columns
    columns_to_remove = [column for column in test.columns if column not in columns_to_keep]
    test = test.drop(columns_to_remove)

    df=pl.concat([train, test])
    
    
    
    
    cols = ['birth_259D',
        'mainoccupationinc_384A']
    expr_max = [pl.max(col).alias(f"max_{col}") for col in cols]
    agg_df = df.group_by("case_id").agg(expr_max)
    
    
    
    
    
    del df
    
    return agg_df

In [None]:
A=get_person1(DATA_DIRECTORY, 1000)
get_info(A)
print(A.head())
del A

### get_person2

In [22]:
def get_person2(path, num_rows = None):
    if num_rows == None:
        train = pl.read_parquet(os.path.join(path, 'train/train_person_2.parquet')).pipe(Pipeline.set_table_dtypes)
        
    else:
        train = pl.read_parquet(os.path.join(path, 'train/train_person_2.parquet')).limit(num_rows).pipe(Pipeline.set_table_dtypes)
        
    test = pl.read_parquet(os.path.join(path, 'test/test_person_2.parquet')).pipe(Pipeline.set_table_dtypes)
    
    train = train.pipe(Pipeline.filter_cols)
   
    columns_to_keep = train.columns
    columns_to_remove = [column for column in test.columns if column not in columns_to_keep]
    test = test.drop(columns_to_remove)

    df=pl.concat([train, test])
    
    
    cols=['num_group1','num_group2']

    agg_df = df.groupby("case_id").agg(*[pl.sum(col).alias(f'count_{col}') for col in cols])


# Rename the columns to include "count_" prefix
    agg_df.columns = ['case_id'] + [f'count_person2_{col}' for col in cols]
    
    
    del df
    
    return agg_df

In [None]:
A=get_person2(DATA_DIRECTORY, 1000)
get_info(A)
print(A.head())
del A

### other

In [23]:
def get_other(path, num_rows = None):
     # Read the Parquet file using scan() method
    if num_rows == None:
        train = pl.read_parquet(os.path.join(path, 'train/train_other_1.parquet')).pipe(Pipeline.set_table_dtypes)
        
    else:
        train = pl.read_parquet(os.path.join(path, 'train/train_other_1.parquet')).limit(num_rows).pipe(Pipeline.set_table_dtypes)
         
    test = pl.read_parquet(os.path.join(path, 'test/test_other_1.parquet')).pipe(Pipeline.set_table_dtypes)
    
    
    train = train.pipe(Pipeline.filter_cols)
   
    columns_to_keep = train.columns
    columns_to_remove = [column for column in test.columns if column not in columns_to_keep]
    test = test.drop(columns_to_remove)

    df=pl.concat([train, test])
    
    cols = ['amtdebitincoming_4809443A',
        'amtdepositincoming_4809444A']
    expr_max = [pl.max(col).alias(f"max_{col}") for col in cols]
    agg_df = df.group_by("case_id").agg(expr_max)
    
    
    
    
    
    del df
    
    return agg_df

In [None]:
A=get_other(DATA_DIRECTORY, 1000)
get_info(A)
print(A.head())
del A

## get_debitcard

In [24]:
def get_debitcard(path, num_rows = None):
    # Read the Parquet file using scan() method
    if num_rows == None:
        train = pl.read_parquet(os.path.join(path, 'train/train_debitcard_1.parquet')).pipe(Pipeline.set_table_dtypes)
        
     
    else:
        train = pl.read_parquet(os.path.join(path, 'train/train_debitcard_1.parquet')).limit(num_rows).pipe(Pipeline.set_table_dtypes)
      
        
    test = pl.read_parquet(os.path.join(path, 'test/test_debitcard_1.parquet')).pipe(Pipeline.set_table_dtypes)
    
    
    train = train.pipe(Pipeline.filter_cols)
   
    columns_to_keep = train.columns
    columns_to_remove = [column for column in test.columns if column not in columns_to_keep]
    test = test.drop(columns_to_remove)

    df=pl.concat([train, test])
    
    cols=['num_group1']

    agg_df = df.groupby("case_id").agg(*[pl.sum(col).alias(f'count_{col}') for col in cols])


# Rename the columns to include "count_" prefix
    agg_df.columns = ['case_id'] + [f'count_debitcard_{col}' for col in cols]
    
    
    del df
    
    return agg_df

In [None]:
A=get_debitcard(DATA_DIRECTORY, 1000)
get_info(A)
print(A.head())
del A

### get_tax_registry_a

In [25]:
def get_tax_registry_a(path, num_rows = None):
    
    # Read the Parquet file using scan() method
    if num_rows == None:
        train = pl.read_parquet(os.path.join(path, 'train/train_tax_registry_a_1.parquet')).pipe(Pipeline.set_table_dtypes)
        
    
    else:
        train = pl.read_parquet(os.path.join(path, 'train/train_tax_registry_a_1.parquet')).limit(num_rows).pipe(Pipeline.set_table_dtypes)
  
    
    
    test = pl.read_parquet(os.path.join(path, 'test/test_tax_registry_a_1.parquet')).pipe(Pipeline.set_table_dtypes)
    train = train.pipe(Pipeline.filter_cols)
   
    columns_to_keep = train.columns
    columns_to_remove = [column for column in test.columns if column not in columns_to_keep]
    test = test.drop(columns_to_remove)

    df=pl.concat([train, test])
    
    
    
    
    cols = ['amount_4527230A']
    expr_max = [pl.max(col).alias(f"max_{col}") for col in cols]
    agg_df = df.group_by("case_id").agg(expr_max)
    
    
    
    
    
    del df
    
    return agg_df

In [None]:
A=get_tax_registry_a(DATA_DIRECTORY, 1000)
get_info(A)
print(A.head())
del A

### get_tax_registry_b

In [26]:
def get_tax_registry_b(path, num_rows = None):
    # Read the Parquet file using scan() method
    if num_rows == None:
        train = pl.read_parquet(os.path.join(path, 'train/train_tax_registry_b_1.parquet')).pipe(Pipeline.set_table_dtypes)
        
        
    else:
        train = pl.read_parquet(os.path.join(path, 'train/train_tax_registry_b_1.parquet')).limit(num_rows).pipe(Pipeline.set_table_dtypes)
        
    
    test = pl.read_parquet(os.path.join(path, 'test/test_tax_registry_b_1.parquet')).pipe(Pipeline.set_table_dtypes)
    
    train = train.pipe(Pipeline.filter_cols)
   
    columns_to_keep = train.columns
    columns_to_remove = [column for column in test.columns if column not in columns_to_keep]
    test = test.drop(columns_to_remove)

    df=pl.concat([train, test])
    
    
    
    
    cols = ['amount_4917619A']
    expr_max = [pl.max(col).alias(f"max_{col}") for col in cols]
    agg_df = df.group_by("case_id").agg(expr_max)
    
    
    
    
    
    del df
    
    return agg_df

In [None]:
A=get_tax_registry_b(DATA_DIRECTORY, 1000)
get_info(A)
print(A.head())
del A

### get_tax_registry_c

In [27]:
def get_tax_registry_c(path, num_rows = None):
     # Read the Parquet file using scan() method
# Read the Parquet file using scan() method
    if num_rows == None:
        train = pl.read_parquet(os.path.join(path, 'train/train_tax_registry_c_1.parquet')).pipe(Pipeline.set_table_dtypes)
    
    else:
        train = pl.read_parquet(os.path.join(path, 'train/train_tax_registry_c_1.parquet')).limit(num_rows).pipe(Pipeline.set_table_dtypes)
        
    
    test = pl.read_parquet(os.path.join(path, 'test/test_tax_registry_c_1.parquet')).pipe(Pipeline.set_table_dtypes)
    
    train = train.pipe(Pipeline.filter_cols)
   
    columns_to_keep = train.columns
    columns_to_remove = [column for column in test.columns if column not in columns_to_keep]
    test = test.drop(columns_to_remove)

    df=pl.concat([train, test])
    
    
    
    
    cols = ['pmtamount_36A']
    expr_max = [pl.max(col).alias(f"max_{col}") for col in cols]
    agg_df = df.group_by("case_id").agg(expr_max)
    
    
    
    
    
    del df
    
    return agg_df

In [None]:
A=get_tax_registry_c(DATA_DIRECTORY, 1000)
get_info(A)
print(A.head())
del A

### get_credit_bureau_a_1

In [28]:
def get_credit_bureau_a_1(path, num_rows = None):
    chunks = []
    for path in glob(DATA_DIRECTORY+str('train/train_credit_bureau_a_1_*.parquet')):
        chunks.append(pl.read_parquet(path).pipe(Pipeline.set_table_dtypes))
    train = pl.concat(chunks, how="vertical_relaxed").pipe(Pipeline.filter_cols)
    if num_rows!= None:
        df1 = train.slice(0,num_rows)
        df2 = train.slice(num_rows,len(train))
        
        train=df1
        del df2
    
    
    chunks = []
    for path in glob(DATA_DIRECTORY+str('test/test_credit_bureau_a_1_*.parquet')):
        chunks.append(pl.read_parquet(path).pipe(Pipeline.set_table_dtypes))
        test = pl.concat(chunks, how="vertical_relaxed")
    columns_to_keep = train.columns
    columns_to_remove = [column for column in test.columns if column not in columns_to_keep]
    test = test.drop(columns_to_remove)
    df=pl.concat([train, test])
    
   
    
    cols = ['overdueamountmaxdatemonth_365T']
    expr_max = [pl.max(col).alias(f"max_{col}") for col in cols]
    agg_df = df.group_by("case_id").agg(expr_max)
    
    
    del df
    
    return agg_df

### get_credit_bureau_b_1

In [29]:
def get_credit_bureau_b_1(path, num_rows = None):
    if num_rows == None:
        train = pl.read_parquet(os.path.join(path, 'train/train_credit_bureau_b_1.parquet')).pipe(Pipeline.set_table_dtypes)
        
        
    else:
        train = pl.read_parquet(os.path.join(path, 'train/train_credit_bureau_b_1.parquet')).limit(num_rows).pipe(Pipeline.set_table_dtypes)
   
    
    test = pl.read_parquet(os.path.join(path, 'test/test_credit_bureau_b_1.parquet')).pipe(Pipeline.set_table_dtypes)
    
    train = train.pipe(Pipeline.filter_cols)
   
    columns_to_keep = train.columns
    columns_to_remove = [column for column in test.columns if column not in columns_to_keep]
    test = test.drop(columns_to_remove)

    df=pl.concat([train, test])
    
 
    
    cols = ['amount_1115A','credlmt_3940954A']
    expr_max = [pl.max(col).alias(f"max_{col}") for col in cols]
    agg_df = df.group_by("case_id").agg(expr_max)
    
    
    del df
    
    return agg_df

In [30]:
A=get_credit_bureau_b_1(DATA_DIRECTORY, 1000)
get_info(A)
print(A.head())
del A

NameError: name 'DATA_DIRECTORY' is not defined

### get_credit_bureau_a_2

In [32]:
def get_credit_bureau_a_2(path, num_rows = None):
    chunks = []
    for path in glob(DATA_DIRECTORY+str('train/train_credit_bureau_a_2_*.parquet')):
        chunks.append(reduce_mem_usage(pl.read_parquet(path))) #.pipe(Pipeline.set_table_dtypes))
        print(path)
    train = pl.concat(chunks, how="vertical_relaxed").pipe(Pipeline.filter_cols)
    
    '''
    if num_rows!= None:
        df1 = train.slice(0,num_rows)
        df2 = train.slice(num_rows,len(df))
        
        train=df1
        del df2
    
    '''
    chunks = []
    for path in glob(DATA_DIRECTORY+str('test/test_credit_bureau_a_2_*.parquet')):
        chunks.append(reduce_mem_usage(pl.read_parquet(path))) #.pipe(Pipeline.set_table_dtypes))
        test = pl.concat(chunks, how="vertical_relaxed")
        print(path)
    columns_to_keep = train.columns
    columns_to_remove = [column for column in test.columns if column not in columns_to_keep]
    test = test.drop(columns_to_remove)
    df=pl.concat([train, test])
    return df
                      
                      
    '''
    if num_rows!= None:
        df1 = test.slice(0,num_rows)
        df2 = test.slice(num_rows,len(df))
        
        test=df1
        del df2
    
    
    
'''
'''
    
    cols=['num_group1', 'num_group2']

    agg_df = df.groupby("case_id").agg(*[pl.sum(col).alias(f'count_{col}') for col in cols])


# Rename the columns to include "count_" prefix
    agg_df.columns = ['case_id'] + [f'count_{col}' for col in cols]
    
    
    del df
    
    return agg_df
    
    '''

'\n    \n    cols=[\'num_group1\', \'num_group2\']\n\n    agg_df = df.groupby("case_id").agg(*[pl.sum(col).alias(f\'count_{col}\') for col in cols])\n\n\n# Rename the columns to include "count_" prefix\n    agg_df.columns = [\'case_id\'] + [f\'count_{col}\' for col in cols]\n    \n    \n    del df\n    \n    return agg_df\n    \n    '

### get_credit_bureau_b_2

In [33]:
def get_credit_bureau_b_2(path, num_rows = None):
    if num_rows == None:
        train = pl.read_parquet(os.path.join(path, 'train/train_credit_bureau_b_2.parquet')).pipe(Pipeline.set_table_dtypes)
   
    else:
        train = pl.read_parquet(os.path.join(path, 'train/train_credit_bureau_b_2.parquet')).limit(num_rows).pipe(Pipeline.set_table_dtypes)

    
    test = pl.read_parquet(os.path.join(path, 'test/test_credit_bureau_b_2.parquet')).pipe(Pipeline.set_table_dtypes)
    
    train = train.pipe(Pipeline.filter_cols)
   
    columns_to_keep = train.columns
    columns_to_remove = [column for column in test.columns if column not in columns_to_keep]
    test = test.drop(columns_to_remove)
    
    df=pl.concat([train, test])
    
    cols=['num_group1', 'num_group2']

    agg_df = df.groupby("case_id").agg(*[pl.sum(col).alias(f'count_{col}') for col in cols])


# Rename the columns to include "count_" prefix
    agg_df.columns = ['case_id'] + [f'count_cbb2_{col}' for col in cols]
    
    
    del df
    
    return agg_df

In [None]:
A=get_credit_bureau_b_2(DATA_DIRECTORY, 1000)
get_info(A)
print(A.head())
del A

### Utility functions

In [1]:
@contextmanager
def timer(name):
    t0 = time.time()
    yield
    print("{} - done in {:.0f}s".format(name, time.time() - t0))


NameError: name 'contextmanager' is not defined

## Execution

In [36]:
if __name__ == "__main__":
    pd.set_option('display.max_rows', 60)
    pd.set_option('display.max_columns', 100)
    with timer("Pipeline total time"):
        main(debug= True)

AttributeError: 'DataFrame' object has no attribute 'with_column'

In [6]:
# write a simple function to test timer 
def test_timer():
    with timer("test"):
        time.sleep(1)
    assert True
test_timer()