In [1]:
from pyspark.sql import SparkSession,Row
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [2]:
# Start (or reuse) a YARN SparkSession; driver and executors each get 4 cores / 4g.
spark_conf = {
    "spark.driver.cores": "4",
    "spark.driver.memory": "4g",
    "spark.executor.memory": "4g",
    "spark.executor.cores": "4",
}
builder = SparkSession.builder.master("yarn").appName("Pyspark with XGBoot")
for conf_key, conf_value in spark_conf.items():
    builder = builder.config(conf_key, conf_value)
spark = builder.getOrCreate()

24/01/30 20:21:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/01/30 20:21:45 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [3]:
# Load the raw data: GBK-encoded CSV with a header row and an inferred schema.
csv_path = "/pyspark_xgboot/data.csv"
data = spark.read.csv(csv_path, header=True, inferSchema=True, encoding="gbk")
n_rows = data.count()
print(n_rows, len(data.columns))

                                                                                

16000 82


In [4]:
# Inspect the (inferred) type of each feature column.
data.printSchema()

root
 |-- 个人编码: double (nullable = true)
 |-- 一天去两家医院的天数: integer (nullable = true)
 |-- 就诊的月数: integer (nullable = true)
 |-- 月就诊天数_MAX: integer (nullable = true)
 |-- 月就诊天数_AVG: double (nullable = true)
 |-- 月就诊医院数_MAX: integer (nullable = true)
 |-- 月就诊医院数_AVG: double (nullable = true)
 |-- 就诊次数_SUM: integer (nullable = true)
 |-- 月就诊次数_MAX: integer (nullable = true)
 |-- 月就诊次数_AVG: double (nullable = true)
 |-- 月统筹金额_MAX: double (nullable = true)
 |-- 月统筹金额_AVG: double (nullable = true)
 |-- 月药品金额_MAX: double (nullable = true)
 |-- 月药品金额_AVG: double (nullable = true)
 |-- 医院_就诊天数_MAX: integer (nullable = true)
 |-- 医院_就诊天数_AVG: double (nullable = true)
 |-- 医院_统筹金_MAX: double (nullable = true)
 |-- 医院_统筹金_AVG: double (nullable = true)
 |-- 医院_药品_MAX: double (nullable = true)
 |-- 医院_药品_AVG: double (nullable = true)
 |-- 医院编码_NN: integer (nullable = true)
 |-- 顺序号_NN: integer (nullable = true)
 |-- 交易时间DD_NN: integer (nullable = true)
 |-- 交易时间YYYY_NN: integer (nullable = true)
 |--

In [5]:
# Helper that checks whether a column name does NOT end with "NN".
def is_not_nn_column(col_name):
    """Return True unless ``col_name`` ends with the two-character suffix "NN"."""
    has_nn_suffix = col_name[-2:] == "NN"
    return not has_nn_suffix
# Keep only the columns whose names do not end with "NN".
non_nn_columns = [F.col(c) for c in data.columns if is_not_nn_column(c)]
data = data.select(*non_nn_columns)

In [6]:
print(len(data.columns)) # 6 features were dropped (82 -> 76 columns)

76


In [7]:
import pandas as pd
# Allow pandas to display up to 1000 rows (the per-column results below have ~76 rows).
pd.set_option("display.max_rows",1000)

In [8]:
# Flag the binary features: a column is marked True when its set of distinct values has size 2.
binary_flag_exprs = [(F.size(F.collect_set(x)) == 2).alias(x) for x in data.columns]
is_binary = data.agg(*binary_flag_exprs).toPandas()
is_binary.unstack()

24/01/30 20:22:14 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

个人编码               0    False
一天去两家医院的天数         0    False
就诊的月数              0    False
月就诊天数_MAX          0    False
月就诊天数_AVG          0    False
月就诊医院数_MAX         0    False
月就诊医院数_AVG         0    False
就诊次数_SUM           0    False
月就诊次数_MAX          0    False
月就诊次数_AVG          0    False
月统筹金额_MAX          0    False
月统筹金额_AVG          0    False
月药品金额_MAX          0    False
月药品金额_AVG          0    False
医院_就诊天数_MAX        0    False
医院_就诊天数_AVG        0    False
医院_统筹金_MAX         0    False
医院_统筹金_AVG         0    False
医院_药品_MAX          0    False
医院_药品_AVG          0    False
住院天数_SUM           0    False
个人账户金额_SUM         0    False
统筹支付金额_SUM         0    False
ALL_SUM            0    False
可用账户报销金额_SUM       0    False
药品费发生金额_SUM        0    False
药品费自费金额_SUM        0    False
药品费申报金额_SUM        0    False
贵重药品发生金额_SUM       0    False
中成药费发生金额_SUM       0    False
中草药费发生金额_SUM       0    False
检查费发生金额_SUM        0    False
检查费自费金额_SUM        0    False
检查费申报金额_SU

