In [1]:
import os
import glob
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import random
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import pprint
import pyspark
import pyspark.sql.functions as F

from pyspark.sql.functions import col
from pyspark.sql.types import StringType, IntegerType, FloatType, DateType, TimestampType

import utils.data_processing_bronze_table

import utils.data_processing_silver_table
import utils.data_processing_gold_table


## Set up PySpark session

In [2]:
# Local SparkSession with one worker thread per logical core (getOrCreate reuses
# an already-running session instead of starting a second one).
spark = (
    pyspark.sql.SparkSession.builder
    .appName("dev")
    .master("local[*]")
    .getOrCreate()
)

# Only surface ERROR-level messages from Spark; hides the routine WARN noise.
spark.sparkContext.setLogLevel("ERROR")

# Pipeline helpers from the project's utils package, all bound to this session.
# NOTE(review): this cell's output logs "Loading cleaned silver inputs …" with row
# counts, i.e. one of these constructors reads the cleaned silver tables right away,
# before the bronze/silver cells below have (re)built them. On an empty datamart this
# may fail; on a re-run it sees the previous run's data. Confirm which class loads.
processor = utils.data_processing_silver_table.DataProcessor(spark)
silver_builder = utils.data_processing_silver_table.SilverDataMart(spark)
proc = utils.data_processing_gold_table.GoldDataProcessor(spark)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/17 12:20:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2025-05-17 12:20:57,771 - INFO - Loading cleaned silver inputs …
2025-05-17 12:21:00,388 - INFO - Input row counts – LMS: 104,288, Clickstream: 215,376, Attr: 11,011, Fin: 6,034


## Set up configuration

In [3]:
# Processing window: one partition per month from start to end (inclusive), YYYY-MM-DD.
start_date_str, end_date_str = "2023-01-01", "2024-12-01"

# NOTE(review): not referenced by any other cell in this notebook — presumably left
# over from a single-snapshot run; confirm before removing.
snapshot_date_str = "2023-01-01"

In [4]:
# generate list of dates to process
def generate_first_of_month_dates(start_date_str, end_date_str):
    """List the first day of every month in a date range as ``YYYY-MM-DD`` strings.

    Args:
        start_date_str: Range start, ``YYYY-MM-DD``. Only its year and month are
            used: the output always begins on the 1st of that month, even when the
            start date itself falls later in the month.
        end_date_str: Inclusive range end, ``YYYY-MM-DD``. A month is included when
            its 1st is on or before this date.

    Returns:
        Chronologically ordered list of first-of-month strings; empty when the end
        date precedes the 1st of the start month.

    Raises:
        ValueError: if either string does not match ``%Y-%m-%d``.
    """
    range_start = datetime.strptime(start_date_str, "%Y-%m-%d")
    range_end = datetime.strptime(end_date_str, "%Y-%m-%d")

    year, month = range_start.year, range_start.month
    month_starts = []
    while True:
        cursor = datetime(year, month, 1)
        if cursor > range_end:
            break
        month_starts.append(cursor.strftime("%Y-%m-%d"))
        # December rolls over to January of the following year.
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    return month_starts

# Month-start partition dates (YYYY-MM-DD) processed by every layer below.
dates_str_lst = generate_first_of_month_dates(
    start_date_str=start_date_str,
    end_date_str=end_date_str,
)

## Build bronze tables

In [5]:
# Land every raw CSV found under data/ into its own bronze folder (named after the
# file stem), writing one partition per date in dates_str_lst.
data_directory = 'data'
list_of_raw_files = utils.data_processing_bronze_table.get_csv_files_in_directory(data_directory)

for file_name in list_of_raw_files:
    csv_file_path = os.path.join(data_directory, file_name)
    base_name = os.path.splitext(file_name)[0]
    directory_path = f"datamart/bronze/{base_name}/"

    utils.data_processing_bronze_table.process_bronze_datalake(
        directory_path=directory_path,
        dates_list=dates_str_lst,
        processing_function=utils.data_processing_bronze_table.process_bronze_table,
        spark=spark,
        csv_file_path=csv_file_path,
    )

