#### 第五章 决策树

##### 分类树

In [55]:
import numpy as np
from typing import Callable

class Tree:
    """
    决策树
    :param nodes 节点
    :param axis 维度
    :param value 叶节点取值 
    """
    def __init__(self, nodes:dict, axis: int, value:float|None):
        self.nodes = nodes
        self.axis = axis
        self.value = value

def entropy(arr:np.ndarray) -> float:
    """
    计算信息熵
    :param Y: 标签，一维列表
    :return entropy: 信息熵
    """
    # 统计唯一值及其出现次数
    unique_vals, counts = np.unique(arr, return_counts=True)

    # 计算概率
    total = len(arr)
    probability = counts / total

    # 计算信息熵
    entropy = -np.sum(probability * np.log2(probability))
    return entropy

def gini(arr:np.ndarray) -> float:
    """
    计算基尼指数
    :param Y: 标签，一维列表
    :return gini: 基尼指数
    """
    # 统计唯一值及其出现次数
    unique_vals, counts = np.unique(arr, return_counts=True)

    # 计算概率
    total = len(arr)
    probability = counts / total

    # 计算信息熵
    gini = 1 - np.sum(np.power(probability, 2))
    return gini
        
class DecisionTree:
    """
    决策树算法
    :param X: 特征矩阵 (n_samples, n_features)
    :param Y: 标签数组 (n_samples,)
    :param feat_name: 特征名称
    """

    def __init__(self, X: np.ndarray, Y: np.ndarray, feat_name: list[str]):
        self.X = X
        self.Y = Y
        self.feat_name = feat_name

    def calculate_info_gain(self, feat:np.ndarray, Y:np.ndarray, metric: Callable[[np.ndarray], float]) -> float:
        """
        计算特征 feat 对标签 Y 的信息增益
        信息增益公式：IG = 熵(整体Y) - 条件熵(feat划分后的Y)
        
        :param feat: 特征数组，一维 np.ndarray（每个样本的特征值）
        :param Y: 标签数组，一维 np.ndarray（每个样本的标签）
        :param metric: 计算熵/基尼指数的函数，输入一维数组，返回浮点数（
        :return float: 信息增益值
        """
        # 计算原始值
        base_metric = metric(Y)

        # 统计唯一值及其出现次数
        unique_feat_vals, feat_counts = np.unique(feat, return_counts=True)
        conditional_metric = 0.0
        
        for val, count in zip(unique_feat_vals, feat_counts):
            # 取出该特征值对应的所有标签
            y_subset = Y[feat == val]
            # 累加
            conditional_metric += (count / len(feat)) * metric(y_subset)
        
        # 4. 计算信息增益
        info_gain = base_metric - conditional_metric
        return info_gain

    def _choose_best_feature(self, X: np.ndarray, Y: np.ndarray, metric: Callable[[np.ndarray], float], ratio: bool=False) -> int:
        """
        选择最优划分特征（信息增益最大的特征）
        :param X: 特征矩阵 (n_samples, n_features)
        :param Y: 标签数组 (n_samples,)
        :param metric: 不纯度计算函数
        :param ratio: 是否计算信息增益比
        :return: 最优特征的维度索引
        """
        n_features = X.shape[1]
        best_gain = -1  # 初始化最优增益
        best_feature_idx = -1  # 初始化最优特征索引
        
        # 遍历所有特征
        for idx in range(n_features):
            # 取出第idx列特征
            feat = X[:, idx]
            # 计算该特征的信息增益
            gain = self.calculate_info_gain(feat, Y, metric)
            if ratio:
                gain = gain / metric(feat)
            # 更新最优特征
            if gain > best_gain:
                best_gain = gain
                best_feature_idx = idx
        
        return best_feature_idx

    def _majority_vote(self, Y: np.ndarray) -> float:
        """
        返回出现次数最多的标签（叶节点取值）
        :param Y: 标签数组
        :return: 最频繁的标签值
        """
        unique_vals, counts = np.unique(Y, return_counts=True)
        return unique_vals[np.argmax(counts)]

    def fit_tree(self, X:np.ndarray, Y:np.ndarray, metric: Callable[[np.ndarray], float], ratio: bool=False) -> Tree:
        """
        递归构建决策树
        :param X: 特征矩阵 (n_samples, n_features)
        :param Y: 标签数组 (n_samples,)
        :param metric: 不纯度计算函数（entropy/gini）
        :param ratio: 是否计算信息增益
        :return: 决策树根节点
        """
        # 终止条件1：所有样本标签相同，返回叶节点
        if len(np.unique(Y)) == 1:
            return Tree(nodes={}, axis=-1, value=Y[0])

        # 终止条件2：没有特征可划分（只剩1列特征或所有样本特征相同）
        if X.shape[1] == 0 or (X == X[0]).all():
            majority_val = self._majority_vote(Y)
            return Tree(nodes={}, axis=-1, value=majority_val)
        
        # 其他
        best_feature_idx = self._choose_best_feature(X, Y, metric, ratio)
        best_feature = X[:, best_feature_idx]
        unique_vals = np.unique(best_feature)
        current_tree = Tree(nodes={}, axis=best_feature_idx, value=None)
        # 递归构建子树
        for val in unique_vals:
            mask = (best_feature == val)
            X_subset = X[mask]
            Y_subset = Y[mask]
            # 移除已划分的特征（避免重复使用）
            X_subset = np.delete(X_subset, best_feature_idx, axis=1)
            # 递归构建子树
            current_tree.nodes[val] = self.fit_tree(X_subset, Y_subset, metric)
        
        return current_tree

    def fit(self, metric: Callable[[np.ndarray], float] = None, ratio: bool=False):
        """
        训练决策树
        :param metric: 信息计算函数，默认用entropy
        :param ratio: 是否计算信息增益
        """
        if metric is None:
            metric = entropy
        self.tree = self.fit_tree(self.X, self.Y, metric, ratio)
    
    def predict(self, X: np.ndarray, tree: Tree) -> float:
        """
        递归预测
        :param x: 单个样本特征 (n_features,)
        :param tree: 决策树根节点
        :return: 预测标签
        """
        # 叶节点，直接返回值
        if tree.value is not None:
            return tree.value
        
        # 分支节点，找对应的子树
        feat_val = X[tree.axis]
        x = np.delete(X, tree.axis, axis=0)
        return self.predict(x, tree.nodes[feat_val])

    def print_tree(self, tree: Tree = None, depth: int = 0, prefix: str = "根节点: "):
        """
        递归打印决策树结构
        :param tree: 要打印的树节点（默认用训练好的self.tree）
        :param depth: 递归深度（用于缩进）
        :param prefix: 节点前缀（区分分支/叶节点）
        """
        # 初始化：如果没传tree，用训练好的根节点
        if tree is None:
            if self.tree is None:
                print("决策树尚未训练！")
                return
            tree = self.tree
        
        # 缩进符（按深度增加，让结构更清晰）
        indent = "  " * depth
        
        # 情况1：叶节点 → 直接打印预测值
        if tree.value is not None:
            print(f"{indent}{prefix} 预测结果 = {tree.value}")
            return
        
        # 情况2：分支节点 → 打印划分特征 + 递归打印子节点
        feature_name = self.feat_name[tree.axis]
        print(f"{indent}{prefix} 按【{feature_name}】划分")
        
        # 遍历所有子节点
        for feat_val, subtree in tree.nodes.items():
            self.print_tree(
                tree=subtree,
                depth=depth + 1,
                prefix=f"当{feature_name} = {feat_val} → "
            )


