In [2]:
import pandas as pd
import numpy as np
import torch
import pickle

In [3]:
#Input Dataset
dataset = pd.read_csv('Dataset/HeartDisease.csv')
dataset = dataset.values
dataset_train = dataset[:46000, :]
dataset_test = dataset[46000:, :]
X_train = dataset_train[:, 1:]
Y_train = dataset_train[:, 0]

(56003, 17)

In [107]:
#Node class
class Node():
    def __init__(self, index = None, threshold = None, left = None, right = None, info_gain = None, value = None):
        self.index = index
        self.threshold = threshold
        self.left = left
        self.right = right
        self.info_gain = info_gain
        self.value = value

#Decision Tree class
class DecisionTree():
    def __init__(self, min_samples = None, max_depth = None):
        self.root = None
        self.min_samples = min_samples
        self.max_depth = max_depth
        self.pos = 0

    #Return: Node
    def build_tree(self, dataset, pos):

        X = dataset[:, 1:]
        Y = dataset[:, 0]

        num_sample, num_feature = np.shape(X)

        if num_sample > self.min_samples and pos < self.max_depth:
            best_split = self.get_best_split(dataset, num_sample, num_feature)
            if best_split["info_gain"] > 0:
                self.pos = self.pos + 1
                left_subtree = self.build_tree(best_split["left_data"], self.pos)
                right_subtree= self.build_tree(best_split["right_data"], self.pos)

                return Node(index = best_split["index"], threshold = best_split["threshold"], left = left_subtree, right = right_subtree,info_gain = best_split["info_gain"])

        value = self.leaf_value(Y)
        return Node(value = value)

    #Return a dictionary
    def get_best_split(self, dataset, num_sample, num_feature):

        best_split = {}
        max_info_gain = -999999999999
        for index in range (1, num_feature + 1):
            threshold_values = dataset[:, index]
            unique_values = np.unique(threshold_values)
            for value in unique_values:
                dataset_x, dataset_y = self.split(dataset, index, value)
                if len(dataset_x) > 0 and len(dataset_y) > 0:
                  info_gain = self.get_info_gain(dataset, dataset_x, dataset_y)
                  if info_gain > max_info_gain:
                      max_info_gain = info_gain
                      best_split["index"] = index
                      best_split["threshold"] = value
                      best_split["left_data"] = dataset_x
                      best_split["right_data"] = dataset_y
                      best_split["info_gain"] = info_gain
        return best_split

    def split(self, dataset, index, threshold):
        dataset_x = np.array([row for row in dataset if row[index] <= threshold])
        dataset_y = np.array([row for row in dataset if row[index] > threshold])
        return dataset_x, dataset_y

    def get_info_gain(self, dataset, dataset_x, dataset_y):
        weight_x = len(dataset_x)/len(dataset)
        weight_y = len(dataset_y)/len(dataset)
        return self.gini_index(dataset) - weight_x*self.gini_index(dataset_x) - weight_y*self.gini_index(dataset_y)

    def gini_index(self, dataset):
        data_label = dataset[:, 0]
        result = 0
        labels = np.unique(data_label)
        for label in labels:
            value = len(data_label[data_label == label ])/len(data_label)
            result += value**2

        return 1 - result

    def leaf_value(self, Y):
        Y = list(Y)
        return max(Y, key = Y.count)

    def fit(self, dataset):
        self.root = self.build_tree(dataset, self.pos)

    def make_predictions(self, X):
        return [self.predict(row) for row in X]

    def predict(self, X):
        cur_node = self.root
        while cur_node.value is None:
            if X[cur_node.index - 1] <= cur_node.threshold:
                cur_node = cur_node.left
            else:
                cur_node = cur_node.right
        return cur_node.value

    def print_tree(self, tree=None, indent=" "):

        if not tree:
            tree = self.root

        if tree.value is not None:
            print(tree.value)

        else:
            print("X_"+str(tree.index), "<=", tree.threshold, "?", tree.info_gain)
            print("%sleft:" % (indent), end="")
            self.print_tree(tree.left, indent + indent)
            print("%sright:" % (indent), end="")
            self.print_tree(tree.right, indent + indent)

