In [3]:
import torch
import pandas
import numpy as np

In [4]:
import pandas
#dataset taken from https://www.kaggle.com/yashsawarn/wifi-stretgth-for-rooms

def read_dataset(csv_name = 'wifi_localization.txt'):
    """
    Reads a csv dataset 
    returns it as a pytorch tensor
    """
    data_frame = pandas.read_table(csv_name, delim_whitespace=True, names=('A', 'B', 'C', 'D', 'F', 'G', 'ROOM'),
                       dtype={'A': np.int64, 'B': np.float64, 'C': np.float64, 'D': np.float64,'E': np.float64,'F': np.float64,'G': np.float64,'ROOM': np.float64})

    targets_torch = torch.tensor(data_frame['ROOM'].values)
    dataset_torch = torch.tensor(data_frame.values)
    print(dataset_torch)
    return dataset_torch
dataset_torch = read_dataset()

tensor([[-56., -61., -66.,  ..., -82., -81.,   1.],
        [-57., -61., -65.,  ..., -85., -85.,   1.],
        [-60., -60., -67.,  ..., -85., -84.,   1.],
        ...,
        [-59., -46., -65.,  ..., -87., -88.,   4.],
        [-58., -52., -61.,  ..., -90., -85.,   4.],
        [-50., -45., -60.,  ..., -88., -87.,   4.]], dtype=torch.float64)


In [7]:
class Node_CART:    
    def __init__(self, num_classes = 4, ref_CART = None, current_depth = 0):
        """
        Create the node attributes
        param num_classes: K number of classes to classify
        param ref_cart: reference to the tree containing the node
        param current_depth: current depth of the node in the tree
        """
        self.ref_CART = ref_CART
        self.threshold_value = 0
        self.feature_num = 0
        self.node_right = None
        self.node_left = None
        self.data_torch_partition = None
        self.gini = 0
        self.dominant_class = None
        self.accuracy_dominant_class = None        
        self.num_classes = num_classes
        self.current_depth = current_depth
    
    def to_xml(self, current_str = ""):
        """
        Recursive function to write the node content to an xml formatted string
        param current_str : the xml content so far in the whole tree
        return the string with the node content
        """
        str_node = "<node><thresh>" + str(self.threshold_value) + "</thresh>" + "<feature>" + str(self.feature_num) + "</feature><depth>" + str(self.current_depth)+ "</depth>" 
        str_node += "<gini>" + str(self.gini) + "</gini>"
        if(self.node_right != None):
            str_left = self.node_right.to_xml(current_str)
            str_node += str_left
        if(self.node_left != None):
            str_right = self.node_left.to_xml(current_str)
            str_node += str_right
            
        if(self.is_leaf()):
            str_node += "<dominant_class>" + str(self.dominant_class) + "</dominant_class><acc_dominant_class>"  + str(self.accuracy_dominant_class) + "</acc_dominant_class>"
        str_node += "</node>"
        return str_node
    
    def is_leaf(self):
        """
        Checks whether the node is a leaf
        """
        return (self.node_left == None and self.node_left == None)
    
    def create_with_children(self, data_torch, current_depth, list_selected_features = [], min_gini = 0.000001):
        """
        Creates a node by selecting the best feature and threshold, and if needed, creating its children
        param data_torch: dataset with the current partition to deal with in the node
        param current_depth: depth counter for the node
        param list_selected_features: list of selected features so far for the CART building process
        param min_gini: hyperparmeter selected by the user defining the minimum tolerated gini coefficient for a  node
        return the list of selected features so far
        """        
        #update depth of children
        depth_children = current_depth + 1
        if(depth_children <= self.ref_CART.get_max_depth()):
            num_observations = data_torch.shape[0]            
            #careful with max depth
            #if no threshold and feature were selected, select it using a greedy approach            
            (threshold_value, feature_num, gini) = self.select_best_feature_and_thresh(data_torch, list_features_selected = list_selected_features)
            list_selected_features += [feature_num]
            #store important data in attributes
            self.threshold_value = threshold_value
            self.feature_num = feature_num
            self.data_torch_partition = data_torch
            self.gini = gini            
            num_features = data_torch.shape[1]
            #data_torch_left = torch.zeros(1, num_features)
            #data_torch_right = torch.zeros(1, num_features)
            #create the right and left node data if the current gini is still high            
            if(self.gini > min_gini):                
                data_torch_left = data_torch[data_torch[:, feature_num] < threshold_value]
                data_torch_right = data_torch[data_torch[:, feature_num] >= threshold_value]
                #add data to the right and left children
                self.node_right = Node_CART(num_classes = self.num_classes, ref_CART = self.ref_CART, current_depth = depth_children)
                self.node_left = Node_CART(num_classes = self.num_classes, ref_CART = self.ref_CART, current_depth = depth_children)
                list_selected_features = self.node_right.create_with_children(data_torch_right, depth_children, list_selected_features = list_selected_features)            
                self.node_left.create_with_children( data_torch_left, depth_children, list_selected_features = list_selected_features)
        #if is leaf, fill the         
        if(self.is_leaf()):            
            labels_data = data_torch[:,  -1]            
            self.dominant_class = torch.mode(labels_data).values.item()
            num_obs_label = labels_data[labels_data == self.dominant_class].shape[0]
            self.accuracy_dominant_class = num_obs_label / labels_data.shape[0]           
            
        return list_selected_features
    
    
    def select_best_feature_and_thresh(self, data_torch, list_features_selected = [], num_classes = 4):
        """
        Selects the best feature and threshold that minimizes the gini coefficient
        param data_torch: dataset partition to analyze
        param list_features_selected list of features selected so far, thus must be ignored 
        param num_classes: number of K classes to discriminate from 
        return min_thresh, min_feature, min_gini found for the dataset partition when 
        selecting the found feature and threshold
        """        
        num_features = data_torch.shape[1]
        min_gini = 99999
        min_feature = 0
        min_thresh = 0
        for feature in range(0, num_features):            
            if(not feature in list_features_selected):
                values_feature = torch.unique(data_torch[:, feature])
                #we assume that the values were sorted by unique              
                
                #dont try the min and max, since they will not split data
                for possible_thresh_ind in range(1, values_feature.shape[0] - 1):
                    #evaluate a possible threshold
                    possible_thresh = values_feature[possible_thresh_ind]
                    #split the data
                    data_torch_left = data_torch[data_torch[:, feature] < possible_thresh]
                    data_torch_right = data_torch[ data_torch[:, feature] >= possible_thresh]
                    gini_left = self.calculate_gini(data_torch_left, num_classes)
                    gini_right = self.calculate_gini(data_torch_right, num_classes)
                    #num of observations per new partition
                    num_obs_left = data_torch_left.shape[0]
                    num_obs_right = data_torch_right.shape[0]
                    #calculate gini coefficient for both partitions
                    gini_total = (num_obs_left / (num_obs_left + num_obs_right )) * gini_left + (num_obs_right / (num_obs_left +num_obs_right )) * gini_right
                    
                    if(gini_total < min_gini):
                        min_gini = gini_total.item()
                        min_feature = feature
                        min_thresh = possible_thresh.item()
        #return selected cut       
        return (min_thresh, min_feature, min_gini)   
        
    
    def calculate_gini(self, data_partition_torch, num_classes = 4):
        """
        Calculates the gini coefficient for a given partition with the given number of classes
        param data_partition_torch: current dataset partition as a tensor
        param num_classes: K number of classes to discriminate from
        returns the calculated gini coefficient
        """
        targets_torch = data_partition_torch[:,-1]
        num_observations = targets_torch.shape[0]        
        acc = torch.zeros(num_classes)
        gini = 1
        for k in range(1, num_classes + 1):            
            num_k = targets_torch[targets_torch == k].shape[0]            
            acc[k - 1] = num_k / num_observations
            gini -= acc[k - 1] * acc[k - 1]
         
        return gini
    
    def evaluate_node(self, input_torch): 
        """
        Evaluates an input observation within the node. 
        If is not a leaf node, send it to the corresponding node
        return predicted label
        """
        feature_val_input = input_torch[self.feature_num]
        if(self.is_leaf()):
            return self.dominant_class
        else:
            if(feature_val_input < self.threshold_value):
                return self.node_left.evaluate_node(input_torch)
            else:
                return self.node_right.evaluate_node(input_torch)
        

