In [47]:
import numpy as np
import time
import math

class Node:
    '''
    Class to define node of tree with opportunity to iterate throw
    
    ...

    Attributes
    ----------
    left_node : Node
        Left child node
    right_node : Node
        Right child node
    data : tuple
        Data for decision tree stored in node. Feature based on what split will be performed and border value
    ans : int
        Answer stored in leath node
    direction : str
        Direction from parent node to this
    father_node : Node
        Parent node of one of this node
    depth : int
        Depth of current node in whole tree
    '''
    
    def __init__(self, direction, father_node):
        """
        Parameters
        ----------
        direction : str
            Direction from parent node to this
        father_node : Node
            Parent node of this node
        """

        self.left_node = None
        self.right_node = None
        self.data = None
        self.ans = None
        self.father_node = father_node
        self.direction = direction
        self.depth = None

class SimpleTree(object):
    '''
    Custom class of desicion binary tree
    '''
    
    def __init__(self, max_depth, max_features, min_leaf_size = 1, criterion = 'entropy'):
        """
        Parameters
        ----------
        max_depth : int
            Maximum depth of tree
        max_features : int
            Number of features to consider for split
        min_leaf_size : int, optional
            Number of elements in leaf, which causes stop splitting (default is 1)
        criterion : str, optional
            Function to evaluate split quality (default is 'entropy')
        """
        
        self.max_depth = max_depth
        self.min_leaf_size = min_leaf_size
        self.max_features = max_features
        self.criterion = criterion
        self.root = None
        
    
    def fit(self, X, y, direction = None, father_node = None):
        """
        Parameters
        ----------
        X : np.ndarray of shape (n_samples, n_features)
            Training sample
        y : np.ndarray of shape (n_samples,)
            Targer values
        direction : str, optional
            Direction from parent node to one, which will be created in method (default is None)
        father_node : Node, optional
            Parent node of one, which will be created in method (default is None)
        """
        
        cur_node = Node(direction, father_node)
        if (direction != None) and (father_node != None):
            cur_node.depth = cur_node.father_node.depth + 1
            if cur_node.direction == 'r':
                cur_node.father_node.right_node = cur_node
            else:
                cur_node.father_node.left_node = cur_node
        else:
            cur_node.depth = 1
            self.root = cur_node
        
        if (len(y) == self.min_leaf_size) or (cur_node.depth == self.max_depth):
            cur_node.ans = np.round(np.average(y))
            return
        
        np.random.seed((int)(str(time.time()).split('.')[1]))
        all_features = X.shape[1]
        chosen_features = np.sort(np.random.permutation(all_features)[:self.max_features])
        min_loss = np.inf
        opt_feature = None
        opt_t = None
        opt_j = None
        for ind_feature in chosen_features:
            for j in range(X.shape[0]):
                X_r, y_r, X_l, y_l = self.__get_parts(X, y, ind_feature, j)
                if (len(y_r) == 0) or (len(y_l) == 0):
                    continue
                assert (X_l.shape[0] + X_r.shape[0] == X.shape[0]), "Object was lost!"
                assert (len(y_l) + len(y_r) == len(y)), "Object was lost!"
                assert ((len(y_r) != 0) and (len(y_l) != 0)), 'Ooops'
                if (self.__loss(X_l.shape[0], X_r.shape[0], y_l, y_r) < min_loss):
                    min_loss = self.__loss(X_l.shape[0], X_r.shape[0], y_l, y_r)
                    opt_feature = ind_feature
                    opt_t = X[j, ind_feature]
                    opt_j = j
        
        cur_node.data = (opt_feature, opt_t)
        X_r, y_r, X_l, y_l = self.__get_parts(X, y, opt_feature, opt_j)
        self.fit(X_r, y_r, 'r', cur_node)
        self.fit(X_l, y_l, 'l', cur_node)
        
    def predict(self, X):
        """
        Parameters
        ----------
        X : np.ndarray of shape (n_samples, n_features)
            Sample to predict results
            
        Returns
        -------
        np.ndarray
            array of predicted values
        """

        y = np.zeros(X.shape[0])
        for i in range(X.shape[0]):
            y[i] = self.__run_throw_tree(X[i, :])
        return y
            
    def __loss(self, X_l_size, X_r_size, y_l, y_r):
        '''
        Parameters
        ----------
        X_l_size : int
            Number of elements in left splitter sample
        X_r_size : int
            Number of elements in right splitter sample
        y_l : np.ndarray of shape (n_left_samples,)
            Targer values of left sample
        y_r : np.ndarray of shape (n_left_samples,)
            Targer values of right sample
            
        Returns
        -------
        int
            value of error criterion
        '''
        
        return X_l_size / (X_l_size + X_r_size) * self.__H(y_l) + X_r_size / (X_l_size + X_r_size) * self.__H(y_r)

    def __H(self, y):
        '''
        Parameters
        ----------
        y : np.ndarray of shape (n_samples,)
            Targer values of sample

        Returns
        -------
        int
            value of information criterion
        '''
        
        p1 = (np.count_nonzero(y)) / len(y)
        if self.criterion == 'entropy':
            return - p1 * math.log2(1e-7 + p1) - (1 - p1) * math.log2(1e-7 + 1 - p1)
        if self.criterion == 'gini':
            return p1 * (1 - p1) + (1 - p1) * p1
    
    def __get_parts(self, X, y, ind_feature, j):
        '''
        Parameters
        ----------
        X : np.ndarray of shape (n_samples, n_features)
            Sample to split
        y : np.ndarray of shape (n_samples,)
            Targer values to split
        ind_feature : int
            Index of feature based on which split will be performed
        j : int
            Index of element in column which will be splitter

        Returns
        -------
        np.ndarray of shape (n_right_samples, n_targets)
            Right splitted sample
        np.ndarray of shape (n_riht_samples,)
            Targer values of right sample
        np.ndarray of shape (n_left_samples, n_targets)
            Left splitted sample
        np.ndarray of shape (n_left_samples,)
            Targer values of left sample
        '''
        
        X_r = X[X[:, ind_feature] <= X[j, ind_feature], :]
        y_r = y[X[:, ind_feature] <= X[j, ind_feature]]
        X_l = X[X[:, ind_feature] > X[j, ind_feature], :]
        y_l = y[X[:, ind_feature] > X[j, ind_feature]]
        return X_r, y_r, X_l, y_l
    
    def __run_throw_tree(self, x):
        '''
        Parameters
        ----------
        x : np.ndarray of shape (n_features)
            Features of one fixed object

        Returns
        -------
        int
            Target variable for given objest
        '''
        
        cur_node = self.root
        while (cur_node.ans == None):
            if (x[cur_node.data[0]] <= cur_node.data[1]):
                cur_node = cur_node.right_node
            else:
                cur_node = cur_node.left_node
        return cur_node.ans

