In [1]:
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.metrics import mean_squared_error
from sklearn.tree import DecisionTreeRegressor
import numpy as np
import matplotlib.pyplot as plt
from sklearn.utils import shuffle


In [2]:
# Load the Boston housing dataset through scikit-learn's loader.
# NOTE(review): `boston` is not referenced by any other cell visible in this
# notebook, and `load_boston` was deprecated in scikit-learn 1.0 and removed
# in 1.2 -- confirm whether this cell is still needed.
from sklearn.datasets import load_boston
boston = load_boston()

In [3]:
class CustomRandomForestRegressor:
    """Bagged ensemble of decision-tree regressors.

    Every tree is fitted on a bootstrap sample of the training rows (drawn
    with replacement, same size as the training set). Predictions are the
    unweighted mean of the individual tree predictions.

    Parameters
    ----------
    n_estimators : int, default=10
        Number of trees in the ensemble.
    max_features : default=1.0
        Forwarded to every base learner as its ``max_features`` argument.
        With the default ``DecisionTreeRegressor`` a float is the fraction of
        features examined at each split, so 1.0 means "all features".
    random_state : int or None, default=42
        Seed of the private random generator that draws the bootstrap
        samples and the per-tree seeds.

    Attributes
    ----------
    models : ndarray of object, shape (n_estimators,)
        The fitted base learners (``None`` entries until ``fit`` is called).
    base_learner : class
        Factory used to create each tree.
    """

    def __init__(self, n_estimators=10, max_features=1.0, random_state=42):
        self.n_estimators = n_estimators
        self.max_features = max_features
        self.random_state = random_state

        # Placeholder slots; fit() replaces this array with the fitted trees.
        self.models = np.empty((self.n_estimators), dtype=object)
        self.base_learner = DecisionTreeRegressor

    def fit(self, X, y):
        """Fit ``n_estimators`` trees, each on its own bootstrap sample.

        Parameters
        ----------
        X : indexable with a ``shape`` attribute
            Training samples; rows are selected with an integer index array.
        y : array-like supporting integer-array indexing
            Training targets, one per row of ``X``.

        Returns
        -------
        self
        """
        # A private generator keeps repeated fits reproducible without
        # reseeding numpy's global random state as a side effect.
        rng = np.random.RandomState(self.random_state)
        samples_number = X.shape[0]
        max_seed = np.iinfo(np.int32).max

        models = np.empty(self.n_estimators, dtype=object)
        for i in range(self.n_estimators):
            subsample_index = rng.choice(
                samples_number, size=samples_number, replace=True)
            # max_features was previously accepted but never applied; pass it
            # on so the trees actually subsample features.
            tree = self.base_learner(
                max_features=self.max_features,
                random_state=rng.randint(max_seed))
            tree.fit(X[subsample_index], y[subsample_index])
            models[i] = tree

        self.models = models
        return self

    def predict(self, X):
        """Return the mean of the per-tree predictions for ``X``."""
        return np.mean([model.predict(X) for model in self.models], axis=0)

