In [40]:
import pathlib
import polars as pl
from columns_fill_nulls import fill_nulls
import pandas as pd

# Configure polars' float formatting to "full" for frames displayed in this notebook.
pl.Config.set_fmt_float("full")

polars.config.Config

In [41]:
# All inputs are read from <parent of the working directory>/data/datalake/bronze/train.
BASE_PATH = pathlib.Path.cwd().parent.joinpath("data", "datalake", "bronze", "train")

# Base table.
BASE_TRAIN_PATH = BASE_PATH.joinpath("train_base.parquet")

# The applprev table is split across two parquet files.
APPLPREV0_PATH = BASE_PATH.joinpath("train_applprev_1_0.parquet")
APPLPREV1_PATH = BASE_PATH.joinpath("train_applprev_1_1.parquet")

# Remaining single-file tables.
DEBITCARD_PATH = BASE_PATH.joinpath("train_debitcard_1.parquet")
DEPOSIT_PATH = BASE_PATH.joinpath("train_deposit_1.parquet")
OTHER_PATH = BASE_PATH.joinpath("train_other_1.parquet")
PERSON_PATH = BASE_PATH.joinpath("train_person_1.parquet")

## Functions

In [42]:
def get_date_categorized_transforms(dataframe: pl.LazyFrame) -> list[pl.Expr]:
    """Build one bucketing expression per column whose name ends with "D".

    Each expression replaces the column (same name) with a string label chosen
    from its numeric value, using thresholds of 180, 365 and 365 * 5:

    * ``> 365 * 5``            -> "MORE_THAN_5_YEARS"
    * ``(365, 365 * 5]``       -> "BETWEEN_1_AND_5_YEARS"
    * ``(180, 365]``           -> "BETWEEN_6_MONTHS_AND_1_YEAR"
    * ``<= 180`` and not null  -> "LESS_THAN_6_MONTH"
    * anything else (null)     -> "UNKNOWN"

    The values are presumably day counts such as those produced by
    ``get_date_transforms`` -- confirm against the caller.

    Args:
        dataframe: Frame whose column names are scanned; it is not modified.

    Returns:
        Expressions, in column order, suitable for ``with_columns``.
    """
    five_years = 365 * 5

    def bucket(name: str) -> pl.Expr:
        days = pl.col(name)
        # A flat when/then chain is evaluated top to bottom like if/elif/else.
        return (
            pl.when(days > five_years)
            .then(pl.lit("MORE_THAN_5_YEARS"))
            .when((days <= five_years) & (days > 365))
            .then(pl.lit("BETWEEN_1_AND_5_YEARS"))
            .when((days <= 365) & (days > 180))
            .then(pl.lit("BETWEEN_6_MONTHS_AND_1_YEAR"))
            .when((days <= 180) & (days.is_not_null()))
            .then(pl.lit("LESS_THAN_6_MONTH"))
            .otherwise(pl.lit("UNKNOWN"))
            .alias(name)
        )

    return [bucket(name) for name in dataframe.columns if name.endswith("D")]


