In [3]:
from __future__ import annotations # For type hinting my own class!

import numpy as np
from typing import Tuple, Union

class DecisionTree():
    def __init__(self,
        min_samples_per_node: int = 5,
        max_depth: int = 5,
        impurity_measure: str = 'gini',
        num_targets: int = None
    ) -> None:

        # Decision Tree Hyperparameters
        self.min_samples_per_node = min_samples_per_node
        self.max_depth = max_depth

        # Impurity Functions
        self.impurity_measure = impurity_measure
        impurity_functions = {
            'gini': self._compute_gini_impurity,
            'entropy': self._compute_entropy,
            'variance': self._compute_variance
        }
        self.impurity_function = impurity_functions[impurity_measure]

        # And Prediction Storage Functions
        pred_storage_functions = {
            'gini': self._store_classification_pred,
            'entropy': self._store_classification_pred,
            'variance': self._store_regression_pred
        }
        self.pred_storage_function = pred_storage_functions[impurity_measure]

        # Number of targets for counting purpose
        self.num_targets =  num_targets # to be determined

        # Class Variables
        self.depth = 0

        # Node Variables
        self.left_node = None
        self.right_node = None
        self.splitting_feature = None
        self.splitting_threshold = None
        self.pred = None

    ######################################################################################################
    #
    #                                         Training Methods
    #
    ######################################################################################################


    # Currently writing for training only
    # I will ammend this to enable prediction
    def build_tree(self, 
        X: np.ndarray, 
        y: np.ndarray,
        depth: int = 0
    ) -> DecisionTree:
        
        # Store depth
        self.depth = depth

        # Check stopping criterion
        if self.check_stopping_criterion(X, y) is True:
            self.pred = self.pred_storage_function(y)
            return self

        # Get the Decision Split and the Data Split
        data_splits, feature_index, threshold = self.find_best_split(X, y)
        if data_splits is None:
            return None
        self.splitting_feature = feature_index
        self.splitting_threshold = threshold

        # Store the prediction
        self.pred = self.pred_storage_function(y)

        # Build Left and Right (Children) Tree
        (X_left, y_left), (X_right, y_right) = data_splits 
        self.left_node = DecisionTree(
            min_samples_per_node = self.min_samples_per_node,
            max_depth = self.max_depth,
            impurity_measure = self.impurity_measure,
            num_targets = self.num_targets
        ).build_tree(X_left, y_left, depth + 1)     # Add 1 to depth

        self.right_node = DecisionTree(
            min_samples_per_node = self.min_samples_per_node,
            max_depth = self.max_depth,
            impurity_measure = self.impurity_measure,
            num_targets = self.num_targets
        ).build_tree(X_right, y_right, depth + 1)   # Add 1 to depth

        # Return the Node
        return self

    def check_stopping_criterion(self,
        X: np.ndarray, 
        y: np.ndarray
    ) -> bool:
        
        # Check max depth
        if self.depth > self.max_depth:
            print("Stopping with max_depth: ", self.max_depth)
            return True
        
        # Check minimum number of samples per node
        if X.shape[0] <= self.min_samples_per_node:
            print("Stopping with min_samples_per_node: ", X.shape[0])
            return True
        
        return False

    def find_best_split(self,
        X: np.ndarray, 
        y: np.ndarray
    ) -> Tuple[
        Tuple[Tuple[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray]],    # Left Data and Right Data
        Union[int, str],                                                        # Feature index or feature name 
        float                                                                   # Threshold
    ]:
        
        # Find the best split
        best_split = None
        best_impurity = np.inf
        best_threshold = None
        best_feature = None

        num_samples, num_feautures = X.shape[0], X.shape[1]
        for i in range(num_feautures):

            # TO BE IMPROVED by using buckets / histograms
            # Find the threshold with the least impurity
            thresholds = np.unique(X[:, i])      
            for threshold in thresholds:
                left_data, right_data = self.split_data(X, i, y, threshold)
                X_left, y_left = left_data
                X_right, y_right = right_data

                # Skip the lowest and highest values because no split
                if (X_left.shape[0] == 0) or (X_right.shape[0] == 0):
                    continue

                # Compute impurity
                left_impurity = self.impurity_function(y_left)
                right_impurity = self.impurity_function(y_right)
                impurity = (y_left.shape[0] / y.shape[0]) * left_impurity + \
                    (y_right.shape[0] / y.shape[0]) * right_impurity
                
                # Compare with the best impurity
                if impurity < best_impurity:
                    best_split = (left_data, right_data)
                    best_impurity = impurity
                    best_feature = i
                    best_threshold = threshold

        # If no best split
        # Return (data split, feature/feature name, feature threshold)
        if best_split is None:
            return None, None, None

        # If same number of samples after best split, return None
        # TO BE IMPROVED
        # Do we need this?
        num_samples_left = best_split[0][0].shape[0]
        num_samples_right = best_split[1][0].shape[0]
        if (num_samples_left == 0) or (num_samples_right == 0):
            print("im here")
            return None, None, None

        # Normally we would compare the parent's impurities to the 
        # children's impurities. But mathematically, the children's impurities 
        # will always be less or equal to the parent's impurities

        return best_split, best_feature, best_threshold
    
    def split_data(self,
        X: np.ndarray,
        i: Union[int, str],
        y: np.ndarray,
        threshold: float
    ) -> Tuple[Tuple[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray]]: 
        
        # Split mask according to a specific feature
        X_i = X[:, i]
        left_indices = np.where(X_i <= threshold)
        right_indices = np.where(X_i > threshold)
        
        # Apply split mask to the whole dataset
        X_left, y_left = X[left_indices], y[left_indices]
        X_right, y_right = X[right_indices], y[right_indices]

        return (X_left, y_left), (X_right, y_right)

    ######################################################################################################
    #
    #                               Prediction and Prediction Storage Methods
    #
    ######################################################################################################    

    def predict(self, 
        X: np.ndarray
    ) -> Union[int, float]:
        
        # If node is a leaf
        if self.left_node is None and self.right_node is None:
            return self.pred
        
        # Else traverse the tree by recursion
        is_left = X[self.splitting_feature] <= self.splitting_threshold
        if is_left:
            return self.left_node.predict(X)
        else:
            return self.right_node.predict(X)

    def _store_classification_pred(self, 
        y: np.ndarray   # int
    ) -> int:
        target_dist = self._compute_target_dist(y)
        best_class = np.argmax(target_dist)

        return best_class

    def _store_regression_pred(self, 
        y: np.ndarray   # float
    ) -> float:
        best_pred = np.mean(y)
        
        return best_pred

    ######################################################################################################
    #
    #                                     Visualization Methods
    #
    ######################################################################################################    

    def visualize_decision_nodes(self, 
        depth: int = 0
    ) -> None:

        # Indentation to represent tree depth
        indent = " " * (4 * depth)

        # If the node is a leaf, print the prediction
        if self.left_node is None and self.right_node is None:
            print(f"{indent}Leaf Node: Prediction = {self.pred}")
            return

        # Print the current node's feature and threshold
        print(f"{indent}Node: Feature[{self.splitting_feature}] <= {self.splitting_threshold}")

        # Recur for left and right children
        if self.left_node is not None:
            print(f"{indent}  Left:")
            self.left_node.visualize_decision_nodes(depth + 1)
        
        if self.right_node is not None:
            print(f"{indent}  Right:")
            self.right_node.visualize_decision_nodes(depth + 1)


    ######################################################################################################
    #
    #                        Impurity Methods | Supposedly methods of a Node Class
    #
    ######################################################################################################
    def _compute_target_dist(self, 
        y: np.ndarray
    ) -> float:
        
        # Count the number of samples per target
        # TO BE IMPROVED
        target_dist = []
        for i in range(self.num_targets):
            target_dist.append(np.mean((y == i)*1))

        return target_dist

    # For Classification
    def _compute_gini_impurity(self, 
        y: np.ndarray
    ) -> float:
        
        target_dist = np.array(self._compute_target_dist(y))
        gini_impurity = np.sum(target_dist * (1-target_dist))

        return gini_impurity
    
    # For Classification
    def _compute_entropy(self,
        y: np.ndarray
    ) -> float:
        
        target_dist = np.array(self._compute_target_dist(y))
        target_dist = target_dist[target_dist > 0]                  # Remove zero probabilities
        entropy = -1 * np.sum(target_dist * np.log2(target_dist))   # Zero prob results to log undef

        return entropy
    
    # For Regression
    def _compute_variance(self,
        y: np.ndarray
    ) -> float:
        
        num_samples = y.shape[0]
        if num_samples == 0:
            return 0
        
        insides = y - np.mean(y)
        squares = np.power(insides, 2)
        variance = np.mean(squares)

        return variance
    

