In [1]:
import numpy as np


class Decision_Tree_Regression(object):
    def __init__(self, max_depth=5, min_samples_split=2):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.root = None

    def fit(self, X, y):
        self.root = self._build_tree(X, y)

    def _build_tree(self, X, y, depth=0):
        num_samples, num_features = np.shape(X)
        # min_samples_split: minimum number of samples required to split an internal node
        if num_samples >= self.min_samples_split and depth <= self.max_depth:
            best_idx, best_threshold = self._best_criteria(X, y)
            left_idx, right_idx = self._split(X[:, best_idx], best_threshold)
            left = self._build_tree(X[left_idx, :], y[left_idx], depth + 1)
            right = self._build_tree(X[right_idx, :], y[right_idx], depth + 1)
            return Node(best_idx, best_threshold, left, right)
        leaf_value = self._leaf_value(y)
        return Node(value=leaf_value)

    def _best_criteria(self, X, y):
        num_features = np.shape(X)[1]
        best_idx, best_threshold = None, None
        min_impurity = float('inf')
        for idx in range(num_features):
            thresholds = np.unique(X[:, idx])
            for threshold in thresholds:
                impurity = self._impurity(y, threshold, X[:, idx])
                if impurity <= min_impurity:
                    min_impurity = impurity
                    best_idx = idx
                    best_threshold = threshold
        return best_idx, best_threshold

    def _impurity(self, y, threshold, feature):
        left_idx, right_idx = self._split(feature, threshold) # indices of the left and right subtree
        if len(left_idx) == 0 or len(right_idx) == 0:
            return float('inf')
        left = y[left_idx]
        right = y[right_idx]
        p = len(left) / len(y) # proportion of the left subtree
        impurity = p * self._gini(left) + (1 - p) * self._gini(right)
        return impurity

    def _gini(self, y):
        _, counts = np.unique(y, return_counts=True)
        p = counts / len(y)
        return 1 - np.sum(np.square(p)) # gini impurity

    def _split(self, feature, threshold):
        left_idx = np.argwhere(feature <= threshold).flatten() # indices of the left subtree
        right_idx = np.argwhere(feature > threshold).flatten() # indices of the right subtree
        return left_idx, right_idx

    def _leaf_value(self, y):
        leaf_value = np.mean(y) # mean of the target values
        return leaf_value # return the mean of the target values

    def predict(self, X):
        # traverse the tree for each sample
        return np.array([self._traverse_tree(x, self.root) for x in X])

    def _traverse_tree(self, x, node):
        if node.value is not None:  # leaf node
            return node.value  # return the value of the leaf node
        feature_value = x[node.idx]  # get the value of the feature
        if feature_value <= node.threshold:  # go left
            # traverse the left subtree
            return self._traverse_tree(x, node.left)
        return self._traverse_tree(x, node.right)  # traverse the right subtree


class Node(object):
    def __init__(self, idx=None, threshold=None, left=None, right=None, value=None):
        self.idx = idx
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value

    def __repr__(self):
        return "%d" % self.idx
        # "Node(idx={}, threshold={}, value={})".format(self.idx, self.threshold, self.value)

    def __str__(self):
        return self.__repr__()

# Path: Decision_Tree_Regression.ipynb


def mean_squared_error(y_true, y_pred):
    return np.mean(np.square(y_true - y_pred))


In [2]:
import pickle as pkl

with open('data.pkl', 'rb') as f:
    X = pkl.load(f)
with open('labels.pkl', 'rb') as f:
    y = pkl.load(f)

In [None]:
net = Decision_Tree_Regression(max_depth=10, min_samples_split=5)
net.fit(X, y)

In [None]:
y_test = net.predict(X)

In [None]:
mean_squared_error(y, y_test)

In [None]:
def print_tree(root_node,):
    print(root_node)
    if root_node.left is not None:
        print_tree(root_node.left)
    if root_node.right is not None:
        print_tree(root_node.right)

print_tree(net.tree)

In [3]:
class XGBoost(object):
    def __init__(self, n_estimators=100, learning_rate=0.1, max_depth=3):
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.max_depth = max_depth
        self.trees = []

    def fit(self, X, y):
        y_pred = np.zeros(np.shape(y))
        for i in range(self.n_estimators):
            tree = Decision_Tree_Regression(max_depth=self.max_depth)
            # get the residuals
            residuals = -self.gradient(y, y_pred)
            # fit the tree to the negative gradient
            tree.fit(X, residuals)
            self.trees.append(tree)
            # update the predictions
            update_prediction = tree.predict(X)
            y_pred -= np.multiply(self.learning_rate, update_prediction)

    def predict(self, X):
        y_pred = np.zeros(np.shape(X)[0])
        for tree in self.trees:
            update_prediction = tree.predict(X)
            y_pred -= np.multiply(self.learning_rate, update_prediction)
        return y_pred

    def gradient(self, y, y_pred):
        return y - y_pred

    def loss(self, y, y_pred):
        return np.mean(np.square(y - y_pred))

In [5]:
model = XGBoost(n_estimators=10, learning_rate=0.1, max_depth=5)
model.fit(X, y)
y_pred = model.predict(X)
model.loss(y, y_pred)

736.5855887974029

In [None]:
model