In [44]:
import math
from heapq import nsmallest
import copy
from scipy.stats import mode
from random import choice

%run DataPreProcessing.ipynb

# Classification Decision Tree

In [46]:
def CalculateFeatureEntropy(dataset):
    '''
    Calculates the total entropy of a dataset
    
    @param dataset: classification target set
    '''
    #Gets
    values = dataset.unique()
    entropy = 0
    for value in values:
        count = len(dataset[dataset == value])
        p = count / len(dataset)
        entropy += -p * math.log(p, 2)
    return entropy

def CalculateFeatureInformationGain(dataset, feature, target):
    '''
    Calculates information gain of a feature
    
    @param dataset: classification set 
    @param feature: the feature
    @param target: class label
    '''
    # Calculate the entropy of the entire dataset
    entropy_total = CalculateFeatureEntropy(dataset[target])

    # Calculate the entropy of the feature
    values = dataset[feature].unique()
    entropy_feature = 0
    for value in values:
        subset = dataset[dataset[feature] == value]
        weight = len(subset) / len(dataset)
        entropy_feature += weight * CalculateFeatureEntropy(subset[target])

    # Calculate the information gain
    info_gain = entropy_total - entropy_feature
    return [info_gain,values]

def CalculateFeatureGainRatio(dataset,feature,target):
    '''
    Calculates the gain ratio of the feature
    
    @param dataset: classification set 
    @param feature: the feature
    @param target: class label
    '''
        
    #Calls functions to get the information gain of the feature as well as the unique 
    #values of the feature
    feature_gain = CalculateFeatureInformationGain(dataset, feature, target)[0]
    feature_values = CalculateFeatureInformationGain(dataset, feature, target)[1]
    feature_size = len(dataset.index)
    IV = 0
    
    #Calculates the intrinsic value over all the unique feature values
    for value in feature_values:
        value_count = dataset[feature].value_counts()[value]
        IV = IV + ((-value_count/feature_size)*np.log2(value_count/feature_size))

    if IV==0:
        IV=0.0001
        
    #Calculates and returns gain ratio
    g_ratio = feature_gain/IV
    
    return g_ratio

def GenerateClassificationTree(dataset, target):
    '''
    Builds the decision tree
    
    @param dataset: classification set 
    @param target: class label
    '''
    #Creates the list of features and removes the class label from it
    features = list(dataset.columns.values)
    features.remove(target)
    
    
    #Checks to make sure there are still features and classes to evaluate, and if not, returns the plurality of the target
    if len(dataset[target].unique()) == 1 or len(features) == 0:
        return dataset[target].mode()[0]

    # Gets feature with the highest information gain
    gain_ratio = [CalculateFeatureGainRatio(dataset, feature, target) for feature in features]
    best_feature = features[np.argmax(gain_ratio)]

    #Creates a tree dictionary that stores the subtree of the best feature
    tree = {best_feature: {}}

    # The chosen feature is removed from the features list
    features = [feature for feature in features if feature != best_feature]

    #Iterates through each value of the feature
    for value in dataset[best_feature].unique():
        
        #Creates a subtree by recursively calling the decision tree function and adds it to the main tree
        sub_df = dataset[dataset[best_feature] == value]
        subtree = GenerateClassificationTree(sub_df, target)
        tree[best_feature][value] = subtree

    #The final decision tree is returned
    return tree

def GetClassPredictions(final_tree, test_data):
    '''
    Makes test predictions using the constructed decision tree
    
    @param final_tree: the decision tree
    @param test_data: the test input
    '''
    
    predicted_values = []
    
    #Iterates through each test input and gets the predicted values
    for i, test_row in test_data.iterrows():
        predicted_values.append(PredictClass(final_tree, test_row))
    
    #Returns the final list of predicted values
    return predicted_values


def PredictClass(tree, test_row):
    '''
    Makes test predictions using the constructed decision tree and an individual test input row
    
    @param final_tree: the decision tree
    @param test_data: the test input
    '''
    #Gets the node by getting the first key of tree
    node = list(tree.keys())[0]
    #Randomly selects one of the feature values if the current one is not in the tree
    if test_row[node] not in tree:
            curr_keys = list((tree[node]).keys())
            test_row[node] = choice(curr_keys)
    #Creates a branch of a tree using the node and test row values
    branch = tree[node][test_row[node]]
    
    #Checks if the branch is a string or integer, and returns the prediction if it is
    if isinstance(branch, str) or isinstance(branch, np.int64):
        return branch
    #If it is not a leaf, calls itself recursively until leaf is found
    else:
        return PredictClass(branch, test_row)


# Regression Decision Tree

In [47]:
def MSE(target):
    '''
    Calculates the mean squared error of a target vector
    
    @param target: target vector
    
    '''
    mse = np.mean((target - np.mean(target)) ** 2)
    if np.isnan(mse):
        mse = 0
    return mse

