In [62]:
# stop depth at 5
import numpy as np
import pandas as pd


class DecisionTree:
    def __init__(self, feature_names, category_mappings, stopping_depth=None):
        self.tree = None
        self.feature_names = feature_names  # Stores feature names
        self.category_mappings = category_mappings  # put different string category into integer (private for 0, self-employed for 1 ...etc)
        self.stopping_depth = stopping_depth  # Controls the maximum depth of the tree to prevent overfitting

    # compute the entropy of y
    def entropy(self, y):
        values, counts = np.unique(y, return_counts=True)
        probs = counts / counts.sum()
        return -np.sum(probs * np.log2(probs))

    # compute information gain
    def information_gain(self, X_column, y, threshold):
        # The threshold of the split point
        # if x_column smaller or equal to threshold, put it to the lhs
        # if X_column = [1, 2, 2, 3,4,5]，threshold = 3，then left_mask = [True, True, True, False, False]
        left_mask = X_column <= threshold
        # the rest, rhs
        right_mask = ~left_mask
        # compute entropy for lhs(X_column <= threshold)
        left_entropy = self.entropy(y[left_mask])
        # compute entropy for rhs(X_column > threshold)
        right_entropy = self.entropy(y[right_mask])
        # generate the number of total samples
        n = len(y)
        # compute number of samples for lhs
        n_left = left_mask.sum()
        # compute number of samples for rhs
        n_right = right_mask.sum()
        weighted_entropy = (n_left / n) * left_entropy + (n_right / n) * right_entropy
        return self.entropy(y) - weighted_entropy

    # find the best feature to split, this is the main part for decision tree. the purpose is to go through all the feature and find the best threshold which is the best split point.
    def best_split(self, X, y):
        """
        X = np.array([
        [25, 'Private', 'Bachelors'],
        [45, 'Self-employed', 'Masters'],
        [35, 'Government', 'Bachelors'],
        [50, 'Private', 'PhD'],
        [60, 'Government', 'Masters']
        """
        # Initialize the information gain to 0.
        best_gain = 0
        # Initialize the feature to None。
        best_feature = None
        # Initialize the threshold to None。
        best_threshold = None
        # X.shape[1] is the number of features
        for feature in range(X.shape[1]):
            # if X[:, feature] = [1, 2, 2, 3], then thresholds = [1, 2, 3]。
            thresholds = np.unique(X[:, feature])
            for threshold in thresholds:
                # compute current feature's threshold and information gain
                gain = self.information_gain(X[:, feature], y, threshold)
                # if the current feature's threshold and information gain is larger than best information gain, then update the best feature and threshold
                if gain > best_gain:
                    best_gain, best_feature, best_threshold = gain, feature, threshold
        return best_feature, best_threshold

    # Recursively a decision tree, by recursively splitting the data set, and gradually building a complete decision tree
    def build_tree(self, X, y, depth=0):
        if len(np.unique(y)) == 1:
            return y[0]
        if self.stopping_depth is not None and depth >= self.stopping_depth:
            return np.bincount(y).argmax()
        feature, threshold = self.best_split(X, y)
        if feature is None:
            return np.bincount(y).argmax()
        left_mask = X[:, feature] <= threshold
        right_mask = ~left_mask
        left_tree = self.build_tree(X[left_mask], y[left_mask], depth + 1)
        right_tree = self.build_tree(X[right_mask], y[right_mask], depth + 1)
        if left_tree == right_tree:
            return left_tree
        return {"feature": self.feature_names[feature], "threshold": threshold, "left": left_tree, "right": right_tree}

    # train decision tree
    def fit(self, X, y):
        self.tree = self.build_tree(np.array(X), np.array(y))

    # predict single sample
    def predict_sample(self, x, tree):
        if not isinstance(tree, dict):
            return tree
        """
        tree = {
        "feature": "age",
        "threshold": 25,
        "left": 0,
        "right": {
            "feature": "age",
            "threshold": 50,
            "left": 1,
            "right": 0
            }
        }
        x = [40, 'Self-employed', 'Masters']
        """
        if x[self.feature_names.index(tree["feature"])] <= tree["threshold"]:
            return self.predict_sample(x, tree["left"])
        else:
            return self.predict_sample(x, tree["right"])

    # predict more samples
    def predict(self, X):
        predictions = np.array([self.predict_sample(x, self.tree) for x in X])
        print("Predictions are ", predictions)
        return predictions

    # print tree
    def print_tree(self, tree=None, depth=0, prefix="|"):
        if tree is None:
            tree = self.tree
        if not isinstance(tree, dict):
            print(prefix + "-- " + f"Leaf: {tree}")
            return
        feature_name = tree['feature']
        threshold = tree['threshold']
        if feature_name in self.category_mappings:
            threshold = self.category_mappings[feature_name][int(threshold)]
        print(prefix + "-- " + f"Node: {feature_name} <= {threshold}")
        self.print_tree(tree['left'], depth + 1, prefix + "   |")
        self.print_tree(tree['right'], depth + 1, prefix + "   |")


