In [None]:
import numpy as np
from collections import defaultdict

### Decision Tree 
Steps: initialise a node; compute the Gini coefficient, entropy and MSE; find the best fit (best threshold and best feature); fit recursively for all nodes; predict a single row, and then apply that prediction to each row.

In [None]:
class Node(object):
    """
    Binary decision-tree node; the root node doubles as the whole tree.

    Three task types are supported by ``fit``:

    * ``'class'``      - classification, split quality measured with Gini or entropy.
    * ``'regression'`` - regression, split quality measured with variance (MSE) reduction.
    * ``'gradient'``   - second-order gradient boosting: splits maximise the regularised
      gain computed from the gradients/hessians of ``loss`` and leaves hold the
      weight ``-G / (H + lambda_)``.

    A node is a leaf when ``value`` is not None; otherwise rows with
    ``x[feature_idx] <= threshold`` are routed to ``left`` and all others to ``right``.
    """

    def __init__(self, threshold=None, feature_idx=None, value=None, left=None, right=None, impurity_decrease = None):
        """
        Create a node. All fields are optional and are normally filled in by ``fit``.
        """
        self.threshold = threshold                  # split value for feature_idx
        self.feature_idx = feature_idx              # column used by the split
        self.value = value                          # prediction stored in a leaf
        self.left = left                            # child for x[feature_idx] <= threshold
        self.right = right                          # child for all other rows
        self.impurity_decrease = impurity_decrease  # score of the chosen split

    def is_leaf_node(self):
        """
        Return True when the node stores a prediction value.
        """
        return self.value is not None

    def gini(self, class_count,n):
        """
        Calculate the Gini impurity from a {label: count} mapping of n samples.
        """
        gini_value = 1
        for count in class_count.values():
            prob = count / n
            gini_value -= prob ** 2
        return gini_value

    def entropy(self, class_count,n):
        """
        Calculate the entropy (in bits) from a {label: count} mapping of n samples.
        """
        entropy_value = 0
        for count in class_count.values():
            prob = count / n
            # labels with a zero count contribute nothing (and log2(0) is undefined)
            if prob>0:
                entropy_value -= prob * np.log2(prob)

        return entropy_value

    def mse(self, sum_y,sum_y2,n ):
        """
        Calculate the variance E[y^2] - E[y]^2 from running sums.

        Works element-wise when the arguments are arrays.
        """
        return (sum_y2/n)-(sum_y/n)**2

    def _class_split_scores(self, y_sorted, valid, impurity):
        """
        Score every classification split position of the sorted labels.

        Entry ``i - 1`` of the result is the weighted impurity decrease obtained by
        putting the first ``i`` sorted samples on the left. Positions where
        ``valid`` is False are left at ``-inf``.
        """
        n = len(y_sorted)
        labels = y_sorted.tolist()

        right_counts = defaultdict(int)
        for label in labels:
            right_counts[label] += 1
        # evaluated before any sample is moved, so this is the parent impurity
        parent_loss = impurity(right_counts, n)

        left_counts = defaultdict(int)
        scores = np.full(n - 1, -np.inf)
        for i in range(1, n):
            # move sample i-1 from the right partition to the left one
            moved = labels[i - 1]
            left_counts[moved] += 1
            right_counts[moved] -= 1
            if not valid[i - 1]:
                continue
            left_n = i
            right_n = n - i
            children_loss = (left_n * impurity(left_counts, left_n)
                             + right_n * impurity(right_counts, right_n)) / n
            scores[i - 1] = parent_loss - children_loss
        return scores

    def _regression_split_scores(self, y_sorted):
        """
        Score every regression split position via prefix sums (variance reduction).
        """
        targets = y_sorted.astype(float)
        n = len(targets)

        sum_total = targets.sum()
        sum_sq_total = np.square(targets).sum()
        parent_loss = self.mse(sum_total, sum_sq_total, n)

        left_n = np.arange(1, n)
        right_n = n - left_n
        sum_left = np.cumsum(targets)[:-1]
        sum_sq_left = np.cumsum(np.square(targets))[:-1]

        left_loss = self.mse(sum_left, sum_sq_left, left_n)
        right_loss = self.mse(sum_total - sum_left, sum_sq_total - sum_sq_left, right_n)
        return parent_loss - (left_n * left_loss + right_n * right_loss) / n

    def _gradient_split_scores(self, y_sorted, y_pred_sorted, loss, lambda_, gamma):
        """
        Score every split position with the second-order boosting gain
        0.5 * (G_L^2/(H_L+lambda) + G_R^2/(H_R+lambda) - G^2/(H+lambda)) - gamma.
        """
        g_sorted = np.asarray(loss.gradient(y_sorted, y_pred_sorted), dtype=float)
        h_sorted = np.asarray(loss.hessian(y_sorted, y_pred_sorted), dtype=float)

        g_total = g_sorted.sum()
        h_total = h_sorted.sum()
        g_left = np.cumsum(g_sorted)[:-1]
        h_left = np.cumsum(h_sorted)[:-1]
        g_right = g_total - g_left
        h_right = h_total - h_left

        # a zero denominator yields inf/nan, which errorloss discards
        with np.errstate(divide='ignore', invalid='ignore'):
            gain = 0.5 * (
                (g_left ** 2) / (h_left + lambda_) +
                (g_right ** 2) / (h_right + lambda_) -
                (g_total ** 2) / (h_total + lambda_)
            ) - gamma
        return gain

    def errorloss(self, X, y, feature_idx, task_type, criterion='gini', y_pred=None, loss=None, lambda_=1.0, gamma=0.0):
        """
        Find the best threshold for a single feature.

        Samples are sorted by the feature and every boundary between two distinct
        adjacent feature values is scored; the threshold is the midpoint of the
        best boundary.

        Parameters:
            X, y        : training rows and targets for this node.
            feature_idx : column of X to evaluate.
            task_type   : 'class', 'regression' or 'gradient'.
            criterion   : 'gini' or 'entropy' (classification only).
            y_pred, loss, lambda_, gamma : gradient mode only - current predictions
                          aligned with y, an object exposing gradient()/hessian(),
                          the L2 regularisation and the per-split penalty.

        Returns:
            (best_threshold, best_fit). best_fit is -inf (and the threshold 0)
            when the feature offers no valid split.

        Raises:
            ValueError for an unknown task type or criterion, or when gradient
            mode is requested without y_pred/loss.
        """
        X = np.asarray(X)
        y = np.asarray(y)

        impurity = None
        if task_type == 'class':
            if criterion == 'gini':
                impurity = self.gini
            elif criterion == 'entropy':
                impurity = self.entropy
            else:
                raise ValueError("Invalid classicfication criteria.")
        elif task_type == 'gradient':
            if y_pred is None or loss is None:
                raise ValueError("task_type='gradient' requires both y_pred and loss.")
        elif task_type != 'regression':
            raise ValueError("Invalid task type.")

        sorted_idx = np.argsort(X[:, feature_idx])
        x_sorted = X[sorted_idx, feature_idx]
        y_sorted = y[sorted_idx]
        n = len(y_sorted)

        best_fit = -np.inf
        best_threshold = 0
        if n < 2:
            return best_threshold, best_fit

        # valid[i-1] is True when a threshold can separate sample i-1 from sample i
        valid = x_sorted[1:] != x_sorted[:-1]

        if task_type == 'class':
            scores = self._class_split_scores(y_sorted, valid, impurity)
        elif task_type == 'regression':
            scores = self._regression_split_scores(y_sorted)
        else:
            y_pred_sorted = np.asarray(y_pred)[sorted_idx]
            scores = self._gradient_split_scores(y_sorted, y_pred_sorted, loss, lambda_, gamma)

        scores = np.where(valid, scores, -np.inf)
        scores = np.where(np.isnan(scores), -np.inf, scores)

        # argmax returns the first maximum, i.e. the lowest threshold among ties
        best_pos = int(np.argmax(scores))
        if scores[best_pos] == -np.inf:
            return best_threshold, best_fit

        best_fit = float(scores[best_pos])
        best_threshold = (x_sorted[best_pos] + x_sorted[best_pos + 1]) / 2
        return best_threshold,best_fit

    def find_best_fit(self,X,y, task_type, criterion, max_features,y_pred=None, loss=None, lambda_=1.0, gamma=0.0):
        """
        Evaluate a random subset of features and return the best split.

        max_features=None selects sqrt(n_features) features for classification and
        n_features // 3 otherwise (at least one). The subset is drawn with
        np.random.choice, so callers control reproducibility through np.random.seed.

        Returns:
            (best_feature_idx, best_threshold, best_fit); best_feature_idx is None
            when no feature offers a valid split.
        """
        best_fit = -np.inf
        best_feature_idx = None
        best_threshold = 0

        X = np.asarray(X)
        n_features = X.shape[1]
        if n_features == 0:
            return best_feature_idx,best_threshold,best_fit

        # resolve the default before clamping (min(None, ...) would raise)
        if max_features is None:
            max_features = int(np.sqrt(n_features)) if task_type == 'class' else n_features // 3
        max_features = max(1, min(int(max_features), n_features))
        feature_indices = np.random.choice(n_features, size=max_features, replace=False)

        for i in feature_indices:
            threshold,score = self.errorloss(X,y,i, task_type, criterion,y_pred, loss, lambda_, gamma)
            if score>best_fit:
                best_fit = score
                best_feature_idx = int(i)
                best_threshold = threshold
        return best_feature_idx,best_threshold,best_fit

    def majority_class(self,y,task_type):
        """
        Leaf value for a stopped node: the most frequent label for 'class'
        (smallest label on ties) or the mean target for 'regression'.

        Raises:
            ValueError for any other task type.
        """
        if task_type == 'class':
            values,counts = np.unique(y, return_counts = True)
            return  values[np.argmax(counts)]
        elif task_type == 'regression':
            return np.mean(y)
        raise ValueError("Invalid task type.")

    def _leaf_value(self, y, task_type, y_pred=None, loss=None, lambda_=1.0):
        """
        Leaf value for any task type; gradient leaves hold -G / (H + lambda_).
        """
        if task_type == 'gradient':
            g_sum = float(np.sum(loss.gradient(y, y_pred)))
            h_sum = float(np.sum(loss.hessian(y, y_pred)))
            denominator = h_sum + lambda_
            return -g_sum / denominator if denominator != 0 else 0.0
        return self.majority_class(y, task_type)

    def fit(self,X,y,depth = 0,max_depth = 5,min_samples_split=2,task_type='class', criterion='gini', max_features = 3, y_pred=None, loss=None, lambda_=1.0, gamma=0.0, verbose=False):
        """
        Recursively grow the tree rooted at this node.

        Parameters:
            X, y              : training rows (2-D) and targets (1-D).
            depth             : depth of this node (0 for the root).
            max_depth         : nodes at this depth become leaves.
            min_samples_split : nodes with fewer samples become leaves.
            task_type         : 'class', 'regression' or 'gradient'.
            criterion         : 'gini' or 'entropy' (classification only).
            max_features      : number of random features tried per split
                                (None picks a default, see find_best_fit).
            y_pred, loss      : gradient mode only - current ensemble predictions
                                aligned with y and the loss providing
                                gradient()/hessian().
            lambda_, gamma    : gradient mode only - L2 regularisation of the leaf
                                weights and minimum gain required to split.
            verbose           : print the chosen split of every internal node.

        Raises:
            ValueError when y is empty, when gradient mode lacks y_pred/loss, or
            for an unknown task type or criterion.
        """
        X = np.asarray(X)
        y = np.asarray(y)
        if len(y) == 0:
            raise ValueError("Cannot fit a tree node on an empty sample.")

        is_gradient = task_type == 'gradient'
        if is_gradient:
            if y_pred is None or loss is None:
                raise ValueError("task_type='gradient' requires both y_pred and loss.")
            y_pred = np.asarray(y_pred)

        # forget any previous fit so a re-fitted node cannot keep a stale leaf value
        self.threshold = None
        self.feature_idx = None
        self.value = None
        self.left = None
        self.right = None
        self.impurity_decrease = None

        ## first stop condition: all y are the same (purity check).
        ## Not used in gradient mode, where the leaf holds a weight, not a target.
        if not is_gradient and bool(np.all(y == y[0])):
            self.value = y[0]
            return

        ## second/third stop conditions: depth limit or too few samples to split
        if depth >= max_depth or len(y) < min_samples_split:
            self.value = self._leaf_value(y, task_type, y_pred, loss, lambda_)
            return

        ## get the best feature and threshold
        best_feature_idx,best_threshold,best_fit = self.find_best_fit(
            X, y, task_type, criterion, max_features, y_pred, loss, lambda_, gamma)

        ## stop when no feature can split, or when a gradient split does not pay
        ## for itself (gain, already reduced by gamma, is not positive)
        can_split = best_feature_idx is not None and not (is_gradient and best_fit <= 0)
        if can_split:
            mask_left = X[:, best_feature_idx] <= best_threshold
            # guards against a midpoint that rounds onto one of its neighbours
            can_split = bool(mask_left.any()) and not bool(mask_left.all())
        if not can_split:
            self.value = self._leaf_value(y, task_type, y_pred, loss, lambda_)
            return

        ## save split info for prediction
        self.threshold = best_threshold
        self.feature_idx = best_feature_idx
        self.impurity_decrease = best_fit

        if verbose:
            print(f"Depth: {depth}, Feature: {best_feature_idx}, Threshold: {best_threshold}")

        ## create the child nodes; everything not routed left goes right, which
        ## matches the routing rule used by predict_single
        mask_right = ~mask_left
        for side, mask in (('left', mask_left), ('right', mask_right)):
            child = Node()
            setattr(self, side, child)
            child.fit(
                X[mask], y[mask], depth + 1, max_depth, min_samples_split,
                task_type, criterion, max_features,
                # predictions must stay aligned with the rows handed to the child
                y_pred=y_pred[mask] if is_gradient else None,
                loss=loss, lambda_=lambda_, gamma=gamma, verbose=verbose)

    def predict_single(self, x):
        """
        Predict one row by walking from this node down to a leaf.

        Returns 0.0 when the walk reaches a node that has neither a value nor a
        usable split (for example an unfitted node).
        """
        node = self
        while True:
            if node.is_leaf_node():
                return node.value
            if node.feature_idx is None:
                return 0.0
            child = node.left if x[node.feature_idx] <= node.threshold else node.right
            if child is None:
                return 0.0
            node = child

    def predict(self, x):
        """
        Predict every row of a 2-D array-like; returns a list of leaf values.
        """
        return [self.predict_single(row) for row in np.asarray(x).tolist()]
    

