**Polars vs. Dask**
- Polars is very memory-efficient and fast, but it loads the entire dataset into memory. If you need to be resource-conscious, use Dask instead.
- Dask is slower, but it processes the data in partitions, so it uses less memory.

# Variable Initialization

In [31]:
# Paths use forward slashes so the notebook also runs on Linux/macOS;
# Windows file APIs accept "/" as a path separator as well.
csv_file = "../files/dataset/MIB_NAIC_20240429_2009_19.txt"
write_path = "../files/partition"
# the glob needs one wildcard per hive partition level (Observation_Year, Sex)
# plus one for the file name, so every partition file is loaded
pl_parquet_path = "../files/partition/*/*/*.parquet"

# Polars

In [32]:
import polars as pl

## Write Parquet

Polars is much faster than Dask at writing to parquet (estimated 15x faster)

In [33]:
# Column name -> Polars dtype used when reading the tab-separated CSV.
# Label-like columns are Categorical, ages/years/durations Int32, claim
# amounts/counts Int64, and exposure / expected-death / moment columns Float64.
pl_dtypes = dict(
    Observation_Year=pl.Int32,
    Age_Ind=pl.Categorical,
    Sex=pl.Categorical,
    Smoker_Status=pl.Categorical,
    Insurance_Plan=pl.Categorical,
    Issue_Age=pl.Int32,
    Duration=pl.Int32,
    Face_Amount_Band=pl.Categorical,
    Issue_Year=pl.Int32,
    Attained_Age=pl.Int32,
    SOA_Antp_Lvl_TP=pl.Categorical,
    SOA_Guar_Lvl_TP=pl.Categorical,
    SOA_Post_Lvl_Ind=pl.Categorical,
    Slct_Ult_Ind=pl.Categorical,
    Preferred_Class=pl.Categorical,
    Number_of_Pfd_Classes=pl.Categorical,
    Preferred_Indicator=pl.Categorical,
    MIB_Flag=pl.Categorical,
    Amount_Exposed=pl.Float64,
    Policies_Exposed=pl.Float64,
    Death_Claim_Amount=pl.Int64,
    Death_Count=pl.Int64,
    ExpDth_Amt_VBT2015=pl.Float64,
    ExpDth_Amt_VBT2015wMI=pl.Float64,
    ExpDth_Cnt_VBT2015=pl.Float64,
    ExpDth_Cnt_VBT2015wMI=pl.Float64,
    Cen2MomP1wMI_byAmt=pl.Float64,
    Cen2MomP2wMI_byAmt=pl.Float64,
    Cen3MomP1wMI_byAmt=pl.Float64,
    Cen3MomP2wMI_byAmt=pl.Float64,
    Cen3MomP3wMI_byAmt=pl.Float64,
)

In [34]:
# Eagerly read the whole CSV into memory. pl.read_csv returns a DataFrame,
# not a LazyFrame, despite the `lzdf` name. Columns are typed with pl_dtypes.
lzdf = pl.read_csv(csv_file, dtypes=pl_dtypes, separator="\t")

In [35]:
# Takes ~5 minutes. Writes a hive-partitioned dataset under write_path
# (one directory level per partition column) through pyarrow.
# pyarrow's write_to_dataset gives each file a unique guid-based name by
# default and leaves existing files in place. Re-running this cell would
# therefore add a second copy of every partition and double the row count
# seen through pl_parquet_path. "delete_matching" first clears the
# partitions being written, so the cell is safe to re-run.
lzdf.write_parquet(
    write_path,
    compression="snappy",
    use_pyarrow=True,
    pyarrow_options={
        "partition_cols": ["Observation_Year", "Sex"],
        "existing_data_behavior": "delete_matching",
    },
)

## Read Parquet

Polars appears to be ~3x faster than Dask at reading parquet.

In [9]:
# Use one global string cache so Categorical columns from different files /
# partitions share an encoding and can be combined.
pl.enable_string_cache()