In [108]:
#Adaboost class
class Adaboost():
    def __init__(self, nb_model):
        self.nb_model = nb_model
        self.stumps = []
        self.amount_of_says = []
    
    def fit(self, dataset):
        X = dataset[:, 1:]
        Y = dataset[:, 0]
        num_sample, num_feature = np.shape(dataset)
        prob_weight = torch.ones(num_sample)
        prob_weight = prob_weight/num_sample
        prob_weight = prob_weight.numpy()
        count = 0
        sample_info = [[w, 0] for w in prob_weight]
        weight = None
        for i in range(self.nb_model):
            if count == 0:
                training_data = dataset
            else:
                training_data = self.create_new_dataset(dataset, weight, num_sample)
            stump = DecisionTree(min_samples=13, max_depth=3)
            stump.fit(training_data)
            predictions = np.array(stump.make_predictions(X))
            amount_of_say, result = self.calculate_aos(Y, predictions, num_sample)
            self.stumps.append(stump)
            self.amount_of_says.append(amount_of_say)
            #Update result of sample testing
            for j in range(len(sample_info)):
                sample_info[j][1] = result[j]
            #Update original weight
            for index in range(len(result)):
                if sample_info[index][1] == 0:
                    sample_info[index][0] = self.update_weight(sample_info[index][0], amount_of_say, False)
                else:
                    sample_info[index][0] = self.update_weight(sample_info[index][0], amount_of_say, True)
            weight = self.get_weight(sample_info)
            for m in range(num_sample):
                sample_info[m][0] = sample_info[m][0]/np.sum(weight)
            weight = self.get_weight(sample_info)
            count += 1

    def get_weight(self, info):
        result = []
        for i in range(len(info)):
            result.append(info[i][0])
        return np.array(result)

    def calculate_aos(self, Y, predictions, num_sample):
        result = predictions == Y
        result = result.astype(int)
        error = min(1 - (np.sum(result)/num_sample), (np.sum(result)/num_sample))
        #error = 1 - (np.sum(result)/num_sample)
        amount_of_say = np.log((1-error)/error)/2
        return amount_of_say, result
    
    def update_weight(self, weight, amount_of_say, state_of_prediction):
        if state_of_prediction == False:
            return weight*np.exp(amount_of_say)
        else:
            return weight*np.exp(-amount_of_say)
        
    def create_new_dataset(self, dataset, weight, num_sample):
        data_index = []
        for count in range(num_sample):
            random = np.random.rand()
            index = 0
            a = weight[0]
            while a < random:
                index += 1
                a += weight[index]
            data_index.append(index)
        new_dataset = np.stack([dataset[ind] for ind in data_index])
        return new_dataset
    
    def make_prediction(self, X):
        return [self.ada_predict(row) for row in X]

    def ada_predict(self, X):
        aof_true = 0
        aof_false = 0
        for ind in range(self.nb_model):
            if self.stumps[ind].predict(X) == 1:
                aof_true += self.amount_of_says[ind]
            else:
                aof_false += self.amount_of_says[ind]
        return aof_true > aof_false

In [115]:
#Model fitting/loading
try:
    model = pickle.load(open('adaboost_model.pkl', 'rb'))
except:
    model = Adaboost(30)
    model.fit(dataset_train)
    filename = 'adaboost_model.pkl'
    pickle.dump(model, open(filename, 'wb'))

In [116]:
#Model Testing
X = dataset_test[:, 1:]
Y = dataset_test[:, 0]
prediction = model.make_prediction(X)
bool_result = prediction == Y
bool_result = bool_result.astype(int)
num_correct = np.sum(bool_result)
print(f"Accuracy for testing {num_correct/np.shape(Y)[0]*100}")

Accuracy for testing 0.9341197640707788
