<a href="https://colab.research.google.com/github/wannasmile/colab_code_note/blob/main/IRC013.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

 GBNet 是一个将梯度提升框架（如 XGBoost 和 LightGBM）与 PyTorch 深度学习框架结合的工具库，其核心思想是通过 PyTorch 的自动微分和计算图能力，为传统梯度提升模型（GBM）提供更灵活的训练和扩展能力。以下是通俗易懂的讲解：

---

### **1. GBNet 的核心思想**
GBNet 的目标是将 **梯度提升算法**（Gradient Boosting）与 **PyTorch 的神经网络训练流程** 结合，实现以下功能：
- **复用 PyTorch 生态**：利用 PyTorch 的损失函数、优化器和自动微分功能，简化梯度提升模型的训练过程。
- **灵活扩展模型**：支持同时训练多个梯度提升模型（如 XGBoost 和 LightGBM），并通过 PyTorch 的模块化设计组合它们（例如拼接输出或相乘），实现更复杂的预测逻辑。
- **统一训练流程**：将传统 GBM 的串行训练过程融入 PyTorch 的 `fit`/`step` 循环中，降低用户学习成本。

---

### **2. 梯度提升算法基础**
在深入 GBNet 之前，需要先理解 **梯度提升** 的基本原理：
1. **初始化模型**：从简单模型（如均值预测）开始。
2. **迭代改进**：每轮训练一个新模型（通常是决策树），专门拟合上一轮模型的 **残差**（即真实值与预测值的差）。
3. **累加结果**：新模型的预测结果加到总预测中，逐步逼近真实值。

例如，预测年龄时，第一棵树预测 20 岁，发现残差为 -6 岁（实际 14 岁），第二棵树专门预测 -6 岁，最终累加结果准确。

---

### **3. GBNet 的实现原理**
GBNet 通过以下方式将梯度提升与 PyTorch 结合：
#### **(1) PyTorch 模块封装**
GBNet 提供了 `XGBModule` 和 `LGBModule` 两个类，将 XGBoost/LightGBM 封装为 PyTorch 模块：
- **输入输出**：接受数据（如 `X`）并输出预测结果（如 `F(X)`）。
- **自动微分**：通过 `loss.backward()` 计算梯度，调用 `gb_step()` 更新模型参数。

#### **(2) 训练流程**
与传统 GBM 类似，但融入 PyTorch 的训练循环：
```python
# 初始化模型
model = XGBModule(input_dim, output_dim)

# 定义损失函数和优化器
loss_fn = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

# 迭代训练
for epoch in range(100):
    optimizer.zero_grad()  # 清空梯度
    preds = model(X)  # 前向传播
    loss = loss_fn(preds, y)  # 计算损失
    loss.backward()  # 反向传播
    model.gb_step()  # 更新 GBM 参数
    optimizer.step()  # 更新优化器参数
```
- **关键区别**：传统 GBM 使用小批量或随机梯度，而 GBNet 需要全量数据以支持缓存优化。

#### **(3) 联合训练多个模型**
GBNet 支持同时训练多个梯度提升模型，并通过 PyTorch 的 `nn.Module` 组合它们：
```python
class CombinedModel(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.xgb = XGBModule(...)
        self.lgb = LGBModule(...)
        self.linear = torch.nn.Linear(...)

    def forward(self, x):
        xgb_out = self.xgb(x)
        lgb_out = self.lgb(x)
        combined = self.linear(xgb_out + lgb_out)
        return combined
```
这种设计允许用户像搭积木一样组合不同模型，例如用户和物品的嵌入向量相乘。

---

### **4. GBNet 的优势**
- **灵活性**：支持自定义损失函数和优化器，适应不同任务（如回归、分类、时间序列预测）。
- **扩展性**：可轻松集成到现有 PyTorch 项目中，支持分布式训练和混合精度计算。
- **性能**：在部分场景（如时间序列预测）中，GBNet 的自定义模型（如 `Forecast`）表现优于传统工具（如 Prophet）。

---

### **5. 总结**
GBNet 的本质是 **用 PyTorch 重新包装梯度提升算法**，使其兼具深度学习的灵活性和梯度提升的高效性。用户无需深入理解 GBM 的底层细节，即可通过类似神经网络的 API 快速构建和优化复杂模型。其核心价值在于 **降低梯度提升与深度学习的迁移成本**，同时保留 GBM 在结构化数据任务中的优势。

In [1]:
import abc
import torch
from torch import nn

# 定义一个抽象基类，继承自PyTorch的nn.Module和Python的abc.ABC
# 用于规范梯度提升模块的通用接口和功能实现
class BaseGBModule(nn.Module, abc.ABC):
   # 类属性，定义最小Hessian值，默认为0.0
   min_hess = 0.0

   # 构造方法：初始化基类和模块属性
   # min_hess: 允许用户设置最小Hessian值
   # grad/hess: 用于存储梯度/二阶导数数据，默认初始化为None
   def __init__(self, min_hess=0.0):
       super(BaseGBModule, self).__init__()  # 调用nn.Module的构造方法
       self.min_hess = min_hess  # 设置最小Hessian值
       self.grad = None  # 初始化梯度存储变量
       self.hess = None  # 初始化二阶导数存储变量

   # 抽象方法：输入数据验证与预处理
   # input_data: 模型特定格式的输入数据
   # 返回值: 处理后的输入数据（直接返回原数据或进行格式转换）
   @abc.abstractmethod
   def _input_checking_setting(self, input_data):
       pass  # 具体实现由子类完成

   # 抽象方法：模型前向传播
   # input_data: 模型特定格式的输入数据
   # return_tensor: 是否返回PyTorch张量（默认True）
   # 返回值: 模型预测结果（张量或NumPy数组）
   @abc.abstractmethod
   def forward(self, input_data, return_tensor=True):
       pass  # 具体实现由子类完成

   # 内部方法：计算梯度与二阶导数（基于FX的梯度）
   # grad: 梯度值（按样本数缩放后的结果）
   # hess: 二阶导数矩阵（每列对应一个输出特征的Hessian）
   def _get_grad_hess_FX(self):
       # 计算梯度：原始梯度乘以样本数量（用于批量计算的平均）
       grad = self.FX.grad * self.FX.shape[0]

       # 初始化Hessian列表
       hesses = []
       # 按输出特征维度逐列计算Hessian
       for i in range(self.output_dim):
           # 对梯度列求和，计算该列的Hessian
           # retain_graph=True保留计算图以便多次反向传播
           hessian_col = torch.autograd.grad(
               grad[:, i].sum(), self.FX, retain_graph=True
           )[0][:, i : (i + 1)]
           hesses.append(hessian_col)
       # 拼接所有Hessian列，并确保值不低于min_hess
       hess = torch.maximum(
           torch.cat(hesses, axis=1), torch.Tensor([self.min_hess])
       )
       return grad, hess  # 返回梯度和Hessian

   # 抽象方法：执行一次梯度提升迭代
   # 包含三个核心步骤：
   # 1. 获取当前样本的梯度/二阶导数
   # 2. 训练一个弱学习器（基模型）
   # 3. 更新模型预测结果
   @abc.abstractmethod
   def gb_step(self):
       pass  # 具体实现由子类完成