Directory already exists: datamart/bronze/lms_loan_daily/
Processing date: 2023-01-01
Saved data to Parquet table at: datamart/bronze/lms_loan_daily/date=2023-01-01
Processing date: 2023-02-01
Saved data to Parquet table at: datamart/bronze/lms_loan_daily/date=2023-02-01
Processing date: 2023-03-01
Saved data to Parquet table at: datamart/bronze/lms_loan_daily/date=2023-03-01
Processing date: 2023-04-01
Saved data to Parquet table at: datamart/bronze/lms_loan_daily/date=2023-04-01
Processing date: 2023-05-01
Saved data to Parquet table at: datamart/bronze/lms_loan_daily/date=2023-05-01
Processing date: 2023-06-01
Saved data to Parquet table at: datamart/bronze/lms_loan_daily/date=2023-06-01
Processing date: 2023-07-01
Saved data to Parquet table at: datamart/bronze/lms_loan_daily/date=2023-07-01
Processing date: 2023-08-01
Saved data to Parquet table at: datamart/bronze/lms_loan_daily/date=2023-08-01
Processing date: 2023-09-01
Saved data to Parquet table at: datamart/bronze/lms_loan_d

## Build silver tables

In [6]:

# Clean the clickstream feature partitions from bronze into silver; per the log,
# dates already present in silver are skipped.
source_type = "feature_clickstream"
bronze_directory = f"datamart/bronze/{source_type}"
silver_directory = f"datamart/silver/{source_type}_cleaned"
processor.process_bronze_to_silver(
    bronze_path=bronze_directory,
    silver_path=silver_directory,
    dates=dates_str_lst,
    source_type=source_type,
)


2025-05-17 12:21:26,781 - INFO - Schema validation passed.
2025-05-17 12:21:26,802 - INFO - Data for 2023-01-01 already exists in silver. Skipping.
2025-05-17 12:21:26,836 - INFO - Schema validation passed.
2025-05-17 12:21:26,859 - INFO - Data for 2023-02-01 already exists in silver. Skipping.
2025-05-17 12:21:26,897 - INFO - Schema validation passed.
2025-05-17 12:21:26,922 - INFO - Data for 2023-03-01 already exists in silver. Skipping.
2025-05-17 12:21:26,965 - INFO - Schema validation passed.
2025-05-17 12:21:26,996 - INFO - Data for 2023-04-01 already exists in silver. Skipping.
2025-05-17 12:21:27,041 - INFO - Schema validation passed.
2025-05-17 12:21:27,068 - INFO - Data for 2023-05-01 already exists in silver. Skipping.
2025-05-17 12:21:27,112 - INFO - Schema validation passed.
2025-05-17 12:21:27,139 - INFO - Data for 2023-06-01 already exists in silver. Skipping.
2025-05-17 12:21:27,177 - INFO - Schema validation passed.
2025-05-17 12:21:27,200 - INFO - Data for 2023-07-01 

In [7]:
# Clean the financial feature partitions from bronze into silver; per the log,
# dates already present in silver are skipped.
source_type = "features_financials"
bronze_directory = f"datamart/bronze/{source_type}"
silver_directory = f"datamart/silver/{source_type}_cleaned"
processor.process_bronze_to_silver(
    bronze_path=bronze_directory,
    silver_path=silver_directory,
    dates=dates_str_lst,
    source_type=source_type,
)

2025-05-17 12:21:28,248 - INFO - Schema validation passed.
2025-05-17 12:21:28,271 - INFO - Data for 2023-01-01 already exists in silver. Skipping.
2025-05-17 12:21:28,312 - INFO - Schema validation passed.
2025-05-17 12:21:28,337 - INFO - Data for 2023-02-01 already exists in silver. Skipping.
2025-05-17 12:21:28,377 - INFO - Schema validation passed.
2025-05-17 12:21:28,401 - INFO - Data for 2023-03-01 already exists in silver. Skipping.
2025-05-17 12:21:28,440 - INFO - Schema validation passed.
2025-05-17 12:21:28,462 - INFO - Data for 2023-04-01 already exists in silver. Skipping.
2025-05-17 12:21:28,499 - INFO - Schema validation passed.
2025-05-17 12:21:28,520 - INFO - Data for 2023-05-01 already exists in silver. Skipping.
2025-05-17 12:21:28,562 - INFO - Schema validation passed.
2025-05-17 12:21:28,599 - INFO - Data for 2023-06-01 already exists in silver. Skipping.
2025-05-17 12:21:28,651 - INFO - Schema validation passed.
2025-05-17 12:21:28,687 - INFO - Data for 2023-07-01 

