In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [2]:
spark = SparkSession.builder.appName("SplitLoanTables").getOrCreate()

In [3]:
SOURCE_PATH = "../data/processed_data/part-00000-a3182952-1883-4fea-b3df-bf8414e6f643-c000.csv"
OUT_DIR = "../data/splitted_data"

In [4]:
df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .option("quote", '"')
    .option("escape", '"')
    .option("multiLine", True)
    .csv(SOURCE_PATH)
)

| Bảng (table)              | Khóa chính (PK)    | Các cột chính                                                                                                                                                                                                                                                                     | Vai trò / Mục đích                                                                                                      |
| ------------------------- | ------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------- |
| **borrower_profile**      | `member_id`        | `member_id`, `emp_title`, `emp_length`, `home_ownership`, `annual_inc`, `verification_status`, `addr_state`, `zip_code`                                                                                                                                                           | Lưu thông tin cá nhân và hồ sơ người vay - dùng để xây dựng profile người vay.                                          |
| **loan_info**             | `id`               | `id`, `member_id`, `loan_amnt`, `funded_amnt`, `funded_amnt_inv`, `term`, `int_rate`, `installment`, `grade`, `sub_grade`, `purpose`, `title`, `issue_d`, `policy_code`, `application_type`                                                                                       | Lưu thông tin chi tiết về từng khoản vay - điều kiện vay, thời hạn, lãi suất, mục đích.                                 |
| **loan_status_history**   | `id`               | `id`, `loan_status`, `pymnt_plan`, `initial_list_status`, `last_pymnt_d`, `next_pymnt_d`, `last_pymnt_amnt`, `out_prncp`, `out_prncp_inv`                                                                                                                                         | Theo dõi trạng thái khoản vay và tiến trình thanh toán - quan trọng để phân tích rủi ro và hoàn trả.                    |
| **repayment_summary**     | `id`               | `id`, `total_pymnt`, `total_pymnt_inv`, `total_rec_prncp`, `total_rec_int`, `total_rec_late_fee`, `recoveries`, `collection_recovery_fee`                                                                                                                                         | Tóm tắt hiệu quả hoàn trả của khoản vay - bao gồm tổng tiền đã trả, thu hồi, phí trễ hạn.                               |
| **credit_profile**        | None | `id`, `member_id`, `delinq_2yrs`, `earliest_cr_line`, `inq_last_6mths`, `mths_since_last_delinq`, `mths_since_last_record`, `open_acc`, `pub_rec`, `revol_bal`, `revol_util`, `total_acc`, `collections_12_mths_ex_med`, `mths_since_last_major_derog`                            | Lưu hồ sơ tín dụng của người vay - lịch sử trễ hạn, số tài khoản mở, mức sử dụng tín dụng.                              |
| **credit_account_detail** | None               | `id`, `member_id`, `open_acc_6m`, `open_il_6m`, `open_il_12m`, `open_il_24m`, `mths_since_rcnt_il`, `total_bal_il`, `il_util`, `open_rv_12m`, `open_rv_24m`, `max_bal_bc`, `all_util`, `total_rev_hi_lim`, `tot_coll_amt`, `tot_cur_bal`, `inq_fi`, `total_cu_tl`, `inq_last_12m` | Thông tin chi tiết hơn về tài khoản tín dụng và sử dụng tín dụng - hỗ trợ phân tích chi tiết về rủi ro tín dụng.        |
| **joint_application**     | None               | `id`, `annual_inc_joint`, `dti_joint`, `verification_status_joint`, `acc_now_delinq`                                                                                                                                                                                              | Dành cho hồ sơ vay chung (joint) - thông tin thu nhập kết hợp, tỷ lệ nợ/thu nhập kết hợp, hiện trạng tài khoản trễ hạn. |
| **meta_reference**        | None               | `id`, `url`, `desc`                                                                                                                                                                                                                                                               | Thông tin phụ trợ - liên kết tới chi tiết khoản vay và mô tả (description) của người vay.                               |


