In [1]:
import numpy as np
from collections import Counter
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, mean_squared_error
from typing import Tuple, Union, Optional, List, Any, Callable

In [2]:
def calculate_gini(y: np.ndarray) -> float:
    """计算基尼指数

    公式：Gini(p) = 1 - sum(p_k^2)

    Args:
        y (np.ndarray): 标签列表 (shape: (n_samples,))
    Returns:
        float: 基尼指数
    """
    y = y.tolist()
    # 1. 获取各类别的概率分布
    probs = [y.count(i)/len(y) for i in set(y)]
    # 2. 计算 1 - 概率平方和
    return 1 - np.sum(np.array(probs) ** 2)

In [3]:
def feature_split(X: np.ndarray, feature_i: int, threshold: float) -> Tuple[np.ndarray, np.ndarray]:
    """根据特征和阈值，将数据集拆分为两个子集

    CART 是二叉树，所以只返回两个子集：
    - left: 特征值 >= threshold (或 <，具体看你约定)
    - right: 特征值 < threshold (或 >=)

    Args:
        X (np.ndarray): 数据集(二维数组)
        feature_i (int): 特征索引
        threshold (float): 分裂阈值

    Returns:
        Tuple[np.ndarray, np.ndarray]: (X_left, X_right)
    """
    x_left = X[X[:, feature_i] <= threshold]
    x_right = X[X[:, feature_i] > threshold]
    return x_left,x_right

In [4]:
class TreeNode:
    def __init__(self,
                 depth: Optional[int] = 0,
                 feature_i: Optional[int] = None,
                 threshold: Optional[float] = None,
                 leaf_value: Optional[Any] = None,
                 left_branch: Optional['TreeNode'] = None,
                 right_branch: Optional['TreeNode'] = None):
        """
        Args:
            feature_i: 切分特征的索引 (仅内部结点需要)
            threshold: 切分阈值 (仅内部结点需要)
            leaf_value: 叶子结点的预测值 (仅叶子结点需要)
            left_branch: 左子树，递归 TreeNode
            right_branch: 右子树，递归 TreeNode
        """
        self.depth = depth
        self.feature_i = feature_i
        self.threshold = threshold
        self.leaf_value = leaf_value
        self.left_branch = left_branch
        self.right_branch = right_branch

In [5]:
class BinaryDecisionTree:
    def __init__(self,
                 min_samples_split: int = 2,
                 min_gini_impurity: float = float("inf"),
                 max_depth: float = float("inf"),
                 loss: Any = None):

        self.root: Optional[TreeNode] = None  # 根结点
        self.min_samples_split = min_samples_split
        self.min_gini_impurity = min_gini_impurity
        self.max_depth = max_depth

        # 下面这两个函数将在子类(ClassificationTree)中定义
        self._impurity_calculation = None # 也就是 calculate_gini
        self._leaf_value_calculation = None # 也就是 majority_vote

    def fit(self, X: np.ndarray, y: np.ndarray) -> None:
        """训练入口"""
        # 拼接 X 和 y，方便统一处理
        self.root = self._build_tree(X, y)


    def _build_tree(self, X: np.ndarray, y: np.ndarray, current_depth: int = 0) -> TreeNode:
        """递归构建决策树 (核心函数)

        Args:
            X (np.ndarray): 训练数据
            y (np.ndarray): 训练数据的标签
            current_depth (int, optional): _description_. Defaults to 0.

        Returns:
            TreeNode: _description_
        """
        Xy = np.hstack([X, y])
        n_samples, n_features = X.shape

        # --- 步骤 1: 检查停止条件 (预剪枝) ---
        # 如果样本数 < min_samples_split，或者深度 >= max_depth
        # -> 直接返回叶子结点 (计算当前 y 的 leaf_value)
        if n_samples < self.min_samples_split or \
           current_depth >= self.max_depth or \
           len(np.unique(y)) == 1:
            return TreeNode(
                depth=current_depth,
                leaf_value=self._leaf_value_calculation(y)
            )

        # --- 步骤 2: 寻找最佳切分点 ---
        best_impurity = float("inf")
        best_criteria = {} # 存 {'feature_i': ?, 'threshold': ?}
        best_sets = {}     # 存 (left_dataset, right_dataset)

        # 遍历所有特征 feature_i:
        for i in range(n_features):
            # 获取该特征所有的唯一值作为候选阈值 thresholds
            thresholds = np.unique(X[:, i])

            # 遍历所有 threshold:
            for threshold in thresholds:
                # 1. 用 feature_split 切分数据得到 Xy1, Xy2
                Xy1, Xy2 = feature_split(Xy, i, threshold)

                # 2. 如果切分后的子集不为空:
                if len(Xy1) == 0 or len(Xy2) == 0:
                    continue
                # 计算加权基尼指数 (impurity)
                impurity = self._impurity_calculation(y, Xy1[:, -1], Xy2[:, -1])
                # 如果 impurity < best_impurity:
                if impurity < best_impurity:
                    # 更新 best_impurity, best_criteria, best_sets
                    best_impurity = impurity
                    best_criteria = {'feature_i': i, 'threshold': threshold}
                    best_sets = {
                        'left_X': Xy1[:, :-1],
                        'left_y': Xy1[:, -1],
                        'right_X': Xy2[:, :-1],
                        'right_y': Xy2[:, -1],
                    }


        # --- 步骤 3: 递归构建 ---
        # 如果计算出的 best_impurity 依然很大 (大于 min_gini_impurity) -> 不分了，返回叶子结点
        if best_impurity > self.min_gini_impurity:
            return TreeNode(depth=current_depth,leaf_value=self._leaf_value_calculation(y))
        # 否则，递归构建左右子树
        else:
            self.min_gini_impurity = best_impurity
            left_branch = self._build_tree(best_sets['left_X'], best_sets['left_y'].reshape(-1, 1), current_depth + 1)
            right_branch = self._build_tree(best_sets['right_X'], best_sets['right_y'].reshape(-1, 1), current_depth + 1)
            # 返回中间结点 (TreeNode)
            return TreeNode(feature_i=best_criteria["feature_i"],
                            threshold=best_criteria["threshold"],
                            left_branch=left_branch,
                            right_branch=right_branch,
                            depth=current_depth)


    def predict_value(self, x: np.ndarray, tree: Optional[TreeNode] = None) -> Any:
        """递归检索单条样本的预测值

        Args:
            x (np.ndarray): _description_
            tree (Optional[TreeNode], optional): _description_. Defaults to None.

        Returns:
            Any: _description_
        """

        tree = self.root if tree is None else tree
        # print(tree.depth)
        # 如果是叶子结点 (leaf_value is not None) -> 返回 leaf_value
        if tree.leaf_value is not None:
            return tree.leaf_value
        # 否则 -> 根据 x[feature_i] 和 threshold 决定去左子树还是右子树
        else:
            tree = tree.left_branch if x[tree.feature_i] <= tree.threshold else tree.right_branch

            return self.predict_value(x, tree)


    def predict(self, X: np.ndarray) -> List[Any]:
        """预测多条样本"""
        return [self.predict_value(x) for x in X]

