In [5]:
import math
import pandas as pd
import queue
import random

In [6]:
random.seed(200)

mushroom = pd.read_csv('data/agaricus-lepiota.data')

# check for missing values that are ? in the dataset
missing_values = mushroom.isin(['?']).sum()
for column in list(mushroom.loc[:,]):
    # print(mushroom[column].value_counts())
    pass

# replace values with most common in that column
mushroom = mushroom.replace('?', 'b')
# print(mushroom)

# using 30% of data points as a training dataset
mushroom_testing = mushroom.sample(frac=0.3, random_state=200)
print(mushroom_testing)
mushroom_training = mushroom.drop(mushroom_testing.index)
print(mushroom_training)
mushroom_testing = mushroom_testing.reset_index(drop=True)
mushroom_training = mushroom_training.reset_index(drop=True)

mushroom = mushroom_training
print(len(mushroom))
print(mushroom)
features = mushroom.columns[1:]
print(features)

variable_info = {}

for feature in features:
    variable_info[feature] = mushroom[feature].unique()

print(variable_info)

     class  cap-shape  cap-surface  cap-color  bruises  odor  gill-attachment  \
1899     e          x            s          g        f     n                f   
802      p          x            s          n        t     p                f   
1831     e          x            f          g        t     n                f   
2084     e          f            f          g        f     n                f   
582      e          x            y          w        t     l                f   
...    ...        ...          ...        ...      ...   ...              ...   
3417     p          x            f          g        f     f                f   
6253     p          x            y          e        f     s                f   
5727     e          f            f          n        f     n                f   
2752     e          x            y          n        t     n                f   
7899     p          k            s          e        f     y                f   

      gill-spacing  gill-si

In [7]:
# entropy calculation
def entropy(num_pos, num_neg):
    if num_pos == 0 or num_neg == 0:
        return 0
    p = num_pos / (num_pos + num_neg)
    n = num_neg / (num_pos + num_neg)
    return -(p * math.log2(p) + n * math.log2(n))

# printing binary trees, with child node names no and yes
def printTree(node, level=0):
    if node != None:
        printTree(node.yes, level+1)
        print('     '* 4 * level + '-> ' + str(node))
        printTree(node.no, level+1)

# printing non-binary trees
def printNonBinaryTree(node, level=0, split_type=None):
    if node != None:
        keys = list(node.split.keys())
        num_of_values = len(keys)
        for i in range(int(num_of_values/2)):
            printNonBinaryTree(node.split[keys[i]], level+1, keys[i])
        if split_type == None:
            print('     '* 4 * level + '-> ' + str(node))
        else:
            print('     '* 4 * level + str(split_type) + ' -> ' + str(node))
        for i in range(int(num_of_values - (num_of_values/2))):
            printNonBinaryTree(node.split[keys[i+int(num_of_values/2)]], level+1, keys[i+int(num_of_values/2)])


In [10]:
# For non-binary tree approach

class Decision_tree_node():
    def __init__(self, feature):
        self.feature = feature
        self.split = {}

    def add(self, split_name, node):
        self.split[split_name] = node

    def __str__(self):
        return f"{self.feature}"

class Dataset_targets():

    def __init__(self, target_feature, feature_positive, feature_negative):
        self.target_feature = target_feature
        self.feature_positive = feature_positive
        self.feature_negative = feature_negative
    
    def load(self):
        return self.target_feature, self.feature_positive, self.feature_negative

