In [1]:
import numpy as np

In [2]:
class TreeNode():
    """树节点"""
    def __init__(self, feature_idx = None, feature_val = None, node_val = None, left_child = None, right_child = None):
        """
        feature_idx: 划分特征的索引
        feature_val: 划分特征对应的值
        node_val: 叶节点所存储的值，只有叶节点才能存储类别,分类树存储次数出现最多的类别，回归树存储节点的平均值
        left_child: 左子树
        right_child: 右子树
        """
        self._feature_idx = feature_idx
        self._feature_val = feature_val
        self._node_val = node_val
        self._left_child = left_child
        self._right_child = right_child
        
class CART(object):
    def __init__(self, min_sample = 2, min_gain = 1e-6, max_depth = np.inf):
        """
        min_sample: 当某个节点的样本数小于min_sample将不再划分
        min_gain: 如果划分后的增益不超过min_gain将不再划分，
        对于分类树指的是基尼系数是否有足够的下降，
        对于回归树指的是平方误差是否有足够的下降
        max_depth: 指的是树的最大深度
        """
        self._root = None
        self._min_sample = min_sample
        self._min_gain = min_gain
        self._max_depth = max_depth
        
    def fit(self, X, y):
        """模型训练过程就是树建立的过程"""
        self._root = self._build_tree(X, y)
    
    def predict(self, X):
        """给定样本，预测类别或输出平均值"""
        return self._predict(X, self._root)
    
    def _build_tree(self, X, y, cur_depth = 0):
        """构建树"""
        # cur_depth 表示当前树的高度
        # 如果子树只剩下一个类别则直接记为子节点,分类树是投票最多的类别，回归树是平均值        
        if np.unique(y).shape[0] == 1:
            node_val = self._calc_node_val(y)
            return TreeNode(node_val = node_val)
        
        before_divide = self._calc_evaluation(y) # 计算划分前的基尼指数或平方差
        min_evaluation = np.inf # 划分后基尼指数或平方差的最小值
        best_feature_idx = None # 最佳划分特征索引
        best_feature_val = None # 最佳划分特征的值
        
        n_sample, n_feature = X.shape
        # 当样本数大于等于某一阈值，并且当前树的深度小于等于最大深度时才能继续划分
        if n_sample >= self._min_sample and cur_depth <= self._max_depth:
            for i in range(n_feature):  # 依次遍历每个特征
                feature_value = np.unique(X[:, i])
                for fea_val in feature_value: # 遍历这个特征每个可能的取值
                    X_left = X[X[:, i] <= fea_val]
                    y_left = y[X[:, i] <= fea_val]
                    
                    X_right = X[X[:, i] > fea_val]
                    y_right = y[X[:, i] > fea_val]
                    
                    if X_left.shape[0] > 0 and y_left.shape[0] > 0 and X_right.shape[0] > 0 and y_right.shape[0] > 0:
                        after_divide = self._calc_division(y_left, y_right)
                        if after_divide < min_evaluation:
                            min_evaluation = after_divide
                            best_feature_idx = i
                            best_feature_val = fea_val
        
        # 如果划分前和划分后的基尼指数或平方差有足够的下降才能继续划分
        if before_divide - min_evaluation > self._min_gain:
            X_left = X[X[:, best_feature_idx] <= best_feature_val]
            y_left = y[X[:, best_feature_idx] <= best_feature_val]
            
            X_right = X[X[:, best_feature_idx] > best_feature_val]
            y_right = y[X[:, best_feature_idx] > best_feature_val]
            
            left_child = self._build_tree(X_left, y_left, cur_depth + 1) # 递归构建左子树
            right_child = self._build_tree(X_right, y_right, cur_depth + 1) # 递归构建右子树
            
            return TreeNode(feature_idx = best_feature_idx, feature_val = best_feature_val, left_child = left_child, right_child = right_child)
        
        # 当样本数小于某一阈值，当前树的深度大于最大深度时, 或者未找到合适的划分时则看成子节点
        node_val = self._calc_node_val(y)
        return TreeNode(node_val = node_val)
        
    def _predict(self, X, tree = None):
        """给定输入来预测输出"""
        if tree is None:
            tree = self._root
        
        if tree._node_val is not None:
            return tree._node_val
        
        feature_val = X[tree._feature_idx]
        if feature_val <= tree._feature_val:
            return self._predict(X, tree._left_child)
        return self._predict(X, tree._right_child)
    
    def _calc_division(self, y_left, y_right):
        """计算划分后的基尼指数或平方差"""
        return NotImplementedError
    
    def _calc_evaluation(self, y):
        """计算数据集的基尼指数或平方差"""
        return NotImplementedError
        
    def _calc_node_val(self, y):
        """计算叶节点的值，分类树和回归树分别实现"""
        return NotImplementedError()
    
