In [None]:
import glob
import os
import pprint
import random
from datetime import datetime, timedelta

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pyspark
import pyspark.sql.functions as F
from dateutil.relativedelta import relativedelta
from pyspark.sql.functions import col
from pyspark.sql.types import StringType, IntegerType, FloatType, DateType

import utils.data_processing_bronze_table
import utils.data_processing_silver_table
import utils.data_processing_gold_table

from utils.data_processing_clickstream import (
    process_bronze_clickstream,
    process_silver_clickstream)

from utils.data_processing_attributes import (
    process_bronze_attributes,
    process_silver_attributes)

from utils.data_processing_financials import (
    process_bronze_financials,
    process_silver_financials)

from utils.data_processing_gold_features import (
    process_gold_customer_feature_store)

## Set up PySpark session

In [None]:
# Initialize SparkSession: local mode using all available cores, app name "dev".
spark = (
    pyspark.sql.SparkSession.builder
    .appName("dev")
    .master("local[*]")
    .getOrCreate()
)

# Only surface ERROR-level Spark log messages (suppresses WARN noise in cell output).
spark.sparkContext.setLogLevel("ERROR")

## Set up configuration

In [None]:
# Run configuration.
# Backfill window, as "YYYY-MM-DD" strings; every first-of-month from the
# start month up to and including the end date is processed.
start_date_str, end_date_str = "2023-01-01", "2024-12-01"

# Single snapshot date (not referenced by the other cells in this notebook).
snapshot_date_str = "2023-01-01"

In [None]:
# generate list of dates to process
def generate_first_of_month_dates(start_date_str, end_date_str):
    """Return every first-of-month date in a window as "YYYY-MM-DD" strings.

    The sequence starts on the 1st of the month that contains
    ``start_date_str`` (even when that day precedes ``start_date_str``) and
    includes each following 1st-of-month that is <= ``end_date_str``.

    Args:
        start_date_str: window start, formatted "%Y-%m-%d".
        end_date_str: inclusive window end, formatted "%Y-%m-%d".

    Returns:
        A list of "%Y-%m-%d" strings in ascending order; empty when the first
        candidate month already lies after ``end_date_str``.

    Raises:
        ValueError: if either string does not match "%Y-%m-%d".
    """
    fmt = "%Y-%m-%d"
    cursor = datetime.strptime(start_date_str, fmt).replace(day=1)
    last = datetime.strptime(end_date_str, fmt)

    months = []
    while cursor <= last:
        months.append(cursor.strftime(fmt))
        # divmod(12, 12) == (1, 0): December rolls over to January of next year.
        year_carry, month_index = divmod(cursor.month, 12)
        cursor = datetime(cursor.year + year_carry, month_index + 1, 1)
    return months

# Snapshot months to backfill; the bare name on the last line displays the list.
dates_str_lst = generate_first_of_month_dates(start_date_str=start_date_str, end_date_str=end_date_str)
dates_str_lst

## Build Bronze Tables

In [None]:
# create bronze datalake directory (no-op when it already exists)
bronze_lms_directory = "datamart/bronze/lms/"

# exist_ok=True replaces the exists()/makedirs() pair: no check-then-create
# race, and re-running the cell is safe.
os.makedirs(bronze_lms_directory, exist_ok=True)

In [None]:
# run bronze backfill: for each snapshot month, run every bronze builder in order
# (loan table, clickstream, attributes, financials), all writing under bronze_lms_directory.
bronze_builders = (
    utils.data_processing_bronze_table.process_bronze_table,
    utils.data_processing_clickstream.process_bronze_clickstream,
    utils.data_processing_attributes.process_bronze_attributes,
    utils.data_processing_financials.process_bronze_financials,
)

for date_str in dates_str_lst:
    for build_bronze in bronze_builders:
        build_bronze(date_str, bronze_lms_directory, spark)

In [None]:
# inspect Bronze outputs for all datasets (most recent snapshot month only)

# Explicit date instead of `date_str` leaked from the backfill loop, so this
# cell does not silently depend on the previous cell's loop state.
inspect_date_str = dates_str_lst[-1]

# Collect only a preview to the driver; toPandas() on a full table pulls
# every row into driver memory.
PREVIEW_ROWS = 20

# NOTE(review): the process_* helpers are called again only to get their
# DataFrames back; whether this also re-writes the bronze output depends on
# their implementation — TODO confirm.
loan_bronze_df = utils.data_processing_bronze_table.process_bronze_table(inspect_date_str, bronze_lms_directory, spark)
display(loan_bronze_df.limit(PREVIEW_ROWS).toPandas())