In [8]:
# Clean the daily LMS loan partitions from bronze into silver; per the log,
# dates already present in silver are skipped.
source_type = "lms_loan_daily"
bronze_directory = f"datamart/bronze/{source_type}"
silver_directory = f"datamart/silver/{source_type}_cleaned"
processor.process_bronze_to_silver(
    bronze_path=bronze_directory,
    silver_path=silver_directory,
    dates=dates_str_lst,
    source_type=source_type,
)

2025-05-17 12:21:29,774 - INFO - Schema validation passed.
2025-05-17 12:21:29,799 - INFO - Data for 2023-01-01 already exists in silver. Skipping.
2025-05-17 12:21:29,840 - INFO - Schema validation passed.
2025-05-17 12:21:29,865 - INFO - Data for 2023-02-01 already exists in silver. Skipping.
2025-05-17 12:21:29,904 - INFO - Schema validation passed.
2025-05-17 12:21:29,930 - INFO - Data for 2023-03-01 already exists in silver. Skipping.
2025-05-17 12:21:29,973 - INFO - Schema validation passed.
2025-05-17 12:21:30,002 - INFO - Data for 2023-04-01 already exists in silver. Skipping.
2025-05-17 12:21:30,048 - INFO - Schema validation passed.
2025-05-17 12:21:30,074 - INFO - Data for 2023-05-01 already exists in silver. Skipping.
2025-05-17 12:21:30,115 - INFO - Schema validation passed.
2025-05-17 12:21:30,139 - INFO - Data for 2023-06-01 already exists in silver. Skipping.
2025-05-17 12:21:30,178 - INFO - Schema validation passed.
2025-05-17 12:21:30,202 - INFO - Data for 2023-07-01 

In [9]:
# Clean the customer-attribute partitions from bronze into silver; per the log,
# dates already present in silver are skipped.
source_type = "features_attributes"
bronze_directory = f"datamart/bronze/{source_type}"
silver_directory = f"datamart/silver/{source_type}_cleaned"
processor.process_bronze_to_silver(
    bronze_path=bronze_directory,
    silver_path=silver_directory,
    dates=dates_str_lst,
    source_type=source_type,
)

2025-05-17 12:21:31,667 - INFO - Schema validation passed.
2025-05-17 12:21:31,702 - INFO - Data for 2023-01-01 already exists in silver. Skipping.
2025-05-17 12:21:31,743 - INFO - Schema validation passed.
2025-05-17 12:21:31,767 - INFO - Data for 2023-02-01 already exists in silver. Skipping.
2025-05-17 12:21:31,808 - INFO - Schema validation passed.
2025-05-17 12:21:31,832 - INFO - Data for 2023-03-01 already exists in silver. Skipping.
2025-05-17 12:21:31,872 - INFO - Schema validation passed.
2025-05-17 12:21:31,895 - INFO - Data for 2023-04-01 already exists in silver. Skipping.
2025-05-17 12:21:31,938 - INFO - Schema validation passed.
2025-05-17 12:21:31,961 - INFO - Data for 2023-05-01 already exists in silver. Skipping.
2025-05-17 12:21:32,001 - INFO - Schema validation passed.
2025-05-17 12:21:32,026 - INFO - Data for 2023-06-01 already exists in silver. Skipping.
2025-05-17 12:21:32,065 - INFO - Schema validation passed.
2025-05-17 12:21:32,091 - INFO - Data for 2023-07-01 

In [10]:

# Build the silver dimension/fact mart from the cleaned silver inputs written above.
# The builder is (re)created here rather than relying on the instance from the setup
# cell: that cell's output logs "Loading cleaned silver inputs …", i.e. inputs are read
# at construction time — before the cleaning cells above had run. Constructing it
# now guarantees it works from the current cleaned tables on a fresh or re-run kernel.
silver_builder = utils.data_processing_silver_table.SilverDataMart(spark)
silver_builder.run(write_mode="overwrite", preview=True)

