# Libraries

In [1]:
from sklearn import set_config
from glob import glob

import pandas as pd
import polars as pl
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt


# Configuration

In [2]:
# scikit-learn: ask transformers to emit pandas containers.
set_config(transform_output="pandas")

# pandas: display options, applied pair by pair.
for option_name, option_value in (
    ("display.max_columns", None),
    ("display.max_rows", 50),
    ("display.precision", 3),
    ("display.max_colwidth", None),
):
    pd.set_option(option_name, option_value)

# polars: enable decimals first. The hide-dtypes flag set on this line is set
# again (to False) by the pl.Config(...) call that follows.
pl.Config.activate_decimals(True).set_tbl_hide_column_data_types(True)

# polars: table rendering options. This stays the last expression of the cell,
# so the notebook shows the Config object's repr as the cell output.
pl.Config(
    tbl_formatting="ASCII_FULL_CONDENSED",
    tbl_hide_column_data_types=False,
    tbl_hide_dataframe_shape=True,
    fmt_float="mixed",
    tbl_cell_alignment="CENTER",
    tbl_hide_dtype_separator=True,
    tbl_cols=100,
    tbl_rows=50,
    fmt_str_lengths=100,
)

<polars.config.Config at 0x286dacb10>

In [3]:
# Dataset locations, expressed as relative paths with a trailing slash so file
# names can be appended directly.
ROOT_DIR = "data/"
TRAIN_DIR = f"{ROOT_DIR}train/"
TEST_DIR = f"{ROOT_DIR}test/"

# Functions

In [4]:
class DataFrameProcessor:
    """ Stateless helpers that clean up a polars DataFrame.

    Every method takes a DataFrame and returns a DataFrame, so each one can
    be used with ``df.pipe(...)``. Column roles are recognised from fixed
    names ("target", "case_id", "date_decision", ...) and from the last
    character of the column name ("P", "A", "M", "D").
    """

    @staticmethod
    def convert_types(df: pl.DataFrame) -> pl.DataFrame:
        """ Casts columns to fixed data types chosen from their names.

        Rules, checked in this order for every column:
        "target" -> Int8; "case_id", "WEEK_NUM", "num_group1", "num_group2"
        -> Int32; "date_decision" -> Date; names ending in "P" or "A" ->
        Float64; names ending in "M" -> String; names ending in "D" -> Date.
        Columns matching no rule are left untouched.

        Argument:
        df (polars dataframe): The DataFrame to be processed.

        Returns:
        The DataFrame with the matching columns cast.
        """
        casts = []
        for column in df.columns:
            if column == "target":
                dtype = pl.Int8
            elif column in ("case_id", "WEEK_NUM", "num_group1", "num_group2"):
                dtype = pl.Int32
            elif column == "date_decision":
                dtype = pl.Date
            elif column.endswith(("P", "A")):
                dtype = pl.Float64
            elif column.endswith("M"):
                dtype = pl.String
            elif column.endswith("D"):
                dtype = pl.Date
            else:
                continue
            casts.append(pl.col(column).cast(dtype))

        # Apply all casts in one pass instead of one with_columns per column.
        return df.with_columns(casts) if casts else df

    @staticmethod
    def date_processor(df: pl.DataFrame) -> pl.DataFrame:
        """ Turns date columns into numeric features.

        Each column whose name ends in "D" is replaced by its distance in
        days from "date_decision", stored as Float32. "date_decision" itself
        is expanded into year/month/day/week/weekday/quarter columns. Finally
        "date_decision", "WEEK_NUM" and "MONTH" are dropped.

        Argument:
        df (polars dataframe): The DataFrame to be processed.

        Returns:
        The DataFrame with the date columns transformed.
        """
        decision = pl.col("date_decision")

        # Day offset of every "...D" column relative to the decision date.
        exprs = [
            (pl.col(column) - decision).dt.total_days().cast(pl.Float32)
            for column in df.columns
            if column.endswith("D")
        ]

        if "date_decision" in df.columns:
            exprs += [
                decision.dt.year().alias("year_decision"),
                decision.dt.month().alias("month_decision"),
                decision.dt.day().alias("day_decision"),
                decision.dt.week().alias("week_decision"),
                decision.dt.weekday().alias("weekday_decision"),
                decision.dt.quarter().alias("quarter_decision"),
            ]

        if exprs:
            df = df.with_columns(exprs)

        # NOTE(review): all three names are dropped unconditionally; confirm
        # every frame passed here actually contains them.
        df = df.drop(["date_decision", "WEEK_NUM", "MONTH"])

        return df

    @staticmethod
    def delete_nulls_column(df: pl.DataFrame, threshold: float = 0.80) -> pl.DataFrame:
        """ Deletes columns whose share of null values exceeds a threshold.

        "case_id" and "target" are never deleted.

        Arguments:
        df (polars dataframe): The DataFrame to be processed.
        threshold (float): Null fraction above which a column is deleted
            (default 0.80, i.e. more than 80% nulls).

        Returns:
        The DataFrame without the mostly-null columns.
        """
        n_rows = df.shape[0]
        if n_rows == 0:
            # No rows means no null ratio can be computed; avoid dividing by 0.
            return df

        to_drop = [
            column
            for column in df.columns
            if column not in ("case_id", "target")
            and df[column].is_null().sum() / n_rows > threshold
        ]

        return df.drop(to_drop) if to_drop else df

    @staticmethod
    def drop_duplicates(df: pl.DataFrame) -> pl.DataFrame:
        """ Drops duplicated rows, keeping the first occurrence.

        Argument:
        df (polars dataframe): The DataFrame to be processed.

        Returns:
        The DataFrame without duplicated rows.
        """
        return df.unique(keep="first")

    @staticmethod
    def drop_columns_with_too_many_categories(
        df: pl.DataFrame, max_categories: int = 200
    ) -> pl.DataFrame:
        """ Drops string columns with a single category or too many of them.

        Only columns of dtype String are considered; "target" and "case_id"
        are never dropped.

        Arguments:
        df (polars dataframe): The DataFrame to be processed.
        max_categories (int): Columns with more distinct values than this
            are dropped (default 200).

        Returns:
        The DataFrame without the uninformative string columns.
        """
        to_drop = []
        for column in df.columns:
            if column in ("target", "case_id") or df[column].dtype != pl.String:
                continue

            categories_count = df[column].n_unique()
            if categories_count == 1 or categories_count > max_categories:
                to_drop.append(column)

        return df.drop(to_drop) if to_drop else df

