In [1]:
from sklearn import datasets 
import matplotlib.pyplot as plt 
import seaborn as sns
import math
import numpy as np
from scipy import stats
from tqdm import tqdm

# Load the Iris dataset; the result is a dict-like object that is indexed by
# key further down (``data['data']`` / ``data['target']``).
data = datasets.load_iris()
# List the available fields.
data.keys()

dict_keys(['data', 'target', 'target_names', 'DESCR', 'feature_names', 'filename'])

In [2]:
# Feature matrix and class labels taken from the loaded dataset.
X = data['data']
Y = data['target']

In [3]:
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import accuracy_score

# Reference model from scikit-learn: 10 boosting stages, scored on the same
# data it was trained on (fit() returns the fitted estimator itself).
model = GradientBoostingClassifier(n_estimators=10).fit(X, Y)
print(accuracy_score(Y, model.predict(X)))


0.9933333333333333


## Decision Tree Regressor Modified to handle One-hot-encoded labels


Note: for multi-class problems the labels need to be converted to a one-hot encoding.

In [4]:
class DecisionNode:
    """One node of a binary decision tree.

    A node is a plain record of six optional fields, all defaulting to
    ``None``:

    feature   -- index of the feature the node splits on
    entropy   -- impurity value associated with the node
    threshold -- value the selected feature is compared against
    value     -- prediction stored in the node (set on leaves)
    right     -- child node for one outcome of the split
    left      -- child node for the other outcome of the split
    """

    def __init__(self, feature=None, entropy=None, threshold=None,
                 value=None, right=None, left=None):
        # Split description.
        self.feature, self.threshold = feature, threshold
        self.entropy = entropy

        # Sub-trees.
        self.left, self.right = left, right

        # Leaf payload.
        self.value = value

# Helper that partitions the samples into two subsets according to a feature threshold

def divide_on_feature(X, feature_i, threshold):
    """Partition the rows of ``X`` by comparing one column with ``threshold``.

    Parameters
    ----------
    X : array-like, 2-D
        Samples, one per row.
    feature_i : int
        Index of the column that is tested.
    threshold : scalar
        If it is a Python ``int``/``float`` (which includes ``np.float64``)
        a row matches when ``row[feature_i] >= threshold``; for any other
        type a row matches when ``row[feature_i] == threshold``.

    Returns
    -------
    (matching, rest) : tuple of two ndarrays
        Rows that satisfy the test and rows that do not, each in their
        original order.  Both keep the column count of ``X`` even when empty.
    """
    X = np.asarray(X)
    column = X[:, feature_i]

    if isinstance(threshold, (int, float)):
        mask = column >= threshold
    else:
        mask = column == threshold
    mask = np.asarray(mask, dtype=bool)

    # Return a tuple rather than ``np.array([a, b])``: the two halves usually
    # differ in length, and building such a ragged array is an error on
    # recent NumPy releases.  Callers unpack the two halves either way.
    return X[mask], X[~mask]