In [9]:
# Define the 4 top-level column groups: identifier, target, binary features, continuous features.
identifiers="个人编码"
target_column= "RES"
binary_columns=[
    "BZ_民政救助",
    "BZ_城乡优抚",
    "是否挂号"
]
# Compare column names by equality. `x not in "RES"` on a *string* is a
# substring test and would also exclude columns named e.g. "R" or "ES".
excluded_columns = set(binary_columns) | {target_column, identifiers}
continuous_columns=[
    x
    for x in data.columns
    if x not in excluded_columns
]

In [10]:
def count_null_values(df):
    """Return a one-row DataFrame with the number of nulls in each column of ``df``.

    Each result column has the same name as the input column it counts.
    """
    null_count_exprs = []
    for column_name in df.columns:
        is_null_flag = F.col(column_name).isNull().cast("int")
        null_count_exprs.append(F.sum(is_null_flag).alias(column_name))
    return df.select(null_count_exprs)

# Count the missing (null) values in every column of DataFrame `data`.
null_values = count_null_values(data)
null_values.show()

+--------+--------------------+----------+--------------+--------------+----------------+----------------+------------+--------------+--------------+--------------+--------------+--------------+--------------+-----------------+-----------------+---------------+---------------+-------------+-------------+------------+----------------+----------------+-------+--------------------+------------------+------------------+------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+--------------------+----------------------+--------------------+----------------+----------------+--------------------------+------------------+----------------------------+------------------------------+--------------------+------------------------

In [11]:
# Collect the names of the feature columns that contain at least one null.
null_counts_row = null_values.collect()[0]
not_zero_columns = [
    col_name
    for col_name, col_val in zip(null_values.columns, null_counts_row)
    if col_val != 0
]
print(not_zero_columns)



['出院诊断LENTH_MAX']


                                                                                

In [12]:
# Missing-value handling:
#   1. drop rows whose non-identifier columns are all null,
#   2. drop rows with a missing target,
#   3. fill each remaining null with the mean of *its own* column.
feature_and_target_columns = [x for x in data.columns if x != identifiers]
data=data.dropna(
    how="all",
    subset=feature_and_target_columns
)
data=data.dropna(subset=[target_column])
# One aggregation computes a mean per column. `F.mean` accepts a single column,
# so the previous `F.mean(*not_zero_columns)` failed for zero or several columns
# and would have filled every column with the first column's mean.
if not_zero_columns:
    column_means = data.agg(
        *[F.mean(c).alias(c) for c in not_zero_columns]
    ).first().asDict()
    # A column that is entirely null has no mean; leave it untouched.
    fill_values = {c: m for c, m in column_means.items() if m is not None}
    if fill_values:
        data=data.fillna(fill_values)
print(data.count(),len(data.columns))

16000 76


In [13]:
# Check whether any nulls remain after the fill.
null_values_fill = count_null_values(data)
null_values_fill.show()

+--------+--------------------+----------+--------------+--------------+----------------+----------------+------------+--------------+--------------+--------------+--------------+--------------+--------------+-----------------+-----------------+---------------+---------------+-------------+-------------+------------+----------------+----------------+-------+--------------------+------------------+------------------+------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+--------------------+----------------------+--------------------+----------------+----------------+--------------------------+------------------+----------------------------+------------------------------+--------------------+------------------------

In [14]:
# Remove useless (constant) feature columns; only continuous_columns needs this check.
# A column is dropped when it has exactly one distinct non-null value. All distinct
# counts come from ONE aggregation (one Spark job) instead of one job per column.
if continuous_columns:
    distinct_counts = data.agg(
        *[F.countDistinct(F.col(x)).alias(x) for x in continuous_columns]
    ).first().asDict()
    continuous_columns = [x for x in continuous_columns if distinct_counts[x] != 1]
print(len(continuous_columns))
print(continuous_columns)

                                                                                

