In [1]:
import numpy as np

In [2]:
MIN_SPLIT = 10
MIN_SAMPLES_LEAF = 5
MAX_HEIGHT = 5
CONTINUOUS = "CO"
CATEGORICAL = "CA"

In [3]:
#Sorts a matrix by a given column
def sort_by_column(matrix,sort_coordinate):
    new_order = matrix[:,sort_coordinate].argsort()
    out = matrix.copy()[new_order]
    return out

In [4]:
class Node:
    def __init__(self,val,column,function,prediction=None):
        self.val = val
        self.function = function #recieves two numbers and returns either True or False, True go left, False go right
        self.column = column
        self.left = None
        self.right = None
        self.prediction = prediction #Mode of all samples that pass trough node

In [5]:
class DecisionTreeClassifier:
    def __init__(self,error = "gini",max_height = MAX_HEIGHT,min_samples_leaf = MIN_SAMPLES_LEAF,min_split = MIN_SPLIT):
        self.head = None
        self.error_metric = self.__return_error_metric(error)
        self.max_height = max_height
        self.min_samples_leaf = min_samples_leaf
        self.min_split = min_split

    def __mode(self,arr):
        unique, counts = np.unique(arr, return_counts=True)
        max_id = np.argmax(counts)
        return unique[max_id]

    def __is_smaller(self,a,b):
        return a<b

    def __is_equal(self,a,b):
        return a==b

    def __measure_array_gini(self,arr):
        unique, counts = np.unique(arr, return_counts=True)
        return 1-sum((counts/len(arr))**2)

    def __measure_array_entropy(self,arr):
        unique, counts = np.unique(arr, return_counts=True)
        probabilities = (counts/len(arr))
        log_probabilities = np.log(probabilities)
        return -1*sum(probabilities*log_probabilities)

    def __return_error_metric(self,error):
        if error == 'entropy':
            return self.__measure_array_entropy
        else:
            return self.__measure_array_gini
        
    def __find_continuous_column_best_split(self,data,column):
        data = sort_by_column(data,column)
        
        column_data = data[:,column]
        
        best_error = np.inf

        best_split_val = None
        
        best_left,best_right = None,None

        last_val = column_data[0]
            
        for i in range(self.min_samples_leaf,len(column_data)-self.min_samples_leaf):
            if column_data[i] == last_val:
                continue

            last_val = column_data[i]

            split_val = (column_data[i] + column_data[i-1])/2
            
            left_arr = data[:i , :]
            right_arr = data[i: , :]
            
            split_error = self.error_metric(left_arr[:,-1]) + self.error_metric(right_arr[:,-1]) 

            
            if split_error < best_error:
                
                best_error =  split_error
                best_split_val = split_val
                best_left = left_arr
                best_right = right_arr
                
        
        return best_split_val,best_error,best_left,best_right
    
    def __find_categorical_column_best_split(self,data,column):    
        column_data = data[:,column]
    
        classes = np.unique(column_data)
        
        best_error = np.inf
        
        best_split_val = None
        
        best_left,best_right = None,None
            
        for c in classes:
            left_arr = data[column_data == c]
            right_arr = data[column_data != c]
    
            if len(left_arr) < self.min_samples_leaf or len(right_arr) < self.min_samples_leaf:
                continue
            
            split_error = self.error_metric(left_arr[:,-1]) + self.error_metric(right_arr[:,-1]) 
            if split_error < best_error:
                
                best_error =  split_error
                best_split_val = c
                best_left = left_arr
                best_right = right_arr
                
        
        return best_split_val,best_error,best_left,best_right
    
    def __find_column_best_split(self,data,column,data_type):
        if data_type == CATEGORICAL:
            return self.__find_categorical_column_best_split(data,column)
        if data_type == CONTINUOUS:
            return self.__find_continuous_column_best_split(data,column)

    def __find_best_split(self,data,data_types):
        best_column = -1
        best_val = data[0][0]
        best_error = np.inf
        best_left = None
        best_right = None
        for col in range(len(data[0])-1): #Iterate over all columns but the last one which is the label
            col_best_split_val,col_best_error,col_best_left,col_best_right = self.__find_column_best_split(data,col,data_types[col])
            if col_best_error < best_error:
                best_val,best_error,best_left,best_right = col_best_split_val,col_best_error,col_best_left,col_best_right
                best_column = col
        return best_column,best_val,best_left,best_right
    
    def __build_tree(self,data,data_types,height=0):
        if (height >= self.max_height) or (len(data) <= self.min_split):
            return None
        best_column,best_val,left_data,right_data = self.__find_best_split(data,data_types)
        if left_data is None:
            return None
        function = self.__is_smaller if data_types[best_column] == CONTINUOUS else self.__is_equal
        prediction = self.__mode(data[:,-1])
        node = Node(val=best_val,column=best_column,function=function,prediction=prediction)
        node.left = self.__build_tree(left_data,data_types,height+1)
        node.right = self.__build_tree(right_data,data_types,height+1)
        return node
    
    def fit(self,X,y,data_types):
        H = np.hstack([X,y.reshape(len(y),1)]).copy()
        self.head = self.__build_tree(H,data_types)

    def __predict_point(self,x):
        prev = None
        curr = self.head
        while curr is not None:
            col_to_check = curr.column
            val = curr.val
            function = curr.function
            prev = curr
            if function(x[col_to_check] , val):
                curr = curr.left
            else:
                curr = curr.right
        return prev.prediction

    def predict(self,X):
        out = []
        for x in X:
            out.append(self.__predict_point(x))
        return np.array(out)

<h3>Using our model and comparing it to Sklearn's DecisionTreeClassifier</h3>

In [6]:
import pandas as pd
from sklearn.datasets import load_iris
data = load_iris()

<h5>Flower classification dataset from sklearn</h5>

In [7]:
data['data'][:5],data['target'][:5]

(array([[5.1, 3.5, 1.4, 0.2],
        [4.9, 3. , 1.4, 0.2],
        [4.7, 3.2, 1.3, 0.2],
        [4.6, 3.1, 1.5, 0.2],
        [5. , 3.6, 1.4, 0.2]]),
 array([0, 0, 0, 0, 0]))

In [8]:
data_types = [CONTINUOUS,CONTINUOUS,CONTINUOUS,CONTINUOUS]

In [9]:
indexes = np.array(list(range(len(data['data']))))
np.random.shuffle(indexes)
X = data['data'][indexes]
y = data['target'][indexes]
test_size = int(len(X)/3)
X_train, X_test, y_train, y_test = X[:-test_size],X[-test_size:],y[:-test_size],y[-test_size:]

In [10]:
DT = DecisionTreeClassifier(error='entropy')
DT.fit(X_train,y_train,data_types)
my_pred = DT.predict(X_test) 

In [11]:
my_pred[:10]

array([1., 2., 2., 2., 0., 0., 1., 1., 2., 0.])

<h3>Now lets try with Sklearn</h3>

In [12]:
from sklearn.tree import DecisionTreeClassifier as DTR
DTR = DTR(max_depth=MAX_HEIGHT,min_samples_leaf=MIN_SAMPLES_LEAF,min_samples_split=MIN_SPLIT,criterion='entropy')
DTR.fit(X_train,y_train)
sklearn_pred = DTR.predict(X_test)

<h3>Now lets compare the performance</h3>

In [13]:
from sklearn.metrics import accuracy_score

In [14]:
print("Sklearn's tree performance : " , accuracy_score(y_test,sklearn_pred))
print("Our tree's performance : " , accuracy_score(y_test,my_pred))

Sklearn's tree performance :  0.92
Our tree's performance :  0.92