2025-05-17 12:21:33,464 - INFO - Writing dim_customer  to  datamart/silver/dim_customer
2025-05-17 12:21:34,006 - INFO - Writing dim_feature  to  datamart/silver/dim_feature
2025-05-17 12:21:34,578 - INFO - Writing dim_credit_mix  to  datamart/silver/dim_credit_mix
2025-05-17 12:21:34,730 - INFO - Writing dim_payment_behaviour  to  datamart/silver/dim_payment_behaviour
2025-05-17 12:21:34,912 - INFO - Writing dim_min_payment  to  datamart/silver/dim_min_payment
2025-05-17 12:21:35,026 - INFO - Writing dim_loan_type  to  datamart/silver/dim_loan_type
2025-05-17 12:21:35,181 - INFO - Writing fact_clickstream  to  datamart/silver/fact_clickstream
2025-05-17 12:21:36,772 - INFO - Writing fact_financials  to  datamart/silver/fact_financials
2025-05-17 12:21:37,210 - INFO - Writing fact_loan  to  datamart/silver/fact_loan
2025-05-17 12:21:37,655 - INFO - Writing fact_customer_loan_type  to  datamart/silver/fact_customer_loan_type
2025-05-17 12:21:38,102 - INFO - 
Schema – dim_customer
2025-0

root
 |-- customer_id: string (nullable = true)
 |-- SSN: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Occupation: string (nullable = true)

+-----------+-----------+-----------------+---+------------+
|customer_id|SSN        |Name             |Age|Occupation  |
+-----------+-----------+-----------------+---+------------+
|CUS_0x6939 |633-98-1570|Nicolaj          |34 |Accountant  |
|CUS_0x4a94 |515-69-0297|Terrilz          |41 |Architect   |
|CUS_0x6fea |444-51-2059|Jeanine Preziosou|31 |Entrepreneur|
|CUS_0xea6  |991-58-6499|Daviesd          |31 |Lawyer      |
|CUS_0x4020 |273-14-6142|Valetkevitchi    |33 |Mechanic    |
+-----------+-----------+-----------------+---+------------+
only showing top 5 rows

root
 |-- feature_id: long (nullable = true)
 |-- feature_name: string (nullable = true)



2025-05-17 12:21:38,347 - INFO - 
Schema – dim_credit_mix
2025-05-17 12:21:38,417 - INFO - 
Schema – dim_payment_behaviour
2025-05-17 12:21:38,494 - INFO - 
Schema – dim_min_payment


+----------+------------+
|feature_id|feature_name|
+----------+------------+
|1         |fe_1        |
|2         |fe_2        |
|3         |fe_3        |
|4         |fe_4        |
|5         |fe_5        |
+----------+------------+
only showing top 5 rows

root
 |-- credit_mix_id: integer (nullable = true)

+-------------+
|credit_mix_id|
+-------------+
|1            |
|2            |
|0            |
+-------------+

root
 |-- payment_behaviour_id: string (nullable = true)
 |-- spend: string (nullable = true)
 |-- value: string (nullable = true)

+--------------------------------+-----+------+
|payment_behaviour_id            |spend|value |
+--------------------------------+-----+------+
|Low_spent_Medium_value_payments |Low  |Medium|
|Low_spent_Large_value_payments  |Low  |Large |
|High_spent_Medium_value_payments|High |Medium|
|High_spent_Small_value_payments |High |Small |
|Low_spent_Small_value_payments  |Low  |Small |
+--------------------------------+-----+------+
only showing

2025-05-17 12:21:38,561 - INFO - 
Schema – dim_loan_type
2025-05-17 12:21:38,707 - INFO - 
Schema – fact_clickstream


+------------+
|payment_code|
+------------+
|1           |
|0           |
+------------+

root
 |-- loan_type_id: long (nullable = true)
 |-- loan_flag_col: string (nullable = true)
 |-- loan_type_name: string (nullable = true)

+------------+-----------------------+-------------------+
|loan_type_id|loan_flag_col          |loan_type_name     |
+------------+-----------------------+-------------------+
|1           |Has_Credit-Builder_Loan|Credit-Builder_Loan|
|2           |Has_Student_Loan       |Student_Loan       |
|3           |Has_Mortgage_Loan      |Mortgage_Loan      |
|4           |Has_Payday_Loan        |Payday_Loan        |
|5           |Has_Personal_Loan      |Personal_Loan      |
+------------+-----------------------+-------------------+
only showing top 5 rows