def print_scores(test, pred):
    print(accuracy_score(test, pred))
    print(f1_score(test, pred))
    print(roc_auc_score(test, pred))

In [49]:
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, explained_variance_score
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score

# code for simple check of ideas on Fisher's iris dataset

iris = load_iris()
data_bin = iris.data[:100, :]
target_bin = iris.target[:100]
X_train, X_test, y_train, y_test = train_test_split(data_bin, target_bin, test_size=0.8)
iris_tree = SimpleTree(max_depth = 3, max_features = (int)(X_train.shape[1] / 2), criterion = 'gini')
iris_tree.fit(X_train, y_train)
y_pred = iris_tree.predict(X_test)
print_scores(y_test, y_pred)

0.975
0.9743589743589743
0.9761904761904762


In [52]:
import pandas as pd

# data was get from https://archive.ics.uci.edu/ml/datasets/Occupancy+Detection+

train_data = pd.read_csv("./test_data/datatraining.txt", sep=",")
test_data = pd.read_csv("./test_data/datatest.txt", sep=",")
train_data = train_data.drop(['date'], axis = 1)
test_data = test_data.drop(['date'], axis = 1)

# results with entropy criterion

occ_tree = SimpleTree(max_depth = 3, max_features = train_data.shape[1] - 1)
occ_tree.fit(train_data.drop(['Occupancy'], axis=1).to_numpy(), train_data['Occupancy'].to_numpy())
occ_test_data = test_data.drop(['Occupancy'], axis=1).to_numpy()
occ_test_target = test_data['Occupancy'].to_numpy()
prediction = occ_tree.predict(occ_test_data)
print_scores(occ_test_target, prediction)

0.9786116322701689
0.9714285714285715
0.9825087688594285


In [51]:
# results with gini criterion

occ_tree_gini = SimpleTree(max_depth = 3, max_features = train_data.shape[1] - 1, criterion = 'gini')
occ_tree_gini.fit(train_data.drop(['Occupancy'], axis=1).to_numpy(), train_data['Occupancy'].to_numpy())
prediction_gini = occ_tree_gini.predict(occ_test_data)
print_scores(occ_test_target, prediction_gini)

0.9782363977485928
0.9708835341365462
0.9817752960021779
