In [14]:
import os
import glob
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import random
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import pprint
import pyspark
import pyspark.sql.functions as F
from pyspark.sql.window import Window
import argparse

from pyspark.sql.functions import col
from pyspark.sql.types import StringType, IntegerType, FloatType, DateType, BooleanType

In [15]:
# Raw source files that feed the bronze layer.
path = "data/"
clickstream, attributes, financials, loan_daily = (
    path + source_name
    for source_name in (
        "feature_clickstream.csv",
        "features_attributes.csv",
        "features_financials.csv",
        "lms_loan_daily.csv",
    )
)

In [16]:
# Local Spark session for the notebook ("local[*]" master, app name "dev").
spark = (
    pyspark.sql.SparkSession.builder
    .master("local[*]")
    .appName("dev")
    .getOrCreate()
)

# Keep only ERROR-level Spark logging so warnings do not flood cell output.
spark.sparkContext.setLogLevel("ERROR")

In [17]:
def generate_first_of_month_dates(start_date_str, end_date_str):
    """List the 1st of every month from the start date's month through the end date.

    Both arguments are "YYYY-MM-DD" strings. The first entry is the first day
    of ``start_date_str``'s month, even when the start date is mid-month.
    Entries continue month by month while they are on or before
    ``end_date_str``. Returns "YYYY-MM-DD" strings. The list is empty when the
    start month begins after the end date.
    """
    fmt = "%Y-%m-%d"
    start = datetime.strptime(start_date_str, fmt)
    end = datetime.strptime(end_date_str, fmt)

    months = []
    cursor = datetime(start.year, start.month, 1)
    while cursor <= end:
        months.append(cursor.strftime(fmt))
        # month // 12 carries December into the next year; month % 12 + 1 wraps to January.
        cursor = datetime(cursor.year + cursor.month // 12, cursor.month % 12 + 1, 1)
    return months


start_date_str = "2023-01-01"
end_date_str = "2024-12-01"

dates_str_lst = generate_first_of_month_dates(start_date_str, end_date_str)

In [52]:
def process_bronze_table_with_date(filepath, snapshot_date_str, bronze_directory, spark):
    """Copy one snapshot of a raw CSV source into the bronze layer.

    Reads ``filepath`` with Spark and keeps only the rows whose
    ``snapshot_date`` equals ``snapshot_date_str``. Writes them via pandas to
    ``<bronze_directory>/bronze_<source name>_<YYYY_MM_DD>.csv``.

    Args:
        filepath: Path to the source CSV; it must contain a ``snapshot_date`` column.
        snapshot_date_str: Snapshot to extract, formatted "YYYY-MM-DD".
        bronze_directory: Output directory, with or without a trailing slash;
            created if it does not exist.
        spark: Active SparkSession used to read the source.

    Returns:
        The filtered Spark DataFrame.

    Raises:
        ValueError: if ``snapshot_date_str`` is not "YYYY-MM-DD".
    """
    # A plain date literal compares exactly against the date column. A datetime
    # literal would go through timestamp/timezone coercion first.
    snapshot_date = datetime.strptime(snapshot_date_str, "%Y-%m-%d").date()
    # Source name without directory or extension, e.g. "feature_clickstream".
    filename = os.path.splitext(os.path.basename(filepath))[0]

    # load data
    df = (spark.read.csv(filepath, header=True, inferSchema=True)
          .filter(col('snapshot_date') == snapshot_date))

    # save bronze table to datamart
    partition_name = "bronze_" + filename + "_" + snapshot_date_str.replace('-', '_') + '.csv'
    bronze_filepath = os.path.join(bronze_directory, partition_name)

    # make directory if it doesn't exist (no-op when it already does)
    os.makedirs(bronze_directory, exist_ok=True)

    df.toPandas().to_csv(bronze_filepath, index=False)
    print('saved to:', bronze_filepath)

    return df

def process_bronze_table(filepath, bronze_directory, spark):
    """Copy a whole raw CSV source (no snapshot filter) into the bronze layer.

    Reads ``filepath`` with Spark and writes every row via pandas to
    ``<bronze_directory>/bronze_<source name>.csv``.

    NOTE(review): a later cell redefines ``process_bronze_table`` with the
    incompatible signature ``(snapshot_date_str, bronze_lms_directory, spark)``.
    Whichever definition ran last is the one in scope, so consider renaming one
    of them together with its callers.

    Args:
        filepath: Path to the source CSV.
        bronze_directory: Output directory, with or without a trailing slash;
            created if it does not exist.
        spark: Active SparkSession used to read the source.

    Returns:
        The Spark DataFrame that was read.
    """
    # Source name without directory or extension, e.g. "features_attributes".
    filename = os.path.splitext(os.path.basename(filepath))[0]

    # load data
    df = spark.read.csv(filepath, header=True, inferSchema=True)

    # save bronze table to datamart
    bronze_filepath = os.path.join(bronze_directory, "bronze_" + filename + '.csv')

    # make directory if it doesn't exist (no-op when it already does)
    os.makedirs(bronze_directory, exist_ok=True)

    df.toPandas().to_csv(bronze_filepath, index=False)
    print('saved to:', bronze_filepath)

    return df

In [53]:
# Bronze layer: land each raw source under datamart/bronze/.
snapshotdate = "2023-01-01"

# Sources carrying a snapshot_date are extracted one snapshot at a time.
for source_path, target_dir in [(clickstream, "datamart/bronze/clickstream/"),
                                (loan_daily, "datamart/bronze/lms/")]:
    process_bronze_table_with_date(source_path, snapshotdate, target_dir, spark)

# The attribute and financial sources are copied whole.
process_bronze_table(attributes, "datamart/bronze/attribute/", spark)
process_bronze_table(financials, "datamart/bronze/financial/", spark)

saved to: datamart/bronze/clickstream/bronze_feature_clickstream_2023_01_01.csv
saved to: datamart/bronze/lms/bronze_lms_loan_daily_2023_01_01.csv
saved to: datamart/bronze/attribute/bronze_features_attributes.csv
saved to: datamart/bronze/financial/bronze_features_financials.csv


DataFrame[Customer_ID: string, Annual_Income: string, Monthly_Inhand_Salary: double, Num_Bank_Accounts: int, Num_Credit_Card: int, Interest_Rate: int, Num_of_Loan: string, Type_of_Loan: string, Delay_from_due_date: int, Num_of_Delayed_Payment: string, Changed_Credit_Limit: string, Num_Credit_Inquiries: double, Credit_Mix: string, Outstanding_Debt: string, Credit_Utilization_Ratio: double, Credit_History_Age: string, Payment_of_Min_Amount: string, Total_EMI_per_month: double, Amount_invested_monthly: string, Payment_Behaviour: string, Monthly_Balance: string, snapshot_date: date]

In [54]:
def process_silver_clickstream_table(snapshot_date_str, bronze_dir, silver_dir, spark):
    """Type-cast one bronze clickstream snapshot and write it as silver parquet.

    Reads ``bronze_feature_clickstream_<YYYY_MM_DD>.csv`` from ``bronze_dir``.
    Casts ``fe_1`` .. ``fe_20`` to int, ``Customer_ID`` to string and
    ``snapshot_date`` to date. Writes
    ``silver_feature_clickstream_<YYYY_MM_DD>.parquet`` into ``silver_dir``.

    Args:
        snapshot_date_str: Snapshot to process, "YYYY-MM-DD".
        bronze_dir: Directory holding the bronze CSV partitions.
        silver_dir: Output directory; created if missing.
        spark: Active SparkSession.

    Returns:
        The typed Spark DataFrame that was written.

    Raises:
        ValueError: if ``snapshot_date_str`` is not "YYYY-MM-DD".
    """
    # Parsed only to validate the date format; the value itself is not needed.
    datetime.strptime(snapshot_date_str, "%Y-%m-%d")
    date_suffix = snapshot_date_str.replace('-', '_')

    filepath = os.path.join(bronze_dir, "bronze_feature_clickstream_" + date_suffix + '.csv')
    df = spark.read.csv(filepath, header=True, inferSchema=True)
    print('loaded from:', filepath, 'row count:', df.count())

    # clean data: enforce schema / data type
    column_type_map = {f"fe_{i}": IntegerType() for i in range(1, 21)}
    column_type_map["Customer_ID"] = StringType()
    column_type_map["snapshot_date"] = DateType()

    for column, new_type in column_type_map.items():
        df = df.withColumn(column, col(column).cast(new_type))

    os.makedirs(silver_dir, exist_ok=True)

    # save silver table - IRL connect to database to write
    filepath = os.path.join(silver_dir, "silver_feature_clickstream_" + date_suffix + '.parquet')
    df.write.mode("overwrite").parquet(filepath)
    print('saved to:', filepath)

    return df

snapshotdate = "2023-01-01"
process_silver_clickstream_table(snapshotdate,
                                 "datamart/bronze/clickstream/",
                                 "datamart/silver/clickstream/",
                                 spark)

loaded from: datamart/bronze/clickstream/bronze_feature_clickstream_2023_01_01.csv row count: 8974
saved to: datamart/silver/clickstream/silver_feature_clickstream_2023_01_01.parquet


DataFrame[fe_1: int, fe_2: int, fe_3: int, fe_4: int, fe_5: int, fe_6: int, fe_7: int, fe_8: int, fe_9: int, fe_10: int, fe_11: int, fe_12: int, fe_13: int, fe_14: int, fe_15: int, fe_16: int, fe_17: int, fe_18: int, fe_19: int, fe_20: int, Customer_ID: string, snapshot_date: date]

In [71]:
def process_silver_attribute_table(bronze_dir, silver_dir, spark, min_age=18, max_age=100):
    """Clean the bronze customer-attributes CSV and write the silver parquet table.

    Cleaning steps:
      * trims ``Name`` and removes every character that is not an ASCII letter or space;
      * removes whitespace from ``SSN``;
      * removes non-digits from ``Age`` before casting it to int.

    Added boolean columns:
      * ``duplicate_ssn`` / ``duplicate_name_ssn``: the SSN (or Name+SSN pair)
        occurs more than once. Only SSNs of the form NNN-NN-NNNN are eligible,
        so placeholder junk is never reported as a duplicate.
      * ``age_typo``: Age is below ``min_age`` or above ``max_age``.

    Args:
        bronze_dir: Directory holding ``bronze_features_attributes.csv``.
        silver_dir: Output directory; created if missing.
        spark: Active SparkSession.
        min_age: Ages strictly below this are flagged as a potential typo.
        max_age: Ages strictly above this are flagged as a potential typo.

    Returns:
        The cleaned Spark DataFrame that was written.
    """
    filepath = os.path.join(bronze_dir, "bronze_features_attributes.csv")
    df = spark.read.csv(filepath, header=True, inferSchema=True)
    print('loaded from:', filepath, 'row count:', df.count())

    # clean data: strip and replace alphanumeric for name, age and ssn
    df = df.withColumn("Name", F.trim(F.col("Name")))
    df = df.withColumn("Name", F.regexp_replace("Name", "[^A-Za-z ]", ""))
    df = df.withColumn("SSN", F.regexp_replace(F.col("SSN"), r"\s+", ""))
    df = df.withColumn("Age", F.regexp_replace("Age", "[^0-9]", ""))

    # clean data: enforce schema / data type
    column_type_map = {"Customer_ID": StringType(),
                       "Name": StringType(),
                       'Age': IntegerType(),
                       'SSN': StringType(),
                       "snapshot_date": DateType()
                      }

    for column, new_type in column_type_map.items():
        df = df.withColumn(column, col(column).cast(new_type))

    # Placeholder values such as "#F%$D@*&8" share one window partition and would
    # all look like duplicates. Only well-formed SSNs count; null counts as invalid.
    valid_ssn = F.coalesce(F.col("SSN").rlike(r"^\d{3}-\d{2}-\d{4}$"), F.lit(False))

    # augment data: duplicated ssn
    w = Window.partitionBy("SSN")
    df = df.withColumn("duplicate_ssn",
                       (valid_ssn & (F.count("*").over(w) > 1)).cast(BooleanType()))

    # augment data: duplicated name and ssn
    w = Window.partitionBy("Name", "SSN")
    df = df.withColumn("duplicate_name_ssn",
                       (valid_ssn & (F.count("*").over(w) > 1)).cast(BooleanType()))

    # augment data: potential age typo. Age is already non-negative after digit
    # stripping, so the lower bound must be a real age threshold, not 0.
    df = df.withColumn("age_typo",
                       ((F.col("Age") < min_age) | (F.col("Age") > max_age)).cast(BooleanType()))

    os.makedirs(silver_dir, exist_ok=True)

    # save silver table - IRL connect to database to write
    filepath = os.path.join(silver_dir, "silver_feature_attributes.parquet")
    df.write.mode("overwrite").parquet(filepath)
    print('saved to:', filepath)

    return df

process_silver_attribute_table("datamart/bronze/attribute/",
                               "datamart/silver/attribute/",
                               spark)

loaded from: datamart/bronze/attribute/bronze_features_attributes.csv row count: 12500
saved to: datamart/silver/attribute/silver_feature_attributes.parquet


DataFrame[Customer_ID: string, Name: string, Age: int, SSN: string, Occupation: string, snapshot_date: date, duplicate_ssn: boolean, duplicate_name_ssn: boolean, age_typo: boolean]

In [133]:
def process_silver_financial_table(bronze_dir, silver_dir, spark):
    """Clean the bronze financials CSV and write it as the silver parquet table.

    Steps:
      * lower-case ``Type_of_Loan``;
      * replace placeholder tokens ("_" in ``Credit_Mix`` becomes null;
        "!@9#%8" in ``Payment_Behaviour`` becomes "Unknown");
      * strip stray characters from numeric-looking string columns;
      * cast every column to its target type;
      * add boolean flags for negative values and for zero bank accounts.

    Args:
        bronze_dir: Directory holding ``bronze_features_financials.csv``; joined
            by plain string concatenation, so it needs a trailing "/".
        silver_dir: Output directory; created if it does not exist.
        spark: Active SparkSession.

    Returns:
        The cleaned Spark DataFrame that was written to parquet.
    """
    source_path = bronze_dir + "bronze_features_financials.csv"
    fin = spark.read.csv(source_path, header=True, inferSchema=True)
    print('loaded from:', source_path, 'row count:', fin.count())

    fin = fin.withColumn("Type_of_Loan", F.lower(F.col("Type_of_Loan")))

    # (column, placeholder token, replacement value)
    placeholder_fixes = [
        ("Credit_Mix", "_", None),
        ("Payment_Behaviour", "!@9#%8", "Unknown"),
    ]
    for column, token, replacement in placeholder_fixes:
        fin = fin.withColumn(column,
                             F.when(F.col(column) == token, replacement)
                             .otherwise(F.col(column)))

    # Unsigned columns keep only digits and '.'; signed columns also keep '-'.
    unsigned_columns = ["Annual_Income", "Outstanding_Debt",
                        "Amount_invested_monthly", "Monthly_Balance"]
    signed_columns = ["Num_of_Loan", "Num_of_Delayed_Payment", "Changed_Credit_Limit"]
    for column in unsigned_columns:
        fin = fin.withColumn(column, F.regexp_replace(column, "[^0-9.]", ""))
    for column in signed_columns:
        fin = fin.withColumn(column, F.regexp_replace(column, r"[^0-9.\-]", ""))

    # Target schema: six text columns, a date, and everything else as float.
    string_columns = ["Customer_ID", "Type_of_Loan", "Credit_Mix",
                      "Credit_History_Age", "Payment_of_Min_Amount", "Payment_Behaviour"]
    float_columns = [
        "Annual_Income", "Monthly_Inhand_Salary", "Num_Bank_Accounts", "Num_Credit_Card",
        "Interest_Rate", "Num_of_Loan", "Delay_from_due_date", "Num_of_Delayed_Payment",
        "Changed_Credit_Limit", "Num_Credit_Inquiries", "Outstanding_Debt",
        "Credit_Utilization_Ratio", "Total_EMI_per_month", "Amount_invested_monthly",
        "Monthly_Balance",
    ]
    target_types = {name: StringType() for name in string_columns}
    target_types.update({name: FloatType() for name in float_columns})
    target_types["snapshot_date"] = DateType()
    for column, dtype in target_types.items():
        fin = fin.withColumn(column, col(column).cast(dtype))

    # Flag suspicious values: "Negative_<col>" for each column below, plus zero accounts.
    for source in ["Num_of_Delayed_Payment", "Changed_Credit_Limit",
                   "Delay_from_due_date", "Num_of_Loan", "Num_Bank_Accounts"]:
        fin = fin.withColumn("Negative_" + source, (F.col(source) < 0).cast(BooleanType()))
    fin = fin.withColumn("No_bank_accounts",
                         (F.col("Num_Bank_Accounts") == 0).cast(BooleanType()))

    if not os.path.exists(silver_dir):
        os.makedirs(silver_dir)

    # save silver table - IRL connect to database to write
    target_path = silver_dir + "silver_feature_financials.parquet"
    fin.write.mode("overwrite").parquet(target_path)
    print('saved to:', target_path)

    return fin

process_silver_financial_table("datamart/bronze/financial/",
                               "datamart/silver/financial/",
                               spark)

loaded from: datamart/bronze/financial/bronze_features_financials.csv row count: 12500
saved to: datamart/silver/financial/silver_feature_financials.parquet


DataFrame[Customer_ID: string, Annual_Income: float, Monthly_Inhand_Salary: float, Num_Bank_Accounts: float, Num_Credit_Card: float, Interest_Rate: float, Num_of_Loan: float, Type_of_Loan: string, Delay_from_due_date: float, Num_of_Delayed_Payment: float, Changed_Credit_Limit: float, Num_Credit_Inquiries: float, Credit_Mix: string, Outstanding_Debt: float, Credit_Utilization_Ratio: float, Credit_History_Age: string, Payment_of_Min_Amount: string, Total_EMI_per_month: float, Amount_invested_monthly: float, Payment_Behaviour: string, Monthly_Balance: float, snapshot_date: date, Negative_Num_of_Delayed_Payment: boolean, Negative_Changed_Credit_Limit: boolean, Negative_Delay_from_due_date: boolean, Negative_Num_of_Loan: boolean, Negative_Num_Bank_Accounts: boolean, No_bank_accounts: boolean]

In [136]:
def process_silver_lms_table(snapshot_date_str, bronze_lms_directory, silver_loan_daily_directory, spark):
    """Type-cast one bronze LMS loan-daily snapshot and derive delinquency columns.

    Reads ``bronze_lms_loan_daily_<YYYY_MM_DD>.csv`` from ``bronze_lms_directory``
    and enforces the column types. It then adds:
      * ``mob``: month on book, equal to ``installment_num``;
      * ``installments_missed``: ceil(overdue_amt / due_amt), or 0 when nothing is due;
      * ``first_missed_date``: snapshot_date minus ``installments_missed`` months
        (null when none were missed);
      * ``dpd``: days from first_missed_date to snapshot_date when overdue_amt > 0, else 0.
    The result is written to ``silver_lms_loan_daily_<YYYY_MM_DD>.parquet``.

    Args:
        snapshot_date_str: Snapshot to process, "YYYY-MM-DD".
        bronze_lms_directory: Directory holding the bronze CSV partitions.
        silver_loan_daily_directory: Output directory for the parquet partition.
        spark: Active SparkSession.

    Returns:
        The enriched Spark DataFrame that was written.

    Raises:
        ValueError: if ``snapshot_date_str`` is not "YYYY-MM-DD".
    """
    # Parsed only to validate the date format.
    datetime.strptime(snapshot_date_str, "%Y-%m-%d")
    date_suffix = snapshot_date_str.replace('-', '_')

    # connect to bronze table
    filepath = os.path.join(bronze_lms_directory, "bronze_lms_loan_daily_" + date_suffix + '.csv')
    df = spark.read.csv(filepath, header=True, inferSchema=True)
    print('loaded from:', filepath, 'row count:', df.count())

    # clean data: enforce schema / data type
    column_type_map = {
        "loan_id": StringType(),
        "Customer_ID": StringType(),
        "loan_start_date": DateType(),
        "tenure": IntegerType(),
        "installment_num": IntegerType(),
        "loan_amt": FloatType(),
        "due_amt": FloatType(),
        "paid_amt": FloatType(),
        "overdue_amt": FloatType(),
        "balance": FloatType(),
        "snapshot_date": DateType(),
    }

    for column, new_type in column_type_map.items():
        df = df.withColumn(column, col(column).cast(new_type))

    # augment data: add month on book
    df = df.withColumn("mob", col("installment_num").cast(IntegerType()))

    # augment data: add days past due.
    # due_amt is 0 at installment 0. Guarding it avoids a divide-by-zero, which can
    # raise when spark.sql.ansi.enabled is on. The fill is restricted to this column
    # so genuine nulls elsewhere (e.g. overdue_amt) are not silently turned into 0.
    df = df.withColumn(
        "installments_missed",
        F.when(col("due_amt") > 0, F.ceil(col("overdue_amt") / col("due_amt"))).cast(IntegerType()),
    ).fillna(0, subset=["installments_missed"])
    df = df.withColumn("first_missed_date", F.when(col("installments_missed") > 0, F.add_months(col("snapshot_date"), -1 * col("installments_missed"))).cast(DateType()))
    df = df.withColumn("dpd", F.when(col("overdue_amt") > 0.0, F.datediff(col("snapshot_date"), col("first_missed_date"))).otherwise(0).cast(IntegerType()))

    # save silver table - IRL connect to database to write
    filepath = os.path.join(silver_loan_daily_directory, "silver_lms_loan_daily_" + date_suffix + '.parquet')
    df.write.mode("overwrite").parquet(filepath)
    print('saved to:', filepath)

    return df

snapshotdate = "2023-01-01"
process_silver_lms_table(snapshotdate,
                         "datamart/bronze/lms/",
                         "datamart/silver/lms/",
                         spark)

loaded from: datamart/bronze/lms/bronze_lms_loan_daily_2023_01_01.csv row count: 530
saved to: datamart/silver/lms/silver_lms_loan_daily_2023_01_01.parquet


DataFrame[loan_id: string, Customer_ID: string, loan_start_date: date, tenure: int, installment_num: int, loan_amt: float, due_amt: float, paid_amt: float, overdue_amt: float, balance: float, snapshot_date: date, mob: int, installments_missed: int, first_missed_date: date, dpd: int]

In [137]:
# Spot-check the silver LMS partition for 2023-01-01 with pandas.
silver_lms_file = "datamart/silver/lms/silver_lms_loan_daily_2023_01_01.parquet"
test = pd.read_parquet(silver_lms_file)
test

Unnamed: 0,loan_id,Customer_ID,loan_start_date,tenure,installment_num,loan_amt,due_amt,paid_amt,overdue_amt,balance,snapshot_date,mob,installments_missed,first_missed_date,dpd
0,CUS_0x1037_2023_01_01,CUS_0x1037,2023-01-01,10,0,10000.0,0.0,0.0,0.0,10000.0,2023-01-01,0,0,,0
1,CUS_0x1069_2023_01_01,CUS_0x1069,2023-01-01,10,0,10000.0,0.0,0.0,0.0,10000.0,2023-01-01,0,0,,0
2,CUS_0x114a_2023_01_01,CUS_0x114a,2023-01-01,10,0,10000.0,0.0,0.0,0.0,10000.0,2023-01-01,0,0,,0
3,CUS_0x1184_2023_01_01,CUS_0x1184,2023-01-01,10,0,10000.0,0.0,0.0,0.0,10000.0,2023-01-01,0,0,,0
4,CUS_0x1297_2023_01_01,CUS_0x1297,2023-01-01,10,0,10000.0,0.0,0.0,0.0,10000.0,2023-01-01,0,0,,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
525,CUS_0xe98_2023_01_01,CUS_0xe98,2023-01-01,10,0,10000.0,0.0,0.0,0.0,10000.0,2023-01-01,0,0,,0
526,CUS_0xea6_2023_01_01,CUS_0xea6,2023-01-01,10,0,10000.0,0.0,0.0,0.0,10000.0,2023-01-01,0,0,,0
527,CUS_0xed3_2023_01_01,CUS_0xed3,2023-01-01,10,0,10000.0,0.0,0.0,0.0,10000.0,2023-01-01,0,0,,0
528,CUS_0xed8_2023_01_01,CUS_0xed8,2023-01-01,10,0,10000.0,0.0,0.0,0.0,10000.0,2023-01-01,0,0,,0


In [80]:
# Load the raw financials here. Otherwise this cell depends on a loader cell further
# down the notebook and fails on Restart & Run All.
financials_df = pd.read_csv(financials)
financials_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 12500 entries, 0 to 12499
Data columns (total 22 columns):
 #   Column                    Non-Null Count  Dtype  
---  ------                    --------------  -----  
 0   Customer_ID               12500 non-null  object 
 1   Annual_Income             12500 non-null  object 
 2   Monthly_Inhand_Salary     12500 non-null  float64
 3   Num_Bank_Accounts         12500 non-null  int64  
 4   Num_Credit_Card           12500 non-null  int64  
 5   Interest_Rate             12500 non-null  int64  
 6   Num_of_Loan               12500 non-null  object 
 7   Type_of_Loan              11074 non-null  object 
 8   Delay_from_due_date       12500 non-null  int64  
 9   Num_of_Delayed_Payment    12500 non-null  object 
 10  Changed_Credit_Limit      12500 non-null  object 
 11  Num_Credit_Inquiries      12500 non-null  float64
 12  Credit_Mix                12500 non-null  object 
 13  Outstanding_Debt          12500 non-null  object 
 14  Credit

In [None]:
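# Scratch reference only: earlier drafts of the column-type maps. The maps actually
# used live inside the process_silver_* functions and differ from these drafts
# (e.g. Name and SSN are StringType there).
# # clickstream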
#     column_type_map = {"fe_1": IntegerType(),
#                        'fe_2': IntegerType(),
#                        'fe_3': IntegerType(),
#                        'fe_4': IntegerType(),
#                        'fe_5': IntegerType(),
#                        'fe_6': IntegerType(),
#                        'fe_7': IntegerType(),
#                        'fe_8': IntegerType(),
#                        'fe_9': IntegerType(),
#                        'fe_10': IntegerType(),
#                        'fe_11': IntegerType(),
#                        'fe_12': IntegerType(),
#                        'fe_13': IntegerType(),
#                        'fe_14': IntegerType(),
#                        'fe_15': IntegerType(),
#                        'fe_16': IntegerType(),
#                        'fe_17': IntegerType(),
#                        'fe_18': IntegerType(),
#                        'fe_19': IntegerType(),
#                        'fe_20': IntegerType(),
#                        "Customer_ID": StringType(),
#                        "snapshot_date": DateType()
#                     }

# # attributes
#         column_type_map = {"Customer_ID": StringType(),
#                            "Name": IntegerType(),
#                            'Age': IntegerType(),
#                            'SSN': IntegerType(),
#                            "snapshot_date": DateType()
#                           }

# financial
    # columns_type_map = {'Customer_ID': StringType(),
    #                     'Annual_Income': FloatType(),
    #                     'Monthly_Inhand_Salary': FloatType(),
    #                     'Num_Bank_Accounts': IntegerType(),
    #                     'Num_Credit_Card': IntegerType(),
    #                     'Interest_Rate': IntegerType(),
    #                     'Num_of_Loan': IntegerType(),
    #                     'Type_of_Loan': StringType(),
    #                     'Delay_from_due_date': IntegerType(),
    #                     'Num_of_Delayed_Payment': IntegerType(),
    #                     'Changed_Credit_Limit': FloatType(),
    #                     'Num_Credit_Inquiries': FloatType(),
    #                     'Credit_Mix': StringType(),
    #                     'Outstanding_Debt': FloatType(),
    #                     'Credit_Utilization_Ratio': FloatType(),
    #                     'Credit_History_Age': StringType(),
    #                     'Payment_of_Min_Amount': StringType(),
    #                     'Total_EMI_per_month': FloatType(),
    #                     'Amount_invested_monthly': FloatType(),
    #                     'Payment_Behaviour': StringType(),
    #                     'Monthly_Balance': FloatType(),
    #                     'snapshot_date': DateType()
    #                     }

# loan
    # column_type_map = {
    #         "loan_id": StringType(),
    #         "Customer_ID": StringType(),
    #         "loan_start_date": DateType(),
    #         "tenure": IntegerType(),
    #         "installment_num": IntegerType(),
    #         "loan_amt": FloatType(),
    #         "due_amt": FloatType(),
    #         "paid_amt": FloatType(),
    #         "overdue_amt": FloatType(),
    #         "balance": FloatType(),
    #         "snapshot_date": DateType(),
    #     }

In [96]:
# Load the raw clickstream here. Otherwise this cell depends on a loader cell further
# down the notebook and fails on Restart & Run All.
clickstream_df = pd.read_csv(clickstream)
clickstream_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 215376 entries, 0 to 215375
Data columns (total 22 columns):
 #   Column         Non-Null Count   Dtype 
---  ------         --------------   ----- 
 0   fe_1           215376 non-null  int64 
 1   fe_2           215376 non-null  int64 
 2   fe_3           215376 non-null  int64 
 3   fe_4           215376 non-null  int64 
 4   fe_5           215376 non-null  int64 
 5   fe_6           215376 non-null  int64 
 6   fe_7           215376 non-null  int64 
 7   fe_8           215376 non-null  int64 
 8   fe_9           215376 non-null  int64 
 9   fe_10          215376 non-null  int64 
 10  fe_11          215376 non-null  int64 
 11  fe_12          215376 non-null  int64 
 12  fe_13          215376 non-null  int64 
 13  fe_14          215376 non-null  int64 
 14  fe_15          215376 non-null  int64 
 15  fe_16          215376 non-null  int64 
 16  fe_17          215376 non-null  int64 
 17  fe_18          215376 non-null  int64 
 18  fe_1

In [61]:
# Get unique IDs
click_cust = set(clickstream_df["Customer_ID"].unique())
attr_cust = set(attributes_df["Customer_ID"].unique())

# 1. Customers in clickstream but NOT in attributes
missing_in_attr = click_cust - attr_cust

# 2. Customers in attributes but NOT in clickstream
missing_in_click = attr_cust - click_cust

# 3. Customers present in both
matched = click_cust & attr_cust

print(f"Total clickstream customers: {len(click_cust)}")
print(f"Total attribute customers: {len(attr_cust)}")
print(f"Matched customers: {len(matched)}")
print(f"Missing in attributes: {len(missing_in_attr)}")
print(f"Missing in clickstream: {len(missing_in_click)}")

Total clickstream customers: 8974
Total attribute customers: 12500
Matched customers: 8974
Missing in attributes: 0
Missing in clickstream: 3526


array(['CUS_0x1000', 'CUS_0x1009', 'CUS_0x100b', ..., 'CUS_0xff6',
       'CUS_0xffc', 'CUS_0xffd'], shape=(12500,), dtype=object)

In [10]:
# Raw clickstream features: load and preview the first rows.
clickstream_df = pd.read_csv(filepath_or_buffer=clickstream)
clickstream_df.head(n=5)

Unnamed: 0,fe_1,fe_2,fe_3,fe_4,fe_5,fe_6,fe_7,fe_8,fe_9,fe_10,...,fe_13,fe_14,fe_15,fe_16,fe_17,fe_18,fe_19,fe_20,Customer_ID,snapshot_date
0,63,118,80,121,55,193,111,112,-101,83,...,-16,-81,-126,114,35,85,-73,76,CUS_0x1037,2023-01-01
1,-108,182,123,4,-56,27,25,-6,284,222,...,-14,-96,200,35,130,94,111,75,CUS_0x1069,2023-01-01
2,-13,8,87,166,214,-98,215,152,129,139,...,26,86,171,125,-130,354,17,302,CUS_0x114a,2023-01-01
3,-85,45,200,89,128,54,76,51,61,139,...,172,96,174,163,37,207,180,118,CUS_0x1184,2023-01-01
4,55,120,226,-86,253,97,107,68,103,126,...,76,43,183,159,-26,104,118,184,CUS_0x1297,2023-01-01


In [11]:
# Raw customer attributes: load and preview the first rows.
attributes_df = pd.read_csv(filepath_or_buffer=attributes)
attributes_df.head(n=5)

Unnamed: 0,Customer_ID,Name,Age,SSN,Occupation,snapshot_date
0,CUS_0x1000,Alistair Barrf,18,913-74-1218,Lawyer,2023-05-01
1,CUS_0x1009,Arunah,26,063-67-6938,Mechanic,2025-01-01
2,CUS_0x100b,Shirboni,19,#F%$D@*&8,Media_Manager,2024-03-01
3,CUS_0x1011,Schneyerh,44,793-05-8223,Doctor,2023-11-01
4,CUS_0x1013,Cameront,44,930-49-9615,Mechanic,2023-12-01


In [12]:
# Raw financial features: load and preview the first rows.
financials_df = pd.read_csv(filepath_or_buffer=financials)
financials_df.head(n=5)

Unnamed: 0,Customer_ID,Annual_Income,Monthly_Inhand_Salary,Num_Bank_Accounts,Num_Credit_Card,Interest_Rate,Num_of_Loan,Type_of_Loan,Delay_from_due_date,Num_of_Delayed_Payment,...,Credit_Mix,Outstanding_Debt,Credit_Utilization_Ratio,Credit_History_Age,Payment_of_Min_Amount,Total_EMI_per_month,Amount_invested_monthly,Payment_Behaviour,Monthly_Balance,snapshot_date
0,CUS_0x1000,30625.94,2706.161667,6,5,27,2,"Credit-Builder Loan, and Home Equity Loan",57,26,...,Bad,1562.91,30.077191,10 Years and 9 Months,Yes,42.94109,77.31427572208112,High_spent_Medium_value_payments,400.36080052211616,2023-05-01
1,CUS_0x1009,52312.68_,4250.39,6,5,17,4,"Not Specified, Home Equity Loan, Credit-Builde...",5,18,...,_,202.68,40.286997,31 Years and 0 Months,Yes,108.366467,58.66019164829086,High_spent_Medium_value_payments,508.01234122645366,2025-01-01
2,CUS_0x100b,113781.38999999998,9549.7825,1,4,1,0,,14,8,...,Good,1030.2,28.592943,15 Years and 10 Months,No,0.0,617.0792665202719,High_spent_Small_value_payments,597.8989834797281,2024-03-01
3,CUS_0x1011,58918.47,5208.8725,3,3,17,3,"Student Loan, Credit-Builder Loan, and Debt Co...",27,13,...,Standard,473.14,27.829959,15 Years and 10 Months,Yes,123.434939,383.35084463651407,Low_spent_Medium_value_payments,294.1014665671429,2023-11-01
4,CUS_0x1013,98620.98,7962.415,3,3,6,3,"Student Loan, Debt Consolidation Loan, and Per...",12,9,...,Good,1233.51,26.524864,17 Years and 10 Months,No,228.018084,332.3337079767732,High_spent_Medium_value_payments,485.8897083704929,2023-12-01


In [13]:
# Raw LMS loan-daily records: load and preview the first rows.
loan_daily_df = pd.read_csv(filepath_or_buffer=loan_daily)
loan_daily_df.head(n=5)

Unnamed: 0,loan_id,Customer_ID,loan_start_date,tenure,installment_num,loan_amt,due_amt,paid_amt,overdue_amt,balance,snapshot_date
0,CUS_0x1000_2023_05_01,CUS_0x1000,2023-05-01,10,0,10000,0.0,0.0,0.0,10000.0,2023-05-01
1,CUS_0x1000_2023_05_01,CUS_0x1000,2023-05-01,10,1,10000,1000.0,1000.0,0.0,9000.0,2023-06-01
2,CUS_0x1000_2023_05_01,CUS_0x1000,2023-05-01,10,2,10000,1000.0,1000.0,0.0,8000.0,2023-07-01
3,CUS_0x1000_2023_05_01,CUS_0x1000,2023-05-01,10,3,10000,1000.0,0.0,1000.0,8000.0,2023-08-01
4,CUS_0x1000_2023_05_01,CUS_0x1000,2023-05-01,10,4,10000,1000.0,2000.0,0.0,6000.0,2023-09-01


In [43]:
# Latest snapshot present in the raw clickstream (string comparison on YYYY-MM-DD).
clickstream_df["snapshot_date"].max()

'2024-12-01'

In [2]:
def process_bronze_table(snapshot_date_str, bronze_lms_directory, spark,
                         csv_file_path="data/lms_loan_daily.csv"):
    """Extract one snapshot of the LMS loan-daily source into the bronze layer.

    Reads ``csv_file_path`` with Spark and keeps rows whose ``snapshot_date``
    equals ``snapshot_date_str``. Writes them via pandas to
    ``<bronze_lms_directory>/bronze_loan_daily_<YYYY_MM_DD>.csv``.

    NOTE(review): this redefines the earlier
    ``process_bronze_table(filepath, bronze_directory, spark)`` with an
    incompatible signature. Whichever cell ran last wins, so consider renaming
    one of them together with its callers.

    Args:
        snapshot_date_str: Snapshot to extract, "YYYY-MM-DD".
        bronze_lms_directory: Output directory, with or without a trailing slash;
            created if missing.
        spark: Active SparkSession.
        csv_file_path: Source CSV (IRL: the back-end source system).

    Returns:
        The filtered Spark DataFrame.

    Raises:
        ValueError: if ``snapshot_date_str`` is not "YYYY-MM-DD".
    """
    # A date literal compares exactly against the date column, with no timezone coercion.
    snapshot_date = datetime.strptime(snapshot_date_str, "%Y-%m-%d").date()

    # load data - IRL ingest from back end source system
    df = spark.read.csv(csv_file_path, header=True, inferSchema=True).filter(col('snapshot_date') == snapshot_date)
    print(snapshot_date_str + ' row count:', df.count())

    # save bronze table to datamart - IRL connect to database to write
    partition_name = "bronze_loan_daily_" + snapshot_date_str.replace('-', '_') + '.csv'
    os.makedirs(bronze_lms_directory, exist_ok=True)
    filepath = os.path.join(bronze_lms_directory, partition_name)
    df.toPandas().to_csv(filepath, index=False)
    print('saved to:', filepath)

    return df


In [4]:
# Bronze LMS extraction for a single snapshot.
bronze_lms_directory = "datamart/bronze/lms/"
snapshotdate = "2023-01-01"

spark = (
    pyspark.sql.SparkSession.builder
    .master("local[*]")
    .appName("dev")
    .getOrCreate()
)

process_bronze_table(snapshotdate, bronze_lms_directory, spark)

2023-01-01row count: 530
saved to: datamart/bronze/lms/bronze_loan_daily_2023_01_01.csv


DataFrame[loan_id: string, Customer_ID: string, loan_start_date: date, tenure: int, installment_num: int, loan_amt: int, due_amt: double, paid_amt: double, overdue_amt: double, balance: double, snapshot_date: date]

In [5]:
def process_silver_table(snapshot_date_str, bronze_lms_directory, silver_loan_daily_directory, spark):
    """Type-cast one bronze loan-daily snapshot and derive delinquency columns.

    Reads ``bronze_loan_daily_<YYYY_MM_DD>.csv`` from ``bronze_lms_directory``
    and enforces the column types. It then adds ``mob``, ``installments_missed``,
    ``first_missed_date`` and ``dpd``, and writes
    ``silver_loan_daily_<YYYY_MM_DD>.parquet``.

    Args:
        snapshot_date_str: Snapshot to process, "YYYY-MM-DD".
        bronze_lms_directory: Directory holding the bronze CSV partitions.
        silver_loan_daily_directory: Output directory for the parquet partition.
        spark: Active SparkSession.

    Returns:
        The enriched Spark DataFrame that was written.

    Raises:
        ValueError: if ``snapshot_date_str`` is not "YYYY-MM-DD".
    """
    # Parsed only to validate the date format.
    datetime.strptime(snapshot_date_str, "%Y-%m-%d")
    date_suffix = snapshot_date_str.replace('-', '_')

    # connect to bronze table
    filepath = os.path.join(bronze_lms_directory, "bronze_loan_daily_" + date_suffix + '.csv')
    df = spark.read.csv(filepath, header=True, inferSchema=True)
    print('loaded from:', filepath, 'row count:', df.count())

    # clean data: enforce schema / data type
    column_type_map = {
        "loan_id": StringType(),
        "Customer_ID": StringType(),
        "loan_start_date": DateType(),
        "tenure": IntegerType(),
        "installment_num": IntegerType(),
        "loan_amt": FloatType(),
        "due_amt": FloatType(),
        "paid_amt": FloatType(),
        "overdue_amt": FloatType(),
        "balance": FloatType(),
        "snapshot_date": DateType(),
    }

    for column, new_type in column_type_map.items():
        df = df.withColumn(column, col(column).cast(new_type))

    # augment data: add month on book
    df = df.withColumn("mob", col("installment_num").cast(IntegerType()))

    # augment data: add days past due.
    # Guard due_amt == 0 (installment 0) against divide-by-zero, which can raise
    # under ANSI mode. Fill only this column so nulls elsewhere are not zeroed.
    df = df.withColumn(
        "installments_missed",
        F.when(col("due_amt") > 0, F.ceil(col("overdue_amt") / col("due_amt"))).cast(IntegerType()),
    ).fillna(0, subset=["installments_missed"])
    df = df.withColumn("first_missed_date", F.when(col("installments_missed") > 0, F.add_months(col("snapshot_date"), -1 * col("installments_missed"))).cast(DateType()))
    df = df.withColumn("dpd", F.when(col("overdue_amt") > 0.0, F.datediff(col("snapshot_date"), col("first_missed_date"))).otherwise(0).cast(IntegerType()))

    # save silver table - IRL connect to database to write
    filepath = os.path.join(silver_loan_daily_directory, "silver_loan_daily_" + date_suffix + '.parquet')
    df.write.mode("overwrite").parquet(filepath)
    print('saved to:', filepath)

    return df

In [None]:
def process_labels_gold_table(snapshot_date_str, silver_loan_daily_directory, gold_label_store_directory, spark, dpd, mob):
    """Build the gold label store for one snapshot from the silver loan-daily table.

    Keeps loans whose month on book equals ``mob``. Each one is labelled 1 when its
    ``dpd`` is at least ``dpd``, otherwise 0. A ``label_def`` string such as
    "30dpd_6mob" records the definition. Only loan_id, Customer_ID, label,
    label_def and snapshot_date are written to
    ``gold_label_store_<YYYY_MM_DD>.parquet``.

    Args:
        snapshot_date_str: Snapshot to process, "YYYY-MM-DD".
        silver_loan_daily_directory: Directory holding the silver parquet partitions
            (joined by string concatenation, so keep the trailing "/").
        gold_label_store_directory: Output directory (same trailing "/" convention).
        spark: Active SparkSession.
        dpd: Days-past-due threshold for a positive label.
        mob: Month on book at which the label is observed.

    Returns:
        The label Spark DataFrame that was written.
    """
    # Validates the date format (raises ValueError otherwise); the value is unused.
    datetime.strptime(snapshot_date_str, "%Y-%m-%d")
    date_suffix = snapshot_date_str.replace('-', '_')

    # connect to silver table
    source_path = silver_loan_daily_directory + "silver_loan_daily_" + date_suffix + '.parquet'
    silver = spark.read.parquet(source_path)
    print('loaded from:', source_path, 'row count:', silver.count())

    label_name = str(dpd) + 'dpd_' + str(mob) + 'mob'
    labels = (
        silver
        .filter(col("mob") == mob)
        .withColumn("label", F.when(col("dpd") >= dpd, 1).otherwise(0).cast(IntegerType()))
        .withColumn("label_def", F.lit(label_name).cast(StringType()))
        .select("loan_id", "Customer_ID", "label", "label_def", "snapshot_date")
    )

    # save gold table - IRL connect to database to write
    target_path = gold_label_store_directory + "gold_label_store_" + date_suffix + '.parquet'
    labels.write.mode("overwrite").parquet(target_path)
    print('saved to:', target_path)

    return labels