In [1]:
import os
import glob
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import random
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import pprint
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
import pyspark.sql.functions as F

from pyspark.sql.functions import col
from pyspark.sql.types import StringType, IntegerType, FloatType, DateType, DecimalType, BooleanType

import utils


## set up pyspark session

In [2]:
# Create (or reuse) the local Spark session used by every cell below
spark = (
    SparkSession.builder
    .appName("dev")
    .master("local[*]")
    .getOrCreate()
)

# Only surface ERROR-level Spark logs so the cell outputs stay readable
spark.sparkContext.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/22 15:12:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## simulate monthly data-stream

In [3]:
# set up config
# Single snapshot date; not referenced by any cell visible in this section
snapshot_date_str = "2023-01-01"

# Date range of monthly snapshots; passed to generate_first_of_month_dates below
start_date_str = "2023-01-01"
end_date_str = "2024-12-01"

In [4]:
# generate list of dates to process
def generate_first_of_month_dates(start_date_str, end_date_str):
    """
    List the first day of every month from the start date's month up to the end date.

    Args:
        start_date_str (str): Range start, format 'YYYY-MM-DD'. Its day is ignored;
            the list always begins on the 1st of that month.
        end_date_str (str): Range end (inclusive), format 'YYYY-MM-DD'.

    Returns:
        list[str]: First-of-month dates as 'YYYY-MM-DD' strings in ascending order;
        empty when the start month lies after the end date.
    """
    range_start = datetime.strptime(start_date_str, "%Y-%m-%d")
    range_end = datetime.strptime(end_date_str, "%Y-%m-%d")

    month_starts = []
    cursor = datetime(range_start.year, range_start.month, 1)
    while not cursor > range_end:
        month_starts.append(cursor.strftime("%Y-%m-%d"))

        # divmod(month, 12) is (0, month) for Jan-Nov and (1, 0) for December,
        # so December rolls over to January of the following year.
        year_carry, month_zero_based = divmod(cursor.month, 12)
        cursor = datetime(cursor.year + year_carry, month_zero_based + 1, 1)

    return month_starts

# Monthly snapshot dates that the bronze and silver loops further down iterate over
dates_str_lst = generate_first_of_month_dates(start_date_str, end_date_str)
dates_str_lst

['2023-01-01',
 '2023-02-01',
 '2023-03-01',
 '2023-04-01',
 '2023-05-01',
 '2023-06-01',
 '2023-07-01',
 '2023-08-01',
 '2023-09-01',
 '2023-10-01',
 '2023-11-01',
 '2023-12-01',
 '2024-01-01',
 '2024-02-01',
 '2024-03-01',
 '2024-04-01',
 '2024-05-01',
 '2024-06-01',
 '2024-07-01',
 '2024-08-01',
 '2024-09-01',
 '2024-10-01',
 '2024-11-01',
 '2024-12-01']

## Initial look at the Data

In [5]:
# Read the four raw source files; the first line is the header and column types are inferred
df_features_clickstream = spark.read.csv('data/feature_clickstream.csv', header=True, inferSchema=True)
df_features_attributes = spark.read.csv('data/features_attributes.csv', header=True, inferSchema=True)
df_features_financials = spark.read.csv('data/features_financials.csv', header=True, inferSchema=True)
df_label = spark.read.csv('data/lms_loan_daily.csv', header=True, inferSchema=True)
# Collect the frames under short names so they can be inspected in one loop
data = {
    'clickstream': df_features_clickstream, 
    'attributes': df_features_attributes, 
    'financials': df_features_financials, 
    'label': df_label
}

In [6]:
# getting an initial look at the data
# For every raw table: print the inferred schema, then the first 5 rows
for df in data.values():
    df.printSchema()
    df.show(5)

root
 |-- fe_1: integer (nullable = true)
 |-- fe_2: integer (nullable = true)
 |-- fe_3: integer (nullable = true)
 |-- fe_4: integer (nullable = true)
 |-- fe_5: integer (nullable = true)
 |-- fe_6: integer (nullable = true)
 |-- fe_7: integer (nullable = true)
 |-- fe_8: integer (nullable = true)
 |-- fe_9: integer (nullable = true)
 |-- fe_10: integer (nullable = true)
 |-- fe_11: integer (nullable = true)
 |-- fe_12: integer (nullable = true)
 |-- fe_13: integer (nullable = true)
 |-- fe_14: integer (nullable = true)
 |-- fe_15: integer (nullable = true)
 |-- fe_16: integer (nullable = true)
 |-- fe_17: integer (nullable = true)
 |-- fe_18: integer (nullable = true)
 |-- fe_19: integer (nullable = true)
 |-- fe_20: integer (nullable = true)
 |-- Customer_ID: string (nullable = true)
 |-- snapshot_date: date (nullable = true)

+----+----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----------+-------------+
|fe_1|fe_2|fe_3|fe

Insights: <br>
1. clickstream data looks like an incremental table
2. attributes data looks like an overwrite table (have to confirm that customer id is a unique identifier)
3. financials is a bit unclear; it could be either
4. loan data is also incremental

## Bronze Table Development

Add all data to the bronze database in monthly batches.
The data stays raw: no cleaning, no type checking, etc.

In [15]:
from utils.bronze_processing import add_data_bronze

In [16]:
# Bronze table name -> raw CSV file it is built from
source_tables = {
    'clickstream_data': 'data/feature_clickstream.csv',
    'customer_attributes': 'data/features_attributes.csv',
    'customer_financials': 'data/features_financials.csv',
    'loan_data': 'data/lms_loan_daily.csv'
}

# Simulate the monthly feed: one add_data_bronze call per table and per snapshot date
for key, value in source_tables.items():
    for date in dates_str_lst:
        add_data_bronze(date, value, key, spark)
        print(f"Added data for {key} on {date}")

2023-01-01row count: 8974
saved to: datamart/bronze/clickstream_data/bronze_clickstream_data_2023_01_01.csv
Added data for clickstream_data on 2023-01-01
2023-02-01row count: 8974
saved to: datamart/bronze/clickstream_data/bronze_clickstream_data_2023_02_01.csv
Added data for clickstream_data on 2023-02-01
2023-03-01row count: 8974
saved to: datamart/bronze/clickstream_data/bronze_clickstream_data_2023_03_01.csv
Added data for clickstream_data on 2023-03-01
2023-04-01row count: 8974
saved to: datamart/bronze/clickstream_data/bronze_clickstream_data_2023_04_01.csv
Added data for clickstream_data on 2023-04-01
2023-05-01row count: 8974
saved to: datamart/bronze/clickstream_data/bronze_clickstream_data_2023_05_01.csv
Added data for clickstream_data on 2023-05-01
2023-06-01row count: 8974
saved to: datamart/bronze/clickstream_data/bronze_clickstream_data_2023_06_01.csv
Added data for clickstream_data on 2023-06-01
2023-07-01row count: 8974
saved to: datamart/bronze/clickstream_data/bronze_

## Silver Table Development

<b>Clickstream Data Processing</b><br>
- all feature columns are integers (Null Values ok = 0)
- customer_id is a string, snapshot_date is a date
- Remove rows with customer_id or snapshot_date null
- Remove duplicates
- Clickstream Data has an entry for every customer for every snapshot_date
- All feature columns have roughly the same distribution as shown by min & max values, mean and std

In [5]:
from utils.data_loading import load_data

# Loading clickstream data
# With None as the last argument, every csv file in the directory is loaded (see the wildcard in the output below)
df_clickstream = load_data(spark, 'datamart/bronze/clickstream_data', None)

Loading csv files with wildcard: datamart/bronze/clickstream_data/*.csv
Loaded data from all csv files in datamart/bronze/clickstream_data. Row count: 215376


In [6]:
# Show the schema of the combined bronze clickstream data and its first 5 rows
print("Clickstream Information:")
df_clickstream.printSchema()
df_clickstream.show(5)

Clickstream Information:
root
 |-- fe_1: integer (nullable = true)
 |-- fe_2: integer (nullable = true)
 |-- fe_3: integer (nullable = true)
 |-- fe_4: integer (nullable = true)
 |-- fe_5: integer (nullable = true)
 |-- fe_6: integer (nullable = true)
 |-- fe_7: integer (nullable = true)
 |-- fe_8: integer (nullable = true)
 |-- fe_9: integer (nullable = true)
 |-- fe_10: integer (nullable = true)
 |-- fe_11: integer (nullable = true)
 |-- fe_12: integer (nullable = true)
 |-- fe_13: integer (nullable = true)
 |-- fe_14: integer (nullable = true)
 |-- fe_15: integer (nullable = true)
 |-- fe_16: integer (nullable = true)
 |-- fe_17: integer (nullable = true)
 |-- fe_18: integer (nullable = true)
 |-- fe_19: integer (nullable = true)
 |-- fe_20: integer (nullable = true)
 |-- Customer_ID: string (nullable = true)
 |-- snapshot_date: date (nullable = true)

+----+----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----------+--------

In [14]:
# Total rows versus distinct customers and distinct snapshots.
# Single quotes inside the f-strings keep the cell valid on Python < 3.12, and the
# column is referenced by its schema name (snapshot_date).
print(f"Total entries: {df_clickstream.count()}")
print(f"Unique Customer_IDs: {df_clickstream.select('Customer_ID').distinct().count()}")
print(f"Unique Snapshot_Dates: {df_clickstream.select('snapshot_date').distinct().count()}")

Total entries: 215376
Unique Customer_IDs: 8974
Unique Snapshot_Dates: 24


In [17]:
# Summary statistics per column, used to compare the distributions of the feature columns
df_clickstream.describe().show()

+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------+
|summary|              fe_1|              fe_2|              fe_3|              fe_4|              fe_5|              fe_6|              fe_7|              fe_8|              fe_9|             fe_10|             fe_11|             fe_12|            fe_13|             fe_14|             fe_15|             fe_16|             fe_17|             fe_18|             fe_19|             fe_20|Customer_ID|
+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------

In [30]:
def add_clickstream_data_silver(spark: SparkSession, date: str, bronze_directory: str, silver_directory: str):
    '''
    Function to process clickstream data from bronze table to silver table. Enforces schema and data quality checks.

    Cleaning steps: cast every column to its target type, drop rows without a Customer_ID or
    snapshot_date, replace null feature values with 0 and keep one row per
    (Customer_ID, snapshot_date). The result is written as parquet and overwrites an
    existing output for the same date.

    Args:
        spark (SparkSession): Spark session object.
        date (str): Date for which data is being processed (corresponds to partition). Format: 'YYYY-MM-DD'.
        bronze_directory (str): Path to the bronze directory.
        silver_directory (str): Path to the silver directory.

    Raises:
        FileNotFoundError: If bronze_directory does not exist.
        ValueError: If the bronze partition could not be loaded or contains no rows.
    '''

    # Check input arguments
    if not os.path.exists(bronze_directory):
        raise FileNotFoundError(f"Bronze directory {bronze_directory} does not exist.")

    # Load data from bronze directory
    date_suffix = date.replace("-", "_")
    partition = 'bronze_clickstream_data_' + date_suffix + '.csv'
    df = load_data(spark, bronze_directory, partition)
    # Test for None before touching the DataFrame; calling .count() on None would
    # raise AttributeError instead of the intended ValueError.
    bronze_count = df.count() if df is not None else 0
    if bronze_count == 0:
        raise ValueError(f"No data found in bronze directory for partition {partition}.")

    # Data quality checks

    # enforce schema: fe_1 .. fe_20 are integers, plus the two identifier columns
    feature_columns = [f"fe_{i}" for i in range(1, 21)]
    column_type_map = {name: IntegerType() for name in feature_columns}
    column_type_map["Customer_ID"] = StringType()
    column_type_map["snapshot_date"] = DateType()
    for column, dtype in column_type_map.items():
        df = df.withColumn(column, col(column).cast(dtype))

    # Handle null values (0 for numerical, remove if customer_id or snapshot_date are corrupted)
    df = df.filter(col("Customer_ID").isNotNull() & col("snapshot_date").isNotNull())
    df = df.fillna(0, subset=feature_columns)

    # Remove duplicates
    df = df.dropDuplicates(["Customer_ID", "snapshot_date"])

    # check for row count after cleaning
    silver_count = df.count()
    if silver_count != bronze_count:
        print(f"Warning: Row count changed from {bronze_count} to {silver_count} after cleaning for partition {partition}.")

    # Ensure that silver directory exists (the directory itself, not only its parent;
    # os.path.dirname() of a bare directory name is '' and makes os.makedirs raise)
    os.makedirs(silver_directory, exist_ok=True)

    # Save the cleaned data to the silver directory as parquet
    silver_partition_name = 'silver_clickstream_data_' + date_suffix + '.parquet'
    silver_filepath = os.path.join(silver_directory, silver_partition_name)

    df.write.mode("overwrite").parquet(silver_filepath)
    print(f"Successfully processed and saved data to {silver_filepath}.")


In [33]:
import utils.silver_processing

# Run the silver clickstream step once per monthly partition.
# Note: this calls the implementation packaged in utils.silver_processing.
for date in dates_str_lst:
    utils.silver_processing.add_clickstream_data_silver(spark, date, 'datamart/bronze/clickstream_data', 'datamart/silver/clickstream_data')


Loaded data from datamart/bronze/clickstream_data/bronze_clickstream_data_2023_01_01.csv. Row count: 8974
Successfully processed and saved data to datamart/silver/clickstream_data/silver_clickstream_data_2023_01_01.parquet.
Loaded data from datamart/bronze/clickstream_data/bronze_clickstream_data_2023_02_01.csv. Row count: 8974
Successfully processed and saved data to datamart/silver/clickstream_data/silver_clickstream_data_2023_02_01.parquet.
Loaded data from datamart/bronze/clickstream_data/bronze_clickstream_data_2023_03_01.csv. Row count: 8974
Successfully processed and saved data to datamart/silver/clickstream_data/silver_clickstream_data_2023_03_01.parquet.
Loaded data from datamart/bronze/clickstream_data/bronze_clickstream_data_2023_04_01.csv. Row count: 8974
Successfully processed and saved data to datamart/silver/clickstream_data/silver_clickstream_data_2023_04_01.parquet.
Loaded data from datamart/bronze/clickstream_data/bronze_clickstream_data_2023_05_01.csv. Row count: 897

<b>Customer Attributes Processing</b><br>
- overwrite table: each customer_id is unique
- Name column will be removed as person identifiable information does not belong here
- Same with SSN
- Enforce IntegerType on Age and do validity check (age > 0 and age < 100)
- Occupation one of 15 types or null value --> categorical variable

In [6]:
from utils.data_loading import load_data

# Load every bronze partition of the customer attributes and take a first look.
# NOTE(review): show(5) prints Name and SSN; clear this output before sharing the notebook.
df_attributes = load_data(spark, 'datamart/bronze/customer_attributes', None)
print("Customer Attributes Information:")
df_attributes.printSchema()
df_attributes.show(5)
print(f"Total entries: {df_attributes.count()}")
# Single quotes inside the f-strings keep the cell valid on Python < 3.12;
# the column is referenced by its schema name (snapshot_date).
print(f"Unique Customer_IDs: {df_attributes.select('Customer_ID').distinct().count()}")
print(f"Unique Snapshot_Dates: {df_attributes.select('snapshot_date').distinct().count()}")
df_attributes.describe().show()

Loading csv files with wildcard: datamart/bronze/customer_attributes/*.csv
Loaded data from all csv files in datamart/bronze/customer_attributes. Row count: 11974
Customer Attributes Information:
root
 |-- Customer_ID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- SSN: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- snapshot_date: date (nullable = true)

+-----------+-----------+---+-----------+------------+-------------+
|Customer_ID|       Name|Age|        SSN|  Occupation|snapshot_date|
+-----------+-----------+---+-----------+------------+-------------+
| CUS_0x10ac|      Zhouy| 29|780-50-4730|   Developer|   2024-08-01|
| CUS_0x10c5|      Moony| 24|041-74-6785|     _______|   2024-08-01|
| CUS_0x1145|Blenkinsopr| 24|426-31-9194|     Teacher|   2024-08-01|
| CUS_0x11ac|  Liana B.v|26_|835-92-7751|  Journalist|   2024-08-01|
| CUS_0x122c| Papadimasf| 48|883-73-9594|Entrepreneur|   2024-08-01|
+-----------

In [7]:
# Collect the distinct occupations once and reuse the result for both the count and
# the listing (the original ran the distinct query twice and nested double quotes
# inside an f-string, which is a SyntaxError before Python 3.12).
occupations = df_attributes.select("Occupation").distinct().collect()
print(f"Unique Occupations: {len(occupations)}")
print("Occupations:")
for occupation in occupations:
    print(occupation[0])

Unique Occupations: 16
Occupations:
Scientist
Media_Manager
Musician
Lawyer
Teacher
Developer
Writer
Architect
Mechanic
Entrepreneur
Journalist
Doctor
Engineer
Accountant
Manager
_______


In [8]:
# Age is read as a string column (see the schema above), so the numeric comparisons
# below rely on Spark casting the values implicitly.
low_age = df_attributes.select("Age").filter(col("Age") < 0).collect()
high_age = df_attributes.select("Age").filter(col("Age") > 100).collect()
print(f"Low Age Values: {len(low_age)}")
print(f"High Age Values: {len(high_age)}")
print("Low Age Values:")
print([age[0] for age in low_age])
print("High Age Values:")
print([age[0] for age in high_age])
# Single quotes inside the f-strings keep the cell valid on Python < 3.12
print(f"Number of null values in Age: {df_attributes.filter(col('Age').isNull()).count()}")
print(f"Number of values with age = 0: {df_attributes.filter(col('Age') == 0).count()}")

Low Age Values: 99
High Age Values: 194
Low Age Values:
['-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500', '-500']
High Age Values:
['5656', '3640', '1388', '2672', '8125', '3441', '2598', '3856', '8467', '8100', '6283', '8547', '1094', '8448', '2038', '203', '2604'

In [31]:
def add_customer_attributes_silver(spark: SparkSession, date: str, bronze_directory: str, silver_directory: str):
    '''
    Function to process customer attributes data from bronze table to silver table. Enforces schema and data quality checks.

    Cleaning steps: drop the Name and SSN columns, cast the remaining columns to their target
    types, drop rows without a Customer_ID or snapshot_date, set Age values outside 0..100
    (or not castable to an integer) to 0, map unrecognised Occupation values to "Other" and
    keep one row per (Customer_ID, snapshot_date). The result is written as parquet and
    overwrites an existing output for the same date.

    Args:
        spark (SparkSession): Spark session object.
        date (str): Date for which data is being processed (corresponds to partition). Format: 'YYYY-MM-DD'.
        bronze_directory (str): Path to the bronze directory.
        silver_directory (str): Path to the silver directory.

    Raises:
        FileNotFoundError: If bronze_directory does not exist.
        ValueError: If the bronze partition could not be loaded or contains no rows.
    '''

    # Check input arguments
    if not os.path.exists(bronze_directory):
        raise FileNotFoundError(f"Bronze directory {bronze_directory} does not exist.")

    # Load data from bronze directory
    date_suffix = date.replace("-", "_")
    partition = 'bronze_customer_attributes_' + date_suffix + '.csv'
    df = load_data(spark, bronze_directory, partition)
    # Test for None before touching the DataFrame; calling .count() on None would
    # raise AttributeError instead of the intended ValueError.
    bronze_count = df.count() if df is not None else 0
    if bronze_count == 0:
        raise ValueError(f"No data found in bronze directory for partition {partition}.")

    # Data quality checks

    # drop columns name and ssn
    df = df.drop("Name", "SSN")

    # enforce schema
    column_type_map = {
        "Age": IntegerType(),
        "Occupation": StringType(),
        "Customer_ID": StringType(),
        "snapshot_date": DateType()
        }
    for column, dtype in column_type_map.items():
        df = df.withColumn(column, col(column).cast(dtype))

    # Handle corrupted values (1 of 15 categories for occupation, sanity check for age, remove if customer_id or snapshot_date are corrupted)
    # Both identifiers must be present: "&" drops a row when either one is null
    # (the previous "|" only dropped rows where both were null).
    df = df.filter(col("Customer_ID").isNotNull() & col("snapshot_date").isNotNull())

    # Ages outside 0..100 and values that could not be cast (null) become 0
    df = df.withColumn("Age", F.when((col("Age") >= 0) & (col("Age") <= 100), col("Age")).otherwise(0))
    age_null_count = df.filter(col("Age") == 0).count()

    # "Other" is the catch-all bucket, so it is itself an accepted value
    valid_occupations = [
        "Scientist",
        "Media_Manager",
        "Musician",
        "Lawyer",
        "Teacher",
        "Developer",
        "Writer",
        "Architect",
        "Mechanic",
        "Entrepreneur",
        "Journalist",
        "Doctor",
        "Engineer",
        "Accountant",
        "Manager",
        "Other",
    ]
    df = df.withColumn(
        "Occupation",
        F.when(col("Occupation").isin(valid_occupations), col("Occupation")).otherwise("Other"),
    )
    occupation_null_count = df.filter(col("Occupation") == "Other").count()

    # Remove duplicates
    df = df.dropDuplicates(["Customer_ID", "snapshot_date"])

    # check for row count after cleaning
    # Reported as a positive "rows removed" number (bronze minus silver)
    rows_removed = bronze_count - df.count()
    if rows_removed or age_null_count or occupation_null_count:
        print(f"Warning: Cleaning resulted in {rows_removed} rows removed, {age_null_count} age nulls, and {occupation_null_count} occupation nulls for partition {partition}.")

    # Ensure that silver directory exists (the directory itself, not only its parent;
    # os.path.dirname() of a bare directory name is '' and makes os.makedirs raise)
    os.makedirs(silver_directory, exist_ok=True)

    # Save the cleaned data to the silver directory as parquet
    silver_partition_name = 'silver_customer_attributes_' + date_suffix + '.parquet'
    silver_filepath = os.path.join(silver_directory, silver_partition_name)

    df.write.mode("overwrite").parquet(silver_filepath)
    print(f"Successfully processed and saved data to {silver_filepath}.")


In [9]:
import utils.silver_processing

# Run the silver customer-attributes step once per monthly partition,
# using the implementation packaged in utils.silver_processing.
for date in dates_str_lst: 
    utils.silver_processing.add_customer_attributes_silver(spark, date, 'datamart/bronze/customer_attributes', 'datamart/silver/customer_attributes')

Loaded data from datamart/bronze/customer_attributes/bronze_customer_attributes_2023_01_01.csv. Row count: 530
Successfully processed and saved data to datamart/silver/customer_attributes/silver_customer_attributes_2023_01_01.parquet.
Loaded data from datamart/bronze/customer_attributes/bronze_customer_attributes_2023_02_01.csv. Row count: 501
Successfully processed and saved data to datamart/silver/customer_attributes/silver_customer_attributes_2023_02_01.parquet.
Loaded data from datamart/bronze/customer_attributes/bronze_customer_attributes_2023_03_01.csv. Row count: 506
Successfully processed and saved data to datamart/silver/customer_attributes/silver_customer_attributes_2023_03_01.parquet.
Loaded data from datamart/bronze/customer_attributes/bronze_customer_attributes_2023_04_01.csv. Row count: 510
Successfully processed and saved data to datamart/silver/customer_attributes/silver_customer_attributes_2023_04_01.parquet.
Loaded data from datamart/bronze/customer_attributes/bronze_

<b>Customer Financials Processing</b><br>
- customer_id: StringType, remove rows with corrupted values
- annual income: FloatType with 2 decimals, sanity check (>0)
- monthly inhand salary: FloatType with 2 decimals, sanity check (>0 & aligns with annual income data?)
- num bank accounts: IntegerType, sanity check (> 0, < 100?)
- num credit cards same
- Interest rate: IntegerType
- num of loan: IntegerType, > 0, 
- type of loan: categorical, StringType, 
- Delay from due date: IntegerType
- num of delayed payments: IntegerType, > 0
- credit limit: FloatType, 2 decimals
- credit inquiries: IntegerType, > 0
- credit mix: categorical, StringType
- outstanding debt: FloatType, 2 decimals, > 0
- credit utilization ratio: FloatType, 2 decimals, > 0, < 100 --> seems like it's given in %
- credit history age: transform into IntegerType (regex?), > 0
- payment min amount: Boolean
- total emi per month: FloatType, 2 decimals, fixed amount paid to bank each month to repay the loan
- amount invested monthly: FloatType, 2 decimals, > 0
- payment behaviour: categorical, StringType
- monthly balance: FloatType, 2 decimals, > 0 ?
- snapshot date: DateType, remove row if corrupted


In [7]:
from utils.data_loading import load_data

# Load every bronze partition of the customer financials and take a first look
df_financials = load_data(spark, 'datamart/bronze/customer_financials', None)
print("Customer Financials Information:")
df_financials.printSchema()
df_financials.show(5)
print(f"Total entries: {df_financials.count()}")
# Single quotes inside the f-strings keep the cell valid on Python < 3.12;
# the column is referenced by its schema name (snapshot_date).
print(f"Unique Customer_IDs: {df_financials.select('Customer_ID').distinct().count()}")
print(f"Unique Snapshot_Dates: {df_financials.select('snapshot_date').distinct().count()}")
df_financials.describe().show()

Loading csv files with wildcard: datamart/bronze/customer_financials/*.csv
Loaded data from all files from datamart/bronze/customer_financials. Row count: 11974
Customer Financials Information:
root
 |-- Customer_ID: string (nullable = true)
 |-- Annual_Income: string (nullable = true)
 |-- Monthly_Inhand_Salary: double (nullable = true)
 |-- Num_Bank_Accounts: integer (nullable = true)
 |-- Num_Credit_Card: integer (nullable = true)
 |-- Interest_Rate: integer (nullable = true)
 |-- Num_of_Loan: string (nullable = true)
 |-- Type_of_Loan: string (nullable = true)
 |-- Delay_from_due_date: integer (nullable = true)
 |-- Num_of_Delayed_Payment: string (nullable = true)
 |-- Changed_Credit_Limit: string (nullable = true)
 |-- Num_Credit_Inquiries: double (nullable = true)
 |-- Credit_Mix: string (nullable = true)
 |-- Outstanding_Debt: string (nullable = true)
 |-- Credit_Utilization_Ratio: double (nullable = true)
 |-- Credit_History_Age: string (nullable = true)
 |-- Payment_of_Min_Amo

In [8]:
# Collect the distinct Type_of_Loan strings once and reuse the result for the count
# (avoids a second distinct query and the nested double quotes that are a
# SyntaxError before Python 3.12).
types_of_loans = df_financials.select("Type_of_Loan").distinct().collect()
print(f"Unique Loan Types: {len(types_of_loans)}")
print("Loan Types:")
print([loan[0] for loan in types_of_loans])

Unique Loan Types: 6017
Loan Types:
['Personal Loan, Personal Loan, Credit-Builder Loan, Student Loan, and Auto Loan', 'Credit-Builder Loan, Student Loan, Home Equity Loan, Debt Consolidation Loan, Not Specified, Not Specified, Home Equity Loan, Student Loan, and Personal Loan', 'Personal Loan, Not Specified, Student Loan, and Mortgage Loan', 'Credit-Builder Loan, Student Loan, and Home Equity Loan', 'Personal Loan, Debt Consolidation Loan, Personal Loan, Home Equity Loan, and Debt Consolidation Loan', 'Credit-Builder Loan, Debt Consolidation Loan, Not Specified, Credit-Builder Loan, Debt Consolidation Loan, and Auto Loan', 'Personal Loan, and Mortgage Loan', 'Not Specified, Debt Consolidation Loan, and Auto Loan', 'Mortgage Loan, Credit-Builder Loan, Payday Loan, Not Specified, Personal Loan, Credit-Builder Loan, Debt Consolidation Loan, Student Loan, and Mortgage Loan', 'Debt Consolidation Loan, Credit-Builder Loan, Home Equity Loan, and Personal Loan', 'Payday Loan, Payday Loan, Cre

Loan Types:
- Personal Loan
- Student Loan
- Payday Loan
- Auto Loan
- Home Equity Loan
- Mortgage Loan
- Credit-Builder Loan
- Debt Consolidation Loan
- Not Specified

The Type_of_Loan column combines several of these loan types in a single string. TODO: decide whether to one-hot-encode it here or at the gold level.

In [9]:
# Collect the distinct Credit_Mix values once and reuse the result for the count
# (avoids a second distinct query and the nested double quotes that are a
# SyntaxError before Python 3.12).
credit_mix = df_financials.select("Credit_Mix").distinct().collect()
print(f"Unique Credit Mixes: {len(credit_mix)}")
print("Credit Mixes:")
print([credit[0] for credit in credit_mix])

Unique Credit Mixes: 4
Credit Mixes:
['_', 'Good', 'Bad', 'Standard']


In [10]:
# Collect the distinct Payment_Behaviour values once and reuse the result for the count
# (avoids a second distinct query and the nested double quotes that are a
# SyntaxError before Python 3.12).
payment_behaviour = df_financials.select("Payment_Behaviour").distinct().collect()
print(f"Unique Behaviours: {len(payment_behaviour)}")
print("Types:")
print([pb[0] for pb in payment_behaviour])

Unique Behaviours: 7
Types:
['Low_spent_Small_value_payments', 'High_spent_Medium_value_payments', 'High_spent_Small_value_payments', 'Low_spent_Large_value_payments', 'Low_spent_Medium_value_payments', 'High_spent_Large_value_payments', '!@9#%8']


In [12]:
# Look at the raw Credit_History_Age strings (untruncated) to see the text format that has to be parsed
df_financials.select("Credit_History_Age").show(10, truncate=False)

+----------------------+
|Credit_History_Age    |
+----------------------+
|16 Years and 3 Months |
|30 Years and 2 Months |
|12 Years and 5 Months |
|17 Years and 5 Months |
|22 Years and 2 Months |
|5 Years and 11 Months |
|5 Years and 5 Months  |
|10 Years and 9 Months |
|13 Years and 10 Months|
|12 Years and 2 Months |
+----------------------+
only showing top 10 rows



In [20]:
def add_customer_financials_silver(spark: SparkSession, date: str, bronze_directory: str, silver_directory: str):
    '''
    Function to process customer financials data from bronze table to silver table. Enforces schema and data quality checks.

    Cleaning steps: cast every column to its target type, drop rows without a Customer_ID or
    snapshot_date, default a missing Payment_of_Min_Amount to False, set numeric values that
    fail their range check (or could not be cast) to 0, map unrecognised Credit_Mix and
    Payment_Behaviour values to "Unknown", convert Credit_History_Age from
    "<y> Years and <m> Months" text to a number of months and keep one row per
    (Customer_ID, snapshot_date). The result is written as parquet and overwrites an
    existing output for the same date.

    Args:
        spark (SparkSession): Spark session object.
        date (str): Date for which data is being processed (corresponds to partition). Format: 'YYYY-MM-DD'.
        bronze_directory (str): Path to the bronze directory.
        silver_directory (str): Path to the silver directory.

    Raises:
        FileNotFoundError: If bronze_directory does not exist.
        ValueError: If the bronze partition could not be loaded or contains no rows.
    '''

    # Check input arguments
    if not os.path.exists(bronze_directory):
        raise FileNotFoundError(f"Bronze directory {bronze_directory} does not exist.")

    # Load data from bronze directory
    date_suffix = date.replace("-", "_")
    partition = 'bronze_customer_financials_' + date_suffix + '.csv'
    df = load_data(spark, bronze_directory, partition)
    # Test for None before touching the DataFrame; calling .count() on None would
    # raise AttributeError instead of the intended ValueError.
    bronze_count = df.count() if df is not None else 0
    if bronze_count == 0:
        raise ValueError(f"No data found in bronze directory for partition {partition}.")

    # Data quality checks

    # enforce schema
    column_type_map = {
        "Customer_ID": StringType(),
        "Annual_Income": DecimalType(18, 2),
        "Monthly_Inhand_Salary": DecimalType(18, 2),
        "Num_Bank_Accounts": IntegerType(),
        "Num_Credit_Card": IntegerType(),
        "Interest_Rate": IntegerType(),
        "Num_of_Loan": IntegerType(),
        "Type_of_Loan": StringType(),
        "Delay_from_due_date": IntegerType(),
        "Num_of_Delayed_Payment": IntegerType(),
        "Changed_Credit_Limit": FloatType(),
        "Num_Credit_Inquiries": IntegerType(),
        "Credit_Mix": StringType(),
        "Outstanding_Debt": DecimalType(18, 2),
        "Credit_Utilization_Ratio": DecimalType(4, 2),
        "Credit_History_Age": StringType(),
        "Payment_of_Min_Amount": BooleanType(),
        "Total_EMI_per_month": DecimalType(18, 2),
        "Amount_invested_monthly": DecimalType(18, 2),
        "Payment_Behaviour": StringType(),
        "Monthly_Balance": DecimalType(18, 2),
        "snapshot_date": DateType()
        }

    for column, dtype in column_type_map.items():
        df = df.withColumn(column, col(column).cast(dtype))

    # Handle corrupted values, transform values
    ## identifiers have to be present
    # "&" drops a row when either identifier is null (the previous "|" only dropped
    # rows where both were null).
    df = df.filter(col("Customer_ID").isNotNull() & col("snapshot_date").isNotNull())

    ## enforce false for boolean values if not present
    df = df.withColumn("Payment_of_Min_Amount", F.when(col("Payment_of_Min_Amount").isNotNull(), col("Payment_of_Min_Amount")).otherwise(False))

    ## numerical values checking (> 0 for all, < 100 for interest rate and credit utilization ratio)
    # Values that fail the check, including nulls produced by the casts above, become 0
    positive_only_columns = [
        "Annual_Income",
        "Monthly_Inhand_Salary",
        "Num_Bank_Accounts",
        "Num_Credit_Card",
        "Num_of_Loan",
        "Delay_from_due_date",
        "Num_of_Delayed_Payment",
        "Changed_Credit_Limit",
        "Num_Credit_Inquiries",
        "Outstanding_Debt",
        "Total_EMI_per_month",
        "Amount_invested_monthly",
        "Monthly_Balance",
    ]
    for column in positive_only_columns:
        df = df.withColumn(column, F.when(col(column) > 0, col(column)).otherwise(0))

    percentage_columns = ["Interest_Rate", "Credit_Utilization_Ratio"]
    for column in percentage_columns:
        df = df.withColumn(column, F.when((col(column) > 0) & (col(column) < 100), col(column)).otherwise(0))

    ## categorical values checking
    df = df.withColumn("Credit_Mix",
                       F.when(col("Credit_Mix").isin([
                           "Good",
                           "Bad",
                           "Standard",
                           "Unknown"
                           ]),
                        col("Credit_Mix")).otherwise("Unknown"))
    df = df.withColumn("Payment_Behaviour",
                       F.when(col("Payment_Behaviour").isin([
                           "Low_spent_Small_value_payments",
                           "Low_spent_Medium_value_payments",
                           "Low_spent_Large_value_payments",
                           "High_spent_Small_value_payments",
                           "High_spent_Medium_value_payments",
                           "High_spent_Large_value_payments"
                           ]),
                        col("Payment_Behaviour")).otherwise("Unknown"))

    ## Transform credit history age to no of months
    # A missing "Years" or "Months" part contributes 0 via the coalesce
    years_col = F.coalesce(
        F.regexp_extract(col("Credit_History_Age"), r"(\d+)\s+Year(s)?", 1).cast(IntegerType()),
        F.lit(0)
    )
    months_col = F.coalesce(
        F.regexp_extract(col("Credit_History_Age"), r"(\d+)\s+Month(s)?", 1).cast(IntegerType()),
        F.lit(0)
    )
    df = df.withColumn("Credit_History_Age", ((years_col * 12) + months_col).cast(IntegerType()))

    # Remove duplicates
    df = df.dropDuplicates(["Customer_ID", "snapshot_date"])

    # check for row count after cleaning
    # Count once and reuse the result; each .count() triggers its own Spark job
    rows_removed = bronze_count - df.count()
    if rows_removed != 0:
        print(f"Warning: Cleaning resulted in {rows_removed} rows removed for partition {partition}.")

    # Ensure that silver directory exists (the directory itself, not only its parent;
    # os.path.dirname() of a bare directory name is '' and makes os.makedirs raise)
    os.makedirs(silver_directory, exist_ok=True)

    # Save the cleaned data to the silver directory as parquet
    silver_partition_name = 'silver_customer_financials_' + date_suffix + '.parquet'
    silver_filepath = os.path.join(silver_directory, silver_partition_name)

    df.write.mode("overwrite").parquet(silver_filepath)
    print(f"Successfully processed and saved data to {silver_filepath}.")


In [21]:
# Run the silver customer-financials step once per monthly partition
# (unqualified call to add_customer_financials_silver)
for date in dates_str_lst:
    add_customer_financials_silver(spark, date, 'datamart/bronze/customer_financials', 'datamart/silver/customer_financials')

Loaded data from datamart/bronze/customer_financials/bronze_customer_financials_2023_01_01.csv. Row count: 530
Successfully processed and saved data to datamart/silver/customer_financials/silver_customer_financials_2023_01_01.parquet.
Loaded data from datamart/bronze/customer_financials/bronze_customer_financials_2023_02_01.csv. Row count: 501
Successfully processed and saved data to datamart/silver/customer_financials/silver_customer_financials_2023_02_01.parquet.
Loaded data from datamart/bronze/customer_financials/bronze_customer_financials_2023_03_01.csv. Row count: 506
Successfully processed and saved data to datamart/silver/customer_financials/silver_customer_financials_2023_03_01.parquet.
Loaded data from datamart/bronze/customer_financials/bronze_customer_financials_2023_04_01.csv. Row count: 510
Successfully processed and saved data to datamart/silver/customer_financials/silver_customer_financials_2023_04_01.parquet.
Loaded data from datamart/bronze/customer_financials/bronze_

In [22]:
from utils.data_loading import load_data
# Read the silver customer-financials table back for a sanity check.
# Passing None as the partition presumably loads every partition (the log shows a *.parquet wildcard) — confirm in utils.data_loading.
df_financials = load_data(spark, 'datamart/silver/customer_financials', None)

# Preview rows, column types, summary statistics and the total row count.
print("Customer Financials Information:")
df_financials.show(5)
df_financials.printSchema()
df_financials.describe().show()
print(f"Total entries: {df_financials.count()}")

Loading parquet files with wildcard: datamart/silver/customer_financials/*.parquet
Loaded data from all files from datamart/silver/customer_financials. Row count: 11974
Customer Financials Information:
+-----------+-------------+---------------------+-----------------+---------------+-------------+-----------+--------------------+-------------------+----------------------+--------------------+--------------------+----------+----------------+------------------------+------------------+---------------------+-------------------+-----------------------+--------------------+---------------+-------------+
|Customer_ID|Annual_Income|Monthly_Inhand_Salary|Num_Bank_Accounts|Num_Credit_Card|Interest_Rate|Num_of_Loan|        Type_of_Loan|Delay_from_due_date|Num_of_Delayed_Payment|Changed_Credit_Limit|Num_Credit_Inquiries|Credit_Mix|Outstanding_Debt|Credit_Utilization_Ratio|Credit_History_Age|Payment_of_Min_Amount|Total_EMI_per_month|Amount_invested_monthly|   Payment_Behaviour|Monthly_Balance|sna

<b>Loan Data</b>
- formatted according to lab 2 implementation

In [23]:
from utils.silver_processing import add_loan_data_silver

# Build one silver loan-data partition per snapshot date in dates_str_lst.
# Per the log output, each call reads that date's bronze CSV and writes a silver parquet file.
for date in dates_str_lst:
    add_loan_data_silver(spark, date, 'datamart/bronze/loan_data', 'datamart/silver/loan_data')

Loaded data from datamart/bronze/loan_data/bronze_loan_data_2023_01_01.csv. Row count: 530
Successfully processed and saved data to datamart/silver/loan_data/silver_loan_data_2023_01_01.parquet.
Loaded data from datamart/bronze/loan_data/bronze_loan_data_2023_02_01.csv. Row count: 1031
Successfully processed and saved data to datamart/silver/loan_data/silver_loan_data_2023_02_01.parquet.
Loaded data from datamart/bronze/loan_data/bronze_loan_data_2023_03_01.csv. Row count: 1537
Successfully processed and saved data to datamart/silver/loan_data/silver_loan_data_2023_03_01.parquet.
Loaded data from datamart/bronze/loan_data/bronze_loan_data_2023_04_01.csv. Row count: 2047
Successfully processed and saved data to datamart/silver/loan_data/silver_loan_data_2023_04_01.parquet.
Loaded data from datamart/bronze/loan_data/bronze_loan_data_2023_05_01.csv. Row count: 2568
Successfully processed and saved data to datamart/silver/loan_data/silver_loan_data_2023_05_01.parquet.
Loaded data from data

Bad pipe message: %s [b'"Chromium";v="136", "Google Chrome";v="136", "Not.A/Brand']
Bad pipe message: %s [b'v="99"\r\nsec-ch-ua-mobile: ?0\r\nsec', b'h-ua-platform: "macOS"\r\nUpgrade-Insecure-Req', b'sts: 1\r\nUser-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/13', b'0.0.0 Safari/537.36\r\nAccept: text/html,application/xh']
Bad pipe message: %s [b'l+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7\r\nSec-', b'tch-Site: none\r\nSec-Fetch-Mode: navigate\r\nSec-Fetch-User: ?1\r\nSec-Fet', b'-Dest: document\r\nAccept-Encoding: gzip, deflate, br, zstd\r\nAccept-Language: en-US,en;q=0.9\r\nCookie']
Bad pipe message: %s [b'_xsrf=2|5dfff3d6|3a54527248676535f1fb28d130f17516|1745459']
Bad pipe message: %s [b'6; username-127-0-0-1-8888=2|1:0|10:1747816017|23:us']
Bad pipe message: %s [b'name-127-0-0-1-8888|196:eyJ1c2VybmFtZSI6ICIzMmU5YjMyOGIyZmM0MDg5OTM5ZWIwYzkyNjQxNGM2ZiIsICJuYW1lIj

## Gold Table Processing

<b>Create Label Store</b>

In [5]:
from utils.gold_processing import add_loan_data_gold_ls

# Build one gold label-store partition per snapshot date from the silver loan data.
# NOTE(review): depends on `dates_str_lst` and the silver loan-data files created in earlier cells.
for date in dates_str_lst:
    add_loan_data_gold_ls(spark, date, 'datamart/silver/loan_data', 'datamart/gold/label_store')

Loaded data from datamart/silver/loan_data/silver_loan_data_2023_01_01.parquet. Row count: 530
Successfully processed and saved data to datamart/gold/label_store/gold_label_store_2023_01_01.parquet.
Loaded data from datamart/silver/loan_data/silver_loan_data_2023_02_01.parquet. Row count: 1031
Successfully processed and saved data to datamart/gold/label_store/gold_label_store_2023_02_01.parquet.
Loaded data from datamart/silver/loan_data/silver_loan_data_2023_03_01.parquet. Row count: 1537
Successfully processed and saved data to datamart/gold/label_store/gold_label_store_2023_03_01.parquet.
Loaded data from datamart/silver/loan_data/silver_loan_data_2023_04_01.parquet. Row count: 2047
Successfully processed and saved data to datamart/gold/label_store/gold_label_store_2023_04_01.parquet.
Loaded data from datamart/silver/loan_data/silver_loan_data_2023_05_01.parquet. Row count: 2568
Successfully processed and saved data to datamart/gold/label_store/gold_label_store_2023_05_01.parquet.
L

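<b>Create Feature Store</b><br>
Before joining:
- clickstream data: PCA is a candidate, but it makes more sense to fit it per model, so no further processing is applied here
- customer attributes: one-hot encoding of occupation
- customer financials: one-hot encoding of credit mix and payment behaviour; loan type is encoded as one count column per loan type (the number of loans of that type)
<br>
Join: as-of left join on `Customer_ID` — each clickstream row is matched to the latest customer-attributes and customer-financials snapshot dated on or before the clickstream `snapshot_date` (see `join_to_feature_store` below).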

In [6]:
from utils.data_loading import load_data
from utils.gold_processing import one_hot_encode

# Load all silver customer-attributes partitions (None presumably means "no partition filter" — confirm in utils.data_loading).
df_attributes = load_data(spark, 'datamart/silver/customer_attributes', None)

# One-hot encode the 'Occupation' column into a new DataFrame so df_attributes stays untouched.
# The third argument is the label passed as drop_label in the join cell; presumably that category gets no indicator column — verify in utils.gold_processing.
df_attributes_ohe = one_hot_encode(df_attributes, "Occupation", "Other")
df_attributes_ohe.show(5)
df_attributes_ohe.printSchema()

Loading parquet files with wildcard: datamart/silver/customer_attributes/*.parquet
Loaded data from all files from datamart/silver/customer_attributes. Row count: 11974
+-----------+---+------------+-------------+----------------+-----------------+--------------------+---------------------+-------------------+------------------+-------------------+------------------------+--------------------+--------------------+-----------------------+---------------------+-----------------+-------------------+------------------+-----------------+
|Customer_ID|Age|  Occupation|snapshot_date|Occupation_index|Occupation_Lawyer|Occupation_Architect|Occupation_Accountant|Occupation_Engineer|Occupation_Teacher|Occupation_Mechanic|Occupation_Media_Manager|Occupation_Developer|Occupation_Scientist|Occupation_Entrepreneur|Occupation_Journalist|Occupation_Doctor|Occupation_Musician|Occupation_Manager|Occupation_Writer|
+-----------+---+------------+-------------+----------------+-----------------+------------

Customer_Financials Loan Types:
- Personal Loan
- Student Loan
- Payday Loan
- Auto Loan
- Home Equity Loan
- Mortgage Loan
- Credit-Builder Loan
- Debt Consolidation Loan
- Not Specified

The `Type_of_Loan` column concatenates all of a customer's loan types into one string; a regex can be used to extract the individual types first.

In [7]:
# Load all silver customer-financials partitions.
df_financials = load_data(spark, 'datamart/silver/customer_financials', None)

# Show the loan count next to the untruncated free-text loan list to see how Type_of_Loan is formatted.
df_financials.select("Num_of_Loan", "Type_of_Loan").show(10, truncate=False)

Loading parquet files with wildcard: datamart/silver/customer_financials/*.parquet
Loaded data from all files from datamart/silver/customer_financials. Row count: 11974
+-----------+-------------------------------------------------------------------------------------------------------------------------------------------+
|Num_of_Loan|Type_of_Loan                                                                                                                               |
+-----------+-------------------------------------------------------------------------------------------------------------------------------------------+
|4          |Credit-Builder Loan, Credit-Builder Loan, Home Equity Loan, and Debt Consolidation Loan                                                    |
|1          |Payday Loan                                                                                                                                |
|9          |Student Loan, Payday Loan, Not Specified, Mortga

In [None]:
# Transform Type_of_Loan to one-hot encoding but instead of binary, use the amount of loans

from pyspark.sql.functions import col, when, lit, count, regexp_count
import re

def encode_loan_types_with_counts(df: DataFrame, loan_column_name: str) -> DataFrame:
    """
    Expand a free-text loan-type column into one integer count column per known loan type.

    For every label in the fixed list below, a column named
    ``LoanType_<label with non-word runs replaced by "_">_Count`` is appended
    (e.g. ``LoanType_Credit_Builder_Loan_Count``). Its value is the number of
    times the label occurs in ``loan_column_name`` for that row, or 0 when the
    source value is null. All existing columns are kept.

    Args:
        df (DataFrame): Input PySpark DataFrame.
        loan_column_name (str): Name of the column holding the concatenated loan types.

    Returns:
        DataFrame: ``df`` plus one ``IntegerType`` count column per known loan type.
    """
    known_loan_types = (
        "Personal Loan",
        "Student Loan",
        "Payday Loan",
        "Auto Loan",
        "Home Equity Loan",
        "Mortgage Loan",
        "Credit-Builder Loan",
        "Debt Consolidation Loan",
        "Not Specified",
    )

    source_col = col(loan_column_name)
    result = df

    for position, loan_label in enumerate(known_loan_types):
        # Turn the label into a column-name-safe slug: every run of non-word
        # characters becomes a single "_", and edge underscores are trimmed.
        slug = re.sub(r'[^\w]+', '_', loan_label).strip('_')
        # A label made only of special characters would leave an empty slug;
        # fall back to a positional name in that case.
        slug = slug or f"loan_type_{position}"

        # regexp_count receives the label as its pattern argument.
        occurrences = regexp_count(source_col, lit(loan_label))
        # Rows with a null source value get 0 instead of null.
        per_row_count = when(source_col.isNotNull(), occurrences).otherwise(0)

        result = result.withColumn(
            f"LoanType_{slug}_Count",
            per_row_count.cast(IntegerType()),
        )

    return result

# Add the per-loan-type count columns to the silver financials frame loaded in the previous cell.
df_financials_encoded_loans = encode_loan_types_with_counts(df_financials, "Type_of_Loan")

# Preview untruncated rows and the schema to check the new LoanType_*_Count columns.
df_financials_encoded_loans.show(10, truncate=False)
df_financials_encoded_loans.printSchema()

+-----------+-------------+---------------------+-----------------+---------------+-------------+-----------+-------------------------------------------------------------------------------------------------------------------------------------------+-------------------+----------------------+--------------------+--------------------+----------+----------------+------------------------+------------------+---------------------+-------------------+-----------------------+--------------------------------+---------------+-------------+----------------------------+---------------------------+--------------------------+------------------------+-------------------------------+----------------------------+----------------------------------+--------------------------------------+----------------------------+
|Customer_ID|Annual_Income|Monthly_Inhand_Salary|Num_Bank_Accounts|Num_Credit_Card|Interest_Rate|Num_of_Loan|Type_of_Loan                                                                      

Join

In [5]:
from utils.data_loading import load_data
from utils.gold_processing import one_hot_encode
from utils.gold_processing import encode_loan_types_with_counts

# --- Clickstream: taken from silver as-is, no further processing needed ---
df_clickstream = load_data(
    spark=spark,
    input_directory='datamart/silver/clickstream_data/',
    partition=None,
)

# --- Customer attributes: load from silver, then one-hot encode Occupation ---
df_customer_attributes = one_hot_encode(
    df=load_data(
        spark=spark,
        input_directory='datamart/silver/customer_attributes/',
        partition=None,
    ),
    column="Occupation",
    drop_label="Other",
)

# --- Customer financials: load from silver, one-hot encode the two categorical
# --- columns, then expand Type_of_Loan into per-loan-type count columns ---
df_customer_financials = load_data(
    spark=spark,
    input_directory='datamart/silver/customer_financials/',
    partition=None,
)
df_customer_financials = one_hot_encode(
    df=df_customer_financials,
    column="Credit_Mix",
    drop_label="Unknown",
)
df_customer_financials = one_hot_encode(
    df=df_customer_financials,
    column="Payment_Behaviour",
    drop_label="Unknown",
)
df_customer_financials = encode_loan_types_with_counts(
    df=df_customer_financials,
    loan_column_name="Type_of_Loan",
)


Loading parquet files with wildcard: datamart/silver/clickstream_data/*.parquet
Loaded data from all files from datamart/silver/clickstream_data/. Row count: 215376
Loading parquet files with wildcard: datamart/silver/customer_attributes/*.parquet
Loaded data from all files from datamart/silver/customer_attributes/. Row count: 11974
Loading parquet files with wildcard: datamart/silver/customer_financials/*.parquet
Loaded data from all files from datamart/silver/customer_financials/. Row count: 11974


In [10]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from utils.data_loading import load_data # Ensure load_data is imported if you need to load df_clickstream_silver

def _latest_snapshot_join(df_left: DataFrame, df_right: DataFrame, left_alias: str, right_prefix: str) -> DataFrame:
    """
    As-of left join: attach to each row of ``df_left`` the most recent ``df_right`` row
    for the same customer whose ``snapshot_date`` is on or before the left row's.

    Both inputs must have ``Customer_ID`` and ``snapshot_date`` columns. Left rows
    without any matching right row are kept, with nulls in the right-hand columns.

    Args:
        df_left (DataFrame): Driving table; its ``Customer_ID`` / ``snapshot_date`` are kept.
        df_right (DataFrame): Snapshot table to look up.
        left_alias (str): Alias used for ``df_left`` inside the join.
        right_prefix (str): Alias for ``df_right``; also prefixes its temporary
            ``<prefix>_Customer_ID``, ``<prefix>_effective_date`` and ``<prefix>_rank`` columns.

    Returns:
        DataFrame: ``df_left`` columns plus the remaining ``df_right`` columns.
    """
    right_key = f"{right_prefix}_Customer_ID"
    right_date = f"{right_prefix}_effective_date"
    rank_col = f"{right_prefix}_rank"

    # Rename the right-hand key and date so they do not clash with the left-hand
    # columns of the same name and can be dropped by name afterwards.
    df_right_renamed = df_right \
        .withColumnRenamed("snapshot_date", right_date) \
        .withColumnRenamed("Customer_ID", right_key)

    left_customer = F.col(f"{left_alias}.Customer_ID")
    left_snapshot = F.col(f"{left_alias}.snapshot_date")
    right_effective = F.col(f"{right_prefix}.{right_date}")

    # Rank the candidate right rows of each left row, newest first.
    # NOTE: the partition key is (Customer_ID, snapshot_date) of the left table, so left
    # rows sharing that pair are collapsed to one row by the rank filter below.
    window_latest = Window.partitionBy(left_customer, left_snapshot).orderBy(right_effective.desc())

    return df_left.alias(left_alias) \
        .join(
            df_right_renamed.alias(right_prefix),
            (left_customer == F.col(f"{right_prefix}.{right_key}")) &
            (left_snapshot >= right_effective),
            "left_outer"
        ) \
        .withColumn(rank_col, F.row_number().over(window_latest)) \
        .filter(F.col(rank_col) == 1) \
        .drop(rank_col, right_key, right_date)


def join_to_feature_store(df_clickstream: DataFrame, df_attributes: DataFrame, df_financials: DataFrame,
                          verbose: bool = True) -> DataFrame:
    """
    Joins clickstream data with customer attributes and customer financials data to create the gold level feature store.

    Each clickstream row is enriched with the latest attributes snapshot and the latest
    financials snapshot of the same customer dated on or before the clickstream
    ``snapshot_date``. Clickstream rows without a matching snapshot are kept with nulls.

    Args:
        df_clickstream (DataFrame): Clickstream data.
        df_attributes (DataFrame): Attributes data.
        df_financials (DataFrame): Financials data.
        verbose (bool): When True (default), print the intermediate and final schema,
            a 5-row preview of each and the final row count. Set to False to skip
            these diagnostics.

    Returns:
        DataFrame: Joined feature store DataFrame.
    """
    # Steps 1-2: clickstream + latest attributes snapshot per (customer, snapshot date)
    df_cs_attr = _latest_snapshot_join(df_clickstream, df_attributes, left_alias="cs", right_prefix="attr")

    if verbose:
        print("Schema after joining clickstream with attributes:")
        df_cs_attr.printSchema()
        df_cs_attr.show(5, truncate=False)

    # Steps 3-4: add the latest financials snapshot the same way
    df_feature_store = _latest_snapshot_join(df_cs_attr, df_financials, left_alias="cs_attr", right_prefix="fin")

    if verbose:
        print("Schema of the final feature store:")
        df_feature_store.printSchema()
        print(f"Total records in feature store: {df_feature_store.count()}")
        df_feature_store.show(5, truncate=False)

    return df_feature_store

# join_to_feature_store already prints the final schema, the row count and a 5-row preview,
# so only the summary statistics are added here; repeating show() would recompute both
# windowed joins without adding information.
df_feature_store = join_to_feature_store(df_clickstream, df_customer_attributes, df_customer_financials)
df_feature_store.describe().show()

Schema after joining clickstream with attributes:
root
 |-- fe_1: integer (nullable = true)
 |-- fe_2: integer (nullable = true)
 |-- fe_3: integer (nullable = true)
 |-- fe_4: integer (nullable = true)
 |-- fe_5: integer (nullable = true)
 |-- fe_6: integer (nullable = true)
 |-- fe_7: integer (nullable = true)
 |-- fe_8: integer (nullable = true)
 |-- fe_9: integer (nullable = true)
 |-- fe_10: integer (nullable = true)
 |-- fe_11: integer (nullable = true)
 |-- fe_12: integer (nullable = true)
 |-- fe_13: integer (nullable = true)
 |-- fe_14: integer (nullable = true)
 |-- fe_15: integer (nullable = true)
 |-- fe_16: integer (nullable = true)
 |-- fe_17: integer (nullable = true)
 |-- fe_18: integer (nullable = true)
 |-- fe_19: integer (nullable = true)
 |-- fe_20: integer (nullable = true)
 |-- Customer_ID: string (nullable = true)
 |-- snapshot_date: date (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Occupation_index: double 



+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------+------------------+----------+-----------------+-------------------+--------------------+---------------------+-------------------+--------------------+-------------------+------------------------+--------------------+--------------------+-----------------------+---------------------+-------------------+-------------------+-------------------+-------------------+------------------+---------------------+------------------+-----------------+------------------+------------------+--------------------+-------------------+----------------------+--------------------+--------------------+----------+-

                                                                                

## Combined EDA on features & label

In [8]:
from utils.data_loading import load_data

# Load every gold label-store partition and print its schema, a preview and basic counts.
df_label = load_data(spark, 'datamart/gold/label_store', None)
print("Label Store Information:")
df_label.printSchema()
df_label.show(5)
print(f"Total entries: {df_label.count()}")
# Column names are single-quoted inside the double-quoted f-strings: reusing the outer
# quote character inside an f-string only parses on Python >= 3.12.
# The column is spelled 'snapshot_date' exactly as in the schema, so the lookup does not
# rely on Spark's case-insensitive column resolution.
print(f"Unique Customer_IDs: {df_label.select('Customer_ID').distinct().count()}")
print(f"Unique Snapshot_Dates: {df_label.select('snapshot_date').distinct().count()}")

Loading parquet files with wildcard: datamart/gold/label_store/*.parquet
Loaded data from all files from datamart/gold/label_store. Row count: 8974
Label Store Information:
root
 |-- loan_id: string (nullable = true)
 |-- Customer_ID: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- label_def: string (nullable = true)
 |-- snapshot_date: date (nullable = true)

+--------------------+-----------+-----+----------+-------------+
|             loan_id|Customer_ID|label| label_def|snapshot_date|
+--------------------+-----------+-----+----------+-------------+
|CUS_0x1037_2023_0...| CUS_0x1037|    0|30dpd_6mob|   2023-07-01|
|CUS_0x1069_2023_0...| CUS_0x1069|    0|30dpd_6mob|   2023-07-01|
|CUS_0x114a_2023_0...| CUS_0x114a|    0|30dpd_6mob|   2023-07-01|
|CUS_0x1184_2023_0...| CUS_0x1184|    0|30dpd_6mob|   2023-07-01|
|CUS_0x1297_2023_0...| CUS_0x1297|    1|30dpd_6mob|   2023-07-01|
+--------------------+-----------+-----+----------+-------------+
only showing top 5 rows

In [9]:
# Share of label-store rows with a positive label (label == 1).
# The counts are taken outside the f-string so no nested same-type quotes are needed
# (those only parse on Python >= 3.12), and an empty label store reports nan instead of
# raising ZeroDivisionError.
n_positive_labels = df_label.filter(col('label') == 1).count()
n_labels = df_label.count()
positive_share = n_positive_labels / n_labels if n_labels else float('nan')
print(f"Portion of positive labels: {positive_share}")

Portion of positive labels: 0.28872297749052817