In [5]:
class Aggregator:
    """ Dataframe aggregating class.

    Each method inspects the column names of a DataFrame and returns a list
    of polars expressions, meant to be passed to ``group_by(...).agg(...)``.
    Columns are selected by the last character of their name.
    """

    @staticmethod
    def _columns_with_suffix(df: pl.DataFrame, suffixes: tuple[str, ...]) -> list[str]:
        """ Returns the column names of `df` ending in one of `suffixes`. """
        return [column for column in df.columns if column.endswith(suffixes)]

    @staticmethod
    def _max_exprs(columns: list[str]) -> list[pl.Expr]:
        """ Returns one `max_<column>` expression per given column name. """
        return [pl.max(column).alias(f"max_{column}") for column in columns]

    @staticmethod
    def max_agg(df: pl.DataFrame) -> list[pl.Expr]:
        """ Builds max aggregations for columns ending in "P" or "A".

        Argument:
        df (polars dataframe): The DataFrame to be aggregated.

        Returns:
        A list of expressions aliased "max_<column>".
        """
        columns = Aggregator._columns_with_suffix(df, ("P", "A"))

        return Aggregator._max_exprs(columns)

    @staticmethod
    def min_agg(df: pl.DataFrame) -> list[pl.Expr]:
        """ Builds min aggregations for columns ending in "P" or "A".

        Argument:
        df (polars dataframe): The DataFrame to be aggregated.

        Returns:
        A list of expressions aliased "min_<column>".
        """
        columns = Aggregator._columns_with_suffix(df, ("P", "A"))

        return [pl.min(column).alias(f"min_{column}") for column in columns]

    @staticmethod
    def date_agg(df: pl.DataFrame) -> list[pl.Expr]:
        """ Builds max aggregations for the date columns (ending in "D").

        Argument:
        df (polars dataframe): The DataFrame to be aggregated.

        Returns:
        A list of expressions aliased "max_<column>".
        """
        columns = Aggregator._columns_with_suffix(df, ("D",))

        return Aggregator._max_exprs(columns)

    @staticmethod
    def string_agg(df: pl.DataFrame) -> list[pl.Expr]:
        """ Builds max aggregations for the string columns (ending in "M").

        Argument:
        df (polars dataframe): The DataFrame to be aggregated.

        Returns:
        A list of expressions aliased "max_<column>".
        """
        columns = Aggregator._columns_with_suffix(df, ("M",))

        return Aggregator._max_exprs(columns)

    @staticmethod
    def others_agg(df: pl.DataFrame) -> list[pl.Expr]:
        """ Builds max aggregations for columns ending in "T" or "L".

        Argument:
        df (polars dataframe): The DataFrame to be aggregated.

        Returns:
        A list of expressions aliased "max_<column>".
        """
        # Match on the name's suffix, like the sibling methods; comparing the
        # whole name against "T"/"L" would select (almost) nothing.
        columns = Aggregator._columns_with_suffix(df, ("T", "L"))

        return Aggregator._max_exprs(columns)

    @staticmethod
    def count_agg(df: pl.DataFrame) -> list[pl.Expr]:
        """ Builds max aggregations for the "num_group" index columns.

        Selects every column whose name contains "num_group".

        Argument:
        df (polars dataframe): The DataFrame to be aggregated.

        Returns:
        A list of expressions aliased "max_<column>".
        """
        columns = [column for column in df.columns if "num_group" in column]

        return Aggregator._max_exprs(columns)

    @staticmethod
    def agg_expr(df: pl.DataFrame) -> list[pl.Expr]:
        """ Collects the expressions of all the aggregations above.

        Argument:
        df (polars dataframe): The DataFrame to be aggregated.

        Returns:
        The concatenation, in this order, of the max, min, date, string,
        others and count expression lists.
        """
        expr_all = (
            Aggregator.max_agg(df)
            + Aggregator.min_agg(df)
            + Aggregator.date_agg(df)
            + Aggregator.string_agg(df)
            + Aggregator.others_agg(df)
            + Aggregator.count_agg(df)
        )
        return expr_all

