In [1]:
import os
import glob
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import random
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import pprint
import pyspark
import pyspark.sql.functions as F

from pyspark.sql.functions import col
from pyspark.sql.types import StringType, IntegerType, FloatType, DateType

import utils.data_processing_bronze_table
import utils.data_processing_silver_table
import utils.data_processing_gold_table

import utils.data_processing_bronze_table_feature
import utils.data_processing_silver_table_feature
import utils.data_processing_gold_table_feature

## set up pyspark session

In [2]:
# Initialize SparkSession
spark = pyspark.sql.SparkSession.builder \
    .appName("dev") \
    .master("local[*]") \
    .getOrCreate()

# Set log level to ERROR to hide warnings
spark.sparkContext.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/12 17:04:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## set up config

In [3]:
# set up config
snapshot_date_str = "2023-01-01"

start_date_str = "2023-01-01"
end_date_str = "2024-12-01"

In [4]:
# generate list of dates to process
def generate_first_of_month_dates(start_date_str, end_date_str):
    # Convert the date strings to datetime objects
    start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
    end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
    
    # List to store the first of month dates
    first_of_month_dates = []

    # Start from the first of the month of the start_date
    current_date = datetime(start_date.year, start_date.month, 1)

    while current_date <= end_date:
        # Append the date in yyyy-mm-dd format
        first_of_month_dates.append(current_date.strftime("%Y-%m-%d"))
        
        # Move to the first of the next month
        if current_date.month == 12:
            current_date = datetime(current_date.year + 1, 1, 1)
        else:
            current_date = datetime(current_date.year, current_date.month + 1, 1)

    return first_of_month_dates

dates_str_lst = generate_first_of_month_dates(start_date_str, end_date_str)
dates_str_lst

['2023-01-01',
 '2023-02-01',
 '2023-03-01',
 '2023-04-01',
 '2023-05-01',
 '2023-06-01',
 '2023-07-01',
 '2023-08-01',
 '2023-09-01',
 '2023-10-01',
 '2023-11-01',
 '2023-12-01',
 '2024-01-01',
 '2024-02-01',
 '2024-03-01',
 '2024-04-01',
 '2024-05-01',
 '2024-06-01',
 '2024-07-01',
 '2024-08-01',
 '2024-09-01',
 '2024-10-01',
 '2024-11-01',
 '2024-12-01']

## Build Bronze Table

In [5]:
# create bronze datalake (clickstream)
bronze_clickstream_directory = "datamart/bronze/feature/clickstream/"

if not os.path.exists(bronze_clickstream_directory):
    os.makedirs(bronze_clickstream_directory)

In [6]:
# run bronze backfill (clickstream)
for date_str in dates_str_lst:
    utils.data_processing_bronze_table_feature.process_bronze_table_clickstream(date_str, bronze_clickstream_directory, spark)

2023-01-01row count: 8974
saved to: datamart/bronze/feature/clickstream/bronze_feature_clickstream_2023_01_01.csv
2023-02-01row count: 8974
saved to: datamart/bronze/feature/clickstream/bronze_feature_clickstream_2023_02_01.csv
2023-03-01row count: 8974
saved to: datamart/bronze/feature/clickstream/bronze_feature_clickstream_2023_03_01.csv
2023-04-01row count: 8974
saved to: datamart/bronze/feature/clickstream/bronze_feature_clickstream_2023_04_01.csv
2023-05-01row count: 8974
saved to: datamart/bronze/feature/clickstream/bronze_feature_clickstream_2023_05_01.csv
2023-06-01row count: 8974
saved to: datamart/bronze/feature/clickstream/bronze_feature_clickstream_2023_06_01.csv
2023-07-01row count: 8974
saved to: datamart/bronze/feature/clickstream/bronze_feature_clickstream_2023_07_01.csv
2023-08-01row count: 8974
saved to: datamart/bronze/feature/clickstream/bronze_feature_clickstream_2023_08_01.csv
2023-09-01row count: 8974
saved to: datamart/bronze/feature/clickstream/bronze_feature_c

In [7]:
# inspect output
utils.data_processing_bronze_table_feature.process_bronze_table_clickstream(date_str, bronze_clickstream_directory, spark).toPandas()