class DecisionTreeRegressior:
    """Regression tree grown by greedy variance reduction.

    Targets may be 1-D or 2-D (several outputs per sample, e.g. one-hot
    vectors or per-class gradients); the variance reduction is summed over
    the output columns and a leaf stores the per-column mean.

    Parameters
    ----------
    min_samples_split : int
        A node is only considered for splitting when it holds at least this
        many samples.
    min_impurity : float
        A split is only accepted when its variance reduction is strictly
        greater than this value.
    max_depth : int
        A node at depth ``d`` (root = 0) is considered for splitting while
        ``d <= max_depth``.
    criterian : str
        Stored on the instance but not consulted; variance reduction is the
        only criterion implemented.
    """

    def __init__(self, min_samples_split=3, min_impurity=1e-7,
                 max_depth= 5,criterian = 'mse'):
        # Root node of the fitted tree (None until fit() is called).
        self.root = None
        self.criterian = criterian
        # Stopping conditions.
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.min_impurity = min_impurity

    @staticmethod
    def _is_numeric(value):
        """True for Python and NumPy integer/float scalars.

        NumPy integers and float32 values are not instances of the builtin
        ``int``/``float``, so they are listed explicitly; otherwise integer
        feature columns would silently get equality splits.
        """
        return isinstance(value, (int, float, np.integer, np.floating))

    def _split_mask(self, column, threshold):
        """Boolean mask of the samples that go to the left branch."""
        if self._is_numeric(threshold):
            mask = column >= threshold
        else:
            mask = column == threshold
        return np.asarray(mask, dtype=bool)

    def fit(self,X,y):
        """Grow the tree on samples ``X`` (2-D) and targets ``y`` (1-D or 2-D)."""
        self.root = self._build_tree(np.asarray(X), np.asarray(y))

    def _build_tree(self, X, y, current_depth=0):
        """Recursively build the subtree for ``X``/``y`` and return its root node."""
        # Work with 2-D targets throughout so single- and multi-output
        # problems share one code path.
        if len(np.shape(y)) == 1:
            y = np.expand_dims(y, axis=1)

        n_samples, n_features = np.shape(X)

        largest_impurity = 0
        selected_feature = None
        selected_threshold = None
        selected_mask = None

        if n_samples >= self.min_samples_split and self.max_depth >= current_depth:
            for feature_i in range(n_features):
                column = X[:, feature_i]

                # Every distinct value of the column is a candidate threshold.
                for threshold in np.unique(column):
                    mask = self._split_mask(column, threshold)
                    n_left = np.count_nonzero(mask)

                    # A split that leaves one side empty is useless.
                    if n_left == 0 or n_left == n_samples:
                        continue

                    impurity = self.variance_reduction(y, y[mask], y[~mask])

                    # Strict comparison: the first of several equally good
                    # splits is kept.
                    if impurity > largest_impurity:
                        largest_impurity = impurity
                        selected_feature = feature_i
                        selected_threshold = threshold
                        selected_mask = mask

        if selected_mask is not None and largest_impurity > self.min_impurity:
            # Samples that satisfy the threshold test go left, the rest right;
            # predict_value() follows the same convention.
            left = self._build_tree(X[selected_mask], y[selected_mask],
                                    current_depth=current_depth + 1)
            right = self._build_tree(X[~selected_mask], y[~selected_mask],
                                     current_depth=current_depth + 1)
            return DecisionNode(feature=selected_feature, threshold=selected_threshold,
                                left=left, right=right)

        # No acceptable split: this node becomes a leaf.
        return DecisionNode(value=self.mean_leaf(y))

    def mean_leaf(self,y):
        """Column-wise mean of ``y``; a scalar when there is a single column."""
        value = np.mean(y, axis=0)
        return value if len(value) > 1 else value[0]

    def variance_reduction(self,y,y1,y2):
        """Variance of ``y`` minus the size-weighted variances of ``y1`` and ``y2``,
        summed over the output columns."""
        frac1 = len(y1) / len(y)
        frac2 = len(y2) / len(y)
        return np.sum(np.var(y, axis=0)
                      - frac1 * np.var(y1, axis=0)
                      - frac2 * np.var(y2, axis=0))

    def predict_value(self, x, tree=None):
        """Return the leaf value reached by sample ``x``.

        ``tree`` is the node to start from; the fitted root is used when it
        is omitted.
        """
        node = self.root if tree is None else tree

        # Internal nodes have value None; walk down until a leaf is reached.
        while node.value is None:
            feature_value = x[node.feature]
            # Use the same test that produced the split during fitting
            # (decided by the type of the stored threshold).
            if self._is_numeric(node.threshold):
                go_left = feature_value >= node.threshold
            else:
                go_left = feature_value == node.threshold
            node = node.left if go_left else node.right

        return node.value

    def predict(self,X):
        """Return a list with one prediction per row of ``X``."""
        return [self.predict_value(sample) for sample in X]
        
      
# m  = DecisionTreeRegressior()        
# m.fit(X,Y)
# r2_score(Y,m.predict(X))

The gradient boosting model is trained sequentially: each new model is fit to the gradient of the loss 
with respect to the previous models' combined prediction (i.e. the remaining error) and is then used to correct that prediction.


In [83]:
class Loss(object):
    """Base class for loss functions.

    Subclasses are expected to override ``loss`` and ``gradient``; both
    raise ``NotImplementedError`` here.  ``acc`` returns 0 unless overridden.
    """

    def loss(self, y_true, y_pred):
        # Raise (not return) the exception so an un-overridden loss cannot be
        # mistaken for a numeric result.
        raise NotImplementedError()

    def gradient(self, y, y_pred):
        raise NotImplementedError()

    def acc(self, y, y_pred):
        return 0

class SquareLoss(Loss):
    """Squared-error loss, ``0.5 * (y - y_pred) ** 2``, and its gradient
    with respect to ``y_pred``."""

    def __init__(self):
        pass

    def loss(self, y, y_pred):
        residual = y - y_pred
        return 0.5 * np.power(residual, 2)

    def gradient(self, y, y_pred):
        residual = y - y_pred
        return -residual



In [6]:
class CrossEntropy():
    """Element-wise cross-entropy between targets ``y`` and predicted
    probabilities ``p``.

    ``p`` is clipped to ``[1e-15, 1 - 1e-15]`` before use so that neither
    the logarithms nor the divisions see an exact 0 or 1.
    """

    def loss(self, y, p):
        # Keep the probabilities away from 0 and 1 (log(0) is undefined).
        p = np.clip(p, 1e-15, 1 - 1e-15)
        positive_term = - y * np.log(p)
        negative_term = (1 - y) * np.log(1 - p)
        return positive_term - negative_term

    def acc(self, y, p):
        # Both arguments hold one row per sample; the arg-max column of each
        # row is compared.
        true_classes = np.argmax(y, axis=1)
        predicted_classes = np.argmax(p, axis=1)
        return accuracy_score(true_classes, predicted_classes)

    def gradient(self, y, p):
        # Keep the probabilities away from 0 and 1 (division by zero).
        p = np.clip(p, 1e-15, 1 - 1e-15)
        positive_term = - (y / p)
        negative_term = (1 - y) / (1 - p)
        return positive_term + negative_term


