In [1]:
import numpy as np
from abc import ABC, abstractmethod
from typing import List, Tuple, Dict, Optional, Union
from dataclasses import dataclass
from collections import defaultdict
import heapq

In [2]:
@dataclass
class SplitInfo:
    """Information about a split decision.

    Plain record describing one candidate split of a tree node: which
    feature to split on, where, and how much gain the split is worth.
    """
    # Index of the feature (column) the split is made on.
    feature: int
    # Split point for ``feature``.
    # NOTE(review): SplitFinder in this file stores the winning histogram
    # bin index here, not a raw feature value -- confirm what consumers expect.
    threshold: float
    # Gain reported for this split by the split finder.
    gain: float
    # Sample indices intended for the left / right side of the split.
    # NOTE(review): SplitFinder in this file fills both with empty arrays.
    left_indices: np.ndarray
    right_indices: np.ndarray

In [3]:
class HistogramBuilder:
    """Single Responsibility: Build histograms for gradient boosting.

    Usage is two-step: call ``build_bin_mappers`` once on the feature matrix
    to derive per-feature bin thresholds, then call ``create_histograms`` for
    each set of rows to get per-bin gradient / hessian sums.

    Attributes:
        max_bins: Upper bound on the number of bins per feature.
        bin_mappers: Maps feature index -> sorted 1-D array of bin thresholds.
            A feature with ``k`` thresholds has ``k + 1`` bins.
    """

    def __init__(self, max_bins: int = 255):
        self.max_bins = max_bins
        self.bin_mappers = {}

    def build_bin_mappers(self, X: np.ndarray) -> None:
        """Create bin mappers (threshold arrays) for each feature of ``X``.

        Any mappers from a previous call are discarded, so refitting on a
        matrix with fewer columns does not leave stale features behind.

        Args:
            X: 2-D feature matrix, indexed as ``X[row, feature]``.
        """
        bin_mappers = {}

        for feature_idx in range(X.shape[1]):
            unique_values = np.unique(X[:, feature_idx])

            if len(unique_values) <= self.max_bins:
                # Few distinct values: one bin per value, with thresholds at
                # the midpoints between consecutive distinct values.
                thresholds = unique_values[:-1] + np.diff(unique_values) / 2
            else:
                # Too many distinct values: use interior quantiles of the
                # distinct values, de-duplicated.
                quantiles = np.linspace(0, 1, self.max_bins + 1)[1:-1]
                thresholds = np.unique(np.quantile(unique_values, quantiles))

            bin_mappers[feature_idx] = thresholds

        self.bin_mappers = bin_mappers

    def create_histograms(self, X: np.ndarray, gradients: np.ndarray,
                          hessians: np.ndarray, indices: np.ndarray) -> Dict[int, np.ndarray]:
        """Create histograms of gradient and hessian sums for selected rows.

        Args:
            X: 2-D feature matrix, indexed as ``X[row, feature]``.
            gradients: Per-row gradients, indexed by row.
            hessians: Per-row hessians, indexed by row.
            indices: Row indices to include in the histograms.

        Returns:
            Dict mapping feature index to an array of shape ``(n_bins, 2)``
            where column 0 is the gradient sum and column 1 the hessian sum
            of the selected rows falling into each bin.
        """
        histograms = {}

        # Same for every feature, so gather once outside the loop.
        selected_gradients = gradients[indices]
        selected_hessians = hessians[indices]

        for feature_idx, thresholds in self.bin_mappers.items():
            n_bins = len(thresholds) + 1

            # Bin i holds values v with thresholds[i-1] <= v < thresholds[i].
            bin_indices = np.digitize(X[indices, feature_idx], thresholds)

            hist = np.empty((n_bins, 2))  # [gradient_sum, hessian_sum]
            hist[:, 0] = np.bincount(bin_indices, weights=selected_gradients,
                                     minlength=n_bins)
            hist[:, 1] = np.bincount(bin_indices, weights=selected_hessians,
                                     minlength=n_bins)

            histograms[feature_idx] = hist

        return histograms

