## Read Bronze table

In [0]:
# Load the raw bronze-layer train and test tables.
train_df, test_df = (
    spark.table("credit_catalog.bronze.train"),
    spark.table("credit_catalog.bronze.test"),
)

# Silver Transformations

## Age
- **Data Type**: Convert to integer
- **Range**: 18-65 years (typical for financial datasets)
- **Data Quality Issues:**
  - Invalid or malformed values are present: out-of-range values such as `-500` and `8200` (data errors) and values with trailing underscores such as `23_`
  - Some missing values

### Filtering Consistent Age Values



In [0]:
from pyspark.sql.functions import regexp_replace, col, when

def clean_age_column(df, age_col="Age", min_age=18, max_age=65):
    """
    Clean and validate the Age column.

    Steps:
      1. Strip "_" characters (e.g. "23_" -> "23").
      2. Turn the resulting empty strings into NULL.
      3. Cast to integer.
      4. Keep only rows whose age lies in the inclusive range
         [min_age, max_age]. Rows with NULL or out-of-range ages
         (e.g. -500, 8200) are dropped.

    Args:
        df: Input Spark DataFrame.
        age_col: Name of the age column.
        min_age: Smallest age kept (inclusive).
        max_age: Largest age kept (inclusive). The previous filter used
            ``< 65``, which silently excluded 65-year-olds even though the
            documented valid range is 18-65.

    Returns:
        DataFrame with ``age_col`` cast to int and invalid rows removed.
    """
    df = (
        df
        # Remove "_" placeholder characters.
        .withColumn(age_col, regexp_replace(col(age_col), "_", ""))
        # Empty string -> NULL so the cast below yields NULL.
        .withColumn(
            age_col,
            when(col(age_col) == "", None).otherwise(col(age_col))
        )
        .withColumn(age_col, col(age_col).cast("int"))
        # NOTE: this filters rows rather than imputing, so it also removes
        # records from the test split.
        .filter(
            col(age_col).isNotNull()
            & col(age_col).between(min_age, max_age)
        )
    )

    return df

train_df = clean_age_column(train_df, age_col="Age")
test_df = clean_age_column(test_df, age_col="Age")



In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, when, last, first

# Convert Month name -> Month number first (important): every per-customer
# forward/backward fill below orders by this number.
# A single mapping covering all twelve months is shared by both splits.
# Previously train mapped only January-August and test only
# September-December, with no fallback, so any other month name produced a
# NULL order key and silently broke the time-ordered imputation.
MONTH_TO_NUM = {
    "January": 1,
    "February": 2,
    "March": 3,
    "April": 4,
    "May": 5,
    "June": 6,
    "July": 7,
    "August": 8,
    "September": 9,
    "October": 10,
    "November": 11,
    "December": 12,
}


def add_month_num(df, month_col="Month", out_col="month_num"):
    """
    Add an integer month-number column derived from a month-name column.

    Args:
        df: Input Spark DataFrame.
        month_col: Column holding English month names ("January", ...).
        out_col: Name of the integer column to add.

    Returns:
        ``df`` with ``out_col`` added; names not in MONTH_TO_NUM map to NULL.
    """
    month_expr = None
    for month_name, month_number in MONTH_TO_NUM.items():
        condition = col(month_col) == month_name
        month_expr = (
            when(condition, month_number)
            if month_expr is None
            else month_expr.when(condition, month_number)
        )
    return df.withColumn(out_col, month_expr)


# Later cells order by "Month_num"; that resolves to this "month_num" column
# because Spark resolves column names case-insensitively by default
# (spark.sql.caseSensitive=false).
train_df = add_month_num(train_df)
test_df = add_month_num(test_df)



##  Occupation
- **Data Type**: String (Categorical)
- **Description**: Professional occupation or job title of the customer
- **Example Values**: `Scientist`, `Teacher`, `Engineer`, `Entrepreneur`, `Developer`
- **Purpose**: Occupational classification for risk profiling
- **Data Quality Issues**:
  - Some missing values represented as `_______`
  

### Replacing Inconsistent Occupation Values
- We use the `Customer_ID` and month-number columns to impute the `Occupation` column. Because each `Customer_ID` has records for multiple months, placeholder (inconsistent) occupation values are replaced within each customer's timeline using `forward fill` followed by `backward fill`.

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, when, last, first, lower

def clean_occupation_column(
    df,
    occupation_col="Occupation",
    partition_col="Customer_ID",
    order_col="Month_num",
    placeholder_pattern=r"^\s*_*\s*$",
):
    """
    Clean and impute the Occupation column using time-aware logic.

    Steps:
      1. Treat placeholder values as missing. Any value matching
         ``placeholder_pattern`` becomes NULL; by default that is an empty
         or blank string, or a string made only of underscores, such as
         "_______". The previous code matched only the exact 7-underscore
         string, so "" or a differently sized underscore run survived as a
         bogus category.
      2. Forward fill within each customer, ordered by ``order_col``.
      3. Backward fill to cover gaps before the first known value.
      4. Lower-case the result.

    Args:
        df: Input Spark DataFrame.
        occupation_col: Column to clean.
        partition_col: Customer identifier column.
        order_col: Column giving the time order within a customer.
        placeholder_pattern: Java regex (used via ``rlike``) identifying
            placeholder values.

    Returns:
        DataFrame with the cleaned ``occupation_col``. Customers with no valid
        occupation in any month keep NULL.
    """
    w = Window.partitionBy(partition_col).orderBy(order_col)

    df = (
        df
        # Step 1: mark placeholder occupations as NULL
        .withColumn(
            occupation_col,
            when(col(occupation_col).rlike(placeholder_pattern), None)
            .otherwise(col(occupation_col))
        )
        # Step 2: forward fill
        .withColumn(
            occupation_col,
            last(occupation_col, ignorenulls=True).over(w)
        )
        # Step 3: backward fill
        .withColumn(
            occupation_col,
            first(occupation_col, ignorenulls=True)
            .over(w.rowsBetween(0, Window.unboundedFollowing))
        )
        # Step 4: standardize to lowercase
        .withColumn(
            occupation_col,
            lower(col(occupation_col))
        )
    )

    return df

train_df = clean_occupation_column(train_df)
test_df = clean_occupation_column(test_df)



## Annual_Income
- **Data Type**: Convert to Float/Decimal (Numeric)
- **Description**: Total annual income of the customer in currency units
- **Example Values**: `19114.12`, `34847.84`, `143162.64`, `30689.89`, `35547.71`
- **Unit**: Currency (e.g., USD)
- **Purpose**: Income-based credit assessment and risk evaluation
- **Data Quality Issues**:
  - Some values may have trailing underscores like `34847.84_`
  - Indicates inconsistent data entry


In [0]:
from pyspark.sql.functions import regexp_replace, col, when, round

def clean_annual_income_column(df, income_col="Annual_Income"):
    """
    Clean and standardize the Annual_Income column.

    Steps: strip "_" characters (e.g. "34847.84_"), turn empty strings into
    NULL, cast to double, and round to 2 decimals.

    The cast uses ``double`` rather than ``float``. A single-precision float
    keeps only ~7 significant digits, so incomes of 100000.00 and above
    could not hold their cents and the 2-decimal rounding was not preserved.

    Args:
        df: Input Spark DataFrame.
        income_col: Name of the income column.

    Returns:
        DataFrame with ``income_col`` as a double rounded to 2 decimals.
    """
    df = (
        df
        # Step 1: remove "_" placeholder
        .withColumn(
            income_col,
            regexp_replace(col(income_col), "_", "")
        )
        # Step 2: convert empty string to NULL
        .withColumn(
            income_col,
            when(col(income_col) == "", None).otherwise(col(income_col))
        )
        # Step 3: cast to double (full precision for currency amounts)
        .withColumn(
            income_col,
            col(income_col).cast("double")
        )
        # Step 4: round to 2 decimals
        .withColumn(
            income_col,
            round(col(income_col), 2)
        )
    )

    return df


train_df = clean_annual_income_column(train_df)
test_df = clean_annual_income_column(test_df)




### Annual Income Inconsistency Across Months
- Each customer in the dataset has monthly records, but `Annual_Income` is a static attribute and should remain constant across all months for the same customer.
- However, due to data quality issues, some months contain abnormally high or incorrect `Annual_Income` values, creating inconsistencies within a single customer’s timeline.

In [0]:
from pyspark.sql.functions import expr, col

def replace_with_customer_median(
    df,
    value_col="Annual_Income",
    partition_col="Customer_ID"
):
    """
    Overwrite ``value_col`` on every row with that customer's median.

    The median is Spark SQL's ``percentile_approx(value_col, 0.5)``, computed
    per ``partition_col`` group and attached back with a left join. Each
    customer therefore ends up with a single constant value across all
    months.
    """
    median_expr = expr(f"percentile_approx({value_col}, 0.5)")
    per_customer_median = (
        df.groupBy(partition_col)
        .agg(median_expr.alias("median_value"))
    )

    joined = df.join(per_customer_median, on=partition_col, how="left")
    joined = joined.withColumn(value_col, col("median_value"))
    return joined.drop("median_value")

train_df = replace_with_customer_median(
    train_df, value_col="Annual_Income", partition_col="Customer_ID"
)

test_df = replace_with_customer_median(
    test_df, value_col="Annual_Income", partition_col="Customer_ID"
)


## Monthly_Inhand_Salary
- **Data Type**: Float/Decimal (Numeric)
- **Description**: Monthly take-home salary (net income) after deductions
- **Example Values**: `1824.84`, `3037.99`, `12187.22`, `2612.49`, `2853.31`
- **Unit**: Currency per month
- **Formula Relationship**: Approximately Annual_Income / 12 (varies due to taxes/deductions)
- **Purpose**: Monthly cash flow assessment and EMI eligibility
- **Data Quality Issues**: Some missing/empty values in the dataset


### Replacing Inconsistent Monthly_Inhand_Salary Values
- We use the `Customer_ID` and month-number columns to impute the `Monthly_Inhand_Salary` column. Because each `Customer_ID` has records for multiple months, missing (null) values are filled within each customer's timeline using `forward fill` followed by `backward fill`, and the result is then rounded to a whole number.

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, last, first, round

def ffill_bfill_by_customer(
    df,
    value_col="Monthly_Inhand_Salary",
    partition_col="Customer_ID",
    order_col="Month_num"
):
    """
    Fill gaps in ``value_col`` along each customer's timeline, then round the
    result to a whole number.

    The forward pass takes the last non-null value up to the current row. The
    backward pass then covers leading gaps with the first non-null value from
    the current row onward. Customers with no non-null value stay NULL.
    """
    by_customer = Window.partitionBy(partition_col).orderBy(order_col)
    from_here_on = by_customer.rowsBetween(0, Window.unboundedFollowing)

    forward_filled = last(value_col, ignorenulls=True).over(by_customer)
    df = df.withColumn(value_col, forward_filled)

    backward_filled = first(value_col, ignorenulls=True).over(from_here_on)
    df = df.withColumn(value_col, backward_filled)

    # Round to 0 decimals.
    return df.withColumn(value_col, round(col(value_col)))

train_df = ffill_bfill_by_customer(train_df, value_col="Monthly_Inhand_Salary")

test_df = ffill_bfill_by_customer(test_df, value_col="Monthly_Inhand_Salary")



## Num_Bank_Accounts
- **Data Type**: Integer (Numeric)
- **Description**: Number of bank accounts held by the customer
- **Example Values**: `3`, `2`, `1`, `7`
- **Range**: 1-7 accounts typically
- **Purpose**: Asset diversification indicator and financial engagement measure
- **Use Case**: Customers with more accounts may indicate better financial management

### Replacing Inconsistent Num_Bank_Accounts Values
- We are going to use the `Customer_ID` and `month` columns to impute the `Num_Bank_Accounts` column. Since `Customer_ID` contains records for each month, we will leverage this information to replace inconsistent Num_Bank_Accounts values using `forward fill` and `backward fill` methods.

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, when, last, first

def clean_num_bank_accounts(
    df,
    bank_col="Num_Bank_Accounts",
    partition_col="Customer_ID",
    order_col="Month_num",
    drop_zero=True,
    min_valid=0,
    max_valid=11,
):
    """
    Clean and impute the Num_Bank_Accounts column.

    Steps:
      1. Values outside [min_valid, max_valid] become NULL.
      2. Forward fill, then backward fill, per customer in ``order_col``
         order.
      3. If ``drop_zero`` is True, drop rows whose value is exactly 0.

    Rows whose value is still NULL after imputation are kept. The previous
    version filtered with ``col != 0`` alone. That predicate evaluates to
    NULL for NULL values, and ``filter`` discards NULL predicates, so those
    rows were silently dropped too.

    Args:
        df: Input Spark DataFrame.
        bank_col: Column to clean.
        partition_col: Customer identifier column.
        order_col: Column giving the time order within a customer.
        drop_zero: Whether to remove rows with 0 bank accounts.
        min_valid: Smallest value considered valid (inclusive).
        max_valid: Largest value considered valid (inclusive).

    Returns:
        Cleaned DataFrame (possibly with fewer rows when ``drop_zero``).
    """
    w = Window.partitionBy(partition_col).orderBy(order_col)

    df = (
        df
        # Step 1: mark invalid values as NULL
        .withColumn(
            bank_col,
            when(
                (col(bank_col) < min_valid) | (col(bank_col) > max_valid),
                None
            ).otherwise(col(bank_col))
        )
        # Step 2: forward fill
        .withColumn(
            bank_col,
            last(bank_col, ignorenulls=True).over(w)
        )
        # Step 3: backward fill
        .withColumn(
            bank_col,
            first(bank_col, ignorenulls=True)
            .over(w.rowsBetween(0, Window.unboundedFollowing))
        )
    )

    # Step 4: optionally drop rows where value == 0 (NULLs are kept)
    if drop_zero:
        df = df.filter(col(bank_col).isNull() | (col(bank_col) != 0))

    return df

train_df = clean_num_bank_accounts(train_df)
test_df = clean_num_bank_accounts(test_df)



## Num_Credit_Card
- **Data Type**: Integer (Numeric)
- **Description**: Number of credit cards held by the customer
- **Example Values**: `4`, `5`
- **Range**: Typically 1-11 cards
- **Purpose**: Credit exposure and revolving debt assessment
- **Data Quality Issues**: Some anomalies like `1385` (unusually high value indicating data error)


### Replacing Anomalous Num_Credit_Card Values
- We are going to use the `Customer_ID` and `month` columns to impute the `Num_Credit_Card` column. Since `Customer_ID` contains records for each month, we will leverage this information to replace anomalous Num_Credit_Card values using `forward fill` and `backward fill` methods.

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, when, last, first

def clean_num_credit_card(
    df,
    credit_col="Num_Credit_Card",
    partition_col="Customer_ID",
    order_col="Month_num",
    min_valid=0,
    max_valid=11,
):
    """
    Clean and impute the Num_Credit_Card column.

    Values outside [min_valid, max_valid] (e.g. the anomaly 1385, or a
    negative count) become NULL. They are then forward-filled and
    backward-filled per customer in ``order_col`` order. The previous
    version only rejected values above 11, so negative counts slipped
    through. This was inconsistent with the other count columns, which
    reject negatives.

    Args:
        df: Input Spark DataFrame.
        credit_col: Column to clean.
        partition_col: Customer identifier column.
        order_col: Column giving the time order within a customer.
        min_valid: Smallest value considered valid (inclusive).
        max_valid: Largest value considered valid (inclusive).

    Returns:
        DataFrame with the cleaned ``credit_col``.
    """
    w = Window.partitionBy(partition_col).orderBy(order_col)

    df = (
        df
        # Step 1: mark invalid values as NULL
        .withColumn(
            credit_col,
            when(
                (col(credit_col) < min_valid) | (col(credit_col) > max_valid),
                None
            ).otherwise(col(credit_col))
        )
        # Step 2: forward fill
        .withColumn(
            credit_col,
            last(credit_col, ignorenulls=True).over(w)
        )
        # Step 3: backward fill
        .withColumn(
            credit_col,
            first(credit_col, ignorenulls=True)
            .over(w.rowsBetween(0, Window.unboundedFollowing))
        )
    )

    return df

train_df = clean_num_credit_card(train_df)
test_df = clean_num_credit_card(test_df)




## Interest_Rate
- **Data Type**: Float/Decimal (Numeric)
- **Description**: Current interest rate applicable to the customer's loans/credit
- **Example Values**: `3`, `6`, `8`, `4`, `5`
- **Unit**: Percentage (%)
- **Range**: Typically 3-35%
- **Purpose**: Cost of borrowing and credit pricing
- **Note**: Lower rates indicate better creditworthiness

### Replacing Anomalous Interest_Rate Values
- We are going to use the `Customer_ID` and `month` columns to impute the `Interest_Rate` column. Since `Customer_ID` contains records for each month, we will leverage this information to replace anomalous Interest_Rate values using `forward fill` and `backward fill` methods.

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, when, last, first

def clean_interest_rate(
    df,
    rate_col="Interest_Rate",
    partition_col="Customer_ID",
    order_col="Month_num",
    min_rate=0,
    max_rate=35,
):
    """
    Clean and impute the Interest_Rate column.

    Rates outside [min_rate, max_rate] (in percent) become NULL. They are
    then forward-filled and backward-filled per customer in ``order_col``
    order. The previous version only rejected values above 35, so a
    negative rate was kept as if valid.

    Args:
        df: Input Spark DataFrame.
        rate_col: Column to clean.
        partition_col: Customer identifier column.
        order_col: Column giving the time order within a customer.
        min_rate: Smallest rate considered valid (inclusive).
        max_rate: Largest rate considered valid (inclusive).

    Returns:
        DataFrame with the cleaned ``rate_col``.
    """
    w = Window.partitionBy(partition_col).orderBy(order_col)

    df = (
        df
        # Step 1: mark invalid values as NULL
        .withColumn(
            rate_col,
            when(
                (col(rate_col) < min_rate) | (col(rate_col) > max_rate),
                None
            ).otherwise(col(rate_col))
        )
        # Step 2: forward fill
        .withColumn(
            rate_col,
            last(rate_col, ignorenulls=True).over(w)
        )
        # Step 3: backward fill
        .withColumn(
            rate_col,
            first(rate_col, ignorenulls=True)
            .over(w.rowsBetween(0, Window.unboundedFollowing))
        )
    )

    return df


train_df = clean_interest_rate(train_df)
test_df = clean_interest_rate(test_df)




## Num_of_Loan
- **Data Type**: Integer (Numeric)
- **Description**: Total number of active loans the customer has
- **Example Values**: `4`, `1`, `3`, `0`, `-100`, `967`
- **Range**: 0-4 typical (negative and extremely high values indicate data anomalies)
- **Purpose**: Debt burden assessment
- **Data Quality Issues**:
  - Negative values like `-100` are invalid
  - Extremely high values like `967` are erroneous

### Replacing Anomalous Num_of_Loan Values
- We are going to use the `Customer_ID` and `month` columns to impute the `Num_of_Loan` column. Since `Customer_ID` contains records for each month, we will leverage this information to replace anomalous Num_of_Loan values using `forward fill` and `backward fill` methods.

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, when, last, first, regexp_replace

def clean_num_of_loan(
    df,
    loan_col="Num_of_Loan",
    partition_col="Customer_ID",
    order_col="Month_num"
):
    """
    Parse, validate and impute the Num_of_Loan column.

    The column is rewritten by a fixed sequence of expressions. Each one is
    resolved against the DataFrame produced by the previous step:
    strip "_", map "" to NULL, cast to int, null out values outside 0..9,
    forward fill, then backward fill per customer.
    """
    timeline = Window.partitionBy(partition_col).orderBy(order_col)
    rest_of_timeline = timeline.rowsBetween(0, Window.unboundedFollowing)
    loans = col(loan_col)

    steps = [
        regexp_replace(loans, "_", ""),                          # drop "_"
        when(loans == "", None).otherwise(loans),                # "" -> NULL
        loans.cast("int"),                                       # to int
        when((loans < 0) | (loans > 9), None).otherwise(loans),  # range
        last(loan_col, ignorenulls=True).over(timeline),         # ffill
        first(loan_col, ignorenulls=True).over(rest_of_timeline),  # bfill
    ]
    for step in steps:
        df = df.withColumn(loan_col, step)

    return df


train_df = clean_num_of_loan(train_df)
test_df = clean_num_of_loan(test_df)

## Type_of_Loan
- **Data Type**: String (Categorical/Text)
- **Description**: Types of loans held by the customer (can be multiple)
- **Example Values**: 
  - `Auto Loan, Credit-Builder Loan, Personal Loan, and Home Equity Loan`
  - `Credit-Builder Loan`
  - `Not Specified`
  - (Empty/blank values)
- **Delimiter**: Comma-separated list
- **Purpose**: Loan portfolio composition analysis
- **Data Quality Issues**: 
  - Missing values (empty cells)
  - Some records show "Not Specified"


In [0]:
from pyspark.sql.functions import col, regexp_replace, when, lower, coalesce, lit

def clean_type_of_loan(
    df,
    loan_col="Type_of_Loan",
    fill_value="UnSpecified",
):
    """
    Clean the comma-separated Type_of_Loan column.

    1. Every missing-value marker (NULL, an empty/blank string, "nan",
       "Not Specified") is mapped to one placeholder, ``fill_value``.
    2. For all other values the list separator is normalised. Both ", and "
       and a bare " and " become ", ", so each entry is a plain
       comma-separated list, e.g.
       "Auto Loan, Personal Loan, and Home Equity Loan"
       -> "Auto Loan, Personal Loan, Home Equity Loan".

    The previous version filled NULLs with "UnSpecified" *before* turning
    "nan"/"Not Specified" into NULL. The output therefore mixed two
    different missing markers. It also replaced a bare " and " with a
    space, which merged two loan names into one token.

    Args:
        df: Input Spark DataFrame.
        loan_col: Column to clean.
        fill_value: Placeholder used for all missing values.

    Returns:
        DataFrame with the cleaned ``loan_col``.
    """
    value = col(loan_col)
    is_missing = (
        value.isNull()
        | value.rlike(r"^\s*$")
        | (value == "nan")
        | (value == "Not Specified")
    )

    df = df.withColumn(
        loan_col,
        when(is_missing, lit(fill_value)).otherwise(
            regexp_replace(value, r",?\s+and\s+", ", ")
        ),
    )

    return df

train_df = clean_type_of_loan(train_df)
test_df = clean_type_of_loan(test_df)




## Num_of_Delayed_Payment
- **Data Type**: Integer (Numeric)
- **Description**: Total number of times the customer has made delayed payments
- **Example Values**: `7`, `4`, `8`, `1`, `6`, `9`
- **Range**: Typically 0-9 instances, but in this dataset range is 0-28
- **Purpose**: Payment delinquency history
- **Data Quality Issues**: 
  - Some missing values
  - Some values have trailing underscore like `1295_`
  - Some anomalies like `2820` (unusually high value indicating data error)

### Replacing Anomalous Num_of_Delayed_Payment Values
- We are going to use the `Customer_ID` and `month` columns to impute the `Num_of_Delayed_Payment` column. Since `Customer_ID` contains records for each month, we will leverage this information to replace anomalous Num_of_Delayed_Payment values and fill null using `forward fill` and `backward fill` methods.

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, when, last, first, regexp_replace

def clean_num_of_delayed_payment(
    df,
    delayed_col="Num_of_Delayed_Payment",
    partition_col="Customer_ID",
    order_col="Month_num",
    min_valid=0,
    max_valid=28,
):
    """
    Clean and impute the Num_of_Delayed_Payment column.

    Steps:
      1. Strip "_" characters (e.g. "1295_" -> "1295").
      2. Turn empty strings into NULL.
      3. Cast to int.
      4. Null out values outside [min_valid, max_valid] (e.g. 2820).
      5. Forward fill, then backward fill, per customer.

    The default upper bound is 28, matching the documented dataset range
    0-28. The previous check ``> 27`` wrongly treated 28 as an anomaly.

    Args:
        df: Input Spark DataFrame.
        delayed_col: Column to clean.
        partition_col: Customer identifier column.
        order_col: Column giving the time order within a customer.
        min_valid: Smallest value considered valid (inclusive).
        max_valid: Largest value considered valid (inclusive).

    Returns:
        DataFrame with ``delayed_col`` as a cleaned int column.
    """
    w = Window.partitionBy(partition_col).orderBy(order_col)

    df = (
        df
        # Step 1: remove "_" placeholder
        .withColumn(
            delayed_col,
            regexp_replace(col(delayed_col), "_", "")
        )
        # Step 2: empty string → NULL
        .withColumn(
            delayed_col,
            when(col(delayed_col) == "", None).otherwise(col(delayed_col))
        )
        # Step 3: cast to int
        .withColumn(
            delayed_col,
            col(delayed_col).cast("int")
        )
        # Step 4: mark invalid values as NULL
        .withColumn(
            delayed_col,
            when(
                (col(delayed_col) < min_valid) | (col(delayed_col) > max_valid),
                None
            ).otherwise(col(delayed_col))
        )
        # Step 5: forward fill
        .withColumn(
            delayed_col,
            last(delayed_col, ignorenulls=True).over(w)
        )
        # Step 6: backward fill
        .withColumn(
            delayed_col,
            first(delayed_col, ignorenulls=True)
            .over(w.rowsBetween(0, Window.unboundedFollowing))
        )
    )

    return df

train_df = clean_num_of_delayed_payment(train_df)
test_df = clean_num_of_delayed_payment(test_df)



## Changed_Credit_Limit
- **Data Type**: Float/Decimal (Numeric)
- **Description**: Recent changes in the customer's credit limit (in currency units)
- **Example Values**: `11.27`, `6.27`, `9.27`, `7.1`, `5.42`
- **Unit**: Currency or percentage points
- **Purpose**: Credit limit adjustments and credit portfolio changes
- **Note**: Can be positive (increase) or negative (decrease)
- **Data Quality Issues**: Some rows contain only an underscore (`_`) instead of a number

### Replacing Inconsistent Changed_Credit_Limit Values
- We are going to use the `Customer_ID` and `month` columns to impute the `Changed_Credit_Limit` column. Since `Customer_ID` contains records for each month, we will leverage this information to replace `_` values using `forward fill` and `backward fill` methods.

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, when, last, first, regexp_replace, round

def clean_changed_credit_limit(
    df,
    limit_col="Changed_Credit_Limit",
    partition_col="Customer_ID",
    order_col="Month_num",
    decimals=2,
):
    """
    Clean and impute the Changed_Credit_Limit column.

    Steps:
      1. Strip "_" characters; rows that were only "_" become "".
      2. Turn empty strings into NULL.
      3. Cast to double. Values may be negative (a decrease).
      4. Forward fill, then backward fill, per customer.
      5. Round to ``decimals`` places (2 by default).

    The previous version's comment said "round to 2 decimals", but it called
    ``round(col)`` with no scale. That rounded to 0 decimals and turned
    values like 11.27 into 11.

    Args:
        df: Input Spark DataFrame.
        limit_col: Column to clean.
        partition_col: Customer identifier column.
        order_col: Column giving the time order within a customer.
        decimals: Number of decimal places kept.

    Returns:
        DataFrame with ``limit_col`` as a cleaned double column.
    """
    w = Window.partitionBy(partition_col).orderBy(order_col)

    df = (
        df
        # Step 1: remove "_" placeholder
        .withColumn(
            limit_col,
            regexp_replace(col(limit_col), "_", "")
        )
        # Step 2: empty string → NULL
        .withColumn(
            limit_col,
            when(col(limit_col) == "", None).otherwise(col(limit_col))
        )
        # Step 3: cast to double
        .withColumn(
            limit_col,
            col(limit_col).cast("double")
        )
        # Step 4: forward fill
        .withColumn(
            limit_col,
            last(limit_col, ignorenulls=True).over(w)
        )
        # Step 5: backward fill
        .withColumn(
            limit_col,
            first(limit_col, ignorenulls=True)
            .over(w.rowsBetween(0, Window.unboundedFollowing))
        )
        # Step 6: round to `decimals` places (2 by default)
        .withColumn(
            limit_col,
            round(col(limit_col), decimals)
        )
    )

    return df

train_df = clean_changed_credit_limit(train_df)
test_df = clean_changed_credit_limit(test_df)



## Num_Credit_Inquiries
- **Data Type**: Integer (Numeric)
- **Description**: Number of times the customer's credit was inquired
- **Example Values**: `4`, `2`, `3`
- **Purpose**: Credit-seeking behavior indicator
- **Interpretation**: Higher inquiries may indicate active credit seeking or financial stress
- **Data Quality Issues**: 
  - Some missing values
  - Some anomalies like `2061` (unusually high value indicating data error)

### Replacing Anomalous Num_Credit_Inquiries Values
- We are going to use the `Customer_ID` and `month` columns to impute the `Num_Credit_Inquiries` column. Since `Customer_ID` contains records for each month, we will leverage this information to replace anomalous Num_Credit_Inquiries values and fill null using `forward fill` and `backward fill` methods.

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, when, last, first

def clean_num_credit_inquiries(
    df,
    inquiry_col="Num_Credit_Inquiries",
    partition_col="Customer_ID",
    order_col="Month_num",
    min_valid=0,
    max_valid=18,
):
    """
    Clean and impute the Num_Credit_Inquiries column.

    Values outside [min_valid, max_valid] (e.g. the anomaly 2061, or a
    negative count) become NULL. Those values and the originally missing
    ones are then forward-filled and backward-filled per customer in
    ``order_col`` order. The previous version only rejected values above
    18, so negative counts were kept.

    Args:
        df: Input Spark DataFrame.
        inquiry_col: Column to clean.
        partition_col: Customer identifier column.
        order_col: Column giving the time order within a customer.
        min_valid: Smallest value considered valid (inclusive).
        max_valid: Largest value considered valid (inclusive).

    Returns:
        DataFrame with the cleaned ``inquiry_col``.
    """
    w = Window.partitionBy(partition_col).orderBy(order_col)

    df = (
        df
        # Step 1: mark invalid values as NULL
        .withColumn(
            inquiry_col,
            when(
                (col(inquiry_col) < min_valid) | (col(inquiry_col) > max_valid),
                None
            ).otherwise(col(inquiry_col))
        )
        # Step 2: forward fill
        .withColumn(
            inquiry_col,
            last(inquiry_col, ignorenulls=True).over(w)
        )
        # Step 3: backward fill
        .withColumn(
            inquiry_col,
            first(inquiry_col, ignorenulls=True)
            .over(w.rowsBetween(0, Window.unboundedFollowing))
        )
    )

    return df


train_df = clean_num_credit_inquiries(train_df)
test_df = clean_num_credit_inquiries(test_df)




## Credit_Mix
- **Data Type**: String (Categorical)
- **Description**: Diversity of credit types held by the customer
- **Example Values**: `Good`, `Standard`, `_` (underscore = missing)
- **Categories**: 
  - `Good`: Mix of secured and unsecured credit
  - `Standard`: Limited credit diversity
  - `_` or blank: Missing data
- **Purpose**: Credit portfolio health assessment

### Replacing Inconsistent Credit_Mix Values
- We are going to use the `Customer_ID` and `month` columns to impute the `Credit_Mix` column. Since `Customer_ID` contains records for each month, we will leverage this information to replace `_` values using `forward fill` and `backward fill` methods.

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, when, last, first, regexp_replace, lower

def clean_credit_mix(
    df,
    credit_mix_col="Credit_Mix",
    partition_col="Customer_ID",
    order_col="Month_num"
):
    """
    Impute and normalise the categorical Credit_Mix column.

    The "_" placeholder is stripped, leaving "" which becomes NULL. Gaps are
    then forward-filled and backward-filled along each customer's timeline,
    and the final labels are lower-cased.
    """
    timeline = Window.partitionBy(partition_col).orderBy(order_col)
    rest_of_timeline = timeline.rowsBetween(0, Window.unboundedFollowing)
    mix = col(credit_mix_col)

    df = df.withColumn(credit_mix_col, regexp_replace(mix, "_", ""))
    df = df.withColumn(credit_mix_col, when(mix == "", None).otherwise(mix))
    df = df.withColumn(
        credit_mix_col, last(credit_mix_col, ignorenulls=True).over(timeline)
    )
    df = df.withColumn(
        credit_mix_col,
        first(credit_mix_col, ignorenulls=True).over(rest_of_timeline),
    )
    return df.withColumn(credit_mix_col, lower(mix))

train_df = clean_credit_mix(train_df)
test_df = clean_credit_mix(test_df)




## Outstanding_Debt
- **Data Type**: Float/Decimal (Numeric)
- **Description**: Total outstanding debt amount owed by the customer
- **Example Values**: `809.98`, `605.03`, `1303.01`, `632.46`, `943.86`
- **Unit**: Currency
- **Purpose**: Total debt burden and solvency assessment
- **Risk Factor**: Higher outstanding debt = higher default risk
- **Data Quality Issues**: some rows contain trailing underscore like `706.22_`


In [0]:
from pyspark.sql.functions import regexp_replace, col, when, round

def clean_outstanding_debt(
    df,
    debt_col="Outstanding_Debt"
):
    """
    Parse Outstanding_Debt into a whole-number float.

    Underscores are stripped (e.g. "706.22_" -> "706.22"), and empty
    results become NULL. The text is then cast to float and rounded to
    0 decimals.
    """
    debt = col(debt_col)
    pipeline = [
        regexp_replace(debt, "_", ""),
        when(debt == "", None).otherwise(debt),
        debt.cast("float"),
        round(debt),
    ]
    # Each expression is resolved against the output of the previous step.
    for step in pipeline:
        df = df.withColumn(debt_col, step)
    return df


train_df = clean_outstanding_debt(train_df)
test_df = clean_outstanding_debt(test_df)




## Credit_History_Age
- **Data Type**: String (Duration/Time Period)
- **Description**: Length of the customer's credit history
- **Example Values**: 
  - `22 Years and 1 Months`
  - `26 Years and 7 Months`
  - `17 Years and 9 Months`
  - `NA` (missing data)
- **Unit**: Years and Months
- **Purpose**: Credit experience and stability assessment
- **Data Quality Issues**: Some records show `NA` for missing values


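### Converting Credit_History_Age into Total Months
- We first parse each `X Years and Y Months` value into a total number of months. Because each `Customer_ID` has a record per month and the credit history grows by one month with each record, `NA` values are then imputed sequentially. A missing value becomes the nearest earlier known value plus the number of monthly steps since it. If there is no earlier known value, it becomes the nearest later known value minus the number of steps until it.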

In [0]:
from pyspark.sql.functions import col, when, regexp_extract

def parse_credit_history_age(
    df,
    age_col="Credit_History_Age"
):
    """
    Convert 'X Years and Y Months' strings into a total number of months.

    - "NA" and NULL become NULL.
    - Unit words are matched case-insensitively, in singular or plural
      form, so "1 Year and 1 Month" is accepted.
    - If only one component is present (e.g. "5 Years"), the other counts
      as 0. The previous version returned NULL in that case: the missing
      part extracted as "", cast to NULL, and the NULL propagated through
      the sum.
    - Strings containing neither component become NULL.

    The numeric cast is only applied to non-empty matches, so empty
    extractions are never cast.
    """
    raw = col(age_col)
    years = regexp_extract(raw, r"(?i)(\d+)\s+years?", 1)
    months = regexp_extract(raw, r"(?i)(\d+)\s+months?", 1)

    # regexp_extract returns "" (not NULL) when the pattern does not match.
    years_int = when(years != "", years.cast("int")).otherwise(0)
    months_int = when(months != "", months.cast("int")).otherwise(0)

    df = df.withColumn(
        age_col,
        when(
            raw.isNull() | (raw == "NA") | ((years == "") & (months == "")),
            None
        ).otherwise(years_int * 12 + months_int)
    )

    return df


train_df = parse_credit_history_age(train_df)
test_df = parse_credit_history_age(test_df)


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, last, first, when, row_number

def sequential_impute(
    df,
    value_col,
    partition_col="Customer_ID",
    order_col="Month_num",
    distance_from_order=True,
):
    """
    Sequentially impute missing values of a column that grows by one per
    month (e.g. Credit_History_Age expressed in months).

    Rules, applied per ``partition_col`` in ``order_col`` order:
    - Known values are kept.
    - Missing AFTER a known value  -> last_known + distance since it.
    - Missing BEFORE any known value -> next_known - distance until it.
    - No known value for the customer -> NULL.

    By default, distance is measured on the values of ``order_col`` itself
    (month numbers). Rows removed earlier in the pipeline, for example by
    the age filter or the zero-bank-account filter, therefore still count as
    elapsed months. The previous version always counted rows with
    ``row_number()``, which under-counts the gap whenever a month is
    missing. Pass ``distance_from_order=False`` to restore row counting.
    With the default, a row whose ``order_col`` is NULL cannot be imputed
    and stays NULL.

    Helper columns use a "_seq_" prefix. This avoids clobbering (and then
    dropping) user columns with generic names such as "rn".
    """
    w = Window.partitionBy(partition_col).orderBy(order_col)
    w_following = w.rowsBetween(0, Window.unboundedFollowing)

    pos = "_seq_pos"
    last_val, last_pos = "_seq_last_val", "_seq_last_pos"
    next_val, next_pos = "_seq_next_val", "_seq_next_pos"

    # Position of each row on the timeline, used for distance calculation.
    if distance_from_order:
        df = df.withColumn(pos, col(order_col))
    else:
        df = df.withColumn(pos, row_number().over(w))

    # Position of the row if its value is known, else NULL.
    known_pos = when(col(value_col).isNotNull(), col(pos))

    df = (
        df
        # Last known value / its position (forward reference)
        .withColumn(last_val, last(value_col, ignorenulls=True).over(w))
        .withColumn(last_pos, last(known_pos, ignorenulls=True).over(w))
        # Next known value / its position (backward reference)
        .withColumn(
            next_val, first(value_col, ignorenulls=True).over(w_following)
        )
        .withColumn(
            next_pos, first(known_pos, ignorenulls=True).over(w_following)
        )
    )

    # Apply sequential logic
    df = df.withColumn(
        value_col,
        when(col(value_col).isNotNull(), col(value_col))
        .when(
            col(last_val).isNotNull(),
            col(last_val) + (col(pos) - col(last_pos))
        )
        .when(
            col(next_val).isNotNull(),
            col(next_val) - (col(next_pos) - col(pos))
        )
        .otherwise(None)
    )

    # Cleanup helper columns
    return df.drop(pos, last_val, last_pos, next_val, next_pos)

train_df = sequential_impute(
    train_df,
    value_col="Credit_History_Age",
    partition_col="Customer_ID",
    order_col="Month_num"
)


test_df = sequential_impute(
    test_df,
    value_col="Credit_History_Age",
    partition_col="Customer_ID",
    order_col="Month_num"
)


## Total_EMI_per_month
- **Data Type**: Float/Decimal (Numeric)
- **Description**: Total Equated Monthly Installment (EMI) amount across all loans
- **Example Values**: `49.57`, `18.82`, `246.99`, `16.42`, `0`
- **Unit**: Currency per month
- **Formula**: Sum of all monthly loan installments
- **Purpose**: Monthly obligation assessment
- **Note**: `0` indicates no active loan EMI
- **Data Quality Issues**: 
  - Some anomalies like `82238` (unusually high value indicating data error)

### Anomalous Total_EMI_per_month Values Across Months
- Each customer in the dataset has monthly records, but `Total_EMI_per_month` is a static attribute and should remain constant across all months for the same customer.
- However, due to data quality issues, some months contain abnormally high or incorrect `Total_EMI_per_month` values, creating inconsistencies within a single customer’s timeline.

In [0]:
from pyspark.sql.functions import expr, col, round

def replace_with_customer_median(
    df,
    value_col="Total_EMI_per_month",
    partition_col="Customer_ID",
    decimals=0,
):
    """
    Replace each value of ``value_col`` with its per-group median, rounded.

    Intended for attributes that should be constant per customer but contain
    occasional corrupted months; the median is robust to those outliers.

    Args:
        df: Input Spark DataFrame.
        value_col: Column to overwrite with the group median.
        partition_col: Grouping column (one group per customer by default).
        decimals: Decimal places kept when rounding the median. Defaults to 0
            (whole units), the value that used to be hard-coded.

    Returns:
        DataFrame with the same columns, in the same order, and ``value_col``
        replaced. Rows whose ``partition_col`` is NULL get NULL rather than
        being pooled into one shared group.
    """
    # Local imports keep this cell independent of what earlier cells imported.
    from pyspark.sql.functions import col, percentile_approx, when
    from pyspark.sql.functions import round as spark_round
    from pyspark.sql.window import Window

    # percentile_approx yields a value taken from the group itself (no
    # interpolation). Using the Column API instead of an f-string SQL
    # expression means column names that need quoting still resolve.
    group_median = percentile_approx(col(value_col), 0.5).over(
        Window.partitionBy(partition_col)
    )

    # A window aggregate avoids the groupBy + join round trip, which also moved
    # the key column to the front of the schema.
    return df.withColumn(
        value_col,
        when(col(partition_col).isNotNull(), spark_round(group_median, decimals)),
    )


# Collapse each customer's Total_EMI_per_month to one per-customer median value.
train_df, test_df = [
    replace_with_customer_median(frame, "Total_EMI_per_month")
    for frame in (train_df, test_df)
]



## Amount_invested_monthly
- **Data Type**: Convert into Float/Decimal (Numeric)
- **Description**: Amount the customer invests or saves monthly
- **Example Values**: `80.42`, `118.28`, `104.29`, `81.70`, `276.73`
- **Unit**: Currency per month
- **Purpose**: Savings behavior and financial capacity assessment
- **Data Quality Issues**: 
  - Some missing values
  - Some anomalies like `__10000__` (formatting errors)
  - Values with underscores indicate corruption

### Replacing Inconsistent Amount_invested_monthly
- We are going to use the `Customer_ID` and `month` columns to impute the `Amount_invested_monthly` column. Since `Customer_ID` contains records for each month, we will leverage this information to replace `__10000__` and fill null values using `forward fill` and `backward fill` methods.

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, when, last, first, round

def clean_amount_invested_monthly(
    df,
    invest_col="Amount_invested_monthly",
    partition_col="Customer_ID",
    order_col="Month_num",
    placeholder="__10000__",
):
    """
    Clean and impute the Amount_invested_monthly column.

    Within each ``partition_col`` group ordered by ``order_col``:
    1. values equal to ``placeholder`` become NULL;
    2. the column is cast to float (other unparseable strings are expected to
       become NULL — assumes Spark's non-ANSI cast behaviour, TODO confirm);
    3. NULLs are forward-filled from the latest earlier non-NULL value;
    4. remaining (leading) NULLs are backward-filled from the next non-NULL value;
    5. values are rounded to 0 decimals.

    Args:
        df: Input Spark DataFrame.
        invest_col: Column to clean; overwritten in place.
        partition_col: Column identifying one customer's timeline.
        order_col: Column ordering each timeline. NOTE: the notebook creates
            ``month_num``; this default relies on Spark's case-insensitive
            column resolution.
        placeholder: Exact corrupted value to discard (default ``"__10000__"``).
            Pass None to skip this step.

    Returns:
        DataFrame with ``invest_col`` cleaned. A customer with no valid value
        in any month keeps NULL.
    """
    w = Window.partitionBy(partition_col).orderBy(order_col)
    amount = col(invest_col)

    # Step 1: mark the corrupted placeholder as NULL
    if placeholder is not None:
        df = df.withColumn(
            invest_col,
            when(amount == placeholder, None).otherwise(amount)
        )

    df = (
        df
        # Step 2: cast to float
        .withColumn(invest_col, amount.cast("float"))
        # Step 3: forward fill (ordered window frame ends at the current row)
        .withColumn(invest_col, last(invest_col, ignorenulls=True).over(w))
        # Step 4: backward fill from the current row to the end of the timeline
        .withColumn(
            invest_col,
            first(invest_col, ignorenulls=True)
            .over(w.rowsBetween(0, Window.unboundedFollowing))
        )
        # Step 5: round to 0 decimals
        .withColumn(invest_col, round(amount))
    )

    return df


# Same imputation for both splits; train is processed first, then test.
train_df, test_df = map(clean_amount_invested_monthly, (train_df, test_df))



## Payment_Behaviour
- **Data Type**: String (Categorical)
- **Description**: Pattern or category of payment behavior
- **Example Values**: 
  - `High_spent_Small_value_payments`
  - `Low_spent_Large_value_payments`
  - `Low_spent_Medium_value_payments`
  - `High_spent_Medium_value_payments`
  - `!@9#%8` (corrupted data)
- **Categories**: Combination of spending level (High/Low) and payment value size
- **Purpose**: Behavioral segmentation and credit scoring
- **Data Quality Issues**: Some corrupted values with special characters

### Replacing Inconsistent Payment_Behaviour
- We are going to use the `Customer_ID` and `month` columns to impute the `Payment_Behaviour` column. Since `Customer_ID` contains records for each month, we will leverage this information to replace `!@9#%8` values using `forward fill` and `backward fill` methods.

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, when, last, first, lower

def clean_payment_behaviour(
    df,
    behaviour_col="Payment_Behaviour",
    partition_col="Customer_ID",
    order_col="Month_num"
):
    """
    Clean and impute the Payment_Behaviour column.

    Within each ``partition_col`` group ordered by ``order_col``, the corrupted
    value "!@9#%8" becomes NULL. NULLs are then forward-filled and
    backward-filled, and the result is lower-cased.
    """
    timeline = Window.partitionBy(partition_col).orderBy(order_col)
    behaviour = col(behaviour_col)

    # Each expression is applied in turn and reads the column as left by the
    # previous step (window functions cannot be nested in one expression).
    pipeline = (
        # placeholder -> NULL
        when(behaviour == "!@9#%8", None).otherwise(behaviour),
        # forward fill
        last(behaviour_col, ignorenulls=True).over(timeline),
        # backward fill
        first(behaviour_col, ignorenulls=True).over(
            timeline.rowsBetween(0, Window.unboundedFollowing)
        ),
        # normalize case
        lower(behaviour),
    )
    for step in pipeline:
        df = df.withColumn(behaviour_col, step)

    return df


# Apply the Payment_Behaviour cleaning to both splits.
train_df, test_df = [clean_payment_behaviour(train_df), clean_payment_behaviour(test_df)]



## Monthly_Balance
- **Data Type**: Float/Decimal (Numeric)
- **Description**: Monthly account balance after all transactions and payments
- **Example Values**: `312.49`, `284.63`, `331.21`, `340.48`, `288.61`
- **Unit**: Currency
- **Purpose**: Liquidity position and cash flow assessment
- **Interpretation**: Higher balance indicates better financial health
- **Data Quality Issues**: 
  - Some corrupted values with `__-333333333333333333333333333__`
  - Some missing values


### Replacing Inconsistent Monthly_Balance
- We are going to use the `Customer_ID` and `month` columns to impute the `Monthly_Balance` column. Since `Customer_ID` contains records for each month, we will leverage this information to replace `__-333333333333333333333333333__` and fill null values using `forward fill` and `backward fill` methods.

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, when, last, first, regexp_replace, round

def clean_monthly_balance(
    df,
    balance_col="Monthly_Balance",
    partition_col="Customer_ID",
    order_col="Month_num"
):
    """
    Clean and impute the Monthly_Balance column.

    The steps are:
    - strip the corrupted token "__-333333333333333333333333333__";
    - turn blanks into NULL and cast the column to float;
    - forward-fill, then backward-fill, gaps within each ``partition_col``
      timeline ordered by ``order_col``;
    - round the result to 0 decimals.
    """
    timeline = Window.partitionBy(partition_col).orderBy(order_col)
    rest_of_timeline = timeline.rowsBetween(0, Window.unboundedFollowing)

    # Remove the corrupted sentinel; a value that was only the sentinel becomes "".
    df = df.withColumn(
        balance_col,
        regexp_replace(col(balance_col), "__-333333333333333333333333333__", ""),
    )
    # Empty strings are treated as missing.
    df = df.withColumn(
        balance_col,
        when(col(balance_col) == "", None).otherwise(col(balance_col)),
    )
    df = df.withColumn(balance_col, col(balance_col).cast("float"))

    # Fill gaps from the latest earlier value first, then from the next later one.
    df = df.withColumn(balance_col, last(balance_col, ignorenulls=True).over(timeline))
    df = df.withColumn(
        balance_col,
        first(balance_col, ignorenulls=True).over(rest_of_timeline),
    )

    return df.withColumn(balance_col, round(col(balance_col)))


# Clean Monthly_Balance in both splits.
train_df, test_df = (
    clean_monthly_balance(train_df),
    clean_monthly_balance(test_df),
)



## Credit_Score
- **Data Type**: String (Categorical)
- **Description**: Credit score category/rating of the customer
- **Example Values**: `Good`, `Standard`, `Poor` (if present)
- **Categories**: 
  - `Good`: Creditworthy (typically 700+)
  - `Standard`: Average credit (600-700)
  - `Poor`: Low creditworthiness (<600)
- **Purpose**: Overall credit rating for loan/credit approval decisions
- **Use Case**: Primary target variable in credit risk models

In [0]:
from pyspark.sql.functions import col, lower, trim

def clean_credit_score(
    df,
    credit_score_col="Credit_Score"
):
    """
    Normalize Credit_Score labels: lower-case them, then strip surrounding whitespace.

    Returns a DataFrame with ``credit_score_col`` overwritten in place.
    """
    normalized_label = trim(lower(col(credit_score_col)))
    return df.withColumn(credit_score_col, normalized_label)


# NOTE(review): only train_df is standardized here. Presumably the test split
# has no Credit_Score label column — confirm before adding a test_df call.
train_df = clean_credit_score(train_df)




## Payment_of_Min_Amount
- **Data Type**: String (Categorical - Boolean-like)
- **Description**: Whether the customer pays at least the minimum amount due
- **Example Values**: `Yes`, `No`, `NM` (likely "Not Mentioned" or missing)
- **Categories**: 
  - `Yes`: Pays minimum amount regularly
  - `No`: Fails to pay minimum amount
  - `NM`: Data missing or not mentioned
- **Purpose**: Minimum payment compliance indicator
- **Risk Assessment**: `No` indicates higher default risk

In [0]:
from pyspark.sql.functions import col, lower, trim

def clean_payment_of_min_amount(df, payment_col="Payment_of_Min_Amount"):
    """
    Normalize Payment_of_Min_Amount values (e.g. "Yes", "No", "NM").

    The column is lower-cased and trimmed in place, so that spellings that
    differ only in case or surrounding whitespace collapse to one category.
    """
    cleaned = df.withColumn(payment_col, trim(lower(col(payment_col))))
    return cleaned


# Normalize Payment_of_Min_Amount in both splits.
train_df, test_df = map(clean_payment_of_min_amount, [train_df, test_df])




# Feature Engineering

### Creating Income_Group column based on GNI per capita thresholds.


In [0]:
from pyspark.sql.functions import col, when

def add_income_group(
    df,
    income_col="Monthly_Inhand_Salary",
    output_col="Income_Group",
    thresholds=(1135, 4495, 13935),
):
    """
    Creates Income_Group column based on GNI per capita thresholds.

    Groups (with the default thresholds):
    - Low income: <= 1,135
    - Lower-middle income: > 1,135 and <= 4,495
    - Upper-middle income: > 4,495 and <= 13,935
    - High income: > 13,935

    Args:
        df: Input Spark DataFrame.
        income_col: Numeric column to bucket.
        output_col: Name of the categorical column to create.
        thresholds: Ascending, inclusive upper bounds for
            (low, lower-middle, upper-middle) income. Anything above the last
            bound is high income.

    Returns:
        DataFrame with ``output_col`` added. A NULL income gives a NULL group.

    NOTE(review): GNI per capita thresholds are usually annual figures, while
    the default ``income_col`` is a monthly salary — confirm the intended unit.
    """
    low_max, lower_middle_max, upper_middle_max = thresholds
    income = col(income_col)

    # The chained `when` is evaluated in order, so each branch needs only an
    # upper bound. Integer-style ranges (>= 1136, >= 4496) left gaps: a
    # fractional salary such as 1135.5 fell through to "high_income".
    df = df.withColumn(
        output_col,
        when(income.isNull(), None)
        .when(income <= low_max, "low_income")
        .when(income <= lower_middle_max, "lower_middle_income")
        .when(income <= upper_middle_max, "upper_middle_income")
        .otherwise("high_income")
    )

    return df

# Derive Income_Group for both splits.
train_df, test_df = (add_income_group(frame) for frame in (train_df, test_df))


### Transform 'Type_of_Loan' into individual feature columns

In [0]:
def create_loan_type_flags(
    df,
    loan_col="Type_of_Loan",
    loan_types=None,
):
    """
    Create one binary indicator column per known loan type, then drop ``loan_col``.

    Each flag is 1 when the lower-cased ``loan_col`` contains the loan type's
    name as a substring, and 0 otherwise. A NULL ``loan_col`` yields all zeros.

    Args:
        df: Input Spark DataFrame.
        loan_col: Free-text column listing a customer's loan types.
        loan_types: Optional ordered iterable of ``(flag_column, substring)``
            pairs. Defaults to the eight loan types listed below. Substrings
            are matched case-insensitively.

    Returns:
        DataFrame with the flag columns added (in ``loan_types`` order) and
        ``loan_col`` removed.
    """
    # Imported locally so this cell does not depend on an earlier cell having
    # imported coalesce/lit (no visible nearby cell imports them).
    from pyspark.sql.functions import coalesce, col, lit, lower

    if loan_types is None:
        loan_types = (
            ("has_auto_loan", "auto loan"),
            ("has_credit_builder_loan", "credit-builder loan"),
            ("has_debt_consolidation_loan", "debt consolidation loan"),
            ("has_home_equity_loan", "home equity loan"),
            ("has_mortgage_loan", "mortgage loan"),
            ("has_payday_loan", "payday loan"),
            ("has_personal_loan", "personal loan"),
            ("has_student_loan", "student loan"),
        )

    # Lower-cased, NULL-safe text to search. It is kept as an expression rather
    # than a temporary column, so no helper column has to be added and dropped.
    loan_text = lower(coalesce(col(loan_col), lit("")))

    for flag_col, loan_name in loan_types:
        df = df.withColumn(flag_col, loan_text.contains(loan_name.lower()).cast("int"))

    return df.drop(loan_col)



# Expand Type_of_Loan into per-loan-type flag columns for both splits.
train_df, test_df = [
    create_loan_type_flags(frame, "Type_of_Loan") for frame in (train_df, test_df)
]


### Payment_Behaviour
- Values follow a clear pattern like:
   - High_spent_Small_value_payments
   - Low_spent_Large_value_payments
   - Low_spent_Medium_value_payments
   - High_spent_Medium_value_payments

- This encodes two concepts in one string:
  - Spending level: `High / Low`
  - Typical transaction size: `Small / Medium / Large`

In [0]:
from pyspark.sql.functions import col, when

def split_payment_behaviour(
    df,
    source_col="Payment_Behaviour",
    spending_col="spending_behavior",
    value_col="payment_value"
):
    """
    Split a payment-behaviour label into two numeric features.

    1. Spending behaviour (``spending_col``): Low_spent -> 0, High_spent -> 1
    2. Payment value (``value_col``): Small -> 0, Medium -> 1, Large -> 2

    Matching is case-insensitive. A NULL input, or a label containing none of
    the expected tokens, yields NULL instead of being silently treated as
    High_spent / Large_value.

    Returns:
        DataFrame with ``spending_col`` and ``value_col`` added; ``source_col``
        is left untouched.
    """
    from pyspark.sql.functions import lower

    # Compare case-insensitively: the Payment_Behaviour cleaning step lowercases
    # the column, so mixed-case tokens such as "Low_spent" would never match and
    # every row would fall through to the High/Large branches.
    behaviour = lower(col(source_col))

    df = (
        df
        # Spending behaviour
        .withColumn(
            spending_col,
            when(behaviour.contains("low_spent"), 0)
            .when(behaviour.contains("high_spent"), 1)
        )
        # Payment value
        .withColumn(
            value_col,
            when(behaviour.contains("small_value"), 0)
            .when(behaviour.contains("medium_value"), 1)
            .when(behaviour.contains("large_value"), 2)
        )
    )

    return df


# Derive spend_level / txn_value_level from Payment_Behaviour for both splits.
train_df, test_df = (
    split_payment_behaviour(
        frame,
        source_col="Payment_Behaviour",
        spending_col="spend_level",
        value_col="txn_value_level",
    )
    for frame in (train_df, test_df)
)


### Creating the Debt_to_Income_Ratio Column

In [0]:
from pyspark.sql.functions import col, when

def add_debt_to_income_ratio(
    df,
    debt_col="Outstanding_Debt",
    income_col="Annual_Income",
    output_col="Debt_to_Income_Ratio"
):
    """
    Add ``output_col`` = ``debt_col`` / ``income_col``.

    The ratio is NULL whenever either input is NULL or the income is zero,
    so no division by zero is performed.
    """
    debt = col(debt_col)
    income = col(income_col)

    # Divide only when both operands are present and the denominator is
    # non-zero; every other row falls through to NULL.
    has_valid_inputs = debt.isNotNull() & income.isNotNull() & (income != 0)

    return df.withColumn(output_col, when(has_valid_inputs, debt / income))


# Add Debt_to_Income_Ratio to both splits.
train_df, test_df = add_debt_to_income_ratio(train_df), add_debt_to_income_ratio(test_df)




### Creating the EMI_to_Salary_Ratio Column

In [0]:
from pyspark.sql.functions import col, when

def add_emi_to_salary_ratio(
    df,
    emi_col="Total_EMI_per_month",
    salary_col="Monthly_Inhand_Salary",
    output_col="EMI_to_Salary_Ratio"
):
    """
    Add the EMI-to-salary ratio feature.

    EMI_to_Salary_Ratio = Total_EMI_per_month / Monthly_Inhand_Salary

    The ratio is NULL when the salary is NULL or zero. A NULL EMI also yields
    NULL, through ordinary NULL propagation in the division.
    """
    salary = col(salary_col)
    unusable_salary = salary.isNull() | (salary == 0)
    ratio = col(emi_col) / salary

    return df.withColumn(output_col, when(unusable_salary, None).otherwise(ratio))

# Add EMI_to_Salary_Ratio to both splits.
train_df, test_df = map(add_emi_to_salary_ratio, (train_df, test_df))

### Creating Saving_Capacity column

In [0]:
from pyspark.sql.functions import col, when

def add_saving_capacity(
    df,
    balance_col="Monthly_Balance",
    salary_col="Monthly_Inhand_Salary",
    output_col="Saving_Capacity"
):
    """
    Add the saving-capacity feature.

    Saving_Capacity = Monthly_Balance / Monthly_Inhand_Salary

    Rows with a NULL or zero salary get NULL instead of a division.
    """
    salary = col(salary_col)
    usable_salary = salary.isNotNull() & (salary != 0)

    return df.withColumn(
        output_col,
        when(usable_salary, col(balance_col) / salary),
    )


# Add Saving_Capacity to both splits.
train_df, test_df = [add_saving_capacity(frame) for frame in (train_df, test_df)]


## Drop Unimportant Columns

In [0]:
# Drop the following columns:
# - identifier-like columns (ID, Customer_ID, Name, SSN);
# - the month columns (Month, month_num);
# - the raw Payment_Behaviour string, which has already been split into
#   numeric features.
# Month carries no useful signal for this classification task; a time-series
# forecasting setup would need to keep it.
cols_to_drop = ["ID", "Customer_ID", "Name", "SSN", "month_num", "Payment_Behaviour", "Month"]

train_df, test_df = (frame.drop(*cols_to_drop) for frame in (train_df, test_df))



In [0]:
# Keep only rows with no NULL in any column, in both splits.
train_df, test_df = train_df.dropna(), test_df.dropna()

In [0]:
# Row counts after cleaning, shown as a (train, test) tuple.
train_df.count(), test_df.count()

In [0]:
# Preview the first 10 rows of the cleaned training frame.
train_df.limit(10).display()

In [0]:
# Preview the first 10 rows of the cleaned test frame.
test_df.limit(10).display()

# Writing Silver Table

In [0]:
# Persist both silver tables as Delta, replacing any existing data and schema.
train_df.write.format("delta").mode("overwrite").option(
    "overwriteSchema", "true"
).saveAsTable("credit_catalog.silver.train")

test_df.write.format("delta").mode("overwrite").option(
    "overwriteSchema", "true"
).saveAsTable("credit_catalog.silver.test")