In [6]:
def read_file(path: str, depth: int = 0) -> pl.DataFrame:
    """ Load one parquet file and normalise its column types.

    For depth 1 or 2 the rows are additionally grouped by `case_id` and
    reduced with the expressions built by `Aggregator.agg_expr`; for any
    other depth the type-converted frame is returned unchanged.

    Arguments:
    path (str): The path to the file.
    depth (int): The depth of the file.
    """
    frame = DataFrameProcessor.convert_types(pl.read_parquet(path))

    # Only depth 1 and depth 2 tables get aggregated.
    if depth not in (1, 2):
        return frame

    aggregations = Aggregator.agg_expr(frame)
    return frame.group_by('case_id').agg(aggregations)


def read_files(path: str, depth: int = 0) -> pl.DataFrame:
    """ Read multiple files from the given path and concatenate them vertically.

    Every file matching the glob pattern is loaded with `read_file`, the
    results are stacked (relaxed schema) and duplicated rows are removed.

    Arguments:
    path (str): Glob pattern matching the files.
    depth (int): The depth of the files.

    Raises:
    ValueError: If no file matches `path`.
    """
    # glob returns matches in arbitrary order; sort so that the row order
    # (and which duplicate is kept as "first") is reproducible.
    matched_paths = sorted(glob(path))
    if not matched_paths:
        raise ValueError(f"No files match the pattern {path!r}")

    chunks = [read_file(one_path, depth) for one_path in matched_paths]

    df = pl.concat(chunks, how="vertical_relaxed")
    df = df.pipe(DataFrameProcessor.drop_duplicates)

    return df

In [7]:
def merge_dataframe(df_base: pl.DataFrame, *depth: pl.DataFrame) -> pl.DataFrame:
    """ Left-join feature tables onto the base table and process the dates.

    Arguments:
    df_base (polars dataframe): The base table; must contain "case_id" and
        "date_decision".
    *depth (polars dataframe): Feature tables joined one after another on
        "case_id". Clashing column names from the i-th table get the
        suffix "_<i>".

    Returns:
    The merged DataFrame after `DataFrameProcessor.date_processor`.
    """
    df_base = df_base.with_columns(
        month_decision=pl.col("date_decision").dt.month(),
        weekday_decision=pl.col("date_decision").dt.weekday(),
    )

    for i, df in enumerate(depth):
        df_base = df_base.join(df, how="left", on="case_id", suffix=f"_{i}")

    # Date handling is provided by DataFrameProcessor.date_processor, which
    # also drops "date_decision", "WEEK_NUM" and "MONTH".
    df_base = df_base.pipe(DataFrameProcessor.date_processor)

    return df_base

# Read Data

In [8]:
%%time
# Test pandas reading time: load the base training table with pandas so the
# timing reported by the cell's %%time magic can be compared with polars.
test_pandas_df = pd.read_parquet('data/train/train_base.parquet')
# Last expression of the cell, so the first rows are shown as its output.
test_pandas_df.head()

CPU times: user 106 ms, sys: 43.1 ms, total: 149 ms
Wall time: 151 ms


Unnamed: 0,case_id,date_decision,MONTH,WEEK_NUM,target
0,0,2019-01-03,201901,0,0
1,1,2019-01-03,201901,0,0
2,2,2019-01-04,201901,0,0
3,3,2019-01-03,201901,0,0
4,4,2019-01-04,201901,0,1


In [9]:
%%time
# Test polars reading time: same file as the pandas cell, read with polars.
test_polars_df = pl.read_parquet('data/train/train_base.parquet')
# Explicit display() call to render the first rows.
display(test_polars_df.head())

