In [14]:
import numpy as np
import pandas as pd 
import matplotlib.pyplot as plt
from scipy.special import expit

from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.ensemble import GradientBoostingClassifier

In [47]:
def fcnCalculateMSE(y):
    if len(y) == 0:
        return 0
    else:
        yPred = np.mean(y)
        return np.mean((y - yPred) ** 2)
    
class Node:

    def __init__(self, feature = None, threshold = None, left = None, right = None, *, value = None):
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value
        
    def is_leaf_node(self):
        return self.value is not None
    
class DecisionTreeRegressor:
    
    def __init__(self, min_sample_split = 2, max_depth = 100, n_feats = None, threshold_for_category = 10):
        self.min_sample_split = min_sample_split
        self.max_depth = max_depth
        self.n_feats = n_feats
        self.root = None
        self.FeatureType = None
        self.threshold_for_category = threshold_for_category
        self.y = None
        
    def _label_categorical_features(self, y):
        yUnique = np.unique(y)
        Y = np.zeros(len(y) , dtype = int)
        for intCtr in range(0, len(y)):
            Y[intCtr] = np.argwhere(y[intCtr] == yUnique)
        return Y

    
    def _determine_feature_type(self, X):
        lstFeatureType = []
        for intFeatureIterator in range(0, X.shape[1]):
            if len(np.unique(X[:, intFeatureIterator])) <= self.threshold_for_category or isinstance(np.unique(X[:, intFeatureIterator]), str):
                lstFeatureType.append("Categorical")
            else:
                lstFeatureType.append("Continuous")
        
        return lstFeatureType
        
    def fit(self, X, y, p):
        self.n_feats = X.shape[1] if not self.n_feats else min(X.shape[1], self.n_feats)
        self.FeatureType = self._determine_feature_type(X)
        self.y = y
        self.root = self._grow_tree(X, y, p)
        
    def _grow_tree(self, X, y, p, depth = 0):
        n_samples, n_features = X.shape
        n_labels = len(np.unique(y))
        
        if (depth >= self.max_depth or n_labels == 1 or n_samples < self.min_sample_split):
            if np.sum(p) == 0:
                leaf_value = self._mean(y)
            else:
                leaf_value = np.sum(y) / np.sum(p*(1-p))
            return Node(value = leaf_value)
        
        feat_idxs = np.random.choice(n_features, self.n_feats, replace = False)
        
        best_feat, best_thresh = self._best_criteria(X, y, feat_idxs)
        left_idxs, right_idxs = self._Split(X[:, best_feat], best_thresh, self.FeatureType[best_feat])
        left = self._grow_tree(X[left_idxs, :], y[left_idxs], p[left_idxs] , depth+1)
        right = self._grow_tree(X[right_idxs, :], y[right_idxs], p[right_idxs], depth+1)
        
        return Node(best_feat, best_thresh, left, right)       
    
    def _best_criteria(self, X, y, feat_idxs):
        best_gain = -1
        split_idx, split_thresh = None, None
        for feat_idx in feat_idxs:
            X_column = X[:, feat_idx]
            feature_type = self.FeatureType[feat_idx]
            thresholds = np.unique(X_column)
            for threshold in thresholds:
                gain = self._information_gain(self.y, X_column, threshold, feature_type)
                if gain > best_gain:
                    best_gain = gain
                    split_idx = feat_idx
                    split_thresh = threshold
        return split_idx, split_thresh
    
  
    def _information_gain(self, y, X_column, split_thresh, feature_type):
        parent_entropy = fcnCalculateMSE(y)
        
        left_idxs, right_idxs = self._Split(X_column, split_thresh, feature_type)
        if len(left_idxs) == 0 or len(right_idxs) == 0:
            return 0
        n = len(y)
        n_l, n_r = len(left_idxs), len(right_idxs)
        e_l, e_r = fcnCalculateMSE(y[left_idxs]), fcnCalculateMSE(y[right_idxs])
        child_entropy = (n_l/n) * e_l + (n_r/n) * e_r
        ig = parent_entropy - child_entropy
        return ig
         
    
    def _Split(self, X_column, split_thresh, feature_type):
        if feature_type == "Continuous":
            left_idxs = np.argwhere(X_column <= split_thresh).flatten()
            right_idxs = np.argwhere(X_column > split_thresh).flatten()
        else:
            left_idxs = np.argwhere(X_column == split_thresh).flatten()
            right_idxs = np.argwhere(X_column != split_thresh).flatten()            
        return left_idxs, right_idxs
        
    def _mean(self, y):
        return np.mean(y)

    def predict(self, X):
        return np.array([self._traverse_tree(x, self.root) for x in X])
    
    def _traverse_tree(self, X, node):
        if node.is_leaf_node():
            return node.value

        if self.FeatureType[node.feature] == "Categorical":
            if X[node.feature] == node.threshold:
                return self._traverse_tree(X, node.left)
            return self._traverse_tree(X, node.right)        
        else:
            if X[node.feature] <= node.threshold:
                return self._traverse_tree(X, node.left)

            return self._traverse_tree(X, node.right)

