In [2]:
import numpy as np
import matplotlib.pyplot as plt
import sklearn.datasets as dt

In [3]:
class Utilities():

    def entropy_calc(self, data):

        # returns entropy scaler value #
        # calculates the entropy of target var in a dataset #

        entropy = 0
        for target_val in np.unique(data[:, -1]):
            prob = len(data[data[:, -1] == target_val]) / len(data[:, -1])
            log_prob = np.log2(prob)
            entropy -= (prob * log_prob)

        return entropy, len(data[:, -1])


    def information_gain_calc(self, split_val, feature_data):

        # returns the information gain #
        # calculates the information gain of a split in data #
        joined_data = np.concatenate(   (np.reshape(feature_data[0], (feature_data[0].shape[0], 1)), 
                                        np.reshape(feature_data[1], (feature_data[0].shape[0], 1))), 
                                        axis=1
                                    )
        original_entropy, _ = self.entropy_calc(joined_data)

        upper_split_entropy, upper_len = self.entropy_calc(joined_data[joined_data[:, 0] >= split_val])
        lower_split_entropy, lower_len = self.entropy_calc(joined_data[joined_data[:, 0] < split_val])
        lower_dist, upper_dist = (lower_len / (upper_len + lower_len)), (upper_len / (upper_len + lower_len))
        distributed_entropy = (upper_dist * upper_split_entropy) + (lower_dist * lower_split_entropy)

        return original_entropy - distributed_entropy


    def partitioner(self, data, num_partitions, feature_num):

        # partitions continuous data using information gain #
        # returns best split in continuous data for decision tree as well as new split data #
        feature_data = (data[0][:, feature_num], data[1])
        test_splits = np.arange(min(feature_data[0]),
                                max(feature_data[0]),
                                num_partitions + 1
                                )[1:]

        best_split = 0
        best_information_gain = 0
        for split_val in test_splits:
            information_gain = self.information_gain_calc(split_val, feature_data)
            if information_gain > best_information_gain:
                best_information_gain = information_gain
                best_split = split_val

        joined_data = np.concatenate((data[0], np.reshape(data[1], (data[0].shape[0], 1))), axis=1)
        joined_upper_split = joined_data[joined_data[:, feature_num] >= best_split]
        joined_lower_split = joined_data[joined_data[:, feature_num] < best_split]
        upper_split_data = (joined_upper_split[:, 0:-1], joined_upper_split[:, -1])
        lower_split_data = (joined_lower_split[:, 0:-1], joined_lower_split[:, -1])
        
        return best_split, best_information_gain, upper_split_data, lower_split_data


    def bagger(bag_size, num_bags, num_features, data):

        # uses bagging and subspace sampling to create mini datasets for each tree #
        # returns a dictionary of bags of data with information about features selected #

        data_dict = {}

        for i in range(num_bags):
            unique = False
            while unique == False:
                rand_features = np.random.randint(0, data[0][0].shape[1], num_features)
                if len(np.unique(rand_features)) == len(rand_features):
                    unique = True
            rand_sample = np.random.randint(0, data[0][0].shape[0], bag_size)
            cropped_data = data[0][0][rand_sample, :]
            cropped_data_2 = cropped_data[:, rand_features]
            X_data = data[0][0][rand_sample, :]
            X_data = X_data[:, rand_features]
            y_data = data[0][1][rand_sample]
            data_dict[f'bag_{i}'] = {}
            data_dict[f'bag_{i}']['data'] = (X_data, y_data)
            data_dict[f'bag_{i}']['samples'] = rand_sample
            data_dict[f'bag_{i}']['features'] = rand_features

        return data_dict


    def build_node_data_storage(self, max_tree_depth):

        # builds a tree to store data at each node #
        # returns a dictionary to store split data at nodes #

        data_storage_tree = {}
        for i in range(max_tree_depth):
            data_storage_tree[f'level_{i}'] = {}
            for j in range(0, 2 ** i):
                data_storage_tree[f'level_{i}'][f'node_{j}'] = np.array([])
        
        return data_storage_tree

    
    def train_test_val_split(split_vals, data):

        # splits data into training, testing and validation sets #
        # returns split data #

        np.random.seed(42)
        indices = np.arange(data[0].shape[0])
        np.random.shuffle(indices)
        train_data = (  data[0][indices[0:int(np.floor(indices.shape[0] * split_vals[0]))]],
                        data[1][indices[0:int(np.floor(indices.shape[0] * split_vals[0]))]]
                    )
        test_data = (   data[0][indices[int(np.ceil(indices.shape[0] * split_vals[0])): 
                                        int(np.floor(indices.shape[0] * (split_vals[0] + split_vals[1])))]],
                        data[1][indices[int(np.ceil(indices.shape[0] * split_vals[0])): 
                                        int(np.floor(indices.shape[0] * (split_vals[0] + split_vals[1])))]]
                    )
        val_data = (    data[0][indices[int(np.ceil(indices.shape[0] * (split_vals[0] + split_vals[1]))):]],
                        data[1][indices[int(np.ceil(indices.shape[0] * (split_vals[0] + split_vals[1]))):]]
                    )
        return (train_data, test_data, val_data)
    

    def entropy_check(forest):

        # checks drop in entropy down tree levels #
        # prints graph showing entropy drop in trees #

        plt.figure(figsize=(15, 8))
        plt.title('Average entropy drop in tree layers within forest')
        plt.grid()

        colours = ['r', 'g', 'b', 'c', 'm', 'k', 'y']

        for tree in forest:
            level_count = 0
            entropy_list = []
            for level in forest[tree]:
                level_count += 1
                total_entropy = 0
                node_count = 0
                for node in forest[tree][level]:
                    if len(forest[tree][level][node]) != 0:
                        total_entropy += forest[tree][level][node]['entropy']
                        node_count += 1
                entropy_list.append(total_entropy / node_count)
            plt.plot(list(range(level_count)), entropy_list, c=colours[np.random.randint(0, len(colours))])
        
        plt.xlabel('Tree levels')
        plt.ylabel('Entropy')
        plt.show()


    def best_feature_check(forest):

        # checks which features have the best information gain / most useful #
        # prints graph showing information gain of each feature #

        pass


    def leaf_decision_choice(self, data):

        # determines whether leaf node will predict 0 or 1 #
        # returns a 0 or 1 #

        decision = 1
        if data[data[:, -1] == 0].shape[0] >= data[data[:, -1] == 1].shape[0]:
            decision = 0
        
        return decision
    

    def accuracy(y, y_bar):
        
        # calculates accuracy #
        # returns accuracy of model on labels input #

        difference = abs(y - y_bar)
        return 100 * (len(difference[difference == 0]) / len(y))
    

    def grid_size_calc(param_dict):

        # calculates number of tests to perform #
        # returns integer of number of tests to run #
        total_tests = 1
        for param in param_dict:
            total_tests *= len(param_dict[param])
        
        return total_tests
    
    def hyperparam_analysis(hyperparam_choices, param_dict, performance_dict):

        # plot graph of two chosen hyperparameters against model accuracy #
        fig = plt.figure(figsize=(15, 15))
        ax = plt.axes(projection='3d')
        accuracy_array = np.zeros((len(param_dict[hyperparam_choices[0]]),
                                len(param_dict[hyperparam_choices[1]])))
        param_1_array = np.zeros((len(param_dict[hyperparam_choices[0]]),
                                len(param_dict[hyperparam_choices[1]])))
        param_2_array = np.zeros((len(param_dict[hyperparam_choices[0]]),
                                len(param_dict[hyperparam_choices[1]])))
        x_counter = 0
        y_counter = 0
        for test in performance_dict:
            accuracy_array[x_counter, y_counter] = performance_dict[test]['accuracy']
            param_1_array[x_counter, y_counter] = performance_dict[test][hyperparam_choices[0]]
            param_2_array[x_counter, y_counter] = performance_dict[test][hyperparam_choices[1]]
            x_counter += 1
            if x_counter == len(param_dict[hyperparam_choices[0]]):
                x_counter = 0
                y_counter += 1
        ax.plot_surface(param_1_array, 
                        param_2_array,
                        accuracy_array, 
                        rstride=1, 
                        cstride=1, 
                        cmap='viridis', 
                        edgecolor='none')