click_bronze_df = utils.data_processing_clickstream.process_bronze_clickstream(inspect_date_str, bronze_lms_directory, spark)
display(click_bronze_df.limit(PREVIEW_ROWS).toPandas())

attr_bronze_df = utils.data_processing_attributes.process_bronze_attributes(inspect_date_str, bronze_lms_directory, spark)
display(attr_bronze_df.limit(PREVIEW_ROWS).toPandas())

fin_bronze_df = utils.data_processing_financials.process_bronze_financials(inspect_date_str, bronze_lms_directory, spark)
display(fin_bronze_df.limit(PREVIEW_ROWS).toPandas())


## Build Silver Tables

In [None]:
# create silver datalake directory (no-op when it already exists)
silver_loan_daily_directory = "datamart/silver/loan_daily/"

# exist_ok=True replaces the exists()/makedirs() pair: no check-then-create
# race, and re-running the cell is safe.
os.makedirs(silver_loan_daily_directory, exist_ok=True)

In [None]:
# run silver backfill: for each snapshot month, run every silver builder in order
# (loan table, clickstream, attributes, financials), reading bronze and writing
# into silver_loan_daily_directory.
silver_builders = (
    utils.data_processing_silver_table.process_silver_table,
    utils.data_processing_clickstream.process_silver_clickstream,
    utils.data_processing_attributes.process_silver_attributes,
    utils.data_processing_financials.process_silver_financials,
)

for date_str in dates_str_lst:
    for build_silver in silver_builders:
        build_silver(date_str, bronze_lms_directory, silver_loan_daily_directory, spark)

In [None]:
# inspect Silver outputs for all datasets (most recent snapshot month only)

# Explicit date instead of `date_str` leaked from the backfill loop, so this
# cell does not silently depend on the previous cell's loop state.
inspect_date_str = dates_str_lst[-1]

# Collect only a preview to the driver; toPandas() on a full table pulls
# every row into driver memory.
PREVIEW_ROWS = 20

# NOTE(review): the process_* helpers are called again only to get their
# DataFrames back; whether this also re-writes the silver output depends on
# their implementation — TODO confirm.
loan_silver_df = utils.data_processing_silver_table.process_silver_table(inspect_date_str, bronze_lms_directory, silver_loan_daily_directory, spark)
display(loan_silver_df.limit(PREVIEW_ROWS).toPandas())

click_silver_df = utils.data_processing_clickstream.process_silver_clickstream(inspect_date_str, bronze_lms_directory, silver_loan_daily_directory, spark)
display(click_silver_df.limit(PREVIEW_ROWS).toPandas())

attr_silver_df = utils.data_processing_attributes.process_silver_attributes(inspect_date_str, bronze_lms_directory, silver_loan_daily_directory, spark)
display(attr_silver_df.limit(PREVIEW_ROWS).toPandas())

fin_silver_df = utils.data_processing_financials.process_silver_financials(inspect_date_str, bronze_lms_directory, silver_loan_daily_directory, spark)
display(fin_silver_df.limit(PREVIEW_ROWS).toPandas())

## EDA on credit labels

In [None]:
# Bad-rate curve by months-on-book (mob) for loans that started before 2024.
# A row is flagged bad when its `dpd` value is >= the threshold below.

# Set DPD label definition
dpd = 30

# Path to the folder containing all parquet files
folder_path = silver_loan_daily_directory

# Keep only loan-daily outputs: the other silver builders are given this same
# directory, so it can also contain non-loan tables.
loan_files = sorted(
    f for f in glob.glob(os.path.join(folder_path, '*'))
    if 'silver_loan_daily_' in os.path.basename(f)
)

if not loan_files:
    raise FileNotFoundError("No loan parquet files found. Please check the directory or file naming.")

# Read and merge all loan parquet files
df = spark.read.parquet(*loan_files)

# Check schema to avoid reading the wrong table
if "loan_start_date" not in df.columns:
    raise ValueError(
        f"The current DataFrame does not contain 'loan_start_date'. "
        f"Actual columns are: {df.columns}. "
        f"Please check if non-loan parquet files were mixed in."
    )

# Filter only loans that started before January 1, 2024
df = df.filter(col("loan_start_date") < datetime.strptime("2024-01-01", "%Y-%m-%d"))

