## 数据预处理部分

### 1. 读取数据

In [1]:
from pyspark.sql import SparkSession

# Location of the raw loan-application CSV (read through the local file system).
RAW_DATA_PATH = 'file:///home/Lsc/application_data.csv'

# Reuse the active Spark session if there is one, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName("LoanDefaultPrediction")
    .getOrCreate()
)

# header=True: the first line holds the column names.
# inferSchema=True: derive column types from the data instead of reading every column as a string.
data = spark.read.csv(RAW_DATA_PATH, header=True, inferSchema=True)

### 2. 处理类别特征

将 WEEKDAY_APPR_PROCESS_START 转换为数值

In [2]:
from pyspark.ml.feature import StringIndexer

# Encode the weekday name (a string column) as a numeric label index stored in
# the new WEEKDAY_INDEX column.
# NOTE(review): WEEKDAY_INDEX is later used as an ordinary numeric feature, which
# imposes an arbitrary order on the weekdays; one-hot encoding may fit better — confirm.
indexer = StringIndexer(
    inputCol="WEEKDAY_APPR_PROCESS_START",
    outputCol="WEEKDAY_INDEX",
)

# fit() learns the label -> index mapping, transform() appends the index column.
data = indexer.fit(data).transform(data)

### 3. SMOTE平衡数据

In [None]:
from imblearn.over_sampling import SMOTE
import pandas as pd

# Feature columns used for resampling (and, in later steps, for scaling).
featureCols = ["CNT_CHILDREN", "REGION_POPULATION_RELATIVE", "FLAG_EMP_PHONE",
               "FLAG_WORK_PHONE", "FLAG_CONT_MOBILE", "FLAG_PHONE", "FLAG_EMAIL",
               "REGION_RATING_CLIENT", "REGION_RATING_CLIENT_W_CITY", "REG_REGION_NOT_WORK_REGION",
               "LIVE_REGION_NOT_WORK_REGION", "REG_CITY_NOT_LIVE_CITY", "REG_CITY_NOT_WORK_CITY",
               "LIVE_CITY_NOT_WORK_CITY", "HOUR_APPR_PROCESS_START", "WEEKDAY_INDEX"]

# NOTE(review): SMOTE runs here, BEFORE the train/test split in step 6, so
# synthetic minority rows (interpolated from real ones) end up in both splits and
# test metrics become optimistic. Resampling only the training split avoids this.
# NOTE(review): SMOTE interpolates between neighbouring rows, which is not
# meaningful for the 0/1 FLAG_* columns or the categorical WEEKDAY_INDEX;
# a categorical-aware variant (e.g. SMOTENC) may be a better fit — confirm.

# Keep only features + label and collect them to the driver as pandas.
data = data.select(featureCols + ["TARGET"])
pdf = data.toPandas()

# Separate features and label.
X = pdf[featureCols]
y = pdf["TARGET"]

# Oversample the minority class; fixed random_state for reproducibility.
smote = SMOTE(random_state=42)
X_res, y_res = smote.fit_resample(X, y)

# Reassemble features + label positionally. Assigning the label as a plain
# array works whether y_res is an ndarray or a Series (of any name) and does
# not rely on index alignment, unlike DataFrame(y_res, columns=[...]) + concat.
res_df = pd.DataFrame(X_res, columns=featureCols)
res_df["TARGET"] = pd.Series(y_res).to_numpy()

# Back to a Spark DataFrame for the rest of the pipeline.
data = spark.createDataFrame(res_df)

### 4. 特征标准化

使用 StandardScaler 对每个特征进行标准化：除以该特征的标准差（withStd=True, withMean=False，即只缩放、不去均值）

In [3]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

# Feature columns to scale. Re-declared here so this cell does not depend on the
# SMOTE cell above having been run.
featureCols = [
    "CNT_CHILDREN", "REGION_POPULATION_RELATIVE", "FLAG_EMP_PHONE",
    "FLAG_WORK_PHONE", "FLAG_CONT_MOBILE", "FLAG_PHONE", "FLAG_EMAIL",
    "REGION_RATING_CLIENT", "REGION_RATING_CLIENT_W_CITY", "REG_REGION_NOT_WORK_REGION",
    "LIVE_REGION_NOT_WORK_REGION", "REG_CITY_NOT_LIVE_CITY", "REG_CITY_NOT_WORK_CITY",
    "LIVE_CITY_NOT_WORK_CITY", "HOUR_APPR_PROCESS_START", "WEEKDAY_INDEX",
]

# StandardScaler works on vector columns, so each feature is first wrapped into
# a 1-element vector `<name>_vec`; that vector is then divided by its standard
# deviation into `<name>_scaled` (withMean=False: values are not centred).
assemblers, scalers = [], []
for feature_name in featureCols:
    vec_col = feature_name + "_vec"
    assemblers.append(VectorAssembler(inputCols=[feature_name], outputCol=vec_col))
    scalers.append(
        StandardScaler(
            inputCol=vec_col,
            outputCol=feature_name + "_scaled",
            withStd=True,
            withMean=False,
        )
    )