In [4]:
class RandomForest(Utilities):
    def __init__(self, num_trees, max_tree_depth, min_entropy, num_partitions):
        super(RandomForest, self).__init__()
        self.num_trees = num_trees
        self.max_tree_depth = max_tree_depth
        self.min_entropy = min_entropy
        self.num_partitions = num_partitions
    

    def build_forest(self):
        
        # builds trees ready for fitting #
        # returns a dictionary of trees ready for use #

        forest = {}
        for i in range(self.num_trees):
            forest[f'tree_{i}'] = {}
            for j in range(self.max_tree_depth):
                forest[f'tree_{i}'][f'level_{j}'] = {}
                for k in range(0, 2 ** j):
                    forest[f'tree_{i}'][f'level_{j}'][f'node_{k}'] = {}
        
        return forest
    

    def fit(self, bagged_data, forest):

        # fits forest to training data #
        # returns fit forest #

        for i, bag in enumerate(bagged_data):
            node_data_dict = self.build_node_data_storage(self.max_tree_depth)
            node_data_dict['level_0']['node_0'] = bagged_data[bag]['data']
            for j in range(self.max_tree_depth):
                for k in range(0, 2 ** j):
                    node_data = node_data_dict[f'level_{j}'][f'node_{k}']
                    # check if any data at node
                    if len(node_data) != 0:
                        joined_node_data = np.concatenate(( node_data[0], 
                                                            np.reshape(node_data[1], (node_data[0].shape[0], 1))), 
                                                            axis=1)
                        # if entropy at node is zero then store info but don't create new nodes #
                        if self.entropy_calc(joined_node_data)[0] == 0:
                            forest[f'tree_{i}'][f'level_{j}'][f'node_{k}']['type'] = 'leaf'
                            forest[f'tree_{i}'][f'level_{j}'][f'node_{k}']['decision'] = self.leaf_decision_choice(joined_node_data)
                            forest[f'tree_{i}'][f'level_{j}'][f'node_{k}']['entropy'] = self.entropy_calc(joined_node_data)[0]
                        
                        # if not at max depth and not 0 entropy then do create new nodes #
                        elif (self.entropy_calc(joined_node_data)[0] != 0) and ((j + 1) < self.max_tree_depth):
                            forest[f'tree_{i}'][f'level_{j}'][f'node_{k}']['type'] = 'node'
                            forest[f'tree_{i}'][f'level_{j}'][f'node_{k}']['entropy'] = self.entropy_calc(joined_node_data)[0]
                            best_information_gain = 0
                            for indice, feature_num in enumerate(bagged_data[bag]['features']):
                                split, information_gain, upper_split_data, lower_split_data = self.partitioner( node_data, 
                                                                                                                self.num_partitions, 
                                                                                                                indice
                                                                                                                )
                                if information_gain >= best_information_gain:
                                    best_information_gain = information_gain
                                    forest[f'tree_{i}'][f'level_{j}'][f'node_{k}']['feature'] = feature_num
                                    forest[f'tree_{i}'][f'level_{j}'][f'node_{k}']['split'] = split
                                    forest[f'tree_{i}'][f'level_{j}'][f'node_{k}']['info_gain'] = best_information_gain
                                    node_data_dict[f'level_{j + 1}'][f'node_{2 * k}'] = lower_split_data
                                    node_data_dict[f'level_{j + 1}'][f'node_{(2 * k) + 1}'] = upper_split_data
                        
                        # if at max depth then store info at node but don't create new nodes #
                        else:
                            forest[f'tree_{i}'][f'level_{j}'][f'node_{k}']['type'] = 'leaf'
                            forest[f'tree_{i}'][f'level_{j}'][f'node_{k}']['decision'] = self.leaf_decision_choice(joined_node_data)
                            forest[f'tree_{i}'][f'level_{j}'][f'node_{k}']['entropy'] = self.entropy_calc(joined_node_data)[0]
                            best_information_gain = 0
                            for indice, feature_num in enumerate(bagged_data[bag]['features']):
                                split, information_gain, upper_split_data, lower_split_data = self.partitioner( node_data, 
                                                                                                                self.num_partitions, 
                                                                                                                indice
                                                                                                                )
                                if information_gain >= best_information_gain:
                                    best_information_gain = information_gain
                                    forest[f'tree_{i}'][f'level_{j}'][f'node_{k}']['feature'] = feature_num
                                    forest[f'tree_{i}'][f'level_{j}'][f'node_{k}']['split'] = split
                                    forest[f'tree_{i}'][f'level_{j}'][f'node_{k}']['info_gain'] = best_information_gain

        return forest


    def predict(self, test_data, forest):

        # uses trained forest to make predictions on test data #
        # returns predictions #

        predictions = np.zeros((test_data[0].shape[0]))
        for i in range(test_data[0].shape[0]):
            data_point = test_data[0][i, :]
            for tree in forest:
                current_tree = forest[tree]
                current_node_number = 0
                for j, level in enumerate(current_tree):
                    node = current_tree[level][f'node_{current_node_number}']
                    if node['type'] == 'node':
                        if data_point[node['feature']] >= node['split']:
                            current_node_number = (2 * current_node_number) + 1
                        else:
                            current_node_number = 2 * current_node_number
                    else:
                        predictions[i] = node['decision']
                        break
        
        return predictions