# Iris Dataset | Classification

In [4]:
from sklearn.datasets import load_iris

iris = load_iris()

X = iris.data
y = iris.target

feature_names = iris.feature_names
target_names = iris.target_names

print("Features:", feature_names)
print("Target labels:", target_names)

from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

Features: ['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)', 'petal width (cm)']
Target labels: ['setosa' 'versicolor' 'virginica']


In [8]:
decision_tree = DecisionTree(
    min_samples_per_node=5,
    max_depth=np.inf,
    impurity_measure='entropy',
    num_targets=3
)

wawa = decision_tree.build_tree(X_train, y_train)

Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  3
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  4
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  2
Stopping with min_samples_per_node:  3
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  5
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  5
Stopping with min_samples_per_node:  3
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  4
Stopping with min_samples_per_node:  3
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  2
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  5
Stopping with min_samples_per_node:  3
Stopping with min_samples_per_node:  4
Stopping with min_samples

In [9]:
for inde in range(X_test.shape[0]):
    print(wawa.predict(X_test[inde]), y_test[inde])

1 1
0 0
2 2
1 1
1 1
0 0
1 1
2 2
1 1
1 1
2 2
0 0
0 0
0 0
0 0
1 1
2 2
1 1
1 1
2 2
0 0
2 2
0 0
2 2
2 2
2 2
2 2
2 2
0 0
0 0