# One pipeline: every assembler first, then every scaler.
pipeline = Pipeline(stages=assemblers + scalers)

# Fit the scalers (learn per-feature std) and append the new columns.
scalerModel = pipeline.fit(data)
data = scalerModel.transform(data)

# Every feature now has a matching standardised column, e.g. 'CNT_CHILDREN_scaled'.


In [4]:
# Names of the per-feature standardised vector columns, followed by the label.
scaled_columns = [f"{name}_scaled" for name in featureCols] + ["TARGET"]

# Keep only the standardised features and the label.
scaled_data = data.select(*scaled_columns)

# Preview the first rows of the standardised data.
scaled_data.show()

+--------------------+---------------------------------+---------------------+----------------------+-----------------------+--------------------+-----------------+---------------------------+----------------------------------+---------------------------------+----------------------------------+-----------------------------+-----------------------------+------------------------------+------------------------------+--------------------+------+
| CNT_CHILDREN_scaled|REGION_POPULATION_RELATIVE_scaled|FLAG_EMP_PHONE_scaled|FLAG_WORK_PHONE_scaled|FLAG_CONT_MOBILE_scaled|   FLAG_PHONE_scaled|FLAG_EMAIL_scaled|REGION_RATING_CLIENT_scaled|REGION_RATING_CLIENT_W_CITY_scaled|REG_REGION_NOT_WORK_REGION_scaled|LIVE_REGION_NOT_WORK_REGION_scaled|REG_CITY_NOT_LIVE_CITY_scaled|REG_CITY_NOT_WORK_CITY_scaled|LIVE_CITY_NOT_WORK_CITY_scaled|HOUR_APPR_PROCESS_START_scaled|WEEKDAY_INDEX_scaled|TARGET|
+--------------------+---------------------------------+---------------------+----------------------+-----

### 5. 移除潜在异常数据

过滤掉任一特征取值超出其平均值±3倍标准差的整行数据

In [5]:
from pyspark.sql.functions import col, avg, stddev, lit, countDistinct
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import DoubleType

# Half-width of the kept band, in standard deviations (mean ± 3·stddev).
OUTLIER_N_STD = 3


def vector_to_double(vec):
    """Return the first element of a Spark ML vector as a Python float.

    Each `<feature>_scaled` column holds a 1-element vector, so element 0 is the
    scaled scalar value. A null vector maps to None (SQL NULL) instead of raising.
    """
    if vec is None:
        return None
    return float(vec[0])


vector_to_double_udf = udf(vector_to_double, DoubleType())

# Unwrap every scaled vector column into a plain double column `<feature>_value`.
for feature in featureCols:
    scaled_data = scaled_data.withColumn(feature + "_value", vector_to_double_udf(col(feature + "_scaled")))

# One aggregation pass: mean, sample stddev and number of distinct values per feature.
stats = scaled_data.select(
    [avg(c + "_value").alias(c + "_avg") for c in featureCols]
    + [stddev(c + "_value").alias(c + "_stddev") for c in featureCols]
    + [countDistinct(c + "_value").alias(c + "_distinct") for c in featureCols]
).collect()[0]

# Flag values outside mean ± OUTLIER_N_STD·stddev.
# Binary features (<= 2 distinct values) are exempt: for a 0/1 feature whose
# minority share p is below 10%, the minority value itself lies beyond 3 stddev
# (1 - p > 3·sqrt(p·(1 - p))  <=>  p < 0.1), so the rule would delete every such
# row and leave the feature constant.
for feature in featureCols:
    mean_value = stats[feature + "_avg"]
    stddev_value = stats[feature + "_stddev"]
    if stats[feature + "_distinct"] <= 2 or stddev_value is None:
        # Binary feature, or stddev undefined (e.g. fewer than two rows): never flag.
        is_extreme = lit(False)
    else:
        lower = mean_value - OUTLIER_N_STD * stddev_value
        upper = mean_value + OUTLIER_N_STD * stddev_value
        is_extreme = (col(feature + "_value") < lower) | (col(feature + "_value") > upper)
    scaled_data = scaled_data.withColumn(feature + "_extreme", is_extreme)

# Combine the per-feature flags into one row-level flag.
extreme_flags = [col(f + "_extreme") for f in featureCols]
scaled_data = scaled_data.withColumn("any_extreme", lit(False))
for flag in extreme_flags:
    scaled_data = scaled_data.withColumn("any_extreme", col("any_extreme") | flag)

# Drop every row in which at least one feature was flagged as extreme.
scaled_data = scaled_data.filter(~col("any_extreme"))

In [6]:
# Print each per-feature outlier-flag column (one small table per feature).
# NOTE: this runs after rows with any flag set were filtered out, so every value
# shown is False by construction — a sanity check, not a diagnostic.
for flag_column in extreme_flags:
    scaled_data.select(flag_column).show()

