In [None]:
import pandas as pd
import numpy as np

## Decision Tree Regressor

In [1]:
class Node_reg:
    """
    Initializes "Node class constructor" for regression problems which stores and initialises:
    Feature index used for best split :  feature = None,
    Left child node :  left_spl = None,
    Right child node :  right_spl = None,
    Node Splitting threshold value :  spl_val = None,
    Variance loss value:  var_redn = None,
    Output value of the node :  out_val = None

    """

    def __init__(
        self,
        feature=None,
        spl_val=None,
        left_spl=None,
        right_spl=None,
        var_redn=None,
        *,
        out_val=None
    ):
        self.feature = feature
        self.threshold = spl_val
        self.left = left_spl
        self.right = right_spl
        self.var_redn = var_redn
        self.output = out_val

    # function to check whether a leaf node or not
    def is_leaf_node(self):
        return self.output is not None


In [3]:
class DecisionTreeRegressor:
    """
    Initializes "Decision Tree constructor" for regression problems which stores and initialises:
    Maximum tree depth : max_depth=100,
    Minimum samples reuqired for splitting nodes : min_samples_split=2,
    Number of features to be used for tree: n_features = None,
    Cost function/ Loss criterion for splitting nodes : criterion='mse'

    """

    def __init__(
        self, max_depth=100, min_samples_split=2, n_features=None, criterion="mse"
    ):

        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.n_features = n_features
        self.criterion = criterion
        self.root = None

    # # For storing tree structure to generate tree graph visualization
    #     self.tree = None
    #     self.graph = None
    #     self.list_node_names = []

    """
  Defines the train method of the DecisionTree class, 
  which takes two inputs: 
  1. X, a matrix of input features
  2. y, a vector of output labels
  Then, uses the _build_tree function to proceed with growing the decision tree structure
  """

    def train(self, X, y):
        # sets self.n_features to number of features in the input if n_features is not specified,
        # Else, the minimum of n_features and the number of features in the input data if n_features is specified to avoid exceeding the maximum feature limit

        self.n_features = (
            X.shape[1] if not self.n_features else min(self.n_features, X.shape[1])
        )
        self.root = self._build_tree(X, y)

    """
  Defines the recursive "_build_tree" function of the DecisionTree class, 
  which takes two inputs: 
  1. X, a matrix of input features
  2. y, a vector of output labels

  Returns Tree Nodes after each iteration
  """

    def _build_tree(self, X, y, depth=0):
        # stores the number of observations and features in the input data
        n_obs, n_fts = X.shape

        # checks for stopping conditions of a decision tree, if met returns the leaf node
        if depth >= self.max_depth or n_obs <= self.min_samples_split:
            leaf_val = self._calculate_node_mean(y)
            return Node_reg(out_val=leaf_val)

        # checks for all possible splits across features and returns the best split threshold and feature column index
        best_var_redn = -float("inf")
        best_feature, best_thresh = None, None
        criterion = self.criterion

        # random selection of specified number of features out of all the available features
        select_fts = np.random.choice(n_fts, self.n_features, replace=False)

        for feature in select_fts:
            thresholds = np.unique(X[:, feature])
            for thresh in thresholds:
                var_redn = self._variance_reduction(y, X[:, feature], thresh, criterion)
                if var_redn > best_var_redn:
                    best_var_redn = var_redn
                    best_feature = feature
                    best_thresh = thresh

        # create child nodes
        left_idxs, right_idxs = self._split(X[:, best_feature], best_thresh)
        left = self._build_tree(X[left_idxs, :], y[left_idxs], depth + 1)
        right = self._build_tree(X[right_idxs, :], y[right_idxs], depth + 1)

        return Node_reg(
            feature=best_feature,
            spl_val=best_thresh,
            left_spl=left,
            right_spl=right,
            var_redn=best_var_redn,
        )

    """
  Defines the function to compute leaf node value 
  which takes input as y, a vector of output labels and returns the mean
  """

    def _calculate_node_mean(self, y):
        """ """
        val = np.mean(y)
        return val

    """
  Defines the node splitting function which takes as input
  data : the subset input data based on a feature index
  split_thresh : Specified split threshold to split the tree in left and right child nodes

  Returns : Indices of left and right child nodes
  """

    def _split(self, data, split_thresh):
        left_idxs = np.argwhere(data <= split_thresh).flatten()
        right_idxs = np.argwhere(data > split_thresh).flatten()
        return left_idxs, right_idxs

    """
  Defines the information_gain funtion which takes as input
  y_parent : Parent node data for labels
  X_ft : the subset input data based on a feature index
  split_thresh : Specified split threshold to split the tree in left and right child nodes
  criterion : Criterion to calculate node impurity values, mean-squared-error or mean-absolute-error for regression tasks

  Returns : Information gain value, which is variance reduction for regression, as we move from a parent node to child nodes
  """

    def _variance_reduction(self, y_parent, X_ft, threshold, criterion):
        if criterion.lower() == "mse":
            # parent entropy
            parent_mse = self._mse_loss(y_parent)

            # create children
            left_idxs, right_idxs = self._split(X_ft, threshold)
            if len(left_idxs) == 0 or len(right_idxs) == 0:
                return 0

            # calculate the weighted avg. entropy of children
            n = len(y_parent)
            p_left, p_right = len(left_idxs) / n, len(right_idxs) / n
            e_left, e_right = self._mse_loss(y_parent[left_idxs]), self._mse_loss(
                y_parent[right_idxs]
            )

            child_mse = p_left * e_left + p_right * e_right

            # calculate the IG
            information_gain = parent_mse - child_mse

        elif criterion.lower() == "mae":
            # parent gini impurity
            parent_mae = self._mae_loss(y_parent)

            # create children
            left_idxs, right_idxs = self._split(X_ft, threshold)
            if len(left_idxs) == 0 or len(right_idxs) == 0:
                return 0

            # calculate the weighted avg. gini impurity of children
            n = len(y_parent)
            p_left, p_right = len(left_idxs) / n, len(right_idxs) / n
            g_left, g_right = self._mae_loss(y_parent[left_idxs]), self._mae_loss(
                y_parent[right_idxs]
            )

            child_mae = p_left * g_left + p_right * g_right

            # calculate the IG
            information_gain = parent_mae - child_mae

        else:
            information_gain = -1

        return information_gain

    """
  Defines the MSE_loss function which takes as input
  y_parent : data of label for a node

  Returns : Mean squared error of the node
  """

    def _mse_loss(self, y):
        n = len(y)
        m = np.mean(y)
        return np.sum(np.square(y - m)) / n

    """
  Defines the MAE_loss function which takes as input
  y_parent : data of label for a node

  Returns : Mean absolute error of the node
  """

    def _mae_loss(self, y):
        n = len(y)
        m = np.mean(y)
        return np.sum(np.abs(y - m)) / n

    """
  Defines the predict method of the Decision Tree Regressor which takes as input
  X : Test/ Valdiation dataset for features

  Returns : Array of Prediction classes after traversing through each observation or row in "X"
  """

    def predict(self, X):
        return np.array([self._traverse_dtree(x, self.root) for x in X])

    """
  Defines the recursive "_traverse_dtree" function which takes as input
  x : one single row/ observation of the test/validation data
  node : information of parent node to traverse down the tree to leaf
  """

    def _traverse_dtree(self, x, node):

        # Checks for leaf node, if satisfied returns the node output value, else keeps on traversing
        if node.is_leaf_node():
            return node.output

        # Checks if observation has the feature with value according to left child node (<= threshold) and keeps on traversing in the left splits
        if x[node.feature] <= node.threshold:
            return self._traverse_dtree(x, node.left)

        # Finally traverses on the right child node
        return self._traverse_dtree(x, node.right)

    """
  Defines the recursive "_traverse_dtree" function which takes as input
  x : one single row/ observation of the test/validation data
  node : information of parent node to traverse down the tree to leaf
  """

    def print_tree(self, tree=None, indent=" "):
        """function to print the tree"""
        if not tree:
            tree = self.root

        if tree.output is not None:
            print(tree.output)

        else:
            print(
                "X_" + str(tree.feature),
                "<=",
                tree.threshold,
                "?",
                round(tree.var_redn, 4),
            )
            print("%sleft:" % (indent), end="")
            self.print_tree(tree.left, indent + indent)
            print("%sright:" % (indent), end="")
            self.print_tree(tree.right, indent + indent)

            # # 3. Record the subtree
            # condition = "X_"+ str(tree.feature) + "<=" + str(tree.threshold)
            # result_tree = {condition: []}

            # result_tree[condition].append(self.print_tree(tree.left))
            # result_tree[condition].append(self.print_tree(tree.right))

    # def to_dot(self):
    #       dot_data = export_graphviz(self.train, out_file=None,
    #                                  feature_names=self.feature_names,
    #                                  class_names=self.class_names,
    #                                  filled=True, rounded=True,
    #                                  special_characters=True)
    #       return dot_data

    # def plot_tree(self):
    #       dot_data = self.to_dot()
    #       graph = graphviz.Source(dot_data)
    #       display(graph)