2024-12-01row count: 8974
saved to: datamart/bronze/feature/clickstream/bronze_feature_clickstream_2024_12_01.csv


Unnamed: 0,fe_1,fe_2,fe_3,fe_4,fe_5,fe_6,fe_7,fe_8,fe_9,fe_10,...,fe_13,fe_14,fe_15,fe_16,fe_17,fe_18,fe_19,fe_20,Customer_ID,snapshot_date
0,145,189,109,134,196,-37,101,82,111,24,...,65,249,200,185,-83,-18,-76,30,CUS_0x1037,2024-12-01
1,40,184,187,75,192,146,38,109,353,141,...,-14,193,125,117,215,91,33,255,CUS_0x1069,2024-12-01
2,98,121,180,200,95,48,59,194,76,84,...,167,101,92,185,98,68,-60,116,CUS_0x114a,2024-12-01
3,85,96,19,47,30,39,-32,210,-81,206,...,143,94,139,237,78,187,77,33,CUS_0x1184,2024-12-01
4,98,45,155,56,112,47,52,138,153,225,...,-43,142,121,10,189,110,264,241,CUS_0x1297,2024-12-01
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
8969,414,22,72,57,142,192,11,139,24,63,...,179,91,20,189,-35,-19,15,66,CUS_0xdf6,2024-12-01
8970,116,-124,-108,212,-21,227,146,112,186,-65,...,38,226,319,98,9,152,17,14,CUS_0xe23,2024-12-01
8971,237,-3,-49,375,144,41,-170,324,19,266,...,7,102,64,191,124,220,231,75,CUS_0xe4e,2024-12-01
8972,5,67,211,83,207,-41,325,14,-18,41,...,109,266,28,157,131,116,101,131,CUS_0xedd,2024-12-01


In [8]:
# create bronze datalake (attributes)
bronze_attributes_directory = "datamart/bronze/feature/attributes/"

if not os.path.exists(bronze_attributes_directory):
    os.makedirs(bronze_attributes_directory)

In [9]:
# run bronze backfill (attributes)
for date_str in dates_str_lst:
    utils.data_processing_bronze_table_feature.process_bronze_table_attributes(date_str, bronze_attributes_directory, spark)

2023-01-01row count: 530
saved to: datamart/bronze/feature/attributes/bronze_feature_attributes_2023_01_01.csv
2023-02-01row count: 501
saved to: datamart/bronze/feature/attributes/bronze_feature_attributes_2023_02_01.csv
2023-03-01row count: 506
saved to: datamart/bronze/feature/attributes/bronze_feature_attributes_2023_03_01.csv
2023-04-01row count: 510
saved to: datamart/bronze/feature/attributes/bronze_feature_attributes_2023_04_01.csv
2023-05-01row count: 521
saved to: datamart/bronze/feature/attributes/bronze_feature_attributes_2023_05_01.csv
2023-06-01row count: 517
saved to: datamart/bronze/feature/attributes/bronze_feature_attributes_2023_06_01.csv
2023-07-01row count: 471
saved to: datamart/bronze/feature/attributes/bronze_feature_attributes_2023_07_01.csv
2023-08-01row count: 481
saved to: datamart/bronze/feature/attributes/bronze_feature_attributes_2023_08_01.csv
2023-09-01row count: 454
saved to: datamart/bronze/feature/attributes/bronze_feature_attributes_2023_09_01.csv
2

In [10]:
# inspect output
utils.data_processing_bronze_table_feature.process_bronze_table_attributes(date_str, bronze_attributes_directory, spark).toPandas()

2024-12-01row count: 515
saved to: datamart/bronze/feature/attributes/bronze_feature_attributes_2024_12_01.csv


Unnamed: 0,Customer_ID,Name,Age,SSN,Occupation,snapshot_date
0,CUS_0x103e,Tim Kellyf,40,155-72-8070,Scientist,2024-12-01
1,CUS_0x1195,Alexk,31,822-48-3629,Manager,2024-12-01
2,CUS_0x1197,Nayako,28,799-23-8283,_______,2024-12-01
3,CUS_0x11e2,Valetkevitchr,34,809-04-1419,Musician,2024-12-01
4,CUS_0x11ec,William Schombergh,34,417-74-2163,Journalist,2024-12-01
...,...,...,...,...,...,...
510,CUS_0xe6c,Doris Frankelh,26,172-24-1577,Entrepreneur,2024-12-01
511,CUS_0xe99,Moone,48,164-90-3178,Mechanic,2024-12-01
512,CUS_0xf55,Tarmo Virkip,39,025-54-8593,Entrepreneur,2024-12-01
513,CUS_0xfd1,Frewy,32,389-55-6408,Architect,2024-12-01