In [5]:
tables = {
    "borrower_profile": {
        "cols": [
            "member_id","emp_title","emp_length","home_ownership","annual_inc",
            "verification_status","addr_state","zip_code"
        ],
        "pk": "member_id"
    },
    "loan_info": {
        "cols": [
            "id","member_id","loan_amnt","funded_amnt","funded_amnt_inv",
            "term","int_rate","installment","grade","sub_grade",
            "purpose","title","issue_d","policy_code","application_type"
        ],
        "pk": "id"
    },
    "loan_status_history": {
        "cols": [
            "id","loan_status","pymnt_plan","initial_list_status",
            "last_pymnt_d","next_pymnt_d","last_pymnt_amnt",
            "out_prncp","out_prncp_inv"
        ],
        "pk": "id"
    },
    "repayment_summary": {
        "cols": [
            "id","total_pymnt","total_pymnt_inv","total_rec_prncp",
            "total_rec_int","total_rec_late_fee","recoveries",
            "collection_recovery_fee"
        ],
        "pk": "id"
    },
    "credit_profile": {
        "cols": [
            "id","member_id","delinq_2yrs","earliest_cr_line","inq_last_6mths",
            "mths_since_last_delinq","mths_since_last_record","open_acc",
            "pub_rec","revol_bal","revol_util","total_acc",
            "collections_12_mths_ex_med","mths_since_last_major_derog"
        ],
        "pk": "id"
    },
    "credit_account_detail": {
        "cols": [
            "id","member_id","open_acc_6m","open_il_6m","open_il_12m","open_il_24m",
            "mths_since_rcnt_il","total_bal_il","il_util","open_rv_12m","open_rv_24m",
            "max_bal_bc","all_util","total_rev_hi_lim","tot_coll_amt","tot_cur_bal",
            "inq_fi","total_cu_tl","inq_last_12m"
        ],
        "pk": "id"
    },
    "joint_application": {
        "cols": [
            "id","annual_inc_joint","dti_joint","verification_status_joint","acc_now_delinq"
        ],
        "pk": "id"
    },
    "meta_reference": {
        "cols": ["id","url","desc"],
        "pk": "id"
    }
}

In [6]:
for tbl_name, meta in tables.items():
    cols = [c for c in meta["cols"] if c in df.columns]
    if not cols:
        print(f"Bỏ qua bảng '{tbl_name}' (không có cột nào hợp lệ).")
        continue

    subset_df = df.select(*cols)

    pk = meta.get("pk")
    if pk and pk in subset_df.columns:
        before = subset_df.count()
        subset_df = subset_df.dropDuplicates([pk])
        after = subset_df.count()
        if before > after:
            print(f"Bảng '{tbl_name}': xóa {before - after} dòng trùng trên khóa '{pk}'.")

    out_path = f"{OUT_DIR}/{tbl_name}.csv"
    subset_df.coalesce(1).write.mode("overwrite").option("header", True).csv(out_path)
    print(f"Đã lưu bảng '{tbl_name}' vào: {out_path}")

Đã lưu bảng 'borrower_profile' vào: ../data/splitted_data/borrower_profile.csv
Đã lưu bảng 'loan_info' vào: ../data/splitted_data/loan_info.csv
Đã lưu bảng 'loan_status_history' vào: ../data/splitted_data/loan_status_history.csv
Đã lưu bảng 'repayment_summary' vào: ../data/splitted_data/repayment_summary.csv
Đã lưu bảng 'credit_profile' vào: ../data/splitted_data/credit_profile.csv
Đã lưu bảng 'credit_account_detail' vào: ../data/splitted_data/credit_account_detail.csv
Đã lưu bảng 'joint_application' vào: ../data/splitted_data/joint_application.csv
Đã lưu bảng 'meta_reference' vào: ../data/splitted_data/meta_reference.csv