# load data
def load_data(file_path):
    df = pd.read_csv(file_path)
    # Random data and set random state into 70 to get different database every time
    # frac=1：keep all the database
    df = df.sample(frac=1, random_state=20).reset_index(drop=True)
    df = df.apply(lambda x: pd.factorize(x)[0])
    feature_names = df.columns[:-1].tolist()
    category_mappings = {}
    for col in df.columns:
        df[col], mapping = pd.factorize(df[col])
        category_mappings[col] = mapping.tolist()
    X = df.iloc[:, :-1].values
    y = df.iloc[:, -1].values
    X_train, X_val = X[:-500], X[-500:]
    y_train, y_val = y[:-500], y[-500:]
    return X_train, y_train, X_val, y_val, feature_names, category_mappings


def evaluate_model(tree, X_test, y_test):
    y_pred = tree.predict(X_test)
    accuracy = np.mean(y_pred == y_test)
    print(f"Model Accuracy: {accuracy:.4f}")


# train and test decision tree
if __name__ == "__main__":
    train_file = "adult_train_data.csv"
    test_file = "adult_test_data.csv"

    X_train, y_train, X_val, y_val, feature_names, category_mappings = load_data(train_file)
    X_test, y_test, _, _, _, _ = load_data(test_file)

    #print(category_mappings)
    tree = DecisionTree(feature_names, category_mappings, stopping_depth=5)
    tree.fit(X_train, y_train)
    tree.print_tree()

    print("Validation Set Evaluation:")
    evaluate_model(tree, X_val, y_val)

    print("Test Set Evaluation:")
    evaluate_model(tree, X_test, y_test)