In [11]:
# create bronze datalake (financials)
bronze_financials_directory = "datamart/bronze/feature/financials/"

if not os.path.exists(bronze_financials_directory):
    os.makedirs(bronze_financials_directory)

In [12]:
# run bronze backfill (financials)
for date_str in dates_str_lst:
    utils.data_processing_bronze_table_feature.process_bronze_table_financials(date_str, bronze_financials_directory, spark)

2023-01-01row count: 530
saved to: datamart/bronze/feature/financials/bronze_feature_financials_2023_01_01.csv
2023-02-01row count: 501
saved to: datamart/bronze/feature/financials/bronze_feature_financials_2023_02_01.csv
2023-03-01row count: 506
saved to: datamart/bronze/feature/financials/bronze_feature_financials_2023_03_01.csv
2023-04-01row count: 510
saved to: datamart/bronze/feature/financials/bronze_feature_financials_2023_04_01.csv
2023-05-01row count: 521
saved to: datamart/bronze/feature/financials/bronze_feature_financials_2023_05_01.csv
2023-06-01row count: 517
saved to: datamart/bronze/feature/financials/bronze_feature_financials_2023_06_01.csv
2023-07-01row count: 471
saved to: datamart/bronze/feature/financials/bronze_feature_financials_2023_07_01.csv
2023-08-01row count: 481
saved to: datamart/bronze/feature/financials/bronze_feature_financials_2023_08_01.csv
2023-09-01row count: 454
saved to: datamart/bronze/feature/financials/bronze_feature_financials_2023_09_01.csv
2

In [13]:
# inspect output
utils.data_processing_bronze_table_feature.process_bronze_table_financials(date_str, bronze_financials_directory, spark).toPandas()

2024-12-01row count: 515
saved to: datamart/bronze/feature/financials/bronze_feature_financials_2024_12_01.csv


Unnamed: 0,Customer_ID,Annual_Income,Monthly_Inhand_Salary,Num_Bank_Accounts,Num_Credit_Card,Interest_Rate,Num_of_Loan,Type_of_Loan,Delay_from_due_date,Num_of_Delayed_Payment,...,Credit_Mix,Outstanding_Debt,Credit_Utilization_Ratio,Credit_History_Age,Payment_of_Min_Amount,Total_EMI_per_month,Amount_invested_monthly,Payment_Behaviour,Monthly_Balance,snapshot_date
0,CUS_0x103e,98690.8,8262.233333,4,6,9,1_,Student Loan,6,17,...,Good,706.96,26.860663,26 Years and 11 Months,No,55.004408,913.4813186573292,Low_spent_Small_value_payments,147.7376071067124,2024-12-01
1,CUS_0x1195,30429.91,2808.825833,4,6,16,2,"Auto Loan, and Auto Loan",22,17,...,Standard,362.48,33.349050,28 Years and 11 Months,No,29.914076,82.87878577514347,Low_spent_Large_value_payments,438.08972109416084,2024-12-01
2,CUS_0x1197,92300.01,7437.667500,2,4,11,3,"Credit-Builder Loan, Not Specified, and Credit...",27,9,...,_,755.17_,26.989787,18 Years and 11 Months,Yes,49236.000000,220.8621525417414,Low_spent_Large_value_payments,581.1567885447394,2024-12-01
3,CUS_0x11e2,44986.55,3689.879167,6,5,11,1,Credit-Builder Loan,0,4,...,Good,753.21,25.586286,20 Years and 0 Months,No,23.267135,43.20363344633164,High_spent_Large_value_payments,542.5171477430948,2024-12-01
4,CUS_0x11ec,14867.69,1005.974167,9,9,18,6,"Debt Consolidation Loan, Student Loan, Persona...",39,15,...,Standard,2344.06,24.344388,17 Years and 2 Months,Yes,55.459604,100.14574834721886,Low_spent_Medium_value_payments,224.99206407779144,2024-12-01
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
510,CUS_0xe6c,125597.52,9367.500187,1,3,12,4,"Debt Consolidation Loan, Not Specified, Studen...",2,9,...,Good,1294.94,30.324257,30 Years and 5 Months,NM,1278.186251,964.5381161830327,Low_spent_Medium_value_payments,763.3982127892344,2024-12-01
511,CUS_0xe99,45461.54,3917.461667,6,3,5,2,"Credit-Builder Loan, and Payday Loan",20,9,...,Standard,647.24,27.264685,16 Years and 9 Months,No,69.318349,42.941001590068666,High_spent_Large_value_payments,519.4868162135749,2024-12-01
512,CUS_0xf55,78443.48_,6358.956667,7,5,23,4,"Personal Loan, Home Equity Loan, Mortgage Loan...",39,19,...,Bad,1527.77,24.704429,15 Years and 10 Months,NM,177.387563,528.7469053018515,Low_spent_Medium_value_payments,209.76119880079318,2024-12-01
513,CUS_0xfd1,78666.56999999999,6485.547500,3,4,17,4,"Not Specified, Personal Loan, Home Equity Loan...",29,10,...,Standard,1498.7,37.831762,22 Years and 5 Months,No,247.851145,252.3461368272953,High_spent_Small_value_payments,408.35746850506007,2024-12-01