### Random forest
Bagged trees, built from the decision-tree nodes above. Fit each tree on randomly sampled (bootstrap) indices to create multiple trees, keep storing every fitted tree, and finally give the average of the trees' predictions.

In [None]:
class BaggedTrees(object):
    """
    Bagging ensemble of decision trees (``Node``), each grown on a bootstrap sample.

    Classification predictions are a majority vote over the trees (smallest label
    wins a tie); any other task type returns the mean of the tree predictions.
    """

    def __init__(self, n_estimators = 10, max_depth = 5, min_samples_split = 2, task_type = 'class', criterion = 'gini', max_features = 3, random_state = 3):
        """
        Store the hyper-parameters; no tree is built until ``fit`` is called.
        """
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.task_type = task_type
        self.max_features = max_features
        self.criterion = criterion
        self.random_state = random_state
        self.trees = []

    def fit(self,X,y) :
        """
        Grow ``n_estimators`` trees, each on a bootstrap sample of (X, y).

        Tree ``i`` is seeded with ``random_state + i`` through np.random.seed, so
        repeated fits are reproducible. Returns self so calls can be chained.
        """
        X = np.asarray(X)
        y = np.asarray(y)
        n_samples, n_features = X.shape[0], X.shape[1]

        if self.max_features is None:
            # same default as Node.find_best_fit: sqrt(p) for classification, p // 3 otherwise
            max_features = int(np.sqrt(n_features)) if self.task_type == 'class' else n_features // 3
            max_features = max(1, max_features)
        else:
            max_features = min(self.max_features, n_features)

        # start from scratch so a second fit does not stack trees on the first
        self.trees = []
        for i in range(0, self.n_estimators):
            np.random.seed(self.random_state + i)
            indices = np.random.choice(n_samples, size=n_samples, replace=True)

            tree = Node()
            tree.fit(X[indices], y[indices], depth = 0, max_depth = self.max_depth,
                     min_samples_split = self.min_samples_split, task_type = self.task_type,
                     criterion = self.criterion, max_features = max_features)
            self.trees.append(tree)
        return self

    def predict_single(self,x):
        """
        Predict one row by aggregating the predictions of every tree.

        Raises:
            RuntimeError when the ensemble holds no trees.
        """
        if not self.trees:
            raise RuntimeError("BaggedTrees has no fitted trees; call fit() first.")

        votes = [tree.predict_single(x) for tree in self.trees]

        if self.task_type == 'class':
            # np.unique sorts the labels, so argmax resolves ties to the smallest one
            labels, counts = np.unique(votes, return_counts=True)
            return labels[np.argmax(counts)].item()

        return np.mean(votes)

    def predict(self, x):
        """
        Predict every row of a 2-D array-like; returns a list.
        """
        return [self.predict_single(row) for row in np.asarray(x).tolist()]
                    
                
                    
                    
            