In [7]:
def main():

  # make data
  data = dt.make_classification(
                                  n_samples = 20000,
                                  n_features = 10,
                                  n_informative = 8,
                                  n_redundant = 2,
                                  n_repeated = 0,
                                  n_classes = 2,
                                  n_clusters_per_class = 3
                              )
  splitVals = (0.7, 0.2, 0.1)
  split_data = Utilities.train_test_val_split(splitVals, data)

  # tune hyperparameters
  param_grid = {  'bag_size'          : [500, 750, 1000],
                  'num_trees'         : [30, 40, 50],
                  'num_features'      : [5, 6, 7],
                  'max_tree_depth'    : [10, 12, 14],
                  'num_partitions'    : [3]
              }

  total_tests = Utilities.grid_size_calc(param_grid)
  print(f'... num tests to complete : {total_tests} ... ')
  print('')

  performance_dict = {}
  test_count = 0
  best_accuracy = 0
  for bag_size in param_grid['bag_size']:
      for num_trees in param_grid['num_trees']:
          for num_features in param_grid['num_features']:
              for max_tree_depth in param_grid['max_tree_depth']:
                  for num_partitions in param_grid['num_partitions']:
                      performance_dict[f'test_{test_count}'] = {  'bag_size'          : bag_size,
                                                                  'num_trees'         : num_trees,
                                                                  'num_features'      : num_features,
                                                                  'max_tree_depth'    : max_tree_depth,
                                                                  'num_partitions'    : num_partitions
                                                                  }
                      num_bags = num_trees
                      bagged_data = Utilities.bagger(bag_size, num_bags, num_features, split_data)
                      rfc = RandomForest( num_trees=num_trees, 
                                          max_tree_depth=max_tree_depth, 
                                          min_entropy=0, 
                                          num_partitions=num_partitions)
                      forest = rfc.build_forest()
                      forest = rfc.fit(bagged_data=bagged_data, forest=forest)
                      val_predictions = rfc.predict(split_data[2], forest)
                      val_accuracy = Utilities.accuracy(split_data[2][1], val_predictions)
                      performance_dict[f'test_{test_count}']['accuracy'] = val_accuracy
                      if val_accuracy >= best_accuracy:
                          best_accuracy = val_accuracy
                          best_classifier = rfc
                          best_forest = forest
                          best_test = test_count
                      test_count += 1
                      if test_count % 10 == 0:
                          print(f'tests complete : {round(100 * (test_count / total_tests), 2)}% : best acc : {best_accuracy}%')
  print('')
  print('... testing complete ... ')
  print('')
  print('Best tree parameters:')
  print('')
  print(performance_dict[f'test_{best_test}'])

  # performance on test set
  test_predictions = best_classifier.predict(split_data[1], best_forest)
  test_accuracy = Utilities.accuracy(split_data[1][1], test_predictions)
  print(f'accuracy of best forest on test set : {round(test_accuracy, 2)}%')


if __name__ == '__main__':
  main()

... num tests to complete : 81 ... 

tests complete : 12.35% : best acc : 77.8%
tests complete : 24.69% : best acc : 78.0%
tests complete : 37.04% : best acc : 78.0%
tests complete : 49.38% : best acc : 79.2%
tests complete : 61.73% : best acc : 79.5%
tests complete : 74.07% : best acc : 79.9%
tests complete : 86.42% : best acc : 79.9%
tests complete : 98.77% : best acc : 81.2%

... testing complete ... 

Best tree parameters:

{'bag_size': 1000, 'num_trees': 50, 'num_features': 7, 'max_tree_depth': 12, 'num_partitions': 3, 'accuracy': 81.2}
accuracy of best forest on test set : 81.45%
