In [None]:
from sklearn import datasets
from sklearn.model_selection import train_test_split
import numpy as np

In [None]:
class Node:
    """One node of the decision tree (either an internal split or a leaf).

    A node with an empty ``children`` dict is treated as a leaf by the tree code.
    """

    def __init__(self, feature=None, entropy=0, clf_metric_val=0, class_val=None, class_=""):
        """Create a node.

        Parameters
        ----------
        feature : index of the feature this node splits on (``None`` for a leaf).
        entropy : entropy of the labels that reached this node.
        clf_metric_val : value of the split metric achieved by the chosen split.
        class_val : dict mapping class label -> number of samples at this node.
            Defaults to a fresh empty dict for every instance.
        class_ : class label returned when prediction stops at this node.
        """
        self.feature = feature                # feature index to split upon
        self.entropy = entropy                # current entropy
        self.clf_metric_val = clf_metric_val  # value of the classification metric used
        self.children = {}                    # feature value / boundary -> child Node
        # A literal ``{}`` default would be created once and shared by every Node
        # built without ``class_val``; allocate a new dict per instance instead.
        self.class_val = {} if class_val is None else class_val
        self.class_ = class_                  # output returned by this node

In [None]:
class DecisionTree:
    def __init__(self,clf_metric="gain_ratio"):
        self.__root= None # root node of decision tree
        self.clf_metric= clf_metric  #classification metric used
        if self.clf_metric not in ["gain_ratio", "gini_index"]:
            self.clf_metric= "gain_ratio"
        self.__cont_val_features=[]   #index of continuous valued features
        self.depth= 0  #depth of decision tree
        self.leaf_count=0
    
    #Private Methods
    def __decisionTree(self, X, Y, visited):
        visited=list(visited)
        y= list(set(Y))
        if len(y)==0:
            #if a feature that consist of only one value, then splitting on this feature Y will be empty 
            # None is returned for these features
            return None
        if len(y)==1:# pure node
            newNode= Node(class_val={y[0]: len(Y)}, class_= y[0])
            return newNode
        
        elif visited.count(False)==0: #if no feature is left
            entropy, class_val= self.__entrpyANDclass_val(Y)
            class_= sorted(class_val, key= lambda x: class_val[x], reverse=True)[0]
            newNode= Node(entropy= entropy, class_val= class_val, class_= class_)
            return newNode
        
        from math import inf
        
        best_to_split= -1   #index of feature with maximum classification metric value
        best_metric_val=-1*inf  # maximum metric value
        #Choosing a feature for splitting
        for i in range(X.shape[1]):
            if visited[i]:
                continue
                
            #Calculating metric value for a particular feature
            if i in self.__cont_val_features: #checking if a feature is continuous valued or not
                val, b= self.__handle_cont_val_features(X[:,i], Y)  #val for metric value and b for decision boundary 
            else:
                val, b= self.__handle_discrt_val_features(X[:,i], Y)
            if val> best_metric_val:
                best_metric_val= val
                best_to_split= i
                boundary= b
    
        visited[best_to_split]= True   # marking chosen feature to be visited
        children={}
        feature_chose= X[:, best_to_split]
        if best_to_split in self.__cont_val_features:
            for i in range(2):
                if i==0:
                    new_x= X[feature_chose <= boundary]
                    new_y= Y[feature_chose <= boundary]
                else:
                    new_x= X[feature_chose > boundary]
                    new_y= Y[feature_chose > boundary]
                #recursive call
                if i==0:
                    children[boundary]= self.__decisionTree(new_x, new_y, visited)
                    if children[boundary] is None:
                        del children[boundary]
                else:
                    children[inf]= self.__decisionTree(new_x, new_y, visited)
                    if children[inf] is None:
                        del children[inf]
            
        else:
            if self.clf_metric=="gain_ratio":
                for i in list(set(feature_chose)):
                    new_x= X[feature_chose == i]
                    new_y= Y[feature_chose == i]
                    children[i]= self.__decisionTree(new_x, new_y, visited) #recursive call
                    if children[i] is None:
                        del children[i]
                    
            elif self.clf_metric=="gini_index":
                #Boundary is a list of 2 tuples
                #Because gini index leads to binary split 
                #Therefore, one tuple contains a single discrete value and second tuple contains all other values of feature
                for i in boundary: 
                    new_x=[]
                    new_y=[]
                    for j in range(len(feature_chose)):
                        if feature_chose[j] in i:
                            new_x.append(X[j])
                            new_y.append(Y[j])
                    new_x= np.array(new_x)
                    new_y= np.array(new_y)
                    children[i]= self.__decisionTree(new_x, new_y, visited) #recursive call
                    if children[i] is None:
                        del children[i]

        entropy, class_val= self.__entrpyANDclass_val(Y)
        class_= sorted(class_val, key= lambda x: class_val[x], reverse=True)[0]
        newNode= Node(feature= best_to_split, entropy= entropy, clf_metric_val= best_metric_val, class_val= class_val, 
                      class_= class_)  #creating a new node object
        newNode.children= children

        return newNode
                
    def __chk_cont_val_feature(self, X):
        #setting a threshold at 10
        #if number of distinct values of a feature is greater than 10, then it is a continuous feature
        #else a categorical feature
        if len(set(X))>10:
            return True #continuous valued feature
        else:
            return False #categorical feature
        
    def __entrpyANDclass_val(self, Y):
        #Calculating Entropy or information required
        from collections import Counter
        from math import log
        class_val= dict(Counter(Y)) #a dictionary with key as class and value as number of data points
        entropy= 0
        for i in class_val:
            entropy+=(-1*class_val[i]/len(Y))*log(class_val[i]/len(Y),2) #calculating entropy =-sigma(plogp)
            
        return round(entropy,3), class_val
    
    def __handle_cont_val_features(self, X, Y):
        #handling continuous valued features
        from math import inf
        
        x= list(set(X))
        x.sort() #sorting distinct values of feature 
        max_val=-1*inf #maximum metric value
        boundary= None #corresponding boundary value
        for i in range(1, len(x)):
            mid= (x[i]+x[i-1])/2 #calculating mid value
            split_new_y=[[],[]]  #splitting Y according to mid value
            split_new_y[0]= Y[X <= mid]
            split_new_y[1]= Y[X > mid]
            if self.clf_metric == "gain_ratio":
                val= self.__gain_ratio(Y, split_new_y)  #calculating gain ratio
            elif self.clf_metric == "gini_index":
                val= self.__gini_index(Y, split_new_y)  #calculating delta G(f)= Gini_org- Gini_split
            
            if val> max_val:
                max_val= val
                boundary= round(mid,5)
        
        return max_val, boundary
    
    def __handle_discrt_val_features(self, X, Y):
        #handling categorical features
        x= list(set(X)) #distinct values of feature X
        if self.clf_metric == "gain_ratio":
            split_y= [ Y[X ==x[i]] for i in range(len(x))] #splitting Y for all distinct values of X
            val= self.__gain_ratio(Y, split_y) #calculating gain ratio
            return val, None
        elif self.clf_metric == "gini_index":
            from math import inf
            
            max_val= -1*inf #maximum value of delta gini
            boundary= None
            #using one vs rest approach
            #For eg if a feature= [red, green, blue]
            #then splitting Y if feature value is red or non-red(i.e. green or blue)
            for i in range(len(x)):
                split_y=[[],[]]
                split_y[0]= Y[X == x[i]]
                split_y[1]= Y[X != x[i]]
                val= self.__gini_index(Y, split_y)
                if val> max_val:
                    max_val= val
                    x1= list(x)
                    x1.remove(x[i])
                    x1= tuple(x1)
                    boundary= [(x[i],), x1]  #splitting feature value 
            return max_val, boundary
        
    def __gain_ratio(self, Y_org, split_Y):  #list of numpy array of splitted Y is send as an argument
        #Calculating gain ratio
        from math import log, inf
        
        info_org, class_val= self.__entrpyANDclass_val(Y_org) #calculating info required for orginal Y
        #class_val is of no use
        info_f=0  #weighted informaton required for splitted Y
        split_info= 0
        D= len(Y_org)  #total number of data points in original data
        for y in split_Y:
            info, class_val= self.__entrpyANDclass_val(y)  #inforamtion required for splitted Y
            info_f+= ((len(y)/D)*info)  #(|D1|/|D|)*info_req(D1)
            split_info+= (-1*(len(y)/D)*log(len(y)/D, 2))  #(|D1|/|D|)*log2((|D1|/|D|))
            
        info_gain= info_org- info_f  #information gain
        
        if split_info==0:
            return inf
        return round(info_gain/ split_info,3)  #Gain ratio
        
    def __gini_index(self, Y_org, split_Y):
        #Gini_index= 1-sigma(p**2)
        from collections import Counter
        
        class_val= dict(Counter(Y_org))
        D= len(Y_org)
        gini_org=1
        gini_split=0
        
        for i in class_val:
            gini_org-= (class_val[i]/D)**2 #Calculating gini index of original Y
        for y in split_Y:
            gini=1
            cls_v= dict(Counter(y))
            for i in cls_v:
                gini-= (cls_v[i]/len(y))**2 #Calculating gini index of splitted Y
            gini_split+= ( (len(y)/D)*gini ) #weighted adding of each split
        
        return round(gini_org-gini_split,3)  #return difference in gini
    
    
    def __print_tree_helper(self, node, level, col_name):
        #printing each node of a tree
        if node is None:
            return
        if len(node.children)==0:
            #printing leaf nodes
            self.leaf_count+=1#updating value of leaf count
            if self.depth<=level:
                self.depth= level+1#updating depth
            print(f"Level {level}") #printing using f-strings
            for i in sorted(node.class_val):
                print(f"Count of {i}= {node.class_val[i]}")
            print(f"Current Entropy is= {node.entropy}")
            print("Reached leaf node\n")
            
            return
        
        #printing non-leaf nodes
        print(f"Level {level}")
        for i in sorted(node.class_val):
            print(f"Count of {i}= {node.class_val[i]}")
        print(f"Current Entropy is= {node.entropy}")
        if col_name==[]:
            print(f"Splitting on feature X{node.feature} with {self.clf_metric} {node.clf_metric_val}\n")
        else:
            print(f"Splitting on feature {col_name[node.feature]} with {self.clf_metric} {node.clf_metric_val}\n")
        
        for i in node.children:
            self.__print_tree_helper(node.children[i], level+1, col_name) #recursive call for each children
       
    #Public Methods
    def print_tree(self, col_name=[]):
        #printing tree nodes with features names as given in col_names
        self.__print_tree_helper(self.__root, 0, col_name)
    
    def fit(self, X, Y):
        #Trainnig decision tree
        visited= [False for i in range(X.shape[1])] #visited list of features
        for i in range(X.shape[1]):#storing index of each continuous feature
            if self.__chk_cont_val_feature(X[:,i]):
                self.__cont_val_features.append(i)
        self.__root= self.__decisionTree(X, Y, visited)
        
    def predict_val(self, x, node):
        #predicting value for 1 datapoint
        if node is None:
            return np.nan
        if len(node.children)==0: #leaf node
            return node.class_
        try:
            if node.feature in self.__cont_val_features: #for continuous features
                boundary_val= sorted(node.children)
                if x[node.feature]<=boundary_val[0]:
                    return self.predict_val(x, node.children[boundary_val[0]])
                else:
                    return self.predict_val(x, node.children[boundary_val[1]])
            else: #for discrete features
                if self.clf_metric=='gain_ratio': 
                    if x[node.feature] in node.children: #if feature value is any of the key of children of this node
                        return self.predict_val(x, node.children[x[node.feature]])
                    else: #else return output of this node
                        return node.class_
                elif self.clf_metric=='gini_index':
                    boundary_val= sorted(node.children)
                    if x[node.feature] in boundary_val[0]:#if feature value is in any of the key of children of this node
                        return self.predict_val(x, node.children[boundary_val[0]])
                    elif x[node.feature] in boundary_val[1]:
                        return self.predict_val(x, node.children[boundary_val[1]])
        except:
            pass
        return node.class_
    
    def predict(self, X):
        #predicting value by traversing tree for each data-point
        Y_pred= []
        for i in range(X.shape[0]):
            Y_pred.append(self.predict_val(X[i], self.__root))
        Y_pred= np.array(Y_pred)
        return Y_pred
    
    def score(self, Y_truth, Y_pred):
        #Calculating precentage accuracy
        accurate=0
        for i in range(len(Y_truth)):
            if(Y_truth[i]== Y_pred[i] ):
                accurate+=1
        return accurate/len(Y_truth)
    
    def export_to_pdf(self, feature_names=None, class_names=None, filename=None):
        #Representing formed decision tree in graphical form
        #pip install graphviz
        from graphviz import Digraph
        dot= Digraph()
        dot.attr('node',shape="rectangle") #changing shape of each node to rectangle
        parent=[] #a Queue to save parent node of each node so that edges can be created
        queue= [] #Queue to store current nodes
        queue.append(self.__root)
        j=0
        while len(queue)!=0:  #Level order Traversal
            nodecount= len(queue)
            while nodecount:
                nodecount-=1
                node= queue.pop(0)
                if len(node.children)!=0:  #if not a leaf node, include feature in the label of node
                    if feature_names is not None:
                        label= f"{feature_names[node.feature]}"
                        if node.feature in self.__cont_val_features:
                            label+= f"<={sorted(node.children)[0]}"
                        label+="\n"
                    else:
                        label= f"X{node.feature}"
                        if node.feature in self.__cont_val_features:
                            label+= f"<={sorted(node.children)[0]}"
                        label+="\n"
                else:
                    label=""
                #Values stored in each node of tree
                #here value of gini index is delta gini(f)
                label+=f"{self.clf_metric}= {node.clf_metric_val}\nEntropy= {node.entropy}\nvalues= {node.class_val}\n"
                if class_names is not None: #class returned by a node
                    label+=f"class= {class_names[node.class_]}\n"
                dot.node(str(j), label) #creating a node with identifier j and value of node is label
                if len(parent)!=0:  #not root node
                    prnt=parent.pop(0)
                    #creating edges with label on each edge like true, false and discrete values of a feature
                    dot.edge(prnt[0], str(j),label= prnt[1]) 
                
                fl= True #first children of continuous valued feature node is true and second is false
                #to identify first and second children fl is used
                for i in sorted(node.children):
                    if node.feature in self.__cont_val_features:
                        if fl:
                            label="True" #edge label
                            fl=False
                        else:
                            label="False" #edge label
                    else:
                        if self.clf_metric=="gain_ratio":
                            label=str(i) #edge label-> discrete values
                        elif self.clf_metric=="gini_index":
                            if fl:
                                label= str(i[0]) #edge label
                                fl= False
                            else:
                                label= "rest" #edge label
                    parent.append((str(j), label)) #storing identifier of parent node
                    queue.append(node.children[i]) #storing children of current node
                j+=1
        if filename is not None:
            import pydotplus
            graph = pydotplus.graph_from_dot_data(dot.source)
            graph.write_pdf(filename) # creating pdf using dot data
        else:
            return dot.source
    
    def confusion_matrix(self, Y_truth, Y_pred):
        #calculating confusion matrix
        matrix= [[0 for j in range(max(Y_truth)+1)] for i in range(max(Y_truth)+1)]
        for i in range(len(Y_truth)):
            matrix[Y_truth[i]][Y_pred[i]]+=1
        for i in range(len(matrix[0])):
            print(matrix[i])