# DPD flag: 1 if dpd >= threshold, else 0 (a null dpd compares as null, so it falls to 0)
df = df.withColumn("dpd_flag", F.when(col("dpd") >= dpd, 1).otherwise(0))

# Rows at installment_num == 10, kept for ad-hoc inspection; this filter does
# not use dpd_flag and the frame is not used below.
actual_bads_df = df.filter(col("installment_num") == 10)

# Aggregate in Spark so only one row per mob reaches the driver, instead of
# converting the whole loan table with toPandas(). Null mob keys are dropped
# to match pandas groupby's default (dropna=True).
grouped = (
    df.filter(col("mob").isNotNull())
    .groupBy("mob")
    .agg(F.avg("dpd_flag").alias("bad_rate"))
    .toPandas()
    .set_index("mob")["bad_rate"]
    .sort_index()
)

# Plot the bad rate curve
fig, ax = plt.subplots()
grouped.plot(kind='line', marker='o', ax=ax)
ax.set_title(f'DPD: {dpd}')
ax.set_xlabel('mob')
ax.set_ylabel('bad rate')
ax.grid(True)
plt.show()


In [None]:
# Preview the filtered, DPD-flagged loan rows (first 20 rows, long values truncated).
df.show(n=20, truncate=True)

## Build gold tables (label store and customer feature store)

In [None]:
# create gold label store directory (no-op when it already exists)
gold_label_store_directory = "datamart/gold/label_store/"

# exist_ok=True replaces the exists()/makedirs() pair: no check-then-create
# race, and re-running the cell is safe.
os.makedirs(gold_label_store_directory, exist_ok=True)

In [None]:
# Gold backfill: for every snapshot month, build the label store and the
# customer feature store from the silver loan-daily directory.
#
# NOTE(review): the feature store is given `gold_label_store_directory` as its
# output directory, so feature files may land beside the label files and be
# read by the label-store inspection further down. Confirm against
# process_gold_customer_feature_store and consider a dedicated directory.

# Label parameters passed to process_labels_gold_table.
LABEL_DPD = 30  # days-past-due threshold
LABEL_MOB = 6   # months-on-book

for date_str in dates_str_lst:
    utils.data_processing_gold_table.process_labels_gold_table(
        date_str,
        silver_loan_daily_directory,
        gold_label_store_directory,
        spark,
        dpd=LABEL_DPD,
        mob=LABEL_MOB,
    )

    gold_feature_df = utils.data_processing_gold_features.process_gold_customer_feature_store(
        date_str,
        silver_loan_daily_directory,
        gold_label_store_directory,
        spark,
    )

    # count() triggers a Spark job per month; used only for this progress line.
    print(f"[GOLD] {date_str}: Feature store row count = {gold_feature_df.count()}, columns = {len(gold_feature_df.columns)}")

In [None]:
# Print column dtypes of the gold label and feature outputs for the most
# recent snapshot month.
# Fixes: `.dtypes` was applied twice (a list has no `.dtypes`), and `ate_str`
# was a typo; the date is now explicit rather than leaked from the loop above.
inspect_date_str = dates_str_lst[-1]

# Keep dpd/mob in sync with the values used by the gold backfill cell.
label_gold_df = utils.data_processing_gold_table.process_labels_gold_table(inspect_date_str, silver_loan_daily_directory, gold_label_store_directory, spark, dpd = 30, mob = 6)
print(label_gold_df.dtypes)

feature_gold_df = utils.data_processing_gold_features.process_gold_customer_feature_store(inspect_date_str, silver_loan_daily_directory, gold_label_store_directory, spark)
print(feature_gold_df.dtypes)

## Inspect label store

In [None]:
# Load every file in the gold label store directory and preview it.
folder_path = gold_label_store_directory

# glob already returns paths prefixed with folder_path; sort for a stable order.
files_list = sorted(glob.glob(os.path.join(folder_path, '*')))

if not files_list:
    raise FileNotFoundError(f"No files found in {folder_path}. Run the gold backfill first.")

# NOTE(review): every file in the directory is read. If the feature-store
# builder also writes here (it is given this directory), its files are mixed
# into this frame — confirm and filter by file name if so.
# (The former `header` option was CSV-only and is ignored by the parquet reader.)
df = spark.read.parquet(*files_list)
print("row_count:",df.count())

df.show()

In [None]:
# Print the schema (column names, types, nullability) of the label-store frame loaded above.
df.printSchema()