+--------------------+
|CNT_CHILDREN_extreme|
+--------------------+
|               false|
|               false|
|               false|
|               false|
|               false|
|               false|
|               false|
|               false|
|               false|
|               false|
|               false|
|               false|
|               false|
|               false|
|               false|
|               false|
|               false|
|               false|
|               false|
|               false|
+--------------------+
only showing top 20 rows

+----------------------------------+
|REGION_POPULATION_RELATIVE_extreme|
+----------------------------------+
|                             false|
|                             false|
|                             false|
|                             false|
|                             false|
|                             false|
|                             false|
|                             false|
|              

+------------------------------+
|REG_CITY_NOT_WORK_CITY_extreme|
+------------------------------+
|                         false|
|                         false|
|                         false|
|                         false|
|                         false|
|                         false|
|                         false|
|                         false|
|                         false|
|                         false|
|                         false|
|                         false|
|                         false|
|                         false|
|                         false|
|                         false|
|                         false|
|                         false|
|                         false|
|                         false|
+------------------------------+
only showing top 20 rows

+-------------------------------+
|LIVE_CITY_NOT_WORK_CITY_extreme|
+-------------------------------+
|                          false|
|                          false|
|           

In [7]:
# Remove the outlier-flag helper columns (per-feature and row-level).
scaled_data = scaled_data.drop(*[f"{name}_extreme" for name in featureCols], 'any_extreme')

# Keep only the standardised feature vectors and the label before splitting.
scaled_data = scaled_data.select(*(f"{name}_scaled" for name in featureCols), "TARGET")
scaled_data.show()

+--------------------+---------------------------------+---------------------+----------------------+-----------------------+--------------------+-----------------+---------------------------+----------------------------------+---------------------------------+----------------------------------+-----------------------------+-----------------------------+------------------------------+------------------------------+--------------------+------+
| CNT_CHILDREN_scaled|REGION_POPULATION_RELATIVE_scaled|FLAG_EMP_PHONE_scaled|FLAG_WORK_PHONE_scaled|FLAG_CONT_MOBILE_scaled|   FLAG_PHONE_scaled|FLAG_EMAIL_scaled|REGION_RATING_CLIENT_scaled|REGION_RATING_CLIENT_W_CITY_scaled|REG_REGION_NOT_WORK_REGION_scaled|LIVE_REGION_NOT_WORK_REGION_scaled|REG_CITY_NOT_LIVE_CITY_scaled|REG_CITY_NOT_WORK_CITY_scaled|LIVE_CITY_NOT_WORK_CITY_scaled|HOUR_APPR_PROCESS_START_scaled|WEEKDAY_INDEX_scaled|TARGET|
+--------------------+---------------------------------+---------------------+----------------------+-----

### 6. 划分数据集

按照 8:2 的比例随机划分训练集和测试集

In [8]:
# Split 80/20 into train and test sets. A fixed seed (same value as SMOTE's
# random_state) makes the split reproducible across notebook runs.
# NOTE(review): randomSplit samples per partition, so identical splits may also
# require the same partitioning of scaled_data — confirm if exact reproducibility matters.
train, test = scaled_data.randomSplit([0.8, 0.2], seed=42)

### 7. 存储数据

In [9]:
# Output directories for the exported train / test splits.
TRAIN_OUTPUT_PATH = 'file:///home/Lsc/train_data'
TEST_OUTPUT_PATH = 'file:///home/Lsc/test_data'

# Unwrap each 1-element `<feature>_scaled` vector into a plain double column
# `<feature>_value` so the result can be written as CSV.
for feature in featureCols:
    train = train.withColumn(feature + "_value", vector_to_double_udf(col(feature + "_scaled")))
    test = test.withColumn(feature + "_value", vector_to_double_udf(col(feature + "_scaled")))

# Scalar feature columns plus the label, in featureCols order.
columnsToSave = [column + "_value" for column in featureCols] + ["TARGET"]

# coalesce(1) writes a single part file per split inside each output directory.
# mode="overwrite": the default save mode (errorifexists) makes a re-run of the
# notebook fail because the output directories already exist.
train.select(columnsToSave).coalesce(1).write.csv(TRAIN_OUTPUT_PATH, header=True, mode="overwrite")
test.select(columnsToSave).coalesce(1).write.csv(TEST_OUTPUT_PATH, header=True, mode="overwrite")

spark.stop()

In [10]:
# List the feature columns used throughout the pipeline, one per line.
print("".join(f"{name}\n" for name in featureCols), end="")

CNT_CHILDREN
REGION_POPULATION_RELATIVE
FLAG_EMP_PHONE
FLAG_WORK_PHONE
FLAG_CONT_MOBILE
FLAG_PHONE
FLAG_EMAIL
REGION_RATING_CLIENT
REGION_RATING_CLIENT_W_CITY
REG_REGION_NOT_WORK_REGION
LIVE_REGION_NOT_WORK_REGION
REG_CITY_NOT_LIVE_CITY
REG_CITY_NOT_WORK_CITY
LIVE_CITY_NOT_WORK_CITY
HOUR_APPR_PROCESS_START
WEEKDAY_INDEX
