In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType
from pyspark.sql.window import Window

In [2]:
# Create (or reuse) the Spark session for this notebook. The master URL and
# application name are fixed here; every later cell goes through `spark`.
spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("SplitLoanTables")
    .getOrCreate()
)

In [3]:
# HDFS location of the CSV data that this notebook reads and splits.
SOURCE_PATH = "hdfs://namenode:9000/bigdata/data/processed_data"
# HDFS directory under which one sub-directory per output table is written.
OUT_DIR = "hdfs://namenode:9000/bigdata/data/splitted_data"

In [4]:
# Explicit schema for the source CSV. The list order is the column order of
# the schema, so entries must not be reordered. Every field is nullable.
_COLUMN_TYPES = [
    ("id", IntegerType),
    ("member_id", IntegerType),
    ("loan_amnt", DoubleType),
    ("funded_amnt", DoubleType),
    ("funded_amnt_inv", DoubleType),
    ("term", StringType),
    ("int_rate", DoubleType),
    ("installment", DoubleType),
    ("grade", StringType),
    ("sub_grade", StringType),
    ("emp_title", StringType),
    ("emp_length", StringType),
    ("home_ownership", StringType),
    ("annual_inc", DoubleType),
    ("verification_status", StringType),
    ("issue_d", StringType),
    ("loan_status", StringType),
    ("pymnt_plan", StringType),
    ("url", StringType),
    ("desc", StringType),
    ("purpose", StringType),
    ("title", StringType),
    ("zip_code", StringType),
    ("addr_state", StringType),
    ("dti", DoubleType),
    ("delinq_2yrs", DoubleType),
    ("earliest_cr_line", StringType),
    ("inq_last_6mths", DoubleType),
    ("mths_since_last_delinq", DoubleType),
    ("mths_since_last_record", DoubleType),
    ("open_acc", DoubleType),
    ("pub_rec", DoubleType),
    ("revol_bal", DoubleType),
    ("revol_util", DoubleType),
    ("total_acc", DoubleType),
    ("initial_list_status", StringType),
    ("out_prncp", DoubleType),
    ("out_prncp_inv", DoubleType),
    ("total_pymnt", DoubleType),
    ("total_pymnt_inv", DoubleType),
    ("total_rec_prncp", DoubleType),
    ("total_rec_int", DoubleType),
    ("total_rec_late_fee", DoubleType),
    ("recoveries", DoubleType),
    ("collection_recovery_fee", DoubleType),
    ("last_pymnt_d", StringType),
    ("last_pymnt_amnt", DoubleType),
    ("next_pymnt_d", StringType),
    ("last_credit_pull_d", StringType),
    ("collections_12_mths_ex_med", DoubleType),
    ("mths_since_last_major_derog", DoubleType),
    ("policy_code", DoubleType),
    ("application_type", StringType),
    ("annual_inc_joint", DoubleType),
    ("dti_joint", DoubleType),
    ("verification_status_joint", StringType),
    ("acc_now_delinq", DoubleType),
    ("tot_coll_amt", DoubleType),
    ("tot_cur_bal", DoubleType),
    ("open_acc_6m", DoubleType),
    ("open_il_6m", DoubleType),
    ("open_il_12m", DoubleType),
    ("open_il_24m", DoubleType),
    ("mths_since_rcnt_il", DoubleType),
    ("total_bal_il", DoubleType),
    ("il_util", DoubleType),
    ("open_rv_12m", DoubleType),
    ("open_rv_24m", DoubleType),
    ("max_bal_bc", DoubleType),
    ("all_util", DoubleType),
    ("total_rev_hi_lim", DoubleType),
    ("inq_fi", DoubleType),
    ("total_cu_tl", DoubleType),
    ("inq_last_12m", DoubleType),
]

# Instantiate a fresh type object per field, mirroring one StructField per
# column with nullable=True.
schema = StructType(
    [StructField(name, dtype(), True) for name, dtype in _COLUMN_TYPES]
)

In [5]:
# Read the source CSV with the explicit schema. The first line of each file
# is treated as a header, and a double quote is used both as the quote
# character and as the escape character inside quoted values.
csv_read_options = {"header": True, "quote": '"', "escape": '"'}
df = spark.read.options(**csv_read_options).schema(schema).csv(SOURCE_PATH)