### test out the above classes 

In [None]:
# Toy binary-classification data: the two classes are separable on every column.
X = np.array([
    [2, 3,2,3],
    [1, 5,1,5],
    [8, 7,8,7],
    [9, 6,9,6],
])

y = np.array([0, 0, 1, 1])

# Node.fit draws its candidate features with np.random.choice, so seed the
# global generator to make this cell reproducible on every run.
np.random.seed(0)

tree = Node()
tree.fit(X, y, max_depth=3, task_type='class', criterion='gini')

# Predictions on the training rows; expected to reproduce y.
preds = tree.predict(X)
print(preds)


In [None]:
RandomF = BaggedTrees().fit(X, y)

In [None]:
preds = tree.predict(X)
print(preds)

### Adaboost 
Weighted averages of the trees instead of the plain averages taken above.

In [None]:
class AdaBoostClassifier:
    """
    AdaBoost ensemble of shallow decision trees (``Node``).

    ``fit`` selects one of three modes from ``task_type`` and the labels:

    * ``'class'`` with at most two labels - binary AdaBoost. Labels equal to 0 are
      mapped to -1 and every other label to +1; predictions are returned in
      {-1, +1}.
    * ``'class'`` with more than two labels - ``task_type`` is switched to
      ``'multiclass'`` (SAMME); predictions are the original labels.
    * any other ``task_type`` (e.g. ``'regression'``) - AdaBoost.R2-style weighting
      with a linear loss; predictions are the alpha-weighted mean of the learners.

    The trees do not accept sample weights, so every round trains on a sample of
    the rows drawn with the current weights.
    """

    def __init__(self, n_estimators=50, max_depth=1, min_samples_split = 2, task_type = 'class', criterion = 'gini', max_features = 3, random_state = 3,learning_rate = 1):
        """
        Store the hyper-parameters; no learner is built until ``fit`` is called.
        """
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.task_type = task_type
        self.criterion = criterion
        self.max_features = max_features
        self.random_state = random_state
        self.learning_rate = learning_rate
        self.alphas = []    # list to store alpha_m
        self.models = []    # list to store weak learners (Node instances)

    def fit(self, X , y):
        """
        Train up to ``n_estimators`` weak learners and their weights.

        Round ``i`` is seeded with ``random_state + i`` through np.random.seed.
        Training stops early when a learner fits the training set perfectly, or
        (regression only) when a learner's weighted error reaches 0.5.
        Returns self.
        """
        X = np.asarray(X)
        y = np.asarray(y)

        K = np.unique(y)
        num_class = len(K)

        if self.task_type == 'class' and num_class >2 :
            self.task_type = 'multiclass'

        if self.task_type == 'class':
            y = np.where(y == 0, -1, 1)

        # Node only knows 'class'/'regression'; multiclass is plain classification for it
        learner_task = 'class' if self.task_type == 'multiclass' else self.task_type

        n_samples, n_features = X.shape[0], X.shape[1]
        if self.max_features is None:
            max_features = n_features
        else:
            max_features = min(self.max_features, n_features)

        weights = np.ones(n_samples) / n_samples  # uniform distribution
        epsilon = 1e-10
        perfect_alpha = 1e10  # weight given to a learner with zero training error

        # start from scratch so a second fit does not extend the first ensemble
        self.models = []
        self.alphas = []

        for i in range(0,self.n_estimators):
            np.random.seed(self.random_state + i)
            weights = weights/weights.sum()  # normalize
            indices = np.random.choice(n_samples, size=n_samples, replace=True, p = weights)

            learner = Node()
            learner.fit(X[indices], y[indices], depth = 0, max_depth = self.max_depth,
                        min_samples_split = self.min_samples_split, task_type = learner_task,
                        criterion = self.criterion, max_features = max_features)

            # weights[j] belongs to row j of X, so the error must be measured on X
            # itself, not on the resampled rows
            predictions = np.asarray(learner.predict(X))

            if self.task_type == 'class':
                misclassified = predictions != y
                weighted_error = np.sum(weights * misclassified)
                if weighted_error <= 0:
                    self.models.append(learner)
                    self.alphas.append(perfect_alpha)
                    break  # Stop adding more learners
                alpha_m = 0.5*(np.log((1-weighted_error+epsilon)/(weighted_error+epsilon)))
                alpha_m = self.learning_rate * alpha_m
                # y * predictions is +1 for a hit and -1 for a miss
                weights = weights * np.exp(-alpha_m * y * predictions)

            elif self.task_type == 'multiclass':
                misclassified = predictions != y
                weighted_error = np.sum(weights * misclassified)
                if weighted_error <= 0:
                    self.models.append(learner)
                    self.alphas.append(perfect_alpha)
                    break  # Stop adding more learners
                alpha_m = np.log((1-weighted_error+epsilon)/(weighted_error+epsilon))+np.log(num_class-1)
                alpha_m = self.learning_rate * alpha_m
                # SAMME: misclassified rows gain weight
                weights = weights * np.exp(alpha_m * misclassified)

            else :
                error = np.abs(y - predictions)
                max_error = np.max(error)
                if max_error <= 0:
                    # perfect fit; also avoids dividing by a zero maximum error
                    self.models.append(learner)
                    self.alphas.append(perfect_alpha)
                    break  # Stop adding more learners
                normalised_error = np.clip(error/max_error, 1e-10, 1 - 1e-10)
                weighted_error = np.sum(weights * normalised_error)
                if weighted_error >= 0.5:
                    # beta >= 1 would give a non-positive alpha; stop here, keeping
                    # this learner only if it would otherwise leave the ensemble empty
                    if not self.models:
                        self.models.append(learner)
                        self.alphas.append(1.0)
                    break
                beta = weighted_error/(1-weighted_error)
                alpha_m = np.log(1/beta)
                alpha_m = self.learning_rate * alpha_m
                weights = weights * beta**(1-normalised_error)

            self.models.append(learner)
            self.alphas.append(alpha_m)

        return self

    def predict_single(self,x):
        """
        Predict one row by combining the learners with their alpha weights.

        Returns +1/-1 in binary mode, the label with the largest summed alpha in
        multiclass mode, and the alpha-weighted mean otherwise.

        Raises:
            RuntimeError when the ensemble holds no learners.
        """
        if not self.models:
            raise RuntimeError("AdaBoostClassifier has no fitted learners; call fit() first.")

        weighted_prediction = 0
        function_k = defaultdict(int)
        sum_alphas = 0
        for alpha, model in zip(self.alphas,self.models):
            prediction = model.predict_single(x)
            if self.task_type == 'class':
                weighted_prediction += (alpha*prediction)
            elif self.task_type == 'multiclass':
                function_k[prediction] += alpha
            else:
                weighted_prediction += (alpha*prediction)
                sum_alphas += alpha

        if self.task_type == 'class':
            final_prediction = 1 if weighted_prediction>0 else -1
        elif self.task_type == 'multiclass':
            final_prediction = max(function_k, key=function_k.get)
        else:
            final_prediction = weighted_prediction/sum_alphas

        return final_prediction

    def predict(self, x):
        """
        Predict every row of a 2-D array-like; returns a list.
        """
        return [self.predict_single(row) for row in np.asarray(x).tolist()]

    def feature_importances_(self):
        """
        Sum ``alpha * impurity_decrease`` of each learner's root split per feature.

        Only the root split of every learner is counted. Returns a
        defaultdict(float) keyed by feature index.
        """
        feature_importance = defaultdict(float)
        for model, alpha in zip(self.models, self.alphas):
            if model.feature_idx is not None and model.impurity_decrease is not None:
                feature_importance[model.feature_idx] += alpha * model.impurity_decrease
        return feature_importance


