In [1]:
# Import Libraries
import pandas as pd
import numpy as np

# Preprocessing
from sklearn.preprocessing import StandardScaler, KBinsDiscretizer

# Model Selection and Evaluation
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import roc_auc_score, accuracy_score, confusion_matrix, classification_report

# Models
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, StackingClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import SVC
#-----------------------------------------------------------------
# 读取源Excel文件
source_file_path = 'resources/German_credit.xlsx' 
df = pd.read_excel(source_file_path)
# 创建一个新的Excel文件并将数据写入其中
new_file_path = 'resources/German_credit_NEW.xlsx'
df.to_excel(new_file_path, index=False)
print(f'Data successfully copied to {new_file_path}')
# data现在是我们用来分析的源数据附件
data = pd.read_excel(new_file_path)
#-----------------------------------------------------------------
# 选择的影响力大的变量数量，前十名----------------------
numVar = 10

# 绘制数据相关性热图
# 找到对target最有影响力的变量，依次排名，选numVar个较有影响力的
def find_top_correlated_features(df2, target1, n):
    corr_matrix = df2.corr()
    target_cor = corr_matrix[target1].abs().sort_values(ascending=False)
    top_features = target_cor.index[1:n+1]  # 排除掉target本身
    return top_features, target_cor

selected_features_CHMap, target_corr = find_top_correlated_features(df, 'target', numVar)

# 绘制随机森林模型相关图
# 特征选择和目标变量
features = data.columns[data.columns != 'target'] 
target = 'target'
# 分割数据集
X = data[features]
y = data[target]
# 构建随机森林模型以评估特征重要性
rf = RandomForestClassifier(random_state=42)
rf.fit(X, y)
# 获取特征重要性
importances = rf.feature_importances_
feature_importances = pd.DataFrame({'Feature': features, 'Importance': importances})
feature_importances = feature_importances.sort_values(by='Importance', ascending=False)
# 筛选重要性较高的变量，依次排名，选numVar个较有影响力的
selected_features_rf = feature_importances.nlargest(numVar, 'Importance')['Feature']
#-----------------------------------------------------------------
# 采用了合并并去重采用了两列排名靠前的变量
# 合并并去重
combined_features = selected_features_rf.tolist() + selected_features_CHMap.tolist()
unique_features_set = set(combined_features)
selected_features_final = list(unique_features_set)
print(selected_features_final)
#-----------------------------------------------------------------
# 分箱函数
def binning(bdata, bcolumn, n_bins, strategy='uniform'): 
    est = KBinsDiscretizer(n_bins=n_bins, encode='ordinal', strategy=strategy)
    bdata[bcolumn + '_binned'] = est.fit_transform(bdata[[bcolumn]])
    return bdata, est

# 计算WOE, IV函数
def calculate_woe_iv(cdata, ccolumn, ctarget):
    woe_dict = {}
    iv = 0
    total_good = sum(cdata[ctarget] == 0)
    total_bad = sum(cdata[ctarget] == 1)
    
    for bin in sorted(cdata[ccolumn].unique()):
        bin_data = cdata[cdata[ccolumn] == bin]
        good = sum(bin_data[ctarget] == 0)
        bad = sum(bin_data[ctarget] == 1)
        good_dist = good / total_good if total_good != 0 else 0
        bad_dist = bad / total_bad if total_bad != 0 else 0
        
        # 处理可能的零或NaN情况
        if good_dist == 0:
            good_dist = 1e-10
        if bad_dist == 0:
            bad_dist = 1e-10
        
        woe = np.log(good_dist / bad_dist)
        woe_dict[bin] = woe
        iv += (good_dist - bad_dist) * woe
    
    return woe_dict, iv
# 特征选择和目标变量, 我们已经选择好了几个对目标值有影响力的变量所以只用表明目标是哪个，在这里就是“target”
target_column = 'target'

