In [1]:
import math
from numpy.random import default_rng
import numpy as np
from classification import (
    get_entropy,
    get_information_gain,
    find_best_node,
    induce_decision_tree,
    split_dataset,
    Decision_Node,
    Leaf_Node,
)

def confusion_matrix(y_gold, y_prediction, class_labels=None):
    """ Compute the confusion matrix.
        
    Args:
        y_gold (np.ndarray): the correct ground truth/gold standard labels
        y_prediction (np.ndarray): the predicted labels
        class_labels (np.ndarray): a list of unique class labels. 
                               Defaults to the union of y_gold and y_prediction.

    Returns:
        np.array : shape (C, C), where C is the number of classes. 
                   Rows are ground truth per class, columns are predictions
    """

    # if no class_labels are given, we obtain the set of unique class labels from
    # the union of the ground truth annotation and the prediction
    if not class_labels:
        class_labels = np.unique(np.concatenate((y_gold, y_prediction)))

    confusion = np.zeros((len(class_labels), len(class_labels)), dtype=np.int)

    # for each correct class (row), 
    # compute how many instances are predicted for each class (columns)
    for (i, label) in enumerate(class_labels):
        # get predictions where the ground truth is the current class label
        indices = (y_gold == label)
        gold = y_gold[indices]
        predictions = y_prediction[indices]

        # quick way to get the counts per label
        (unique_labels, counts) = np.unique(predictions, return_counts=True)

        # convert the counts to a dictionary
        frequency_dict = dict(zip(unique_labels, counts))

        # fill up the confusion matrix for the current row
        for (j, class_label) in enumerate(class_labels):
            confusion[i, j] = frequency_dict.get(class_label, 0)

    return confusion


def precision(y_gold, y_prediction):
    """ Compute the precision score per class given the ground truth and predictions
        
    Also return the macro-averaged precision across classes.
        
    Args:
        y_gold (np.ndarray): the correct ground truth/gold standard labels
        y_prediction (np.ndarray): the predicted labels

    Returns:
        tuple: returns a tuple (precisions, macro_precision) where
            - precisions is a np.ndarray of shape (C,), where each element is the 
              precision for class c
            - macro-precision is macro-averaged precision (a float) 
    """

    confusion = confusion_matrix(y_gold, y_prediction)
    p = np.zeros((len(confusion), ))
    for c in range(confusion.shape[0]):
        if np.sum(confusion[:, c]) > 0:
            p[c] = confusion[c, c] / np.sum(confusion[:, c])

    ## Alternative solution without computing the confusion matrix
    #class_labels = np.unique(np.concatenate((y_gold, y_prediction)))
    #p = np.zeros((len(class_labels), ))
    #for (c, label) in enumerate(class_labels):
    #    indices = (y_prediction == label) # get instances predicted as label
    #    correct = np.sum(y_gold[indices] == y_prediction[indices]) # intersection
    #    if np.sum(indices) > 0:
    #        p[c] = correct / np.sum(indices)     

    # Compute the macro-averaged precision
    macro_p = 0.
    if len(p) > 0:
        macro_p = np.mean(p)
    
    return (p, macro_p)


def recall(y_gold, y_prediction):
    """ Compute the recall score per class given the ground truth and predictions
        
    Also return the macro-averaged recall across classes.
        
    Args:
        y_gold (np.ndarray): the correct ground truth/gold standard labels
        y_prediction (np.ndarray): the predicted labels

    Returns:
        tuple: returns a tuple (recalls, macro_recall) where
            - recalls is a np.ndarray of shape (C,), where each element is the 
                recall for class c
            - macro-recall is macro-averaged recall (a float) 
    """

    confusion = confusion_matrix(y_gold, y_prediction)
    r = np.zeros((len(confusion), ))
    for c in range(confusion.shape[0]):
        if np.sum(confusion[c, :]) > 0:
            r[c] = confusion[c, c] / np.sum(confusion[c, :])

    ## Alternative solution without computing the confusion matrix
    #class_labels = np.unique(np.concatenate((y_gold, y_prediction)))
    #r = np.zeros((len(class_labels), ))
    #for (c, label) in enumerate(class_labels):
    #    indices = (y_gold == label) # get instances for current label
    #    correct = np.sum(y_gold[indices] == y_prediction[indices]) # intersection
    #    if np.sum(indices) > 0:
    #        r[c] = correct / np.sum(indices)     

    # Compute the macro-averaged recall
    macro_r = 0.
    if len(r) > 0:
        macro_r = np.mean(r)
    
    return (r, macro_r)


def f1_score(y_gold, y_prediction):
    """ Compute the F1-score per class given the ground truth and predictions
        
    Also return the macro-averaged F1-score across classes.
        
    Args:
        y_gold (np.ndarray): the correct ground truth/gold standard labels
        y_prediction (np.ndarray): the predicted labels

    Returns:
        tuple: returns a tuple (f1s, macro_f1) where
            - f1s is a np.ndarray of shape (C,), where each element is the 
              f1-score for class c
            - macro-f1 is macro-averaged f1-score (a float) 
    """

    (precisions, macro_p) = precision(y_gold, y_prediction)
    (recalls, macro_r) = recall(y_gold, y_prediction)

    # just to make sure they are of the same length
    assert len(precisions) == len(recalls)

    f = np.zeros((len(precisions), ))
    for c, (p, r) in enumerate(zip(precisions, recalls)):
        if p + r > 0:
            f[c] = 2 * p * r / (p + r)

    # Compute the macro-averaged F1
    macro_f = 0.
    if len(f) > 0:
        macro_f = np.mean(f)
    
    return (f, macro_f)