root
 |-- customer_id: string (nullable = true)
 |-- feature_id: long (nullable = true)
 |-- value: integer (nullable = true)
 |-- snapshot_id: long (nullable = true)



2025-05-17 12:21:39,187 - INFO - 
Schema – fact_financials


+-----------+----------+-----+-----------+
|customer_id|feature_id|value|snapshot_id|
+-----------+----------+-----+-----------+
|CUS_0x1037 |2         |243  |24         |
|CUS_0x1037 |6         |56   |24         |
|CUS_0x1037 |3         |89   |24         |
|CUS_0x1037 |5         |202  |24         |
|CUS_0x1037 |1         |71   |24         |
+-----------+----------+-----+-----------+
only showing top 5 rows

root
 |-- customer_id: string (nullable = true)
 |-- snapshot_id: long (nullable = false)
 |-- Annual_Income: float (nullable = true)
 |-- Monthly_Inhand_Salary: float (nullable = true)
 |-- Num_Bank_Accounts: integer (nullable = true)
 |-- Num_Credit_Card: integer (nullable = true)
 |-- Interest_Rate: integer (nullable = true)
 |-- Num_of_Loan: integer (nullable = true)
 |-- Delay_from_due_date: integer (nullable = true)
 |-- Num_of_Delayed_Payment: float (nullable = true)
 |-- Changed_Credit_Limit: float (nullable = true)
 |-- Num_Credit_Inquiries: integer (nullable = true)
 |-- 

2025-05-17 12:21:39,499 - INFO - 
Schema – fact_loan


+-----------+-----------+-------------+---------------------+-----------------+---------------+-------------+-----------+-------------------+----------------------+--------------------+--------------------+----------------+------------------------+-------------------+-----------------------+------------+--------------------------------+-------------+--------------------+---------------------+
|customer_id|snapshot_id|Annual_Income|Monthly_Inhand_Salary|Num_Bank_Accounts|Num_Credit_Card|Interest_Rate|Num_of_Loan|Delay_from_due_date|Num_of_Delayed_Payment|Changed_Credit_Limit|Num_Credit_Inquiries|Outstanding_Debt|Credit_Utilization_Ratio|Total_EMI_per_month|Amount_invested_monthly|payment_code|payment_behaviour_id            |credit_mix_id|Credit_History_Years|Credit_History_Months|
+-----------+-----------+-------------+---------------------+-----------------+---------------+-------------+-----------+-------------------+----------------------+--------------------+--------------------+--

2025-05-17 12:21:39,869 - INFO - 
Schema – fact_customer_loan_type


+--------------------+-----------+-----------+---------------+------+---------------+--------+-------+--------+-----------+-------+
|loan_id             |customer_id|snapshot_id|loan_start_date|tenure|installment_num|loan_amt|due_amt|paid_amt|overdue_amt|balance|
+--------------------+-----------+-----------+---------------+------+---------------+--------+-------+--------+-----------+-------+
|CUS_0xffd_2024_03_01|CUS_0xffd  |0          |2024-03-01     |10    |5              |10000.0 |1000.0 |1000.0  |0.0        |5000.0 |
|CUS_0xffc_2024_01_01|CUS_0xffc  |0          |2024-01-01     |10    |7              |10000.0 |1000.0 |0.0     |4000.0     |7000.0 |
|CUS_0xff3_2024_06_01|CUS_0xff3  |0          |2024-06-01     |10    |2              |10000.0 |1000.0 |1000.0  |0.0        |8000.0 |
|CUS_0xfea_2023_10_01|CUS_0xfea  |0          |2023-10-01     |10    |10             |10000.0 |1000.0 |1000.0  |0.0        |0.0    |
|CUS_0xfe3_2024_04_01|CUS_0xfe3  |0          |2024-04-01     |10    |4      

## Build gold table for labels

In [12]:

# Build the gold tables from the silver mart written above. The processor is
# (re)created here, after the silver tables exist, so anything it reads at
# construction time is current; the instance from the setup cell was built before
# any bronze/silver cell ran.
# NOTE(review): run_all() reads silver 'dim_snapshot', which the silver mart run
# above does not log writing — confirm it is produced elsewhere and is not a
# leftover from an earlier run.
proc = utils.data_processing_gold_table.GoldDataProcessor(spark)
proc.run_all()