class CARTClassification(CART):
    def _calc_division(self, y_left, y_right):
        """计算划分后的基尼指数"""
        left_evaluation = self._calc_evaluation(y_left)
        right_evaluation = self._calc_evaluation(y_right)
        p_left = y_left.shape[0] / (y_left.shape[0] + y_right.shape[0])
        p_right = y_right.shape[0] / (y_left.shape[0] + y_right.shape[0])
        after_divide = p_left * left_evaluation + p_right * right_evaluation
        return after_divide
    
    def _calc_evaluation(self, y):
        """计算数据集的基尼指数"""
        _, num = np.unique(y, return_counts = True)
        gini = 1
        for n in num:
            gini -= (n / y.shape[0]) ** 2        
        return gini
        
    def _calc_node_val(self, y):
        """计算叶节点的值，分类树从标签中进行投票，出现次数最多的是叶节点的类别"""
        label, num_label = np.unique(y, return_counts = True)        
        return label[np.argmax(num_label)]
    
class CARTRegression(CART):
    def _calc_division(self, y_left, y_right):
        """计算划分后的平方差"""
        left_evaluation = self._calc_evaluation(y_left)
        right_evaluation = self._calc_evaluation(y_right)
        after_divide = left_evaluation + right_evaluation
        return after_divide
    
    def _calc_evaluation(self, y):
        """计算数据集的平方差"""    
        return np.sum(np.power(y - np.mean(y), 2))
        
    def _calc_node_val(self, y):
        """计算叶节点的值，回归树将标签的平均值作为叶节点的预测值"""        
        return np.mean(y)

In [3]:
# CART分类树效果
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
iris_data, iris_y = load_iris(return_X_y=True) #return_X_y为True，表示因变量和自变量独立导出
xtrain, xtest, ytrain, ytest = train_test_split(iris_data, iris_y, train_size=0.8, shuffle=True)

min_sample = 2
max_depth = 20
min_gain = 1e-6
model = CARTClassification(min_sample, min_gain, max_depth)
model.fit(xtrain, ytrain)

n_test = xtest.shape[0]
n_right = 0
for i in range(n_test):
    y_pred = model.predict(xtest[i])
    if y_pred == ytest[i]:
        n_right += 1
print("CART分类树在测试集上的准确率为：{}%".format((n_right * 100) / n_test))

from sklearn.tree import DecisionTreeClassifier
skmodel = DecisionTreeClassifier()
skmodel.fit(xtrain, ytrain)
print("sklearn分类模型在测试集上准确率为：{}%".format(100 * skmodel.score(xtest, ytest)))

CART分类树在测试集上的准确率为：96.66666666666667%
sklearn分类模型在测试集上准确率为：96.66666666666667%


In [5]:
# CART回归树效果
from sklearn.datasets import load_boston
boston_data, boston_y = load_boston(return_X_y=True)
boston_xtrain, boston_xtest, boston_ytrain, boston_ytest = train_test_split(boston_data, boston_y, train_size=0.8, shuffle=True)

min_sample = 2
max_depth = 20
min_gain = 1e-6
regressor_model = CARTRegression(min_sample, min_gain, max_depth)
regressor_model.fit(boston_xtrain, boston_ytrain)

n_boston_test = boston_xtest.shape[0]
diff = 0
for i in range(n_boston_test):
    boston_y_pred = regressor_model.predict(boston_xtest[i])
    diff += abs(boston_y_pred - boston_ytest[i])
    
from sklearn.tree import DecisionTreeRegressor
boston_skmodel = DecisionTreeRegressor()
boston_skmodel.fit(boston_xtrain, boston_ytrain)
y_skpred = boston_skmodel.predict(boston_xtest)

sk_diff = 0
for i in range(n_boston_test):
    sk_diff += abs(y_skpred[i] - boston_ytest[i])

print("CART回归树模型平均预测误差为：{:.1f}".format(diff / n_boston_test))
print("skmodel回归模型平均预测误差为：{:.1f}".format(sk_diff / n_boston_test))

CART回归树模型平均预测误差为：2.7
skmodel回归模型平均预测误差为：2.8