下面将用通俗的语言解释这段代码中二阶导数（Hessian 矩阵）的计算原理，并结合上下文说明它的作用。

---

### **1. 什么是二阶导数（Hessian 矩阵）？**
在数学中，二阶导数描述了函数的「曲率」。对于机器学习模型来说：
- **一阶导数（梯度）**：告诉我们模型在哪个方向上调整参数能最快降低损失（类似爬山时往陡坡下走）。
- **二阶导数（Hessian 矩阵）**：告诉我们这个方向上的「陡峭程度」。如果二阶导数为正，说明这是一个山谷；如果为负，则是山峰。在优化中，Hessian 用于调整学习步长，防止步子太大或太小。

在梯度提升中，Hessian 的作用类似于正则化项，能让模型更新更稳定。

---

### **2. GBNet 中的二阶导数计算逻辑**
代码中的 `_get_grad_hess_FX` 方法负责计算梯度和 Hessian 矩阵。以下是分步解释：

#### **(1) 计算梯度（一阶导数）**
```python
grad = self.FX.grad * self.FX.shape[0]
```
- `self.FX` 是模型的预测值（类似 `y_pred`）。
- `self.FX.grad` 是预测值对输入数据的梯度（即损失函数对 `FX` 的导数）。
- `self.FX.shape[0]` 是对梯度进行缩放（假设输入是批量数据，这里取平均梯度）。

#### **(2) 计算 Hessian 矩阵（二阶导数）**
```python
hesses = []
for i in range(self.output_dim):
    hessian_col = torch.autograd.grad(
        grad[:, i].sum(),  # 对第i列梯度求和
        self.FX,  # 反向传播的目标变量
        retain_graph=True  # 保留计算图以复用
    )[0][:, i : (i + 1)]  # 提取第i列的Hessian
    hesses.append(hessian_col)
hess = torch.maximum(torch.cat(hesses, axis=1), torch.Tensor([self.min_hess]))
```
- **逐列计算**：Hessian 矩阵的每一列对应一个输出的二阶导数。
- **为什么对梯度列求和？**  
  目的是将单个样本的梯度信息聚合为整体方向（类似求平均值），从而稳定 Hessian 的计算。
- **反向传播**：通过 `torch.autograd.grad` 计算梯度列的和对 `FX` 的导数，得到该列对应的 Hessian。
- **数值稳定性**：`torch.maximum(..., self.min_hess)` 确保 Hessian 最小值为 `min_hess`，防止矩阵中出现零或负数导致优化失败。

---

### **3. Hessian 在梯度提升中的作用**
GBNet 中的 Hessian 主要有以下用途：
1. **学习率自适应**：  
   在梯度提升中，Hessian 用于调整每棵新树的学习率（类似 XGBoost 中的 `eta` 参数）。较大的 Hessian 值（曲率更大）意味着模型在这一区域的预测较敏感，需要更小的步长。
2. **正则化**：  
   通过限制 Hessian 的最小值（`min_hess`），可以防止优化过程中出现数值不稳定或过拟合。

---

### **4. 举例类比**
假设我们要预测房价（输出是连续值），某轮迭代中模型的预测值 `FX` 和真实值 `y` 之间的残差较大。此时：
- **梯度**：指向减小残差最快的方向（类似地图上的指南针）。
- **Hessian**：告诉你这个方向是陡坡还是平地。如果是陡坡（Hessian 大），你需要迈小步；如果是平地（Hessian 小），可以迈大步。

---

### **总结**
这段代码通过 PyTorch 的自动微分功能，实现了梯度提升中二阶导数的计算：
1. **梯度计算**：指导模型往损失下降最快的方向走。
2. **Hessian 计算**：控制步长大小，确保更新稳定且不陷入局部最优。

这种设计让 GBNet 在保留梯度提升高效性的同时，能够灵活集成到 PyTorch 的深度学习框架中。

In [2]:
from typing import Union

import numpy as np
import pandas as pd

from scipy.linalg import cho_solve, cho_factor
import torch
import torch.nn as nn

#from gbnet.base import BaseGBModule


class GBLinear(BaseGBModule):
    """实现基于梯度提升的线性模型模块

    该模块通过梯度提升框架训练线性模型，维护迭代状态并通过计算的梯度/二阶导数更新参数
    """

    def __init__(
        self,
        input_dim,  # 输入特征维度
        output_dim,  # 输出预测维度
        bias=True,  # 是否包含偏置项
        lr=0.1,  # 参数更新学习率
        min_hess=0.0,  # Hessian最小阈值
        lambd=0.01  # L2正则化系数
    ):
        super(BaseGBModule, self).__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.min_hess = min_hess
        self.bias = bias
        self.lr = lr
        self.lambd = lambd

        # 创建PyTorch线性层
        self.linear = nn.Linear(self.input_dim, self.output_dim, bias=self.bias)
        # 缓存变量
        self.FX = None  # 当前预测值
        self.input = None  # 输入数据缓存
        self.g = None  # 梯度缓存
        self.h = None  # Hessian缓存


    def _input_checking_setting(self, x: Union[torch.Tensor, np.ndarray, pd.DataFrame]):
        """统一输入格式并进行预处理

        确保输入数据转换为PyTorch张量，并在训练模式下缓存原始数据
        """
        assert isinstance(x, (torch.Tensor, np.ndarray, pd.DataFrame)), "输入类型不支持"

        # 类型转换
        if isinstance(x, np.ndarray):
            x = torch.Tensor(x)
        elif isinstance(x, pd.DataFrame):
            x = torch.Tensor(np.array(x))

        # 训练模式需要缓存输入数据
        if self.training:
            self.input = x.detach().numpy()  # 转换为NumPy数组并断开计算图

        return x


    def forward(self, x: Union[torch.Tensor, np.ndarray, pd.DataFrame]):
        """前向传播计算预测值

        将输入数据通过线性层得到预测结果，并在训练模式下保留梯度信息
        """
        x = self._input_checking_setting(x)  # 统一输入格式

        self.FX = self.linear(x)  # 线性变换得到预测值
        if self.training:
            self.FX.retain_grad()  # 保留梯度以便后续反向传播
        return self.FX


    def gb_calc(self):
        """计算梯度并存储

        调用基类方法获取梯度/二阶导数，用于参数更新
        """
        if self.FX is None or self.FX.grad is None:
            raise RuntimeError("必须先进行反向传播")

        self.g, self.h = self._get_grad_hess_FX()  # 获取梯度和Hessian矩阵


    def gb_step(self):
        """执行梯度提升迭代步骤

        使用缓存的梯度信息更新线性层参数，包含正则化和学习率控制
        """
        if self.g is None and self.h is None:
            self.gb_calc()  # 确保已计算梯度

        with torch.no_grad():  # 禁用梯度计算以加速运算
            # 构建设计矩阵X（包含偏置项）
            if self.bias:
                X = np.concatenate(
                    [np.ones([self.input.shape[0], 1]), self.input],  # 添加全1列作为偏置项
                    axis=1
                )
            else:
                X = self.input

            # 解决带L2正则化的线性回归问题
            beta = ridge_regression(X, (self.g / self.h).detach().numpy(), self.lambd)

            # 更新权重参数
            self.linear.weight -= self.lr * torch.Tensor(beta[1:])  # 偏移参数
            if self.bias:
                self.linear.bias -= self.lr * torch.Tensor(beta[0])  # 偏置项


