In [28]:
# importing necessary packages
import numpy as np
import pandas as pd
from collections import Counter
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
from collections import Counter
from sklearn.metrics import accuracy_score
from sklearn.tree import DecisionTreeClassifier

In [24]:
class TreeNode:
    """
    Single tree node representation
    """
    def __init__(self, gain=None, value=None, data_left=None, data_right=None, threshold=None, feature=None):
        self.gain = gain
        self.value = value
        self.data_left = data_left
        self.data_right = data_right
        self.threshold = threshold
        self.feature = feature

In [25]:
class MyDecisionTreeClassifier:
    """
    Implementation of decision tree classifier algorithm
    """
    def __init__(self, min_no_samples=5, max_depth_tree=2):
        """
        Hyperparams initialzation
        """
        self.min_no_samples = min_no_samples
        self.max_depth_tree = max_depth_tree
        self.root_node = None # initializing the top node of decision tree
    
    @staticmethod
    def calculate_entropy(s):
        """
        Helper function to calculate entropy. Formula entropy = -summation(Pilog2(Pi)) i-> class label
        """
        entropy = 0
        count_class = np.bincount(np.array(s, np.int64)) # getting independent classes count and converting to int
        percentages = count_class / len(s) # getting the percentage split of classes
        for p in percentages:
            if p > 0:
                entropy += p * np.log2(p)
        return - entropy
    
    def information_gain(self, parent, left_child, right_child):
        """
        Helper function to calculate information gain of the split.
        Formula : IG = E(parent) - (left_child/parent * E(left_child) + right_child/paretn * E(right_child))
        """
        left = len(left_child) / len(parent)
        right = len(right_child) / len(parent)
        
        info_gain = self.calculate_entropy(parent) - (left * self.calculate_entropy(left_child) + right * self.calculate_entropy(right_child))
        return np.round(info_gain, 5)
    
    def get_best_split(self, X, y):
        """
        Function to calculate the best split
        """
        best_split = {}
        best_info_gain = -1
        
        n_rows, n_cols = X.shape # getting rows and columns
        
        for f_id in range(n_cols) : # get every feature
            X_curr = X[:, f_id] # getting all data from every feature
            for threshold in np.unique(X_curr): # get unique values of features
                
                # creating dataset
                df = np.concatenate((X, y.reshape(1, -1).T), axis = 1)
                df_left = np.array([row for row in df if row[f_id] <= threshold])
                df_right = np.array([row for row in df if row[f_id] > threshold])
                
                # check if left and right splits have data
                if len(df_left) > 0 and len(df_right) > 0:
                    # getting values of target variables
                    y_ = df[:, -1]
                    y_left = df_left[:, -1]
                    y_right = df_right[:, -1]
                    
                    gain = self.information_gain(y_, y_left, y_right)
                    if gain > best_info_gain:
                        # save the split
                        best_split = {
                            "gain" : gain,
                            "feature_idx": f_id,
                            "threshold": threshold,
                            "data_left": df_left,
                            "data_right": df_right,
                        }
                        # update gain
                        best_info_gain = gain
        return best_split
    
    def build_dec_tree(self, X, y, depth=0):
        """
        Building decision tree recursively
        """
        # getting no of rows left
        n_rows, _ = X.shape
        
        # checking exit conditions to break the build
        if n_rows >= self.min_no_samples and depth <= self.max_depth_tree:
            
            # get the best split
            best_split = self.get_best_split(X, y)

            # check if split is not entirely pure
            if best_split['gain'] > 0:
                
                left_side = self.build_dec_tree(X=best_split['data_left'][:,:-1], 
                                                y=best_split['data_left'][:,-1], depth=depth+1)
                
                right_side = self.build_dec_tree(X=best_split['data_right'][:,:-1], 
                                                 y=best_split['data_right'][:,-1], depth=depth+1)
                
                return TreeNode(gain=best_split['gain'], data_left=left_side, data_right=right_side, 
                                threshold=best_split['threshold'], feature=best_split['feature_idx'])
        
        # return value if split is pure
        return TreeNode(value=Counter(y).most_common(1)[0][0])
    
    def fit(self, X, y):
        """
        Function to train decision tree classifier
        """
        self.root = self.build_dec_tree(X, y)
        
    def set_predictions(self, x, tree):
        """
        Calculate predictions for single observations
        """
        if tree.value is not None:
            return tree.value
        
        feature_value = x[tree.feature]
        if feature_value <= tree.threshold:
            # traverse left
            return self.set_predictions(x, tree.data_left)
        if feature_value > tree.threshold:
            # traverse right
            return self.set_predictions(x, tree.data_right)
    
    def predict(self, X):
        """
        Function to get predictions for every value of features
        """
        return np.array([self.set_predictions(x, self.root) for x in X], np.int64)

In [29]:
# loading play datasets from sklearn for multi class classification
iris = load_iris()
# getting training data and target data
X = iris.data
y = iris.target

# splitting dataset
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, random_state = 42)

print(f"Origianal data : {X.shape}, Train data : {X_train.shape}")

# custom model
my_model = MyDecisionTreeClassifier()
my_model.fit(X_train, y_train)
preds = my_model.predict(X_test)
my_score = accuracy_score(y_test, preds)

# sk model

sk_model = DecisionTreeClassifier()
sk_model.fit(X_train, y_train)
sk_preds = sk_model.predict(X_test)
sk_score = accuracy_score(y_test, sk_preds)

print(f"Decision Tree Classifiers result -> custom model: {my_score}, sklearn : {sk_score}")


Origianal data : (150, 4), Train data : (120, 4)
Decision Tree Classifiers result -> custom model: 0.9666666666666667, sklearn : 1.0