| Bảng (table)              | Khóa chính (PK)    | Các cột chính                                                                                                                                                                                                                                                                     | Vai trò / Mục đích                                                                                                      |
| ------------------------- | ------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------- |
| **borrower_profile**      | `member_id`        | `id`, `member_id`, `emp_title`, `emp_length`, `home_ownership`, `annual_inc`, `verification_status`, `addr_state`, `zip_code`                                                                                                                                                     | Lưu thông tin cá nhân và hồ sơ người vay - dùng để xây dựng profile người vay.                                          |
| **loan_info**             | `id`               | `id`, `member_id`, `loan_amnt`, `funded_amnt`, `funded_amnt_inv`, `term`, `int_rate`, `installment`, `grade`, `sub_grade`, `purpose`, `title`, `issue_d`, `policy_code`, `application_type`                                                                                       | Lưu thông tin chi tiết về từng khoản vay - điều kiện vay, thời hạn, lãi suất, mục đích.                                 |
| **loan_status_history**   | `id`               | `id`, `loan_status`, `pymnt_plan`, `initial_list_status`, `last_pymnt_d`, `next_pymnt_d`, `last_pymnt_amnt`, `out_prncp`, `out_prncp_inv`                                                                                                                                         | Theo dõi trạng thái khoản vay và tiến trình thanh toán - quan trọng để phân tích rủi ro và hoàn trả.                    |
| **repayment_summary**     | `id`               | `id`, `total_pymnt`, `total_pymnt_inv`, `total_rec_prncp`, `total_rec_int`, `total_rec_late_fee`, `recoveries`, `collection_recovery_fee`                                                                                                                                         | Tóm tắt hiệu quả hoàn trả của khoản vay - bao gồm tổng tiền đã trả, thu hồi, phí trễ hạn.                               |
| **credit_profile**        | `id`               | `id`, `member_id`, `delinq_2yrs`, `earliest_cr_line`, `inq_last_6mths`, `mths_since_last_delinq`, `mths_since_last_record`, `open_acc`, `pub_rec`, `revol_bal`, `revol_util`, `total_acc`, `collections_12_mths_ex_med`, `mths_since_last_major_derog`                            | Lưu hồ sơ tín dụng của người vay - lịch sử trễ hạn, số tài khoản mở, mức sử dụng tín dụng.                              |
| **credit_account_detail** | `id`               | `id`, `member_id`, `open_acc_6m`, `open_il_6m`, `open_il_12m`, `open_il_24m`, `mths_since_rcnt_il`, `total_bal_il`, `il_util`, `open_rv_12m`, `open_rv_24m`, `max_bal_bc`, `all_util`, `total_rev_hi_lim`, `tot_coll_amt`, `tot_cur_bal`, `inq_fi`, `total_cu_tl`, `inq_last_12m` | Thông tin chi tiết hơn về tài khoản tín dụng và sử dụng tín dụng - hỗ trợ phân tích chi tiết về rủi ro tín dụng.        |
| **joint_application**     | `id`               | `id`, `annual_inc_joint`, `dti_joint`, `verification_status_joint`, `acc_now_delinq`                                                                                                                                                                                              | Dành cho hồ sơ vay chung (joint) - thông tin thu nhập kết hợp, tỷ lệ nợ/thu nhập kết hợp, hiện trạng tài khoản trễ hạn. |
| **meta_reference**        | `id`               | `id`, `url`, `desc`                                                                                                                                                                                                                                                               | Thông tin phụ trợ - liên kết tới chi tiết khoản vay và mô tả (description) của người vay.                               |