def ridge_regression(X, y, lambd):
    """使用Cholesky分解求解岭回归

    通过正规方程和数值稳定的方法快速计算带正则化的系数
    """
    n, d = X.shape
    # 构造正规方程矩阵A = X^TX + λI
    A = X.T @ X + lambd * np.eye(d)
    # 构造右侧向量c = X^Ty
    c = X.T @ y

    # Cholesky分解
    L = cho_factor(A)
    # 求解线性方程组Ly = c
    beta = cho_solve(L, c)
    return beta

In [3]:
from typing import Union
import lightgbm as lgb
import numpy as np
import pandas as pd
import torch
from torch import nn

#from gbnet.base import BaseGBModule


class LGBModule(BaseGBModule):
    """封装LightGBM的PyTorch模块

    该模块将LightGBM梯度提升算法集成到PyTorch框架中，支持训练和推理两种模式
    """

    def __init__(
        self,
        batch_size,  # 训练批次大小
        input_dim,  # 输入特征维度
        output_dim,  # 输出预测维度
        params={},  # LightGBM参数字典
        min_hess=0  # Hessian最小阈值
    ):
        super(BaseGBModule, self).__init__()
        self.batch_size = batch_size
        self.input_dim = input_dim
        self.output_dim = output_dim

        self.params = params
        self.bst = None  # LightGBM模型实例

        # 初始化预测张量（可训练参数）
        self.FX = nn.Parameter(
            torch.tensor(
                np.zeros([batch_size, output_dim]),  # [batch_size,output_dim]形状的零张量
                dtype=torch.float
            )
        )
        self.train_dat = None  # 缓存的LightGBM数据集
        self.min_hess = min_hess
        self.grad = None  # 梯度缓存
        self.hess = None  # Hessian缓存


    def _set_train_dat(self, input_dataset: lgb.Dataset):
        """设置训练数据集并配置参数

        确保训练时不输出日志信息
        """
        if input_dataset.params is None:
            input_dataset.params = {"verbose": -1}
        else:
            input_dataset.params.update({"verbose": -1})
        input_dataset.free_raw_data = False  # 保留原始数据内存
        self.train_dat = input_dataset


    def _input_checking_setting(
        self, input_dataset: Union[lgb.Dataset, np.ndarray, pd.DataFrame]
    ):
        """统一输入格式并进行预处理

        确保输入数据类型正确，并在训练模式时锁定数据集
        """
        assert isinstance(input_dataset, (lgb.Dataset, np.ndarray, pd.DataFrame)), "输入类型不支持"

        if self.training:
            if self.train_dat is None:
                # 创建LightGBM数据集
                if isinstance(input_dataset, lgb.Dataset):
                    self._set_train_dat(input_dataset)
                else:
                    self._set_train_dat(lgb.Dataset(input_dataset))
            if self.bst is None:
                return self.train_dat
            # 训练期间禁止更换数据集
            assert isinstance(input_dataset, lgb.Dataset), "训练中不能更换数据集"
            input_dataset.free_raw_data = False
            return self.train_dat
        else:
            # 推理模式返回原始数据
            if isinstance(input_dataset, lgb.Dataset):
                input_dataset.free_raw_data = False
                input_dataset.construct()
                return input_dataset.get_data()
            return input_dataset


    def forward(
        self,
        input_dataset: Union[lgb.Dataset, np.ndarray, pd.DataFrame],
        return_tensor=True,
    ):
        """前向传播计算预测值

        支持训练和推理两种模式，自动维护预测状态
        """
        input_dataset = self._input_checking_setting(input_dataset)

        # 根据模式获取预测结果
        if self.training:
            if self.bst is not None:
                # 使用现有模型预测
                preds = self.bst._Booster__inner_predict(0).copy()  # 获取内部预测结果
            else:
                # 初始化为零矩阵
                preds = np.zeros([self.batch_size, self.output_dim])
        else:
            if self.bst is not None:
                # 推理模式使用完整模型预测
                preds = self.bst.predict(input_dataset).copy()
            else:
                # 未训练时返回零张量
                preds = np.zeros(
                    [input_dataset.shape[0], self.output_dim], dtype=torch.float
                )

        # 更新训练模式的预测缓存
        if self.training:
            FX_detach = self.FX.detach()
            FX_detach.copy_(
                torch.tensor(
                    preds.reshape([self.batch_size, self.output_dim]),  # 调整形状为[batch,output]
                    dtype=torch.float
                )
            )

        # 返回结果
        if return_tensor:
            if self.training:
                return self.FX
            else:
                return torch.tensor(
                    preds.reshape([-1, self.output_dim]),  # 自动适应批量大小
                    dtype=torch.float
                )
        return preds


    def gb_calc(self):
        """计算梯度并存储

        调用基类方法获取梯度/二阶导数
        """
        self.grad, self.hess = self._get_grad_hess_FX()


    def gb_step(self):
        """执行梯度提升迭代步骤

        使用计算的梯度更新LightGBM模型
        """
        if self.grad is None and self.hess is None:
            self.gb_calc()

        # 创建自定义目标函数
        obj = LightGBObj(self.grad, self.hess)
        input_params = self.params.copy()
        input_params.update(
            {
                "objective": obj,  # 自定义损失函数
                "num_class": self.output_dim,  # 输出类别数
                "verbose": -1,  # 关闭日志输出
                "verbosity": -1  # 更高级别静默
            }
        )

        # 更新或创建模型
        if self.bst is not None:
            self.bst.update(train_set=self.train_dat, fobj=obj)
        else:
            # 训练新模型（仅需一轮提升）
            self.bst = lgb.train(
                params=input_params,
                train_set=self.train_dat,
                num_boost_round=1,  # 只训练一个基学习器
                keep_training_booster=True  # 保留训练过程中的全部模型
            )
        self.grad = None
        self.hess = None


