The code below implements a CART tree for classification using gini impurity to decide on splitting.
* The tree only supports numerical input data (float, int etc.)
* The tree only supports categorical input data if it is one hot encoded. This way a categorical feature can be used to split based on just two categories (0 or 1) which makes it a lot easier
* The ground truth (y values) are either just 0/1 (binary classification) or ordinal (0,1,2,3, etc.) for multi class case
* The stopping criterion for splitting:
    * If either a specified max depth has been reached or the number of samples at the node is less than or equal to a specified minimum required to split
    * If all the labels(predictions) at the node are of the same class
    * If the gini cost is not calculated because a split is not possible i.e. every split point that is evaluated results in a situation where one side of the split has no samples
* Logic
    1. At the root create a node whose gini is calculated utilizing the whole training set. Assign the entire dataset (X and y values) to the root node. Set the root node as the current node
    2. Loop through all features for the dataset portion for the current node and try each one as a possible candidate for splitting
    3. For each such feature if the feature is categorical (one hot encoded so values are 0 or 1) try splitting between all 0's and 1's and calculate cost
    4. If the feature is numeric, then try each value in the column as a possible splitting point and calculate cost
    5. From steps 3 and 4 pick the split with minimum cost. Create the left and the right children nodes for the current node based on this split
    6. Then recursively repeat from step 2 for the left and right node
    7. Stop when stopping criteria is reached
    




In [27]:
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score, precision_score,recall_score
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import os
from collections import Counter



