In [61]:
import duckdb
import polars as pl

In [None]:
# Path to DuckDB file
basepath = "Set where your data is stored"

# Scan Parquet lazily (no data loaded yet)
orig_df = pl.scan_parquet(f"{basepath}/origination.parquet")
perf_df = pl.scan_parquet(f"{basepath}/performance.parquet")
unemp_data = pl.scan_parquet(f"{basepath}/state_unemp.parquet")

orig_small = orig_df.select(
    "LOAN_SEQUENCE_NUMBER",
    "ORIGINAL_LOAN_TERM",
    "FIRST_PAYMENT_DATE",
    "ORIGINAL_INTEREST_RATE",
    "ORIGINAL_LOAN_TO_VALUE",
    "PROPERTY_STATE",
    "MATURITY_DATE",
    "ORIGINAL_DEBT_TO_INCOME",
    "CREDIT_SCORE"
)

perf_small = perf_df.select(
    "LOAN_SEQUENCE_NUMBER",
    "LOAN_AGE",
    "MONTHLY_REPORTING_PERIOD",
    "ESTIMATED_LOAN_TO_VALUE",
    "CURRENT_INTEREST_RATE",
    "ZERO_BALANCE_CODE",
    "ZERO_BALANCE_EFFECTIVE_DATE",
    "CURRENT_LOAN_DELINQUENCY_STATUS"
)

In [63]:
unemp_data = unemp_data.with_columns(
    (pl.col("date").dt.strftime("%Y%m")).alias("date")
)

In [64]:
loan_df = perf_small.join(orig_small, on='LOAN_SEQUENCE_NUMBER', how='left')

In [65]:
# How many loans are there in each month?
result = loan_df.group_by("MONTHLY_REPORTING_PERIOD").agg(
    pl.len().alias('number_of_loans')
).sort("MONTHLY_REPORTING_PERIOD")

In [66]:
loan_df = loan_df.with_columns(
    # if zero balance code is 1 and zero balance effective data is before maturity date, then it is prepayment
    pl.when((pl.col("ZERO_BALANCE_CODE") == 1) & (pl.col("ZERO_BALANCE_EFFECTIVE_DATE") < pl.col("MATURITY_DATE")))
    .then(pl.lit(1))
    .otherwise(pl.lit(0)).alias("PREPAID"),

    pl.when(~pl.col("CURRENT_LOAN_DELINQUENCY_STATUS").is_in(["0", "RA"]))
    .then(pl.lit(1))
    .otherwise(pl.lit(0))
    .alias("DELINQUENT")

)

In [67]:
loan_df = (
    loan_df.join(
        orig_small.group_by("FIRST_PAYMENT_DATE")
        .agg(pl.mean("ORIGINAL_INTEREST_RATE").alias("AVG_INTEREST_RATE")),
        left_on="MONTHLY_REPORTING_PERIOD",
        right_on="FIRST_PAYMENT_DATE",
        how="left"
    ).join(
        unemp_data,
        left_on = ["MONTHLY_REPORTING_PERIOD", "PROPERTY_STATE"],
        right_on = ["date",'state']
    )
)

In [68]:
loan_df = loan_df.select(
    "LOAN_SEQUENCE_NUMBER",
    "MONTHLY_REPORTING_PERIOD",
    "ORIGINAL_LOAN_TERM",
    "LOAN_AGE",
    "PROPERTY_STATE",
    "ORIGINAL_LOAN_TO_VALUE",
    "ORIGINAL_DEBT_TO_INCOME",
    "ESTIMATED_LOAN_TO_VALUE",
    "CREDIT_SCORE",
    pl.col("unemp",).alias('UNEMPLOYMENT'),
    "PREPAID",
    "DELINQUENT",
    (pl.col("CURRENT_INTEREST_RATE") - pl.col("AVG_INTEREST_RATE")).alias("INTEREST_RATE_DIFF"),
    pl.col("FIRST_PAYMENT_DATE").str.slice(0,4).alias('ORIGINATION_YEAR'),
    pl.col("FIRST_PAYMENT_DATE").str.slice(4,2).alias('ORIGINATION_MONTH')
)

In [69]:
loan_small = loan_df.head(5000).collect(engine = "streaming")

In [70]:
# see number of nulls
loan_small.select(
    pl.all().is_null().sum().name.prefix('NULL_')
)

NULL_LOAN_SEQUENCE_NUMBER,NULL_MONTHLY_REPORTING_PERIOD,NULL_ORIGINAL_LOAN_TERM,NULL_LOAN_AGE,NULL_PROPERTY_STATE,NULL_ORIGINAL_LOAN_TO_VALUE,NULL_ORIGINAL_DEBT_TO_INCOME,NULL_ESTIMATED_LOAN_TO_VALUE,NULL_CREDIT_SCORE,NULL_UNEMPLOYMENT,NULL_PREPAID,NULL_DELINQUENT,NULL_INTEREST_RATE_DIFF,NULL_ORIGINATION_YEAR,NULL_ORIGINATION_MONTH
u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32
0,0,0,0,0,0,0,4846,0,0,0,0,0,0,0


In [71]:
loan_small.head()

LOAN_SEQUENCE_NUMBER,MONTHLY_REPORTING_PERIOD,ORIGINAL_LOAN_TERM,LOAN_AGE,PROPERTY_STATE,ORIGINAL_LOAN_TO_VALUE,ORIGINAL_DEBT_TO_INCOME,ESTIMATED_LOAN_TO_VALUE,CREDIT_SCORE,UNEMPLOYMENT,PREPAID,DELINQUENT,INTEREST_RATE_DIFF,ORIGINATION_YEAR,ORIGINATION_MONTH
str,str,f64,f64,str,f64,f64,f64,f64,f64,i32,i32,f64,str,str
"""F01Q20167818""","""200302""",180.0,20.0,"""PA""",80.0,30.0,,788.0,5.8,0,0,0.556291,"""2001""","""07"""
"""F01Q20167953""","""200302""",180.0,20.0,"""PA""",73.0,51.0,,785.0,5.8,0,0,0.181291,"""2001""","""07"""
"""F01Q20170044""","""200402""",180.0,32.0,"""CA""",47.0,50.0,,715.0,6.5,0,0,1.006293,"""2001""","""07"""
"""F01Q20170962""","""201312""",180.0,151.0,"""IL""",86.0,36.0,,644.0,8.5,0,0,1.535684,"""2001""","""06"""
"""F01Q20175134""","""200509""",120.0,52.0,"""KY""",41.0,24.0,,725.0,6.0,0,0,1.017969,"""2001""","""06"""


In [None]:
loan_df = loan_df.select(
    pl.all().exclude("ESTIMATED_LOAN_TO_VALUE") # get rid of this because of too many NAs
)

In [None]:
# Path for output
output_file = f"{basepath}/loan_data.parquet"

# Use the lazy frame to write to Parquet in streaming mode
loan_df.sink_parquet(output_file, compression="snappy", row_group_size=500_000)