class LightGBObj:
    """LightGBM专用目标函数封装类

    将PyTorch计算的梯度转换为LightGBM可接受的格式
    """
    def __init__(self, grad, hess):
        self.grad = grad.detach().numpy()  # 转换为NumPy数组
        self.hess = hess.detach().numpy()

    def __call__(self, y_true, y_pred):
        """实现LightGBM的目标函数接口

        返回梯度和Hessian矩阵
        """
        if self.grad.shape[1] > 1:
            return self.grad, self.hess
        else:
            return self.grad.flatten(), self.hess.flatten()

总结来说，forward方法在训练时返回当前模型的预测值，用于计算残差，而gb_step方法利用这些残差和Hessian信息，通过自定义目标函数指导LightGBM模型的更新，每轮迭代训练一个基学习器，逐步提升模型性能。这些步骤背后的数学原理涉及梯度提升框架中的目标函数优化、残差计算以及决策树的分裂策略。

 ### 1. **通过 `forward` 方法实现预测前向流的数学原理**

`forward` 方法的核心功能是完成模型的前向传播，生成预测值。其数学原理与梯度提升框架（GBDT）的迭代预测过程密切相关：

- **训练模式**：  
  在训练阶段，`forward` 方法返回当前模型的预测值 `FX`（作为可训练参数），用于计算残差（即真实值与预测值的差值）。根据梯度提升框架，残差表示当前模型未能拟合的部分，后续迭代将基于此残差构建新模型。  
  - **数学表达**：设第 $m$ 轮模型的预测值为 $F_m(x)$，则残差 $r_{mi} = y_i - F_m(x_i)$，其中 $y_i$ 为真实值，$x_i$ 为输入特征。`forward` 方法返回的 `FX` 即为 $F_m(x)$，用于计算残差 $r_{mi}$。

- **推理模式**：  
  在推理阶段，`forward` 方法直接调用 LightGBM 完整模型的预测结果，确保输出与实际应用一致。

### 2. **利用 `gb_step` 方法完成梯度提升迭代的数学原理**

`gb_step` 方法通过梯度提升框架实现模型迭代优化，其核心数学原理如下：

- **梯度与 Hessian 的作用**：  
  梯度提升的目标是最小化损失函数 $L(y, F(x))$，通过迭代添加新模型 $h_m(x)$ 逐步逼近最优函数 $F(x)$：  
  $$
  F_{m+1}(x) = F_m(x) + \eta h_m(x)
  $$  
  其中，$\eta$ 为学习率，$h_m(x)$ 需最小化目标函数的一阶导数（梯度）和二阶导数（Hessian）：

  $$
  h_m(x) = \arg\min_h \left[ \sum_{i=1}^n \left( \frac{\partial L(y_i, F_m(x_i))}{\partial F_m(x_i)} \right) h(x_i) + \frac{1}{2} \sum_{i=1}^n \left( \frac{\partial^2 L(y_i, F_m(x_i))}{\partial (F_m(x_i))^2} \right) h^2(x_i) \right]
  $$
  
  在代码中，`gb_calc` 方法计算当前预测值 `FX` 的梯度 `grad` 和 Hessian `hess`，传递给自定义目标函数 `LightGBObj`，用于指导 LightGBM 的分裂点选择。

- **LightGBM 的优化策略**：  
  LightGBM 通过直方图算法和 Leaf-wise 生长策略高效实现上述目标：  
  - **直方图算法**：将连续特征离散化为固定区间（桶），减少分裂点计算量，将时间复杂度从 $O(n \cdot d)$ 降至 $O(k \cdot d)$（$k$ 为桶数）。  
  - **Leaf-wise 策略**：每次选择增益最大的叶子节点进行分裂，相比 Level-wise 策略更高效，但需结合最大深度限制防止过拟合。

- **迭代过程**：  
  `gb_step` 方法每次仅训练一个基学习器（`num_boost_round=1`），通过 `bst.update` 更新模型。每轮迭代后，残差 $r_{mi}$ 被新模型 $h_m(x)$ 修正，逐步逼近真实值。

### 总结

- **`forward` 方法**：通过返回当前模型预测值（或残差），为梯度提升提供迭代所需的基础预测流。  
- **`gb_step` 方法**：基于梯度与 Hessian 优化目标函数，利用 LightGBM 的高效算法（直方图、Leaf-wise）完成模型迭代，逐步提升预测性能。

### **关键结论**
- **模型初始化**时通过 `lgb.train()` 创建全新模型，仅一轮迭代生成基础预测器。  
- **模型更新**时通过 `update()` 方法追加弱学习器，逐步优化预测性能。  
- **梯度计算**与 **Hessian 矩阵**通过 PyTorch 自动微分实现，指导 LightGBM 的特征分裂策略。  
- **推理过程**直接调用 LightGBM 的预测接口，返回与训练模式兼容的结果。




 在LightGBM中，训练全新模型时仅一轮迭代而追加模型时需调用`update()`方法的原因与梯度提升框架的迭代机制有关：

1. **全新模型初始化**  
   当训练一个全新模型时，LightGBM首先会初始化一个基础预测值（如常数或简单模型），然后通过一轮迭代生成第一个弱学习器（决策树），用于拟合初始预测值与真实标签的残差。这一轮迭代是模型构建的起点，目的是生成首个纠正项。

2. **追加弱学习器**  
   当已有模型需要继续优化时，`update()`方法通过新一轮迭代生成新的弱学习器，用于拟合当前模型预测值与真实标签的残差。这种迭代方式逐步累积多个弱学习器，形成集成模型。每次调用`update()`相当于GBDT框架中的一轮迭代，通过梯度优化方向调整模型。

这种设计符合梯度提升的核心思想：每轮迭代通过新弱学习器最小化损失函数，逐步逼近真实值。初始化时仅需一轮生成首个树，后续追加时需多次迭代以持续优化模型性能。

In [4]:
from unittest import mock, TestCase

import lightgbm as lgb
import numpy as np
import pandas as pd
import torch
#from gbnet import lgbmodule as lgm

