# Medallion Architecture Data Cleaning Pipeline 

Delta Live Tables offer a fault-tolerant, optimized approach for building reliable data pipelines, making them ideal for this use case.

In real-world end-to-end (E2E) data projects, roles and responsibilities are typically split as follows:
- **Data Engineers**: Focus on building pipelines that handle common data issues such as duplicates, formatting of columns, schema definition, and invalid values.

- **Data Scientists**: Work on EDA, imputing missing values, handling outliers, and preparing data for modeling (feature engineering / selection / dimensionality reduction etc).

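In this notebook, I will be implementing a simplified **Medallion Architecture** modelled on **Delta Live Tables (DLT) in Azure Databricks** to simulate real-world data engineering practices. Because a DLT pipeline cannot be created on an Azure for Students subscription, the cells below reproduce the same Bronze / Silver / Gold flow with plain Delta tables on a local Spark session.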

I will be using the following visualisation as a guide to build the data pipeline. 


<br>

<img src="https://media.datacamp.com/cms/ad_4nxe4oejrhu9gexxri3ea6vmsu1fgxcxbvlwmbaj4ji5s2u31dg3hbyyg4sxmd7ma8-9zamnbxadzz_h4kllvjylicug3v4-iinvx65erdijn4htymmqvc3mjqblskqzdu5ttmodyua.png">



By the end of this notebook, I should be able to: 
- Output a **thoroughly cleansed target dataset** ready for data scientists to conduct EDA, dataset preprocessing and other model building practices.

- Define **feature and target variables** from the target table clearly 
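
As a rough illustration of the second goal, the sketch below separates a target column from candidate feature columns. The target name `default_status` comes from the Silver cleaning step later in this notebook. The helper `split_features_target`, the sample column list, and the choice to exclude identifier columns are assumptions made for illustration, not the final feature set.

```python
# Hypothetical helper: separates the target column from candidate features.
TARGET_COL = "default_status"   # created during the Silver cleaning step
ID_COLS = ("id", "member_id")   # assumption: identifiers are not used as features


def split_features_target(columns, target=TARGET_COL, exclude=ID_COLS):
    """Return (feature_columns, target) from a list of column names."""
    if target not in columns:
        raise ValueError(f"target column {target!r} not found")
    features = [c for c in columns if c != target and c not in exclude]
    return features, target


# Small illustrative subset of the Lending Club columns
sample_columns = ["id", "member_id", "loan_amnt", "int_rate", "grade", "default_status"]
features, target = split_features_target(sample_columns)
print(features)  # ['loan_amnt', 'int_rate', 'grade']
print(target)    # default_status
```

With a Spark DataFrame, the same helper could be called with `df.columns`.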

## Import Libraries

In [1]:
# Import function to start Spark
from init_spark import start_spark
spark = start_spark()


from pyspark.sql.functions import (
    col, when, count, desc, isnan, isnull, lit, length, trim, lower, upper, to_date, concat_ws,  regexp_extract
)

from pyspark.sql.types import (
    StructType, StructField, StringType, DoubleType, IntegerType, DateType, NumericType
)



25/07/03 00:06:38 WARN Utils: Your hostname, Chengs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.77 instead (on interface en0)
25/07/03 00:06:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/lunlun/.ivy2/cache
The jars for the packages stored in: /Users/lunlun/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-d54d7909-2f03-469a-bef1-8c7ee55e7a8e;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.1.0 in central
	found io.delta#delta-storage;3.1.0 in central


:: loading settings :: url = jar:file:/Users/lunlun/Downloads/Github/Credit-Risk-Modeling-PySpark/venv/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 72ms :: artifacts dl 3ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.1.0 from central in [default]
	io.delta#delta-storage;3.1.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0   |   0   ||   3   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-d54d7909-2f03-469a-bef1-8c7ee55e7a8e
	confs: [default]
	0 artifacts copied, 3 already retrieved (0kB/4ms)
25/07/03 00:06:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-ja

## Bronze Delta Table

This serves as a landing place for the raw data and acts as the single source of truth. If data processing in a subsequent stage goes wrong, data specialists can fall back on the **Bronze Delta Table** for reference, which preserves data integrity.



In [2]:
# import dlt (specific to databricks)

# This will only be allowed if I can create a DLT pipeline (not allowed due to Azure for Students)

# @dlt.table(name="bronze_raw_lendingclub_data", comment="Ingest raw loan data from Lending Club csv")
# def bronze_raw_loans():
#     return spark.read.csv("/FileStore/tables/accepted_2007_to_2018Q4.csv", 
#                           header=True, 
#                           inferSchema=True)
    
# inferSchema=True lets Spark auto-detect column dtypes, which lessens the casting work later

# ✅ 1. Read the raw CSV directly with Spark, so that no DLT pipeline needs to be created
bronze_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("../data/accepted_2007_to_2018Q4.csv")
)


# ✅ 2. Save as a Delta table under the bronze directory

bronze_df.write.format("delta").mode("overwrite").save("../data/bronze/lendingclub_raw")

25/07/03 00:06:47 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

## Silver Delta Table

Next, the pipeline that produces the Silver Delta Table performs the key data cleaning steps:
  - Deal with Duplicates
  - Remove String Column Spaces
  - Handle String Formatting / Spelling Issues 
  - Ensure UTF-8 for String Columns 
  - Schema Definition 
  - Invalid Value Handling 
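
The steps above are applied one after another, each taking a dataset and returning a cleaned one. A minimal sketch of that chaining pattern follows. `apply_steps` and the two toy cleaning functions are hypothetical (they are not part of this notebook), and plain Python lists stand in for Spark DataFrames so the idea can be run without a Spark session.

```python
from functools import reduce


def apply_steps(data, steps):
    """Apply each cleaning step in order, feeding the output of one into the next."""
    return reduce(lambda current, step: step(current), steps, data)


# Toy stand-ins for cleaning functions, operating on a list of strings
def trim_values(rows):
    return [r.strip() for r in rows]


def drop_duplicate_values(rows):
    return list(dict.fromkeys(rows))  # keeps the first occurrence, preserves order


raw = [" RENT", "OWN ", " RENT", "MORTGAGE"]
cleaned = apply_steps(raw, [trim_values, drop_duplicate_values])
print(cleaned)  # ['RENT', 'OWN', 'MORTGAGE']
```

The order of the steps matters: trimming first lets values that differ only by surrounding whitespace collapse into one during de-duplication. In PySpark the same pattern can be written by chaining `df.transform(step)` calls.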

### String Columns Cleaning

The cleaning steps below are based on the issues documented in **sandbox/string_issues**, so that data integrity is maintained.

In [3]:
def drop_duplicates(df):
    # Remove exact duplicate rows. The unused before/after count was dropped,
    # since it triggered two extra full passes over the data.
    return df.dropDuplicates()

def handle_string_cols_spaces(df): 
    string_cols = [
        field.name for field in df.schema.fields
        if isinstance(field.dataType, StringType)]
    
    # Replaces each existing column with new <string> values which are trimmed 
    for col_name in string_cols:
        df = df.withColumn(col_name, trim(col(col_name)))
    
    return df 

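def handle_string_cols_formatting(df):
    """
    Filters out rows with invalid categorical values, encodes the target column,
    and drops string columns that are not used downstream.

    The string issues handled here are documented in ../sandbox/string_issues.ipynb.
    Note: RapidFuzz (lightweight similarity calculations) is not called in this
    function; the checks below are exact matches.
    """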

    print(f"Original Number of Rows: {df.count()}. ")
    
    # # 1. Drops unusable String columns 
    # unusable_cols = ["emp_title",
    # "hardship_type",
    # "verification_status_joint",
    # "hardship_status",
    # "deferral_term",
    # "hardship_length",
    # "hardship_loan_status",
    # "settlement_status", "annual_inc_joint", 'dti_joint']
    # df = df.drop(*unusable_cols) # allows dropping of multiple columns

    # 2. Fix addr_state (keep only rows with a two-character state code)
    df = df.filter(length( col('addr_state') ) == 2)

    # 3. Fix invalid string column values 

    # invalid_entries_list = [ "application_type",
    #     "policy_code",
    #     "home_ownership",
    #     "verification_status",
    #     "loan_status",
    #     "pymnt_plan",
    #     "initial_list_status",
    #     "hardship_flag"]
    
    df = df.filter(
        col("application_type").isin("Individual", "Joint App") &
        col("policy_code").isin("1.0", "0.0") &  # stored as string initially
        col("home_ownership").isin("MORTGAGE", "RENT", "OWN", "ANY", "OTHER") &
        col("verification_status").isin("Source Verified", "Not Verified", "Verified") &
        col("loan_status").isin(
            "Fully Paid", "Current", "Charged Off", "In Grace Period", "Default"
        ) &
        col("pymnt_plan").isin("n", "y") &
        col("initial_list_status").isin("f", "w") &
        col("hardship_flag").isin("N", "Y")
    )
    df = df.withColumnRenamed('loan_status', 'default_status') 
    
    # Encode the target: 1 = Charged Off / Default, 0 = Fully Paid / Current / In Grace Period
    df = df.withColumn(
        'default_status',
        when(col('default_status').isin('Fully Paid', 'Current', 'In Grace Period'), 0)
        .when(col('default_status').isin('Charged Off', 'Default'), 1)
    )
    df = df.withColumn('home_ownership', 
                       when(col('home_ownership') == 'ANY', 'OTHER' )
                       .otherwise(col('home_ownership')))
    
    df = df.withColumn("policy_code", when(col("policy_code") == "1.0", 1).when(col("policy_code") == "0.0", 0))


    # 4. Drop meaningless string columns 
    meaningless_columns = [
    "url", "desc", "title", "zip_code", "purpose"]
    df = df.drop(*meaningless_columns)

    return df

def cast_string_to_numeric_cols(df): 
    numeric_columns = [
    "id", "member_id", "annual_inc", "annual_inc_joint",
    "dti", "dti_joint", "delinq_2yrs", "fico_range_low", "fico_range_high",
    "inq_last_6mths", "mths_since_last_delinq", "mths_since_last_record",
    "open_acc", "pub_rec", "revol_bal", "revol_util", "total_acc", 
    "out_prncp", "out_prncp_inv", "total_pymnt", "total_pymnt_inv", 
    "total_rec_prncp", "total_rec_int", "total_rec_late_fee", "recoveries", 
    "collection_recovery_fee", "last_fico_range_high", "last_fico_range_low", 
    "collections_12_mths_ex_med", "mths_since_last_major_derog",
    "acc_now_delinq", "tot_coll_amt", "loan_amnt", "funded_amnt", 
    "funded_amnt_inv", "installment", "tot_cur_bal", "total_rev_hi_lim", 
    "inq_fi", "hardship_amount", "hardship_dpd", "orig_projected_additional_accrued_interest",
    "hardship_payoff_balance_amount", "hardship_last_payment_amount", 
    "settlement_amount", "settlement_percentage", "settlement_term", 
    "avg_cur_bal", "total_bal_il", "bc_util", "il_util", "total_cu_tl",
    "max_bal_bc", "percent_bc_gt_75", "total_bal_ex_mort", "all_util", 
    "open_acc_6m", "open_act_il", "open_il_12m", "last_pymnt_amnt", "open_il_24m", "mths_since_rcnt_il", 
    "open_rv_12m", "open_rv_24m", 'emp_length', 'term']

    # Deal with Type Casting to Numeric Data 
    int_cols = ['id', 'member_id']

    for column in numeric_columns: 
        if column in int_cols: 
            df = df.withColumn(column, col(column).cast(IntegerType()))
        elif column == 'emp_length':
            # Convert emp_length to integer values
            df = df.withColumn(
                "emp_length",
                when(col("emp_length").rlike("10\\+"), 10)
                .when(col("emp_length").rlike("< 1"), 0)
                .otherwise(
                    regexp_extract(col("emp_length"), r"(\d+)", 1).cast("int") # extracts 1st regex group digit from string 
                )
            )
        elif column == 'term': 
            df = df.withColumn("term",regexp_extract(col("term"), r"(\d+)", 1).cast("int"))

        else: 
            df = df.withColumn(column, col(column).cast(DoubleType()))

    return df 

def cast_string_to_date_cols(df):
    date_columns = [
    "issue_d", "earliest_cr_line", "last_pymnt_d", "last_credit_pull_d", "next_pymnt_d",
    "sec_app_earliest_cr_line", "hardship_start_date", "hardship_end_date", 
    "payment_plan_start_date", "debt_settlement_flag_date", "settlement_date"]

    # Clean and cast each column
    for date_col in date_columns:
        # Format is 'MMM-yyyy' → Add dummy day '01' → Convert to 'yyyy-MM-dd'
        df = df.withColumn(
            date_col,
            to_date(concat_ws("-", col(date_col), lit("01")), "MMM-yyyy-dd")
        )

    return df 
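
To make the casting rules above concrete, here is a plain-Python sketch of what the `emp_length`, `term` and month-year conversions are intended to produce for single values. The helper names `parse_emp_length`, `parse_term` and `parse_month_year` are hypothetical and only mirror the Spark expressions. The sample strings are assumed formats (such as `10+ years`, `< 1 year`, ` 36 months` and `Dec-2015`), and `%b` assumes an English locale for month abbreviations.

```python
import re
from datetime import date, datetime


def parse_emp_length(value):
    """'10+ years' -> 10, '< 1 year' -> 0, '3 years' -> 3, otherwise None."""
    if value is None:
        return None
    if "10+" in value:
        return 10
    if "< 1" in value:
        return 0
    match = re.search(r"(\d+)", value)
    return int(match.group(1)) if match else None


def parse_term(value):
    """' 36 months' -> 36; None when no digits are present."""
    match = re.search(r"(\d+)", value or "")
    return int(match.group(1)) if match else None


def parse_month_year(value):
    """'Dec-2015' -> date(2015, 12, 1), using the first of the month as a dummy day."""
    if not value:
        return None
    return datetime.strptime(f"{value}-01", "%b-%Y-%d").date()


print(parse_emp_length("10+ years"))  # 10
print(parse_emp_length("< 1 year"))   # 0
print(parse_term(" 36 months"))       # 36
print(parse_month_year("Dec-2015"))   # 2015-12-01
```
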


In [4]:
# Step 0: Read from Bronze Table 
bronze_df_copy = spark.read.format("delta").load("../data/bronze/lendingclub_raw")


# Step 1: Drop duplicate rows
print(f"Original number of rows: {bronze_df_copy.count()}\n")

df_cleaned = drop_duplicates(bronze_df_copy)
print('✅ Duplicates removed...')


# Step 2: Trim spaces in all string columns
df_cleaned = handle_string_cols_spaces(df_cleaned)
print('✅ Trailing / Leading Spaces removed...')

# Step 3: Filter & fix invalid string formatting
df_cleaned = handle_string_cols_formatting(df_cleaned)
print('✅ Invalid String Column Formatting Settled & Meaningless Columns Dropped ...')

# Step 4: Type Casting
df_cleaned = cast_string_to_numeric_cols(df_cleaned)
df_cleaned = cast_string_to_date_cols(df_cleaned)
print('✅ String Columns Correctly Type Casted...\n')


print(f"New number of rows: {df_cleaned.count()}")

# Step 5: Save as Silver Delta Table 1 (Cleaned Strings Version)
df_cleaned.write.format("delta").mode("overwrite").option("mergeSchema", "true") \
.save("../data/silver/lendingclub_cleaned_string")


Original number of rows: 2260701



                                                                                

✅ Duplicates removed...
✅ Trailing / Leading Spaces removed...


                                                                                

Original Number of Rows: 2260701. 
✅ Invalid String Column Formatting Settled & Meaningless Columns Dropped ...
✅ String Columns Correctly Type Casted...



                                                                                

New number of rows: 2231827


                                                                                

### Numeric Columns Cleaning
The cleaning steps below are based on the issues documented in **sandbox/numeric_issues**, so that data integrity is maintained.

In [5]:
def clear_invalid_numerical_entries(df):
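    # Note: a row with a null in any filtered column fails the comparison and is dropped too
    df = df.filter( col('dti') >= 0 )
    # total_rec_late_fee is checked once, further down, with the other payment columns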
    df = df.filter( col('loan_amnt') > 0 ) 
    df = df.filter( col('funded_amnt') > 0 )
    df = df.filter( (col('int_rate') >= 0) & (col('int_rate') <= 100) ) # Assuming interest rate is between 0% and 100%
    df = df.filter( col('installment') >= 0 ) 
    df = df.filter( col('annual_inc') >= 0 )  
    df = df.filter( (col('revol_util') >= 0) & (col('revol_util') <= 100) )  # Above 100% means the balance exceeds the credit limit; treated as invalid here
    df = df.filter( col('total_rec_late_fee') >= 0 )  
    df = df.filter( col('recoveries') >= 0 )  
    df = df.filter( col('collection_recovery_fee') >= 0 )
    df = df.filter( col('total_rec_prncp') >= 0 )
    df = df.filter( col('total_rec_int') >= 0 )
    df = df.filter( col('total_pymnt') >= 0 )
    df = df.filter( col('out_prncp') >= 0 )
    
    # Fico Range Invalid Entries Removal 
    df = df.filter((col('fico_range_low') >= 300) & (col('fico_range_low') <= 850))
    df = df.filter((col('fico_range_high') >= 300) & (col('fico_range_high') <= 850))
    df = df.filter(col('fico_range_low') <= col('fico_range_high'))


    return df
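
One consequence of the range filters above is worth spelling out. In Spark SQL, a comparison against a null value evaluates to null rather than true, and `filter` keeps only rows whose condition is true, so rows with a missing value in any filtered column are removed as well. The sketch below mimics that rule in plain Python, with `None` standing in for null. `in_range` and `keep_row` are hypothetical helpers for illustration, and only three of the checks are reproduced.

```python
def in_range(value, low=None, high=None):
    """Three-valued range check: returns None (unknown) when the value is missing."""
    if value is None:
        return None
    if low is not None and value < low:
        return False
    if high is not None and value > high:
        return False
    return True


def keep_row(row):
    """Mimic a chain of filters: a row survives only if every check is True."""
    checks = [
        in_range(row.get("dti"), low=0),
        in_range(row.get("revol_util"), low=0, high=100),
        in_range(row.get("fico_range_low"), low=300, high=850),
    ]
    return all(check is True for check in checks)


rows = [
    {"dti": 12.5, "revol_util": 45.0, "fico_range_low": 700},  # valid
    {"dti": -1.0, "revol_util": 45.0, "fico_range_low": 700},  # negative dti
    {"dti": 12.5, "revol_util": None, "fico_range_low": 700},  # missing revol_util
]
print([keep_row(r) for r in rows])  # [True, False, False]
```

If missing values should instead be kept for later imputation, each condition could be combined with a null check, for example `col('dti').isNull() | (col('dti') >= 0)`.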

    

In [6]:
silver_table1 = spark.read.format("delta")\
    .load("../data/silver/lendingclub_cleaned_string")

    
print(f"Original number of rows: {silver_table1.count()}\n")

silver_table2 = clear_invalid_numerical_entries(silver_table1)
print('✅ Invalid Numerical Values Settled...')

print(f"Final number of rows: {silver_table2.count()}\n")

Original number of rows: 2231827

✅ Invalid Numerical Values Settled...
Final number of rows: 2221208
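
The row counts printed above give a quick sense of how much data each Silver stage removes. The arithmetic below uses only the three counts reported in the cell outputs; the variable names are illustrative. Since the count printed inside `handle_string_cols_formatting` (after de-duplication) equals the original count, the de-duplication step itself removed no rows.

```python
# Row counts reported in the cell outputs above
raw_rows = 2_260_701                # Bronze table
after_string_cleaning = 2_231_827   # Silver table 1
after_numeric_cleaning = 2_221_208  # Silver table 2

dropped_string = raw_rows - after_string_cleaning
dropped_numeric = after_string_cleaning - after_numeric_cleaning
total_dropped = raw_rows - after_numeric_cleaning

print(dropped_string)   # 28874
print(dropped_numeric)  # 10619
print(total_dropped)    # 39493
print(f"{total_dropped / raw_rows:.2%} of the raw rows removed")
```
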



In [7]:
silver_table2.limit(10).toPandas()

Unnamed: 0,id,member_id,loan_amnt,funded_amnt,funded_amnt_inv,term,int_rate,installment,grade,sub_grade,...,hardship_last_payment_amount,disbursement_method,debt_settlement_flag,debt_settlement_flag_date,settlement_status,settlement_date,settlement_amount,settlement_percentage,settlement_term,default_status
0,1236347,,9000.0,9000.0,8900.0,36,8.9,285.78,A,A5,...,,Cash,N,,,,,,,0
1,1468694,,2000.0,2000.0,2000.0,36,8.9,63.51,A,A5,...,,Cash,N,,,,,,,0
2,1375351,,4000.0,4000.0,4000.0,36,7.62,124.65,A,A3,...,,Cash,N,,,,,,,0
3,1441945,,5000.0,5000.0,5000.0,36,7.9,156.46,A,A4,...,,Cash,N,,,,,,,0
4,1104374,,20000.0,20000.0,20000.0,36,14.65,689.89,C,C3,...,,Cash,N,,,,,,,1
5,434246,,6400.0,6400.0,5538.37,36,14.22,219.42,C,C5,...,,Cash,N,,,,,,,1
6,1125229,,7000.0,7000.0,7000.0,36,7.9,219.04,A,A4,...,,Cash,N,,,,,,,0
7,1249175,,10000.0,10000.0,10000.0,36,6.62,307.04,A,A2,...,,Cash,N,,,,,,,0
8,1167749,,21000.0,21000.0,21000.0,36,7.62,654.39,A,A3,...,,Cash,N,,,,,,,0
9,471462,,24000.0,24000.0,23950.0,36,11.83,795.22,B,B3,...,,Cash,N,,,,,,,0


In [8]:
# Step 5: Save as Silver Delta Table 2 (Cleaned Numeric Version)
silver_table2.write.format("delta").mode("overwrite").option("mergeSchema", "true").save("../data/silver/lendingclub_cleaned_numeric")

silver_table2.limit(10).toPandas()

                                                                                

Unnamed: 0,id,member_id,loan_amnt,funded_amnt,funded_amnt_inv,term,int_rate,installment,grade,sub_grade,...,hardship_last_payment_amount,disbursement_method,debt_settlement_flag,debt_settlement_flag_date,settlement_status,settlement_date,settlement_amount,settlement_percentage,settlement_term,default_status
0,1236347,,9000.0,9000.0,8900.0,36,8.9,285.78,A,A5,...,,Cash,N,,,,,,,0
1,1468694,,2000.0,2000.0,2000.0,36,8.9,63.51,A,A5,...,,Cash,N,,,,,,,0
2,1375351,,4000.0,4000.0,4000.0,36,7.62,124.65,A,A3,...,,Cash,N,,,,,,,0
3,1441945,,5000.0,5000.0,5000.0,36,7.9,156.46,A,A4,...,,Cash,N,,,,,,,0
4,1104374,,20000.0,20000.0,20000.0,36,14.65,689.89,C,C3,...,,Cash,N,,,,,,,1
5,434246,,6400.0,6400.0,5538.37,36,14.22,219.42,C,C5,...,,Cash,N,,,,,,,1
6,1125229,,7000.0,7000.0,7000.0,36,7.9,219.04,A,A4,...,,Cash,N,,,,,,,0
7,1249175,,10000.0,10000.0,10000.0,36,6.62,307.04,A,A2,...,,Cash,N,,,,,,,0
8,1167749,,21000.0,21000.0,21000.0,36,7.62,654.39,A,A3,...,,Cash,N,,,,,,,0
9,471462,,24000.0,24000.0,23950.0,36,11.83,795.22,B,B3,...,,Cash,N,,,,,,,0


## Gold Delta Table 
Finally, to produce a Gold Delta Table, I will need to sort the dataset in chronological order. 

For credit risk modeling, banks use past loan data to predict future defaults and related metrics. As such, we want our dataset to be sorted in **chronological order**, so that models are trained on older data and tested on newer data **(out-of-time split)**.

The data should not be split randomly **(out-of-sample split)**, e.g. with `train_test_split` from `sklearn`, because credit risk modeling is time-dependent: a random split would let a model train on loans issued after the ones it is tested on.

Hence, I will be sorting the dataset right from the start. 

Once the Gold Delta Table is produced, the subsequent work falls to data scientists: imputing missing values, feature engineering, and dimensionality reduction for accurate credit risk modeling.
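
A minimal sketch of an out-of-time split is given here, assuming records that carry an `issue_d` date. The function `out_of_time_split`, the cutoff date, and the toy records are hypothetical. With the Gold table, the same idea would be expressed as two filters on `issue_d`.

```python
from datetime import date


def out_of_time_split(records, cutoff, date_key="issue_d"):
    """Train on records issued before the cutoff, test on those issued on or after it."""
    ordered = sorted(records, key=lambda r: r[date_key])
    train = [r for r in ordered if r[date_key] < cutoff]
    test = [r for r in ordered if r[date_key] >= cutoff]
    return train, test


loans = [
    {"id": 3, "issue_d": date(2017, 6, 1), "default_status": 0},
    {"id": 1, "issue_d": date(2015, 12, 1), "default_status": 1},
    {"id": 2, "issue_d": date(2016, 3, 1), "default_status": 0},
]
train, test = out_of_time_split(loans, cutoff=date(2017, 1, 1))
print([r["id"] for r in train])  # [1, 2]
print([r["id"] for r in test])   # [3]
```

Every training record predates every test record, which is the property a random split does not guarantee.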


In [9]:
silver_table2 = spark.read.format("delta")\
    .load("../data/silver/lendingclub_cleaned_numeric")

gold_df = silver_table2.orderBy(["issue_d"], ascending=True)



In [10]:
# Save as Gold Delta Table (chronologically sorted version)

gold_df.write.format("delta").mode("overwrite").option('mergeSchema', 'true').save("../data/gold/medallion_cleaned_lc_data")


                                                                                

In [11]:
# Check if Gold Delta is accessible to data scientists
gold_table = spark.read.format("delta")\
    .load("../data/gold/medallion_cleaned_lc_data")
    
gold_table.limit(10).toPandas()

Unnamed: 0,id,member_id,loan_amnt,funded_amnt,funded_amnt_inv,term,int_rate,installment,grade,sub_grade,...,hardship_last_payment_amount,disbursement_method,debt_settlement_flag,debt_settlement_flag_date,settlement_status,settlement_date,settlement_amount,settlement_percentage,settlement_term,default_status
0,46998570,,3750.0,3750.0,3750.0,36,20.49,140.31,E,E4,...,,Cash,N,,,,,,,0
1,49037600,,9675.0,9675.0,9675.0,36,16.55,342.78,D,D2,...,,Cash,N,,,,,,,0
2,47665387,,6050.0,6050.0,6050.0,36,9.99,195.19,B,B3,...,,Cash,N,,,,,,,0
3,47897317,,15000.0,15000.0,15000.0,36,10.99,491.01,B,B4,...,,Cash,N,,,,,,,0
4,49894160,,4200.0,4200.0,4200.0,36,12.29,140.09,C,C1,...,,Cash,N,,,,,,,1
5,50043224,,20000.0,20000.0,20000.0,36,10.99,654.68,B,B4,...,,Cash,N,,,,,,,0
6,47807152,,24000.0,24000.0,24000.0,60,9.99,509.82,B,B3,...,,Cash,N,,,,,,,0
7,48765048,,7875.0,7875.0,7875.0,36,16.55,279.01,D,D2,...,,Cash,N,,,,,,,1
8,47665594,,13000.0,13000.0,13000.0,36,7.89,406.72,A,A5,...,,Cash,N,,,,,,,0
9,48674698,,11350.0,11350.0,11350.0,36,16.55,402.13,D,D2,...,,Cash,N,,,,,,,0


In [12]:
print('✅ All steps completed successfully!')

✅ All steps completed successfully!



As seen, the rows read back from the Gold Delta Table are not in `issue_d` order, even though I sorted the DataFrame before saving it. After researching, I realised that this is a consequence of PySpark's distributed computing environment: the table is written as multiple files and read back in parallel, so row order is not guaranteed on read. To counter this, data scientists will need to sort the data by `issue_d` again before any machine learning model building can happen.
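
A minimal sketch of that re-sorting step, assuming the Gold path used above. `load_gold_sorted` is a hypothetical helper that takes an existing Spark session, and `is_sorted` is a small plain-Python check that can be run on a collected sample of dates.

```python
def load_gold_sorted(spark, path="../data/gold/medallion_cleaned_lc_data", sort_col="issue_d"):
    """Load the Gold Delta table and re-apply the chronological ordering on read."""
    return spark.read.format("delta").load(path).orderBy(sort_col)


def is_sorted(values):
    """Return True if the non-missing values are in non-decreasing order."""
    present = [v for v in values if v is not None]
    return all(a <= b for a, b in zip(present, present[1:]))


print(is_sorted(["2015-12-01", "2016-03-01", "2017-06-01"]))  # True
print(is_sorted(["2016-03-01", "2015-12-01"]))                # False
```

In the notebook this could be used as `gold_sorted = load_gold_sorted(spark)`, after which any train/test cut is taken from `gold_sorted` rather than from the table as read from disk.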