def SplitData(data, target, feature, split_value):
    '''
    Splits numerical data into two and returns the mean squared error for each subset
    
    @param data: regression set 
    @param feature: the feature
    @param target: class label
    
    '''
    
    left_indices= data[feature] < split_value
    right_indices = data[feature] >= split_value
    left_subset = target[left_indices]
    right_subset = target[right_indices]
    left_mse = MSE(left_subset)
    right_mse = MSE(right_subset)
    return left_mse, right_mse
    
def SplitNominalData(data,class_label,feature):
    '''
    Splits categorical data and returns the mean squared error for each value
    
    @param data: regression set 
    @param feature: the feature
    @param target: class label
    
    '''
    mse_dict = {}
    for split_value in np.unique(data[feature]):
        target_values = data.loc[data[feature] == split_value,class_label]
        curr_mse = MSE(target_values)
        mse_dict[split_value]=curr_mse
    return mse_dict

def GenerateRegressionTree(data, class_label, depth=0, max_depth=3,nominal_features=[]):
    '''
    Builds the decision tree
    
    @param dataset: regresion set 
    @param target: class label
    @param depth: current recursive depth
    @param max_depth: maximum number of recursions
    @param nominal_features: list of nominal features, if any
    
    '''
    #Creates the list of features and removes the class label from it
    features = list(data.columns.values)
    features.remove(class_label)
    target = data[class_label]
    
    #Checks to make sure there are still features to evaluate and max depth hasn't been reached, and if not, returns the average of the target
    if MSE(target) == 0 or depth >= max_depth or len(features) == 0:
        return np.mean(target)

    #Gets the best feature by iterating through each feature, calculating the MSE, and choosing the one with the lowest MSE
    best_feature = None
    best_split_value = None
    best_mse = np.inf
    for feature in features:
        if feature not in nominal_features or len(nominal_features)==0:
            for split_value in np.unique(data[feature]):
                left_mse, right_mse = SplitData(data, target, feature, split_value)
                total_mse = left_mse + right_mse
                if total_mse < best_mse:
                    best_feature = feature
                    best_split_value = split_value
                    best_mse = total_mse
        elif feature in nominal_features:
            mse_dict = SplitNominalData(data,class_label,feature)
            total_mse = sum(mse_dict.values())
            if total_mse < best_mse:
                best_feature=feature
                best_mse=total_mse
                
    #Creates a tree dictionary that stores the subtree of the best feature
    tree = {best_feature: {}}

    #Creates subtree based on best split value from the lowest MSE and recursively calls generate tree function for numerical features
    if best_feature not in nominal_features or len(nominal_features)==0:
        l_data = data.loc[data[best_feature] < best_split_value]
        left_data = l_data.drop(columns=[best_feature])
        
        r_data = data.loc[data[best_feature] >= best_split_value]
        right_data = r_data.drop(columns=[best_feature])
        
        
        tree[best_feature]['<{}'.format(best_split_value)] = GenerateRegressionTree(left_data,class_label, depth=depth+1, max_depth=max_depth,nominal_features=nominal_features)
        tree[best_feature]['>{}'.format(best_split_value)] = GenerateRegressionTree(right_data,class_label, depth=depth+1, max_depth=max_depth,nominal_features=nominal_features)
    
    #Creates subtree by getting unique nominal feature values and recursively calls generate tree function 
    elif best_feature in nominal_features:
        for split_value in np.unique(data[best_feature]):
            indices = data.index[data[best_feature]==split_value].tolist()
            curr_data = data.loc[indices]
            curr_data.drop(columns=[best_feature])   
            tree[best_feature][split_value] = GenerateRegressionTree(curr_data, class_label, depth=depth+1, max_depth=max_depth,nominal_features=nominal_features)   

    #The final decision tree is returned
    return tree



def GetRegressionPredictions(final_tree, test_data, nominal_features):
    '''
    Makes test predictions using the constructed decision tree
    
    @param final_tree: the decision tree
    @param test_data: the test input
    @param nominal_features: list of nominal features, if any
    '''
    
    predicted_values = []
    
     #Iterates through each test input and gets the predicted values
    for i, test_row in test_data.iterrows():
        predicted_values.append(PredictRegressionValue(final_tree, test_row, nominal_features))
        
    #Replaces nan values in predictions with 0
    predicted_values = [0 if np.isnan(prediction) else prediction for prediction in predicted_values]
    
    #Returns the final list of predicted values
    return predicted_values