# OR Dataset

In [None]:
# Train on three of the four rows of the OR truth table; the row [1, 1] is left
# out here and is the point passed to predict() further down.
x=np.array([[0,0],[0,1],[1,0]])
y=np.array([0,1,1])
clf= DecisionTree(clf_metric= "gini_index")
clf.fit(x,y)
# Prints every node; the traversal also updates clf.leaf_count and clf.depth.
clf.print_tree()

In [None]:
# Write the OR tree to OR.pdf (export_to_pdf imports graphviz and pydotplus).
clf.export_to_pdf(filename="OR.pdf")

In [None]:
# Predict the one OR input that was not part of the training rows.
clf.predict(np.array([[1,1]]))

In [None]:
# Confusion matrix for a single sample with true label 1 and predicted label 1
# (both labels are written out by hand here).
clf.confusion_matrix([1],[1])

# Random Dataset

In [None]:
# Synthetic 3-class problem with 5 features. Columns 0, 2 and 4 are truncated to
# whole numbers (stored back as floats); the other columns keep their real values.
X,Y = datasets.make_classification(n_samples=1000, n_features=5, n_classes=3,n_informative=3 , random_state=0)
truncated_cols = [0, 2, 4]
X[:, truncated_cols] = X[:, truncated_cols].astype(int)
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, random_state=0)
clf= DecisionTree(clf_metric="gini_index")
clf.fit(X_train,Y_train)
clf.print_tree()