# 寻找最优分箱数量 和 WOE & IV 计算
def select_best_binning(df, selected_features, target_column):
    results = {}
    woe_iv_values = {}
    df_copies = {}  # 存储每次分箱后的数据框
    for n_bins in range(2, 61):
        df_copy = df.copy()
        woe_iv_values[n_bins] = {}
        for column in selected_features:
            df_copy, binning_estimator = binning(df_copy, column, n_bins)
            woe_dict, iv = calculate_woe_iv(df_copy, column + '_binned', target_column)
            df_copy[column + '_woe'] = df_copy[column + '_binned'].map(woe_dict)
            woe_iv_values[n_bins][column] = {'woe': woe_dict, 'iv': iv}
        
        # 模型训练和验证，用了逻辑回归模型
        X = df_copy[[col + '_woe' for col in selected_features]]
        y = df_copy[target_column]
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
        
        model = LogisticRegression(max_iter=1000)
        model.fit(X_train, y_train)
        y_pred_proba = model.predict_proba(X_test)[:, 1]
        auc = roc_auc_score(y_test, y_pred_proba)
        results[n_bins] = auc
        
        df_copies[n_bins] = df_copy
    
    # 找到AUC最高的分箱方法,AUC代表Area Under the ROC Curve,用来评估分类模型性能的一个常用指标
    best_n_bins = max(results, key=results.get)
    best_df = df_copies[best_n_bins]
    
    return best_n_bins, results[best_n_bins], woe_iv_values[best_n_bins], best_df

# 寻找和打印最后的选择数据,data为后面建立模型的数据库
best_bins, best_auc, best_woe_iv_values, data  = select_best_binning(data, selected_features_final, target_column)

# 打印结果
print("\n***************************\n"+f"Best binning number: {best_bins} with AUC: {best_auc}"+'\n***************************\n')
#-----------------------------------------------------------------
# 最终变量选择，选择IV值最高的前几个,但经过测试，发现按照我的步骤，11个变量会是最高准确率,在这里也就是省略了一个
NumFea = 11
iv_values_sorted = sorted(best_woe_iv_values.items(), key=lambda item: item[1]['iv'], reverse=True)
top_iv_features = iv_values_sorted[:NumFea]

# 打印出来为了更方便追踪数据
print("\nSelected top IV features for modeling:")
for feature, values in top_iv_features:
    print(f" {feature}, IV value: {values['iv']}")
# 提取特征名称列表
top_iv_feature_names = [item[0] for item in top_iv_features]
# print("\nTop IV feature Selected Names:", top_iv_feature_names)
#-----------------------------------------------------------------
# # 分割数据集并保存原始索引
# 添加索引列
data['Original_index'] = data.index + 2 
# 分割数据集为训练集和测试集在我选择的变量中
X = data[[col + '_woe' for col in top_iv_feature_names]]
y = data[target_column]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.4, random_state=42, stratify=y)
test_indices = X_test.index
# 确保模型训练使用了所有选择的特征
print(f"Features used in Model Training: {X_train.columns.tolist()}")

# 打印每个选择特征在分箱和WOE处理后的情况
# for feature in selected_features_final:
#     if feature + '_woe' not in data.columns:
#         print(f"Feature {feature}_woe is missing in the data")
#     else:
#         print(f"Feature {feature}_woe is present in the data")

#-----------------------------------------------------------------
# 定义几个常用的模型,可以自行选择尝试---------------
# 建立逻辑回归模型 (Logistic Regression)
model_LR = LogisticRegression(max_iter=10000, C=1.0, penalty='l2', solver='lbfgs')

# 建立决策树模型 (Decision Tree)
model_DT = DecisionTreeClassifier(random_state=42)

# 建立随机森林模型 (Random Forest)
model_RF = RandomForestClassifier(random_state=42)

# 建立梯度提升机模型 (Gradient Boosting Machine, GBM)
model_GBM = GradientBoostingClassifier(random_state=42)

# 建立支持向量机模型 (Support Vector Machine, SVM)
model_SVC = SVC(probability=True, random_state=42)
# 定义基础模型的元组
estimators = [
    ('lr', model_LR),
    ('dt', model_DT),
    ('rf', model_RF),
    ('gbm', model_GBM),
    ('svc', model_SVC)
]

# 定义堆叠分类器，最终模型选择了逻辑回归模型
stacking_clf = StackingClassifier(
    estimators=estimators,
    final_estimator=model_LR
)