## Build Silver Table

In [14]:
import importlib
import utils.data_processing_silver_table as silver_mod
importlib.reload(silver_mod)

# see exactly what functions are exposed
print([f for f in dir(silver_mod) if f.startswith("process")])

['process_silver_table']


In [15]:
# create silver datalake (clickstream)
silver_clickstream_directory = "datamart/silver/feature/clickstream/"

if not os.path.exists(silver_clickstream_directory):
    os.makedirs(silver_clickstream_directory)

In [17]:
# run silver backfill
for date_str in dates_str_lst:
    utils.data_processing_silver_table_feature.process_silver_table_clickstream(date_str, bronze_clickstream_directory, silver_clickstream_directory, spark)

loaded from: datamart/bronze/feature/clickstream/bronze_feature_clickstream_2023_01_01.csv row count: 8974
saved to: datamart/silver/feature/clickstream/silver_feature_clickstream_2023_01_01.parquet
loaded from: datamart/bronze/feature/clickstream/bronze_feature_clickstream_2023_02_01.csv row count: 8974
saved to: datamart/silver/feature/clickstream/silver_feature_clickstream_2023_02_01.parquet
loaded from: datamart/bronze/feature/clickstream/bronze_feature_clickstream_2023_03_01.csv row count: 8974
saved to: datamart/silver/feature/clickstream/silver_feature_clickstream_2023_03_01.parquet
loaded from: datamart/bronze/feature/clickstream/bronze_feature_clickstream_2023_04_01.csv row count: 8974
saved to: datamart/silver/feature/clickstream/silver_feature_clickstream_2023_04_01.parquet
loaded from: datamart/bronze/feature/clickstream/bronze_feature_clickstream_2023_05_01.csv row count: 8974
saved to: datamart/silver/feature/clickstream/silver_feature_clickstream_2023_05_01.parquet
loade

In [19]:
utils.data_processing_silver_table_feature.process_silver_table_clickstream(date_str, bronze_clickstream_directory, silver_clickstream_directory, spark).toPandas()

loaded from: datamart/bronze/feature/clickstream/bronze_feature_clickstream_2024_12_01.csv row count: 8974
saved to: datamart/silver/feature/clickstream/silver_feature_clickstream_2024_12_01.parquet