class CART:
    def __init__(self, dataset_torch, max_CART_depth):
        """
        CART has only one root node
        """
        self.root = Node_CART(num_classes = 4, ref_CART = self, current_depth = 0)
        self.max_CART_depth = max_CART_depth
        self.list_selected_features = []
        
    def get_root(self):
        """
        Gets tree root
        """
        return self.root
    
    def get_max_depth(self):
        """
        Gets the selected max depth of the tree
        """
        return self.max_CART_depth
    
    def build_CART(self, data_torch):
        """
        Build CART from root
        """
        self.list_selected_features = self.root.create_with_children(data_torch, current_depth = 0)
    
    def to_xml(self, xml_file_name):
        """
        write Xml file with tree content
        """
        str_nodes = self.root.to_xml()
        file = open(xml_file_name,"w+") 
        file.write(str_nodes)
        file.close()
        return str_nodes
    
    
    def evaluate_input(self, input_torch):
        """
        Evaluate a specific input in the tree and get the predicted class
        """
        return self.root.evaluate_node(input_torch)
        
    
    
        

    
        

In [8]:
#TEST

def train_CART(dataset_torch, name_xml = ""):
    tree = CART(dataset_torch = dataset_torch, max_CART_depth = 3)
    tree.build_CART(dataset_torch)
    if(not name_xml == ""):
        tree.to_xml(name_xml)
    return tree

def test_CART(tree, testset_torch):
    num_observations = testset_torch.shape[0]
    print("Num observations to test ", num_observations)
    num_correct = 0
    for obs_num in range(0, num_observations):
        current_observation = testset_torch[obs_num, :]
        #print("current obs ", current_observation)
        label_observation = testset_torch[obs_num, -1]
        label_predicted = tree.evaluate_input(current_observation)
        #print("predicted label ", label_predicted)
        if(label_predicted == label_observation):
            num_correct += 1
            
    accuracy = num_correct / num_observations
    print("Model accuracy ", accuracy)
    return accuracy

        

tree = train_CART(dataset_torch, name_xml = "CART_example.xml")
acc = test_CART(tree, dataset_torch)

Num observations to test  2000
Model accuracy  0.9345
