<a href="https://colab.research.google.com/github/saiprathyushadike/ML_DL_MODELS_IMPLIMENTATION_BY_ME/blob/main/xgb.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [11]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

class Node:
    """Represents a node in the decision tree."""
    def __init__(self, feature_index=None, threshold=None, left=None, right=None, value=None, sum_grad=None, sum_hess=None, missing_direction=None):
        self.feature_index = feature_index  # Index of the feature to split on
        self.threshold = threshold          # Threshold value for the split
        self.left = left                    # Left child node
        self.right = right                  # Right child node
        self.value = value                  # Predicted value if this is a leaf node
        self.sum_grad = sum_grad            # Sum of gradients for data points in this node
        self.sum_hess = sum_hess            # Sum of Hessians for data points in this node
        self.missing_direction = missing_direction # Direction for missing values ('left' or 'right')

class DecisionTreeRegressor:
    """Basic implementation of a Decision Tree Regressor with regularization and missing value handling."""
    def __init__(self, max_depth=None, min_samples_split=2, reg_lambda=0, reg_gamma=0):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.reg_lambda = reg_lambda # L2 regularization term
        self.reg_gamma = reg_gamma   # Regularization term on the number of leaves
        self.root = None

    def _calculate_gain(self, G_L, H_L, G_R, H_R):
        """Calculates the gain for a split with regularization."""
        # Avoid division by zero by adding a small epsilon or checking H_L + H_R + self.reg_lambda
        gain = 0.5 * (G_L**2 / (H_L + self.reg_lambda) + G_R**2 / (H_R + self.reg_lambda) - (G_L + G_R)**2 / (H_L + H_R + self.reg_lambda + 1e-6)) - self.reg_gamma
        return gain

    def _find_best_split(self, X, gradients, hessians):
        """Finds the best feature and threshold to split the data considering gain with regularization and missing values."""
        best_gain = -float('inf')
        best_feature_index = None
        best_threshold = None
        best_missing_direction = None

        n_samples, n_features = X.shape
        G = np.sum(gradients)
        H = np.sum(hessians)

        for feature_index in range(n_features):
            # Handle missing values: separate non-missing and missing indices
            non_missing_indices = ~np.isnan(X[:, feature_index])
            missing_indices = np.isnan(X[:, feature_index])

            X_non_missing = X[non_missing_indices, feature_index]
            gradients_non_missing = gradients[non_missing_indices]
            hessians_non_missing = hessians[non_missing_indices]

            G_missing = np.sum(gradients[missing_indices])
            H_missing = np.sum(hessians[missing_indices])

            if len(X_non_missing) < self.min_samples_split:
                continue # Not enough non-missing samples to split

            # Get unique values for thresholds, potentially sort them
            thresholds = np.unique(X_non_missing)
            # Consider sorting thresholds for potentially better performance/consistency
            # thresholds = np.sort(thresholds)

            for threshold in thresholds:
                # Consider splitting non-missing values
                left_indices_non_missing = X_non_missing <= threshold
                right_indices_non_missing = X_non_missing > threshold

                G_L_non_missing = np.sum(gradients_non_missing[left_indices_non_missing])
                H_L_non_missing = np.sum(hessians_non_missing[left_indices_non_missing])
                G_R_non_missing = np.sum(gradients_non_missing[right_indices_non_missing])
                H_R_non_missing = np.sum(hessians_non_missing[right_indices_non_missing])

                # Explore sending missing values to the left child
                G_L_missing_left = G_L_non_missing + G_missing
                H_L_missing_left = H_L_non_missing + H_missing
                G_R_missing_left = G_R_non_missing
                H_R_missing_left = H_R_non_missing
                gain_missing_left = self._calculate_gain(G_L_missing_left, H_L_missing_left, G_R_missing_left, H_R_missing_left)

                # Explore sending missing values to the right child
                G_L_missing_right = G_L_non_missing
                H_L_missing_right = H_L_non_missing
                G_R_missing_right = G_R_non_missing + G_missing
                H_R_missing_right = H_R_non_missing + H_missing
                gain_missing_right = self._calculate_gain(G_L_missing_right, H_L_missing_right, G_R_missing_right, H_R_missing_right)


                # Compare gains and update best split
                if gain_missing_left > best_gain:
                    best_gain = gain_missing_left
                    best_feature_index = feature_index
                    best_threshold = threshold
                    best_missing_direction = 'left'

                if gain_missing_right > best_gain:
                    best_gain = gain_missing_right
                    best_feature_index = feature_index
                    best_threshold = threshold
                    best_missing_direction = 'right'

        # Only split if the best gain is positive (worth splitting)
        if best_gain <= 0:
             return None, None, None

        return best_feature_index, best_threshold, best_missing_direction

    def _build_tree(self, X, gradients, hessians, depth):
        """Recursively builds the decision tree with regularization and missing value handling."""
        n_samples = len(gradients)
        sum_grad = np.sum(gradients)
        sum_hess = np.sum(hessians)

        # Stopping criteria
        if (self.max_depth is not None and depth >= self.max_depth) or \
           (n_samples < self.min_samples_split) or \
           (sum_hess + self.reg_lambda == 0): # Avoid division by zero when calculating leaf value

            # Calculate optimal leaf value with regularization
            leaf_value = -sum_grad / (sum_hess + self.reg_lambda) if sum_hess + self.reg_lambda != 0 else 0
            return Node(value=leaf_value, sum_grad=sum_grad, sum_hess=sum_hess)

        best_feature_index, best_threshold, missing_direction = self._find_best_split(X, gradients, hessians)

        # If no good split is found or gain is not positive, create a leaf node
        if best_feature_index is None:
            leaf_value = -sum_grad / (sum_hess + self.reg_lambda) if sum_hess + self.reg_lambda != 0 else 0
            return Node(value=leaf_value, sum_grad=sum_grad, sum_hess=sum_hess)

        # Split the data based on the best split and missing direction
        feature_values = X[:, best_feature_index]
        left_indices = (feature_values <= best_threshold) & (~np.isnan(feature_values))
        right_indices = (feature_values > best_threshold) & (~np.isnan(feature_values))

        if missing_direction == 'left':
            left_indices = left_indices | np.isnan(feature_values)
        else: # missing_direction == 'right'
            right_indices = right_indices | np.isnan(feature_values)

        X_left, gradients_left, hessians_left = X[left_indices], gradients[left_indices], hessians[left_indices]
        X_right, gradients_right, hessians_right = X[right_indices], gradients[right_indices], hessians[right_indices]

        # Ensure that after splitting, at least one sample goes to each child
        if len(X_left) == 0 or len(X_right) == 0:
             leaf_value = -sum_grad / (sum_hess + self.reg_lambda) if sum_hess + self.reg_lambda != 0 else 0
             return Node(value=leaf_value, sum_grad=sum_grad, sum_hess=sum_hess)


        # Recursively build subtrees
        left_subtree = self._build_tree(X_left, gradients_left, hessians_left, depth + 1)
        right_subtree = self._build_tree(X_right, gradients_right, hessians_right, depth + 1)

        return Node(feature_index=best_feature_index, threshold=best_threshold,
                    left=left_subtree, right=right_subtree, sum_grad=sum_grad,
                    sum_hess=sum_hess, missing_direction=missing_direction)


    def fit(self, X, gradients, hessians):
        """Builds the decision tree using gradients and hessians."""
        self.root = self._build_tree(X, gradients, hessians, depth=0)

    def _predict_single(self, x, node):
        """Predicts the value for a single data point by traversing the tree, handling missing values."""
        if node is None: # Should not happen in a properly built tree, but as a safeguard
            return 0 # Or some default value

        if node.value is not None:
            return node.value

        feature_value = x[node.feature_index]

        if np.isnan(feature_value):
            if node.missing_direction == 'left':
                return self._predict_single(x, node.left)
            else: # missing_direction == 'right'
                return self._predict_single(x, node.right)
        elif feature_value <= node.threshold:
            return self._predict_single(x, node.left)
        else:
            return self._predict_single(x, node.right)

    def predict(self, X):
        """Predicts values for a set of data points."""
        return np.array([self._predict_single(x, self.root) for x in X])