# 交叉验证评估模型性能
models = {
    'Logistic Regression': model_LR,
    'Decision Tree': model_DT,
    'Random Forest': model_RF,
    'Gradient Boosting': model_GBM,
    'Support Vector Machine': model_SVC,
    'Stacking Classifier': stacking_clf
}

# 存储结果的字典
results = {}

# 训练和评估每个模型
for name, model in models.items():
    # 使用交叉验证评估模型性能
    cv_scores = cross_val_score(model, X_train, y_train, cv=5, scoring='accuracy')
    print(f"{name} Cross-validated accuracy: {cv_scores.mean():.4f} ± {cv_scores.std():.4f}")
    
    # 在训练集上训练模型
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    y_pred_proba = model.predict_proba(X_test)[:, 1]
    
    accuracy = accuracy_score(y_test, y_pred)
    auc = roc_auc_score(y_test, y_pred_proba)
    
    results[name] = {
        'accuracy': accuracy,
        'auc': auc,
        'confusion_matrix': confusion_matrix(y_test, y_pred),
        'classification_report': classification_report(y_test, y_pred)
    }

# 找出准确率最高的模型
best_model_name = max(results, key=lambda name: results[name]['accuracy'])
best_model_results = results[best_model_name]
best_model = models[best_model_name]

# 输出最佳模型的结果
print(f"\nBest Model: {best_model_name}")
print(f"Accuracy: {best_model_results['accuracy']:.4f}")
print(f"AUC: {best_model_results['auc']:.4f}")
print("Confusion Matrix:")
print(best_model_results['confusion_matrix'])
print("Classification Report:")
print(best_model_results['classification_report'])

Data successfully copied to resources/German_credit_NEW.xlsx
['Purpose', 'Sex & Marital Status', 'Length of current employment', 'Instalment per cent', 'Account Balance', 'Payment Status of Previous Credit', 'Value Savings/Stocks', 'Credit Amount', 'Age (years)', 'Duration of Credit (month)', 'Most valuable available asset', 'Concurrent Credits']

***************************
Best binning number: 54 with AUC: 0.8203571428571428
***************************


Selected top IV features for modeling:
 Duration of Credit (month), IV value: 1.0495353305806965
 Credit Amount, IV value: 1.0314557255592238
 Account Balance, IV value: 0.6660115033513336
 Age (years), IV value: 0.6129095040344794
 Payment Status of Previous Credit, IV value: 0.2932335473908263
 Value Savings/Stocks, IV value: 0.19600955690422672
 Purpose, IV value: 0.16919506567307832
 Most valuable available asset, IV value: 0.11263826240979674
 Length of current employment, IV value: 0.086433631026641
 Concurrent Credits, IV valu

In [4]:
 # 构建评分卡函数
import os
def compute_scorecard(model, scaler, X, base_score=600, pdo=50):
    coef = model.coef_[0]
    intercept = model.intercept_[0]
    features = X.columns

    B = pdo / np.log(2)
    A = base_score - B * intercept  # 这里包含了截距

    scorecard = {}
    for feature, weight in zip(features, coef):
        feature_mean = scaler.mean_[list(features).index(feature)]
        feature_std = scaler.scale_[list(features).index(feature)]
        scorecard[feature] = {
            'weight': weight,
            'mean': feature_mean,
            'std': feature_std,
            'score': -weight * B / feature_std
        }
    return A, B, scorecard

# 计算评分卡
scaler = StandardScaler().fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)

X_train_df = pd.DataFrame(X_train_scaled, columns=X_train.columns)
X_test_df = pd.DataFrame(X_test_scaled, columns=X_test.columns)
# # 确认标准化后的数据形状和列名
# print(f"X_train_scaled shape: {X_train_scaled.shape}")
# print(f"X_train_df columns: {X_train_df.columns.tolist()}")
# 
# print(f"X_test_scaled shape: {X_test_scaled.shape}")
# print(f"X_test_df columns: {X_test_df.columns.tolist()}")


A, B, scorecard = compute_scorecard(best_model, scaler, X_train)
# # 确认评分卡中的特征数量
# print(f"Number of features in scorecard: {len(scorecard)}")
# print(f"Features in scorecard: {list(scorecard.keys())}")