class DecisionTree():

    def __init__(self, training_data, validation_data, targets=None, predict_on_max_depth=False):
        self.root = None
        # If split is not set just use dataset as training and testing
        self.training_data = training_data
        self.validation_data = validation_data
        self.data_features = training_data.columns[1:]
        self.feature_values = {}
        for feature in self.data_features:
            self.feature_values[feature] = self.training_data[feature].unique()
        
        self.target_feature, self.feat_is_positive, self.feat_is_negative = targets.load()
        self.predict_on_max_depth = predict_on_max_depth

    @staticmethod
    def information_gain(entropy_prev, expected_entropy):
        return entropy_prev - expected_entropy

    @staticmethod
    def expected_entropy(num_in_feature, num_pos_in_feature, num_neg_in_feature, num_not_in_feature, num_pos_not_in_feature, num_neg_not_in_feature, total):
        return (num_in_feature/total) * entropy(num_pos_in_feature, num_neg_in_feature) + (num_not_in_feature/total) * entropy(num_pos_not_in_feature, num_neg_not_in_feature)

    
    def train(self, max_depth=10, do_output=False):
        if do_output:
            print(f"Training Decision Tree to depth of {max_depth} -->")
        remaining_features = list(self.data_features)
        Queue = queue.Queue()

        split_type = 'root'
        self.root = Decision_tree_node(None)
        current_node = None
        depth = 0

        Queue.put((self.training_data, split_type, None, depth))

        while Queue.empty() == False:
            data, split_type, current_node, depth = Queue.get()
            if do_output:
                print(f'depth: {depth}', end= f":  {split_type} -> ")
            if len(data) == 0 or len(remaining_features) == 0:
                if do_output:
                    print(" everything classified")
                continue
            max_gain = 0
            best_feature = None
            total = len(data)
            total_pos = len(data[data[self.target_feature] == self.feat_is_positive])
            total_neg = len(data[data[self.target_feature] == self.feat_is_negative])

            entropy_prev = entropy(total_pos, total_neg)
            
            if self.predict_on_max_depth:
                if depth > max_depth-1:
                    if total_pos > total_neg:
                        current_node.add(split_type, Decision_tree_node(self.feat_is_positive))
                    else:
                        current_node.add(split_type, Decision_tree_node(self.feat_is_negative))
                    if do_output:
                        print(" reached desired depth")
                    continue
            else:
                if depth > max_depth:
                    if do_output:
                        print(" reached desired depth")
                    continue
            
            if entropy_prev == 0:
                if total_pos > total_neg:
                    current_node.add(split_type, Decision_tree_node(self.feat_is_positive))
                else:
                    current_node.add(split_type, Decision_tree_node(self.feat_is_negative))
                if do_output:
                    print(' Entropy is 0')
                continue

            for feature in remaining_features:
                for value in self.feature_values[feature]:

                    num_in_feature = len(data[data[feature] == value])
                    num_not_in_feature = len(data[data[feature] != value])
                    num_pos_in_feature = len(data[(data[feature] == value) & (data[self.target_feature] == self.feat_is_positive)])
                    num_neg_in_feature = len(data[(data[feature] == value) & (data[self.target_feature] == self.feat_is_negative)])
                    num_pos_not_in_feature = len(data[(data[feature] != value) & (data[self.target_feature] == self.feat_is_positive)])
                    num_neg_not_in_feature = len(data[(data[feature] != value) & (data[self.target_feature] == self.feat_is_negative)])

                    expected_ent = DecisionTreeBinary.expected_entropy(num_in_feature, num_pos_in_feature, num_neg_in_feature, num_not_in_feature, num_pos_not_in_feature, num_neg_not_in_feature, total)

                    gain = DecisionTreeBinary.information_gain(entropy_prev, expected_ent)

                    if gain > max_gain:
                        max_gain = gain
                        best_feature = feature

            if split_type == 'root':
                self.root = Decision_tree_node(best_feature)
                current_node = self.root
            else:
                current_node.add(split_type, Decision_tree_node(best_feature))
                current_node = current_node.split[split_type]
            
            for value in self.feature_values[best_feature]:
                Queue.put(((data[data[best_feature] == value]), value, current_node, depth+1))

            if do_output:
                print(best_feature, max_gain)
            remaining_features.remove(best_feature)

        if do_output:
            printNonBinaryTree(self.root)

    def printTree(self):
        printNonBinaryTree(self.root)

    def classify_item(self, item):
        current_node = self.root
        while current_node != None:
            feature = current_node.feature
            value = item[feature]
            if value not in list(current_node.split.keys()):
                break
            current_node = current_node.split[value]

            if current_node.feature == self.feat_is_positive:
                return self.feat_is_positive
            elif current_node.feature == self.feat_is_negative:
                return self.feat_is_negative

        # if the tree couldn't identify the item randomly select positive or negative
        # rand_num = random.randint(0,1)
        # if rand_num == 0:
        #     return self.feat_is_negative
        # else:
        #     return self.feat_is_positive

        # if the tree couldn't identify the item return 'does not know' 
        return 'dnk'

    def training_accuracy_at_depth(self, depth):
        self.train(max_depth=depth)
        total = len(self.training_data)
        correct=0
        training_test_answers = self.training_data.loc[:, [self.target_feature]]
        predictions=[]
        train_data_test = self.training_data.drop(self.target_feature, axis=1)
        for i in range(len(train_data_test)):
            item = train_data_test.loc[i]
            predictions.append(self.classify_item(item))
        for i in range(total):
            if training_test_answers.iloc[i,0] == predictions[i]:
                correct +=1
        return correct/total

    def validate(self):
        predictions=[]
        validation_data = self.validation_data.drop(self.target_feature, axis=1)
        for i in range(len(validation_data)):
            item = validation_data.loc[i]
            predictions.append(self.classify_item(item))
        return predictions

    def validation_accuracy(self):
        test_answers = self.validation_data.loc[:, [self.target_feature]]
        total = len(test_answers)
        correct = 0
        test_predictions = self.validate()
        for i in range(total):
            if test_answers.iloc[i,0] == test_predictions[i]:
                correct += 1
        return correct, total

    def validation_accuracy_at_depth(self, depth, do_output=False):
        self.train(max_depth=depth)
        correct, total = self.validation_accuracy()
        if do_output:
            print(f'Validation accuracy: {correct}/{total} -> {(correct/total)*100:.2f}%\n')
        return correct/total
    
    def testing_accuracy_at_depth(self, depth, testing_data):
        self.train(max_depth=depth)
        total = len(testing_data)
        correct=0
        test_answers = testing_data.loc[:, [self.target_feature]]
        predictions=[]
        testing_data = testing_data.drop(self.target_feature, axis=1)
        for i in range(len(testing_data)):
            item = testing_data.loc[i]
            predictions.append(self.classify_item(item))
        for i in range(total):
            if test_answers.iloc[i,0] == predictions[i]:
                correct +=1
        return correct/total

