In [1]:
import re
from pathlib import Path
from typing import List

import numpy as np
import pandas as pd

#--Paths-- 
INPUT_PATH = Path("C:/Users/Medha/Projects/loan-eligibility-prediction/data/processed/combined_loan_data_processed.csv") 
OUTPUT_PATH = Path("C:/Users/Medha/Projects/loan-eligibility-prediction/data/processed/engineered_loan_dataset.csv") 

#--Column Definitions--
RAW_CATEGORICAL_COLUMNS = [
    "term", 
    "emp_length", 
    "home_ownership", 
    "purpose", 
    "addr_state", 
    "application_type", 
] 

RAW_NUMERIC_COLUMNS = [
    "loan_amnt", 
    "int_rate", 
    "annual_inc", 
    "dti", 
    "delinq_2yrs", 
    "fico_range_high", 
    "fico_range_low", 
    "inq_last_6mths", 
] 

TARGET_COLUMN = "loan_status"

In [2]:
def _parse_term(term_value : str) -> float: 
    """Convert terms like '36 months' to the numeric month count."""
    if pd.isna(term_value): 
        return np.nan
    digits = "".join(ch for ch in str(term_value) if ch.isdigit())
    return float(digits) if digits else np.nan

def _parse_emp_length(emp_length_value : str) -> float: 
    """Normalize employment length strings to a numeric year count."""
    if pd.isna(emp_length_value):
        return np.nan


    value = str(emp_length_value).strip().lower()
    if value in {"10+ years", "10 years", "10+"}: 
        return 10.0
    if value in {"< 1 year", "<1 year", "<1"}:
        return 0.0
    digits = "".join(ch for ch in value if ch.isdigit())
    return float(digits) if digits else np.nan


def _parse_percentage(value : str) -> float: 
    """Strip percent signs and cast to float."""
    if pd.isna(value): 
        return np.nan
    cleaned = str(value).replace("%", "").strip()
    return float(cleaned) if cleaned else np.nan 


def _build_fico_score(df: pd.DataFrame) -> pd.Series: 
    """Combine high/low ranges into a single representative score."""
    low = df.get("fico_range_low") 
    high = df.get("fico_range_high") 


    #Prefer the midpoint when both are present, otherwise fall back 
    if low is not None and high is not None: 
        return (pd.to_numeric(low, errors = "coerce") + 
                pd.to_numeric(high, errors = "coerce")) / 2
    if low is not None: 
        return pd.to_numeric(low, errors = "coerce") 
    if high is not None: 
        return pd.to_numeric(high, errors = "coerce") 
    return pd.Series(np.nan, index = df.index)

def _normalize_columns(df: pd.DataFrame) -> pd.DataFrame: 
    """Lowercase and strip column names for consistency."""
    normalized = df.copy()
    normalized.columns = normalized.columns.str.lower().str.strip()
    return normalized 
        
    

In [3]:
def engineer_features(df: pd.DataFrame) -> pd.DataFrame: 

    """
    Return a cleaned, model-ready dataframe. 

    Steps 
    ------------
    * Standardize column casing 
    * Parse text percentages and term/tenure fields to numerics 
    * Build a single fico_score 
    * Impute missing values with medians (numeric) and most-common labels (categorical) 
    * One-hot encode categorical features 
    * Convert the target label to a binary indicator (1 = accepted, 0 = rejected) 

    """

    #Normalize columns 
    df = _normalize_columns(df) 
    working = df.copy()

    #----- Parse engineered numeric features -----
    if "term" in working: 
        working["term_months"] = working["term"].apply(_parse_term)

    if "emp_length" in working: 
        working["emp_length_years"] = working["emp_length"].apply(_parse_emp_length)
        
    if "int_rate" in working: 
        working["int_rate"] = working["int_rate"].apply(_parse_percentage)

    if "dti" in working: 
        working["dti"] = working["dti"].apply(_parse_percentage) 

    working["fico_score"] = _build_fico_score(working) 

    # ----- Target label encoding -----
    if TARGET_COLUMN in working: 
        working[TARGET_COLUMN] = (
            working[TARGET_COLUMN] 
            .astype(str)
            .str.strip()
            .str.lower()
            .map({"accepted" :1, "rejected" : 0})
        )
        #Keep only rows with valid mapped labels (0 or 1) 
        mask_valid = working[TARGET_COLUMN].isin([0,1])
        working = working[mask_valid].copy()

    # ----- Identify numeric and categorical features -----
    numeric_features : List[str] = []
    for column in RAW_NUMERIC_COLUMNS + ["term_months", "emp_length_years", "fico_score"]: 
        if column in working.columns: 
            numeric_features.append(column) 
            working[column] = pd.to_numeric(working[column], errors = "coerce") 

    categorical_features: List[str] = []
    for column in RAW_CATEGORICAL_COLUMNS: 
        if column in working.columns: 
            categorical_features.append(column) 
            working[column] = working[column].astype(str).str.strip().str.lower()

    # ----- Impute missing values ----- 
    if numeric_features: 
        for column in numeric_features: 
            working[column] = working[column].fillna(working[column].median())

    if caterogical_features: 
        for column in categorical_features: 
            working[column] = working[column].replace({"nan":np.nan})
            mode = working[column].mode(dropna= True)
            fill_value = mode.iloc[0] if not mode.empty else "unknown"
            working[column] = working[column].fillna(fill_value)

    # ----- One-hot encode categoricals -----
    if categorical_features: 
        encoded = pd.get_dummies(
            working[categorical_features], 
            prefix = categorical_features, 
            drop_first = False
        )
    else: 
        encoded = pd.DataFrame(index = working.index) 

    # ----- Assemble final dataframe -----
    feature_part = pd.concat(
        [working[numeric_features], encoded], 
        axis = 1
    ) 

    if TARGET_COLUMN in working.columns: 
        feature_part[TARGET_COLUMN] = working[TARGET_COLUMN].astype(int)


    return feature_part
        

        

In [4]:
def engineer_and_save(input_path: Path, output_path: Path) -> pd.DataFrame:
    """Load the combined dataset, engineer features, save, and return the df."""
    if not input_path.exists():
        raise FileNotFoundError(f"Input file not found at: {input_path}")

    print(f"Loading input dataset from: {input_path}")
    df_raw = pd.read_csv(input_path, low_memory = False)
    print(f"  Raw Shape: {df_raw.shape}")

    df_engineered = engineer_features(df_raw)
    print(f"Engineered shape: {df_engineered.shape}") 

    output_path.parent.mkdir(parents = True, exist_ok = True)
    df_engineered.to_csv(output_path, index = False) 
    print(f"Saved engineered dataset to: {output_path}")

    return df_engineered 

In [5]:
# Run the full engineering pipeline and inspect results 

engineered_df = engineer_and_save(INPUT_PATH, OUTPUT_PATH) 

print("\nPreview of engineered dataset:") 
display(engineered_df.head())

print("\nInfo:")
print(engineered_df.info())

if TARGET_COLUMN in engineered_df.columns: 
    print("\nTarget distribution:") 
    print(engineered_df[TARGET_COLUMN].value_counts(dropna=False))
    

Loading input dataset from: C:\Users\Medha\Projects\loan-eligibility-prediction\data\processed\combined_loan_data_processed.csv
  Raw Shape: (29909442, 15)


MemoryError: Unable to allocate 456. MiB for an array with shape (29909442,) and data type complex128