# 测试基本损失函数计算与梯度更新流程
def test_basic_loss():
   # 创建LGBModule实例，参数包括5个叶子节点，3个树，1个输出，设置min_data_in_leaf为0
   gbm = LGBModule(5, 3, 1, params={"min_data_in_leaf": 0})
   # 定义均方误差损失函数
   floss = torch.nn.MSELoss()

   # 清空梯度
   gbm.zero_grad()
   # 设置随机种子保证结果可复现
   np.random.seed(11010)
   # 生成5行3列的随机数据作为输入数据集
   input_dataset = lgb.Dataset(np.random.random([5, 3]))
   # 前向传播计算预测值
   preds = gbm(input_dataset)
   # 计算预测值与目标值[1,2,3,4,5]的均方误差损失
   loss = floss(preds.flatten(), torch.Tensor(np.array([1, 2, 3, 4, 5])).flatten())

   # 执行反向传播，创建计算图以便后续计算高阶导数
   loss.backward(create_graph=True)

   # 使用mock模拟LightGBM的LightGBObj和lgb.train函数
   m_obj = mock.MagicMock(side_effect=LightGBObj)
   m_train = mock.MagicMock(side_effect=lgb.train)
   # 在上下文管理器中执行梯度提升步骤，此时会调用模拟的LightGBObj和lgb.train
   with (
       #mock.patch("gbnet.lgbmodule.LightGBObj", m_obj),
       mock.patch("__main__.LightGBObj", m_obj),
       mock.patch("lightgbm.train", m_train),
   ):
       gbm.gb_step()

   # 断言模拟的LightGBObj最后一次调用的梯度参数与理论值误差小于1e-8
   assert (
       np.max(
           np.abs(
               m_obj.call_args_list[-1].args[0].detach().numpy()
               - np.array([-2, -4, -6, -8, -10]).reshape([-1, 1])
           )
       )
       < 1e-8
   )

   # 断言模拟的LightGBObj最后一次调用的hessian参数与理论值误差小于1e-8
   assert (
       np.max(
           np.abs(
               m_obj.call_args_list[-1].args[1].detach().numpy()
               - np.array([2, 2, 2, 2, 2]).reshape([-1, 1])
           )
       )
       < 1e-8
   )

   # 断言lgb.train被调用了一次
   m_train.assert_called_once()

# 测试LightGBObj的梯度计算功能
def test_LightGBObj():
   # 生成随机梯度矩阵（20行10列）和hessian矩阵（20行10列）
   grad = torch.tensor(np.random.random([20, 10]))
   hess = torch.tensor(np.random.random([20, 10]))

   # 创建LightGBObj实例
   obj = LightGBObj(grad, hess)

   # 调用obj(1,2)计算梯度与hessian
   ograd, ohess = obj(1, 2)
   # 断言计算出的梯度与原始梯度完全一致
   assert (
       np.max(np.abs(ograd - grad.detach().numpy())) == 0
   ), "LightGBObj grad does not match instantiation"
   # 断言计算出的hessian与原始hessian完全一致
   assert (
       np.max(np.abs(ohess - hess.detach().numpy())) == 0
   ), "LightGBObj hess does not match instantiation"

# 测试LGBModule的输入检查功能
class TestLGBModule(TestCase):
   # 测试输入为lgb.Dataset且训练模式为True时的行为
   def test_input_is_dataset_training_true_train_dat_none(self):
       # 创建LGBModule实例
       module = LGBModule(100, 10, 1)
       # 生成随机数据
       data = np.random.rand(100, 10)
       # 转换为lgb.Dataset
       dataset = lgb.Dataset(data)
       # 调用输入检查方法
       result = module._input_checking_setting(dataset)
       # 断言返回结果与train_dat相同且类型正确
       self.assertIs(result, module.train_dat)
       self.assertIsInstance(result, lgb.Dataset)

   # 测试输入为np.ndarray且训练模式为True时的行为
   def test_input_is_ndarray_training_true_train_dat_none(self):
       # 创建LGBModule实例
       module = LGBModule(100, 10, 1)
       # 生成随机数据
       data = np.random.rand(100, 10)
       # 调用输入检查方法
       result = module._input_checking_setting(data)
       # 断言返回结果与train_dat相同且类型正确
       self.assertIs(result, module.train_dat)
       self.assertIsInstance(result, lgb.Dataset)

   # 测试输入为pd.DataFrame且训练模式为True时的行为
   def test_input_is_dataframe_training_true_train_dat_none(self):
       # 创建LGBModule实例
       module = LGBModule(100, 10, 1)
       # 生成随机数据
       data = pd.DataFrame(np.random.rand(100, 10))
       # 调用输入检查方法
       result = module._input_checking_setting(data)
       # 断言返回结果与train_dat相同且类型正确
       self.assertIs(result, module.train_dat)
       self.assertIsInstance(result, lgb.Dataset)

   # 测试输入为lgb.Dataset且训练模式为True时，当train_dat已设置且行数相同的行为
   def test_input_is_dataset_training_true_train_dat_set_same_nrows(self):
       # 创建LGBModule实例
       module = LGBModule(100, 10, 1)
       # 生成随机数据
       data = np.random.rand(100, 10)
       # 转换为lgb.Dataset
       dataset = lgb.Dataset(data)
       # 调用输入检查方法
       result = module._input_checking_setting(dataset)
       # 断言返回结果与train_dat相同
       self.assertIs(result, module.train_dat)
       self.assertIs(result, dataset)

   # 测试输入为lgb.Dataset且训练模式为False时的行为
   def test_input_is_dataset_training_false(self):
       # 创建LGBModule实例
       module = LGBModule(100, 10, 1)
       # 切换到评估模式
       module.eval()
       # 生成随机数据
       data = np.random.rand(100, 10)
       # 转换为lgb.Dataset
       dataset = lgb.Dataset(data)
       # 调用输入检查方法
       result = module._input_checking_setting(dataset)
       # 断言返回原始数据
       self.assertTrue(np.array_equal(result, data))
       # 断言数据集不释放原始数据
       self.assertFalse(dataset.free_raw_data)

   # 测试输入为np.ndarray且训练模式为False时的行为
   def test_input_is_ndarray_training_false(self):
       # 创建LGBModule实例
       module = LGBModule(100, 10, 1)
       # 切换到评估模式
       module.eval()
       # 生成随机数据
       data = np.random.rand(100, 10)
       # 调用输入检查方法
       result = module._input_checking_setting(data)
       # 断言返回原始数据
       self.assertIs(result, data)

   # 测试输入为无效类型时的行为
   def test_input_invalid_type(self):
       # 创建LGBModule实例
       module = LGBModule(10, 5, 1)
       # 生成无效输入数据
       data = [1, 2, 3]
       # 调用输入检查方法，预期抛出AssertionError
       with self.assertRaises(AssertionError):
           module._input_checking_setting(data)

   # 测试当self.bst为None时的行为
   def test_if_bst_is_none(self):
       # 创建LGBModule实例
       module = LGBModule(10, 5, 1)
       # 设置train_dat
       arr = np.random.rand(10, 5)
       module._set_train_dat(lgb.Dataset(arr))
       # 调用输入检查方法，预期返回train_dat
       result = module._input_checking_setting(np.random.rand(10, 5))
       self.assertIs(result, module.train_dat)

   # 测试输入为lgb.Dataset且self.bst不为None时的行为
   def test_raises_input_changed_lgb_dataset(self):
       # 创建LGBModule实例
       module = LGBModule(10, 5, 1)
       # 设置train_dat
       arr = np.random.rand(10, 5)
       initial_dataset = lgb.Dataset(arr)
       initial_dataset.construct()
       module._set_train_dat(initial_dataset)
       # 设置bst为非None对象
       module.bst = object()
       # 调用输入检查方法，预期抛出AssertionError
       with self.assertRaises(AssertionError):
           module._input_checking_setting(lgb.Dataset(np.random.random([10, 5])))

   # 测试输入为ndarray且self.bst不为None时的行为
   def test_raises_input_changed_ndarray(self):
       # 创建LGBModule实例
       module = LGBModule(10, 5, 1)
       # 设置train_dat
       arr = np.random.rand(10, 5)
       initial_dataset = lgb.Dataset(arr)
       initial_dataset.construct()
       module._set_train_dat(initial_dataset)
       # 设置bst为非None对象
       module.bst = object()
       # 调用输入检查方法，预期抛出AssertionError
       with self.assertRaises(AssertionError):
           module._input_checking_setting(np.random.random([11, 5]))