In [4]:
class CustomGradientBoostingRegressor:
    """Gradient boosting of regression trees on squared-error residuals.

    The ensemble starts from a zero prediction. Each stage fits a tree to the
    current residuals and the residuals are then reduced by
    ``learning_rate`` times that tree's prediction.

    Parameters
    ----------
    learning_rate : float, default=0.1
        Shrinkage applied to every tree's contribution.
    n_estimators : int, default=100
        Number of boosting stages.
    max_depth : int or None, default=3
        Forwarded to every base learner as ``max_depth``.
    criterion : str, default='mse'
        Split criterion for the base learners. The plain-MSE spellings
        ('mse', 'squared_error') are not forwarded because that is already
        the default of ``DecisionTreeRegressor`` and the accepted spelling
        depends on the scikit-learn version; any other value is forwarded.
    random_state : int or None, default=42
        Seed of the private generator that produces the per-tree seeds.

    Attributes
    ----------
    models : ndarray of object, shape (n_estimators,)
        The fitted base learners (``None`` entries until ``fit`` is called).
    base_learner : class
        Factory used to create each tree.
    """

    def __init__(self, learning_rate=0.1, n_estimators=100, max_depth=3, criterion='mse', random_state=42):
        self.learning_rate = learning_rate
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.criterion = criterion
        self.random_state = random_state

        # Placeholder slots; fit() replaces this array with the fitted trees.
        self.models = np.empty((self.n_estimators), dtype=object)
        self.base_learner = DecisionTreeRegressor

    def _tree_params(self, seed):
        """Build the keyword arguments for one base learner."""
        params = {'max_depth': self.max_depth, 'random_state': seed}
        if self.criterion not in ('mse', 'squared_error'):
            params['criterion'] = self.criterion
        return params

    def fit(self, X, y):
        """Fit the boosting stages sequentially on the residuals of ``y``.

        Returns
        -------
        self
        """
        # Private generator: no reseeding of numpy's global random state.
        rng = np.random.RandomState(self.random_state)
        max_seed = np.iinfo(np.int32).max

        residuals = np.copy(y)
        models = np.empty(self.n_estimators, dtype=object)
        for i in range(self.n_estimators):
            # max_depth/criterion used to be stored but never applied, which
            # left every stage as an unconstrained tree.
            tree = self.base_learner(**self._tree_params(rng.randint(max_seed)))
            tree.fit(X, residuals)
            residuals = residuals - self.learning_rate * tree.predict(X)
            models[i] = tree

        self.models = models
        return self

    def predict(self, X):
        """Return the shrunken sum of all stage predictions for ``X``."""
        return self.learning_rate * np.sum([model.predict(X) for model in self.models], axis=0)

    def staged_predict(self, X):
        """Return the prediction after each stage.

        Row ``i`` of the result is the ensemble prediction using stages
        ``0..i``.
        """
        return self.learning_rate * np.cumsum([model.predict(X) for model in self.models], axis=0)

In [5]:
class iGBRT:
    """Gradient boosting initialised with a random forest.

    A forest ("initializer") is fitted on the targets, then a gradient
    boosting model ("booster") is fitted on the forest's training residuals.
    The final prediction is the sum of both models' predictions.

    Parameters
    ----------
    implementation : {'sklearn', 'custom'}, default='sklearn'
        Selects scikit-learn's ``RandomForestRegressor`` /
        ``GradientBoostingRegressor`` or the ``Custom*`` classes.
    initializer_n_estimators, initializer_max_features
        Passed to the initializer as ``n_estimators`` / ``max_features``.
    booster_learning_rate, booster_n_estimators, booster_max_depth
        Passed to the booster as ``learning_rate`` / ``n_estimators`` /
        ``max_depth``.
    random_state : int, default=42
        Passed to both models.

    Attributes
    ----------
    initializer, booster
        The two model instances.
    train_score_ : ndarray of float, shape (booster_n_estimators,)
        Training MSE after each boosting stage; NaN until ``fit`` is called.

    Raises
    ------
    ValueError
        If ``implementation`` is not one of the supported names.
    """

    def __init__(
        self, implementation='sklearn', initializer_n_estimators=10, initializer_max_features=1.0,
                 booster_learning_rate=0.1, booster_n_estimators=100, booster_max_depth=3, random_state=42):
        # Validate first: an unknown name used to surface as an unrelated
        # AttributeError further down.
        if implementation == 'sklearn':
            initializer_cls, booster_cls = RandomForestRegressor, GradientBoostingRegressor
        elif implementation == 'custom':
            initializer_cls, booster_cls = CustomRandomForestRegressor, CustomGradientBoostingRegressor
        else:
            raise ValueError(
                "implementation must be 'sklearn' or 'custom', got %r" % (implementation,))

        self.implementation = implementation

        self.initializer_n_estimators = initializer_n_estimators
        self.initializer_max_features = initializer_max_features

        self.booster_learning_rate = booster_learning_rate
        self.booster_n_estimators = booster_n_estimators
        self.booster_max_depth = booster_max_depth

        self.random_state = random_state

        self.initializer = initializer_cls(
            n_estimators=self.initializer_n_estimators,
            max_features=self.initializer_max_features,
            random_state=self.random_state)
        # NOTE(review): scikit-learn 1.0 renamed the 'mse' criterion to
        # 'squared_error' and 1.2 removed the old spelling; on such versions
        # the sklearn backend needs the new name -- confirm the target version.
        self.booster = booster_cls(
            learning_rate=self.booster_learning_rate,
            n_estimators=self.booster_n_estimators,
            max_depth=self.booster_max_depth,
            criterion='mse', random_state=self.random_state)

        # NaN rather than uninitialised memory until fit() fills it in.
        self.train_score_ = np.full(
            (self.booster_n_estimators), np.nan, dtype=float)

    def _booster_stages(self, X):
        """Return the booster's staged predictions as a 2-D array.

        ``list`` consumes a generator as well as an array row by row, so the
        same expression serves both back ends.
        """
        return np.array(list(self.booster.staged_predict(X)))

    def fit(self, X, y):
        """Fit the initializer on ``y`` and the booster on its residuals.

        Also records the training MSE after every boosting stage in
        ``train_score_``.

        Returns
        -------
        self
        """
        self.initializer.fit(X, y)
        # Computed once and reused for both the residuals and the scores.
        initial_prediction = self.initializer.predict(X)
        self.booster.fit(X, y - initial_prediction)

        staged = initial_prediction + self._booster_stages(X)
        self.train_score_ = np.array(
            [mean_squared_error(y, stage) for stage in staged], dtype=float)
        return self

    def predict(self, X):
        """Return initializer prediction plus booster prediction for ``X``."""
        return self.initializer.predict(X) + self.booster.predict(X)

    def staged_predict(self, X):
        """Return the combined prediction after each boosting stage.

        Row ``i`` is the initializer prediction plus the booster's prediction
        at stage ``i``.
        """
        return self.initializer.predict(X) + self._booster_stages(X)

