# 决策树和随机森林

导入包。

In [19]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

读取鸢尾花数据集。

In [20]:
iris_dataset = pd.read_csv(
    "iris.data",
    names=[
        "sepal-length",
        "sepal-width",
        "petal-length",
        "petal-width",
        "species",
    ],
)

读取收入数据集。

In [21]:
adult_dataset = pd.read_csv(
    "adult.data",
    names=[
        "age",
        "workclass",
        "fnlwgt",
        "education",
        "education-num",
        "marital-status",
        "occupation",
        "relationship",
        "race",
        "sex",
        "capital-gain",
        "capital-loss",
        "hours-per-week",
        "native-country",
        "income",
    ],
    nrows=1000,
)

将数据集划分为训练集和测试集。

In [22]:
train_set, test_set = train_test_split(adult_dataset, test_size=0.2)

`calculate_gini` 函数计算一组计数的基尼系数。

基尼系数代表了一个数据集的“混乱程度”。基尼系数越高，说明对应的数据集越混乱；反之则越整齐。

例如，计数 `[3,3,4]` 的基尼系数为 1 - 0.3² - 0.3² - 0.4² = 0.66，说明这组计数背后的数据集很混乱。

而计数 `[0,0,10]` 的基尼系数为 1 - 0² - 0² - 1² = 0，说明这组计数背后的数据集很整齐。

In [23]:
def calculate_gini(counts: list[int]):
    total = sum(counts)

    if total == 0:
        return 0

    return 1 - sum((count / total) ** 2 for count in counts)

`calculate_weighted_gini` 函数计算若干组计数的加权平均基尼系数，其中每组计数的权重为：该组计数的总数 / 所有计数的总数。

例如，`[3,3,4]` 和 `[0,0,10]` 这两组计数的加权平均基尼系数为 0.5 * 0.66 + 0.5 * 0 = 0.33。

In [24]:
def calculate_weighted_gini(counts_list: list[list[int]]):
    total = sum(sum(counts) for counts in counts_list)
    return sum((sum(counts) / total) * calculate_gini(counts) for counts in counts_list)

`divide_by_discrete_feature` 函数基于指定的离散型特征，将数据集划分为两个子集。

以收入数据集为例：基于“学历是否为本科”，可以将数据集划分为两个子集。

在划分数据集时，该特征的所有可取的类别中，存在一个“最佳类别”。与基于其它“非最佳类别”的划分相比，基于“最佳类别”的划分可以使得两个子集的加权平均基尼系数最小。

函数的第一个返回值是最佳类别，第二、三个返回值是划分后的两个子集，第四个返回值是加权平均基尼系数。

In [25]:
def divide_by_discrete_feature(dataset: pd.DataFrame, feature: str):
    # 统计出所有可取的类别。
    choices = dataset[feature].unique()

    # 对于每个类别，都计算一遍划分后的加权平均基尼系数，然后找到使得加权平均基尼系数最小的最佳类别。
    best_choice = None
    best_yes_subset = None
    best_no_subset = None
    min_gini = 1.0
    for choice in choices:
        # 根据“特征是否取该类别”划分出两个子集。
        yes_subset: pd.DataFrame = dataset[dataset[feature] == choice]
        no_subset: pd.DataFrame = dataset[dataset[feature] != choice]

        # 分别统计两个子集中，各标签的数量。
        yes_counts = yes_subset.iloc[:, -1].value_counts()
        no_counts = no_subset.iloc[:, -1].value_counts()

        # 计算当前划分下的加权平均基尼系数。
        gini = calculate_weighted_gini([yes_counts, no_counts])

        # 更新最佳类别和最小系数。
        if gini < min_gini:
            best_choice = choice
            best_yes_subset = yes_subset
            best_no_subset = no_subset
            min_gini = gini

    return best_choice, best_yes_subset, best_no_subset, min_gini

`divide_by_continuous_feature` 函数基于指定的连续型特征，将数据集划分为两个子集。

以收入数据集为例：基于“年龄是否小于 30.5 岁”，可以将数据集划分为两个子集。

为了将连续的特征离散化，我们取该特征下所有相邻值的二分点，作为若干个可能的阈值。

