In [33]:
import pandas as pd
import numpy as np
from typing import Tuple

In [16]:
col_names = ['sepal_length','sepal_width','petal_length','petal_width','species']
data = pd.read_csv('iris.csv', skiprows=1, header=None, names=col_names)
data.head()

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,species
1,5.1,3.5,1.4,0.2,Iris-setosa
2,4.9,3.0,1.4,0.2,Iris-setosa
3,4.7,3.2,1.3,0.2,Iris-setosa
4,4.6,3.1,1.5,0.2,Iris-setosa
5,5.0,3.6,1.4,0.2,Iris-setosa


In [17]:
from sklearn.preprocessing import LabelEncoder
encoder = LabelEncoder()

#Transform cateogorical string targets into categorical numerical targets
data['spieces_type'] = encoder.fit_transform(data['species'])
data.drop(columns=['species'], inplace=True)
data.head()

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,spieces_type
1,5.1,3.5,1.4,0.2,0
2,4.9,3.0,1.4,0.2,0
3,4.7,3.2,1.3,0.2,0
4,4.6,3.1,1.5,0.2,0
5,5.0,3.6,1.4,0.2,0


### Node Class

In [18]:
class Node():
    def __init__(self, feature_index=None, threshold=None, left=None, right=None, info_gain=None, value=None):
        '''
            feature_index: Defines conditions
            threshold: Defines conditions 
            left: Accesses left child
            right: Accesses right child
            info_gain: Stores informatino gain by split of particular decision node
            value: Majority class of leaf note; only requisite value for leaf node
        '''
        #Decision node
        self.feature_index = feature_index
        self.threshold = threshold
        self.left = left
        self.right = right
        self.info_gain = info_gain

        #Leaf node
        self.value = value

### Tree Class

In [19]:
class DecisionTreeClassifier():
    def __init__(self, min_samples_split=2, max_depth=2):
        '''
            min_samples_split:
            max_depth:
        '''
        #Init root
        self.root = None

        #Stop conditions
        self.min_samples_split = min_samples_split
        self.max_depth = max_depth


    def entropy(self, y):
        '''
        Method to compute information gain: A measure of unpredictability or disorder in a dataset [0,1].

        PARAMS:
            - Labeled samples

        RETURNS:
            - Entropy
        '''

        class_labels = np.unique(y)
        entropy = 0

        for cls in class_labels:
            
            p_cls = len(y[y==cls]) / len(y) #Probability of class
            entropy += -p_cls * np.log2(p_cls) #Running sum -P(cls) * log2(P(cls))

        return entropy
    
    def gini_index(self, y):
        '''
        Method to compute information gain: A measure of the probability of incorrectly classifying a randomly selected element.

        NOTE: Gini is more computationally efficient than entropy because squaring is more efficient than log2()
        NOTE: Gini is more robust to class imbalance

        PARAMS: 
            - Labeled samples

        RETURNS:
            - Gini index
        '''
        class_labels = np.unique(y)
        gini = 0

        #For each class, 
        for cls in class_labels:
            p_cls = len(y[y==cls]) / len(y) #Probability of class
            gini += p_cls**2 #Running sum P(cls)**2

        return 1-gini

    def information_gain(self, parent: np.array, l_child: np.array, r_child: np.array, mode='entropy') -> float:
        '''
        Computes information gain with either entropy or gini
        '''

        #Compute relative weights (child:parent)
        weight_l = len(l_child) / len(parent)
        weight_r = len(r_child) / len(parent)

        if mode == 'gini':
            gain = self.gini_index(parent) - (weight_l*self.gini_index(l_child) + weight_r*self.gini_index(r_child))
        else:
            gain = self.entropy(parent) - (weight_l*self.entropy(l_child) + weight_r*self.entropy(r_child))

        return gain

    def calculate_leaf_value(self, y):
        '''
        Finds majority class and returns it.
        '''

        y = list(y)
        return max(y, key=y.count)


    def split(self, dataset: np.array, feature_index: int, threshold: float) -> Tuple[np.array, np.array]:
        '''
        Splits dataset, feature index, and theshold value and splits into left and right children
        '''

        dataset_left = np.array([row for row in dataset if row[feature_index]<=threshold])
        dataset_right = np.array([row for row in dataset if row[feature_index]>threshold])

        return dataset_left, dataset_right


    def get_best_split(self, dataset: np.array, num_features: int) -> dict:
        '''
        Returns dictionary defining the current optimal split.
        '''

        best_split = {}
        max_info_gain = -float("inf") #want to maximize information gain, so we start with lowest possible number

        #Traverse through all features
        for feature_index in range(num_features):
            feature_values = dataset[:, feature_index] #select rows from feature col
            possible_thresholds = np.unique(feature_values) #select all unique values to use as potential split thresholds

            #Traverse through all unique values encountered in the dataset for the feature
            for threshold in possible_thresholds:
                dataset_left, dataset_right = self.split(dataset, feature_index, threshold) #Splits dataset at threshold

                #Ensure children exist
                if len(dataset_left) > 0 and len(dataset_right) > 0: 

                    #Compute information gain with entropy or gini (this is always set to gini)
                    y, left_y, right_y = dataset[:,-1], dataset_left[:,-1], dataset_right[:,-1]
                    curr_info_gain = self.information_gain(y, left_y, right_y, 'gini')

                    #If the current information gain is greater than max information gain, update best_split
                    if curr_info_gain > max_info_gain:
                        best_split['feature_index'] = feature_index
                        best_split['threshold'] = threshold
                        best_split['dataset_left'] = dataset_left
                        best_split['dataset_right'] = dataset_right
                        best_split['info_gain'] = curr_info_gain

                        max_info_gain = curr_info_gain

        return best_split

    def build_tree(self, dataset: np.array, curr_depth: int=0) -> Node():
        '''
        Recursively build decision tree

        PARAMS:
            - dataset: An array of concat(X,y) where y (target) is the last column.
            - curr_depth: 

        RETURNS:
            - A decision node containing the split that more maximizes information gain (through minimizing entropy or gini index)
        '''

        #Split features and targets
        X, y = dataset[:,:-1], dataset[:,-1]
        n_samples, n_features = X.shape

        #Recursively split until stop conditions are met -> stop conditions: either number of samples in a node are too few or current depth is too great
        if n_samples < self.min_samples_split or curr_depth > self.max_depth: 

            #Find the best split dictionary
            best_split = self.get_best_split(dataset, n_features)

            #Ensure information gain > 0; if 0 then we'd split a node that's already pure (consists of one class)
            if best_split['info_gain'] > 0:

                #Recurse to create left and right subtrees
                left_subtree = self.build_tree(best_split["dataset_left"], curr_depth+1)
                right_subtree = self.build_tree(best_split["dataset_right"], curr_depth+1)

                #Return decision node
                return Node(best_split['feature_index'], best_split['threshold'], left_subtree, right_subtree, best_split['info_gain'])

        #Compute and return leaf node
        leaf_value = self.calculate_leaf_value(y)

        return Node(value=leaf_value)

    def print_tree(self, tree=None, indent=' '):
        '''
        Prints the tree for visualization
        '''

        if not tree:
            tree = self.root

        if tree.value is not None:
            print(tree.value)

        else:
            print(f'X_{tree.feature_index} <= {tree.threshold} ? {tree.info_gain}')
            print('%sleft' % (indent), end="")
            self.print_tree(tree.left, indent+indent)
            print('%sright' % (indent), end="")
            self.print_tree(tree.right, indent+indent)


    def fit(self, X: np.array, y: np.array):

        dataset = np.concatenate((X,y), axis=1)
        self.root = self.build_tree(dataset)


    def make_prediction(self, X: np.array, tree):

        if tree.value != None: return tree.value
        feature_val = X[tree.feature_index]
        if feature_val <= tree.threshold:
            return self.make_prediction(X, tree.left)
        else:
            return self.make_prediction(X, tree.right)


    def predict(self, X: np.array):
        predictions = [self.make_prediction(x, self.root) for x in X]
        return predictions