Below is a class that represents a node. The node class at minimum needs the following properties:
1. The portions of the training dataset (X and y) associated with it. For the root node this will be the entire X and y.
2. The parent node (None if node is root)
3. The node depth
4. The right child node
5. The left child node
6. The dominant class (the class in this node's y data that has the highest count). This will be used to predict the class of a test sample if this node becomes a leaf node and is used for prediction
7. The feature index (column index from X) on whihc this node is being split
8. Property indicating whether the feature used to split is categorical or not
9. The feature value at which the split occurs (only applicable for non-categorical columns. For categorical (one hot encoded) columns the split always occurs such that all 0's go left and the 1's go right)
7. The gini score (this is not required - but good to keep around for each node for future visualization etc.)

In [28]:
class Node:
    def __init__(self, X: np.ndarray, y: np.ndarray, parent = None):
        self.X = X
        self.y = y
        # get the unique classes and their counts from the ground truth (y)
        values,counts = np.unique(y,return_counts=True)
        # set the class with the highest count as the class to be predicted using this node
        self.predict_class = values[np.argmax(counts)]
        # calculate gini for this node
        self.gini = 1 - np.sum([(y[y == cls].shape[0]/y.shape[0])**2 for cls in values])       
        self.parent = parent
        # increment depth as needed
        self.depth = 1 if parent is None else parent.depth + 1

        self.right: Node = None
        self.left: Node = None
        self.feature_idx = None
        self.categorical = None
        self.split_at = None

The class below is the main tree class.

In [37]:
        

class DecisionTreeClassifier:
    # constructor
    def __init__(self, max_depth = None, min_sample_split = None):
        # initialize root node - we do not have one yet
        self.head = None
        # save the stopping parameters
        self.max_depth = max_depth
        self.min_sample_split = min_sample_split
        # initialize the class list (we do not have this yet)
        self.clsarr = None

    '''
    This is the main method to fit (train) the tree
    Arguments:
        X - the features (input columns) of the training dataset
        y - the labels (0/1 for binary, ordinals (0,1,2,3...) for multiclass)
        categorical_col_idx: the list of column indices that are categorical (one hot encoded)
    '''
    def fit(self, X: np.ndarray, y: np.ndarray, categorical_col_idx: set[int]):
        # record the label classes
        self.clsarr = np.unique(y)
        # create the root node
        self.head = Node(X,y)
        # start splitting with the root node
        self.__split(self.head,categorical_col_idx)
        # return the tree again so that we can chain method calls
        return self

    '''
    This method (recursively) splits a node. If the method returns without splitting (stopping conditions met)
    then the node is a leaf node
    Arguments:
        cur_node: Current node under consideration for splitting
        categorical_col_idx: the list of column indices that are categorical (one hot encoded)
    '''
    def __split(self, cur_node: Node, categorical_col_idx: set[int]):        
        # stopping conditions
        # depth condition check
        if (cur_node.parent is not None and self.max_depth is not None and cur_node.parent.depth + 1 > self.max_depth):
            return
        # min samples required to split check
        if(self.min_sample_split is not None and cur_node.X.shape[0] <= self.min_sample_split):
            return
        # are all the samples at the current node already of a single class ? Then no point splitting.
        if (np.unique(cur_node.y).shape[0] == 1):
            return

        # calculate the split
        (feature_idx,split_value) = self.__calculate_feature_to_split_based_on_min_cost(cur_node.X,cur_node.y,categorical_col_idx)
        
        # for some reason we could not split
        if feature_idx == None:
            return
        
        if feature_idx in categorical_col_idx:
            # get the row indices from the samples attached to the current node where the categorical (one hot encoded) feature value is 0  
            left_indices = np.argwhere(cur_node.X[:,feature_idx] == 0).flatten()
            # get the row indices from the samples attached to the current node where the categorical (one hot encoded) feature value is 1
            right_indices = np.argwhere(cur_node.X[:,feature_idx] == 1).flatten()
        else:
            # get the row indices from the samples attached to the current node where the feature value is < split_value 
            left_indices = np.argwhere(cur_node.X[:,feature_idx] < split_value).flatten()
            # get the row indices from the samples attached to the current node where the feature value is >= split_value 
            right_indices = np.argwhere(cur_node.X[:,feature_idx] >= split_value).flatten()

        # if split is going to result in one side being "no samples" do not split
        if (left_indices.shape[0] == 0 or right_indices.shape[0] == 0):
            return
        
        # update the current node properties - record the index of the feature the node is getting split on, whether that feature is categorical and 
        # the feature value used in splitting 
        cur_node.feature_idx = feature_idx
        cur_node.categorical = feature_idx in categorical_col_idx
        cur_node.split_at = split_value
        
        # create the left and the right child nodes - associating the appropriate portions of the dataset to each for both the features (X) and labels (y)
        cur_node.left = Node(cur_node.X[left_indices,:],cur_node.y[left_indices],cur_node)
        cur_node.right = Node(cur_node.X[right_indices,:],cur_node.y[right_indices],cur_node)

        # recursively try to split the newly created left and right nodes 
        self.__split(cur_node.left,categorical_col_idx)
        self.__split(cur_node.right,categorical_col_idx)

    '''
    Given input data (X), ground truth (y - which is either just 0 or 1 for the binary case or ordinal for multi class case) figure 
    out which input column and which value in that input column can we split at so as to have the minimum cost.
    Arguments:
        X - input features
        y - ground truth label
        categorical_col_idx - indices of columns that are categorical
    Returns: 
        tuple of (feature_idx,split_value) where split_value is the x value pointed to by split_point and is None if the feature was categorical
    '''
    def __calculate_feature_to_split_based_on_min_cost(self, X: np.ndarray, y: np.ndarray, categorical_col_idx: set[int]):
        # create an empty array to hold all the costs (and associated values) as we loop through all the features
        cost_arr = np.empty((0,3))
        # we loop through each feature (column) in X
        for feature_idx in range(0,X.shape[1]):
            # we take the column at feature_idx and stack that with y making xy (n x 2 in shape) 
            xy = np.column_stack((X[:,feature_idx],y))

            # if the feature is categorical
            if feature_idx in categorical_col_idx:
                # calculate the cost and gini. Note that the split point we supply is None - since this is a categorical feature
                # our assumption is that it has been one hot encoded and hence the only possible values in this feature column is 0 or 1
                # and our split will simply be based on that
                cost = self.__calculate_cost_and_gini(xy,feature_idx,categorical=True,split_point = None)
                # if we got a valid cost and gini - store that
                if cost[0,0] != None:
                    cost_arr = np.concatenate((cost_arr,cost),axis=0)
            else:                
                #x_sort_indices = np.argsort(xy,axis=0)[:,0]
                #xy = np.take_along_axis(xy,np.column_stack((x_sort_indices,x_sort_indices)),axis=0)
                for possible_split_point_idx in range(0,X.shape[0]):
                    # get the cost and gini if we were to use this split point
                    cost_for_split_point = self.__calculate_cost_and_gini(xy,feature_idx,categorical=False,split_point=possible_split_point_idx)
                    # record it
                    if cost_for_split_point[0,0] != None:
                        cost_arr = np.concatenate((cost_arr,cost_for_split_point))
        # we did not get anything back for any of the possible splits
        # so we return None's
        if cost_arr.shape[0] == 0:
            return (None,None)
        # of all the cost's we recorded in the above loop for all features
        # find the one with the minimum cost. That split is the one we would use
        min_cost_idx = np.argmin(cost_arr[:,0],axis = 0)
        idx_of_feature_to_split_on = int(cost_arr[min_cost_idx,1])
        value_to_split_on = cost_arr[min_cost_idx,2]
        # return the index of feature on which we will split 
        # and the split value (which is the x value at the split_point and is None if the feature was categorical))
        return (idx_of_feature_to_split_on,value_to_split_on)
    
    '''
    Calculate the cost and gini impurity for a split

    Arguments:
        xy - n by 2 dataset where the first column is some feature(x) and the second column is the ground truth label (y)
        feature_idx - column index of the feature(x) in the original training data
        categorical - is the feature categorical ?
        split_point - the row index pointing to the row where we want to split
    Returns:
        1x5 array of [cost,gini_left,gini_right,feature_idx,split_value (which is the x value pointed to by split_point and is None if the feature was categorical)]
    '''
    def __calculate_cost_and_gini(self,xy: np.ndarray, feature_idx: int, categorical: bool, split_point: int):
        # to calculate gini and cost we only need the y values - so we split them
        if categorical:
            # left is all values of y where the x == 0, and right is all values of y where x == 1 
            # (x is catagorical aka one hot encoded - so possible x values are 0 or 1)
            y_left = xy[xy[:,0] == 0,1]
            y_right = xy[xy[:,0] == 1,1]
        else:
            #split_val = (xy[split_point,0] + (xy[split_point+1,0] - xy[split_point,0])/2)
            # left is all values of y where  x < (x value at row pointed to by split point) , 
            # and right is all values of y where  x >= (x value at row pointed to by split point) 
            y_left = xy[xy[:,0] < xy[split_point,0].item(),1]
            y_right = xy[xy[:,0] >= xy[split_point,0].item(),1]

        # a split was not possible at that split point because one or the other side resulted in no samples satisfying the split condition
        # we return all(3) None values
        if (y_left.shape[0] == 0 or y_right.shape[0] == 0):
            return np.array([None]*3).reshape(1,-1)
        # check the gini and cost formulae
        # the probabilities for each class is just the number of y values of that class dividieed by the total number of samples
        # to calculate gini we summ the squares of each class probability and subtract from 1
        # The cost is then a weighted sum of the two gini values where the weights are the proportion of the number of instances 
        # in the left/right subset to the total number of samples 
        gini_left = 1 - np.sum([(y_left[y_left == cls].shape[0]/y_left.shape[0])**2 for cls in self.clsarr])
        gini_right = 1 - np.sum([(y_right[y_right == cls].shape[0]/y_right.shape[0])**2 for cls in self.clsarr])
        cost = (gini_left * y_left.shape[0]/xy.shape[0]) + (gini_right * y_right.shape[0]/xy.shape[0])
        # return as a 1x3 array of [cost,feature_idx,split_value (which is the x value pointed to by split_point and is None if the feature was categorical)]
        return np.array([cost, feature_idx, xy[split_point,0].item() if not categorical else None]).reshape(1,-1)
    
    '''
    This is the main method to predict using a trained tree
    Arguments:
        X - the features (input columns) of the test dataset
    Returns:
        y - the predicted labels (0/1 for binary, ordinals (0,1,2,3...) for multiclass)
    '''
    def predict(self,X:np.ndarray) -> np.ndarray:
        # apply the internal prediction method once for each row in the test dataset
        # apply_along_axis will accumulate all the results into a single y array

        return np.apply_along_axis(self.__predict_internal,axis=1,arr=X,use_node = self.head)

    '''
    This method takes a single x array (1 x n features) of features and returns the 
    predicted y value (0/1 for binary, ordinals (0,1,2,3...) for multiclass). It recursively
    calls itself until a leaf node is reached and a decision can be made
    Arguments:
        X - one row of test data
        use_node - the current node to be used to make a decision
    Returns:
        A single y value (0/1 for binary, ordinals (0,1,2,3...) for multiclass)
    '''
    def __predict_internal(self, X:np.ndarray, use_node: Node) -> float:
        # base case - we have a reached a leaf node
        if (use_node.left is None and use_node.right is None):
            # return whatever class this node predicts
            return use_node.predict_class
        # else choose to go left or right depending on split criterion
        elif use_node.categorical:
            # if the feature associated with the current node is categorical then go left if the test row has a 0 for that feature else go right
            return self.__predict_internal(X,use_node.left if X[use_node.feature_idx].item() == 0 else use_node.right)
        else:
            # if the feature associated with the current node is non-categorical then go left if the test row has a value for that 
            # feature that is less than the split value associated with this node else go right
            return self.__predict_internal(X,use_node.left if X[use_node.feature_idx].item() < use_node.split_at else use_node.right)
    

In [38]:
df_data = pd.read_csv("c:/pix/ml/titanic/train.csv")
# drop columns that are meaningless for this
df_data = df_data.drop(["PassengerId","Name","Ticket","Cabin"],axis=1).dropna()
# get the X data
df_X = df_data.drop("Survived",axis=1)
# get the y data
df_y = df_data["Survived"]
# one hot encode categoricals
df_X = pd.get_dummies(df_X,columns=["Pclass","Sex","Embarked"])
# make all X data numeric
df_X = df_X.astype(float)
# split
train_X,test_X,train_y,test_y = train_test_split(df_X,df_y,test_size=0.15,stratify=df_y,shuffle=True,random_state=1234)

# play around with the stopping conditions
tree = DecisionTreeClassifier(min_sample_split=12, max_depth=16)
# fit the tree
tree.fit(train_X.values,train_y.values,categorical_col_idx=range(4,12))
# predict with the training set
predictions_train = tree.predict(train_X.values)
print(f"train accuracy = {accuracy_score(predictions_train,train_y.values)}")
print(f"train precision = {precision_score(predictions_train,train_y.values)}")
print(f"train recall = {recall_score(predictions_train,train_y.values)}")
# predict with the test set
predictions_test = tree.predict(test_X.values)
print(f"test accuracy = {accuracy_score(predictions_test,test_y.values)}")
print(f"test precision = {precision_score(predictions_test,test_y.values)}")
print(f"test recall = {recall_score(predictions_test,test_y.values)}")



train accuracy = 0.9057851239669421
train precision = 0.8081632653061225
train recall = 0.9519230769230769
test accuracy = 0.7850467289719626
test precision = 0.6046511627906976
test recall = 0.8125


In [21]:
a = np.array([[9,7],[1,22],[11,12]])
sort_indices = np.argsort(a,axis=0)[:,0]
np.sort(a,axis=0)

array([[ 1,  7],
       [ 9, 12],
       [11, 22]])

In [22]:
sort_indices = np.column_stack((sort_indices,sort_indices))
np.take_along_axis(a,sort_indices,axis=0)


array([[ 1, 22],
       [ 9,  7],
       [11, 12]])