In [None]:
# Write the tree trained on the synthetic data to Random_Dataset.pdf.
clf.export_to_pdf(filename= "Random_Dataset.pdf")

In [None]:
# Both counters are filled in by print_tree()'s traversal, so they are only
# meaningful once print_tree() has been run on this classifier.
clf.leaf_count,clf.depth

In [None]:
# Accuracy on the held-out split; score() returns a fraction between 0 and 1.
y_pred= clf.predict(X_test)
clf.score(Y_test, y_pred)

In [None]:
# Accuracy on the training split, for comparison with the held-out accuracy.
y_train_pred= clf.predict(X_train)
clf.score(Y_train, y_train_pred)

# Iris Dataset

In [None]:
# Iris with the default split metric (DecisionTree() falls back to "gain_ratio").
iris= datasets.load_iris()
clf= DecisionTree()
x_train, x_test, y_train, y_test = train_test_split(iris.data, iris.target, random_state = 1)
clf.fit(x_train, y_train)
# Feature names replace the generic X<index> labels in the printed tree.
clf.print_tree(col_name= iris.feature_names)

In [None]:
# Predictions for both splits; reused by the score and confusion-matrix cells.
# Note: y_train_pred rebinds the name used earlier for the synthetic data set.
y_train_pred= clf.predict(x_train)
y_test_pred= clf.predict(x_test)