In [6]:
class ClassificationTree(BinaryDecisionTree):
    def _calculate_gini_impurity(self, y: np.ndarray, y1: np.ndarray, y2: np.ndarray) -> float:
        """
        计算切分后的加权基尼指数
        Args:
            y: 切分前的标签
            y1: 切分后左子树的标签
            y2: 切分后右子树的标签
        """
        left_weight = len(y1)/len(y)
        right_weight = len(y2)/len(y)
        left_gini = calculate_gini(y1)
        right_gini = calculate_gini(y2)
        return left_weight * left_gini + right_weight * right_gini

    def _majority_vote(self, y: np.ndarray) -> Any:
        """
        多数投票（定义叶子结点的值）
        返回 y 中出现次数最多的类别
        """
        c = Counter(y.flatten())
        return c.most_common(1)[0][0]


    def fit(self, X: np.ndarray, y: np.ndarray) -> None:
        self._impurity_calculation = self._calculate_gini_impurity
        self._leaf_value_calculation = self._majority_vote
        super(ClassificationTree, self).fit(X, y)

In [7]:
class RegressionTree(BinaryDecisionTree):

    def _calculate_variance_reduction(self, y: np.ndarray, y1: np.ndarray, y2: np.ndarray) -> float:
        """计算方差减少量
        公式：Var_reduction = Var(y) - ( frac_1 * Var(y1) + frac_2 * Var(y2) )

        Args:
            y (np.ndarray): 切分前的标签
            y1 (np.ndarray): 切分后左子树的标签
            y2 (np.ndarray): 切分后右子树的标签

        Returns:
            float: 方差减少量
        """
        frac_1 = len(y1)/len(y)
        frac_2 = len(y2)/len(y)
        var_reduction = ( frac_1 * np.var(y1) + frac_2 * np.var(y2) )
        return var_reduction

    def _mean_of_y(self, y: np.ndarray) -> float:
        """
        计算叶子结点的预测值
        回归树直接返回当前节点标签 y 的平均值
        """
        return np.mean(y)

    def fit(self, X: np.ndarray, y: np.ndarray) -> None:
        """
        训练模型
        继承函数，并将上述两个函数注册到基类中
        """
        self._impurity_calculation = self._calculate_variance_reduction
        self._leaf_value_calculation = self._mean_of_y
        super(RegressionTree, self).fit(X, y)

In [8]:
from sklearn import datasets
data = datasets.load_iris()
X, y = data.data, data.target
# 注意！是否要对y进行reshape取决于numpy版本
y = y.reshape(-1,1)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)
clf = ClassificationTree()
clf.fit(X_train, y_train)
y_pred = clf.predict(X_test)
print(accuracy_score(y_pred, y_test))

0.8666666666666667


In [9]:
import pandas as pd

# 波士顿房价数据集的原始 URL
data_url = "http://lib.stat.cmu.edu/datasets/boston"

# 从 URL 加载数据
raw_df = pd.read_csv(data_url, sep="\s+", skiprows=22, header=None)

# 处理数据
data = np.hstack([raw_df.values[::2, :], raw_df.values[1::2, :2]])  # 拼接特征数据
target = raw_df.values[1::2, 2]  # 目标变量

# 将数据和目标变量转换为 NumPy 数组
X = np.array(data)
y = np.array(target)

y = y.reshape(-1,1)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)
model = RegressionTree()
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)

print("Mean Squared Error:", mse)

  raw_df = pd.read_csv(data_url, sep="\s+", skiprows=22, header=None)


Mean Squared Error: 28.974363518590916