case_id,date_decision,MONTH,WEEK_NUM,target
i64,str,i64,i64,i64
0,"""2019-01-03""",201901,0,0
1,"""2019-01-03""",201901,0,0
2,"""2019-01-04""",201901,0,0
3,"""2019-01-03""",201901,0,0
4,"""2019-01-04""",201901,0,1


CPU times: user 69.3 ms, sys: 31 ms, total: 100 ms
Wall time: 57.7 ms


> Comment: Polars reads the data faster than pandas. Since there are many files, it might be a good idea to migrate to Polars.

In [10]:
%%time
# Load every training table, grouped by depth.
# read_file / read_files are called with depth 0 (the default) for the base
# and depth-0 tables, so those are only type-converted; passing 1 or 2 makes
# read_file aggregate the table per case_id.
# Patterns containing "*" go through read_files, which concatenates all the
# matching files.
train_dict = {
    # Base table.
    "train_base": read_file(TRAIN_DIR + "train_base.parquet"),
    # Depth 0 tables (no aggregation).
    "depth_0": [
        read_file(TRAIN_DIR + "train_static_cb_0.parquet"),
        read_files(TRAIN_DIR + "train_static_0_*.parquet"),
    ],
    # Depth 1 tables, aggregated per case_id.
    "depth_1": [
        read_files(TRAIN_DIR + "train_applprev_1_*.parquet", 1),
        read_file(TRAIN_DIR + "train_tax_registry_a_1.parquet", 1),
        read_file(TRAIN_DIR + "train_tax_registry_b_1.parquet", 1),
        read_file(TRAIN_DIR + "train_tax_registry_c_1.parquet", 1),
        read_files(TRAIN_DIR + "train_credit_bureau_a_1_*.parquet", 1),
        read_file(TRAIN_DIR + "train_credit_bureau_b_1.parquet", 1),
        read_file(TRAIN_DIR + "train_other_1.parquet", 1),
        read_file(TRAIN_DIR + "train_person_1.parquet", 1),
        read_file(TRAIN_DIR + "train_deposit_1.parquet", 1),
        read_file(TRAIN_DIR + "train_debitcard_1.parquet", 1),
    ],
    # Depth 2 tables, aggregated per case_id.
    "depth_2": [
        read_file(TRAIN_DIR + "train_credit_bureau_b_2.parquet", 2),
        read_files(TRAIN_DIR + "train_credit_bureau_a_2_*.parquet", 2),
    ]
}

CPU times: user 2min 30s, sys: 1min, total: 3min 31s
Wall time: 1min 25s


In [11]:
%%time
# Load every test table, grouped by depth.
# The base and depth-0 tables are read with the default depth 0 (type
# conversion only); passing 1 or 2 makes read_file aggregate per case_id.
# NOTE(review): the base table is stored under "df_base" here but under
# "train_base" in train_dict — confirm which key downstream code expects.
test_dict = {
    # Base table.
    "df_base": read_file(TEST_DIR + "test_base.parquet"),
    # Depth 0 tables (no aggregation).
    "depth_0": [
        read_file(TEST_DIR + "test_static_cb_0.parquet"),
        read_files(TEST_DIR + "test_static_0_*.parquet"),
    ],
    # Depth 1 tables, aggregated per case_id.
    "depth_1": [
        read_files(TEST_DIR + "test_applprev_1_*.parquet", 1),
        read_file(TEST_DIR + "test_tax_registry_a_1.parquet", 1),
        read_file(TEST_DIR + "test_tax_registry_b_1.parquet", 1),
        read_file(TEST_DIR + "test_tax_registry_c_1.parquet", 1),
        read_files(TEST_DIR + "test_credit_bureau_a_1_*.parquet", 1),
        read_file(TEST_DIR + "test_credit_bureau_b_1.parquet", 1),
        read_file(TEST_DIR + "test_other_1.parquet", 1),
        read_file(TEST_DIR + "test_person_1.parquet", 1),
        read_file(TEST_DIR + "test_deposit_1.parquet", 1),
        read_file(TEST_DIR + "test_debitcard_1.parquet", 1),
    ],
    # Depth 2 tables, aggregated per case_id.
    "depth_2": [
        read_file(TEST_DIR + "test_credit_bureau_b_2.parquet", 2),
        read_files(TEST_DIR + "test_credit_bureau_a_2_*.parquet", 2),
    ]
}

CPU times: user 62.6 ms, sys: 109 ms, total: 171 ms
Wall time: 111 ms


# Reference

* [Home Credit Baseline](https://www.kaggle.com/code/greysky/home-credit-baseline)