在划分数据集时，该特征的所有可能的阈值中，存在一个“最佳阈值”。与基于其它“非最佳阈值”的划分相比，基于“最佳阈值”的划分可以使得两个子集的加权平均基尼系数最小。

函数的第一个返回值是最佳阈值，第二、三个返回值是划分后的两个子集，第四个返回值是加权平均基尼系数。

In [26]:
def divide_by_continuous_feature(dataset: pd.DataFrame, feature: str):
    # 计算出所有可能的阈值。
    unique_values = dataset[feature].unique()
    thresholds = (unique_values[:-1] + unique_values[1:]) / 2

    # 对于每个阈值，都计算一遍划分后的加权平均基尼系数，然后找到使得加权平均基尼系数最小的最佳阈值。
    best_threshold = None
    best_less_subset = None
    best_greater_subset = None
    min_gini = 1.0
    for threshold in thresholds:
        # 根据“特征是否小于该阈值”划分出两个子集。
        less_subset: pd.DataFrame = dataset[dataset[feature] < threshold]
        greater_subset: pd.DataFrame = dataset[dataset[feature] > threshold]

        # 分别统计两个子集中，各标签的数量。
        less_counts = less_subset.iloc[:, -1].value_counts()
        greater_counts = greater_subset.iloc[:, -1].value_counts()

        # 计算当前划分下的加权平均基尼系数。
        gini = calculate_weighted_gini([less_counts, greater_counts])

        # 更新最佳阈值和最小系数。
        if gini < min_gini:
            best_threshold = threshold
            best_less_subset = less_subset
            best_greater_subset = greater_subset
            min_gini = gini

    return best_threshold, best_less_subset, best_greater_subset, min_gini

`find_best_division` 函数找到数据集的最佳划分。

在划分数据集时，它的所有特征中，存在一个“最佳特征”。与基于其它“非最佳特征”的划分相比，基于“最佳特征”的划分可以使得两个子集的加权平均基尼系数最小。

函数的第一个返回值是最佳特征，第二个返回值是最佳类别（若非离散型特征则为 `None`），第三个返回值是最佳阈值（若非连续型特征则为 `None`），第四、五个返回值是划分后的两个子集。

In [27]:
def find_best_division(dataset: pd.DataFrame):
    # 统计出所有可能的特征。
    features = dataset.columns[:-1]

    # 对于每个特征，都计算一遍划分后的加权平均基尼系数，然后找到使得加权平均基尼系数最小的最佳特征。
    best_feature = None
    best_choice = None
    best_threshold = None
    best_left_subset = None
    best_right_subset = None
    min_gini = 1.0
    for feature in features:
        # 在标签相同的情况下，如果该特征下只有一个类别或取值，
        # 说明该特征无法继续划分，直接跳过。
        if len(dataset[feature].unique()) == 1:
            continue

        # 如果是离散型特征，则找到最佳类别。
        if dataset[feature].dtype == object:
            choice, left_subset, right_subset, gini = divide_by_discrete_feature(
                dataset, feature
            )
            threshold = None

        # 如果是连续型特征，则找到最佳阈值。
        else:
            (
                threshold,
                left_subset,
                right_subset,
                gini,
            ) = divide_by_continuous_feature(dataset, feature)
            choice = None

        # 更新最佳特征、最佳类别、最佳阈值和最小系数。
        if gini < min_gini:
            best_feature = feature
            best_choice = choice
            best_threshold = threshold
            best_left_subset = left_subset
            best_right_subset = right_subset
            min_gini = gini

    return (
        best_feature,
        best_choice,
        best_threshold,
        best_left_subset,
        best_right_subset,
    )

`build_decision_tree` 递归构建决策树。

决策树每个节点的数据结构如下：

- `feature`：划分数据集的特征。
- `choice`：如果 `feature` 是离散型的，那么 `choice` 是最佳类别；否则为 `None`。
- `threshold`：如果 `feature` 是连续型的，那么 `threshold` 是最佳阈值；否则为 `None`。
- `left`：该节点的左子树。
- `right`：该节点的右子树。