70
['一天去两家医院的天数', '就诊的月数', '月就诊天数_MAX', '月就诊天数_AVG', '月就诊医院数_MAX', '月就诊医院数_AVG', '就诊次数_SUM', '月就诊次数_MAX', '月就诊次数_AVG', '月统筹金额_MAX', '月统筹金额_AVG', '月药品金额_MAX', '月药品金额_AVG', '医院_就诊天数_MAX', '医院_就诊天数_AVG', '医院_统筹金_MAX', '医院_统筹金_AVG', '医院_药品_MAX', '医院_药品_AVG', '个人账户金额_SUM', '统筹支付金额_SUM', 'ALL_SUM', '可用账户报销金额_SUM', '药品费发生金额_SUM', '药品费自费金额_SUM', '药品费申报金额_SUM', '贵重药品发生金额_SUM', '中成药费发生金额_SUM', '中草药费发生金额_SUM', '检查费发生金额_SUM', '检查费自费金额_SUM', '检查费申报金额_SUM', '贵重检查费金额_SUM', '治疗费发生金额_SUM', '治疗费自费金额_SUM', '治疗费申报金额_SUM', '手术费发生金额_SUM', '手术费自费金额_SUM', '手术费申报金额_SUM', '床位费发生金额_SUM', '床位费申报金额_SUM', '医用材料发生金额_SUM', '高价材料发生金额_SUM', '医用材料费自费金额_SUM', '成分输血申报金额_SUM', '其它发生金额_SUM', '其它申报金额_SUM', '一次性医用材料申报金额_SUM', '起付线标准金额_MAX', '起付标准以上自负比例金额_SUM', '医疗救助个人按比例负担金额_SUM', '最高限额以上金额_SUM', '基本统筹基金支付金额_SUM', '公务员医疗补助基金支付金额_SUM', '城乡救助补助金额_SUM', '基本个人账户支付_SUM', '非账户支付金额_SUM', '本次审批金额_SUM', '补助审批金额_SUM', '医疗救助医院申请_SUM', '残疾军人补助_SUM', '民政救助补助_SUM', '城乡优抚补助_SUM', '出院诊断LENTH_MAX', '药品在总金额中的占比', '个人支付的药品占比', '检查总费用在总金额占比', '个

In [15]:
# Outlier handling with the IQR rule: values outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR]
# are replaced by the column mean. The mean is taken before replacement, so it
# still includes the outliers.
# All quartiles and means are computed in ONE aggregation and all replacements
# are applied in ONE select. This avoids several Spark jobs per column (each
# quartile used to be collected twice) over an ever-deeper withColumn plan.
stat_exprs = []
for column_name in continuous_columns:
    # First and third quartiles in a single approximate-percentile call.
    stat_exprs.append(
        F.percentile_approx(F.col(column_name), [0.25, 0.75]).alias(f"{column_name}__quartiles")
    )
    stat_exprs.append(F.mean(column_name).alias(f"{column_name}__mean"))
column_stats = data.agg(*stat_exprs).first().asDict() if stat_exprs else {}

replacements = {}
for column_name in continuous_columns:
    q1, q3 = column_stats[f"{column_name}__quartiles"]
    # Compute the IQR and the outlier thresholds.
    iqr = q3 - q1
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr
    mean_value = column_stats[f"{column_name}__mean"]
    column = F.col(column_name)
    replacements[column_name] = F.when(
        (column < lower_bound) | (column > upper_bound), mean_value
    ).otherwise(column)
# Rebuild the frame in its original column order, swapping in the cleaned columns.
data = data.select(
    *[replacements[c].alias(c) if c in replacements else F.col(c) for c in data.columns]
)

                                                                                

In [16]:
# Count the 0 and 1 labels of the target to gauge class imbalance. Use the
# declared target column rather than assuming it is the last column of `data`.
label_name = target_column
result = data.agg(
    F.sum(F.when(F.col(label_name) == 0, 1).otherwise(0)).alias("zero_count"),
    F.sum(F.when(F.col(label_name) == 1, 1).otherwise(0)).alias("one_count")
).collect()
for row in result:
    print(f"Zero Count: {row['zero_count']}, One Count: {row['one_count']}")
# The class imbalance is currently handled with XGBoost's built-in parameters.
# NOTE(review): the classifier below sets max_delta_step but not scale_pos_weight — confirm which is intended.


Zero Count: 15207, One Count: 793


In [17]:
# Pack all continuous features into one vector column for the correlation analysis.
from pyspark.ml.feature import VectorAssembler
continuous_features = VectorAssembler(inputCols=continuous_columns, outputCol="continuous_features")
# Keep only rows where none of the continuous columns is null (one combined predicate).
all_present = F.lit(True)
for name in continuous_columns:
    all_present = all_present & F.col(name).isNotNull()
vector_data = data.select(continuous_columns).where(all_present)
vector_variable = continuous_features.transform(vector_data)
vector_variable.select("continuous_features").show(5, False)

[Stage 1362:>                                                       (0 + 1) / 1]

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|continuous_features                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           

                                                                                

In [18]:
# Confirm the assembled column has vector type.
vector_variable.select("continuous_features").printSchema()

root
 |-- continuous_features: vector (nullable = true)



In [19]:
from pyspark.ml.stat import Correlation
# Compute the Pearson correlation matrix of all continuous features.
correlation = Correlation.corr(
    vector_variable,
    "continuous_features",
    method='pearson'
)



In [20]:
# Inspect the result schema: a single matrix-typed column.
correlation.printSchema()

root
 |-- pearson(continuous_features): matrix (nullable = false)



In [21]:
# Bring the correlation matrix to the driver and take its flat array of values.
correlation_values = correlation.collect()[0]["pearson(continuous_features)"].values
correlation_values

array([1.        , 0.08159469, 0.29335942, ..., 0.10070634, 0.00437948,
       1.        ])

In [22]:
# Remove highly correlated features.
def delHighlyCol(corr_values,inputcolumns,threshold=0.9):
    """Return a copy of ``inputcolumns`` with highly correlated columns removed.

    Args:
        corr_values: flat array holding the n x n correlation matrix, where
            n == len(inputcolumns). Entry (i, j) is read at ``i * n + j``.
            Because a correlation matrix is symmetric, this works whether the
            array is row- or column-major.
        inputcolumns: column names in the same order as the matrix rows.
            Not modified; a copy is edited.
        threshold: absolute-correlation cutoff. Pairs with |r| strictly
            greater than it count as highly correlated.

    Returns:
        The surviving column names, in their original relative order.

    For every highly correlated pair (i, j) with j < i, the later column i is
    recorded as a key and the earlier column j as its partner. Partners are
    the columns that get removed. NaN correlations (e.g. from constant
    columns) never compare greater than the threshold, so they are ignored.
    """
    columns=inputcolumns.copy()
    # Collect the highly correlated pairs: highly_corr maps the later column of
    # each pair to the set of earlier columns it is correlated with.
    highly_corr = {}
    for i in range(len(columns)):
        # Only the strict lower triangle (j < i) is scanned.
        for j in range(i):
            if abs(corr_values[i*len(columns)+j])>threshold:
                col = columns[i]
                related_col = columns[j]
                if col not in highly_corr:
                    highly_corr[col] = set()
                highly_corr[col].add(related_col)
    # Merge chains: if a partner of `col` is itself a key, absorb that key's
    # partners into `col`'s set and drop its entry. Keys are inserted in
    # increasing index order and every partner has a smaller index than its key,
    # so a deleted key was already visited earlier in the snapshot.
    for col,related_cols in list(highly_corr.items()):
        for related_col in list(related_cols):
            if related_col in highly_corr:
                highly_corr[col].update(highly_corr[related_col])
                del highly_corr[related_col]
    # Drop every recorded partner from the output. A key survives unless it also
    # appears in another key's (possibly merged) partner set.
    for col,related_cols in list(highly_corr.items()):
        for related_col in list(related_cols):
            if related_col in columns:
                columns.remove(related_col)
    return columns

In [23]:
# Remove highly correlated continuous columns, using a stricter cutoff (0.95) than the function default (0.9).
continuous_columnsDelHigh=delHighlyCol(corr_values=correlation_values,inputcolumns=continuous_columns,threshold=0.95)
print(continuous_columnsDelHigh)

['一天去两家医院的天数', '就诊的月数', '月就诊天数_MAX', '月就诊天数_AVG', '月就诊医院数_MAX', '月就诊医院数_AVG', '月就诊次数_MAX', '月就诊次数_AVG', '月统筹金额_MAX', '月药品金额_MAX', '医院_就诊天数_MAX', '医院_就诊天数_AVG', '医院_统筹金_MAX', '医院_统筹金_AVG', '医院_药品_MAX', '医院_药品_AVG', '统筹支付金额_SUM', '可用账户报销金额_SUM', '药品费自费金额_SUM', '药品费申报金额_SUM', '贵重药品发生金额_SUM', '中成药费发生金额_SUM', '中草药费发生金额_SUM', '检查费申报金额_SUM', '贵重检查费金额_SUM', '治疗费申报金额_SUM', '手术费发生金额_SUM', '手术费自费金额_SUM', '手术费申报金额_SUM', '床位费申报金额_SUM', '医用材料发生金额_SUM', '高价材料发生金额_SUM', '医用材料费自费金额_SUM', '成分输血申报金额_SUM', '其它发生金额_SUM', '其它申报金额_SUM', '一次性医用材料申报金额_SUM', '起付线标准金额_MAX', '起付标准以上自负比例金额_SUM', '最高限额以上金额_SUM', '公务员医疗补助基金支付金额_SUM', '基本个人账户支付_SUM', '非账户支付金额_SUM', '本次审批金额_SUM', '医疗救助医院申请_SUM', '残疾军人补助_SUM', '民政救助补助_SUM', '城乡优抚补助_SUM', '出院诊断LENTH_MAX', '药品在总金额中的占比', '个人支付的药品占比', '检查总费用在总金额占比', '个人支付检查费用占比', '治疗费用在总金额占比', '个人支付治疗费用占比']


In [24]:
# Number and names of continuous columns kept after the correlation filter.
print(len(continuous_columnsDelHigh))
print(continuous_columnsDelHigh)

55
['一天去两家医院的天数', '就诊的月数', '月就诊天数_MAX', '月就诊天数_AVG', '月就诊医院数_MAX', '月就诊医院数_AVG', '月就诊次数_MAX', '月就诊次数_AVG', '月统筹金额_MAX', '月药品金额_MAX', '医院_就诊天数_MAX', '医院_就诊天数_AVG', '医院_统筹金_MAX', '医院_统筹金_AVG', '医院_药品_MAX', '医院_药品_AVG', '统筹支付金额_SUM', '可用账户报销金额_SUM', '药品费自费金额_SUM', '药品费申报金额_SUM', '贵重药品发生金额_SUM', '中成药费发生金额_SUM', '中草药费发生金额_SUM', '检查费申报金额_SUM', '贵重检查费金额_SUM', '治疗费申报金额_SUM', '手术费发生金额_SUM', '手术费自费金额_SUM', '手术费申报金额_SUM', '床位费申报金额_SUM', '医用材料发生金额_SUM', '高价材料发生金额_SUM', '医用材料费自费金额_SUM', '成分输血申报金额_SUM', '其它发生金额_SUM', '其它申报金额_SUM', '一次性医用材料申报金额_SUM', '起付线标准金额_MAX', '起付标准以上自负比例金额_SUM', '最高限额以上金额_SUM', '公务员医疗补助基金支付金额_SUM', '基本个人账户支付_SUM', '非账户支付金额_SUM', '本次审批金额_SUM', '医疗救助医院申请_SUM', '残疾军人补助_SUM', '民政救助补助_SUM', '城乡优抚补助_SUM', '出院诊断LENTH_MAX', '药品在总金额中的占比', '个人支付的药品占比', '检查总费用在总金额占比', '个人支付检查费用占比', '治疗费用在总金额占比', '个人支付治疗费用占比']


In [25]:
# For comparison: the continuous columns before the correlation filter.
print(len(continuous_columns))
print(continuous_columns)

70
['一天去两家医院的天数', '就诊的月数', '月就诊天数_MAX', '月就诊天数_AVG', '月就诊医院数_MAX', '月就诊医院数_AVG', '就诊次数_SUM', '月就诊次数_MAX', '月就诊次数_AVG', '月统筹金额_MAX', '月统筹金额_AVG', '月药品金额_MAX', '月药品金额_AVG', '医院_就诊天数_MAX', '医院_就诊天数_AVG', '医院_统筹金_MAX', '医院_统筹金_AVG', '医院_药品_MAX', '医院_药品_AVG', '个人账户金额_SUM', '统筹支付金额_SUM', 'ALL_SUM', '可用账户报销金额_SUM', '药品费发生金额_SUM', '药品费自费金额_SUM', '药品费申报金额_SUM', '贵重药品发生金额_SUM', '中成药费发生金额_SUM', '中草药费发生金额_SUM', '检查费发生金额_SUM', '检查费自费金额_SUM', '检查费申报金额_SUM', '贵重检查费金额_SUM', '治疗费发生金额_SUM', '治疗费自费金额_SUM', '治疗费申报金额_SUM', '手术费发生金额_SUM', '手术费自费金额_SUM', '手术费申报金额_SUM', '床位费发生金额_SUM', '床位费申报金额_SUM', '医用材料发生金额_SUM', '高价材料发生金额_SUM', '医用材料费自费金额_SUM', '成分输血申报金额_SUM', '其它发生金额_SUM', '其它申报金额_SUM', '一次性医用材料申报金额_SUM', '起付线标准金额_MAX', '起付标准以上自负比例金额_SUM', '医疗救助个人按比例负担金额_SUM', '最高限额以上金额_SUM', '基本统筹基金支付金额_SUM', '公务员医疗补助基金支付金额_SUM', '城乡救助补助金额_SUM', '基本个人账户支付_SUM', '非账户支付金额_SUM', '本次审批金额_SUM', '补助审批金额_SUM', '医疗救助医院申请_SUM', '残疾军人补助_SUM', '民政救助补助_SUM', '城乡优抚补助_SUM', '出院诊断LENTH_MAX', '药品在总金额中的占比', '个人支付的药品占比', '检查总费用在总金额占比', '个

In [26]:
#   XGBoost On PySpark
import random
from pyspark.ml import Pipeline
from xgboost.spark import SparkXGBClassifier

In [27]:
# Build the modelling frame used by XGBoost: identifier, selected features, label.
feature_cols = continuous_columnsDelHigh + binary_columns
label_col = target_column
all_cols = [identifiers, *feature_cols, label_col]
print(all_cols)
df = data.select(all_cols)
df.show()

['个人编码', '一天去两家医院的天数', '就诊的月数', '月就诊天数_MAX', '月就诊天数_AVG', '月就诊医院数_MAX', '月就诊医院数_AVG', '月就诊次数_MAX', '月就诊次数_AVG', '月统筹金额_MAX', '月药品金额_MAX', '医院_就诊天数_MAX', '医院_就诊天数_AVG', '医院_统筹金_MAX', '医院_统筹金_AVG', '医院_药品_MAX', '医院_药品_AVG', '统筹支付金额_SUM', '可用账户报销金额_SUM', '药品费自费金额_SUM', '药品费申报金额_SUM', '贵重药品发生金额_SUM', '中成药费发生金额_SUM', '中草药费发生金额_SUM', '检查费申报金额_SUM', '贵重检查费金额_SUM', '治疗费申报金额_SUM', '手术费发生金额_SUM', '手术费自费金额_SUM', '手术费申报金额_SUM', '床位费申报金额_SUM', '医用材料发生金额_SUM', '高价材料发生金额_SUM', '医用材料费自费金额_SUM', '成分输血申报金额_SUM', '其它发生金额_SUM', '其它申报金额_SUM', '一次性医用材料申报金额_SUM', '起付线标准金额_MAX', '起付标准以上自负比例金额_SUM', '最高限额以上金额_SUM', '公务员医疗补助基金支付金额_SUM', '基本个人账户支付_SUM', '非账户支付金额_SUM', '本次审批金额_SUM', '医疗救助医院申请_SUM', '残疾军人补助_SUM', '民政救助补助_SUM', '城乡优抚补助_SUM', '出院诊断LENTH_MAX', '药品在总金额中的占比', '个人支付的药品占比', '检查总费用在总金额占比', '个人支付检查费用占比', '治疗费用在总金额占比', '个人支付治疗费用占比', 'BZ_民政救助', 'BZ_城乡优抚', '是否挂号', 'RES']
+---------+--------------------+----------+--------------+--------------+----------------+----------------+--------------+--------------+---

In [28]:
# Assemble the model features into one vector column and configure the XGBoost classifier.
vec_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df_input = vec_assembler.transform(df)
xgb_params = dict(
    features_col="features",
    label_col=label_col,
    num_workers=3,
    max_delta_step=5,
    silent=0,
    min_child_weight=1.3,
    max_depth=6,
    prediction_col="prediction",
    probability_col="probability",
)
classifier = SparkXGBClassifier(**xgb_params)

In [29]:
# Build the training and test sets (80/20, fixed seed) and assemble their feature vectors.
trainDF, testDF = df.randomSplit([0.8, 0.2], seed=14)
trainDF_input, testDF_input = [vec_assembler.transform(split) for split in (trainDF, testDF)]

In [31]:
# Hyper-parameter tuning: 3-fold cross-validation over a 5 x 4 x 4 grid
# (80 parameter combinations x 3 folds) on the training split.
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import pyspark.ml.tuning as tune
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
grid = (
    ParamGridBuilder().addGrid(classifier.max_delta_step,[1,2,3,4,5])
                      .addGrid(classifier.min_child_weight,[1.2,1.3,1.4,1.5])
                      .addGrid(classifier.max_depth,[3,4,5,6])
).build()
# metricName="f1" here is the class-support-weighted F1. With the ~15207:793
# label split counted above, it is dominated by the majority class.
# NOTE(review): consider areaUnderPR (BinaryClassificationEvaluator) to judge the rare class.
evaluator_classifier = MulticlassClassificationEvaluator(predictionCol="prediction",
                                                         probabilityCol="probability",
                                                         labelCol=label_col,
                                                         metricName="f1")
# Fix the fold-assignment seed (same value as the train/test split) so the CV
# results are reproducible. PySpark's default seed is hash(class name), which
# changes between Python processes unless PYTHONHASHSEED is fixed.
cv = CrossValidator(estimator=classifier,
                    evaluator=evaluator_classifier,
                    estimatorParamMaps=grid,
                    numFolds=3,
                    seed=14)
cvModel_classifier = cv.fit(trainDF_input)
cvPrediction = cvModel_classifier.transform(testDF_input)
evaluator_classifier.evaluate(cvPrediction)

2024-01-30 20:36:29,743 INFO XGBoost-PySpark: _fit Finished xgboost training!   
2024-01-30 20:36:30,824 INFO XGBoost-PySpark: _fit Running xgboost-2.0.3 on 3 workers with
	booster params: {'device': 'cpu', 'max_delta_step': 3, 'max_depth': 4, 'min_child_weight': 1.5, 'objective': 'binary:logistic', 'silent': 0, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
2024-01-30 20:36:33,890 INFO XGBoost-PySpark: _fit Finished xgboost training!   
2024-01-30 20:36:34,844 INFO XGBoost-PySpark: _fit Running xgboost-2.0.3 on 3 workers with
	booster params: {'device': 'cpu', 'max_delta_step': 3, 'max_depth': 5, 'min_child_weight': 1.5, 'objective': 'binary:logistic', 'silent': 0, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
2024-01-30 20:36:35,266 INFO XGBoost-PySpark: _fit Running xgboost-2.0.3 on 3 workers with
	booster 

0.9464498212988991

In [32]:
# Pair each parameter map with its mean CV metric and show the best-scoring one.
param_maps = cvModel_classifier.getEstimatorParamMaps()
results = []
for param_map, metric in zip(param_maps, cvModel_classifier.avgMetrics):
    named_params = [{param.name: value} for param, value in param_map.items()]
    results.append((named_params, metric))
sorted(results, key=lambda el: el[1], reverse=True)[0]

([{'max_delta_step': 5}, {'min_child_weight': 1.3}, {'max_depth': 6}],
 0.9524127740322434)

In [33]:
# Run after changing the classifier parameters: refit on the training split, then
# attach the human-readable feature names to the underlying booster.
# NOTE(review): the recorded log of this cell shows different booster params than the
# classifier cell currently sets; re-run top-to-bottom to refresh it.
model = classifier.fit(trainDF_input)
booster = model.get_booster()
booster.feature_names = feature_cols

2024-01-30 21:20:05,849 INFO XGBoost-PySpark: _fit Running xgboost-2.0.3 on 3 workers with
	booster params: {'objective': 'binary:logistic', 'device': 'cpu', 'max_delta_step': 3, 'max_depth': 5, 'min_child_weight': 1.4, 'silent': 0, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
2024-01-30 21:20:18,233 INFO XGBoost-PySpark: _fit Finished xgboost training!   


In [34]:
# Score the test split and show the first 50 predictions without truncation.
pre_df = model.transform(testDF_input)
pre_df.show(n=50, truncate=False)

+---------+--------------------+----------+--------------+--------------+----------------+----------------+--------------+--------------+--------------+--------------+-----------------+-----------------+---------------+---------------+-------------+-------------+----------------+--------------------+------------------+------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+--------------------+----------------------+--------------------+----------------+----------------+--------------------------+------------------+----------------------------+--------------------+------------------------------+--------------------+------------------+----------------+--------------------+----------------+-----------------+----------------+-----------------+--------------------+--------------------+----------------------+-----

In [35]:
# Feature importance as the total gain of all splits that use each feature.
model.get_feature_importances(importance_type="total_gain")

{'一天去两家医院的天数': 19.627227783203125,
 '就诊的月数': 60.575767517089844,
 '月就诊天数_MAX': 180.33143615722656,
 '月就诊天数_AVG': 302.2658386230469,
 '月就诊医院数_MAX': 103.4936294555664,
 '月就诊医院数_AVG': 86.1935043334961,
 '月就诊次数_MAX': 231.49981689453125,
 '月就诊次数_AVG': 118.66456604003906,
 '月统筹金额_MAX': 1171.976806640625,
 '月药品金额_MAX': 285.06256103515625,
 '医院_就诊天数_MAX': 177.23812866210938,
 '医院_就诊天数_AVG': 133.83120727539062,
 '医院_统筹金_MAX': 84.79666900634766,
 '医院_统筹金_AVG': 150.34107971191406,
 '医院_药品_MAX': 163.17266845703125,
 '医院_药品_AVG': 191.8198699951172,
 '统筹支付金额_SUM': 468.2744140625,
 '可用账户报销金额_SUM': 119.96389770507812,
 '药品费自费金额_SUM': 159.10702514648438,
 '药品费申报金额_SUM': 111.75946807861328,
 '贵重药品发生金额_SUM': 130.14511108398438,
 '中成药费发生金额_SUM': 257.98919677734375,
 '中草药费发生金额_SUM': 186.4278106689453,
 '检查费申报金额_SUM': 158.78269958496094,
 '贵重检查费金额_SUM': 2.9166650772094727,
 '治疗费申报金额_SUM': 273.91180419921875,
 '手术费申报金额_SUM': 11.067102432250977,
 '床位费申报金额_SUM': 3.1002109050750732,
 '医用材料发生金额_SUM': 104.0024871

In [44]:
# Rank features by "gain" importance, highest first; the dict keeps that order.
raw_importance = model.get_feature_importances(importance_type="gain")
ranked_items = sorted(raw_importance.items(), key=lambda item: item[1], reverse=True)
features_importance = dict(ranked_items)

In [45]:
# Display the gain-ranked importances (highest first).
features_importance

{'医疗救助医院申请_SUM': 66.21429443359375,
 '月统筹金额_MAX': 11.95894718170166,
 '统筹支付金额_SUM': 10.89010238647461,
 '本次审批金额_SUM': 9.625624656677246,
 '月就诊医院数_MAX': 8.624468803405762,
 '月就诊次数_MAX': 7.982752323150635,
 '就诊的月数': 6.730640888214111,
 '治疗费申报金额_SUM': 5.370819568634033,
 '月就诊天数_AVG': 4.875255584716797,
 '月就诊天数_MAX': 4.745563983917236,
 '个人支付治疗费用占比': 4.093888282775879,
 '一次性医用材料申报金额_SUM': 3.631941795349121,
 '月药品金额_MAX': 3.608386754989624,
 '起付标准以上自负比例金额_SUM': 3.4135854244232178,
 '医院_药品_MAX': 3.330054521560669,
 '药品费自费金额_SUM': 3.314729690551758,
 '中草药费发生金额_SUM': 3.270663261413574,
 '中成药费发生金额_SUM': 3.26568603515625,
 '医院_药品_AVG': 3.0938689708709717,
 '月就诊次数_AVG': 3.0426812171936035,
 '医院_统筹金_AVG': 2.947864294052124,
 '药品在总金额中的占比': 2.8763608932495117,
 '医院_就诊天数_MAX': 2.8586795330047607,
 '医用材料费自费金额_SUM': 2.808976888656616,
 '非账户支付金额_SUM': 2.7750051021575928,
 '贵重药品发生金额_SUM': 2.7113564014434814,
 '可用账户报销金额_SUM': 2.6658644676208496,
 '药品费申报金额_SUM': 2.539987802505493,
 '医院_就诊天数_AVG': 2.5251171

In [47]:

# Forward selection over the importance ranking: for k = 1, 2, ..., train on the
# top-k most important features and record the test-set F1 in rem_dict[k].
# NOTE(review): k is chosen by looking at testDF scores, so testDF is no longer an
# unbiased hold-out for the final model. Consider a validation split of trainDF.
kept = []
rem_dict = {}
# The estimator and evaluator are identical for every k, so build them once
# instead of once per iteration.
classifierInFor = SparkXGBClassifier(
    features_col="feature_subset",
    label_col=label_col,
    num_workers=3,
    max_delta_step=5,
    silent=1,
    min_child_weight=1.3,
    max_depth=6,
    prediction_col="prediction",
    probability_col="probability"
)
evaluator_model = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol=label_col,
    metricName="f1",
    probabilityCol="probability"
)
for i, feature in enumerate(features_importance, start=1):
    kept.append(feature)
    # Pass a copy of `kept` so this assembler's inputCols is fixed at the current k.
    kept_assembler = VectorAssembler(inputCols=list(kept), outputCol="feature_subset")
    with_selected_feature = kept_assembler.transform(trainDF)
    test_data = kept_assembler.transform(testDF)
    xgb_model = classifierInFor.fit(with_selected_feature)
    pre_xgb_df = xgb_model.transform(test_data)
    eva_val = evaluator_model.evaluate(pre_xgb_df)
    rem_dict[i] = eva_val

2024-01-30 21:26:52,521 INFO XGBoost-PySpark: _fit Running xgboost-2.0.3 on 3 workers with
	booster params: {'objective': 'binary:logistic', 'device': 'cpu', 'max_delta_step': 5, 'max_depth': 6, 'min_child_weight': 1.3, 'silent': 1, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Running xgboost-2.0.3 on 3 workers with
	booster params: {'objective': 'binary:logistic', 'device': 'cpu', 'max_delta_step': 5, 'max_depth': 6, 'min_child_weight': 1.3, 'silent': 1, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
2024-01-30 21:26:55,499 INFO XGBoost-PySpark: _fit Finished xgboost training!   
INFO:XGBoost-PySpark:Finished xgboost training!
2024-01-30 21:26:56,944 INFO XGBoost-PySpark: _fit Running xgboost-2.0.3 on 3 workers with
	booster params: {'objective': 'binary:logistic', 'device': 'cpu', 'max_

In [48]:
rem_dict

{1: 0.9377660438617569,
 2: 0.938804800264124,
 3: 0.9405034245038704,
 4: 0.9388032110690053,
 5: 0.9415238794418225,
 6: 0.9480186033978746,
 7: 0.9466517666536984,
 8: 0.943612725738576,
 9: 0.9436279585216357,
 10: 0.9456632762871052,
 11: 0.9463723282799213,
 12: 0.9426872555372215,
 13: 0.943396345904183,
 14: 0.9445531769577328,
 15: 0.9438607062281099,
 16: 0.945264188572407,
 17: 0.9461915808805762,
 18: 0.9487438542359062,
 19: 0.9478055737845724,
 20: 0.9480924815368859,
 21: 0.9466324328218179,
 22: 0.9482283651562649,
 23: 0.9482283651562649,
 24: 0.9470863659742963,
 25: 0.9483130464047335,
 26: 0.9488801238594331,
 27: 0.947386256795891,
 28: 0.9489542516228825,
 29: 0.9465549155303391,
 30: 0.9484697289553603,
 31: 0.9482597516449627,
 32: 0.9457177967359555,
 33: 0.9467923027622739,
 34: 0.9503432515787226,
 35: 0.9463946844837642,
 36: 0.9499306847944703,
 37: 0.9454992249990947,
 38: 0.9480464917085036,
 39: 0.9473491291811009,
 40: 0.9466887311785351,
 41: 0.9480924