# CART Algorithm (Classification and Regression Trees)

Reference :

https://blog.csdn.net/weixin_45666566/article/details/107954454

https://zhuanlan.zhihu.com/p/461032990

Cart algorithm is a simple algorithm that uses a decision tree to predict the target variable based on several input variables.

First we implement the necessary packages.

In [11]:
import numpy as np
from math import log
import operator
import json

## Splitting Creteria
* For classification tasks, CART uses Gini impurity as the splitting criterion. The lower the Gini impurity, the more pure the subset is. 

* For regression tasks, CART uses residual reduction as the splitting criterion. The lower the residual reduction, the better the fit of the model to the data.

### Regression Tree
Here, we try to implement the algorithm used in the regression tree.

* Reference : https://github.com/datawhalechina/statistical-learning-method-solutions-manual/blob/master/docs/chapter05/ch05.md

In [14]:
class Node:
    def __init__(self, value, feature, left=None, right=None):
        """
        Node class represents a node in the regression tree.
        
        Args:
            value (list): The value of the node.
            feature (list): The feature of the node.
            left (Node): The left child node.
            right (Node): The right child node.
        """
        self.value = value.tolist()
        self.feature = feature.tolist()
        self.left = left
        self.right = right

class MyLeastSquareRegTree:
    def __init__(self, train_X, y, epsilon):
        """
        MyLeastSquareRegTree class represents a Least Square Regression Tree.

        Args:
            train_X (numpy.ndarray): The training features.
            y (numpy.ndarray): The target values.
            epsilon (float): The threshold for stopping tree growth.
        """
        self.x = train_X
        self.y = y
        self.epsilon = epsilon
        self.feature_count = len(train_X[0])
        self.tree = None

    def _fit(self, x, y, feature_count):
        """
        Fits the regression tree recursively.

        Args:
            x (numpy.ndarray): The features.
            y (numpy.ndarray): The target values.
            feature_count (int): The number of features.

        Returns:
            Node: The root node of the fitted tree.
        """
        (j, s, minval, c1, c2) = self._divide(x, y, feature_count)
        tree = Node(feature=j, value=x[s, j], left=None, right=None)

        if minval < self.epsilon or len(y[np.where(x[:, j] <= x[s, j])]) <= 1:
            tree.left = c1
        else:
            tree.left = self._fit(x[np.where(x[:, j] <= x[s, j])],
                                  y[np.where(x[:, j] <= x[s, j])],
                                  self.feature_count)
        if minval < self.epsilon or len(y[np.where(x[:, j] > s)]) <= 1:
            tree.right = c2
        else:
            tree.right = self._fit(x[np.where(x[:, j] > x[s, j])],
                                   y[np.where(x[:, j] > x[s, j])],
                                   self.feature_count)
        return tree

    def fit(self):
        """
        Fits the regression tree to the training data.
        """
        self.tree = self._fit(self.x, self.y, self.feature_count)
        return self
    

    def printtree(self, node=None, indent='', last=True):
        """
        Prints the regression tree in a tree-like structure.

        Args:
            node (Node): The current node.
            indent (str): The indentation for pretty printing.
            last (bool): Flag indicating if the current node is the last child.
        """
        if node is None:
            node = self.tree

        if isinstance(node, Node):
            print(indent, end='')
            if last:
                print("└─ ", end='')
                indent += "   "
            else:
                print("├─ ", end='')
                indent += "│  "
            print(f"Feature: {node.feature}, Value: {node.value}")
            if node.left is not None or node.right is not None:
                self.printtree(node.left, indent, last=False)
                self.printtree(node.right, indent, last=True)
        else:
            print(indent, end='')
            if last:
                print("└─ ", end='')
            else:
                print("├─ ", end='')
            print(f"Prediction: {node}")
            

    @staticmethod
    def _divide(x, y, feature_count):
        """
        Finds the best split for the given data.

        Args:
            x (numpy.ndarray): The features.
            y (numpy.ndarray): The target values.
            feature_count (int): The number of features.

        Returns:
            tuple: The best split information (feature index, split index, minimum value, left child mean, right child mean).
        """
        
        cost = np.zeros((feature_count, len(x)))
        for i in range(feature_count):
            for k in range(len(x)):
                value = x[k, i]
                y1 = y[np.where(x[:, i] <= value)]
                c1 = np.mean(y1)
                y2 = y[np.where(x[:, i] > value)]
                if len(y2) == 0:
                    c2 = 0
                else:
                    c2 = np.mean(y2)
                
                y1[:] = y1[:] - c1
                y2[:] = y2[:] - c2
                cost[i, k] = np.sum(y1 * y1) + np.sum(y2 * y2)
        
        cost_index = np.where(cost == np.min(cost))
        j = cost_index[0][0]
        s = cost_index[1][0]
        
        c1 = np.mean(y[np.where(x[:, j] <= x[s, j])])
        c2 = np.mean(y[np.where(x[:, j] > x[s, j])])
        
        return j, s, cost[cost_index], c1, c2