def get_date_transforms(dataframe: pl.LazyFrame) -> list[pl.Expr]:
    """Build expressions that turn date columns into day offsets from ``date_decision``.

    Walking the columns in order:

    * ``date_decision`` itself is cast to ``pl.Date``, and two extra columns are
      derived from the days between ``date_decision`` and ``birth_259D``:
      ``age`` (whole multiples of 365 days) and ``age_days`` (the remainder).
      Years are counted as 365 days, so leap days are not accounted for.
    * every column whose name ends with "D" is replaced (same name) by the
      number of days from that date to ``date_decision``.

    The expressions reference ``date_decision`` and ``birth_259D`` by name, so
    both must be present when the expressions are evaluated.

    Args:
        dataframe: Frame whose column names are scanned; it is not modified.

    Returns:
        Expressions, in column order, suitable for ``with_columns``.
    """
    decision_date = pl.col("date_decision").cast(pl.Date)
    transforms: list[pl.Expr] = []

    for name in dataframe.columns:
        if name == "date_decision":
            days_since_birth = (decision_date - pl.col("birth_259D").cast(pl.Date)).dt.total_days()
            transforms += [
                pl.col(name).cast(pl.Date),
                (days_since_birth // 365).alias("age"),
                (days_since_birth % 365).alias("age_days"),
            ]
        elif name.endswith("D"):
            days_before_decision = (decision_date - pl.col(name).cast(pl.Date)).dt.total_days()
            transforms.append(days_before_decision.alias(name))

    return transforms


def get_group_by_transforms(dataframe: pl.LazyFrame, group_by_col: list[str]) -> list[pl.Expr]:
    """Pick a per-group aggregation for every non-key column based on its dtype.

    * numeric columns -> mean, then ``shrink_dtype``
    * string columns  -> most frequent non-null value
    * date columns    -> first value

    Columns of any other dtype (e.g. booleans) get no expression and are
    therefore absent from an ``agg`` built from the result.

    Args:
        dataframe: Frame whose schema (names and dtypes) is inspected.
        group_by_col: Key columns to skip.

    Returns:
        Aggregation expressions in column order.
    """
    transforms = []
    for col, col_dtype in zip(dataframe.columns, dataframe.dtypes):
        if col in group_by_col:
            continue

        # Use the dtype API / equality instead of isinstance() against the
        # deprecated pl.NUMERIC_DTYPES: these work whether polars hands back
        # dtype classes or dtype instances.
        if col_dtype.is_numeric():
            transforms.append(pl.col(col).mean().shrink_dtype())

        elif col_dtype == pl.String:
            # Drop nulls *before* taking the mode; otherwise a mostly-null
            # column has null as its mode and the result is always null.
            transforms.append(pl.col(col).drop_nulls().mode().first())

        elif col_dtype == pl.Date:
            transforms.append(pl.col(col).first())

    return transforms


def get_columns_with_more_85_perc_nulls(dataframe: pl.LazyFrame, threshold: float = 0.85) -> list[str]:
    """Return the columns whose share of null values is strictly above ``threshold``.

    The frame is collected in full to count nulls, so this materialises the
    whole dataset in memory.

    Args:
        dataframe: Lazy frame to inspect.
        threshold: Null fraction (0-1) a column must exceed to be reported.
            Defaults to 0.85, the value the function name refers to.

    Returns:
        Column names in frame order. An empty frame yields an empty list
        (there is no meaningful null ratio to compare).
    """
    frame: pl.DataFrame = dataframe.collect()
    n_rows: int = frame.height
    if n_rows == 0:
        # Avoid 0/0: a NaN ratio would otherwise decide the comparison.
        return []

    # null_count() gives a one-row frame with one integer per column, so the
    # ratio is computed in exact Python arithmetic instead of via describe(),
    # a transpose and a lossy Float32 cast.
    null_counts = frame.null_count().row(0)

    return [
        name
        for name, n_nulls in zip(frame.columns, null_counts)
        if n_nulls / n_rows > threshold
    ]
    
    
def binarize_columns_with_more_85_perc_nulls(columns: list[str]) -> list[pl.Expr]:
    """Build expressions that turn each listed column into a presence flag.

    Every expression overwrites its column (same name) with ``True`` where the
    original value is present and ``False`` where it is null.

    Args:
        columns: Names of the columns to binarize.

    Returns:
        One boolean expression per input name, in the same order.
    """
    return [pl.col(name).is_not_null().alias(name) for name in columns]


## Load data

In [43]:
# Key used to join the deposit, other and applprev tables onto the person rows,
# and to sort the preview of the joined frame.
JOIN_COLS = ["case_id", "num_group1"]

In [44]:
# Base table: only the key, the decision date and the target are read.
base_train: pl.LazyFrame = pl.scan_parquet(BASE_TRAIN_PATH).select(["case_id", "date_decision", "target"])

# The applprev table comes in two files; stack them and drop three columns.
applprev_parts = [pl.scan_parquet(APPLPREV0_PATH), pl.scan_parquet(APPLPREV1_PATH)]
applprev_data: pl.LazyFrame = pl.concat(applprev_parts).drop(["childnum_21L", "education_1138M", "familystate_726L"])

# debitcard_data: pl.LazyFrame = pl.scan_parquet(DEBITCARD_PATH)
deposit_data: pl.LazyFrame = pl.scan_parquet(DEPOSIT_PATH)
other_data: pl.LazyFrame = pl.scan_parquet(OTHER_PATH)

# Subset of person columns kept for the join.
person_columns = [
    "case_id",
    "num_group1",
    "birth_259D",
    "incometype_1044T",
    "role_1084L",
    "empl_employedfrom_271D",
    "empl_industry_691L",
    "mainoccupationinc_384A",
]
person_data: pl.LazyFrame = pl.scan_parquet(PERSON_PATH).select(person_columns)

# Person rows are attached per case; the other tables are then matched on
# (case_id, num_group1).
# NOTE(review): `target` is selected above but not carried into `data`, because
# only case_id and date_decision are re-selected here -- confirm this is intended.
# NOTE(review): these are left joins keyed on the person rows, so deposit/other/
# applprev rows whose num_group1 has no matching person row are not kept --
# confirm that matching the per-table num_group1 indices is what is wanted.
data: pl.LazyFrame = (
    base_train.select("case_id", "date_decision")
    .join(person_data, how="left", on="case_id")
    .join(deposit_data, how="left", on=JOIN_COLS)
    .join(other_data, how="left", on=JOIN_COLS)
    .join(applprev_data, how="left", on=JOIN_COLS)
)

# data = base_train.select("case_id", "date_decision").join(data, how="left", on="case_id")

# Preview the first rows of the joined frame.
data.sort(JOIN_COLS).head().collect()

case_id,date_decision,num_group1,birth_259D,incometype_1044T,role_1084L,empl_employedfrom_271D,empl_industry_691L,mainoccupationinc_384A,amount_416A,contractenddate_991D,openingdate_313D,amtdebitincoming_4809443A,amtdebitoutgoing_4809440A,amtdepositbalance_4809441A,amtdepositincoming_4809444A,amtdepositoutgoing_4809442A,actualdpd_943P,annuity_853A,approvaldate_319D,byoccupationinc_3656910L,cancelreason_3545846M,creationdate_885D,credacc_actualbalance_314A,credacc_credlmt_575A,credacc_maxhisbal_375A,credacc_minhisbal_90A,credacc_status_367L,credacc_transactions_402L,credamount_590A,credtype_587L,currdebt_94A,dateactivated_425D,district_544M,downpmt_134A,dtlastpmt_581D,dtlastpmtallstes_3545839D,employedfrom_700D,firstnonzeroinstldate_307D,inittransactioncode_279L,isbidproduct_390L,isdebitcard_527L,mainoccupationinc_437A,maxdpdtolerance_577P,outstandingdebt_522A,pmtnum_8L,postype_4733339M,profession_152M,rejectreason_755M,rejectreasonclient_4145042M,revolvingaccount_394A,status_219L,tenor_203L
i64,str,i64,str,str,str,str,str,f64,f64,str,str,f64,f64,f64,f64,f64,f64,f64,str,f64,str,str,f64,f64,f64,f64,str,f64,f64,str,f64,str,str,f64,str,str,str,str,str,bool,bool,f64,f64,f64,f64,str,str,str,str,f64,str,f64
0,"""2019-01-03""",0,"""1986-07-01""","""SALARIED_GOVT""","""CL""","""2017-09-15""","""OTHER""",10800.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
0,"""2019-01-03""",1,,,"""EM""",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
0,"""2019-01-03""",2,,,"""PE""",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
0,"""2019-01-03""",3,,,"""PE""",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
1,"""2019-01-03""",0,"""1957-08-01""","""SALARIED_GOVT""","""CL""","""2008-10-29""","""OTHER""",10000.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,


In [45]:
# cols_to_binarize = get_columns_with_more_85_perc_nulls(data)
# 
# print(cols_to_binarize)
# 
# data = data.with_columns(*binarize_columns_with_more_85_perc_nulls(cols_to_binarize))

### Group by cols

In [46]:
# Column groups consumed by the per-case aggregation below. Columns of the
# joined frame that appear in none of these lists are not aggregated.

# Aggregated with the last non-null value per case.
LAST_COLS = [
    "contractenddate_991D",
    "openingdate_313D",
    "dtlastpmt_581D",
    "birth_259D"
]

# Aggregated with the mean of the non-null values per case.
MEAN_COLS = [
    "byoccupationinc_3656910L",
    "credacc_transactions_402L",
    "amtdebitincoming_4809443A",
    "amtdebitoutgoing_4809440A",
    "amtdepositbalance_4809441A",
    "amtdepositincoming_4809444A",
    "amtdepositoutgoing_4809442A",
    "credacc_actualbalance_314A",
    "credacc_maxhisbal_375A",
    "credacc_minhisbal_90A",
    "amount_416A",
    "revolvingaccount_394A",
    "mainoccupationinc_384A",
    "annuity_853A",
    "credamount_590A",
    "downpmt_134A",
    "credacc_credlmt_575A",
    "actualdpd_943P"
]

# Aggregated with the median of the non-null values per case.
# NOTE(review): both columns are shown as bool in the joined-frame preview --
# confirm that a median over booleans gives the intended result.
MEDIAN_COLS = [
    "isdebitcard_527L",
    "isbidproduct_390L"
]

# Aggregated with the first mode of the non-null values per case.
MODE_COLS = [
    "credacc_status_367L"
]

#### Fill Nulls

In [47]:
# nulls_data: pd.DataFrame = fill_nulls(data).describe(percentiles=[]).filter(pl.col("statistic") == "null_count").drop("statistic").transpose(include_header=True, column_names=["null_count"]).with_columns(pl.col("null_count").cast(pl.Float32).cast(pl.Int32)).to_pandas().query("null_count > 0")
# nulls_data

In [48]:
# Collapse the joined rows to one row per (case_id, date_decision). Nulls are
# removed before each aggregation; the aggregation applied depends on which
# column list the column belongs to.
aggregations: list[pl.Expr] = []
aggregations += [pl.col(name).drop_nulls().last() for name in LAST_COLS]
aggregations += [pl.col(name).drop_nulls().mean() for name in MEAN_COLS]
aggregations += [pl.col(name).drop_nulls().median() for name in MEDIAN_COLS]
aggregations += [pl.col(name).drop_nulls().mode().first() for name in MODE_COLS]

data: pl.LazyFrame = data.group_by(["case_id", "date_decision"]).agg(aggregations)

In [49]:
# Run the project-local fill_nulls helper on the aggregated frame, then drop district_544M.
# NOTE(review): district_544M is not one of the columns produced by the preceding
# group_by/agg, so this drop presumably relies on polars ignoring a missing column
# (or on fill_nulls adding it) -- confirm against the polars version in use.
data: pl.LazyFrame = fill_nulls(data).drop("district_544M")

#### Transforms

In [50]:
# Replace every "*D" column with its day offset from date_decision and derive
# the age parts; the raw birth offset column is not kept.
data = data.with_columns(*get_date_transforms(data)).drop("birth_259D")

# Fold whole years and leftover days into a single fractional age.
fractional_age = pl.col("age") + (pl.col("age_days") / 365)
data = data.with_columns(fractional_age.shrink_dtype().alias("age")).drop(["age_days"])

# Disabled steps, kept for reference:
# data = data.with_columns(*get_date_categorized_transforms(data))
# data = data.with_columns(pl.col(pl.Boolean).fill_null(False).cast(pl.Int8))
# pl.col("target").cast(pl.Boolean),
# pl.col("age").cast(pl.Int16),
# pl.col("num_group1").cast(pl.Int16),
data = data.drop("date_decision")

# NOTE(review): the preview below shows age ~= 49.04 for every case, although the
# joined-frame preview has birth dates of 1986 and 1957 for cases 0 and 1.
# 49.038 * 365 ~= 17899 days, i.e. the distance from 1970-01-01 to 2019-01-03,
# which is consistent with birth_259D being the epoch date by this point
# (possibly introduced by fill_nulls) -- verify before using `age`.
data.sort("case_id").select("case_id", "age").head().collect()

case_id,age
i64,f32
0,49.03835678100586
1,49.03835678100586
2,49.04109573364258
3,49.03835678100586
4,49.04109573364258


In [51]:
def save_dataframe_to_disk(group_by_col: str, num_percentiles: int = 10, depth: int = 1) -> None:
    """Write the notebook-level ``data`` LazyFrame to one parquet file, slice by slice.

    The frame is split into ranges of ``group_by_col`` bounded by its
    percentiles, each range is collected and written to a temporary
    ``slice_<i>.parquet`` file, and the slices are then merged into
    ``../data/datalake/silver/depth_<depth>.parquet``. The temporary files and
    their directory are removed afterwards.

    Reads the global ``data``; paths are relative to the working directory.

    Args:
        group_by_col: Numeric column whose percentiles define the slices.
        num_percentiles: Number of slices to cut the value range into.
        depth: Suffix used in the output file and temporary directory names.
    """
    SLICE_OUTPUT_BASE_DIR: pathlib.Path = pathlib.Path(f"../data/datalake/silver/depth_{depth}")
    SLICE_OUTPUT_BASE_DIR.mkdir(parents=True, exist_ok=True)

    # Slices left behind by an interrupted run would otherwise be merged into the output.
    for stale_path in SLICE_OUTPUT_BASE_DIR.glob("slice*.parquet"):
        stale_path.unlink(missing_ok=True)

    # Boundaries are the 0%, ..., (n-1)/n percentiles plus the maximum. They must
    # be computed on the same column that is filtered on below (the original
    # always described "case_id", which broke any other group_by_col).
    percentiles = [x / num_percentiles for x in range(num_percentiles)]
    is_boundary = (pl.col("statistic").str.contains("%")) | (pl.col("statistic").str.contains("max"))
    describe: dict[str, list[str | float]] = (
        data.select(group_by_col)
        .describe(percentiles=percentiles)
        .filter(is_boundary)
        .to_dict(as_series=False)
    )
    boundaries = sorted(describe[group_by_col])

    key = pl.col(group_by_col)
    last_slice = len(boundaries) - 2
    for i in range(len(boundaries) - 1):
        lower, upper = boundaries[i], boundaries[i + 1]
        # Ranges are half-open so slices do not overlap, except the last one,
        # which must include the maximum or that row would never be written.
        upper_bound = (key <= upper) if i == last_slice else (key < upper)
        slice_data: pl.LazyFrame = data.filter((key >= lower) & upper_bound)
        # slice_data.group_by(group_by_col).agg(*get_group_by_transforms(data, [group_by_col])).collect().write_parquet(SLICE_OUTPUT_BASE_DIR / f"slice_{i}.parquet")
        slice_data.collect().write_parquet(SLICE_OUTPUT_BASE_DIR / f"slice_{i}.parquet")

    # merge files and delete slices
    pl.scan_parquet(SLICE_OUTPUT_BASE_DIR / "slice_*.parquet").collect().write_parquet(f"../data/datalake/silver/depth_{depth}.parquet")
    for path in pathlib.Path(SLICE_OUTPUT_BASE_DIR).glob("slice*.parquet"):
        path.unlink(missing_ok=True)

    SLICE_OUTPUT_BASE_DIR.rmdir()
    
# Persist `data`, slicing on case_id with the defaults (10 percentiles, depth 1).
save_dataframe_to_disk(group_by_col="case_id")

In [52]:
# Show the column names of `data` in alphabetical order.
sorted(data.columns)

['actualdpd_943P',
 'age',
 'amount_416A',
 'amtdebitincoming_4809443A',
 'amtdebitoutgoing_4809440A',
 'amtdepositbalance_4809441A',
 'amtdepositincoming_4809444A',
 'amtdepositoutgoing_4809442A',
 'annuity_853A',
 'byoccupationinc_3656910L',
 'case_id',
 'contractenddate_991D',
 'credacc_actualbalance_314A',
 'credacc_credlmt_575A',
 'credacc_maxhisbal_375A',
 'credacc_minhisbal_90A',
 'credacc_status_367L',
 'credacc_transactions_402L',
 'credamount_590A',
 'downpmt_134A',
 'dtlastpmt_581D',
 'isbidproduct_390L',
 'isdebitcard_527L',
 'mainoccupationinc_384A',
 'openingdate_313D',
 'revolvingaccount_394A']