In [2]:
import numpy as np

In [3]:
class Node:
    def __init__(self, data=None, children=None, split_on=None, pred_class=None):
        self.children = children
        self.split_on = split_on
        self.pred_class = pred_class
        self.is_leaf = False

    def __str__(self):
        s = f"Node({id(self)}"
        if self.children:
            s += f", children: {[id(c) for c in self.children]}"
        if self.split_on:
            s += f", split on: {self.split_on}"
        if self.pred_class:
            s += f", predicted class {self.pred_class})"
        return s
        

In [4]:
class DecisionTreeClassifier:
    def __init__(self):
        self.root = Node()

    @staticmethod
    def entropy(Y):
        """
        Calculate the entropy of a numpy array of target values
        """
        # Implement
        _, counts = np.unique(Y, return_counts=True)

        probabilities = counts / len(Y)
        entropy = -np.sum(probabilities * np.log2(probabilities))
        return entropy
        
        

    def split_on_feature(self, data, feat_index):
        """
        Find best attribute for splitting the data.
        Return the nodes into which data has been split, the data split and the weighted entropy of the split.
        split nodes and split data are dictionaries where the keys is the value to split on.
        """

        split_nodes = {}
        split_data = {}
        weighted_entropy = 0
        
        # Implement here
        total_samples = len(data)

        for value in np.unique(data[:, feat_index]):
            subset = data[data[:, feat_index] == value]
            target_values = subset[:, -1]
            subset_entropy = DecisionTreeClassifier.entropy(target_values)
            weight = len(subset) / total_samples
            weighted_entropy += weight * subset_entropy
            split_nodes[value] = target_values
            split_data[value] = subset

        return split_nodes, split_data, weighted_entropy

    def best_split(self, node, data, depth_left):
        # Check if the node meets the criteria to stop splitting
        if depth_left == 0 or self.meet_criteria(data):
            node.is_leaf = True
            y = self.get_y(data)
            node.pred_class = self.get_pred_class(y)
            return

        # Find best attribute to split data on and record it as well as the children nodes
        # Implement here
        best_feature = None
        best_split_nodes = None
        best_split_data = None
        lowest_weighted_entropy = float('inf')

        num_features = data.shape[1] - 1
        for feat_index in range(num_features):
            split_nodes, split_data, weighted_entropie  = self.split_on_feature(data, feat_index)

            if weighted_entropie < lowest_weighted_entropy:
                lowest_weighted_entropy = weighted_entropie
                best_feature = feat_index
                best_split_nodes = split_nodes
                best_split_data = split_data
                

        node.children = {}
        node.split_on = best_feature

        # Recursively call the best_split function for each child node
        # Implement here
        for value, subset in best_split_data.items():
            child_node = Node()
            node.children[value] = child_node
            self.best_split(child_node, subset, depth_left - 1)


    def meet_criteria(self, data):
        """
        Check if the criteria for stopping the tree expansion is met for a given node. Here we only check if the entropy of the target values (y) is zero.
        Additionally, you can customize criteria based on your specific requirements. For instance, you can set the maximum depth for the decision tree or incorporate other conditions for stopping the tree expansion. Modify the implementation of this method according to your desired criteria.

        Parameters:
        -----------
        data : np.array
            The data to check for meeting the stopping criteria.

        Returns:
        -----------
        bool
            True if the criteria is met, False otherwise.

        """

        y = self.get_y(data)
        return True if self.entropy(y) == 0 else False
    
    @staticmethod
    def get_y(data):
        y = data[:, -1]
        return y

    @staticmethod
    def get_pred_class(Y):
        labels, labels_counts = np.unique(Y, return_counts=True)
        index = np.argmax(labels_counts)
        return labels[index]

    def fit(self, X, Y, max_depth=int(1e10)):
        """
        Fit the decision tree model to the provided dataset.

        Parameters:
        -----------
        X: numpy.ndarray
            The input features of the dataset.

        Y: numpy.ndarray
            The target labels of the dataset.
        """
        data = np.column_stack([X, Y])
        self.root = Node()
        self.best_split(self.root, data, max_depth)

    def predict(self, X):
        predictions = np.array([self.traverse_tree(x, self.root) for x in X])
        return predictions

    def traverse_tree(self, x, node):
        # Check if the current node is a leaf node
        if node.is_leaf:
            return node.pred_class

        # Get the feature value at the split point for the current node
        feat_value = x[node.split_on]

        # Recursively traverse the decision tree using the child node corresponding to the feature value
        predicted_class = self.traverse_tree(x, node.children[feat_value])

        return predicted_class