Unnamed: 0,fe_1,fe_2,fe_3,fe_4,fe_5,fe_6,fe_7,fe_8,fe_9,fe_10,...,fe_14,fe_15,fe_16,fe_17,fe_18,fe_19,fe_20,Customer_ID,snapshot_date,fe_1_5_mean
0,145.0,189.0,109.0,134.0,196.0,-37.0,101.0,82.0,111.0,24.0,...,249.0,200.0,185.0,-83.0,-18.0,-76.0,30.0,CUS_0x1037,2024-12-01,154.600006
1,40.0,184.0,187.0,75.0,192.0,146.0,38.0,109.0,353.0,141.0,...,193.0,125.0,117.0,215.0,91.0,33.0,255.0,CUS_0x1069,2024-12-01,135.600006
2,98.0,121.0,180.0,200.0,95.0,48.0,59.0,194.0,76.0,84.0,...,101.0,92.0,185.0,98.0,68.0,-60.0,116.0,CUS_0x114a,2024-12-01,138.800003
3,85.0,96.0,19.0,47.0,30.0,39.0,-32.0,210.0,-81.0,206.0,...,94.0,139.0,237.0,78.0,187.0,77.0,33.0,CUS_0x1184,2024-12-01,55.400002
4,98.0,45.0,155.0,56.0,112.0,47.0,52.0,138.0,153.0,225.0,...,142.0,121.0,10.0,189.0,110.0,264.0,241.0,CUS_0x1297,2024-12-01,93.199997
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
8969,414.0,22.0,72.0,57.0,142.0,192.0,11.0,139.0,24.0,63.0,...,91.0,20.0,189.0,-35.0,-19.0,15.0,66.0,CUS_0xdf6,2024-12-01,141.399994
8970,116.0,-124.0,-108.0,212.0,-21.0,227.0,146.0,112.0,186.0,-65.0,...,226.0,319.0,98.0,9.0,152.0,17.0,14.0,CUS_0xe23,2024-12-01,15.000000
8971,237.0,-3.0,-49.0,375.0,144.0,41.0,-170.0,324.0,19.0,266.0,...,102.0,64.0,191.0,124.0,220.0,231.0,75.0,CUS_0xe4e,2024-12-01,140.800003
8972,5.0,67.0,211.0,83.0,207.0,-41.0,325.0,14.0,-18.0,41.0,...,266.0,28.0,157.0,131.0,116.0,101.0,131.0,CUS_0xedd,2024-12-01,114.599998


In [20]:
# create silver datalake (attributes)
silver_attributes_directory = "datamart/silver/feature/attributes/"

if not os.path.exists(silver_attributes_directory):
    os.makedirs(silver_attributes_directory)

In [21]:
# run silver backfill
for date_str in dates_str_lst:
    utils.data_processing_silver_table_feature.process_silver_table_attributes(date_str, bronze_attributes_directory, silver_attributes_directory, spark)

loaded from: datamart/bronze/feature/attributes/bronze_feature_attributes_2023_01_01.csv row count: 530
saved to: datamart/silver/feature/attributes/silver_features_attributes_2023_01_01.parquet
loaded from: datamart/bronze/feature/attributes/bronze_feature_attributes_2023_02_01.csv row count: 501
saved to: datamart/silver/feature/attributes/silver_features_attributes_2023_02_01.parquet
loaded from: datamart/bronze/feature/attributes/bronze_feature_attributes_2023_03_01.csv row count: 506
saved to: datamart/silver/feature/attributes/silver_features_attributes_2023_03_01.parquet
loaded from: datamart/bronze/feature/attributes/bronze_feature_attributes_2023_04_01.csv row count: 510
saved to: datamart/silver/feature/attributes/silver_features_attributes_2023_04_01.parquet
loaded from: datamart/bronze/feature/attributes/bronze_feature_attributes_2023_05_01.csv row count: 521
saved to: datamart/silver/feature/attributes/silver_features_attributes_2023_05_01.parquet
loaded from: datamart/bro

In [22]:
utils.data_processing_silver_table_feature.process_silver_table_attributes(date_str, bronze_attributes_directory, silver_attributes_directory, spark).toPandas()

loaded from: datamart/bronze/feature/attributes/bronze_feature_attributes_2024_12_01.csv row count: 515
saved to: datamart/silver/feature/attributes/silver_features_attributes_2024_12_01.parquet