In [56]:
import numpy as np

# 特征矩阵 X（保留原始文本，不编码）
X = np.array([
    ["青年", "否", "否", "一般"],
    ["青年", "否", "否", "好"],
    ["青年", "是", "否", "好"],
    ["青年", "是", "是", "一般"],
    ["青年", "否", "否", "一般"],
    ["中年", "否", "否", "一般"],
    ["中年", "否", "否", "好"],
    ["中年", "是", "是", "好"],
    ["中年", "否", "是", "非常好"],
    ["中年", "否", "是", "非常好"],
    ["老年", "否", "是", "非常好"],
    ["老年", "否", "是", "好"],
    ["老年", "是", "否", "好"],
    ["老年", "是", "否", "非常好"],
    ["老年", "否", "否", "一般"]
], dtype=str)

# 类别 Y（保留原始文本）
Y = np.array([
    "否", "否", "是", "是", "否",
    "否", "否", "是", "是", "是",
    "是", "是", "是", "是", "否"
], dtype=str)

feat_name = ["年龄", "有工作", "有自己的房子", "信贷情况"]

dt = DecisionTree(X,Y, feat_name)
dt.fit(gini)
dt.print_tree()

根节点:  按【有自己的房子】划分
  当有自己的房子 = 否 →  按【有工作】划分
    当有工作 = 否 →  预测结果 = 否
    当有工作 = 是 →  预测结果 = 是
  当有自己的房子 = 是 →  预测结果 = 是


In [57]:
dt = DecisionTree(X,Y, feat_name)
dt.fit(gini, ratio=True)
dt.print_tree()

根节点:  按【有自己的房子】划分
  当有自己的房子 = 否 →  按【有工作】划分
    当有工作 = 否 →  预测结果 = 否
    当有工作 = 是 →  预测结果 = 是
  当有自己的房子 = 是 →  预测结果 = 是


In [59]:
dt.predict(["老年", "否", "否", "一般"], dt.tree)

np.str_('否')

#### 回归树

In [67]:
def find_best_split(feat: np.ndarray, Y:np.ndarray) -> float:
    """
    寻找回归树的最佳分裂点
    :param feat 一维特征数列
    :param Y 一维标签数列
    :return s分割点
    """
    unique_vals = np.sort(np.unique(feat))
    
    var = 1e9
    for val in unique_vals[:-1]:
        idx = (feat <= val)
        #计算该分割点下的预测误差
        cur_var = np.var(Y[idx]) + np.var(Y[~idx])
        
        if cur_var < var:
            s = val
    return s

X = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], dtype=np.float64)
y = np.array([4.50, 4.75, 4.91, 5.34, 5.80, 7.05, 7.90, 8.23, 8.70, 9.00], dtype=np.float64)
find_best_split(X, y)

np.float64(9.0)