In [20]:
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris,load_boston
from sklearn.metrics import accuracy_score,r2_score
import numpy as np
import math

In [21]:
class Decision_node():
    def __init__(self,feature_i=None,threshold=None,value=None,true_branch=None,false_branch=None):
        self.feature_i=feature_i
        self.threshold=threshold
        self.value=value
        self.true_branch=true_branch
        self.false_branch=false_branch
class Decision_tree(object):
    def __init__(self,min_samples_split=2,max_depth=float("inf"),min_impurity=1e-7,loss=None):
        self.min_samples_split=min_samples_split
        self.max_depth=max_depth
        self.min_impurity=min_impurity
        self.loss=loss
        self.impurity_calculation=None
        self.leaf_calculation=None
        self.one_dim=None
    def fit(self,X,y,loss=None):
        self.root=self.create_tree(X,y)
        self.loss=None
    def create_tree(self,X,y,current_depth=0):
        max_impurity=0
        best_criteria=None
        best_sets=None
        if len(np.shape(y)) == 1:
            y = np.expand_dims(y, axis=1)

        # Add y as last column of X
        Xy = np.concatenate((X, y), axis=1)
        samples,features=np.shape(X)
        if(samples>self.min_samples_split and current_depth<=self.max_depth):
            for feature_i in range(features):
                feature_values=np.expand_dims(X[:,feature_i],axis=1)
                unique_values=np.unique(feature_values)
                for threshold in unique_values:
                    x1,x2=divide_on_feature(Xy,feature_i,threshold)
                    if(len(x1)>0 and len(x2)>0):
                        y1=x1[:,features:]
                        y2=x2[:,features:]
                        impurity=self.impurity_calculation(y,y1,y2)
                        if(impurity>max_impurity):
                            max_impurity=impurity
                            best_criteria={"feature_i":feature_i,"threshold":threshold}
                            best_sets={"left_X":x1[:,:features],
                                      "left_y":x1[:,features:],
                                      "rigth_X":x2[:,:features],
                                      "right_y":x2[:,features:]}
            
        if(max_impurity>self.min_impurity):
            true_branch=self.create_tree(best_sets["left_X"],best_sets["left_y"],current_depth+1)
            false_branch=self.create_tree(best_sets["rigth_X"],best_sets["right_y"],current_depth+1)
            return Decision_node(feature_i=best_criteria["feature_i"],threshold=best_criteria["threshold"],true_branch=true_branch,false_branch=false_branch)
        leaf_value=self.leaf_calculation(y)
        return Decision_node(value=leaf_value)
    

    def predict_value(self, x, tree=None):
        """ Do a recursive search down the tree and make a prediction of the data sample by the
            value of the leaf that we end up at """

        if tree is None:
            tree = self.root

        # If we have a value (i.e we're at a leaf) => return value as the prediction
        if tree.value is not None:
            return tree.value

        # Choose the feature that we will test
        feature_value = x[tree.feature_i]

        # Determine if we will follow left or right branch
        branch = tree.false_branch
        if isinstance(feature_value, int) or isinstance(feature_value, float):
            if feature_value >= tree.threshold:
                branch = tree.true_branch
        elif feature_value == tree.threshold:
            branch = tree.true_branch

        # Test subtree
        return self.predict_value(x, branch)

    def predict(self, X):
        """ Classify samples one by one and return the set of labels """
        y_pred = [self.predict_value(sample) for sample in X]
        return y_pred
class RegressionTree(Decision_tree):
    def _calculate_variance_reduction(self, y, y1, y2):
        var_tot = calculate_variance(y)
        var_1 = calculate_variance(y1)
        var_2 = calculate_variance(y2)
        frac_1 = len(y1) / len(y)
        frac_2 = len(y2) / len(y)

        # Calculate the variance reduction
        variance_reduction = var_tot - (frac_1 * var_1 + frac_2 * var_2)

        return sum(variance_reduction)

    def _mean_of_y(self, y):
        value = np.mean(y, axis=0)
        return value if len(value) > 1 else value[0]

    def fit(self, X, y):
        self.impurity_calculation = self._calculate_variance_reduction
        self.leaf_calculation = self._mean_of_y
        super(RegressionTree, self).fit(X, y)
class DecisionClassification(Decision_tree):
    def _calculate_information_gain(self, y, y1, y2):
        # Calculate information gain
        p = len(y1) / len(y)
        entropy = calculate_entropy(y)
        info_gain = entropy - p * \
            calculate_entropy(y1) - (1 - p) * \
            calculate_entropy(y2)

        return info_gain
    def _majority_vote(self, y):
        most_common = None
        max_count = 0
        for label in np.unique(y):
            # Count number of occurences of samples with label
            count = len(y[y == label])
            if count > max_count:
                most_common = label
                max_count = count
        return most_common
    def fit(self,X,y):
        self.impurity_calculation=self._calculate_information_gain
        self.leaf_calculation=self._majority_vote
        super(DecisionClassification,self).fit(X,y)
            
        
      
        
                            
                    
                
        
        

In [22]:
def divide_on_feature(X,feature_i,threshold):
    sub=None
    if(isinstance(threshold,int) or isinstance(threshold,float)):
        sub=lambda sample: sample[feature_i]>=threshold
    else:
        sub=lambda sample:sample[feature_i]==threshold
    x1=[sample for sample in X if sub(sample)]
    x2=[sample for sample in X if not sub(sample)]
    return np.array(x1),np.array(x2)
    

In [23]:
def calculate_variance(X):

    """ Return the variance of the features in dataset X """

    mean = np.ones(np.shape(X)) * X.mean(0)

    n_samples = np.shape(X)[0]

    variance = (1 / n_samples) * np.diag((X - mean).T.dot(X - mean))

    

    return variance
                            
    

In [24]:
import math
def calculate_entropy(y):

    """ Calculate the entropy of label array y """

    log2 = lambda x: math.log(x) / math.log(2)

    unique_labels = np.unique(y)

    entropy = 0

    for label in unique_labels:

        count = len(y[y == label])

        p = count / len(y)

        entropy += -p * log2(p)

    return entropy    
        

In [25]:
X,y=load_boston().data,load_boston().target
X_train,X_test,y_train,y_test=train_test_split(X,y,test_size=0.2,random_state=42)

In [26]:
model=RegressionTree()
model.fit(X_train,y_train)

In [27]:
y_pred=model.predict(X_test)

In [28]:
r2_score(y_test,y_pred)

0.8557004172198377

In [29]:
x,Y=load_iris().data,load_iris().target
x_train,x_test,Y_train,Y_test=train_test_split(x,Y)
clf=DecisionClassification()
clf.fit(x_train,Y_train)
pred1=clf.predict(x_train)
pred_test1=clf.predict(x_test)
print(accuracy_score(Y_train,pred1))
print(accuracy_score(Y_test,pred_test1))

0.9910714285714286
0.8947368421052632