mushroom_targets = Dataset_targets(target_feature='class', feature_positive='e', feature_negative='p')

dt = DecisionTree(mushroom_training, mushroom, mushroom_targets, predict_on_max_depth=True)
dt.train(10, True)
print(dt.testing_accuracy_at_depth(10, mushroom_testing))

dt = DecisionTree(mushroom_training, mushroom, mushroom_targets, predict_on_max_depth=True)
dt.train(3, True)
print(dt.testing_accuracy_at_depth(3, mushroom_testing))

dt = DecisionTree(mushroom_training, mushroom, mushroom_targets)
dt.train(3, True)
print(dt.testing_accuracy_at_depth(3, mushroom_testing))

Training Decision Tree to depth of 10 -->
depth: 0:  root ->  odor 0.5218384683610888
depth: 1:  p ->  Entropy is 0
depth: 1:  a ->  Entropy is 0
depth: 1:  l ->  Entropy is 0
depth: 1:  n ->  spore-print-color 0.11827797504729963
depth: 1:  f ->  Entropy is 0
depth: 1:  c ->  Entropy is 0
depth: 1:  y ->  Entropy is 0
depth: 1:  s ->  Entropy is 0
depth: 1:  m ->  Entropy is 0
depth: 2:  k ->  Entropy is 0
depth: 2:  n ->  Entropy is 0
depth: 2:  u ->  everything classified
depth: 2:  h ->  Entropy is 0
depth: 2:  w ->  gill-size 0.2504210402281517
depth: 2:  r ->  Entropy is 0
depth: 2:  o ->  Entropy is 0
depth: 2:  y ->  Entropy is 0
depth: 2:  b ->  Entropy is 0
depth: 3:  n ->  stalk-surface-below-ring 0.6509308906150979
depth: 3:  b ->  Entropy is 0
depth: 4:  s ->  cap-color 0.828055725379504
depth: 4:  y ->  Entropy is 0
depth: 4:  f ->  Entropy is 0
depth: 4:  k ->  everything classified
depth: 5:  n ->  Entropy is 0
depth: 5:  y ->  everything classified
depth: 5:  w ->  Ent

In [11]:
# Validation and testing -->
def single_split(data_set, data_targets, test_data, training_validation_split=0.5, tree_depth=1, predict_on_max_depth=False):
    training_data = data_set.sample(frac=training_validation_split, random_state=200)
    validation_data = data_set.drop(training_data.index)
    training_data = training_data.reset_index(drop=True)
    validation_data = validation_data.reset_index(drop=True)
    print("\n-----------------------------------------------")
    print(f"Decision tree with {int(training_validation_split*100)}/{int((1-training_validation_split)*100)} split at depth {tree_depth}")
    print("-----------------------------------------------")
    dt = DecisionTree(training_data=training_data, validation_data=validation_data, targets = data_targets, predict_on_max_depth=predict_on_max_depth)
    print(f'Validation Accuracy --> {dt.validation_accuracy_at_depth(tree_depth)}')
    print(f'Testing Accuracy --> {dt.testing_accuracy_at_depth(tree_depth, test_data)}')
    print("-----------------------------------------------\n")

