In [34]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from itertools import combinations

from sklearn.model_selection import train_test_split

from sklearn.metrics import accuracy_score

In [35]:
# Load the penguins data and keep only the rows with no missing values.
penguins = sns.load_dataset('penguins').dropna()

# The target is the species label; every other column is a predictor.
y = np.array(penguins['species'])
X = np.array(penguins.drop(columns='species'))

In [36]:
# Hold out 25% of the rows as a test set; random_state is fixed so the split is repeatable.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=0)

In [37]:
# loss functions
def gini_index(y):
    """Return the Gini impurity of the labels in ``y``.

    Computed as the sum over the distinct labels of ``p * (1 - p)``, where
    ``p`` is the fraction of entries of ``y`` carrying that label.
    """
    _, label_counts = np.unique(y, return_counts=True)
    proportions = label_counts/len(y)

    impurity_terms = proportions*(1 - proportions)
    return np.sum(impurity_terms)

def cross_entropy(y):
    """Return the entropy, in bits, of the labels in ``y``.

    Computed as ``-sum(p * log2(p))`` over the distinct labels, where ``p``
    is the fraction of entries of ``y`` carrying that label.
    """
    _, label_counts = np.unique(y, return_counts=True)
    proportions = label_counts/len(y)

    # Only observed labels are counted, so every proportion is positive
    # and the logarithm is always defined.
    weighted_logs = proportions * np.log2(proportions)
    return -np.sum(weighted_logs)

def split_loss(child1, child2, loss=cross_entropy):
    """Return the size-weighted average of ``loss`` over the two children.

    ``child1`` and ``child2`` are the label collections on each side of a
    candidate split; ``loss`` is applied to each one separately.
    """
    n1, n2 = len(child1), len(child2)
    weighted_total = n1*loss(child1) + n2*loss(child2)
    return weighted_total/(n1 + n2)

In [38]:
def all_rows_equal(X):
    """Return whether every row of ``X`` matches its first row element-wise."""
    # Broadcasting compares each row against X[0]; the reduction then
    # yields a single numpy boolean.
    matches_first_row = X == X[0]
    return matches_first_row.all()

def possible_splits(x):
    """List the candidate left-hand value subsets for a categorical split.

    Returns every combination of the entries of ``x`` with between 1 and
    ``len(x) // 2`` members, as tuples, smallest subsets first.
    """
    largest_subset = len(x)//2
    return [
        subset
        for subset_size in range(1, largest_subset + 1)
        for subset in combinations(x, subset_size)
    ]

In [39]:
class Node:
    """Plain record describing one node of the tree.

    Holds the predictor rows (``Xsub``) and labels (``ysub``) that reached
    the node, the ``obs`` entries associated with those rows, and the
    node's bookkeeping: its ``ID``, its parent's ID, its ``depth`` and
    whether it is currently a ``leaf``.
    """

    def __init__(self, Xsub, ysub, ID, obs, depth=0, parent_ID=None, leaf=True):
        # Position of the node within the tree.
        self.ID, self.parent_ID, self.depth = ID, parent_ID, depth

        # Data that reached the node.
        self.Xsub, self.ysub, self.obs = Xsub, ysub, obs
        self.size = len(ysub)

        # Nodes start out as leaves unless told otherwise.
        self.leaf = leaf
        
class Splitter:
    """Tracks the best split found so far for one node.

    A fresh instance starts with an infinite loss and ``no_split`` set, so
    any finite candidate loss can replace it.  Before ``_replace_split`` is
    called, the caller must set ``obs`` to an array that can be indexed
    with a boolean mask and that is aligned row-for-row with the
    ``Xsub_d`` passed in.
    """

    def __init__(self):
        self.loss = np.inf
        self.no_split = True

    def _replace_split(self, Xsub_d, loss, d, dtype='quant', t=None, L_values=None):
        """Record a new best split and the observations on each side of it.

        Parameters
        ----------
        Xsub_d : array
            Values of predictor ``d`` for the rows of the node.
        loss : float
            Loss achieved by this split.
        d : int
            Index of the predictor being split on.
        dtype : str
            ``'quant'`` for a threshold split; anything else is treated as
            a categorical split.
        t : optional
            Threshold for a ``'quant'`` split; values ``<= t`` go left.
        L_values : optional
            Values sent left by a categorical split.
        """
        self.loss = loss
        self.d = d
        self.dtype = dtype
        self.t = t
        self.L_values = L_values
        self.no_split = False

        if dtype == 'quant':
            goes_left = Xsub_d <= t
        else:
            goes_left = np.isin(Xsub_d, L_values)

        # The right side is always the complement of the left mask, so the
        # two sides partition ``obs`` exactly.  Using ``Xsub_d > t`` here
        # would drop any row that is neither <= t nor > t (e.g. NaN).
        self.L_obs = self.obs[goes_left]
        self.R_obs = self.obs[~goes_left]
        