In [None]:
# Write the iris tree to iris2.pdf, labelled with feature and class names.
clf.export_to_pdf(feature_names= iris.feature_names, class_names= iris.target_names, filename="iris2.pdf")

In [None]:
# Training accuracy of the custom tree (fraction of matching labels).
clf.score(y_train, y_train_pred)

In [None]:
# Test accuracy of the custom tree (fraction of matching labels).
clf.score(y_test, y_test_pred)

In [None]:
# Leaf count and depth as recorded by the print_tree() call made after fitting.
clf.leaf_count, clf.depth

In [None]:
# Confusion matrix on the test split: rows are true classes, columns predictions.
clf.confusion_matrix(y_test, y_test_pred)

In [None]:
# Confusion matrix on the training split: rows are true classes, columns predictions.
clf.confusion_matrix(y_train, y_train_pred)

# Sklearn Decision Tree

In [None]:
from sklearn.tree import DecisionTreeClassifier

In [None]:
# Reference model: scikit-learn's tree trained on the same iris training split.
clf1 = DecisionTreeClassifier()
clf1.fit(x_train, y_train)

In [None]:
# Export the scikit-learn tree to iris.pdf so it can be compared with iris2.pdf,
# the PDF written for the custom tree.
import pydotplus
from sklearn.tree import export_graphviz
# out_file=None makes export_graphviz return the DOT text instead of writing a file.
dot_data = export_graphviz(clf1, out_file=None,
                          feature_names=iris.feature_names,
                          class_names=iris.target_names)
graph = pydotplus.graph_from_dot_data(dot_data)
graph.write_pdf("iris.pdf")

In [None]:
# Predictions of the scikit-learn tree on both splits (used by the confusion matrices).
y_train_pred1 = clf1.predict(x_train)
y_test_pred1 = clf1.predict(x_test)

In [None]:
# Training accuracy of the scikit-learn tree. Its score() is called with features
# and labels, unlike DecisionTree.score, which takes two label arrays.
clf1.score(x_train, y_train)

In [None]:
# Test accuracy of the scikit-learn tree, to compare with the custom tree's test score.
clf1.score(x_test, y_test)

In [None]:
from sklearn.metrics import confusion_matrix

In [None]:
# scikit-learn confusion matrix for the training split.
confusion_matrix(y_train, y_train_pred1)

In [None]:
# scikit-learn confusion matrix for the test split.
confusion_matrix(y_test, y_test_pred1)