In [6]:
from sklearn.datasets import load_svmlight_file, load_svmlight_files

# Load both splits in a single call so they are guaranteed to have the same
# number of feature columns; two independent load_svmlight_file calls infer
# the width from each file separately and can disagree.
X_train, y_train, X_test, y_test = load_svmlight_files(
    ('data/reg.train.txt', 'data/reg.test.txt'))
dataset_name = 'spam'

In [7]:
def plot_error(X_train, y_train, X_test, y_test, dataset_name, output_path='plot.png'):
    """Fit both iGBRT variants and plot their staged train/test MSE.

    Parameters
    ----------
    X_train, y_train : training samples and targets.
    X_test, y_test : held-out samples and targets.
    dataset_name : str
        Prefix of the plot title.
    output_path : str, default='plot.png'
        File the figure is saved to.

    Returns
    -------
    (custom_model, sklearn_model)
        The two fitted iGBRT instances.
    """
    custom_model = iGBRT(implementation='custom', initializer_n_estimators=100, random_state=1234)
    sklearn_model = iGBRT(implementation='sklearn', initializer_n_estimators=100, random_state=1234)

    curves = [
        {'fill_color': 'c', 'line_color': 'b', 'model': custom_model, 'name': 'custom'},
        {'fill_color': 'y', 'line_color': 'g', 'model': sklearn_model, 'name': 'sklearn'},
    ]

    for curve in curves:
        fitted = curve['model']
        fitted.fit(X_train, y_train)
        # Report every model; the print used to sit outside the loop and only
        # showed the last one.
        print("%s MSE: %.4f" % (
            curve['name'], mean_squared_error(y_test, fitted.predict(X_test))))

        curve['train_score'] = fitted.train_score_
        curve['test_score'] = np.array(
            [mean_squared_error(y_test, y_pred)
             for y_pred in fitted.staged_predict(X_test)],
            dtype=np.float64)

    plt.figure(figsize=(40, 20))
    plt.subplot(1, 2, 1)
    plt.title(dataset_name + ' MSE')

    for curve in curves:
        # Use this curve's own model for the x axis instead of a variable
        # leaked from an earlier loop.
        iterations = np.arange(curve['model'].booster_n_estimators) + 1
        for split, line_style in (('train', '-.'), ('test', ':')):
            score = curve[split + '_score']
            # Shaded band of +/-3% around the curve.
            plt.fill_between(
                iterations, score * 0.97, score * 1.03, alpha=0.5,
                color=curve['fill_color'])
            plt.plot(
                iterations, score, curve['line_color'] + line_style,
                label=curve['name'] + ' ' + split + ' error')

    plt.legend(loc='upper right')
    plt.xlabel('Boosting Iterations')
    plt.ylabel('MSE')
    plt.savefig(output_path)
    return custom_model, sklearn_model

In [None]:
# Fit both iGBRT variants on the loaded split and draw their error curves;
# the fitted models are kept for later inspection.
custom_model, sklearn_model = plot_error(
    X_train=X_train,
    y_train=y_train,
    X_test=X_test,
    y_test=y_test,
    dataset_name=dataset_name,
)