def accuracy(y_gold, y_prediction):
    """ Compute the accuracy given the ground truth and predictions

    Args:
        y_gold (np.ndarray): the correct ground truth/gold standard labels
        y_prediction (np.ndarray): the predicted labels

    Returns:
        float : the accuracy
    """

    assert len(y_gold) == len(y_prediction)  
    
    try:
        return np.sum(y_gold == y_prediction) / len(y_gold)
    except ZeroDivisionError:
        return 0.


def read_dataset(filepath):
    """
    Read .txt file from a specified filepath. (i.e. data/train_full.txt)
    Returns 2 numpy arrays of the instances and the labels
    """
    data = np.loadtxt(filepath, dtype=str, delimiter=",")
    instances = data[:,:-1]
    instances = instances.astype(int)
    labels = data[:,-1]

    return instances, labels


def induce_random_decision_tree(instances, labels, num_attributes_per_tree, depth=0):
    """Induce a random decision tree for a given dataset

    Args:
    instances (numpy.ndarray): Instances, numpy array of shape (N, K)
                       N is the number of instances
                       K is the number of attributes
    labels (numpy.ndarray): Class labels, numpy array of shape (N, )
                       Each element in labels is a str
    num_attributes_per_tree (int): Number of attributes to be used in each tree
    Returns:
      DecisionNode: A decision node object which contains the attribute, value, left node, right node and depth
    """
    # Get a shuffled array of the indexes of the attributes
    random_indexes = default_rng().permutation(len(instances[0, :]))
    # Take the first attributes_per_tree indexes
    random_indexes_in_each_tree = random_indexes[:num_attributes_per_tree]
    # Create a new np array with only the attributes we want
    new_instances = instances[:, random_indexes_in_each_tree]

    # Find the best split point for the data
    best_attr, best_split = find_best_node(new_instances, labels)

    sorted_indices = instances[
        :, best_attr
    ].argsort()  # returns sorted instance indices based on the best attribute
    sorted_instances = instances[sorted_indices]  # sort the instances
    sorted_labels = labels[sorted_indices]  # sort the labels

    # Split the total dataset based on the best split point
    (
        split_instances_left,
        split_instances_right,
        split_labels_left,
        split_labels_right,
    ) = split_dataset(sorted_instances, sorted_labels, best_split)

    # If we can no longer split data, return a leaf node
    if (
        len(set(labels)) == 1
        or len(split_instances_left) == 0
        or len(split_instances_right) == 0
    ):
        return Leaf_Node(labels, depth)

    # Otherwise return a decision node
    LeftNode = induce_random_decision_tree(
        split_instances_left,
        split_labels_left,
        num_attributes_per_tree,
        depth + 1,
    )
    RightNode = induce_random_decision_tree(
        split_instances_right,
        split_labels_right,
        num_attributes_per_tree,
        depth + 1,
    )

    # Remap the best attribute its original index
    best_attr = random_indexes_in_each_tree[best_attr]

    # what to return if left node or right node does not exist
    return Decision_Node(
        best_attr, sorted_instances[best_split][best_attr], LeftNode, RightNode, depth
    )

decision_trees = []
number_of_trees = 2

#Read in data
train_instances, train_labels = read_dataset("data/train_full.txt")
test_instances, test_labels = read_dataset("data/test.txt")

#Get number of attributes in the dataset
num_attributes = len(train_instances[0, :])
#Take the square root of number of attributes and round up (optimal number of attibutes per tree?)
attributes_per_tree = math.ceil(np.sqrt(num_attributes))


# Loop through number of trees and train each tree
for i in range(0, number_of_trees):
    # Create a new random combination of the row indexes from new_instances
    random_row_indexes = default_rng().choice(len(train_instances), len(train_instances))
    new_instances = train_instances[random_row_indexes, :]
    new_labels = train_labels[random_row_indexes]

    # Now we have the new instances and labels, we can create a new decision tree
    new_decision_tree = induce_random_decision_tree(
        new_instances, new_labels, attributes_per_tree
    )
    decision_trees.append(new_decision_tree)

    
predictions1 = []
for i in test_instances:
    votes = []
    for j in range(len(decision_trees)):
        votes.append(decision_trees[j].predict(i))
    most_voted_label = max(set(votes), key=votes.count)
    predictions1.append(most_voted_label)


print(accuracy(test_labels, predictions1))
confusion = confusion_matrix(test_labels, np.array(predictions1))
print(confusion)
(p, macro_p) = precision(test_labels, np.array(predictions1))
print("precison:", p)
print("macro prec:", macro_p)
(r, macro_r) = recall(test_labels, np.array(predictions1))
print("recall", r)
print("macro recall:", macro_r)
(f1, macro_f1) = f1_score(test_labels, np.array(predictions1))
print("f1:", f1)
print("macro f1:", macro_f1)



0.095
[[ 0  4  1  5  3 21]
 [ 9  1  0  1  1 25]
 [ 2  5  1  4  0 14]
 [ 3  5  0  2  2 15]
 [ 3 26  0  1  1  3]
 [ 6 16  0  2  4 14]]
precison: [0.         0.01754386 0.5        0.13333333 0.09090909 0.15217391]
macro prec: 0.14899336615583755
recall [0.         0.02702703 0.03846154 0.07407407 0.02941176 0.33333333]
macro recall: 0.08371795626697587
f1: [0.         0.0212766  0.07142857 0.0952381  0.04444444 0.20895522]
macro f1: 0.07355715512273149