### Train-Test Split

In [20]:
from sklearn.model_selection import train_test_split

X = data.iloc[:,:-1].values
y = data.iloc[:,-1].values.reshape(-1,1)

#80% train / 20% test
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

In [32]:
X.shape

(150, 4)

In [21]:
classifier = DecisionTreeClassifier(min_samples_split=3, max_depth=3)
classifier.fit(X_train, y_train)
classifier.print_tree()

X_2 <= 1.9 ? 0.3334027777777778
 left0.0
 rightX_3 <= 1.6 ? 0.38486920426065185
  leftX_2 <= 4.9 ? 0.1342403628117913
    left1.0
    rightX_0 <= 6.0 ? 0.11999999999999983
        left2.0
        right2.0
  rightX_2 <= 4.8 ? 0.016158818097876115
    leftX_1 <= 2.8 ? 0.4444444444444444
        left2.0
        right1.0
    right2.0


### Basic Model Evaluation

In [22]:
from sklearn.metrics import accuracy_score

y_pred = classifier.predict(X_test)
accuracy_score(y_test, y_pred)

0.9333333333333333

### Now Let's Use Sklearn and Evaluate the Result 

In [23]:
from sklearn.tree import DecisionTreeClassifier

clf = DecisionTreeClassifier(criterion='gini', max_depth=3)
clf.fit(X_train, y_train)

In [24]:
y_pred = clf.predict(X_test)

accuracy_score(y_test, y_pred)

0.9666666666666667

### Hyperparameter Tuning

In [25]:
from sklearn.model_selection import GridSearchCV

clf = DecisionTreeClassifier()

param_grid = {
    'criterion': ['gini', 'entropy'], #Splitting strategy
    'max_depth': [2, 3, 5, 10], #Control tree depth
    'min_samples_split': [2, 5, 10], #Min samples needed to split a node
    'min_samples_leaf': [1, 2,3,4,5], #Min samples requred at a leaf node
    'max_features': [None, 'sqrt', 'log2'] #Max number of features considered at each decision node {None: all features, 'sqrt': n_features**0.5, 'log2': log2(n_features)}
}

grid_search = GridSearchCV(
    estimator=clf,
    param_grid=param_grid,
    cv=5, #5-fold cross-validation
    scoring='accuracy', #Optimize for accuracy (this is what we used for the rest... in practice accuracy can be skewed by class imbalance)
    n_jobs = -1, #Use all CPU cores
    # verbose=1 #Display progress
)

grid_search.fit(X_train, y_train)

print(f'Best Params: {grid_search.best_params_}')
#Train final model with best params

best_model = grid_search.best_estimator_

Best Params: {'criterion': 'gini', 'max_depth': 10, 'max_features': 'log2', 'min_samples_leaf': 1, 'min_samples_split': 5}


### Evaluate Tuned Model

In [26]:
y_pred = best_model.predict(X_test)

accuracy_score(y_test, y_pred)

0.9666666666666667

### Further Evaluation

Accuracy is often a poor indicator of performance, especially if there is class imabalance

In [28]:
#No class imbalance here, but let's see the other evaluation methods anyway
data['spieces_type'].value_counts()

spieces_type
0    50
1    50
2    50
Name: count, dtype: int64