### Gradient boosting
Work on the trees one after another: each new tree predicts the errors of the current ensemble and minimises them.

In [None]:
class Loss:
    """
    Interface for the loss objects used by the gradient-boosting code.

    Every method raises NotImplementedError here; subclasses supply the
    actual computations.
    """

    def __call__(self, y_true, y_pred):
        """Value of the loss for the given targets and predictions."""
        raise NotImplementedError

    def gradient(self, y_true, y_pred):
        """First derivative of the loss with respect to the predictions."""
        raise NotImplementedError

    def hessian(self, y_true, y_pred):
        """Second derivative of the loss with respect to the predictions."""
        raise NotImplementedError
class MSELoss(Loss):
    """
    Halved mean squared error, 0.5 * mean((y_true - y_pred)^2), with its
    first and second derivatives with respect to the predictions.
    """

    def __call__(self, y_true, y_pred):
        """Return half of the mean squared residual."""
        residual = y_true - y_pred
        mean_squared = np.mean(residual ** 2)
        return 0.5 * mean_squared

    def gradient(self, y_true, y_pred):
        """Return the negated residual, one entry per sample."""
        residual = y_true - y_pred
        return -residual

    def hessian(self, y_true, y_pred):
        """Return ones with the shape and dtype of y_true."""
        unit_curvature = np.ones_like(y_true)
        return unit_curvature

