In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# 导入用于模型构建和评估的机器学习库
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, IsolationForest, AdaBoostClassifier, StackingClassifier
from sklearn.svm import OneClassSVM
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, roc_auc_score, roc_curve
from sklearn.model_selection import RandomizedSearchCV, GridSearchCV
from sklearn.datasets import make_classification
from sklearn.ensemble import VotingClassifier, BaggingClassifier, AdaBoostClassifier, GradientBoostingClassifier

# 导入TensorFlow/Keras库用于使用自编码器进行异常检测
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import regularizers

# 导入库用于绘图
import networkx as nx

# 导入库用于保存和加载模型
import joblib

# 设置数据可视化的风格
sns.set(style="whitegrid")

# 打印确认所有依赖项已成功导入
print("所有依赖项均已成功导入")

In [3]:
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score


In [None]:
# 定义数据集的文件路径
file_path = r"\creditcard.csv"  

try:
    data = pd.read_csv(file_path)
    print("数据集加载成功")
except Exception as e:
    print(f"加载数据集时出错: {e}")

# 显示数据集的前几行
print("\n数据集的前 5 行:")
print(data.head())

# 检查缺失值
print("\n每列中缺少的值:")
print(data.isnull().sum())

# 处理缺失值（如果有）
data.fillna(0, inplace=True)
print("\n处理后缺失值:")
print(data.isnull().sum())

# 设置图形大小和seaborn风格
plt.figure(figsize=(10, 6))
sns.set_theme(style="whitegrid")

# 绘制欺诈交易与非欺诈交易的计数图
ax = sns.countplot(
    x='Class', 
    data=data, 
    hue='Class',  
    palette="coolwarm", 
    edgecolor='black',
    legend=False  
)

# 添加标题和轴标签
ax.set_title(
    "欺诈交易与非欺诈交易的分布", 
    fontsize=18, 
    fontweight='bold', 
    color='navy', 
    pad=20
)

ax.set_xlabel(
    "类别 (0: 非欺诈, 1: 欺诈)", 
    fontsize=14, 
    fontweight='semibold', 
    color='gray', 
    labelpad=10
)
ax.set_ylabel(
    "交易数量", 
    fontsize=14, 
    fontweight='semibold', 
    color='gray', 
    labelpad=10
)
# 在条形图上标注计数
for p in ax.patches:
    ax.annotate(
        f'{p.get_height():,.0f}', 
        (p.get_x() + p.get_width() / 2., p.get_height()), 
        ha='center', 
        va='center', 
        fontsize=12, 
        color='darkred', 
        fontweight='bold',
        xytext=(0, 5), 
        textcoords='offset points'
    )

# 自定义x刻度标签
ax.set_xticks([0, 1])
ax.set_xticklabels(
    ['非欺诈 (0)', '欺诈 (1)'], 
    fontsize=12, 
    fontweight='semibold', 
    color='gray'
)

# 移除不必要的边框以获得更简洁的外观
sns.despine(left=True, bottom=True)

# 调整布局以获得更好的间距
plt.tight_layout()

# 显示图形
plt.show()

In [None]:
# 显示数据集的前几行
print("\n数据集的前5行:")
print(data.head())

# 检查每列是否有缺失值
print("\n每列的缺失值数量:")
print(data.isnull().sum())

# 处理缺失值（如果有）
data.fillna(0, inplace=True)
print("\n处理缺失值后的每列缺失值数量:")
print(data.isnull().sum())

# 特征缩放
scaler = StandardScaler()
data[['V1', 'V2', 'V3', 'V4', 'V5', 'V6', 'V7', 'V8', 'V9', 'V10', 'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18', 'V19', 'V20', 'V21', 'V22', 'V23', 'V24', 'V25', 'V26', 'V27', 'V28', 'Amount']] = scaler.fit_transform(data[['V1', 'V2', 'V3', 'V4', 'V5', 'V6', 'V7', 'V8', 'V9', 'V10', 'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18', 'V19', 'V20', 'V21', 'V22', 'V23', 'V24', 'V25', 'V26', 'V27', 'V28', 'Amount']])

# 将数据分为特征 (X) 和目标变量 (y)
X = data.drop('Class', axis=1)  # 特征（除 'Class' 列外的所有列）
y = data['Class']  # 目标变量 ('Class' 列)

# 将数据分为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 显示训练集和测试集的形状
print("\n数据集的形状:")
print(f"训练特征 (X_train): {X_train.shape}")
print(f"测试特征 (X_test): {X_test.shape}")
print(f"训练标签 (y_train): {y_train.shape}")
print(f"测试标签 (y_test): {y_test.shape}")

