In [1]:
import os
os.sys.path.append(os.path.dirname(os.path.abspath('.')))

## 数据准备

In [2]:
import numpy as np
from datasets.dataset import load_boston
data=load_boston()
X,Y=data.data,data.target
del data

from model_selection.train_test_split import train_test_split
X_train,X_test,Y_train,Y_test=train_test_split(X,Y,test_size=0.2)

# print(X_train.shape,X_test.shape,Y_train.shape,Y_test.shape)

# 把X，Y拼起来便于操作
training_data=np.c_[X_train,Y_train]
testing_data=np.c_[X_test,Y_test]

# print(training_data.shape,testing_data.shape)

## 模型基础
对于CART用做回归时，分裂的依据有很多的可选方法，这里使用MSE作为分裂依据。注意这里的MSE跟模型评估时的MSE不同，这里的MSE计算方式为：
$$
H(X_{m})=\frac{1}{N_{m}}\sum\limits_{i{\in}N_{m}}(y_{i}-\bar{y}_{m})^{2}
$$
其中$\bar{y}_{m}$为第$m$个叶节点中所有训练样本的目标值均值，详见[这里](https://scikit-learn.org/stable/modules/tree.html#regression-criteria)。

In [3]:
def MSE(data,y_idx=-1):
    '''
    返回数据集目标值的MSE
    '''
    N=len(data)
    mean=np.mean(data[:,y_idx])
    return np.sum(np.square(data[:,y_idx]-mean))/N

# MSE(training_data)

定义一个在指定特征与特征值下将数据集二分的函数，这里将小于等于分割值的数据集放入左分支，大于分割值的数据集放入右分支。

In [4]:
def BinSplitData(data,f_idx,f_val):
    '''
    以指定特征与特征值二分数据集
    '''
    data_left=data[data[:,f_idx]<=f_val]
    data_right=data[data[:,f_idx]>f_val]
    return data_left,data_right

# SplitData(training_data,0,0)

分割函数与分割指标计算函数都有了，接下来就可以在数据集中迭代寻找最佳分割特征与特征值了。

In [5]:
def Test(data, criteria='mse', min_samples_split=5, min_samples_leaf=5, min_impurity_decrease=0.0):
    '''
    对数据做test，找到最佳分割特征与特征值
    return: best_f_idx, best_f_val，前者为空时代表叶节点，两者都为空时说明无法分裂
    min_samples_split: 分裂所需的最小样本数，大于1
    min_samples_leaf: 叶子节点的最小样本数，大于0
    min_impurity_decrease: 分裂需要满足的最小增益
    '''
    n_sample, n_feature = data.shape

    # 数据量小于阈值则直接返回叶节点，数据已纯净也返回叶节点
    if n_sample < min_samples_split or len(np.unique(data[:,-1]))==1:    
        return None, np.mean(data[:, -1])

    MSE_before = MSE(data)    # 分裂前的MSE
    best_gain = 0
    best_f_idx = None
    best_f_val = np.mean(data[:, -1])    # 默认分割值设为目标均值，当找不到分割点时返回该值作为叶节点

    # 遍历所有特征与特征值
    for f_idx in range(n_feature-1):
        for f_val in np.unique(data[:, f_idx]):
            data_left, data_right = BinSplitData(data, f_idx, f_val)    # 二分数据

            # 分割后的分支样本数小于阈值则放弃分裂
            if len(data_left) < min_samples_leaf or len(data_right) < min_samples_leaf:
                continue

            # 分割后的加权MSE
            MSE_after = len(data_left)/n_sample*MSE(data_left) + \
                len(data_right)/n_sample*MSE(data_right)
            gain = MSE_before-MSE_after    # MSE的减小量为增益

            # 分裂后的增益小于阈值或小于目前最大增益则放弃分裂
            if gain < min_impurity_decrease or gain < best_gain:
                continue
            else:
                # 否则更新最大增益
                best_gain = gain
                best_f_idx, best_f_val = f_idx, f_val

    # 返回一个最佳分割特征与最佳分割点，注意会有空的情况
    return best_f_idx, best_f_val

# Test(training_data)

最后就可以使用递归来生成树了。树中每一个节点需要保存的信息有：分割特征，分割点，以及左右分支。

In [6]:
def CART(data,criteria='mse',min_samples_split=5,min_samples_leaf=5,min_impurity_decrease=0.0):
    # 首先是做test，数据集的质量由Test函数来保证并提供反馈
    best_f_idx,best_f_val=Test(data,criteria,min_samples_split,min_samples_leaf,min_impurity_decrease)
    
    tree={}
    tree['cut_f']=best_f_idx
    tree['cut_val']=best_f_val
    
    if best_f_idx==None:    # f_idx为空表示需要生成叶节点
        return best_f_val
    
    data_left,data_right=BinSplitData(data,best_f_idx,best_f_val)
    tree['left']=CART(data_left,criteria,min_samples_split,min_samples_leaf,min_impurity_decrease)
    tree['right']=CART(data_right,criteria,min_samples_split,min_samples_leaf,min_impurity_decrease)
    
    return tree

tree=CART(training_data)
# print(tree)

In [7]:
def predict_one(x_test, tree, default=-1):
    if isinstance(tree, dict):    # 非叶节点才做左右判断
        cut_f_idx, cut_val = tree['cut_f'], tree['cut_val']
        sub_tree = tree['left'] if x_test[cut_f_idx] <= cut_val else tree['right']
        return predict_one(x_test, sub_tree)
    else:    # 叶节点则直接返回值
        return tree
    
# test_idx=100
# print(predict_one(X_test[test_idx],tree),Y_test[test_idx])
    
def predict(X_test,tree):
    return [predict_one(x_test,tree) for x_test in X_test]
    
Y_pred=predict(X_test,tree)
print('MSE:{}'.format(np.mean(np.square(Y_pred-Y_test))))

MSE:15.101454048749126


使用sklearn中的回归树来做效果对比。

In [8]:
from sklearn.tree import DecisionTreeRegressor
dt_reg=DecisionTreeRegressor(min_samples_split=5, min_samples_leaf=5)
dt_reg.fit(X_train,Y_train)
Y_pred=dt_reg.predict(X_test)
print('MSE:{}'.format(np.mean(np.square(Y_pred-Y_test))))

MSE:13.951683170768305