In [5]:
from sklearn.model_selection import train_test_split
X=np.load("Features.npy")
y=np.load("Labels.npy")


# binarize the features: use all possible values as thresholds on all axes
def axis_aligned_features(X):
    X_features = None
    for i in range(X.shape[1]):
        distinct_vals = sorted(np.unique(X[:,i]))
        thresholds = [(x + y)/2 for x,y in zip(iter(distinct_vals), iter(distinct_vals[1::]))]
        for t in thresholds:
            mask = X[:,i] > t
            if X_features is not None:
                X_features = np.concatenate([X_features, mask.reshape((-1,1))], axis=1)
            else:
                X_features = mask.reshape((-1,1))
    return X_features

X = axis_aligned_features(X)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=100, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=100, random_state=42)

print(X_train.shape)
print(X_val.shape)
print(X_test.shape)


(800, 1998)
(100, 1998)
(100, 1998)


In [6]:
dt = DecisionTreeClassifier()
dt.fit(X_train, y_train)

In [7]:
def accuracy(dt, X, y):
    acc = (dt.predict(X) == y).sum() / len(y)
    return acc

# get validation set accuracy
print(f"Training set accuracy: {accuracy(dt, X_train, y_train)}")
print(f"Validation set accuracy: {accuracy(dt, X_val, y_val)}")
print(f"Test set accuracy: {accuracy(dt, X_test, y_test)}")

Training set accuracy: 1.0
Validation set accuracy: 0.91
Test set accuracy: 0.89


In [8]:
# Learn random forest
from scipy.stats import mode

class RandomForestClassifier:
    def __init__(self):
        self.trees = []
    
    def bootstrap_sample(self, X, y, sample_frac):
        indices = np.arange(X.shape[0])
        indices = np.random.permutation(indices)
        indices = indices[:round(sample_frac*len(indices))]
        X = X[indices,:]
        y = y[indices]
        return X,y

    def bootstrap_features(self, X, feature_frac):
        indices = np.arange(X.shape[1])
        indices = np.random.permutation(indices)
        indices = indices[:round(feature_frac*len(indices))]
        assert not (X == -1).any()
        X[:,indices] = -1 # we should never split on -1, since this would not differentiate
        return X
    
    def fit(self, X, y, n_trees, max_depth, sample_frac=0.5, feature_frac=0.5):
        for i in range(n_trees):
            # Train DT and append to list
            # Implement here
            self.trees = []
            n_samples, n_features = X.shape

            for i in range(n_trees):
                sample_indices = np.random.choice(n_samples, int(sample_frac * n_samples), replace=True)
                X_sample = X[sample_indices]
                y_sample = y[sample_indices]

                feature_indices = np.random.choice(n_features, int(feature_frac * n_features), replace=True)
                X_sample = X_sample[:, feature_indices]

                tree = DecisionTreeClassifier()
                tree.fit(X_sample, y_sample, max_depth=max_depth)
                self.trees.append((tree, feature_indices))

    def predict(self, X):
        # Get majority vote from all trees
        # Implement here
        predictions = np.zeros((len(self.trees), X.shape[0]), dtype=int)

        for i, (tree, feature_indices) in enumerate(self.trees):
            X_subset = X[:, feature_indices]
            predictions[i] = tree.predict(X_subset)

        majority_vote = mode(predictions, axis=0).mode[0]
        return majority_vote
        


In [10]:
rf = RandomForestClassifier()
rf.fit(X_train, y_train, n_trees=10, max_depth=10, sample_frac=0.5, feature_frac=0.5)

In [12]:
# get validation set accuracy
print(f"Training set accuracy: {accuracy(rf, X_train, y_train)}")
print(f"Validation set accuracy: {accuracy(rf, X_val, y_val)}")
print(f"Test set accuracy: {accuracy(rf, X_test, y_test)}")

Training set accuracy: 0.395
Validation set accuracy: 0.38
Test set accuracy: 0.4