def PredictRegressionValue(tree, test_row, nominal_features):
    '''
    Makes test predictions using the constructed decision tree and an individual test input row
    
    @param tree: the decision tree
    @param test_row: the test input
    @param nominal_features: list of nominal features, if any
    
    '''
    #Gets the node by getting the first key of tree
    node = list(tree.keys())[0]
    if node not in nominal_features:
        for key in tree[node]:
            compare = key[0]
            compare_value = float(key.split(compare)[1])
            
            #Checks if the node is a greater than or less than range 
            #and checks the current node value
            if compare == '>':
                if test_row[node] >= compare_value:
                    branch = tree[node][key]
                    break
                else:
                    continue
            if compare == '<':
                if test_row[node] < compare_value:
                    branch = tree[node][key]
                    break
                else:
                    continue
    elif node in nominal_features:
        #Randomly selects one of the feature values if the current one is not in the tree
        if test_row[node] not in tree:
            curr_keys = list((tree[node]).keys())
            test_row[node] = choice(curr_keys)
        branch = tree[node][test_row[node]]
        
    #Checks if the branch is a string or integer or float, and returns the prediction if it is
    if isinstance(branch, str) or isinstance(branch, np.int64) or isinstance(branch,float):
        return branch
    
    #If it is not a leaf, calls itself recursively until leaf is found
    else:
        return PredictRegressionValue(branch, test_row, nominal_features)


# Reduced Error Pruning

In [48]:
def reduced_error_pruning(tree, dataset, class_label):
    '''
    Performs reduced error pruning on decision tree
    
    @param tree: built decision tree
    @param dataset: test data
    @param class_label: class label
    
    '''
    
    #Checks if the tree is a leaf and returns the tree if it is
    if len(tree) <= 1:
        return tree
    
    #Gets the node from the first key of the tree and all the subtrees of the tree
    node = list(tree.keys())[0]
    subtrees = tree[node]
    
    #Checks for leaf nodes
    if all(isinstance(subtrees[k], str) for k in subtrees):
        return tree
    
    #Recursively prunes each subtree
    for k in subtrees:
        if isinstance(subtrees[k], dict):
            subtree = {k: subtrees[k]}
            subtree_pruned = reduced_error_pruning(subtree, dataset, class_label)
            if len(subtree_pruned) == 1:
                #Prunes subtree if it has been reduced to a single leaf node
                tree[node][k] = subtree_pruned[k]
            else:
                # Checks accuracy of subtree and if greater than the previous accuracy, it is added onto the tree
                subtree_accuracy = get_accuracy(subtree_pruned, dataset, class_label)
                original_accuracy = get_accuracy(subtree, dataset, class_label)
                if subtree_accuracy >= original_accuracy:
                    tree[node][k] = subtree_pruned[k]
    
    # Return the pruned tree
    return tree


def get_accuracy(tree, test_data, class_label):
    '''
    Checks accuracy of pruning subtrees
    
    @param tree: pruning subtree
    @param test_data: test data
    @param class_label: class label
    
    '''
    num_correct = 0
    num_total = len(test_data)
    for value in test_data:
        prediction = predict(tree, instance)
        if prediction == value[class_label]:
            num_correct += 1
    accuracy = num_correct / num_total
    return accuracy

In [49]:
def getThresholds(target):  
    '''
    Gets error threshold values for tuning
    
    @param target: target vector
    
    '''
    
    #Gets the range of the target and gets the threholds for the 90%, 95%, and 99% confidence intervals
    arr_range = np.max(target) - np.min(target)
    accuracies = [0.9,0.95,0.99]
    thresholds = []
    
    #Adds each threshold value to list and returns
    for value in accuracies:
        thresholds.append((1-value)*arr_range)
        
    return thresholds
            
def tuneError(full_data,tuning_dataset,class_label,nominal_features=[]):
    '''
    Tunes the error
    
    @param full_data: the full dataset
    @param tuning_dataset: the 20% of dataset for tuning
    @param class_label: class label
    @param nominal_features: list of nominal features, if any
    
    '''
    thresholds = getThresholds(full_data[class_label].to_numpy()) 

    errors_dict = {}       
    for threshold in thresholds:
        print('Current Error Being Tested')
        acc = RunExperiment(tuning_dataset, task_type='Regression', k = 5, class_label = class_label, tuning=True, error=threshold,nominal_features=nominal_features)
        errors_dict[threshold]=acc
    return max(errors_dict, key=errors_dict.get)

In [50]:
def SeparateData(dataset):
    '''
    Separates tuning and experiment data
    
    @param full_data: the full dataset
    
    '''
    #Separates 20% of data for tuning and leaves rest for training/testing
    tuning_data = dataset.sample(frac=.2)
    non_tuning_data = dataset.drop(tuning_data.index)
    
    return (tuning_data,non_tuning_data)

# Evaluate