class GradientBoostingRegressor():
    """
    Gradient-boosted regression trees built from ``Node`` in 'gradient' mode.

    The ensemble starts from the mean of the targets; every tree is fitted to the
    gradients/hessians of ``loss`` at the current predictions and its output is
    added with a factor of ``learning_rate``.
    """

    def __init__(self, n_estimators=50, max_depth=1, min_samples_split = 2, task_type = 'gradient', criterion = 'gini', max_features = 3, random_state = 3,learning_rate = 1, loss = None,init_pred = None):
        """
        Store the hyper-parameters.

        ``loss`` defaults to MSELoss when left as None. ``init_pred`` is replaced
        by the mean of the targets whenever ``fit`` runs.
        """
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.task_type = task_type
        self.criterion = criterion
        self.max_features = max_features
        self.random_state = random_state
        self.learning_rate = learning_rate
        self.trees = []
        self.loss = loss
        self.init_pred = init_pred

    def fit(self,X,y, lambda_=1.0, gamma=0.0):
        """
        Fit ``n_estimators`` trees sequentially.

        Parameters:
            X, y    : training rows (2-D) and numeric targets (1-D).
            lambda_ : L2 regularisation forwarded to every tree.
            gamma   : per-split penalty forwarded to every tree.

        Tree ``i`` is seeded with ``random_state + i`` through np.random.seed,
        because the trees draw their candidate features at random. Returns self.
        """
        X = np.asarray(X)
        y = np.asarray(y, dtype=float)

        self.init_pred = float(np.mean(y))
        # always a float buffer: full_like(y, ...) would truncate for integer targets
        y_pred = np.full(y.shape, self.init_pred, dtype=float)

        # keep a loss supplied to the constructor; fall back to squared error
        if self.loss is None:
            self.loss = MSELoss()

        if self.max_features is None:
            max_features = X.shape[1]
        else:
            max_features = min(self.max_features, X.shape[1])

        # start from scratch so a second fit does not stack trees on the first
        self.trees = []
        for i in range(0,self.n_estimators ):
            np.random.seed(self.random_state + i)

            tree = Node()
            tree.fit(X, y, depth = 0, max_depth = self.max_depth,
                     min_samples_split = self.min_samples_split, task_type = self.task_type,
                     criterion = self.criterion, max_features = max_features,
                     y_pred = y_pred, loss = self.loss, lambda_ = lambda_, gamma = gamma)
            self.trees.append(tree)

            y_pred += self.learning_rate * np.asarray(tree.predict(X), dtype=float)

        return self

    def predict(self, X):
        """
        Return the initial prediction plus the scaled output of every tree.

        Raises:
            RuntimeError when no initial prediction is available (fit not called
            and no ``init_pred`` given).
        """
        if self.init_pred is None:
            raise RuntimeError("GradientBoostingRegressor is not fitted; call fit() first.")

        X = np.asarray(X)
        # Step 1: initialise with an array of init_pred
        y_pred = np.full(X.shape[0], self.init_pred, dtype=float)

        # Step 2: add tree predictions scaled by the learning rate
        for tree in self.trees:
            y_pred += self.learning_rate * np.asarray(tree.predict(X), dtype=float)

        # Step 3: return the final prediction
        return y_pred
     

In [None]:
from sklearn.datasets import make_regression

# Synthetic regression problem: 100 rows, 5 features, Gaussian noise.
X, y = make_regression(
    n_samples=100,
    n_features=5,
    noise=10.0,
    random_state=42,
)

# Ten depth-3 trees, each contributing a tenth of its output.
model = GradientBoostingRegressor(
    n_estimators=10,
    max_depth=3,
    learning_rate=0.1,
)
model.fit(X, y)

# In-sample predictions, scored in the next cell.
preds = model.predict(X)


In [None]:
from sklearn.metrics import mean_squared_error, r2_score

# In-sample error of the boosted model on the synthetic data.
mse = mean_squared_error(y, preds)
r2 = r2_score(y, preds)

# Report the root of the MSE and the coefficient of determination.
print(
    f"RMSE: {mse**0.5:.4f}",
    f"R² Score: {r2:.4f}",
    sep="\n",
)