In [5]:
test_basic_loss()
test_LightGBObj()

  return Variable._execution_engine.run_backward(  # Calls into the C++ engine to run the backward pass


在 `test_basic_loss` 测试用例中，`mock` 模块的核心作用是通过**模拟 LightGBM 的关键函数**（`LightGBObj` 和 `lgb.train`），实现对 `LGBModule` 类训练过程的**纯单元测试**。以下是具体实现的详细解析：

---

### **1. 为什么要使用 mock？**
- **隔离依赖**：  
  `LGBModule` 的训练依赖于外部库 `lightgbm`，直接测试会导致以下问题：
  1. 测试结果可能受 LightGBM 版本或配置的影响；
  2. 实际训练过程耗时且不可控（如树的生长、正则化项计算等）；
  3. 难以直接验证内部梯度传递的逻辑是否正确。

- **精确控制测试流程**：  
  通过模拟 `LightGBObj` 和 `lgb.train`，可以：
  1. **截断**真实的梯度提升过程，仅关注 `LGBModule` 的输入输出是否符合预期；
  2. **伪造**梯度/二阶导数值，验证模型参数更新逻辑；
  3. **统计**函数调用次数和参数，确保代码符合设计意图。

---

### **2. mock 的具体用法**
#### **(1) 模拟 `LightGBObj` 和 `lgb.train`**
```python
m_obj = mock.MagicMock(side_effect=lgm.LightGBObj)
m_train = mock.MagicMock(side_effect=lgb.train)
```
- **`MagicMock`**：  
  创建一个通用的模拟对象，自动模拟任意未定义的方法和属性。
- **`side_effect`**：  
  指定当模拟函数被调用时，返回预先定义的对象（如 `lgm.LightGBObj`）或函数（如 `lgb.train`）。

#### **(2) 上下文管理器中的 `patch`**
```python
with (
    mock.patch("gbnet.lgbmodule.LightGBObj", m_obj),
    mock.patch("lightgbm.train", m_train),
):
    gbm.gb_step()
```
- **`patch` 装饰器**：  
  临时替换目标函数/类（如 `LightGBObj` 和 `lgb.train`）为模拟对象。
- **作用域控制**：  
  仅在 `with` 块内生效，测试结束后自动恢复原函数。

#### **(3) 验证模拟调用**
```python
# 断言 LightGBObj 最后一次调用的梯度参数是否正确
assert np.max(abs(m_obj.call_args_list[-1].args[0] - expected_grad)) < 1e-8

# 断言 lgb.train 是否被调用了一次
m_train.assert_called_once()
```
- **`call_args_list`**：  
  记录模拟对象的所有调用参数列表。
- **`assert_called_once()`**：  
  验证 `lgb.train` 是否被调用且仅调用一次。

---

### **3. 在上下文管理器中执行梯度提升步骤**
#### **(1) 执行 `gb_step()` 的逻辑**
```python
gbm.gb_step()
```
- **`gb_step()`** 是 `LGBModule` 的核心训练方法，包含以下步骤：
  1. 调用 `gb_calc()` 计算梯度 (`self.grad`) 和 Hessian (`self.hess`)；
  2. 将梯度转换为 LightGBM 兼容格式 (`LightGBObj`)；
  3. 根据当前模型状态（是否存在 `self.bst`）选择更新现有模型或创建新模型。

#### **(2) 上下文管理器的作用**
- **隔离模拟环境**：  
  在 `gb_step()` 执行期间，所有对 `LightGBObj` 和 `lgb.train` 的调用都被替换为模拟对象。
- **捕获调用细节**：  
  模拟对象记录了以下关键信息：
  - `LightGBObj` 接收到的梯度 (`args[0]`) 和 Hessian (`args[1]`)；
  - `lgb.train` 是否被调用及其参数。

---

### **4. 测试的核心验证点**
#### **(1) 梯度计算的正确性**
```python
expected_grad = np.array([-2, -4, -6, -8, -10]).reshape([-1, 1])
```
- **理论值**：  
  通过反向传播计算的梯度应为 `[y_true - preds] * learning_rate`（此处简化为固定值）。
- **断言逻辑**：  
  验证 `LightGBObj` 接收到的梯度与理论值的误差小于 `1e-8`，确保 `LGBModule` 的梯度计算无误。

#### **(2) 模型更新的逻辑**
```python
m_train.assert_called_once()
```
- **预期行为**：  
  在首次训练时（`self.bst` 为空），应调用 `lgb.train()` 创建新模型；后续迭代调用 `update()` 追加弱学习器。
- **断言逻辑**：  
  确保 `lgb.train()` 仅被调用一次，符合单轮迭代的设定。

---

### **5. 为什么需要 `create_graph=True`？**
```python
loss.backward(create_graph=True)
```
- **目的**：  
  启用 PyTorch 的高阶导数计算能力，以获取 Hessian 矩阵（二阶导数）。
- **必要性**：  
  LightGBM 的目标函数优化依赖梯度和 Hessian（如正则化项 `λΩ(h)`），需通过二阶泰勒展开近似损失函数。

---

### **总结**
通过 `mock` 模拟 `LightGBObj` 和 `lgb.train`，测试实现了以下目标：
1. **隔离外部依赖**：避免直接调用 LightGBM 库，使测试独立于库版本和配置。
2. **精确验证逻辑**：通过伪造梯度/二阶导数值，检查 `LGBModule` 的参数传递是否正确。
3. **控制训练流程**：确保 `gb_step()` 方法的行为符合设计（如首次创建模型或追加弱学习器）。
4. **加速测试执行**：跳过真实的树生长和迭代过程，大幅缩短测试时间。

这种基于 `mock` 的单元测试模式是深度学习框架集成测试中的常见实践，尤其在混合精度训练和复杂梯度计算场景中至关重要。

In [6]:
# 导入必要的库
from typing import Union
import warnings
import numpy as np
import pandas as pd
import torch
import xgboost as xgb
from torch import nn

#from gbnet.base import BaseGBModule


class XGBModule(BaseGBModule):
    """XGBoost模块封装类，将XGBoost梯度提升集成到PyTorch框架中

    实现XGBoost模型的训练和推理功能，维护模型状态并与PyTorch计算图深度集成
    """

    def __init__(self, batch_size, input_dim, output_dim, params={}, min_hess=0):
        super().__init__()
        self.batch_size = batch_size  # 批次大小
        self.input_dim = input_dim  # 输入特征维度
        self.output_dim = output_dim  # 输出预测维度

        self.params = params  # XGBoost参数字典
        self.params["objective"] = "reg:squarederror"  # 设置损失函数为均方误差
        self.params["base_score"] = 0  # 初始化基学习器预测值为0
        self.n_completed_boost_rounds = 0  # 完成的提升轮数
        self.min_hess = min_hess  # Hessian最小阈值

        # 初始化XGBoost模型，使用零矩阵作为初始数据
        init_matrix = np.zeros([batch_size, input_dim])
        self.bst = xgb.train(
            self.params,
            xgb.DMatrix(init_matrix, label=np.zeros(batch_size * output_dim)),
            num_boost_round=0,
        )
        self.n_completed_boost_rounds = 0
        self.dtrain = None  # 训练数据集缓存
        self.training_n = None  # 训练样本数量

        # 初始化预测张量（可训练参数）
        self.FX = nn.Parameter(
            torch.tensor(
                np.zeros([batch_size, output_dim]),
                dtype=torch.float,
            )
        )


    def _check_training_data(self):
        """检查训练数据权重设置"""
        if self.dtrain.get_weight().shape[0] > 0:
            warnings.warn(
                "当输入数据包含权重时，建议通过损失函数而非DMatrix直接设置权重"
            )


    def _input_checking_setting(
        self, input_data: Union[xgb.DMatrix, pd.DataFrame, np.ndarray]
    ):
        """统一输入格式并进行预处理

        确保训练模式下输入数据一致性，推理模式直接返回原始数据
        """
        assert isinstance(input_data, (xgb.DMatrix, pd.DataFrame, np.ndarray))

        if self.training:
            if self.dtrain is None:
                # 创建训练数据集
                if isinstance(input_data, xgb.DMatrix):
                    input_data.set_label(np.zeros(self.batch_size * self.output_dim))
                    self.dtrain = input_data
                    self.training_n = input_data.num_row()
                    self._check_training_data()
                else:
                    self.dtrain = xgb.DMatrix(
                        input_data, label=np.zeros(self.batch_size * self.output_dim)
                    )
                    self.training_n = input_data.shape[0]
            # 检查输入数据一致性
            compare_n = (
                input_data.num_row() if isinstance(input_data, xgb.DMatrix) else input_data.shape[0]
            )
            assert (
                compare_n == self.training_n
            ), "训练期间禁止更换数据集"
            return self.dtrain
        else:
            # 推理模式返回原始数据格式
            return (
                input_data
                if isinstance(input_data, xgb.DMatrix)
                else xgb.DMatrix(input_data)
            )


    def forward(
        self,
        input_data: Union[xgb.DMatrix, np.ndarray, pd.DataFrame],
        return_tensor: bool = True,
    ):
        """前向传播计算预测值

        支持训练和推理两种模式，自动维护预测状态
        """
        input_data = self._input_checking_setting(input_data)
        preds = self.bst.predict(input_data)

        if self.training:
            FX_detach = self.FX.detach()
            FX_detach.copy_(
                torch.tensor(
                    preds.reshape([self.batch_size, self.output_dim]),  # 调整形状为[batch,output]
                    dtype=torch.float
                )
            )

        if return_tensor:
            if self.training:
                return self.FX
            else:
                return torch.tensor(
                    preds.reshape([-1, self.output_dim]),  # 自动适应批量大小
                    dtype=torch.float
                )
        return preds


    def gb_calc(self):
        """计算梯度并存储"""
        self.grad, self.hess = self._get_grad_hess_FX()


    def gb_step(self):
        """执行梯度提升迭代步骤

        1. 计算当前梯度和Hessian
        2. 更新XGBoost模型
        """
        if self.grad is None and self.hess is None:
            self.gb_calc()

        self._gb_step_grad_hess(self.grad, self.hess)
        self.grad = None
        self.hess = None


    def _gb_step_grad_hess(self, grad, hess):
        obj = XGBObj(grad, hess)
        g, h = obj(np.zeros([self.batch_size, self.output_dim]), None)

        # 根据XGBoost版本调用对应训练方法
        if xgb.__version__ <= "2.0.3":
            self.bst_boost(
                self.dtrain,
                self.n_completed_boost_rounds + 1,
                g,
                h,
            )
        else:
            #self.bst_boost(
            self.bst.boost(
                self.dtrain,
                self.n_completed_boost_rounds + 1,
                g,
                h,
            )
        self.n_completed_boost_rounds += 1


class XGBObj:
    """XGBoost专用目标函数封装类

    将PyTorch计算的梯度转换为XGBoost可接受的格式
    """
    def __init__(self, grad, hess):
        self.grad = grad
        self.hess = hess

    def __call__(self, preds, dtrain):
        if len(preds.shape) == 2:
            M = preds.shape[0]
            N = preds.shape[1]
        else:
            M = preds.shape[0]
            N = 1

        # 根据XGBoost版本调整输出格式
        if xgb.__version__ >= "2.1.0":
            g = self.grad.detach().numpy().reshape([M, N])
            h = self.hess.detach().numpy().reshape([M, N])
        else:
            g = self.grad.detach().numpy().reshape([M * N, 1])
            h = self.hess.detach().numpy().reshape([M * N, 1])

        return g, h

In [7]:
import xgboost as xgb
print(xgb.__version__)

2.1.4


In [8]:
# 导入必要的库
from unittest import mock, TestCase  # 单元测试框架[1](@ref)
import numpy as np  # 数组操作库
import torch  # 深度学习框架
import xgboost as xgb  # XGBoost库

# 测试基本损失函数计算
def test_basic_loss():
   # 初始化XGBoost模块（5个特征，3个树，1个输出）
   gbm = XGBModule(5, 3, 1)
   # 定义均方误差损失函数
   floss = torch.nn.MSELoss()

   # 清空梯度缓存
   gbm.zero_grad()
   # 设置随机种子保证结果可复现
   np.random.seed(11010)
   # 生成随机输入数据并转换为DMatrix格式
   input_dmatrix = xgb.DMatrix(np.random.random([5, 3]))
   # 前向传播计算预测值
   preds = gbm(input_dmatrix)
   # 计算预测值与真实值（1-5）的MSE损失
   loss = floss(preds.flatten(), torch.Tensor(np.array([1, 2, 3, 4, 5])).flatten())

   # 反向传播计算梯度（create_graph=True保留二阶导数信息）
   loss.backward(create_graph=True)

   # 使用mock模拟XGBObj类和boost方法
   with (
       mock.patch("__main__.XGBObj", side_effect=XGBObj) as m_obj,
       mock.patch.object(gbm.bst, "boost", side_effect=gbm.bst.boost) as m_boost,
   ):
       # 执行一次梯度提升步骤
       gbm.gb_step()

   # 验证XGBObj的梯度计算结果是否符合预期（-2, -4, -6, -8, -10）
   assert np.all(
       np.isclose(
           m_obj.call_args_list[-1].args[0].detach().numpy(),
           np.array([-2, -4, -6, -8, -10]).reshape([-1, 1]),
       )
   )
   # 验证hessian矩阵是否为常数2
   assert np.all(
       np.isclose(
           m_obj.call_args_list[-1].args[1].detach().numpy(),
           np.array([2, 2, 2, 2, 2]).reshape([-1, 1]),
       )
   )

   # 验证boost方法被正确调用一次
   m_boost.assert_called_once()

# 测试XGBObj在不同XGBoost版本下的行为
def test_XGBObj():
   # 生成随机梯度矩阵和hessian矩阵
   grad = torch.tensor(np.random.random([20, 10]))
   hess = torch.tensor(np.random.random([20, 10]))

   # 初始化XGBObj对象
   obj = XGBObj(grad, hess)

   # 测试XGBoost 2.0.0版本行为
   with mock.patch("xgboost.__version__", new="2.0.0"):
       ograd, ohess = obj(grad, hess)
       # 验证梯度矩阵形状调整为列向量
       assert np.all(np.isclose(ograd, grad.detach().numpy().reshape([-1, 1])))
       # 验证hessian矩阵形状调整为列向量
       assert np.all(np.isclose(ohess, hess.detach().numpy().reshape([-1, 1])))

   # 测试XGBoost 2.1.0版本行为
   with mock.patch("xgboost.__version__", new="2.1.0"):
       ograd, ohess = obj(grad, hess)
       # 验证梯度矩阵保持原始形状
       assert np.all(np.isclose(ograd, grad.detach().numpy()))
       # 验证hessian矩阵保持原始形状
       assert np.all(np.isclose(ohess, hess.detach().numpy()))

# 测试输入数据类型检查逻辑
class TestInputChecking(TestCase):
   # 测试DMatrix输入且训练模式开启的情况
   def test_input_is_dmatrix_training_true_dtrain_none(self):
       module = XGBModule(10, 5, 3)
       data = np.random.rand(10, 5)
       dmatrix = xgb.DMatrix(data)
       result = module._input_checking_setting(dmatrix)
       # 验证dtrain属性被正确设置为输入DMatrix
       self.assertIs(result, module.dtrain)
       # 验证返回结果与输入DMatrix是同一对象
       self.assertIs(result, dmatrix)
       # 验证标签矩阵全零且形状正确
       np.testing.assert_array_equal(
           result.get_label(), np.zeros(module.batch_size * module.output_dim)
       )
       # 验证训练样本数量正确
       self.assertEqual(module.training_n, dmatrix.num_row())

   # 测试ndarray输入且训练模式开启的情况
   def test_input_is_ndarray_training_true_dtrain_none(self):
       module = XGBModule(10, 5, 3)
       data = np.random.rand(10, 5)
       result = module._input_checking_setting(data)
       # 验证dtrain属性被正确转换为DMatrix
       self.assertIs(result, module.dtrain)
       # 验证返回结果为DMatrix类型
       self.assertIsInstance(result, xgb.DMatrix)
       # 验证标签矩阵全零且形状正确
       np.testing.assert_array_equal(
           result.get_label(), np.zeros(module.batch_size * module.output_dim)
       )
       # 验证训练样本数量正确
       self.assertEqual(module.training_n, data.shape[0])

   # 测试DMatrix输入且训练模式开启，dtrain已设置且行数相同的情况
   def test_input_is_dmatrix_training_true_dtrain_set_same_nrows(self):
       module = XGBModule(10, 5, 3)
       data = np.random.rand(10, 5)
       dmatrix = xgb.DMatrix(data)
       result = module._input_checking_setting(dmatrix)
       # 验证返回结果与已设置的dtrain是同一对象
       self.assertIs(result, module.dtrain)
       # 验证返回结果与输入DMatrix是同一对象
       self.assertIs(result, dmatrix)

   # 测试DMatrix输入且训练模式开启，dtrain已设置但行数不同的情况
   def test_input_is_dmatrix_training_true_dtrain_set_different_nrows(self):
       module = XGBModule(10, 5, 3)
       data1 = np.random.rand(10, 5)
       module(data1)
       data2 = np.random.rand(5, 5)
       # 验证输入数据行数不同时抛出异常
       with self.assertRaises(AssertionError) as context:
           module(data2)
       # 验证异常信息包含特定错误提示
       self.assertIn(
           "训练过程中不支持更改数据集",
           str(context.exception),
       )

   # 测试DMatrix输入且训练模式关闭的情况
   def test_input_is_dmatrix_training_false(self):
       module = XGBModule(10, 5, 3)
       module(np.random.rand(10, 5))
       module.eval()
       data = np.random.rand(10, 5)
       dmatrix = xgb.DMatrix(data)
       result = module._input_checking_setting(dmatrix)
       # 验证返回结果为输入的DMatrix
       self.assertIs(result, dmatrix)

   # 测试ndarray输入且训练模式关闭的情况
   def test_input_is_ndarray_training_false(self):
       module = XGBModule(10, 5, 3)
       module(np.random.rand(10, 5))
       module.eval()
       data = np.random.rand(10, 5)
       result = module._input_checking_setting(data)
       # 验证返回结果为DMatrix类型
       self.assertIsInstance(result, xgb.DMatrix)
       # 验证DMatrix行数与输入ndarray一致
       self.assertEqual(result.num_row(), data.shape[0])

   # 测试无效输入类型的情况
   def test_input_invalid_type(self):
       module = XGBModule(10, 5, 3)
       data = [1, 2, 3]  # 非DMatrix或ndarray类型
       # 验证输入无效类型时抛出异常
       with self.assertRaises(AssertionError):
           module._input_checking_setting(data)

In [9]:
test_XGBObj()

In [10]:
test_basic_loss()