Unnamed: 0,Customer_ID,Name,SSN,Occupation,snapshot_date,Age_num,Age_missing
0,CUS_0x103e,Tim Kellyf,155-72-8070,Scientist,2024-12-01,40,0
1,CUS_0x1195,Alexk,822-48-3629,Manager,2024-12-01,31,0
2,CUS_0x1197,Nayako,799-23-8283,Unknown,2024-12-01,28,0
3,CUS_0x11e2,Valetkevitchr,809-04-1419,Musician,2024-12-01,34,0
4,CUS_0x11ec,William Schombergh,417-74-2163,Journalist,2024-12-01,34,0
...,...,...,...,...,...,...,...
510,CUS_0xe6c,Doris Frankelh,172-24-1577,Entrepreneur,2024-12-01,26,0
511,CUS_0xe99,Moone,164-90-3178,Mechanic,2024-12-01,48,0
512,CUS_0xf55,Tarmo Virkip,025-54-8593,Entrepreneur,2024-12-01,39,0
513,CUS_0xfd1,Frewy,389-55-6408,Architect,2024-12-01,32,0


In [29]:
# create silver datalake (financials)
silver_financials_directory = "datamart/silver/feature/financials/"

if not os.path.exists(silver_financials_directory):
    os.makedirs(silver_financials_directory)

In [30]:
# run silver backfill
for date_str in dates_str_lst:
    utils.data_processing_silver_table_feature.process_silver_table_financials(date_str, bronze_financials_directory, silver_financials_directory, spark)

loaded from: datamart/bronze/feature/financials/bronze_feature_financials_2023_01_01.csv row count: 530
saved to: datamart/silver/feature/financials/silver_feature_financials_2023_01_01.parquet
loaded from: datamart/bronze/feature/financials/bronze_feature_financials_2023_02_01.csv row count: 501
saved to: datamart/silver/feature/financials/silver_feature_financials_2023_02_01.parquet
loaded from: datamart/bronze/feature/financials/bronze_feature_financials_2023_03_01.csv row count: 506
saved to: datamart/silver/feature/financials/silver_feature_financials_2023_03_01.parquet
loaded from: datamart/bronze/feature/financials/bronze_feature_financials_2023_04_01.csv row count: 510
saved to: datamart/silver/feature/financials/silver_feature_financials_2023_04_01.parquet
loaded from: datamart/bronze/feature/financials/bronze_feature_financials_2023_05_01.csv row count: 521
saved to: datamart/silver/feature/financials/silver_feature_financials_2023_05_01.parquet
loaded from: datamart/bronze/f

In [31]:
utils.data_processing_silver_table_feature.process_silver_table_financials(date_str, bronze_financials_directory, silver_financials_directory, spark).toPandas()

loaded from: datamart/bronze/feature/financials/bronze_feature_financials_2024_12_01.csv row count: 515
saved to: datamart/silver/feature/financials/silver_feature_financials_2024_12_01.parquet