# 打印评分卡
print(f"Base Score (A): {A:.2f}")
print(f"Factor (B): {B:.2f}")
print("Scorecard:")
for feature, params in scorecard.items():
    print(f"{feature}: weight={params['weight']:.4f}, mean={params['mean']:.4f}, std={params['std']:.4f}, score={params['score']:.4f}")

# 将评分卡输出到Excel
scorecard_df = pd.DataFrame(scorecard).T
scorecard_df['feature'] = scorecard_df.index
scorecard_df = scorecard_df[['feature', 'weight', 'mean', 'std', 'score']]
output_dir = 'Outputs'
os.makedirs(output_dir, exist_ok=True)
output_file_path = os.path.join(output_dir, 'scorecard_Final.xlsx')
scorecard_df.to_excel(output_file_path, index=False)
print(f"\nScorecard successfully saved to {output_file_path}")

Base Score (A): 657.53
Factor (B): 72.13
Scorecard:
Duration of Credit (month)_woe: weight=-0.8080, mean=0.4710, std=2.9962, score=19.4531
Credit Amount_woe: weight=-0.7061, mean=-0.0925, std=2.1595, score=23.5849
Account Balance_woe: weight=-0.7989, mean=0.1615, std=0.8732, score=65.9944
Age (years)_woe: weight=-0.6500, mean=0.2507, std=1.9608, score=23.9124
Payment Status of Previous Credit_woe: weight=-0.6623, mean=0.0344, std=0.5512, score=86.6647
Value Savings/Stocks_woe: weight=-0.6856, mean=0.0629, std=0.4766, score=103.7727
Purpose_woe: weight=-0.9993, mean=0.0459, std=0.4334, score=166.3108
Most valuable available asset_woe: weight=-0.5296, mean=0.0247, std=0.3443, score=110.9610
Length of current employment_woe: weight=-0.5641, mean=0.0217, std=0.2940, score=138.3820
Concurrent Credits_woe: weight=-0.2772, mean=0.0147, std=0.2286, score=87.4429
Sex & Marital Status_woe: weight=-1.1432, mean=0.0008, std=0.2129, score=387.3138

Scorecard successfully saved to Outputs\scorecard_

In [3]:
# 定义函数以计算评分卡分数并打印结果
def calculate_score(X, A, B, scorecard):
    scores = np.full(X.shape[0], A)
    feature_scores = pd.DataFrame(index=X.index)
    for feature, params in scorecard.items():
        feature_contrib = (X[feature] - params['mean']) * params['score']
        scores += feature_contrib
        feature_scores[feature] = feature_contrib
    return scores

# 计算分数
test_scores = calculate_score(X_test_df, A, B, scorecard)

# 检查缺失分数的行
if np.isnan(test_scores).any():
    missing_indices = np.where(np.isnan(test_scores))[0]
    print("Missing scores for the following indices:")
    print(missing_indices)
else:
    print("All rows have scores.")

# 打印所有分数
print("Test Scores:")
print(test_scores)

# 将测试集上的分数添加到原始测试数据中
X_test_original = data.loc[test_indices].copy()
X_test_original = X_test_original.reset_index(drop=True)  # 重置索引以确保顺序
X_test_original['Credit Score'] = test_scores  # 直接按顺序添加分数

# 检查缺失分数的行
missing_scores = X_test_original[X_test_original['Credit Score'].isna()]
if not missing_scores.empty:
    print("Missing scores for the following rows:")
    print(missing_scores)
else:
    print("All rows have scores in the DataFrame.")

# 导出结果到Excel文件
output_dir = 'Outputs'
os.makedirs(output_dir, exist_ok=True)
output_file_path = os.path.join(output_dir, 'scoring_results.xlsx')
with pd.ExcelWriter(output_file_path) as writer:
    X_test_original.to_excel(writer, sheet_name='Test Data with Scores', index=False)

print(f"Results successfully saved to {output_file_path}")

All rows have scores.
Test Scores:
0      456.962681
1      953.947134
2      -52.502541
3      383.613406
4      -89.175935
          ...    
395    951.990107
396    433.915064
397    682.437595
398    378.589458
399    678.453927
Name: Duration of Credit (month)_woe, Length: 400, dtype: float64
All rows have scores in the DataFrame.
Results successfully saved to Outputs\scoring_results.xlsx