# 打印数据预处理完成的消息
print("\n数据预处理已完成")

# -------------------------- 训练欺诈检测模型（监督学习） --------------------------

# 定义一个函数来训练和评估监督学习模型
def train_and_evaluate_supervised_model(model, X_train, X_test, y_train, y_test):
    """
    训练并评估一个监督学习模型（分类）。
    
    参数:
        model: 要训练和评估的监督学习模型。
        X_train: 训练特征。
        X_test: 测试特征。
        y_train: 训练标签。
        y_test: 测试标签。
    
    返回:
        accuracy, precision, recall, f1: 评估指标。
    """
    # 训练模型
    model.fit(X_train, y_train)
    
    # 在测试数据上进行预测
    y_pred = model.predict(X_test)
    
    # 计算评估指标
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred)
    recall = recall_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)
    
    # 打印评估指标
    print(f"\n{model.__class__.__name__} 模型评估:")
    print(f"准确率: {accuracy:.4f}")
    print(f"精确率: {precision:.4f}")
    print(f"召回率: {recall:.4f}")
    print(f"F1 分数: {f1:.4f}")
    
    return accuracy, precision, recall, f1

# 逻辑回归模型
log_reg_model = LogisticRegression(max_iter=2000, solver='liblinear', random_state=42)
print("\n正在训练逻辑回归模型...")
log_reg_metrics = train_and_evaluate_supervised_model(log_reg_model, X_train, X_test, y_train, y_test)

# 决策树分类器
dt_model = DecisionTreeClassifier(random_state=42)
print("\n正在训练决策树模型...")
dt_model_metrics = train_and_evaluate_supervised_model(dt_model, X_train, X_test, y_train, y_test)

# 随机森林分类器
rf_model = RandomForestClassifier(random_state=42)
print("\n正在训练随机森林模型...")
rf_model_metrics = train_and_evaluate_supervised_model(rf_model, X_train, X_test, y_train, y_test)

# 确认所有模型都已成功训练
print("\n所有监督学习模型已成功训练")

In [None]:
def train_and_evaluate_anomaly_model(model, X_train, X_test, y_test):
    """
    训练并评估一个异常检测模型。
    
    参数:
        model: 要训练和评估的异常检测模型。
        X_train: 训练特征。
        X_test: 测试特征。
        y_test: 测试标签。
    
    返回:
        report: 模型的分类报告。
    """
    # 在训练数据上拟合模型
    model.fit(X_train)
    
    # 在测试数据上进行预测
    y_pred = model.predict(X_test)
    
    # 将模型输出转换为与 y_test 标签匹配（1 表示正常，0 表示异常）
    y_pred = [1 if x == -1 else 0 for x in y_pred]  # 将异常 (-1) 转换为 1 (欺诈)，正常 (1) 转换为 0 (非欺诈)
    
    # 生成分类报告
    report = classification_report(y_test, y_pred, target_names=['Non-Fraud', 'Fraud'])
    
    print(f"\n{model.__class__.__name__} 模型评估:")
    print(report)
    
    return report

# 隔离森林模型
iso_forest_model = IsolationForest(contamination=0.01, random_state=42)
print("\n正在训练隔离森林模型...")
iso_forest_report = train_and_evaluate_anomaly_model(iso_forest_model, X_train, X_test, y_test)

# 一类支持向量机模型
oc_svm_model = OneClassSVM(nu=0.01, kernel="rbf", gamma="scale")
print("\n正在训练一类支持向量机模型...")
oc_svm_report = train_and_evaluate_anomaly_model(oc_svm_model, X_train, X_test, y_test)

# 确认所有异常检测模型都已成功训练
print("\n异常检测模型已成功训练")


# -------------------------- 自编码器模型 --------------------------

