In [106]:
import math
import numpy as np 
import pandas as pd
from collections import defaultdict

In [107]:
class XGBoostRegression:
    
    def __init__(self, params={}, random_seed=None):
        
        self.params = defaultdict(lambda: None, params)
        self.verbose = self.params['verbose'] or False
        self.n_estimators = self.params['n_estimators'] or 50
        self.subsample = self.params['subsample'] or 1.0
        self.learning_rate = self.params['learning_rate'] or 0.3
        self.base_prediction = params['base_score'] or 0.5
        self.max_depth = self.params['max_depth'] or 5
        self.rng = np.random.default_rng(seed=random_seed)
        
    def fit(self, X, y):
        current_predictions = self.base_prediction * np.ones(shape=y.shape)
        self.boosters = []
        objective = self.Objective()
        for i in range(self.n_estimators):
            gradients = objective.gradient(y, current_predictions)
            hessian = objective.hessian(y, current_predictions)
            sample_idxs = None if self.subsample == 1.0 \
                else self.rng.choice(len(y),
                    size=math.floor(len(y)*self.subsample),
                    replace=False)
            booster = TreeBooster(X, gradients, hessian, self.params,
                        self.max_depth, sample_idxs)
            current_predictions = self.learning_rate * booster.predict(X)
            self.boosters.append(booster)
            
            if self.verbose:
                print(f'[{i}] train loss = {objective.loss(y, current_predictions)}')
    
    def predict(self, X):
        return (self.base_prediction + self.learning_rate
            * np.sum([booster.predict(X) for booster in self.boosters], axis=0))
        
    class Objective:
        def loss(self, y, pred): 
            return np.mean((y - pred)**2)
        def gradient(self, y, pred): 
            return pred - y
        def hessian(self, y, pred): 
            return np.ones(len(y))
    

In [108]:
class TreeBooster:
    
    def __init__(self, X, gradients, hessian, params, max_depth, sample_idxs=None):
        self.params = params
        self.max_depth = max_depth
        assert max_depth >= 0, 'max_depth must be nonnegative'
        self.min_child_weight = self.params['min_child_weight'] or 1.0
        self.reg_lambda = self.params['reg_lambda'] or 1.0
        self.gamma = self.params['gamma'] or 0.0
        self.colsample_bynode = self.params['colsample_bynode'] or 1.0
        if isinstance(gradients, pd.Series): 
            gradients = gradients.values
        if isinstance(hessian, pd.Series): 
            hessian = hessian.values
        if sample_idxs is None:
            sample_idxs = np.arange(len(gradients))
        self.X, self.gradients, self.hessian, self.sample_idxs = X, gradients, hessian, sample_idxs
        self.n, self.c = len(self.sample_idxs), X.shape[1]
        self.value = -gradients[sample_idxs].sum() / \
            (hessian[sample_idxs].sum() + self.reg_lambda)
        self.current_best_score = 0.0
        if self.max_depth > 0:
            self.maybe_insert_child_nodes()
                
    def maybe_insert_child_nodes(self):
        for i in range(self.c):
            self.find_best_split(i)
        if self.is_leaf:
            return
        x = self.X.values[self.sample_idxs, self.split_feature_idx]
        left_idx = np.nonzero(x <= self.threshold)[0]
        right_idx = np.nonzero(x > self.threshold)[0]
        self.left = TreeBooster(self.X, self.gradients, self.hessian,
            self.params, self.max_depth - 1, self.sample_idxs[left_idx])
        self.right = TreeBooster(self.X, self.gradients, self.hessian, 
            self.params, self.max_depth - 1, self.sample_idxs[right_idx])

    @property
    def is_leaf(self):
        return self.current_best_score == 0.0

    def find_best_split(self, feature_idx):
        x = self.X.values[self.sample_idxs, feature_idx]
        gradients, hessian = self.gradients[self.sample_idxs], self.hessian[self.sample_idxs]
        sort_idx = np.argsort(x)
        sort_gradients, sort_hessian, sort_x = gradients[sort_idx], hessian[sort_idx], x[sort_idx]
        sum_gradients, sum_hessian = gradients.sum(), hessian.sum()
        sum_gradients_right, sum_hessian_right = sum_gradients, sum_hessian
        sum_gradients_left, sum_hessian_left = 0.0, 0.0

        for i in range(self.n - 1):
            gradients_i, hessian_i, x_i, x_i_next = sort_gradients[i], \
            sort_hessian[i], sort_x[i], sort_x[i+1]
            sum_gradients_left += gradients_i
            sum_gradients_right -= gradients_i
            sum_hessian_left += hessian_i
            sum_hessian_right -= hessian_i
            if sum_hessian_left < self.min_child_weight or x_i == x_i_next:continue
            if sum_hessian_right < self.min_child_weight: break

            gain = 0.5 * ((sum_gradients_left**2 / (sum_hessian_left + self.reg_lambda))
                            + (sum_gradients_right**2 / (sum_hessian_right + self.reg_lambda))
                            - (sum_gradients**2 / (sum_hessian + self.reg_lambda))
                            ) - self.gamma/2 # Eq(7) in the xgboost paper
            if gain > self.current_best_score: 
                self.split_feature_idx = feature_idx
                self.current_best_score = gain
                self.threshold = (x_i + x_i_next) / 2

    def predict(self, X):
        return np.array([self._predict_row(row) for i, row in X.iterrows()])

    def _predict_row(self, row):
        if self.is_leaf: 
            return self.value
        child = self.left if row[self.split_feature_idx] <= self.threshold \
            else self.right
        return child._predict_row(row)

In [109]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
    
X, y = fetch_california_housing(as_frame=True, return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, 
                                                    random_state=43)

In [110]:
import xgboost as xgb

params = {
    'n_estimators': 50,
    'learning_rate': 0.1,
    'max_depth': 5,
    'subsample': 0.8,
    'reg_lambda': 1.5,
    'gamma': 0.0,
    'min_child_weight': 25,
    'base_score': 0.0,
    'tree_method': 'exact',
}

num_boost_round = 50
# train the from-scratch XGBoost model
model_scratch = XGBoostRegression(params, random_seed=42)
model_scratch.fit(X_train, y_train)

# train the library XGBoost model
dtrain = xgb.DMatrix(X_train, label=y_train)
dtest = xgb.DMatrix(X_test, label=y_test)
model_xgb = xgb.train(params, dtrain, num_boost_round)

Parameters: { "n_estimators" } might not be used.

  This could be a false alarm, with some parameters getting used by language bindings but
  then being mistakenly passed down to XGBoost core, or some parameter actually being used
  but getting flagged wrongly here. Please open an issue if you find any such cases.




In [111]:
class SquaredErrorObjective():
    def loss(self, y, pred): return np.mean((y - pred)**2)
    def gradient(self, y, pred): return pred - y
    def hessian(self, y, pred): return np.ones(len(y))

In [112]:
pred_scratch = model_scratch.predict(X_test)
pred_xgb = model_xgb.predict(dtest)
print(f'scratch score: {SquaredErrorObjective().loss(y_test, pred_scratch)}')
print(f'xgboost score: {SquaredErrorObjective().loss(y_test, pred_xgb)}')

scratch score: 70.9466429657219
xgboost score: 0.24123239765807963