Unnamed: 0,Customer_ID,Annual_Income,Monthly_Inhand_Salary,Num_Bank_Accounts,Num_Credit_Card,Interest_Rate,Num_of_Loan,Type_of_Loan,Delay_from_due_date,Num_of_Delayed_Payment,...,Total_EMI_per_month,Amount_invested_monthly,Payment_Behaviour,Monthly_Balance,snapshot_date,Credit_History_Age_num,debt_to_income_ratio,monthly_repayment_to_income,days_overdue_per_late_payment,credit_inquiries_per_year
0,CUS_0x103e,98690.796875,8262.233398,4.0,6.0,9.0,,Student Loan,6.0,17.0,...,55.004406,913.481323,Low_spent_Small_value_payments,147.737610,2024-12-01,,0.007163,0.006657,0.352941,
1,CUS_0x1195,30429.910156,2808.825928,4.0,6.0,16.0,2.0,"Auto Loan, and Auto Loan",22.0,17.0,...,29.914076,82.878784,Low_spent_Large_value_payments,438.089722,2024-12-01,,0.011912,0.010650,1.294118,
2,CUS_0x1197,92300.007812,7437.667480,2.0,4.0,11.0,3.0,"Credit-Builder Loan, Not Specified, and Credit...",27.0,9.0,...,49236.000000,220.862152,Low_spent_Large_value_payments,581.156799,2024-12-01,,,6.619817,3.000000,
3,CUS_0x11e2,44986.550781,3689.879150,6.0,5.0,11.0,1.0,Credit-Builder Loan,0.0,4.0,...,23.267136,43.203632,High_spent_Large_value_payments,542.517151,2024-12-01,,0.016743,0.006306,0.000000,
4,CUS_0x11ec,14867.690430,1005.974182,9.0,9.0,18.0,6.0,"Debt Consolidation Loan, Student Loan, Persona...",39.0,15.0,...,55.459602,100.145752,Low_spent_Medium_value_payments,224.992065,2024-12-01,,0.157661,0.055130,2.600000,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
510,CUS_0xe6c,125597.523438,9367.500000,1.0,3.0,12.0,4.0,"Debt Consolidation Loan, Not Specified, Studen...",2.0,9.0,...,1278.186279,964.538086,Low_spent_Medium_value_payments,763.398193,2024-12-01,,0.010310,0.136449,0.222222,
511,CUS_0xe99,45461.539062,3917.461670,6.0,3.0,5.0,2.0,"Credit-Builder Loan, and Payday Loan",20.0,9.0,...,69.318352,42.941002,High_spent_Large_value_payments,519.486816,2024-12-01,,0.014237,0.017695,2.222222,
512,CUS_0xf55,,6358.956543,7.0,5.0,23.0,4.0,"Personal Loan, Home Equity Loan, Mortgage Loan...",39.0,19.0,...,177.387558,528.746887,Low_spent_Medium_value_payments,209.761200,2024-12-01,,,0.027896,2.052632,
513,CUS_0xfd1,78666.570312,6485.547363,3.0,4.0,17.0,4.0,"Not Specified, Personal Loan, Home Equity Loan...",29.0,10.0,...,247.851151,252.346130,High_spent_Small_value_payments,408.357483,2024-12-01,,0.019051,0.038216,2.900000,


## EDA on credit labels

In [None]:
# set dpd label definition
dpd = 30

# Path to the folder containing CSV files
folder_path = silver_loan_daily_directory

# Read all CSV files into a single DataFrame
files_list = [folder_path+os.path.basename(f) for f in glob.glob(os.path.join(folder_path, '*'))]
df = spark.read.option("header", "true").parquet(*files_list)

# filter only completed loans
df = df.filter(col("loan_start_date") < datetime.strptime("2024-01-01", "%Y-%m-%d"))

# create dpd flag if more than dpd
df = df.withColumn("dpd_flag", F.when(col("dpd") >= dpd, 1).otherwise(0))

# actual bads 
actual_bads_df = df.filter(col("installment_num") == 10)

# prepare for analysis
# df = df.filter(col("installment_num") < 10)

# visualise bad rate
pdf = df.toPandas()

# Group by col_A and count occurrences in col_B
grouped = pdf.groupby('mob')['dpd_flag'].mean()

# Sort the index (x-axis) of the grouped DataFrame
grouped = grouped.sort_index()

# Plotting
grouped.plot(kind='line', marker='o')

plt.title('DPD: '+ str(dpd))
plt.xlabel('mob')
plt.ylabel('bad rate')
plt.grid(True)
plt.show()


In [13]:
df.show()

+--------------------+-----------+---------------+------+---------------+--------+-------+--------+-----------+-------+-------------+---+-------------------+-----------------+---+--------+
|             loan_id|Customer_ID|loan_start_date|tenure|installment_num|loan_amt|due_amt|paid_amt|overdue_amt|balance|snapshot_date|mob|installments_missed|first_missed_date|dpd|dpd_flag|
+--------------------+-----------+---------------+------+---------------+--------+-------+--------+-----------+-------+-------------+---+-------------------+-----------------+---+--------+
|CUS_0x1011_2023_1...| CUS_0x1011|     2023-11-01|    10|             10| 10000.0| 1000.0|  1000.0|        0.0|    0.0|   2024-09-01| 10|                  0|             NULL|  0|       0|
|CUS_0x1013_2023_1...| CUS_0x1013|     2023-12-01|    10|              9| 10000.0| 1000.0|  1000.0|        0.0| 1000.0|   2024-09-01|  9|                  0|             NULL|  0|       0|
|CUS_0x1018_2023_1...| CUS_0x1018|     2023-11-01|    1

## Build gold table for labels