* Test the result

In [15]:
train_X = np.array([[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]).T
y = np.array([4.50, 4.75, 4.91, 5.34, 5.80, 7.05, 7.90, 8.23, 8.70, 9.00])

model_tree = MyLeastSquareRegTree(train_X, y, epsilon=0.2)
model_tree.fit()
model_tree.printtree()


└─ Feature: 0, Value: 5
   ├─ Feature: 0, Value: 3
   │  ├─ Prediction: 4.72
   │  └─ Prediction: 5.57
   └─ Feature: 0, Value: 7
      ├─ Feature: 0, Value: 6
      │  ├─ Prediction: 7.05
      │  └─ Prediction: 7.9
      └─ Feature: 0, Value: 8
         ├─ Prediction: 8.23
         └─ Prediction: 8.85


### Classification Tree
Here, we try to implement the algorithm used in the regression tree.

This Section mainly contains :

* The implementation of basic data structures and initialization

* Dini 

* Splitting

* The final output of a classification tree.

In [57]:
## Initialize the dataset
def createDataSet():
    """
    Create example data / Read data
    @param dataSet: The dataset
    @return dataSet labels: The dataset, Feature set
    """
    dataSet = [('青年', '否', '否', '一般', '不同意'),
               ('青年', '否', '否', '好', '不同意'),
               ('青年', '是', '否', '好', '同意'),
               ('青年', '是', '是', '一般', '同意'),
               ('青年', '否', '否', '一般', '不同意'),
               ('中年', '否', '否', '一般', '不同意'),
               ('中年', '否', '否', '好', '不同意'),
               ('中年', '是', '是', '好', '同意'),
               ('中年', '否', '是', '非常好', '同意'),
               ('中年', '否', '是', '非常好', '同意'),
               ('老年', '否', '是', '非常好', '同意'),
               ('老年', '否', '是', '好', '同意'),
               ('老年', '是', '否', '好', '同意'),
               ('老年', '是', '否', '非常好', '同意'),
               ('老年', '否', '否', '一般', '不同意')]
    # 特征集
    labels = ['年龄', '有工作', '有房子', '信贷情况']
    return dataSet,labels


Get a closer look at the dataset and its structure.

In [58]:
dataset,labels = createDataSet()
print(dataset[0])
print("---------------")
for i in range(len(dataset[0])):
    unique_values = set(data[i] for data in dataset) # Remove duplicates
    print(unique_values)


('青年', '否', '否', '一般', '不同意')
---------------
{'中年', '青年', '老年'}
{'是', '否'}
{'是', '否'}
{'非常好', '一般', '好'}
{'不同意', '同意'}


We assume the "first feature" (p) is "同意"

In [59]:
feature = '同意'

First, we need a function to effectively split the dataset according to the given features.

In [60]:
def split_dataset(data_set, index, value):
    """
    Split the dataset based on a specific feature value.
    @param data_set: The dataset
    @param index: Index of the feature column
    @param value: Value of the feature to split on
    @return subset: Subset of the dataset with the specified feature value
    """
    subset = [data for data in data_set if data[index] == value]
    return subset

### Test the split_dataset function
subset = split_dataset(dataset, 0, '青年')
for i in range(len(subset)):
    print(subset[i])

('青年', '否', '否', '一般', '不同意')
('青年', '否', '否', '好', '不同意')
('青年', '是', '否', '好', '同意')
('青年', '是', '是', '一般', '同意')
('青年', '否', '否', '一般', '不同意')


Next, use a simple function to calculate the probability 'p'

In [61]:
def calculate_probability(class_list, feature):
    """
    Calculate the probability that a sample point belongs to a specific class.
    @param class_list: List of class labels
    @param feature: Feature to calculate the probability for
    @return probability: Probability of the feature
    """
    total_samples = len(class_list)
    feature_count = np.sum(np.array(class_list) == feature)
    return feature_count / total_samples

### Test the calculate_probability function
probability1 = calculate_probability(subset, '同意')
probability2 = calculate_probability(subset, '不同意')
print(probability1, probability2)

0.4 0.6


Now , we implement to core part of the aigorithm : choose the split to minimize Dini index.
> Note: Gini index of the probability distribution for binary classification problem is $Gini(p)=2p(1-p)$

In [62]:
def choose_best_feature_to_split(data_set):
    """
    Choose the best feature for splitting the dataset.
    @param data_set: The dataset
    @return best_feature: Index of the best feature
    """
    num_features = len(data_set[0]) - 1
    # If there is only one feature, return 0
    if num_features == 1:
        return 0

    best_gini = 1
    best_feature = -1
    for i in range(num_features):
        # Get the unique values of the feature
        unique_values = set(data[i] for data in data_set) # Remove duplicates
        feature_gini = 0  # Gini index for the feature
        
        # Calculate the Gini index for the feature
        for value in unique_values:
            subset = split_dataset(data_set, i, value) # Split the dataset
            prob = len(subset) / len(data_set)
            p = calculate_probability([data[-1] for data in subset], feature)
            feature_gini += prob * (2 * p * (1 - p))
        # Choose the feature with the lowest Gini index
        if feature_gini < best_gini:
            best_gini = feature_gini
            best_feature = i

    return best_feature
# Test the choose_best_feature_to_split function
best_feature = choose_best_feature_to_split(dataset)
print(labels[best_feature])

有房子


Remember? When the algorithm meets the termination conditions but encounters leaf nodes with multiple classes, we determine the majority class.

In [63]:
def majorityCnt(classList):
    """
    Determines the most frequent class in a list.
    
    @param classList: A list containing class labels.
    @return majorityClass: The most frequent class label in the list.
    """
    class_count = {}  # Dictionary to store the count of each class
    for label in classList:
        class_count[label] = class_count.get(label, 0) + 1  # Count the occurrences of each class
    # Sort the class counts in descending order and return the most frequent class
    majorityClass = max(class_count, key=class_count.get)
    return majorityClass
# Test the majorityCnt function
classList = [data[-1] for data in dataset]
majority_class = majorityCnt(classList)
print(majority_class)

同意


Now we can complete the whole algorithm and create the classifictaion tree.

In [141]:
def createTree(dataSet, labels):
    """
    Classify the last feature, sort by the number of classes after classification, e.g., if the final classification is 2 agree and 1 disagree, it is determined as agree.
    @param dataSet: The dataset
    @param labels: The feature set
    @return myTree: The decision tree
    """
    
    # Get the last value of each row, i.e., the class of each row
    classList = [example[-1] for example in dataSet]  
    
    # When the dataset has only one class
    if classList.count(classList[0]) == len(classList):
        return classList[0]
    
    # When there is only one column left in the dataset (i.e., the class), classify based on the last feature
    if len(dataSet[0]) == 1:
        return majorityCnt(classList)
    
    # Other cases
    bestFeat = choose_best_feature_to_split(dataSet)  # Choose the best feature (column)
    bestFeatLabel = labels[bestFeat]  # The best feature
    del(labels[bestFeat])  # Remove the current best feature from the feature set
    uniqueVals = set(example[bestFeat] for example in dataSet)  # Select the unique values corresponding to the best feature
    myTree = {bestFeatLabel: {}}  # Save the classification result as a dictionary
    
    for value in uniqueVals:
        subLabels = labels[:]  # Deep copy, the copied value is independent of the original value (normal copying is shallow copying, changes to the original value or the copied value affect each other)
        myTree[bestFeatLabel][value] = createTree(split_dataset(dataSet, bestFeat, value), subLabels)  # Recursively call to create the decision tree
        
    return myTree


In [142]:
dataSet, labels = createDataSet()  
print(createTree(dataSet, labels)) 

{'有房子': {'是': '同意', '否': {'有工作': {'是': '同意', '否': '不同意'}}}}


We can better represent our tree using more clear **TREE** structures.

In [143]:
class TreeNode:
    def __init__(self, feature=None, value=None, prediction=None):
        self.feature = feature          # Feature name
        self.value = value              # Feature value
        self.prediction = prediction    # If it's a leaf node, prediction value; otherwise, None
        self.children = {}              # Children nodes, stored as {feature value: child node}

def createTree(dataSet, labels):
    """
    Create a decision tree based on the dataset and labels.
    @param dataSet: The dataset
    @param labels: The feature labels
    @return myTree: The root node of the decision tree
    """
    classList = [example[-1] for example in dataSet]

    # When all samples belong to the same class, return that class as the prediction of the leaf node
    if classList.count(classList[0]) == len(classList):
        return TreeNode(prediction=classList[0])
    # When there's only one feature left in the dataset, choose the majority class as the prediction of the leaf node
    if len(dataSet[0]) == 1:
        return TreeNode(prediction=majorityCnt(classList))

    # Choose the best splitting feature
    bestFeat = choose_best_feature_to_split(dataSet)
    bestFeatLabel = labels[bestFeat]

    # Create the current node
    currentNode = TreeNode(feature=bestFeatLabel)

    # For each value of the current feature, recursively build the subtree
    uniqueVals = set(example[bestFeat] for example in dataSet)
    for value in uniqueVals:
        subLabels = labels[:]  # Create a copy of labels
        currentNode.children[value] = createTree(split_dataset(dataSet, bestFeat, value), subLabels)
    return currentNode

def printTree(node, depth=0):
    """
    Print the decision tree in a human-readable format.
    @param node: The root node of the decision tree
    @param depth: The depth of the current node in the tree
    """
    if node is None:
        return
    if node.prediction is not None:
        print("  " * depth, "Prediction:", node.prediction)
    else:
        print("  " * depth, "Feature:", node.feature)
        for value, child in node.children.items():
            print("  " * (depth + 1), f"Value: {value}")
            printTree(child, depth + 2)

# Example usage:
dataSet, labels = createDataSet()
myTree = createTree(dataSet, labels)
printTree(myTree)


 Feature: 有房子
   Value: 是
     Prediction: 同意
   Value: 否
     Feature: 有工作
       Value: 是
         Prediction: 同意
       Value: 否
         Prediction: 不同意


Let's try our classification tree now!

In [147]:
def classify(inputTree, featLabels, testVec):
    """
    Classify the input vector based on the decision tree.
    @param inputTree: The decision tree
    @param featLabels: The feature set
    @param testVec: The input vector
    @return classLabel: The class label
    """
    currentNode = inputTree
    while currentNode.children:
        feature = currentNode.feature
        featIndex = featLabels.index(feature)
        value = testVec[featIndex]
        if value in currentNode.children:
            currentNode = currentNode.children[value]
        else: # If the value is not found in the children, return None (or handle appropriately)
            return None
    return currentNode.prediction


print(classify(myTree, ['年龄', '有工作', '有房子', '信贷情况'], ['青年', '否', '否', '一般']))
print(classify(myTree, ['年龄', '有工作', '有房子', '信贷情况'], ['老年', '是', '否', '好']))

不同意
同意


## Cut Branches

First, we need a function to calculate the loss

* Normaly we use the cross validation,here for simplicity, we just validate on the whole dataset.

In [155]:
def calculate_error(tree, testData, labels):
    """
    Calculates the error of the decision tree on the test data.
    
    @param tree: The decision tree
    @param testData: Test data to evaluate the decision tree
    @param labels: The feature set
    @return error: The error rate of the decision tree on the test data
    """
    num_samples = len(testData)
    num_errors = 0
    for data in testData:
        pred = classify(tree, labels, data[:-1])  # Get the prediction from the decision tree
        if pred != data[-1]:
            num_errors += 1
    
    error_rate = num_errors / num_samples
    return float(error_rate)

# Test the calculate_error function

dataSet, labels = createDataSet() 
myTree = createTree(dataSet, labels)
#note we have deleted some feature coloumns,we have to keep record of the deleted feature columns

testData = [('青年', '否', '否', '一般', '同意'),
            ('老年', '是', '否', '好', '同意')]

print(calculate_error(myTree, testData,labels))


0.5


In [2]:
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

data = load_iris()
x, y = data.data, data.target
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3)
from sklearn.tree import DecisionTreeClassifier

model = DecisionTreeClassifier(criterion='gini')
model.fit(x_train, y_train)
print(model.score(x_test, y_test))

1.0