# Breast Cancer Dataset | Classification

In [15]:
from sklearn.datasets import load_breast_cancer
cancer = load_breast_cancer(as_frame=True)
X, y = cancer.data.to_numpy(), cancer.target.to_numpy()
# print(X.head())  # Display first few rows

from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

print(cancer.target_names)  # Display target classes

['malignant' 'benign']


In [16]:
decision_tree = DecisionTree(
    min_samples_per_node=5,
    max_depth=10,
    impurity_measure='entropy',
    num_targets=2
)

In [17]:
wawa = decision_tree.build_tree(X_train, y_train)

Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  1
Stopping with max_depth:  10
Stopping with max_depth:  10
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  1
Stopping with max_depth:  10
Stopping with max_depth:  10
Stopping with min_samples_per_node:  4
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  5
Stopping with min_samples_per_node:  3
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  5
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  2
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  1
Stopping with min_samples_

In [20]:
for inde in range(X_test.shape[0]):
    print(wawa.predict(X_test[inde]), y_test[inde])

1 1
0 0
0 0
1 1
1 1
0 0
0 0
0 0
0 1
1 1
1 1
0 0
1 1
1 0
1 1
0 0
1 1
1 1
1 1
0 0
1 0
1 1
0 0
1 1
1 1
1 1
1 1
1 1
1 1
0 0
1 1
1 1
0 1
1 1
1 1
1 1
0 0
1 1
0 0
1 1
1 1
0 0
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
0 0
0 0
0 1
1 1
1 1
1 1
1 1
0 0
0 0
1 1
1 1
0 0
0 0
1 1
1 1
1 1
0 0
0 0
1 1
1 1
0 0
0 0
1 1
0 0
1 1
1 1
1 1
1 0
1 1
1 1
0 0
1 1
1 0
0 0
0 0
0 0
0 0
0 0
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
0 0
0 0
1 1
0 0
0 0
1 1
0 0
0 0
1 1
1 1
1 1
0 0
0 1
1 1
0 0
1 1
1 1
0 0


# Diabetes Dataset | Regression

In [21]:
from sklearn.datasets import load_diabetes
diabetes = load_diabetes(as_frame=True)
X, y = diabetes.data.to_numpy(), diabetes.target.to_numpy()

from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)


In [22]:
decision_tree = DecisionTree(
    min_samples_per_node=2,
    max_depth=10,
    impurity_measure='variance',
    # num_targets=2
)

In [23]:
wawa = decision_tree.build_tree(X_train, y_train)

Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  2
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  2
Stopping with max_depth:  10
Stopping with max_depth:  10
Stopping with min_samples_per_node:  2
Stopping with min_samples_per_node:  2
Stopping with max_depth:  10
Stopping with max_depth:  10
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  2
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  2
Stopping with min_samples_per_node:  2
Stopping with min_samples_per_node:  2
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  2
Stopping with max_depth:  10
Stopping with max_depth:  10
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  1
Stopping with min_samples_per_node:  1
Stoppin

In [24]:
for inde in range(X_test.shape[0]):
    print(wawa.predict(X_test[inde]), y_test[inde])

187.4 219.0
166.0 70.0
187.4 202.0
150.0 230.0
230.0 111.0
87.0 84.0
235.0 242.0
248.0 272.0
203.0 94.0
157.71428571428572 96.0
113.0 94.0
200.0 252.0
71.33333333333333 99.0
214.0 297.0
78.80952380952381 135.0
60.0 67.0
296.0 295.0
190.0 264.0
293.0 170.0
178.0 275.0
165.5 310.0
62.0 64.0
48.4 128.0
220.57142857142858 232.0
87.5 129.0
157.71428571428572 118.0
282.0 263.0
111.5 77.0
59.0 48.0
230.0 107.0
292.5 140.0
55.0 113.0
224.0 90.0
287.5 164.0
165.5 180.0
190.0 233.0
78.5 42.0
125.5 84.0
206.75 172.0
96.0 63.0
59.0 48.0
66.0 108.0
209.0 156.0
169.5 168.0
187.4 90.0
73.5 52.0
31.0 200.0
174.5 87.0
59.0 90.0
179.0 258.0
141.5 136.0
73.5 158.0
144.5 69.0
174.5 72.0
292.0 171.0
141.5 95.0
42.0 72.0
273.5 151.0
42.0 168.0
58.0 60.0
67.0 122.0
165.5 52.0
141.5 187.0
88.0 102.0
124.0 214.0
236.0 248.0
92.0 181.0
101.5 110.0
150.0 140.0
179.0 202.0
179.0 101.0
244.0 222.0
243.0 281.0
111.5 61.0
73.5 89.0
238.5 91.0
182.5 186.0
188.0 220.0
157.71428571428572 237.0
238.5 233.0
48.4 68.0
78.