|-- Node: marital_status <= 0
|   |-- Leaf: 0
|   |-- Node: marital_status <= 1
|   |   |-- Node: occupation <= 7
|   |   |   |-- Node: education_num <= 1
|   |   |   |   |-- Node: education_num <= 0
|   |   |   |   |   |-- Leaf: 0
|   |   |   |   |   |-- Leaf: 1
|   |   |   |   |-- Leaf: 0
|   |   |   |-- Leaf: 0
|   |   |-- Leaf: 0
Validation Set Evaluation:
Predictions are  [0 0 1 1 0 1 1 0 0 0 0 0 0 0 1 0 0 0 1 0 0 1 1 0 1 0 0 0 0 0 0 0 1 1 0 0 0
 0 0 0 0 0 0 0 1 1 0 1 0 1 0 1 0 0 0 0 0 1 0 0 0 1 0 1 0 0 1 0 0 1 1 1 0 0
 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 1 0 0 1 0 0 1 0 0 0 0 0 1 0 0 0 0 0 1 0 0 0
 1 0 0 0 1 0 0 1 0 0 0 0 0 0 0 0 0 0 1 1 0 0 0 0 0 1 0 1 1 0 1 0 0 0 0 0 1
 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 1 1 0 0 0 0 0 0 0 0 0 1 0 0 1 1 0
 0 0 0 0 0 0 1 0 0 0 0 0 0 1 0 0 1 1 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 1 0 0
 1 0 0 1 1 0 0 0 0 1 0 1 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 1 0 0 0 0 1 0 0 0 0
 0 1 1 0 0 0 1 1 0 1 1 0 0 0 0 0 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 1 0 0 0 0 0 0 1 0 1

In [64]:
# stop depth at 10
import numpy as np
import pandas as pd


class DecisionTree:
    def __init__(self, feature_names, category_mappings, stopping_depth=None):
        self.tree = None
        self.feature_names = feature_names  # Stores feature names
        self.category_mappings = category_mappings  # put different string category into integer (private for 0, self-employed for 1 ...etc)
        self.stopping_depth = stopping_depth  # Controls the maximum depth of the tree to prevent overfitting

    # compute the entropy of y
    def entropy(self, y):
        values, counts = np.unique(y, return_counts=True)
        probs = counts / counts.sum()
        return -np.sum(probs * np.log2(probs))

    # compute information gain
    def information_gain(self, X_column, y, threshold):
        # The threshold of the split point
        # if x_column smaller or equal to threshold, put it to the lhs
        # if X_column = [1, 2, 2, 3,4,5]，threshold = 3，then left_mask = [True, True, True, False, False]
        left_mask = X_column <= threshold
        # the rest, rhs
        right_mask = ~left_mask
        # compute entropy for lhs(X_column <= threshold)
        left_entropy = self.entropy(y[left_mask])
        # compute entropy for rhs(X_column > threshold)
        right_entropy = self.entropy(y[right_mask])
        # generate the number of total samples
        n = len(y)
        # compute number of samples for lhs
        n_left = left_mask.sum()
        # compute number of samples for rhs
        n_right = right_mask.sum()
        weighted_entropy = (n_left / n) * left_entropy + (n_right / n) * right_entropy
        return self.entropy(y) - weighted_entropy

    # find the best feature to split, this is the main part for decision tree. the purpose is to go through all the feature and find the best threshold which is the best split point.
    def best_split(self, X, y):
        """
        X = np.array([
        [25, 'Private', 'Bachelors'],
        [45, 'Self-employed', 'Masters'],
        [35, 'Government', 'Bachelors'],
        [50, 'Private', 'PhD'],
        [60, 'Government', 'Masters']
        """
        # Initialize the information gain to 0.
        best_gain = 0
        # Initialize the feature to None。
        best_feature = None
        # Initialize the threshold to None。
        best_threshold = None
        # X.shape[1] is the number of features
        for feature in range(X.shape[1]):
            # if X[:, feature] = [1, 2, 2, 3], then thresholds = [1, 2, 3]。
            thresholds = np.unique(X[:, feature])
            for threshold in thresholds:
                # compute current feature's threshold and information gain
                gain = self.information_gain(X[:, feature], y, threshold)
                # if the current feature's threshold and information gain is larger than best information gain, then update the best feature and threshold
                if gain > best_gain:
                    best_gain, best_feature, best_threshold = gain, feature, threshold
        return best_feature, best_threshold

    # Recursively a decision tree, by recursively splitting the data set, and gradually building a complete decision tree
    def build_tree(self, X, y, depth=0):
        if len(np.unique(y)) == 1:
            return y[0]
        if self.stopping_depth is not None and depth >= self.stopping_depth:
            return np.bincount(y).argmax()
        feature, threshold = self.best_split(X, y)
        if feature is None:
            return np.bincount(y).argmax()
        left_mask = X[:, feature] <= threshold
        right_mask = ~left_mask
        left_tree = self.build_tree(X[left_mask], y[left_mask], depth + 1)
        right_tree = self.build_tree(X[right_mask], y[right_mask], depth + 1)
        if left_tree == right_tree:
            return left_tree
        return {"feature": self.feature_names[feature], "threshold": threshold, "left": left_tree, "right": right_tree}

    # train decision tree
    def fit(self, X, y):
        self.tree = self.build_tree(np.array(X), np.array(y))

    # predict single sample
    def predict_sample(self, x, tree):
        if not isinstance(tree, dict):
            return tree
        """
        tree = {
        "feature": "age",
        "threshold": 25,
        "left": 0,
        "right": {
            "feature": "age",
            "threshold": 50,
            "left": 1,
            "right": 0
            }
        }
        x = [40, 'Self-employed', 'Masters']
        """
        if x[self.feature_names.index(tree["feature"])] <= tree["threshold"]:
            return self.predict_sample(x, tree["left"])
        else:
            return self.predict_sample(x, tree["right"])

    # predict more samples
    def predict(self, X):
        predictions = np.array([self.predict_sample(x, self.tree) for x in X])
        print("Predictions are ", predictions)
        return predictions

    # print tree
    def print_tree(self, tree=None, depth=0, prefix="|"):
        if tree is None:
            tree = self.tree
        if not isinstance(tree, dict):
            print(prefix + "-- " + f"Leaf: {tree}")
            return
        feature_name = tree['feature']
        threshold = tree['threshold']
        if feature_name in self.category_mappings:
            threshold = self.category_mappings[feature_name][int(threshold)]
        print(prefix + "-- " + f"Node: {feature_name} <= {threshold}")
        self.print_tree(tree['left'], depth + 1, prefix + "   |")
        self.print_tree(tree['right'], depth + 1, prefix + "   |")


# load data
def load_data(file_path):
    df = pd.read_csv(file_path)
    # Random data and set random state into 70 to get different database every time
    # frac=1：keep all the database
    df = df.sample(frac=1, random_state=20).reset_index(drop=True)
    df = df.apply(lambda x: pd.factorize(x)[0])
    feature_names = df.columns[:-1].tolist()
    category_mappings = {}
    for col in df.columns:
        df[col], mapping = pd.factorize(df[col])
        category_mappings[col] = mapping.tolist()
    X = df.iloc[:, :-1].values
    y = df.iloc[:, -1].values
    X_train, X_val = X[:-500], X[-500:]
    y_train, y_val = y[:-500], y[-500:]
    return X_train, y_train, X_val, y_val, feature_names, category_mappings


def evaluate_model(tree, X_test, y_test):
    y_pred = tree.predict(X_test)
    accuracy = np.mean(y_pred == y_test)
    print(f"Model Accuracy: {accuracy:.4f}")


# train and test decision tree
if __name__ == "__main__":
    train_file = "adult_train_data.csv"
    test_file = "adult_test_data.csv"

    X_train, y_train, X_val, y_val, feature_names, category_mappings = load_data(train_file)
    X_test, y_test, _, _, _, _ = load_data(test_file)

    #print(category_mappings)
    tree = DecisionTree(feature_names, category_mappings, stopping_depth=10)
    tree.fit(X_train, y_train)
    tree.print_tree()

    print("Validation Set Evaluation:")
    evaluate_model(tree, X_val, y_val)

    print("Test Set Evaluation:")
    evaluate_model(tree, X_test, y_test)

|-- Node: marital_status <= 0
|   |-- Node: relationship <= 0
|   |   |-- Node: hours_per_week <= 1
|   |   |   |-- Node: occupation <= 2
|   |   |   |   |-- Node: occupation <= 1
|   |   |   |   |   |-- Node: workclass <= 2
|   |   |   |   |   |   |-- Leaf: 0
|   |   |   |   |   |   |-- Node: education <= 1
|   |   |   |   |   |   |   |-- Leaf: 0
|   |   |   |   |   |   |   |-- Node: age <= 0
|   |   |   |   |   |   |   |   |-- Node: sex <= 0
|   |   |   |   |   |   |   |   |   |-- Leaf: 0
|   |   |   |   |   |   |   |   |   |-- Node: fnlwgt <= 0
|   |   |   |   |   |   |   |   |   |   |-- Leaf: 0
|   |   |   |   |   |   |   |   |   |   |-- Leaf: 1
|   |   |   |   |   |   |   |   |-- Leaf: 0
|   |   |   |   |   |-- Node: hours_per_week <= 0
|   |   |   |   |   |   |-- Leaf: 0
|   |   |   |   |   |   |-- Node: sex <= 0
|   |   |   |   |   |   |   |-- Node: native_country <= 0
|   |   |   |   |   |   |   |   |-- Node: age <= 0
|   |   |   |   |   |   |   |   |   |-- Leaf: 0
|   |   |   

In [66]:
# no stop depth
import numpy as np
import pandas as pd


class DecisionTree:
    def __init__(self, feature_names, category_mappings, stopping_depth=None):
        self.tree = None
        self.feature_names = feature_names  # Stores feature names
        self.category_mappings = category_mappings  # put different string category into integer (private for 0, self-employed for 1 ...etc)
        self.stopping_depth = stopping_depth  # Controls the maximum depth of the tree to prevent overfitting

    # compute the entropy of y
    def entropy(self, y):
        values, counts = np.unique(y, return_counts=True)
        probs = counts / counts.sum()
        return -np.sum(probs * np.log2(probs))

    # compute information gain
    def information_gain(self, X_column, y, threshold):
        # The threshold of the split point
        # if x_column smaller or equal to threshold, put it to the lhs
        # if X_column = [1, 2, 2, 3,4,5]，threshold = 3，then left_mask = [True, True, True, False, False]
        left_mask = X_column <= threshold
        # the rest, rhs
        right_mask = ~left_mask
        # compute entropy for lhs(X_column <= threshold)
        left_entropy = self.entropy(y[left_mask])
        # compute entropy for rhs(X_column > threshold)
        right_entropy = self.entropy(y[right_mask])
        # generate the number of total samples
        n = len(y)
        # compute number of samples for lhs
        n_left = left_mask.sum()
        # compute number of samples for rhs
        n_right = right_mask.sum()
        weighted_entropy = (n_left / n) * left_entropy + (n_right / n) * right_entropy
        return self.entropy(y) - weighted_entropy

    # find the best feature to split, this is the main part for decision tree. the purpose is to go through all the feature and find the best threshold which is the best split point.
    def best_split(self, X, y):
        """
        X = np.array([
        [25, 'Private', 'Bachelors'],
        [45, 'Self-employed', 'Masters'],
        [35, 'Government', 'Bachelors'],
        [50, 'Private', 'PhD'],
        [60, 'Government', 'Masters']
        """
        # Initialize the information gain to 0.
        best_gain = 0
        # Initialize the feature to None。
        best_feature = None
        # Initialize the threshold to None。
        best_threshold = None
        # X.shape[1] is the number of features
        for feature in range(X.shape[1]):
            # if X[:, feature] = [1, 2, 2, 3], then thresholds = [1, 2, 3]。
            thresholds = np.unique(X[:, feature])
            for threshold in thresholds:
                # compute current feature's threshold and information gain
                gain = self.information_gain(X[:, feature], y, threshold)
                # if the current feature's threshold and information gain is larger than best information gain, then update the best feature and threshold
                if gain > best_gain:
                    best_gain, best_feature, best_threshold = gain, feature, threshold
        return best_feature, best_threshold

    # Recursively a decision tree, by recursively splitting the data set, and gradually building a complete decision tree
    def build_tree(self, X, y, depth=0):
        if len(np.unique(y)) == 1:
            return y[0]
        if self.stopping_depth is not None and depth >= self.stopping_depth:
            return np.bincount(y).argmax()
        feature, threshold = self.best_split(X, y)
        if feature is None:
            return np.bincount(y).argmax()
        left_mask = X[:, feature] <= threshold
        right_mask = ~left_mask
        left_tree = self.build_tree(X[left_mask], y[left_mask], depth + 1)
        right_tree = self.build_tree(X[right_mask], y[right_mask], depth + 1)
        if left_tree == right_tree:
            return left_tree
        return {"feature": self.feature_names[feature], "threshold": threshold, "left": left_tree, "right": right_tree}

    # train decision tree
    def fit(self, X, y):
        self.tree = self.build_tree(np.array(X), np.array(y))

    # predict single sample
    def predict_sample(self, x, tree):
        if not isinstance(tree, dict):
            return tree
        """
        tree = {
        "feature": "age",
        "threshold": 25,
        "left": 0,
        "right": {
            "feature": "age",
            "threshold": 50,
            "left": 1,
            "right": 0
            }
        }
        x = [40, 'Self-employed', 'Masters']
        """
        if x[self.feature_names.index(tree["feature"])] <= tree["threshold"]:
            return self.predict_sample(x, tree["left"])
        else:
            return self.predict_sample(x, tree["right"])

    # predict more samples
    def predict(self, X):
        predictions = np.array([self.predict_sample(x, self.tree) for x in X])
        print("Predictions are ", predictions)
        return predictions

    # print tree
    def print_tree(self, tree=None, depth=0, prefix="|"):
        if tree is None:
            tree = self.tree
        if not isinstance(tree, dict):
            print(prefix + "-- " + f"Leaf: {tree}")
            return
        feature_name = tree['feature']
        threshold = tree['threshold']
        if feature_name in self.category_mappings:
            threshold = self.category_mappings[feature_name][int(threshold)]
        print(prefix + "-- " + f"Node: {feature_name} <= {threshold}")
        self.print_tree(tree['left'], depth + 1, prefix + "   |")
        self.print_tree(tree['right'], depth + 1, prefix + "   |")


# load data
def load_data(file_path):
    df = pd.read_csv(file_path)
    # Random data and set random state into 70 to get different database every time
    # frac=1：keep all the database
    df = df.sample(frac=1, random_state=20).reset_index(drop=True)
    df = df.apply(lambda x: pd.factorize(x)[0])
    feature_names = df.columns[:-1].tolist()
    category_mappings = {}
    for col in df.columns:
        df[col], mapping = pd.factorize(df[col])
        category_mappings[col] = mapping.tolist()
    X = df.iloc[:, :-1].values
    y = df.iloc[:, -1].values
    X_train, X_val = X[:-500], X[-500:]
    y_train, y_val = y[:-500], y[-500:]
    return X_train, y_train, X_val, y_val, feature_names, category_mappings


def evaluate_model(tree, X_test, y_test):
    y_pred = tree.predict(X_test)
    accuracy = np.mean(y_pred == y_test)
    print(f"Model Accuracy: {accuracy:.4f}")


# train and test decision tree
if __name__ == "__main__":
    train_file = "adult_train_data.csv"
    test_file = "adult_test_data.csv"

    X_train, y_train, X_val, y_val, feature_names, category_mappings = load_data(train_file)
    X_test, y_test, _, _, _, _ = load_data(test_file)

    #print(category_mappings)
    tree = DecisionTree(feature_names, category_mappings, stopping_depth= None)
    tree.fit(X_train, y_train)
    tree.print_tree()

    print("Validation Set Evaluation:")
    evaluate_model(tree, X_val, y_val)

    print("Test Set Evaluation:")
    evaluate_model(tree, X_test, y_test)

|-- Node: marital_status <= 0
|   |-- Node: relationship <= 0
|   |   |-- Node: hours_per_week <= 1
|   |   |   |-- Node: occupation <= 2
|   |   |   |   |-- Node: occupation <= 1
|   |   |   |   |   |-- Node: workclass <= 2
|   |   |   |   |   |   |-- Leaf: 0
|   |   |   |   |   |   |-- Node: education <= 1
|   |   |   |   |   |   |   |-- Leaf: 0
|   |   |   |   |   |   |   |-- Node: age <= 0
|   |   |   |   |   |   |   |   |-- Node: sex <= 0
|   |   |   |   |   |   |   |   |   |-- Leaf: 0
|   |   |   |   |   |   |   |   |   |-- Node: fnlwgt <= 0
|   |   |   |   |   |   |   |   |   |   |-- Leaf: 0
|   |   |   |   |   |   |   |   |   |   |-- Leaf: 1
|   |   |   |   |   |   |   |   |-- Leaf: 0
|   |   |   |   |   |-- Node: hours_per_week <= 0
|   |   |   |   |   |   |-- Node: race <= 0
|   |   |   |   |   |   |   |-- Node: sex <= 0
|   |   |   |   |   |   |   |   |-- Node: fnlwgt <= 2
|   |   |   |   |   |   |   |   |   |-- Leaf: 0
|   |   |   |   |   |   |   |   |   |-- Node: age <= 0


I have created three decision trees with diffenert stopping depth:
the first one with stopping depth at 5
the second with depth at 10
and the third one with no stopping depth
We can see that the third decision tree have the biggest difference between the train and test data model accuracy which can cause overfitting with large group of data.
we can use different splitting criterion apart from information gain. 
For example: Chi-square test: If there are a large number of low-frequency features in the data (such as "occupation"), they may be ignored.