In [14]:
# create bronze datalake
gold_label_store_directory = "datamart/gold/label_store/"

if not os.path.exists(gold_label_store_directory):
    os.makedirs(gold_label_store_directory)

In [15]:
# run gold backfill
for date_str in dates_str_lst:
    utils.data_processing_gold_table.process_labels_gold_table(date_str, silver_loan_daily_directory, gold_label_store_directory, spark, dpd = 30, mob = 6)


loaded from: datamart/silver/loan_daily/silver_loan_daily_2023_01_01.parquet row count: 530
saved to: datamart/gold/label_store/gold_label_store_2023_01_01.parquet
loaded from: datamart/silver/loan_daily/silver_loan_daily_2023_02_01.parquet row count: 1031
saved to: datamart/gold/label_store/gold_label_store_2023_02_01.parquet
loaded from: datamart/silver/loan_daily/silver_loan_daily_2023_03_01.parquet row count: 1537
saved to: datamart/gold/label_store/gold_label_store_2023_03_01.parquet
loaded from: datamart/silver/loan_daily/silver_loan_daily_2023_04_01.parquet row count: 2047
saved to: datamart/gold/label_store/gold_label_store_2023_04_01.parquet
loaded from: datamart/silver/loan_daily/silver_loan_daily_2023_05_01.parquet row count: 2568
saved to: datamart/gold/label_store/gold_label_store_2023_05_01.parquet
loaded from: datamart/silver/loan_daily/silver_loan_daily_2023_06_01.parquet row count: 3085
saved to: datamart/gold/label_store/gold_label_store_2023_06_01.parquet
loaded from

In [16]:
utils.data_processing_gold_table.process_labels_gold_table(date_str, silver_loan_daily_directory, gold_label_store_directory, spark, dpd = 30, mob = 6).dtypes


loaded from: datamart/silver/loan_daily/silver_loan_daily_2024_12_01.parquet row count: 5531
saved to: datamart/gold/label_store/gold_label_store_2024_12_01.parquet


[('loan_id', 'string'),
 ('Customer_ID', 'string'),
 ('label', 'int'),
 ('label_def', 'string'),
 ('snapshot_date', 'date')]

## inspect label store

In [17]:
folder_path = gold_label_store_directory
files_list = [folder_path+os.path.basename(f) for f in glob.glob(os.path.join(folder_path, '*'))]
df = spark.read.option("header", "true").parquet(*files_list)
print("row_count:",df.count())

df.show()

row_count: 8974
+--------------------+-----------+-----+----------+-------------+
|             loan_id|Customer_ID|label| label_def|snapshot_date|
+--------------------+-----------+-----+----------+-------------+
|CUS_0x1037_2023_0...| CUS_0x1037|    0|30dpd_6mob|   2023-07-01|
|CUS_0x1069_2023_0...| CUS_0x1069|    0|30dpd_6mob|   2023-07-01|
|CUS_0x114a_2023_0...| CUS_0x114a|    0|30dpd_6mob|   2023-07-01|
|CUS_0x1184_2023_0...| CUS_0x1184|    0|30dpd_6mob|   2023-07-01|
|CUS_0x1297_2023_0...| CUS_0x1297|    1|30dpd_6mob|   2023-07-01|
|CUS_0x12fb_2023_0...| CUS_0x12fb|    0|30dpd_6mob|   2023-07-01|
|CUS_0x1325_2023_0...| CUS_0x1325|    0|30dpd_6mob|   2023-07-01|
|CUS_0x1341_2023_0...| CUS_0x1341|    0|30dpd_6mob|   2023-07-01|
|CUS_0x1375_2023_0...| CUS_0x1375|    1|30dpd_6mob|   2023-07-01|
|CUS_0x13a8_2023_0...| CUS_0x13a8|    0|30dpd_6mob|   2023-07-01|
|CUS_0x13ef_2023_0...| CUS_0x13ef|    0|30dpd_6mob|   2023-07-01|
|CUS_0x1440_2023_0...| CUS_0x1440|    0|30dpd_6mob|   2023-0

In [18]:
df.printSchema()

root
 |-- loan_id: string (nullable = true)
 |-- Customer_ID: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- label_def: string (nullable = true)
 |-- snapshot_date: date (nullable = true)