In [6]:
# Column layout of every output table, keyed by table name. Insertion order
# is the order in which the tables are written later in the notebook.
# NOTE(review): the schema columns `dti` and `last_credit_pull_d` are not
# assigned to any table below — confirm that dropping them is intentional.
_TABLE_COLUMNS = {
    "borrower_profile": [
        "id", "member_id", "emp_title", "emp_length", "home_ownership",
        "annual_inc", "verification_status", "addr_state", "zip_code",
    ],
    "loan_info": [
        "id", "member_id", "loan_amnt", "funded_amnt", "funded_amnt_inv",
        "term", "int_rate", "installment", "grade", "sub_grade",
        "purpose", "title", "issue_d", "policy_code", "application_type",
    ],
    "loan_status_history": [
        "id", "loan_status", "pymnt_plan", "initial_list_status",
        "last_pymnt_d", "next_pymnt_d", "last_pymnt_amnt",
        "out_prncp", "out_prncp_inv",
    ],
    "repayment_summary": [
        "id", "total_pymnt", "total_pymnt_inv", "total_rec_prncp",
        "total_rec_int", "total_rec_late_fee", "recoveries",
        "collection_recovery_fee",
    ],
    "credit_profile": [
        "id", "member_id", "delinq_2yrs", "earliest_cr_line", "inq_last_6mths",
        "mths_since_last_delinq", "mths_since_last_record", "open_acc",
        "pub_rec", "revol_bal", "revol_util", "total_acc",
        "collections_12_mths_ex_med", "mths_since_last_major_derog",
    ],
    "credit_account_detail": [
        "id", "member_id", "open_acc_6m", "open_il_6m", "open_il_12m",
        "open_il_24m", "mths_since_rcnt_il", "total_bal_il", "il_util",
        "open_rv_12m", "open_rv_24m", "max_bal_bc", "all_util",
        "total_rev_hi_lim", "tot_coll_amt", "tot_cur_bal",
        "inq_fi", "total_cu_tl", "inq_last_12m",
    ],
    "joint_application": [
        "id", "annual_inc_joint", "dti_joint",
        "verification_status_joint", "acc_now_delinq",
    ],
    "meta_reference": ["id", "url", "desc"],
}

# Wrap each column list in the {"cols": [...]} shape the write loop expects.
tables = {
    name: {"cols": list(columns)} for name, columns in _TABLE_COLUMNS.items()
}

In [7]:
# Write one CSV directory per entry of `tables`, each holding the subset of
# `df` columns listed for that table. Existing output for a table is
# overwritten. Columns listed for a table but absent from `df` are left out
# and reported in the log line instead of failing the whole run.

# Loop-invariant values: resolve the source columns and the output root once.
source_columns = set(df.columns)
out_root = OUT_DIR.rstrip('/')

for table_name, meta in tables.items():
    cols = meta.get("cols", [])

    available_cols = [c for c in cols if c in source_columns]
    missing_cols = [c for c in cols if c not in source_columns]

    out_path = f"{out_root}/{table_name}"

    # Nothing to select: writing a DataFrame with an empty schema would raise
    # and abort the remaining tables, so skip this table and keep going.
    if not available_cols:
        print(f"[SKIP] {table_name} -> {out_path} (missing_cols: {missing_cols})")
        continue

    (
        df.select(*available_cols)
        .write.mode("overwrite")
        .option("header", "true")
        .option("quote", '"')
        .option("escape", '"')
        .csv(out_path)
    )

    print(f"[WRITE] {table_name} -> {out_path} (missing_cols: {missing_cols})")

[WRITE] borrower_profile -> hdfs://namenode:9000/bigdata/data/splitted_data/borrower_profile (missing_cols: [])
[WRITE] loan_info -> hdfs://namenode:9000/bigdata/data/splitted_data/loan_info (missing_cols: [])
[WRITE] loan_status_history -> hdfs://namenode:9000/bigdata/data/splitted_data/loan_status_history (missing_cols: [])
[WRITE] repayment_summary -> hdfs://namenode:9000/bigdata/data/splitted_data/repayment_summary (missing_cols: [])
[WRITE] credit_profile -> hdfs://namenode:9000/bigdata/data/splitted_data/credit_profile (missing_cols: [])
[WRITE] credit_account_detail -> hdfs://namenode:9000/bigdata/data/splitted_data/credit_account_detail (missing_cols: [])
[WRITE] joint_application -> hdfs://namenode:9000/bigdata/data/splitted_data/joint_application (missing_cols: [])
[WRITE] meta_reference -> hdfs://namenode:9000/bigdata/data/splitted_data/meta_reference (missing_cols: [])


In [8]:
# Stop the Spark session now that all tables have been written.
spark.stop()