In [11]:
import pandas as pd
import os
import shutil
import numpy as np
from sklearn.preprocessing import LabelEncoder
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, isnan, trim, mean, expr, stddev, sum as spark_sum
from pyspark.ml.feature import StringIndexer, OneHotEncoder, Imputer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql import functions as F
import language_tool_python
tool = language_tool_python.LanguageTool('en-US')
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType
import csv

# Tạo thư mục temp trước khi khởi tạo Spark
temp_dir = "C:/temp/spark-temp"
warehouse_dir = "C:/temp/spark-warehouse"
if not os.path.exists(temp_dir):
	os.makedirs(temp_dir, exist_ok=True)
if not os.path.exists(warehouse_dir):
	os.makedirs(warehouse_dir, exist_ok=True)

# Stop any existing Spark session
if 'spark' in globals():
	spark.stop()

# Khởi tạo SparkSession với cấu hình tối ưu
spark = SparkSession.builder \
	.appName("HomeCreditCleaning") \
	.master("local[*]") \
	.config("spark.sql.adaptive.enabled", "true") \
	.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
	.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
	.config("spark.local.dir", temp_dir) \
	.config("spark.sql.warehouse.dir", warehouse_dir) \
	.config("spark.driver.extraJavaOptions", f"-Djava.io.tmpdir={temp_dir}") \
	.config("spark.executor.extraJavaOptions", f"-Djava.io.tmpdir={temp_dir}") \
	.config("spark.sql.shuffle.partitions", "200") \
	.config("spark.driver.memory", "4g") \
	.config("spark.executor.memory", "4g") \
	.config("spark.driver.maxResultSize", "2g") \
	.getOrCreate()

# Set Spark log level to reduce verbosity
spark.sparkContext.setLogLevel("WARN")

# Đọc dữ liệu, tự động suy luận schema và sử dụng dòng đầu tiên làm header
file_path = r"C:\Users\DatGo\OneDrive\Documents\Personal_Project\Home_Credit_Default_Risk\application_train.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

df.show(5)

+----------+------+------------------+-----------+------------+---------------+------------+----------------+----------+-----------+---------------+---------------+----------------+--------------------+--------------------+-----------------+--------------------------+----------+-------------+-----------------+---------------+-----------+----------+--------------+---------------+----------------+----------+----------+---------------+---------------+--------------------+---------------------------+--------------------------+-----------------------+--------------------------+--------------------------+---------------------------+----------------------+----------------------+-----------------------+--------------------+-------------------+------------------+-------------------+--------------+----------------+---------------------------+------------------+--------------+-------------+-------------+-------------+-------------+------------+--------------------+--------------+-----------------

In [2]:
# In ra schema của DataFrame
print("Kiểu dữ liệu của các cột:")
df.printSchema()