# 定义一个函数来训练和评估自编码器模型
def train_and_evaluate_autoencoder(X_train, X_test, y_test, encoding_dim=14, epochs=50, batch_size=256):
    """
    训练并评估用于异常检测的自编码器。
    
    参数:
        X_train: 训练特征。
        X_test: 测试特征。
        y_test: 测试标签。
        encoding_dim: 编码层中的神经元数量（默认: 14）。
        epochs: 训练轮数（默认: 50）。
        batch_size: 训练批次大小（默认: 256）。
    
    返回:
        report: 自编码器的分类报告。
    """
    # 定义自编码器模型架构
    input_layer = Input(shape=(X_train.shape[1],))
    encoded = Dense(encoding_dim, activation='relu', activity_regularizer=regularizers.l2(1e-5))(input_layer)
    decoded = Dense(X_train.shape[1], activation='sigmoid')(encoded)
    
    autoencoder = Model(input_layer, decoded)
    autoencoder.compile(optimizer='adam', loss='mean_squared_error')

    # 训练自编码器
    print("\n正在训练自编码器模型...")
    autoencoder.fit(
        X_train, X_train,
        epochs=epochs,
        batch_size=batch_size,
        shuffle=True,
        validation_data=(X_test, X_test),
        verbose=1
    )

    # 使用自编码器对测试数据进行预测
    X_test_pred = autoencoder.predict(X_test)

    # 计算重构误差（原始数据与预测数据之间的均方误差）
    mse = np.mean(np.power(X_test - X_test_pred, 2), axis=1)
    
    # 定义异常检测阈值（MSE 的第95百分位数）
    threshold = np.percentile(mse, 95)

    # 如果 MSE 超过阈值，则标记为异常
    y_pred = [1 if e > threshold else 0 for e in mse]

    # 生成分类报告
    report = classification_report(y_test, y_pred, target_names=['Non-Fraud', 'Fraud'])
    
    print("\n自编码器模型评估:")
    print(report)
    
    return report

# 训练并评估自编码器
print("\n正在训练和评估自编码器模型...")
autoencoder_report = train_and_evaluate_autoencoder(X_train, X_test, y_test)

# 确认自编码器模型已成功训练
print("\n自编码器模型已成功训练")


# -------------------------- 集成学习 --------------------------

# 结合监督学习和异常检测模型的预测结果
def ensemble_predictions(supervised_models, anomaly_models, X_test):
    """
    使用多数投票法结合预测结果。
    
    参数:
        supervised_models: 训练好的监督学习模型列表。
        anomaly_models: 训练好的异常检测模型列表。
        X_test: 测试特征。
    
    返回:
        y_pred_ensemble: 集成预测结果。
    """
    # 获取监督学习模型的预测结果
    supervised_preds = [model.predict(X_test) for model in supervised_models]
    
    # 获取异常检测模型的预测结果
    anomaly_preds = []
    for model in anomaly_models:
        if isinstance(model, IsolationForest) or isinstance(model, OneClassSVM):
            preds = model.predict(X_test)
            preds = [1 if x == -1 else 0 for x in preds]  # 将异常 (-1) 转换为 1 (欺诈)
        else:
            preds = model.predict(X_test)
        anomaly_preds.append(preds)
    
    # 合并所有预测结果
    all_preds = supervised_preds + anomaly_preds
    
    # 进行多数投票
    y_pred_ensemble = np.round(np.mean(all_preds, axis=0))
    
    return y_pred_ensemble

# 监督学习模型列表
supervised_models = [log_reg_model, rf_model]

# 异常检测模型列表
anomaly_models = [iso_forest_model, oc_svm_model]

# 获取集成预测结果
y_pred_ensemble = ensemble_predictions(supervised_models, anomaly_models, X_test)

# 评估集成预测结果
print("\n集成模型评估:")
print(f"准确率: {accuracy_score(y_test, y_pred_ensemble):.4f}")
print(f"精确率: {precision_score(y_test, y_pred_ensemble):.4f}")
print(f"召回率: {recall_score(y_test, y_pred_ensemble):.4f}")
print(f"F1 分数: {f1_score(y_test, y_pred_ensemble):.4f}")
print(f"ROC-AUC: {roc_auc_score(y_test, y_pred_ensemble):.4f}")


# -------------------------- 评估指标 --------------------------

# 评估集成预测结果
print("\n集成模型评估:")
print(f"准确率: {accuracy_score(y_test, y_pred_ensemble):.4f}")
print(f"精确率: {precision_score(y_test, y_pred_ensemble):.4f}")
print(f"召回率: {recall_score(y_test, y_pred_ensemble):.4f}")
print(f"F1 分数: {f1_score(y_test, y_pred_ensemble):.4f}")
print(f"ROC-AUC: {roc_auc_score(y_test, y_pred_ensemble):.4f}")



# -------------------------- 保存训练模型 --------------------------

# 保存监督学习模型
joblib.dump(log_reg_model, 'log_reg_model.pkl')
joblib.dump(rf_model, 'rf_model.pkl')

# 保存异常检测模型
joblib.dump(iso_forest_model, 'iso_forest_model.pkl')
joblib.dump(oc_svm_model, 'oc_svm_model.pkl')

# 保存缩放器
joblib.dump(scaler, 'scaler.pkl')

print("\n所有模型和缩放器已成功保存")