In [19]:
a = np.array([1,2,3])

def func(x):
    """Return the index of the first element of ``a`` that equals ``x``."""
    return np.where(a == x)[0][0]

func(1)

0

In [22]:
def to_categorical(x):
    """One-hot encode a sequence of labels.

    Parameters
    ----------
    x : array-like
        Labels, one per sample (a list, a 1-D array or an ``(n, 1)`` column).

    Returns
    -------
    ndarray of float, shape ``(n_samples, n_classes)``
        Row ``i`` has a 1 in the column of sample ``i``'s label and 0
        elsewhere.  Columns follow the sorted order of the distinct labels
        as returned by ``np.unique``.
    """
    # asarray lets plain Python lists through; ravel flattens a column vector.
    labels = np.asarray(x).ravel()

    # return_inverse gives, for every sample, the position of its label in
    # the sorted unique values -- i.e. its one-hot column -- in a single pass.
    classes, column = np.unique(labels, return_inverse=True)

    one_hot = np.zeros((labels.shape[0], classes.shape[0]))
    one_hot[np.arange(labels.shape[0]), column.ravel()] = 1
    return one_hot
    

class GradientBoostingClassifier:
    """Gradient boosting classifier built from multi-output regression trees.

    NOTE: this class has the same name as the scikit-learn estimator imported
    earlier in the notebook and replaces it in the notebook namespace.

    Labels are one-hot encoded; starting from the class frequencies, each
    tree is fit to the gradient of the cross-entropy loss with respect to
    the current prediction, and ``learning_rate`` times the tree output is
    subtracted from that prediction.

    Parameters
    ----------
    n_estimators : int
        Number of trees.
    learning_rate : float
        Factor applied to every tree's output.
    min_samples_split, min_impurity, max_depth
        Passed on unchanged to every tree.

    Attributes set by ``fit``
    -------------------------
    classes_ : ndarray
        Sorted distinct labels; column ``k`` of the internal scores belongs
        to ``classes_[k]``.
    initial_prediction_ : ndarray
        Per-class mean of the one-hot labels, the starting point of boosting.
    """

    def __init__(self, n_estimators = 5, learning_rate = 0.1, min_samples_split=2,
                 min_impurity = 1e-7, max_depth = 3):
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.min_samples_split = min_samples_split
        self.min_impurity = min_impurity
        self.max_depth = max_depth
        self.loss = CrossEntropy()

        # Filled in by fit().
        self.classes_ = None
        self.initial_prediction_ = None

        self.trees = [
            DecisionTreeRegressior(min_samples_split, min_impurity, max_depth)
            for _ in range(n_estimators)
        ]

    def fit(self, X, y):
        """Fit the trees one after another on features ``X`` and labels ``y``."""
        X = np.asarray(X)
        y = np.asarray(y)

        # Remember the labels so predict() can return them instead of column
        # indices; to_categorical orders its columns the same way.
        self.classes_ = np.unique(y)
        y = to_categorical(y)

        # Boosting starts from the class frequencies; predict() must start
        # from the same point, so keep them.
        self.initial_prediction_ = np.mean(y, axis=0)
        y_pred = np.full(np.shape(y), self.initial_prediction_)

        for tree in self.trees:
            gradient = self.loss.gradient(y, y_pred)
            tree.fit(X, gradient)
            # Trees return a list of per-sample values; with a single class
            # these are scalars, hence the explicit reshape.
            update = np.asarray(tree.predict(X), dtype=float).reshape(y_pred.shape)
            y_pred -= np.multiply(self.learning_rate, update)

    def predict(self, X):
        """Return the predicted label for every row of ``X``.

        Raises
        ------
        ValueError
            If the model has not been fitted.
        """
        if self.classes_ is None or self.initial_prediction_ is None:
            raise ValueError("GradientBoostingClassifier is not fitted yet; call fit() first.")

        X = np.asarray(X)
        # Replay exactly what fit() did: start from the stored initial
        # prediction and subtract every tree's scaled output.
        y_pred = np.full((X.shape[0], len(self.classes_)), self.initial_prediction_,
                         dtype=float)
        for tree in self.trees:
            update = np.asarray(tree.predict(X), dtype=float).reshape(y_pred.shape)
            y_pred = y_pred - np.multiply(self.learning_rate, update)

        # A softmax is monotonic within each row, so it cannot change the
        # arg-max; taking the arg-max of the raw scores gives the same class
        # without the exp() overflow (inf / inf -> NaN) that large scores cause.
        best_column = np.argmax(y_pred, axis=1)
        return self.classes_[best_column]



    
# Train the from-scratch GradientBoostingClassifier defined in this cell with
# its default settings (this rebinds `model`, which earlier held the
# scikit-learn estimator) and score it on the same data it was trained on.
model = GradientBoostingClassifier()        
model.fit(X,Y)
accuracy_score(Y,model.predict(X))
        

1.0

In [23]:
# Show the model's prediction for every row of X (the training data).
model.predict(X)

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
       2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
       2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2], dtype=int64)