Kiểu dữ liệu của các cột:
root
 |-- SK_ID_CURR: integer (nullable = true)
 |-- TARGET: integer (nullable = true)
 |-- NAME_CONTRACT_TYPE: string (nullable = true)
 |-- CODE_GENDER: string (nullable = true)
 |-- FLAG_OWN_CAR: string (nullable = true)
 |-- FLAG_OWN_REALTY: string (nullable = true)
 |-- CNT_CHILDREN: integer (nullable = true)
 |-- AMT_INCOME_TOTAL: double (nullable = true)
 |-- AMT_CREDIT: double (nullable = true)
 |-- AMT_ANNUITY: double (nullable = true)
 |-- AMT_GOODS_PRICE: double (nullable = true)
 |-- NAME_TYPE_SUITE: string (nullable = true)
 |-- NAME_INCOME_TYPE: string (nullable = true)
 |-- NAME_EDUCATION_TYPE: string (nullable = true)
 |-- NAME_FAMILY_STATUS: string (nullable = true)
 |-- NAME_HOUSING_TYPE: string (nullable = true)
 |-- REGION_POPULATION_RELATIVE: double (nullable = true)
 |-- DAYS_BIRTH: integer (nullable = true)
 |-- DAYS_EMPLOYED: integer (nullable = true)
 |-- DAYS_REGISTRATION: double (nullable = true)
 |-- DAYS_ID_PUBLISH: integer (nullab

In [3]:
# Hiển thị thống kê mô tả cho các cột số
print("Thống kê mô tả dữ liệu:")
df.describe().show()

Thống kê mô tả dữ liệu:
+-------+------------------+-------------------+------------------+-----------+------------+---------------+------------------+------------------+-----------------+------------------+-----------------+---------------+----------------+--------------------+------------------+-----------------+--------------------------+-------------------+------------------+------------------+-------------------+------------------+--------------------+------------------+------------------+-------------------+-------------------+-------------------+--------------------+------------------+--------------------+---------------------------+--------------------------+-----------------------+--------------------------+--------------------------+---------------------------+----------------------+----------------------+-----------------------+-----------------+--------------------+--------------------+--------------------+-------------------+-------------------+---------------------------+

In [None]:
# Xử lý giá trị 365243 trong cột DAYS_EMPLOYED (đại diện cho người không có việc)
df = df.withColumn('DAYS_EMPLOYED', when(col('DAYS_EMPLOYED') == 365243, None).otherwise(col('DAYS_EMPLOYED')))
print("\nĐã xử lý giá trị bất thường trong cột 'DAYS_EMPLOYED'.")
# Các cột số cần xử lý ngoại lai
numeric_cols_for_outliers = ['AMT_INCOME_TOTAL', 'AMT_CREDIT', 'AMT_ANNUITY', 'DAYS_BIRTH']
print("\nXử lý giá trị ngoại lai...")
for c in numeric_cols_for_outliers:
    # Tính phân vị 1% và 99%
    # 0.001 là sai số tương đối cho phép để tăng tốc độ tính toán
    quantiles = df.approxQuantile(c, [0.01, 0.99], 0.001)
    q1 = quantiles[0]
    q99 = quantiles[1]

    if q1 is not None and q99 is not None:
        print(f"Cột '{c}': Giới hạn dưới = {q1}, Giới hạn trên = {q99}")
        # Áp dụng clipping
        df = df.withColumn(c,
            when(col(c) < q1, q1)
            .when(col(c) > q99, q99)
            .otherwise(col(c))
        )

print("Đã xử lý xong giá trị ngoại lai.")


Đã xử lý giá trị bất thường trong cột 'DAYS_EMPLOYED'.


In [114]:
# Kiểm tra giá trị null và NaN cho mỗi cột
missing_counts = df.select([
    count(when(isnan(c) | col(c).isNull(), c)).alias(c)
    for c in df.columns
])

print("Số lượng giá trị thiếu trong mỗi cột:")
missing_counts.show()

Số lượng giá trị thiếu trong mỗi cột:
+----------+------------------+-----------+------------+---------------+------------+----------------+----------+-----------+---------------+---------------+----------------+-------------------+------------------+-----------------+--------------------------+----------+-------------+-----------------+---------------+-----------+----------+--------------+---------------+----------------+----------+----------+---------------+---------------+--------------------+---------------------------+--------------------------+-----------------------+--------------------------+--------------------------+---------------------------+----------------------+----------------------+-----------------------+-----------------+------------+------------+------------+--------------+----------------+---------------------------+---------------+--------------+-------------+-------------+-------------+-------------+------------+--------------------+--------------+---------------

In [115]:
# Xóa các cột có hơn 40% giá trị bị thiếu
total_count = df.count()
missing_counts_row = missing_counts.collect()[0].asDict()
missing_percentage = {col: missing_counts_row[col] / total_count for col in df.columns}
cols_to_drop = [col for col, pct in missing_percentage.items() if pct > 0.4]
df = df.drop(*cols_to_drop)
print(f"Đã xóa {len(cols_to_drop)} cột có tỷ lệ thiếu > 40%.")
print(f"Số cột còn lại sau khi xóa: {len(df.columns)}")

# Lấy danh sách các cột số và cột phân loại còn lại
numeric_cols = [field.name for field in df.schema.fields if isinstance(field.dataType, (IntegerType, DoubleType, FloatType))]
categorical_cols = [field.name for field in df.schema.fields if isinstance(field.dataType, StringType)]

# Điền giá trị thiếu cho các cột số bằng median
imputer_numeric = Imputer(
    inputCols=[c for c in numeric_cols if c != 'SK_ID_CURR'],
    outputCols=[f"{c}_imputed" for c in numeric_cols if c != 'SK_ID_CURR']
).setStrategy("median")
print("Đã điền giá trị thiếu cho các cột số bằng giá trị trung vị.")

# Điền giá trị thiếu cho các cột phân loại bằng MODE
pipeline_impute = Pipeline(stages=[imputer_numeric])
df = pipeline_impute.fit(df).transform(df)

for c in categorical_cols:
    mode_val = df.groupBy(c).count().orderBy(F.desc("count")).first()[0]
    df = df.withColumn(
        f"{c}_imputed",
        F.when(F.col(c).isNull(), mode_val).otherwise(F.col(c))
    )
print("Đã điền giá trị thiếu cho các cột phân loại bằng giá trị mode.")

original_cols_to_drop = [c for c in numeric_cols if c != 'SK_ID_CURR'] + categorical_cols
df = df.drop(*original_cols_to_drop)

for c in df.columns:
    if c.endswith("_imputed"):
        df = df.withColumnRenamed(c, c.replace("_imputed", ""))
print("Đã điền giá trị thiếu cho các cột còn lại.")

Đã xóa 49 cột có tỷ lệ thiếu > 40%.
Số cột còn lại sau khi xóa: 72
Đã điền giá trị thiếu cho các cột số bằng giá trị trung vị.
Đã điền giá trị thiếu cho các cột phân loại bằng giá trị mode.
Đã điền giá trị thiếu cho các cột còn lại.


In [116]:
# Xử lý biến phân loại với StringIndexer và OneHotEncoder
# Lấy lại danh sách các cột chuỗi sau khi đã impute
categorical_cols = [f.name for f in df.schema.fields if str(f.dataType) == 'StringType']

indexers = [StringIndexer(inputCol=c, outputCol=f"{c}_index", handleInvalid="keep") for c in categorical_cols]
encoders = [OneHotEncoder(inputCol=f"{c}_index", outputCol=f"{c}_vec") for c in categorical_cols]

pipeline_encode = Pipeline(stages=indexers + encoders)
df_encoded = pipeline_encode.fit(df).transform(df)

# Xóa các cột chuỗi gốc và các cột index trung gian
df_encoded = df_encoded.drop(*categorical_cols)
df_encoded = df_encoded.drop(*[f"{c}_index" for c in categorical_cols])

print(f"Đã áp dụng StringIndexer và OneHotEncoder.")
print(f"Kích thước cuối cùng của dữ liệu đã xử lý: ({df_encoded.count()}, {len(df_encoded.columns)})")

print("\nHoàn tất! Dưới đây là schema của DataFrame đã sẵn sàng cho mô hình:")
df_encoded.printSchema()


Đã áp dụng StringIndexer và OneHotEncoder.
Kích thước cuối cùng của dữ liệu đã xử lý: (48744, 72)

Hoàn tất! Dưới đây là schema của DataFrame đã sẵn sàng cho mô hình:
root
 |-- SK_ID_CURR: integer (nullable = true)
 |-- CNT_CHILDREN: integer (nullable = true)
 |-- AMT_INCOME_TOTAL: double (nullable = true)
 |-- AMT_CREDIT: double (nullable = true)
 |-- AMT_ANNUITY: double (nullable = true)
 |-- AMT_GOODS_PRICE: double (nullable = true)
 |-- REGION_POPULATION_RELATIVE: double (nullable = true)
 |-- DAYS_BIRTH: integer (nullable = true)
 |-- DAYS_EMPLOYED: integer (nullable = true)
 |-- DAYS_REGISTRATION: double (nullable = true)
 |-- DAYS_ID_PUBLISH: integer (nullable = true)
 |-- FLAG_MOBIL: integer (nullable = true)
 |-- FLAG_EMP_PHONE: integer (nullable = true)
 |-- FLAG_WORK_PHONE: integer (nullable = true)
 |-- FLAG_CONT_MOBILE: integer (nullable = true)
 |-- FLAG_PHONE: integer (nullable = true)
 |-- FLAG_EMAIL: integer (nullable = true)
 |-- CNT_FAM_MEMBERS: double (nullable = tr

In [117]:
# Xác định các cột dạng chuỗi cần xử lý
# Lấy tất cả các cột có kiểu dữ liệu là 'string'
string_columns = [field.name for field in df.schema.fields if isinstance(field.dataType, StringType)]

print(f"Các cột dạng chuỗi sẽ được xóa khoảng trắng: {string_columns[:5]}...") # In ra 5 cột đầu

# Dùng vòng lặp và withColumn để xóa khoảng trắng
for c in string_columns:
    df = df.withColumn(c, trim(col(c)))

print("Đã xóa khoảng trắng thừa ở các cột dạng chuỗi.")

Các cột dạng chuỗi sẽ được xóa khoảng trắng: ['NAME_CONTRACT_TYPE', 'CODE_GENDER', 'FLAG_OWN_CAR', 'FLAG_OWN_REALTY', 'NAME_TYPE_SUITE']...
Đã xóa khoảng trắng thừa ở các cột dạng chuỗi.


In [119]:
# Lấy danh sách các cột số để chuẩn hóa (trừ cột ID và TARGET)
numeric_cols_for_scaling = [
    f.name for f in df.schema.fields if isinstance(f.dataType, (IntegerType, DoubleType, FloatType))
    and f.name not in ['SK_ID_CURR', 'TARGET']
]

print(f"\nChuẩn bị chuẩn hóa {len(numeric_cols_for_scaling)} cột số...")

# 1. VectorAssembler: Gộp các cột số thành một cột vector duy nhất
assembler = VectorAssembler(inputCols=numeric_cols_for_scaling, outputCol="features", handleInvalid="skip")
df_assembled = assembler.transform(df)

# 2. StandardScaler: Tính toán mean/std và áp dụng chuẩn hóa
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)

# Fit scaler vào dữ liệu để học các tham số
scaler_model = scaler.fit(df_assembled)

# Transform dữ liệu
df_scaled = scaler_model.transform(df_assembled)

print("Đã chuẩn hóa dữ liệu thành công. Cột mới 'scaledFeatures' đã được thêm vào.")
df_scaled.select("features", "scaledFeatures").show(5, truncate=False)


Chuẩn bị chuẩn hóa 59 cột số...
Đã chuẩn hóa dữ liệu thành công. Cột mới 'scaledFeatures' đã được thêm vào.
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [120]:
output_csv_path = "C:\\Users\\DatGo\\OneDrive\\Documents\\Personal_Project\\Home_Credit_Default_Risk\\cleaned_application_test_csv"
print(f"\nBắt đầu lưu DataFrame đã xử lý vào thư mục: {output_csv_path}")

# Xóa thư mục nếu đã tồn tại để tránh lỗi ghi đè
if os.path.exists(output_csv_path):
	shutil.rmtree(output_csv_path)

df_encoded.coalesce(1).write.mode("overwrite").option("header", "true").csv(output_csv_path)

print(f"Hoàn tất! Đã lưu thành công vào thư mục '{output_csv_path}'.")
print("Bạn sẽ tìm thấy một tệp CSV duy nhất bên trong thư mục đó.")


Bắt đầu lưu DataFrame đã xử lý vào thư mục: C:\Users\DatGo\OneDrive\Documents\Personal_Project\Home_Credit_Default_Risk\cleaned_application_test_csv
Hoàn tất! Đã lưu thành công vào thư mục 'C:\Users\DatGo\OneDrive\Documents\Personal_Project\Home_Credit_Default_Risk\cleaned_application_test_csv'.
Bạn sẽ tìm thấy một tệp CSV duy nhất bên trong thư mục đó.