In [28]:
def build_decision_tree(dataset: pd.DataFrame, current_depth: int = 1):
    # 如果数据集中只有一种标签，那么直接返回这个类别。
    if len(dataset.iloc[:, -1].unique()) == 1:
        return dataset.iloc[:, -1].unique()[0]

    # 找到该数据集的最佳特征与最佳类别（或阈值）。
    feature, choice, threshold, left_subset, right_subset = find_best_division(dataset)

    # 如果找不到最佳特征，说明所有特征都只有一种类别或取值。
    # 用投票法决定该节点的类别。
    if feature is None:
        return dataset.iloc[:, -1].value_counts().index[0]

    # 递归地构建左右子树。
    left_tree = build_decision_tree(left_subset, current_depth + 1)
    right_tree = build_decision_tree(right_subset, current_depth + 1)

    # 返回当前节点。
    return {
        "feature": feature,
        "choice": choice,
        "threshold": threshold,
        "left": left_tree,
        "right": right_tree,
    }

`predict` 函数预测一行数据的标签。

In [29]:
def predict(row: pd.Series, decision_tree: dict):
    # 如果当前节点是叶子节点，那么直接返回这个节点的标签。
    if isinstance(decision_tree, str):
        return decision_tree

    # 如果当前节点的特征是离散型，则根据其指示的类别，判断是进入左子树还是右子树。
    if decision_tree["choice"] != None:
        if row[decision_tree["feature"]] == decision_tree["choice"]:
            return predict(row, decision_tree["left"])
        else:
            return predict(row, decision_tree["right"])

    # 如果当前节点的特征是连续型，则根据其指示的阈值，判断是进入左子树还是右子树。
    elif decision_tree["threshold"] != None:
        if row[decision_tree["feature"]] < decision_tree["threshold"]:
            return predict(row, decision_tree["left"])
        else:
            return predict(row, decision_tree["right"])

    # 如果当前节点的特征既不是离散型，也不是连续型，则抛出异常。
    else:
        raise Exception("Invalid decision tree node.")

`test` 函数在指定的数据集上，测试决策树的准确率。

In [30]:
def test(dataset: pd.DataFrame, decision_tree: dict):
    predictions = dataset.apply(predict, axis=1, args=(decision_tree,))
    accuracy = sum(predictions == dataset.iloc[:, -1]) / dataset.shape[0]
    return accuracy

运行。

In [31]:
decision_tree = build_decision_tree(train_set)
accuracy = test(test_set, decision_tree)
print(f"Accuracy: {accuracy}")

Accuracy: 0.79


`sample_dataset` 函数使用 Bootstrap 算法，从数据集中有放回地抽取同样多行数据，并随机选取一小部分属性，构成一个新的数据集。

In [32]:
def sample_dataset(dataset: pd.DataFrame, n_samples: int = 30):
    n_features = int(np.sqrt(dataset.shape[1]))
    samples: list[pd.DataFrame] = []

    for _ in range(n_samples):
        sample = dataset.sample(dataset.shape[0], replace=True)
        features = np.random.choice(sample.columns[:-1], n_features, replace=False)
        sample = pd.concat([sample[features], sample.iloc[:, -1]], axis=1)
        samples.append(sample)

    return samples

`build_random_forest` 函数在指定的数据集上，构建随机森林。

In [33]:
def build_random_forest(dataset: pd.DataFrame):
    samples = sample_dataset(dataset)
    decision_trees = [build_decision_tree(sample) for sample in samples]
    return decision_trees

`predict_with_random_forest` 函数使用随机森林预测一行数据的标签。

In [34]:
def predict_with_random_forest(row: pd.Series, random_forest: list[dict]):
    predictions = [predict(row, decision_tree) for decision_tree in random_forest]
    return max(set(predictions), key=predictions.count)

`test_with_random_forest` 函数在指定的数据集上，测试随机森林的准确率。

In [35]:
def test_with_random_forest(dataset: pd.DataFrame, random_forest: list[dict]):
    predictions = dataset.apply(
        predict_with_random_forest, axis=1, args=(random_forest,)
    )
    accuracy = sum(predictions == dataset.iloc[:, -1]) / dataset.shape[0]
    return accuracy

运行。

In [36]:
random_forest = build_random_forest(train_set)
accuracy = test_with_random_forest(test_set, random_forest)
print(f"Accuracy: {accuracy}")

Accuracy: 0.85
