In [1]:
import os
import glob
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import random
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta


In [3]:
import pprint
import pyspark
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType, IntegerType, FloatType, DateType

import utils.bronze_layer
import utils.silver_layer
import utils.gold_layer

Set up Spark Session

In [16]:
from pyspark.sql import SparkSession

spark=SparkSession.builder\
    .appName("Loan Default EDA")\
    .master("local[*]")\
    .getOrCreate()

spark.sparkContext.setLogLevel("Error")


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/21 11:44:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Loading dataset

In [18]:
# Load Clickstream
df_clickstream=spark.read.option("header","true").option("inferSchema","true").csv("data/feature_clickstream.csv")

# Load attributes
df_attributes = spark.read.option("header","true").option("inferSchema","true").csv("data/features_attributes.csv")

# Load financials
df_financials = spark.read.option("header","true").option("inferSchema","true").csv("data/features_financials.csv")

# Load loan data
df_loans= spark.read.option("header","true").option("inferSchema","true").csv("data/lms_loan_daily.csv")

print("data loaded")

data loaded


In [27]:
print("-- Clickstream data--")

print(df_clickstream.printSchema())
print(len(df_clickstream.columns))
print(df_clickstream.columns)

-- Clickstream data--
root
 |-- fe_1: integer (nullable = true)
 |-- fe_2: integer (nullable = true)
 |-- fe_3: integer (nullable = true)
 |-- fe_4: integer (nullable = true)
 |-- fe_5: integer (nullable = true)
 |-- fe_6: integer (nullable = true)
 |-- fe_7: integer (nullable = true)
 |-- fe_8: integer (nullable = true)
 |-- fe_9: integer (nullable = true)
 |-- fe_10: integer (nullable = true)
 |-- fe_11: integer (nullable = true)
 |-- fe_12: integer (nullable = true)
 |-- fe_13: integer (nullable = true)
 |-- fe_14: integer (nullable = true)
 |-- fe_15: integer (nullable = true)
 |-- fe_16: integer (nullable = true)
 |-- fe_17: integer (nullable = true)
 |-- fe_18: integer (nullable = true)
 |-- fe_19: integer (nullable = true)
 |-- fe_20: integer (nullable = true)
 |-- Customer_ID: string (nullable = true)
 |-- snapshot_date: date (nullable = true)

None
22
['fe_1', 'fe_2', 'fe_3', 'fe_4', 'fe_5', 'fe_6', 'fe_7', 'fe_8', 'fe_9', 'fe_10', 'fe_11', 'fe_12', 'fe_13', 'fe_14', 'fe_15', 

In [28]:
print("-- Attributes data--")

print(df_attributes.printSchema())
print(len(df_attributes.columns))
print(df_attributes.columns)

-- Attributes data--
root
 |-- Customer_ID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- SSN: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- snapshot_date: date (nullable = true)

None
6
['Customer_ID', 'Name', 'Age', 'SSN', 'Occupation', 'snapshot_date']


In [29]:
print("-- Finance Data--")

print(df_financials.printSchema())
print(len(df_financials.columns))
print(df_financials.columns)

-- Finance Data--
root
 |-- Customer_ID: string (nullable = true)
 |-- Annual_Income: string (nullable = true)
 |-- Monthly_Inhand_Salary: double (nullable = true)
 |-- Num_Bank_Accounts: integer (nullable = true)
 |-- Num_Credit_Card: integer (nullable = true)
 |-- Interest_Rate: integer (nullable = true)
 |-- Num_of_Loan: string (nullable = true)
 |-- Type_of_Loan: string (nullable = true)
 |-- Delay_from_due_date: integer (nullable = true)
 |-- Num_of_Delayed_Payment: string (nullable = true)
 |-- Changed_Credit_Limit: string (nullable = true)
 |-- Num_Credit_Inquiries: double (nullable = true)
 |-- Credit_Mix: string (nullable = true)
 |-- Outstanding_Debt: string (nullable = true)
 |-- Credit_Utilization_Ratio: double (nullable = true)
 |-- Credit_History_Age: string (nullable = true)
 |-- Payment_of_Min_Amount: string (nullable = true)
 |-- Total_EMI_per_month: double (nullable = true)
 |-- Amount_invested_monthly: string (nullable = true)
 |-- Payment_Behaviour: string (nullable

In [30]:
print("-- Loan Data--")

print(df_loans.printSchema())
print(len(df_loans.columns))
print(df_loans.columns)

-- Loan Data--
root
 |-- loan_id: string (nullable = true)
 |-- Customer_ID: string (nullable = true)
 |-- loan_start_date: date (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- installment_num: integer (nullable = true)
 |-- loan_amt: integer (nullable = true)
 |-- due_amt: double (nullable = true)
 |-- paid_amt: double (nullable = true)
 |-- overdue_amt: double (nullable = true)
 |-- balance: double (nullable = true)
 |-- snapshot_date: date (nullable = true)

None
11
['loan_id', 'Customer_ID', 'loan_start_date', 'tenure', 'installment_num', 'loan_amt', 'due_amt', 'paid_amt', 'overdue_amt', 'balance', 'snapshot_date']


Check for missing value

In [33]:
def check_data_quality(df,name):
    print(f"\n -- {name} DATA QUALITY --")

    missing_counts=[]
    for col_name in df.columns:
        missing_count = df.filter(col(col_name).isNull() | (col(col_name) == '')).count()
        missing_counts.append((col_name, missing_count))
    
    missing_df = spark.createDataFrame(missing_counts, ["column", "missing_count"])
    missing_df.show()

    # Sample data
    print(f"\nSample data:")
    df.show(5)

check_data_quality(df_clickstream, "CLICKSTREAM")
check_data_quality(df_attributes, "ATTRIBUTES")
check_data_quality(df_financials, "FINANCIALS")
check_data_quality(df_loans, "LOANS")

{"ts": "2025-09-21 12:22:15.450", "level": "ERROR", "logger": "DataFrameQueryContextLogger", "msg": "[CAST_INVALID_INPUT] The value '' of the type \"STRING\" cannot be cast to \"BIGINT\" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. SQLSTATE: 22018", "context": {"file": "line 6 in cell [33]", "line": "", "fragment": "__eq__", "errorClass": "CAST_INVALID_INPUT"}, "exception": {"class": "Py4JJavaError", "msg": "An error occurred while calling o195.count.\n: org.apache.spark.SparkNumberFormatException: [CAST_INVALID_INPUT] The value '' of the type \"STRING\" cannot be cast to \"BIGINT\" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. SQLSTATE: 22018\n== DataFrame ==\n\"__eq__\" was called from\nline 6 in cell [33]\n\n\tat org.apache.spark.sql.errors.QueryExecutionErrors$.i


 -- CLICKSTREAM DATA QUALITY --


NumberFormatException: [CAST_INVALID_INPUT] The value '' of the type "STRING" cannot be cast to "BIGINT" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. SQLSTATE: 22018
== DataFrame ==
"__eq__" was called from
line 6 in cell [33]