# Lazily scan every partition file. "Sex" is a partition column (presumably
# not read back as Categorical), so cast it to match the CSV dtype.
lzdf = (
    pl.scan_parquet(pl_parquet_path)
    .cast({"Sex": pl.Categorical})
)

In [10]:
# Total row count across all partition files; only the count is collected.
lzdf.select(n_rows=pl.len()).collect().item()

110970449

### Validate

In [11]:
from morai.experience import validators

In [12]:
# Load the data-quality check definitions. In the run below they came from
# files/checks/checks.yaml. As the log output warns, checks are evaluated
# with eval(), so review that file before running it.
check_dict = validators.get_checks()

[37m 2024-05-13 23:04:48 [0m|[37m morai.experience.validators [0m|[32m INFO     [0m|[32m Loading checks from C:\Users\johnk\Desktop\github\morai\files\checks\checks.yaml. [0m
[37m 2024-05-13 23:04:48 [0m|[37m morai.experience.validators [0m|[32m INFO     [0m|[32m Ensure checks are reviewed and safe to run as they are evaluated with eval(). [0m
[37m 2024-05-13 23:04:48 [0m|[37m morai.experience.validators [0m|[32m INFO     [0m|[32m Loaded 13 checks. [0m


In [13]:
# Run every loaded check against the lazy frame. The resulting table has one
# row per check with `result` and `percent` columns (presumably rows flagged).
check_output = validators.run_checks(check_dict=check_dict, lzdf=lzdf)
check_output

[37m 2024-05-13 23:04:50 [0m|[37m morai.experience.validators [0m|[32m INFO     [0m|[32m Using check_dict passed to function. [0m
[37m 2024-05-13 23:04:50 [0m|[37m morai.experience.validators [0m|[32m INFO     [0m|[32m Running 13 checks [0m
Completed checks 13 of 13...
[37m 2024-05-13 23:05:15 [0m|[37m morai.experience.validators [0m|[32m INFO     [0m|[32m Completed 13 checks [0m


Unnamed: 0,checks,result,percent
0,logic_measures_zero,584254,0.005265
1,logic_exposure_zero_with_claim,39634,0.000357
2,logic_exposure_less_than_claim,193873,0.001747
3,limit_attained_age,0,0.0
4,limit_duration,0,0.0
5,logic_underwriting_na,0,0.0
6,logic_underwriting_u,0,0.0
7,logic_underwriting_1,0,0.0
8,logic_underwriting_2,0,0.0
9,logic_underwriting_3,0,0.0


In [14]:
# Inspect the rows behind one specific check (presumably those it flags).
check_name = "logic_exposure_less_than_claim"
single_check = validators.view_single_check(lzdf, check_dict, check_name)

In [15]:
# Show just the first row of the check's output.
single_check.head(n=1)

Age_Ind,Smoker_Status,Insurance_Plan,Issue_Age,Duration,Face_Amount_Band,Issue_Year,Attained_Age,SOA_Antp_Lvl_TP,SOA_Guar_Lvl_TP,SOA_Post_Lvl_Ind,Slct_Ult_Ind,Preferred_Indicator,Number_of_Pfd_Classes,Preferred_Class,MIB_Flag,Amount_Exposed,Policies_Exposed,Death_Claim_Amount,Death_Count,ExpDth_Amt_VBT2015,ExpDth_Amt_VBT2015wMI,ExpDth_Cnt_VBT2015,ExpDth_Cnt_VBT2015wMI,Cen2MomP1wMI_byAmt,Cen2MomP2wMI_byAmt,Cen3MomP1wMI_byAmt,Cen3MomP2wMI_byAmt,Cen3MomP3wMI_byAmt,Observation_Year,Sex
cat,cat,cat,i32,i32,cat,i32,i32,cat,cat,cat,cat,cat,cat,cat,cat,f64,f64,i64,i64,f64,f64,f64,f64,f64,f64,f64,f64,f64,i64,cat
"""ALB""","""NS""","""Perm""",25,5,"""07: 500,000 - …",2005,29,"""N/A (Not Term)…","""N/A (Not Term)…","""N/A""","""S""","""1""","""3""","""3""","""1""",751043.0,1.0,752177,1,127.677307,131.404404,0.00017,0.000175,98690360.0,17267.119141,74121000000000.0,12968000000.0,2269000.0,2009,"""F"""


# Dask (Under Development)

Dask was tested and evaluated, but it was not used to create the dataset.

The configuration below should be adjusted by the user as needed.

In [None]:
import dask

# Enable query planning (dask-expr). This setting has to be in place before
# dask.dataframe is imported, which is why the import follows the config call.
dask.config.set({"dataframe.query-planning": True})
import dask.dataframe as dd

## Write Parquet

In [None]:
# Column name -> pandas/dask dtype for dd.read_csv. SOA_Post_Lvl_Ind,
# Preferred_Class and Number_of_Pfd_Classes are read as "str" and converted
# to category afterwards (see fix_categories).
dtypes = dict(
    Observation_Year="int32",
    Age_Ind="category",
    Sex="category",
    Smoker_Status="category",
    Insurance_Plan="category",
    Issue_Age="int32",
    Duration="int32",
    Face_Amount_Band="category",
    Issue_Year="int32",
    Attained_Age="int32",
    SOA_Antp_Lvl_TP="category",
    SOA_Guar_Lvl_TP="category",
    SOA_Post_Lvl_Ind="str",
    Slct_Ult_Ind="category",
    Preferred_Class="str",
    Number_of_Pfd_Classes="str",
    Preferred_Indicator="category",
    MIB_Flag="category",
    Amount_Exposed="float64",
    Policies_Exposed="float64",
    Death_Claim_Amount="int64",
    Death_Count="int64",
    ExpDth_Amt_VBT2015="float64",
    ExpDth_Amt_VBT2015wMI="float64",
    ExpDth_Cnt_VBT2015="float64",
    ExpDth_Cnt_VBT2015wMI="float64",
    Cen2MomP1wMI_byAmt="float64",
    Cen2MomP2wMI_byAmt="float64",
    Cen3MomP1wMI_byAmt="float64",
    Cen3MomP2wMI_byAmt="float64",
    Cen3MomP3wMI_byAmt="float64",
)

In [None]:
# Columns read as "str" in `dtypes`. After reading, their missing values are
# filled with a placeholder and they are converted to category.
fix_categories = [
    "SOA_Post_Lvl_Ind",
    "Number_of_Pfd_Classes",
    "Preferred_Class",
]

In [None]:
# Lazily read the tab-separated CSV into dask partitions.
# By default pandas (and therefore dask) parses literal labels such as "N/A"
# and "NA" as missing values. The Polars read keeps them as real categories
# (e.g. SOA_Post_Lvl_Ind == "N/A" in the sample row). Treating only empty
# fields as missing keeps the two pipelines' category values consistent.
lzdf = dd.read_csv(
    csv_file,
    delimiter="\t",
    dtype=dtypes,
    keep_default_na=False,
    na_values=[""],
)

In [None]:
# Replace missing values with the "NA" label, then convert each string column
# listed in fix_categories to category.
lzdf = lzdf.assign(
    **{name: lzdf[name].fillna("NA").astype("category") for name in fix_categories}
)

In [None]:
# Resize to partitions of roughly 2048MB each before writing; tune this to the
# memory available on the machine.
lzdf = lzdf.repartition(partition_size="2048MB")

In [None]:
# Takes ~15 minutes. Writes a dataset partitioned by Observation_Year and Sex.
# NOTE(review): this is the same directory the Polars cell writes to, so the
# two outputs may end up mixed. Consider a separate path when comparing them.
lzdf.to_parquet(write_path, partition_on=["Observation_Year", "Sex"])

## Read Parquet

In [7]:
# Read the partitioned parquet dataset, keeping only the columns that the
# filters and group-by below use, to save memory.
# NOTE(review): this cell previously used `location`, `columns_needed` and
# `measures` without defining them anywhere in the notebook (leftover kernel
# state), so "Restart & Run All" failed here. They are now explicit.
# `location` was the directory written above (write_path). The measure list
# is a best guess: it gives the 17 columns shown in the recorded output, but
# confirm it matches the intended analysis.
group_cols = [
    "Observation_Year",
    "Sex",
    "Smoker_Status",
    "Insurance_Plan",
    "Issue_Age",
    "Duration",
    "Face_Amount_Band",
    "Issue_Year",
    "Attained_Age",
    "SOA_Post_Lvl_Ind",
    "Number_of_Pfd_Classes",
    "Preferred_Class",
]
measures = [
    "Amount_Exposed",
    "Policies_Exposed",
    "Death_Claim_Amount",
    "Death_Count",
    "ExpDth_Amt_VBT2015",
]
columns_needed = group_cols + measures

lzdf = dd.read_parquet(
    write_path,
    columns=columns_needed,
    engine="pyarrow",
)

In [8]:
# Partition columns come back as categoricals when read from parquet, so
# restore the integer year needed for the >= comparisons in the filters below.
lzdf = lzdf.assign(Observation_Year=lzdf["Observation_Year"].astype("int32"))

In [9]:
# Row and column counts. len() on a dask DataFrame triggers the computation and
# returns an integer; shape[0].compute() printed the row count as a float
# (e.g. "55538329.0").
n_rows = len(lzdf)
n_cols = len(lzdf.columns)
print(f"shape: {n_rows}, {n_cols}")

shape: 55538329.0, 17


In [10]:
# Filter set used in the ILEC report: observation year >= 2012, issue age > 17,
# issue year >= 2000, and excluding SOA_Post_Lvl_Ind "PLT", Insurance_Plan
# "Other", Smoker_Status "U" and the four face-amount bands below 100,000.
small_face_bands = [
    "01: 0 - 9,999",
    "02: 10,000 - 24,999",
    "03: 25,000 - 49,999",
    "04: 50,000 - 99,999",
]
core_filters = (
    (lzdf["Observation_Year"] >= 2012)
    & (lzdf["Issue_Age"] > 17)
    & (lzdf["SOA_Post_Lvl_Ind"] != "PLT")
    & (lzdf["Insurance_Plan"] != "Other")
    & (lzdf["Issue_Year"] >= 2000)
    & (lzdf["Smoker_Status"] != "U")
    & ~lzdf["Face_Amount_Band"].isin(small_face_bands)
)

In [11]:
# Looser variant of core_filters: issue years from 1980 onward are allowed and
# no face-amount band is excluded. The remaining conditions are the same.
base_filters = (
    (lzdf["Observation_Year"] >= 2012)
    & (lzdf["Issue_Year"] >= 1980)
    & (lzdf["Issue_Age"] > 17)
    & (lzdf["Smoker_Status"] != "U")
    & (lzdf["Insurance_Plan"] != "Other")
    & (lzdf["SOA_Post_Lvl_Ind"] != "PLT")
)

In [12]:
# Sum the measures per grouping key over the base-filtered rows.
# Reading parquet is 4-10x faster than CSV, but this still takes 2-3 minutes.
# observed=True limits output to category combinations that actually occur;
# without it, grouping on many categoricals can exhaust RAM.
# credit: https://stackoverflow.com/questions/50051210/avoiding-memory-issues-for-groupby-on-large-pandas-dataframe
by_cols = [
    "Observation_Year",
    "Sex",
    "Smoker_Status",
    "Insurance_Plan",
    "Issue_Age",
    "Duration",
    "Face_Amount_Band",
    "Issue_Year",
    "Attained_Age",
    "SOA_Post_Lvl_Ind",
    "Number_of_Pfd_Classes",
    "Preferred_Class",
]
grouped_df = (
    lzdf[base_filters]
    .groupby(by_cols, observed=True)[measures]
    .sum()
    .reset_index()
    .compute()
)

# Reload

In [None]:
import importlib

In [None]:
# Re-import the validators module after editing its source, without restarting
# the kernel.
importlib.reload(module=validators)