def cross_validation(data_set, data_targets, test_data, folds=2, tree_depth=1, predict_on_max_depth=False):
    fold_frac = 1/folds
    data_folds = []
    validation_accuracies = []
    test_accuracies = []
    all_data = data_set
    rest_of_data = data_set
    for i in range(folds):
        if i == folds-1:
            data_folds.append(rest_of_data)
        else:
            data_fold = rest_of_data.sample(frac=fold_frac, random_state=200)
            rest_of_data = rest_of_data.drop(data_fold.index)
            data_folds.append(data_fold)
    print("\n-----------------------------------------------")
    print(f"Decision trees using {folds}-fold cross-validation at depth {tree_depth}")
    print("-----------------------------------------------")
    for i in range(folds):
        validation_data = data_folds[i]
        training_data = all_data.drop(validation_data.index)
        training_data = training_data.reset_index(drop=True)
        validation_data = validation_data.reset_index(drop=True)

        dt = DecisionTree(training_data=training_data, validation_data=validation_data, targets = data_targets, predict_on_max_depth=predict_on_max_depth)
        validation_accuracies.append(dt.validation_accuracy_at_depth(tree_depth))
        test_accuracies.append(dt.testing_accuracy_at_depth(tree_depth, test_data))
        print(f'Fold {i+1}: Validation Accuracy --> {validation_accuracies[i]}')
        print(f'                Test Accuracy --> {test_accuracies[i]}')
    print(f'Overall Validation Accuracy = {sum(validation_accuracies)/folds}')
    print(f'Overall Testing Accuracy = {sum(test_accuracies)/folds}')
    print("-----------------------------------------------\n")

def leave_one_out(data_set, data_targets, tree_depth=1, predict_on_max_depth=False):
    folds = len(data_set)-1
    cross_validation(data_set=data_set, data_targets= data_targets, folds=folds, tree_depth=tree_depth, predict_on_max_depth=predict_on_max_depth)

mushroom_targets = Dataset_targets(target_feature='class', feature_positive='e', feature_negative='p')

# Tree depth 1
single_split(mushroom_training, mushroom_targets, mushroom_testing, training_validation_split=0.7, tree_depth=1)
cross_validation(mushroom_training, mushroom_targets, mushroom_testing)
cross_validation(mushroom_training, mushroom_targets, mushroom_testing, folds=10)

# Tree depth 2
single_split(mushroom_training, mushroom_targets, mushroom_testing, training_validation_split=0.7, tree_depth=2)
cross_validation(mushroom_training, mushroom_targets, mushroom_testing, tree_depth=2)
cross_validation(mushroom_training, mushroom_targets, mushroom_testing, folds=10, tree_depth=2)

# Tree depth 3
single_split(mushroom_training, mushroom_targets, mushroom_testing, training_validation_split=0.7, tree_depth=3)
cross_validation(mushroom_training, mushroom_targets, mushroom_testing, tree_depth=3)
cross_validation(mushroom_training, mushroom_targets, mushroom_testing, folds=10, tree_depth=3)

# Tree depth 4
single_split(mushroom_training, mushroom_targets, mushroom_testing, training_validation_split=0.7, tree_depth=4)
cross_validation(mushroom_training, mushroom_targets, mushroom_testing, tree_depth=4)
cross_validation(mushroom_training, mushroom_targets, mushroom_testing, folds=10, tree_depth=4)

# Tree depth 4
single_split(mushroom_training, mushroom_targets, mushroom_testing, training_validation_split=0.7, tree_depth=4)
cross_validation(mushroom_training, mushroom_targets, mushroom_testing, tree_depth=4)
cross_validation(mushroom_training, mushroom_targets, mushroom_testing, folds=10, tree_depth=4)

# Tree depth 5
single_split(mushroom_training, mushroom_targets, mushroom_testing, training_validation_split=0.7, tree_depth=5)
cross_validation(mushroom_training, mushroom_targets, mushroom_testing, tree_depth=5)
cross_validation(mushroom_training, mushroom_targets, mushroom_testing, folds=10, tree_depth=5)

# Leave one out takes way too long for the entire dataset
# leave_one_out(mushroom, mushroom_targets)


-----------------------------------------------
Decision tree with 70/30 split at depth 1
-----------------------------------------------
Validation Accuracy --> 0.5668229777256741
Testing Accuracy --> 0.5662700041034058
-----------------------------------------------


-----------------------------------------------
Decision trees using 2-fold cross-validation at depth 1
-----------------------------------------------
Fold 1: Validation Accuracy --> 0.5562587904360057
                Test Accuracy --> 0.5662700041034058
Fold 2: Validation Accuracy --> 0.5747449876890609
                Test Accuracy --> 0.5662700041034058
Overall Validation Accuracy = 0.5655018890625333
Overall Testing Accuracy = 0.5662700041034058
-----------------------------------------------


-----------------------------------------------
Decision trees using 10-fold cross-validation at depth 1
-----------------------------------------------
Fold 1: Validation Accuracy --> 0.5553602811950791
                Tes