In [40]:
class DecisionTreeClassifier:
    """Classification tree grown one layer at a time by greedy loss minimisation.

    Numeric predictors are split on a threshold (rows with ``x <= t`` go
    left).  Every other predictor is treated as categorical and split on a
    subset of its observed values (rows whose value is in the subset go
    left).  All state is created by ``fit``; the class defines no
    constructor.
    """

    # ------------------------------------------------------------------
    # FITTING METHODS
    # ------------------------------------------------------------------
    def fit(self, X, y, loss_func=cross_entropy, max_depth=100, min_size=2, C=None):
        """Grow the tree on the training data.

        Parameters
        ----------
        X : numpy array, 2-D
            One row per observation, one column per predictor.
        y : numpy array, 1-D
            Class labels aligned with the rows of ``X``.
        loss_func : callable
            Maps an array of labels to an impurity value; used to score
            candidate splits.
        max_depth : int
            Maximum number of splitting passes over the tree.
        min_size : int
            A node is only considered for splitting when it holds at least
            this many observations.
        C : int or None
            When given, each split search considers ``C`` predictors drawn
            without replacement via ``np.random.choice``; ``None`` means
            all predictors are considered.
        """
        # Store data
        self.X = X
        self.y = y
        self.N, self.D = self.X.shape
        self.dtypes = [self._column_kind(self.X[:, d]) for d in range(self.D)]

        # Regularization parameters
        self.loss_func = loss_func
        self.max_depth = max_depth
        self.min_size = min_size
        self.C = C

        # Initialize nodes: the root holds every observation and gets ID 0.
        self.nodes_dict = {}
        self.current_ID = 0
        initial_node = Node(Xsub=X, ysub=y, ID=self.current_ID, obs=np.arange(self.N), parent_ID=None)
        self.nodes_dict[self.current_ID] = initial_node
        self.current_ID += 1

        # Build tree
        self._build()

    @staticmethod
    def _column_kind(column):
        """Classify one predictor column as ``'quant'`` or ``'cat'``."""
        # Rebuilding the column from its elements lets numpy infer the
        # element type even when the column was sliced from an object array.
        dtype = np.array(list(column)).dtype

        # Accept every integer and floating width, not only the platform
        # defaults that compare equal to the builtin ``int`` / ``float``.
        is_numeric = np.issubdtype(dtype, np.integer) or np.issubdtype(dtype, np.floating)
        return 'quant' if is_numeric else 'cat'

    def _is_eligible(self, node):
        """Return True when ``node`` is a leaf that is still worth splitting."""
        return bool(
            node.leaf
            and node.size >= self.min_size
            and len(np.unique(node.ysub)) > 1
            and not all_rows_equal(node.Xsub)
        )

    def _build(self):
        """Split every eligible leaf once per pass, for up to ``max_depth`` passes."""
        for layer in range(self.max_depth):

            # Snapshot the eligible leaves first: splitting adds entries to
            # ``nodes_dict`` while this pass is still running.
            eligible_buds = {
                ID: node for ID, node in self.nodes_dict.items() if self._is_eligible(node)
            }

            if not eligible_buds:
                break

            for bud in eligible_buds.values():
                self._find_split(bud)
                if not self.splitter.no_split:  # could be no split for RF
                    self._make_split()

    def _find_split(self, bud):
        """Search for the lowest-loss split of ``bud`` and store it in ``self.splitter``."""
        splitter = Splitter()
        splitter.bud_ID = bud.ID
        splitter.obs = bud.obs

        # gather eligible predictors for Random Forests
        if self.C is None:
            eligible_predictors = np.arange(self.D)
        else:
            eligible_predictors = np.random.choice(np.arange(self.D), self.C, replace=False)

        for d in sorted(eligible_predictors):
            Xsub_d = bud.Xsub[:, d]
            dtype = self.dtypes[d]

            # A predictor that is constant within the node cannot split it.
            levels = np.unique(Xsub_d)
            if len(levels) == 1:
                continue

            if dtype == 'quant':
                # The largest value is skipped as a threshold because it
                # would send every row left.
                for t in levels[:-1]:
                    goes_left = Xsub_d <= t
                    # The right child is the complement of the left mask,
                    # matching how _make_split partitions the rows.
                    loss = split_loss(bud.ysub[goes_left], bud.ysub[~goes_left], loss=self.loss_func)
                    if loss < splitter.loss:
                        splitter._replace_split(Xsub_d, loss, d, dtype='quant', t=t)
            else:
                for L_values in possible_splits(levels):
                    goes_left = np.isin(Xsub_d, L_values)
                    loss = split_loss(bud.ysub[goes_left], bud.ysub[~goes_left], loss=self.loss_func)
                    if loss < splitter.loss:
                        splitter._replace_split(Xsub_d, loss, d, 'cat', L_values=L_values)

        self.splitter = splitter

    def _make_split(self):
        """Apply the split held in ``self.splitter``, adding two child nodes."""
        parent_node = self.nodes_dict[self.splitter.bud_ID]
        parent_node.leaf = False
        parent_node.child_L = self.current_ID
        parent_node.child_R = self.current_ID + 1
        parent_node.d = self.splitter.d
        parent_node.dtype = self.splitter.dtype
        parent_node.t = self.splitter.t
        parent_node.L_values = self.splitter.L_values
        parent_node.L_obs, parent_node.R_obs = self.splitter.L_obs, self.splitter.R_obs

        if parent_node.dtype == 'quant':
            L_condition = parent_node.Xsub[:, parent_node.d] <= parent_node.t
        else:
            L_condition = np.isin(parent_node.Xsub[:, parent_node.d], parent_node.L_values)

        Xchild_L = parent_node.Xsub[L_condition]
        ychild_L = parent_node.ysub[L_condition]
        Xchild_R = parent_node.Xsub[~L_condition]
        ychild_R = parent_node.ysub[~L_condition]

        child_node_L = Node(Xchild_L, ychild_L, obs=parent_node.L_obs, depth=parent_node.depth+1,
                            ID=self.current_ID, parent_ID=parent_node.ID)

        child_node_R = Node(Xchild_R, ychild_R, obs=parent_node.R_obs, depth=parent_node.depth+1,
                            ID=self.current_ID+1, parent_ID=parent_node.ID)

        self.nodes_dict[self.current_ID] = child_node_L
        self.nodes_dict[self.current_ID+1] = child_node_R
        self.current_ID += 2

    # ------------------------------------------------------------------
    # PREDICTING FUNCTIONS
    # ------------------------------------------------------------------
    def _get_leaf_means(self):
        """Store each leaf's most frequent training label in ``self.leaf_means``.

        Despite the name, the stored value is the modal label, not a mean.
        Ties go to the first label in the sorted order that ``np.unique``
        returns.
        """
        self.leaf_means = {}
        for node_ID, node in self.nodes_dict.items():
            if node.leaf:
                values, counts = np.unique(node.ysub, return_counts=True)
                self.leaf_means[node_ID] = values[np.argmax(counts)]

    def predict(self, X_test):
        """Return an array with the predicted label for each row of ``X_test``.

        Each row is routed from the root (node 0) down to a leaf using the
        stored split rules, and receives that leaf's most frequent
        training label.
        """
        self._get_leaf_means()

        yhat = []
        for x in X_test:
            node = self.nodes_dict[0]
            while not node.leaf:
                if node.dtype == 'quant':
                    goes_left = x[node.d] <= node.t
                else:
                    goes_left = x[node.d] in node.L_values
                node = self.nodes_dict[node.child_L if goes_left else node.child_R]
            yhat.append(self.leaf_means[node.ID])

        return np.array(yhat)
        

In [41]:
# Fit the tree on the training rows: at most 10 splitting passes, and only
# nodes with at least 10 observations are considered for splitting.
tree = DecisionTreeClassifier()
tree.fit(X_train, y_train, max_depth=10, min_size=10)

In [42]:
# Predict a species label for every held-out row.
yhat_test = tree.predict(X_test)

In [43]:
# Score the held-out predictions against the true labels with accuracy_score.
accuracy_score(y_test, yhat_test)

0.9404761904761905