In [0]:
%run ./config_and_imports

Installing prophet...
Collecting prophet
  Downloading prophet-1.1.6-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.5 kB)
Collecting cmdstanpy>=1.0.4 (from prophet)
  Downloading cmdstanpy-1.2.5-py3-none-any.whl.metadata (4.0 kB)
Collecting holidays<1,>=0.25 (from prophet)
  Downloading holidays-0.70-py3-none-any.whl.metadata (34 kB)
Collecting tqdm>=4.36.1 (from prophet)
  Downloading tqdm-4.67.1-py3-none-any.whl.metadata (57 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/57.7 kB[0m [31m?[0m eta [36m-:--:--[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m57.7/57.7 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting importlib-resources (from prophet)
  Downloading importlib_resources-6.5.2-py3-none-any.whl.metadata (3.9 kB)
Collecting stanio<2.0.0,>=0.4.0 (from cmdstanpy>=1.0.4->prophet)
  Downloading stanio-0.5.1-py3-none-any.whl.metadata (1.6 kB)
Downloading prophet-1.1.6-py3-none-manylinux_2_17_x86_64

DataFrame[]

In [0]:
# Only run once for the first time
# %run ./recreate_data

# 1. Load MACRO Data - Monthly

In [0]:
# The historical macro file is tab-separated with no header row, so column names
# are assigned positionally; the four GDP/income growth series are then dropped.
columns = [
    "date",
    "RGDPGrowth",
    "NGDPGrowth",
    "RDIGrowth",
    "NDIGrowth",
    "UnemploymentRate",
    "CPIInflationRate",
    "TreasuryRate3Mo",
    "TreasuryYield5yr",
    "TreasuryYield10yr",
    "BBBCorpYield",
    "MortgageRate",
    "PrimeRate",
    "DJTSIndex",
    "HousePriceIndex",
    "CREPriceIndex",
    "MarketVolatilityIndex",
]
growth_columns = ["RGDPGrowth", "NGDPGrowth", "RDIGrowth", "NDIGrowth"]
macro_path = f"/Volumes/{catalog}/{schema}/{volume}/{volume_folder_name}/historical_macro_data.csv"

macro = (
    spark.read.csv(macro_path, header=False, sep="\t", inferSchema=True)
    .toDF(*columns)
    .drop(*growth_columns)
)
display(macro)

In [0]:
# Date coverage of the raw macro history.
macro.agg(
    F.min("date").alias("start_date"),
    F.max("date").alias("end_date"),
).show()

##### The macroeconomic data spans from January 2000 to October 2021.

In [0]:
# Persist the raw macro history as a Delta table.
(
    macro.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("raw_macro")
)

# 2. Load scenarios

In [0]:
# Scenario file is a comma-separated CSV with a header row.
scenarios_path = f"/Volumes/{catalog}/{schema}/{volume}/{volume_folder_name}/scenarios_fr.csv"
scenarios = spark.read.csv(scenarios_path, header=True, sep=",", inferSchema=True)

display(scenarios)

(
    scenarios.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("raw_scenarios")
)

In [0]:
raw_scenarios = spark.read.table("raw_scenarios")

# Scenarios are quarterly (rows at months 1, 4, 7, 10). Expand each row into the
# three months of its quarter so every month carries that quarter's values.
raw_scenarios_frmt = raw_scenarios.withColumn(
    "date_sequence",
    sequence(col("Date"), add_months(col("Date"), 2), F.expr("INTERVAL 1 MONTH")),
)
raw_scenarios_frmt_2 = (
    raw_scenarios_frmt
    .withColumn("date", explode(col("date_sequence")))
    .drop("date_sequence")
)

# Keep only the ~5-year window the prediction data covers, so later joins
# against it do not produce null values.
in_forecast_window = (col("date") >= "2021-09-01") & (col("date") <= "2026-10-01")
raw_scenarios_rnmd = raw_scenarios_frmt_2.filter(in_forecast_window)

display(raw_scenarios_rnmd)
raw_scenarios_rnmd.write.format("delta").mode("overwrite").saveAsTable("silver_scenarios")

In [0]:
# Date coverage of the monthly scenario table after filtering.
raw_scenarios_rnmd.agg(
    F.min("date").alias("start_date"),
    F.max("date").alias("end_date"),
).show()

# 3. Loan Performance Data

In [0]:
# The loans file is tab-separated with no header row; names are assigned by position.
columns = [
    "quarter",
    "loan_id",
    "maturity_date",
    "credit_status",
    "date",
    "credit_score",
    "orig_UPB",
    "orig_LTV",
    "state",
    "loan_term",
    "current_UPB",
    "current_delinquency_status",
    "loan_age",
    "remain_months_maturity",
    "zero_UPB_code",
    "current_interest_rate",
]

loans_path = f"/Volumes/{catalog}/{schema}/{volume}/{volume_folder_name}/loans.csv"
perf = spark.read.csv(loans_path, sep="\t", inferSchema=True).toDF(*columns)

(
    perf.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable("raw_loan_perf")
)

In [0]:
# Preview the raw loan-performance records.
display(perf)

In [0]:
# Date coverage of the raw loan-performance history.
perf.agg(
    F.min("date").alias("start_date"),
    F.max("date").alias("end_date"),
).show()


In [0]:
# Total number of raw loan-performance records loaded.
perf_row_count = perf.count()
perf_row_count

In [0]:
# Number each loan's records in date order (1 = earliest record) as `loan_age_since_last`.
loan_perf = spark.read.table("raw_loan_perf")

# Convert both date columns once, up front, so the window below orders by a
# real DateType rather than whatever type CSV schema inference produced.
loan_perf_dated = (
    loan_perf
    .withColumn("date", F.to_date(F.col("date")))
    .withColumn("maturity_date", F.to_date(F.col("maturity_date")))
)

window_spec = Window.partitionBy("loan_id").orderBy("date")
# rank() (not row_number()) so duplicate (loan_id, date) rows share a number.
result_df = loan_perf_dated.withColumn("loan_age_since_last", F.rank().over(window_spec))

display(result_df)

In [0]:
# Persist the cleaned loan history as silver_loan_perf.
# `maturity_date` must be kept: the amortization step builds each loan's
# monthly date sequence from `date` up to `maturity_date`.
result_df.drop(
    "zero_UPB_code", "state", "quarter"
).write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable("silver_loan_perf")

### Keep only loans whose most recent record is dated 2021-09-01 (i.e., loans still active at that date)

In [0]:
# Keep the full history of every loan whose most recent record is dated 2021-09-01.
query = """
    select *
    from silver_loan_perf
    where loan_id in (
        select loan_id
        from silver_loan_perf
        group by loan_id
        having date(max(date)) = '2021-09-01'
    )
"""
loans = spark.sql(query)

display(loans)

### Loan Amortization

In [0]:
# Keep each loan's record(s) at its most recent date.
# `F.max` is used instead of `from pyspark.sql.functions import max`, which
# would shadow Python's built-in `max` for every later cell in the notebook.
loans_last = loans.groupBy("loan_id").agg(F.max("date").alias("date"))

loans_last_alias = loans_last.alias("last")
loans_alias = loans.alias("loans")

# Join back on (loan_id, latest date), then drop the aggregate side's copies of
# the keys so only the original `loans` columns remain.
result = loans_last_alias.join(
    loans_alias,
    (loans_last_alias.loan_id == loans_alias.loan_id)
    & (loans_last_alias.date == loans_alias.date),
).drop(loans_last_alias.date).drop(loans_last_alias.loan_id)

result.display()

In [0]:
# Persist each loan's most recent record(s) as silver_last_loan_payment.
(
    result.write
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable("silver_last_loan_payment")
)

## Amortization Calculation

In [0]:
# Reload the last-payment snapshot as the starting point for amortization.
df_silver_payment = spark.table("silver_last_loan_payment")

In [0]:
# Project every loan forward: one row per month from its last reported date
# through its maturity date. All other columns are copied from the last reported
# record; the balance for each projected month is recomputed downstream.
monthly_dates = sequence(col("date"), col("maturity_date"), F.expr("INTERVAL 1 MONTH"))

result = (
    df_silver_payment
    .withColumn("date", explode(monthly_dates))
    .drop("loan_age_since_last")
)
result.display()

### Filtering Active Loans

This cell filters the `result` DataFrame to keep only loans whose last reported record (copied onto every projected month) shows:

1. **Positive outstanding balance (`current_UPB > 0`)**.
2. **More than 1 month left to maturity (`remain_months_maturity > 1`)**.


In [0]:
# Positive outstanding balance AND more than one month left to maturity.
is_active = (col("current_UPB") > 0) & (col("remain_months_maturity") > 1)
result_2 = result.filter(is_active)

In [0]:
# Number the projected months per loan: 1 = last reported month, 2 = the next, ...
result_2 = result_2.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))

months_window = Window.partitionBy("loan_id").orderBy("date")
result_df = result_2.withColumn("loan_age_since_last", rank().over(months_window))

display(result_df)

In [0]:
# Amortization: estimate each loan's scheduled balance in every projected month.
#
# Each loan is anchored on its last reported record (loan_age_since_last == 1),
# where current_UPB is the known balance B0 and R = remain_months_maturity
# payments remain. current_UPB already reflects every payment up to the anchor,
# so only the j = loan_age_since_last - 1 months after the anchor are amortized.
# With monthly rate r:
#   payment  M   = r * B0 / (1 - (1 + r)^(-R))               (B0 / R   when r == 0)
#   balance  B_j = B0 * (1 + r)^j - M * ((1 + r)^j - 1) / r   (B0 - M*j when r == 0)
# so the estimate equals current_UPB at the anchor and reaches 0 after R payments.
rate = F.col("rate_per_period")
months_after_anchor = F.col("loan_age_since_last") - 1
growth = F.pow(1 + rate, months_after_anchor)
balance_est = F.col("remaining_balance_num") - F.col("remaining_balance_den")

result = (
    result_df
    # Assumes current_interest_rate is an annual rate in percent -> monthly decimal rate.
    .withColumn("rate_per_period", (F.col("current_interest_rate") / 100) / 12)
    # Loan age (months since origination) at each projected date.
    .withColumn("loan_age_last", F.col("loan_age") + F.col("loan_age_since_last") - 1)
    .withColumn("remain_month_maturity_last", F.col("loan_term") - F.col("loan_age_last"))
    # Level payment that retires B0 over the R remaining months.
    .withColumn(
        "mortgage_payment",
        F.when(rate == 0, F.col("current_UPB") / F.col("remain_months_maturity")).otherwise(
            rate * F.col("current_UPB")
            / (1 - F.pow(1 + rate, -F.col("remain_months_maturity")))
        ),
    )
    # First term of B_j: anchor balance grown by j months of interest.
    .withColumn("remaining_balance_num", F.col("current_UPB") * growth)
    # Second term of B_j: accumulated value of the j payments made since the anchor.
    .withColumn(
        "remaining_balance_den",
        F.when(rate == 0, F.col("mortgage_payment") * months_after_anchor).otherwise(
            F.col("mortgage_payment") * (growth - 1) / rate
        ),
    )
    # Clamp small negative floating-point residues near maturity to 0 (nulls stay null).
    .withColumn(
        "remaining_balance_est",
        F.when(balance_est < 0, F.lit(0.0)).otherwise(balance_est),
    )
    # Current LTV ~= orig_LTV * balance / orig_UPB, i.e. the property value is held
    # at its origination value (no house-price adjustment). Same units as orig_LTV.
    .withColumn(
        "curr_LTV",
        F.round(F.col("orig_LTV") * F.col("remaining_balance_est") / F.col("orig_UPB"), 3),
    )
)
result.display()

In [0]:
# Number of projected loan-month rows about to be written to gold_loan_payment.
gold_row_count = result.count()
gold_row_count

In [0]:
# Persist the amortized monthly projections.
(
    result.write
    .option("mergeSchema", "true")
    .mode("overwrite")
    .saveAsTable("gold_loan_payment")
)

# Run Forecasting (not needed every time)


In [0]:
# dbutils.notebook.run("2. Modeling - Forecasting", 10000)

# Merge Loan Data with Forecasted Macroeconomic Data

In [0]:
# Inputs for the merge: projected loan balances and the macro predictions.
gold_loan = spark.table("gold_loan_payment")
macro = spark.table("gold_macro_predictions")

In [0]:
# The predictions table stores its date in `ds`; expose it as a DateType `date` column.
macro = macro.withColumn("date", to_date(col("ds"), "yyyy-MM-dd")).drop("ds")

In [0]:
# Date coverage of the macro prediction table.
macro.agg(
    F.min("date").alias("start_date"),
    F.max("date").alias("end_date"),
).show()

In [0]:
# join gold_loan and macro
gold_loan_macro = gold_loan.join(macro, gold_loan.date == macro.date, "left").drop(macro.date)
gold_loan_macro.display()

In [0]:
# Drop intermediate and loan-status bookkeeping columns before building the feature table.
# NOTE(review): `maturity_date` is not in this list, so it stays in the output;
# add it here if it is not meant to be a feature.
save_gold_macro = gold_loan_macro.drop(
    "loan_age_last",
    "credit_status",
    "loan_age_since_last",
    "loan_age",
    "rate_per_period",
    "current_UPB",
    "remain_months_maturity",
)


### Default Binary Flag Creation

A binary flag is created to indicate whether a loan is in default based on the `current_delinquency_status`. 

- **Logic**:
  - If `current_delinquency_status` is greater than 0, the loan is flagged as default (`1`).
  - Otherwise (0, negative, or null), the loan is not in default (`0`).

This new `default` column is used for CECL modeling to predict the likelihood of default.


In [0]:
# Model default variable binary
save_gold_macro = save_gold_macro.withColumn("default", when(col("current_delinquency_status") > 0, 1).otherwise(0)).drop(*["current_delinquency_status"])

In [0]:
# # get the first record for each loan (`loan_id`) based on date
# # loans_last = save_gold_macro.groupBy("loan_id").agg(min("date").alias("date"))

# loans_last_alias = loans_last.alias("last")
# loans_alias = save_gold_macro.alias("loans")

# result = loans_last_alias.join(
#     loans_alias, 
#     (loans_last_alias.loan_id == loans_alias.loan_id) & 
#     (loans_last_alias.date == loans_alias.date)
# ).drop(loans_last_alias.date).drop(loans_last_alias.loan_id)

# result.display()

### Joining loan data with Macro Data

In [0]:
from pyspark.sql.functions import to_date, col, when

# Self-contained rebuild of gold_loan_macro_features: both inputs are re-read
# here so the cell does not depend on variables left over from earlier cells.
gold_loan = spark.read.table("gold_loan_payment")
macro = spark.read.table("gold_macro_predictions")
macro = macro.withColumn("date", to_date(col("ds"), "yyyy-MM-dd")).drop("ds")

# The left join on `date` must not fan out loan rows, so require one macro row per date.
duplicate_macro_dates = macro.groupBy("date").count().filter(col("count") > 1).count()
assert duplicate_macro_dates == 0, f"{duplicate_macro_dates} dates appear more than once in gold_macro_predictions"

gold_loan_macro = gold_loan.join(macro, gold_loan.date == macro.date, "left").drop(macro.date)
display(gold_loan_macro)

# Drop intermediate and loan-status bookkeeping columns.
# NOTE(review): `maturity_date` is not dropped and remains in the feature table.
save_gold_macro = gold_loan_macro.drop(
    "loan_age_last",
    "credit_status",
    "loan_age_since_last",
    "loan_age",
    "rate_per_period",
    "current_UPB",
    "remain_months_maturity",
)
# Binary target: 1 when delinquency status > 0, otherwise 0 (null status -> 0).
save_gold_macro = save_gold_macro.withColumn("default", when(col("current_delinquency_status") > 0, 1).otherwise(0)).drop("current_delinquency_status")

save_gold_macro.write.mode("overwrite").option("mergeSchema", "true").saveAsTable("gold_loan_macro_features")