In [None]:
def Evaluate(task_type, true_values, predicted_values,error=None):
    '''
    Evaluates the data based on task type
    
    @param task_type: specifies whether the task is 'Classification' or 'Regression'
    @param true_values: actual values of the data
    @param predicted_values: values predicted by the model
    
    '''
    if task_type == 'Regression':
        num_correct_values = 0
        for index in range(len(true_values)):
            min_threshold = float(true_values[index]-error)
            max_threshold = float(true_values[index]+error)

            if (predicted_values[index] >= min_threshold) and (predicted_values[index] <= max_threshold):
                num_correct_values = num_correct_values + 1

        regression_accuracy = num_correct_values/len(true_values)
        mean_squared_error = np.square(np.subtract(true_values,predicted_values)).mean()
        return regression_accuracy,mean_squared_error
    
    #Calculates classification accuracy and error rate for classification data
    elif task_type == 'Classification':
        num_correct_values = 0
        for index in range(len(true_values)):
            if true_values[index]==predicted_values[index]:
                num_correct_values = num_correct_values + 1
        classification_accuracy = num_correct_values/len(true_values)
        error_rate = 1-classification_accuracy
        return classification_accuracy, error_rate

In [51]:
def RunExperiment(dataset, task_type, k = 5, class_label = None, tuning=False, error=None,nominal_features=[],pruning=False):
    '''
    Runs the experiments using the processed data, performs cross validation to get testing and training data,
    standardizes the data, feeds into the algorithm, and uses the output to evaluate the accuracy
    
    @param dataset: pre-processed dataframe
    @param validation_type: specifies whether the task is 'k-fold' or 'kx2 '
    @param task_type: specifies whether the task is 'Classification' or 'Regression'
    @param num_folds: number of folds--only needed when validation type is k-fold
    @param k: k value to be used for cross validation-only needed when validation type is kx2
    @param class_label: name of class column
    
    '''
    classification = False
    if task_type == 'classification' or task_type == 'Classification':
        classification = True
        
    avg_accuracy = 0
    avg_error = 0
    avg_MSE = 0

    #Performs k x 2 cross validation with parameter data
    validation_output = k_x_2_cross_validation(dataset, k, classification, class_label,tuning=False)
    for k in validation_output:
        print('K = ' + str(k))
        for num in validation_output[k]:
            print('  Experiment ' + str(num))

            #Gets test and training data for each k
            test_data = validation_output[k][num]['test']
            train_data = validation_output[k][num]['train']

            print('   Test Fold Size: ' + str(len(test_data)))
            print('   Train Fold Size: ' + str(len(train_data)))

            #Gets the prediction value from the null model
            #predicted_values = KNN1(train_data,test_data,3,class_label,task_type)
            #predicted_values = KNN(train_data,test_data,3,class_label,task_type)
            true_values = test_data[class_label].to_list()
            
            if task_type == 'Classification':
                tree=GenerateClassificationTree(train_data,class_label)
                
                if pruning:
                    pruned_tree = reduced_error_pruning(tree,train_data,class_label)
                    tree=pruned_tree

                predicted_values = []
                test_input =  test_data.drop(columns=class_label)
                predicted_values = GetClassPredictions(tree,test_input)
                
            if task_type == 'Regression':
                tree = GenerateRegressionTree(train_data,class_label,max_depth=10,nominal_features=nominal_features)
                if pruning:
                    pruned_tree = reduced_error_pruning(tree,train_data,class_label)
                    tree=pruned_tree
                
                test_input =  test_data.drop(columns=class_label)
                predicted_values = GetRegressionPredictions(tree,test_input,nominal_features=nominal_features)
                


            #Gets corresponding evaluation metric depending on task type and prints to console
            if task_type == 'Classification':
                evaluation = Evaluate(task_type,true_values,predicted_values)
                class_accuracy = evaluation[0]
                error = evaluation[1]
                avg_accuracy = avg_accuracy + class_accuracy
                avg_error = avg_error + error
                print('    Classification Accuracy: '+ str(class_accuracy))
                print('    Error: ' + str(error))
            elif task_type == 'Regression':
                evaluation = Evaluate(task_type,true_values,predicted_values,error=error)
                reg_accuracy = evaluation[0]
                MSE = evaluation[1]
                avg_accuracy = avg_accuracy + reg_accuracy
                avg_MSE = avg_MSE + MSE
                print('    Regression Accuracy: '+ str(reg_accuracy))
                print('    Mean Squared Error: ' + str(MSE))
            print('')

    #Gets the average of the evaluation metric over all folds and prints it
    if task_type == 'Classification':
            print('Average Classification Accuracy: '+ str(avg_accuracy/(2*k)))
            print('Average Error: ' + str(avg_error/(2*k)))
            if tuning == True:
                return avg_error/(2*k)
    elif task_type == 'Regression':
        print('Average Regression Accuracy: '+ str(avg_accuracy/(2*k)))
        print('Average Mean Squared Error: ' + str(avg_MSE/(2*k)))
        if tuning == True:
            return avg_accuracy/(2*k)

    print('')
        