In [48]:
class GradientBoosting:
    
    def __init__(self, n_estimators = 100, learning_rate = 0.1, loss_fn = 'classification', tolerance_level = 0.5):
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.tolerance_level = tolerance_level
        if loss_fn == 'regression':
            self.loss_fn = 'regression'
        elif loss_fn == 'classification':
            self.loss_fn = 'classification' 
        else:
            raise AttributeError('Unknown loss function: {}'.format(loss_fn))
        self._estimator = None 

        """
        #loss_fn can be one of:
        - 'mse': Regression with Mean Square Error
        - 'log': Binary Classification with Logistic loss.
        
        #tolerance_level used in binary classification to convert probabilities to classes
        """
    def _mse_gradient(self, y, yp):
        return -(y - yp)
    
    def _logistic_gradient(self, y, yp):
        return expit(yp) - y 
    
    def fit(self, X, y):
        
        if self.loss_fn == 'regression': 
            self._estimators = [np.mean(y)]
            fx = np.full(len(y), np.mean(y))
        else:
            self._estimators = [np.log(np.sum(y, where = 1)/(len(y)-np.sum(y, where = 1)))]
            fx = np.full(len(y), np.log(np.sum(y, where = 1)/(len(y)-np.sum(y, where = 1))))
            
        for m_trees in range(1, self.n_estimators + 1):
            reg_tree = DecisionTreeRegressor()
            if self.loss_fn == 'regression': 
                psuedo_residuals = -self._mse_gradient(y, fx)
                reg_tree.fit(X, psuedo_residuals, np.zeros(y))
            else:
                psuedo_residuals = -self._logistic_gradient(y, fx)
                reg_tree.fit(X, psuedo_residuals, expit(fx))
            
            fxm = reg_tree.predict(X)
            fx = fx + self.learning_rate * fxm
            
            self._estimators.append(reg_tree)
    
    def predict(self, X):
        if self.loss_fn == 'regression':
            return self._predict_reg(X)
        else:
            return self._predict_class(X)
        
    def _predict_reg(self, X):
        f0x = self._estimators[0]
        y_pred = np.sum([self.learning_rate * tree.predict(X) for tree in self._estimators[1:]], axis = 0)
        
        return y_pred + f0x
    
    def _proba_to_class(self, sample):
        return int(sample > self.tolerance_level)

    def _predict_class(self, X):
        predicted_probas = expit(self._predict_reg(X))
        return np.array([self._proba_to_class(sample) for sample in predicted_probas])

def fcnAccuracy(y_true, y_pred):
    accuracy = np.sum(y_true == y_pred) / len(y_true)
    return accuracy  

In [18]:
data = datasets.load_breast_cancer()
X = data.data
y = data.target

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=3)

In [51]:
dt = GradientBoosting(n_estimators = 5, loss_fn = 'classification')
dt.fit(X_train, y_train)
    
y_pred = dt.predict(X_test)
accuracy = fcnAccuracy(y_test, y_pred)

print ("Accuracy:", accuracy)

Accuracy: 0.9370629370629371


In [23]:
dt2 = GradientBoostingClassifier(n_estimators=5)
dt2.fit(X_train, y_train)
    
y_pred2 = dt2.predict(X_test)
accuracy2 = fcnAccuracy(y_test, y_pred2)

print ("Accuracy:", accuracy2)

Accuracy: 0.9440559440559441