2025-05-17 12:21:40,266 - INFO - Reading Silver table 'dim_credit_mix' from datamart/silver/dim_credit_mix
2025-05-17 12:21:40,297 - INFO - Reading Silver table 'dim_payment_behaviour' from datamart/silver/dim_payment_behaviour
2025-05-17 12:21:40,328 - INFO - Reading Silver table 'dim_min_payment' from datamart/silver/dim_min_payment
2025-05-17 12:21:40,363 - INFO - Reading Silver table 'dim_loan_type' from datamart/silver/dim_loan_type
2025-05-17 12:21:40,399 - INFO - Reading Silver table 'dim_snapshot' from datamart/silver/dim_snapshot
2025-05-17 12:21:40,433 - INFO - Reading Silver table 'fact_loan' from datamart/silver/fact_loan
2025-05-17 12:21:40,461 - INFO - Reading Silver table 'dim_customer' from datamart/silver/dim_customer
2025-05-17 12:21:40,487 - INFO - Reading Silver table 'fact_financials' from datamart/silver/fact_financials
2025-05-17 12:21:40,512 - INFO - Reading Silver table 'fact_customer_loan_type' from datamart/silver/fact_customer_loan_type
2025-05-17 12:21:41,6

## Inspect gold tables (label store)

In [17]:
from pathlib import Path

gold_root = "datamart/gold"


def has_delta_log(dir_path: Path) -> bool:
    """Return True if ``dir_path`` has a ``_delta_log`` sub-directory (a Delta Lake table)."""
    return (dir_path / "_delta_log").is_dir()


def contains_parquet(dir_path: Path) -> bool:
    """Return True if any file at any depth under ``dir_path`` ends in ``.parquet``.

    The search is recursive because partitioned tables keep their data files in
    ``key=value`` sub-folders rather than at the table root.
    """
    return any(
        file_name.endswith(".parquet")
        for _root, _dirs, files in os.walk(dir_path)
        for file_name in files
    )


# Preview every table directory under the gold root. Each preview is headed by the
# table's folder name so the outputs can be told apart, and directories are visited
# in sorted order so the output is reproducible.
# NOTE(review): the SparkSession in the setup cell configures no Delta Lake
# extensions; reading a Delta table presumably requires them, so such tables would be
# reported by the except branch below — confirm if Delta output is expected.
gold_root_path = Path(gold_root)
if not gold_root_path.is_dir():
    print(f"Gold root not found: {gold_root} (run the gold build cell first)")
else:
    table_dirs = sorted(p for p in gold_root_path.iterdir() if p.is_dir())
    for table_dir in table_dirs:
        if has_delta_log(table_dir):
            fmt = "delta"
        elif contains_parquet(table_dir):
            fmt = "parquet"
        else:
            continue  # neither a Delta log nor parquet files: not a table directory

        print(f"\n{table_dir.name}")
        try:
            if fmt == "delta":
                df = spark.read.format("delta").load(str(table_dir))
            else:
                df = spark.read.parquet(str(table_dir))
            print(f"  format: {fmt}, rows: {df.count()}")
            df.show(5, truncate=False)
        except Exception as e:
            # Best effort: report the failure and keep previewing the remaining tables.
            print(f"  ** unable to read table – {type(e).__name__}: {e}")

  format: parquet, rows: 5553
+-----------+--------------+---+----------+-------------+---------------------+----------------+------------+
|customer_id|name          |age|occupation|Annual_Income|Monthly_Inhand_Salary|Outstanding_Debt|total_events|
+-----------+--------------+---+----------+-------------+---------------------+----------------+------------+
|CUS_0x1000 |Alistair Barrf|18 |Lawyer    |30625.94     |2706.1616            |1562.91         |46236       |
|CUS_0x1011 |Schneyerh     |44 |Doctor    |58918.47     |5208.8726            |473.14          |50559       |
|CUS_0x1013 |Cameront      |44 |Mechanic  |98620.98     |7962.415             |1233.51         |50892       |
|CUS_0x1018 |Felsenthalq   |15 |Accountant|61194.81     |5014.5674            |2773.09         |46135       |
|CUS_0x1026 |Josephv       |52 |Manager   |170614.28    |14463.856            |849.69          |53388       |
+-----------+--------------+---+----------+-------------+---------------------+-----------