In [4]:
class GOSSampler:
    """Gradient-based One-Side Sampling implementation.

    Keeps the ``top_rate`` fraction of samples with the largest absolute
    gradients, draws a random ``other_rate`` fraction (of the full data set)
    from the rest, and up-weights the randomly drawn samples by
    ``(1 - top_rate) / other_rate`` so that gradient / hessian sums over the
    sample approximate sums over the full data.

    Attributes:
        top_rate: Fraction of all samples kept for having large gradients.
        other_rate: Fraction of all samples drawn at random from the rest.
    """

    def __init__(self, top_rate: float = 0.2, other_rate: float = 0.1):
        self.top_rate = top_rate
        self.other_rate = other_rate

    def sample(self, gradients: np.ndarray,
               hessians: np.ndarray) -> Tuple[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
        """Apply the GOSS sampling strategy.

        Args:
            gradients: Per-sample gradients.
            hessians: Per-sample hessians, aligned with ``gradients``.

        Returns:
            ``(selected_indices, (sampled_gradients, sampled_hessians))``.
            ``selected_indices`` lists the large-gradient samples first,
            followed by the randomly drawn ones. The two sampled arrays are
            copies aligned with ``selected_indices``; entries for the randomly
            drawn samples are multiplied by the amplification factor.
        """
        n_samples = len(gradients)
        abs_gradients = np.abs(gradients)

        # Get top samples with large gradients. n_top == 0 needs its own
        # branch because a[-0:] is the whole array, not an empty slice.
        n_top = int(n_samples * self.top_rate)
        if n_top > 0:
            top_indices = np.argpartition(abs_gradients, -n_top)[-n_top:]
        else:
            top_indices = np.array([], dtype=np.intp)

        # Get random sample from remaining. The count is a fraction of the
        # full data set: that is what the (1 - top_rate) / other_rate weight
        # below assumes, since n_other * weight == number of remaining samples.
        remaining_indices = np.setdiff1d(np.arange(n_samples), top_indices)
        n_other = min(int(n_samples * self.other_rate), len(remaining_indices))
        if n_other > 0:
            other_indices = np.random.choice(remaining_indices, n_other, replace=False)
        else:
            other_indices = np.array([], dtype=np.intp)

        selected_indices = np.concatenate([top_indices, other_indices])

        # Fancy indexing already yields fresh arrays, so the in-place scaling
        # below never touches the caller's data.
        sampled_gradients = gradients[selected_indices]
        sampled_hessians = hessians[selected_indices]

        # Amplify gradients and hessians for the randomly drawn samples,
        # which occupy positions n_top.. of the concatenated selection.
        # Skipped when nothing was drawn (also avoids dividing by a zero rate).
        if n_other > 0:
            amplification_factor = (1 - self.top_rate) / self.other_rate
            sampled_gradients[n_top:] *= amplification_factor
            sampled_hessians[n_top:] *= amplification_factor

        return selected_indices, (sampled_gradients, sampled_hessians)

In [5]:
class FeatureBundler:
    """Exclusive Feature Bundling implementation.

    Groups features that are rarely non-zero on the same row, so each group
    can be treated as one bundle.

    Attributes:
        max_conflict_rate: Largest pairwise conflict rate allowed between any
            two features in the same bundle.
        bundles: List of bundles, each a list of feature indices.
        bundle_mapping: Maps feature index -> index of its bundle in
            ``bundles``.
    """

    def __init__(self, max_conflict_rate: float = 0.0):
        self.max_conflict_rate = max_conflict_rate
        self.bundles = []
        self.bundle_mapping = {}

    def create_bundles(self, X: np.ndarray) -> None:
        """Create feature bundles based on sparsity conflicts.

        Results replace whatever a previous call stored in ``bundles`` and
        ``bundle_mapping``, so calling this repeatedly does not accumulate
        duplicates.

        Args:
            X: 2-D feature matrix, indexed as ``X[row, feature]``.
        """
        n_features = X.shape[1]
        conflict_matrix = self._build_conflict_matrix(X)

        # Greedy bundling: each not-yet-used feature seeds a bundle and
        # absorbs every later feature compatible with all current members.
        bundles = []
        used_features = set()

        for feature in range(n_features):
            if feature in used_features:
                continue

            bundle = [feature]
            used_features.add(feature)

            for other_feature in range(feature + 1, n_features):
                if other_feature in used_features:
                    continue

                # The candidate must be within tolerance of every member.
                member_conflicts = conflict_matrix[bundle, other_feature]
                if np.all(member_conflicts <= self.max_conflict_rate):
                    bundle.append(other_feature)
                    used_features.add(other_feature)

            bundles.append(bundle)

        bundle_mapping = {}
        for bundle_idx, bundle in enumerate(bundles):
            for feature in bundle:
                bundle_mapping[feature] = bundle_idx

        self.bundles = bundles
        self.bundle_mapping = bundle_mapping

    def _build_conflict_matrix(self, X: np.ndarray) -> np.ndarray:
        """Build the symmetric pairwise conflict matrix for features.

        The conflict rate of features ``i`` and ``j`` is the number of rows
        where both are non-zero divided by the number of rows where at least
        one is non-zero (0 when neither is ever non-zero). The diagonal is 0.

        Args:
            X: 2-D feature matrix, indexed as ``X[row, feature]``.

        Returns:
            Array of shape ``(n_features, n_features)`` of conflict rates.
        """
        nonzero = (X != 0).astype(np.int64)

        # both[i, j]: rows where features i and j are both non-zero.
        both = nonzero.T @ nonzero
        per_feature = nonzero.sum(axis=0)
        # either[i, j]: rows where at least one is non-zero (inclusion-exclusion).
        either = per_feature[:, None] + per_feature[None, :] - both

        conflict_matrix = np.zeros(both.shape)
        np.divide(both, either, out=conflict_matrix, where=either > 0)
        # A feature is not considered to conflict with itself.
        np.fill_diagonal(conflict_matrix, 0.0)

        return conflict_matrix

In [6]:
class Node:
    """One node of a decision tree.

    Every node starts life as a leaf holding the sample indices routed to it.
    The split fields and child links stay ``None`` until something outside
    this class turns the node into an internal node.
    """

    def __init__(self, indices: np.ndarray, depth: int = 0):
        # Samples that reach this node, and how far it sits from the root.
        self.indices: np.ndarray = indices
        self.depth: int = depth

        # A fresh node has no split and no children.
        self.is_leaf: bool = True
        self.split_feature: Optional[int] = None
        self.split_threshold: Optional[float] = None
        self.left_child: Optional["Node"] = None
        self.right_child: Optional["Node"] = None

        # Output value and split gain, both zero until set by the builder.
        self.leaf_value: float = 0.0
        self.gain: float = 0.0

In [8]:
class SplitFinder:
    """Single Responsibility: Find best splits using histogram method.

    Attributes:
        reg_lambda: L2 term added to every hessian sum in the gain formula.
        reg_gamma: Fixed penalty subtracted from the gain of every split.
        min_child_samples: Minimum required on each side of a split. It is
            compared against the side's hessian sum, which equals the sample
            count only when every hessian is 1.
    """

    def __init__(self, reg_lambda: float = 0.1, reg_gamma: float = 0.0, min_child_samples: int = 20):
        self.reg_lambda = reg_lambda
        self.reg_gamma = reg_gamma
        self.min_child_samples = min_child_samples

    def find_best_split(self, histograms: Dict[int, np.ndarray],
                        total_gradient: float, total_hessian: float) -> Optional[SplitInfo]:
        """Find the best split across all features using the histogram method.

        Args:
            histograms: Maps feature index -> array of shape ``(n_bins, 2)``
                with per-bin ``[gradient_sum, hessian_sum]``.
            total_gradient: Gradient sum over all samples in the node.
            total_hessian: Hessian sum over all samples in the node.

        Returns:
            The candidate with the highest positive gain, or ``None`` when no
            feature offers an admissible split. Ties keep the feature seen
            first in ``histograms``.
        """
        best_gain = 0.0
        best_split = None

        for feature_idx, histogram in histograms.items():
            candidate = self._find_best_split_for_feature(
                feature_idx, histogram, total_gradient, total_hessian
            )

            if candidate is not None and candidate.gain > best_gain:
                best_gain = candidate.gain
                best_split = candidate

        return best_split

    def _find_best_split_for_feature(self, feature_idx: int, histogram: np.ndarray,
                                     total_gradient: float, total_hessian: float) -> Optional[SplitInfo]:
        """Find the best split for a single feature.

        Scans bin boundaries left to right; boundary ``k`` puts bins
        ``0..k`` on the left and the remaining bins on the right.

        Args:
            feature_idx: Index of the feature the histogram belongs to.
            histogram: Array of shape ``(n_bins, 2)`` with per-bin
                ``[gradient_sum, hessian_sum]``.
            total_gradient: Gradient sum over all samples in the node.
            total_hessian: Hessian sum over all samples in the node.

        Returns:
            A ``SplitInfo`` whose ``threshold`` is the winning bin index
            (simplified: not a raw feature value) and whose index arrays are
            empty, or ``None`` when no boundary has positive gain while
            satisfying ``min_child_samples`` on both sides.
        """
        n_bins = histogram.shape[0]
        best_gain = 0.0
        best_threshold_idx = -1

        left_gradient = 0.0
        left_hessian = 0.0

        for threshold_idx in range(n_bins - 1):
            left_gradient += histogram[threshold_idx, 0]
            left_hessian += histogram[threshold_idx, 1]

            right_gradient = total_gradient - left_gradient
            right_hessian = total_hessian - left_hessian

            # Check minimum samples constraint (hessian sums stand in for
            # sample counts; the histogram carries no explicit counts).
            if left_hessian < self.min_child_samples or right_hessian < self.min_child_samples:
                continue

            gain = self._calculate_gain(left_gradient, left_hessian,
                                        right_gradient, right_hessian,
                                        total_gradient, total_hessian)

            if gain > best_gain:
                best_gain = gain
                best_threshold_idx = threshold_idx

        # _calculate_gain has already subtracted reg_gamma and the scan only
        # accepts gains above zero, so the penalty must not be applied again
        # here: comparing against reg_gamma a second time would wrongly
        # reject splits whose penalised gain lies in (0, reg_gamma].
        if best_threshold_idx == -1:
            return None

        # Create split info (simplified - would need actual threshold value and indices)
        return SplitInfo(
            feature=feature_idx,
            threshold=best_threshold_idx,  # Simplified: bin index, not a feature value
            gain=best_gain,
            left_indices=np.array([]),  # Would be populated with actual implementation
            right_indices=np.array([])  # Would be populated with actual implementation
        )

    def _calculate_gain(self, left_grad: float, left_hess: float,
                        right_grad: float, right_hess: float,
                        total_grad: float, total_hess: float) -> float:
        """Calculate the regularised split gain.

        Each side scores ``grad**2 / (hess + reg_lambda)``. The result is half
        the improvement of the two children over the parent, minus
        ``reg_gamma``.

        Args:
            left_grad: Gradient sum of the left child.
            left_hess: Hessian sum of the left child.
            right_grad: Gradient sum of the right child.
            right_hess: Hessian sum of the right child.
            total_grad: Gradient sum of the parent.
            total_hess: Hessian sum of the parent.

        Returns:
            The penalised gain; positive means the split is worthwhile.
        """
        left_score = (left_grad ** 2) / (left_hess + self.reg_lambda)
        right_score = (right_grad ** 2) / (right_hess + self.reg_lambda)
        parent_score = (total_grad ** 2) / (total_hess + self.reg_lambda)

        return 0.5 * (left_score + right_score - parent_score) - self.reg_gamma

In [9]:
class LeafWiseTreeBuilder:
    """Leaf-wise tree growth strategy.

    Grows a tree by repeatedly splitting whichever pending leaf offers the
    largest gain, until ``max_leaves`` is reached or no candidate remains.

    Attributes:
        max_depth: Nodes at this depth or deeper are never split.
        max_leaves: Upper bound on the number of leaves in the tree.
    """

    def __init__(self, max_depth: int = 6, max_leaves: int = 31):
        self.max_depth = max_depth
        self.max_leaves = max_leaves

    def build_tree(self, X: np.ndarray, gradients: np.ndarray, hessians: np.ndarray,
                   hist_builder: HistogramBuilder, split_finder: SplitFinder) -> Node:
        """Build a tree using leaf-wise (best-first) growth.

        Args:
            X: 2-D feature matrix, indexed as ``X[row, feature]``.
            gradients: Per-row gradients.
            hessians: Per-row hessians.
            hist_builder: Builds per-feature histograms for a set of rows.
            split_finder: Chooses the best split from histograms; also
                supplies ``reg_lambda`` and ``min_child_samples``.

        Returns:
            The root node of the grown tree. Every node, including children
            too small to be split further, has its ``leaf_value`` set.
        """
        root = Node(np.arange(len(gradients)))
        reg_lambda = split_finder.reg_lambda

        # Calculate initial leaf value
        total_gradient = np.sum(gradients)
        total_hessian = np.sum(hessians)
        root.leaf_value = self._leaf_value(total_gradient, total_hessian, reg_lambda)

        # Priority queue for leaf-wise growth. heapq is a min-heap, so gains
        # are negated. Entries are (-gain, depth, sequence, node, split_info);
        # the unique sequence number guarantees that a tie on gain and depth
        # is resolved before tuple comparison reaches the Node objects, which
        # define no ordering and would raise TypeError.
        leaf_queue = []
        sequence = 0

        # Try to split root
        histograms = hist_builder.create_histograms(X, gradients, hessians, root.indices)
        split_info = split_finder.find_best_split(histograms, total_gradient, total_hessian)

        if split_info:
            heapq.heappush(leaf_queue, (-split_info.gain, root.depth, sequence, root, split_info))
            sequence += 1

        n_leaves = 1

        while leaf_queue and n_leaves < self.max_leaves:
            _, _, _, node, split_info = heapq.heappop(leaf_queue)

            if node.depth >= self.max_depth:
                continue

            # Perform split: one leaf becomes two, a net gain of one leaf.
            self._split_node(node, split_info, X)
            n_leaves += 1

            for child in (node.left_child, node.right_child):
                child_total_grad = np.sum(gradients[child.indices])
                child_total_hess = np.sum(hessians[child.indices])

                # Every child is a leaf for now, so it always needs an output
                # value, even when it is too small or too deep to split again.
                child.leaf_value = self._leaf_value(child_total_grad, child_total_hess, reg_lambda)

                if len(child.indices) <= split_finder.min_child_samples:
                    continue
                if child.depth >= self.max_depth:
                    continue

                child_histograms = hist_builder.create_histograms(
                    X, gradients, hessians, child.indices
                )
                child_split_info = split_finder.find_best_split(
                    child_histograms, child_total_grad, child_total_hess
                )

                if child_split_info:
                    heapq.heappush(
                        leaf_queue,
                        (-child_split_info.gain, child.depth, sequence, child, child_split_info),
                    )
                    sequence += 1

        return root

    @staticmethod
    def _leaf_value(grad_sum: float, hess_sum: float, reg_lambda: float) -> float:
        """Return the leaf output ``-grad_sum / (hess_sum + reg_lambda)``.

        Returns 0.0 when the denominator is zero (e.g. an empty leaf with
        ``reg_lambda == 0``) instead of producing NaN or infinity.
        """
        denominator = hess_sum + reg_lambda
        if denominator == 0:
            return 0.0
        return -grad_sum / denominator

    def _split_node(self, node: Node, split_info: SplitInfo, X: np.ndarray) -> None:
        """Split a node in place based on split information.

        Marks ``node`` as internal, records the split feature and threshold,
        and attaches two child nodes one level deeper: rows whose feature
        value is ``<= split_info.threshold`` go left, all others go right.

        Args:
            node: The leaf to split.
            split_info: The split to apply.
            X: 2-D feature matrix, indexed as ``X[row, feature]``.
        """
        node.is_leaf = False
        node.split_feature = split_info.feature
        node.split_threshold = split_info.threshold

        # Simplified splitting logic.
        # NOTE(review): SplitFinder in this file puts a histogram bin index in
        # split_info.threshold, while the comparison below is against raw
        # feature values. Left unchanged because consumers of split_threshold
        # are not visible here -- confirm the intended threshold semantics.
        feature_values = X[node.indices, split_info.feature]
        left_mask = feature_values <= split_info.threshold

        left_indices = node.indices[left_mask]
        right_indices = node.indices[~left_mask]

        node.left_child = Node(left_indices, node.depth + 1)
        node.right_child = Node(right_indices, node.depth + 1)


In [10]:
class LossFunction(ABC):
    """Interface Segregation: abstract loss function.

    Concrete losses supply the first and second derivative of the loss with
    respect to the prediction, evaluated element-wise.
    """

    @abstractmethod
    def gradient(self, y_true: np.ndarray, y_pred: np.ndarray) -> np.ndarray:
        """Return the first derivative of the loss w.r.t. ``y_pred``."""

    @abstractmethod
    def hessian(self, y_true: np.ndarray, y_pred: np.ndarray) -> np.ndarray:
        """Return the second derivative of the loss w.r.t. ``y_pred``."""

In [11]:
class SquaredLoss(LossFunction):
    """Squared loss for regression.

    The derivatives below are those of ``0.5 * (y_pred - y_true) ** 2`` with
    respect to ``y_pred``.
    """

    def gradient(self, y_true: np.ndarray, y_pred: np.ndarray) -> np.ndarray:
        """Return the residual ``y_pred - y_true``."""
        residual = y_pred - y_true
        return residual

    def hessian(self, y_true: np.ndarray, y_pred: np.ndarray) -> np.ndarray:
        """Return an array of ones with the shape and dtype of ``y_true``."""
        unit_curvature = np.ones_like(y_true)
        return unit_curvature


class LogLoss(LossFunction):
    """Logistic loss for binary classification.

    ``y_pred`` is a raw score; it is passed through a sigmoid before the
    derivatives are computed.
    """

    def gradient(self, y_true: np.ndarray, y_pred: np.ndarray) -> np.ndarray:
        """Return ``sigmoid(y_pred) - y_true``."""
        probability = self._sigmoid(y_pred)
        return probability - y_true

    def hessian(self, y_true: np.ndarray, y_pred: np.ndarray) -> np.ndarray:
        """Return ``p * (1 - p)`` with ``p = sigmoid(y_pred)``."""
        probability = self._sigmoid(y_pred)
        complement = 1 - probability
        return probability * complement

    def _sigmoid(self, x: np.ndarray) -> np.ndarray:
        """Return the logistic function of ``x``, clipped to [-250, 250] first."""
        # Clipping bounds the argument handed to np.exp.
        bounded = np.clip(x, -250, 250)
        return 1 / (1 + np.exp(-bounded))