class GradientBoostingRegressor:
    """Implements the Gradient Boosting algorithm from scratch with regularization, missing value handling, and early stopping."""

    def __init__(self, n_estimators=100, learning_rate=0.1, loss='mse', max_depth=3, reg_lambda=0, reg_gamma=0, subsample=1.0, colsample_bytree=1.0, validation_fraction=0.1, n_iter_no_change=None, tol=1e-4):
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.loss = loss
        self.max_depth = max_depth
        self.reg_lambda = reg_lambda # L2 regularization term
        self.reg_gamma = reg_gamma   # Regularization term on the number of leaves
        self.subsample = subsample
        self.colsample_bytree = colsample_bytree
        self.validation_fraction = validation_fraction
        self.n_iter_no_change = n_iter_no_change # For early stopping
        self.tol = tol # Tolerance for early stopping
        self.estimators = []
        self.initial_prediction = None
        self.evals_result = {} # To store evaluation results for early stopping

        if self.loss == 'mse':
            self._gradient = self._mse_gradient
            self._hessian = self._mse_hessian
            self._eval_metric = lambda y_true, y_pred: np.sqrt(mean_squared_error(y_true, y_pred)) # RMSE for evaluation
        else:
            raise ValueError(f"Loss function '{self.loss}' not supported.")

    def _mse_gradient(self, y_true, y_pred):
        """Calculates the gradient (first derivative) for MSE loss."""
        return -(y_true - y_pred)

    def _mse_hessian(self, y_true, y_pred):
        """Calculates the Hessian (second derivative) for MSE loss."""
        return np.ones_like(y_true)

    def fit(self, X, y):
        """Fits the gradient boosting model to the training data with early stopping."""
        n_samples = len(y)

        # Split data for early stopping if validation_fraction is set
        if self.n_iter_no_change is not None and self.validation_fraction > 0:
            X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=self.validation_fraction, random_state=42)
        else:
            X_train, y_train = X, y
            X_val, y_val = None, None

        # 1. Initialize the predictions (e.g., with the mean of y for regression)
        self.initial_prediction = np.mean(y_train)
        y_pred_train = np.full_like(y_train, self.initial_prediction, dtype=float)
        y_pred_val = None
        if X_val is not None:
             y_pred_val = np.full_like(y_val, self.initial_prediction, dtype=float)

        best_val_score = float('inf')
        no_change_count = 0

        # 2. Iteratively train base learners
        for i in range(self.n_estimators):
            # Calculate the negative gradient (pseudo-residuals) and hessians for the training data
            gradients = self._gradient(y_train, y_pred_train)
            hessians = self._hessian(y_train, y_pred_train)

            # Apply subsampling and column subsampling
            sample_indices = np.random.choice(len(X_train), max(1, int(len(X_train) * self.subsample)), replace=False)
            feature_indices = np.random.choice(X_train.shape[1], max(1, int(X_train.shape[1] * self.colsample_bytree)), replace=False)

            X_train_subset = X_train[sample_indices][:, feature_indices]
            gradients_subset = gradients[sample_indices]
            hessians_subset = hessians[sample_indices]

            # Train a base learner (Decision Tree Regressor) using gradients and hessians on the subset
            tree = DecisionTreeRegressor(max_depth=self.max_depth,
                                         min_samples_split=self.min_samples_split, # Pass min_samples_split
                                         reg_lambda=self.reg_lambda,
                                         reg_gamma=self.reg_gamma)
            tree.fit(X_train_subset, gradients_subset, hessians_subset)

            # Make predictions with the new tree on the *full* training data
            # Need to handle the selected features for prediction
            tree_pred_train = tree.predict(X_train[:, feature_indices])

            # Update the overall predictions on training data
            y_pred_train += self.learning_rate * tree_pred_train

            # Store the trained tree and the features used
            self.estimators.append((tree, feature_indices))

            # Evaluate on validation data and check for early stopping
            if X_val is not None:
                # Need to handle the selected features for validation prediction
                tree_pred_val = tree.predict(X_val[:, feature_indices])
                y_pred_val += self.learning_rate * tree_pred_val
                val_score = self._eval_metric(y_val, y_pred_val)

                if 'validation_0' not in self.evals_result:
                    self.evals_result['validation_0'] = {self._eval_metric.__name__: []}
                self.evals_result['validation_0'][self._eval_metric.__name__].append(val_score)


                if val_score < best_val_score - self.tol: # Use tolerance for improvement
                    best_val_score = val_score
                    no_change_count = 0
                else:
                    no_change_count += 1

                if self.n_iter_no_change is not None and no_change_count >= self.n_iter_no_change:
                    print(f"Early stopping at iteration {i+1}. Best validation score: {best_val_score:.4f}")
                    self.estimators = self.estimators[:i+1] # Keep only the best set of estimators
                    break

    def predict(self, X):
        """Makes predictions on new data."""
        if self.initial_prediction is None:
            raise RuntimeError("Model has not been fitted yet.")

        # Start with the initial prediction
        y_pred = np.full(len(X), self.initial_prediction, dtype=float)

        # Add predictions from each subsequent tree, using the features they were trained on
        for tree, feature_indices in self.estimators:
            y_pred += self.learning_rate * tree.predict(X[:, feature_indices])

        return y_pred

# Note on Objective Function: The `calculate_objective` function defined earlier
# is useful for understanding the theory but is not directly used in the `fit` method
# of the `GradientBoostingRegressor` class as implemented here. The `fit` method
# minimizes the objective implicitly by training trees on gradients and using the
# gain calculation during splitting, which is derived from the objective function.