In [None]:
import numpy as np
from sklearn.tree import DecisionTreeRegressor
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
import copy

# ==================== CUSTOM TREE STRUCTURE ====================

class CustomTreeNode:
    """Custom tree node structure that mimics sklearn's decision tree.

    Args:
        node_id: Identifier of the node.
        is_leaf: True when the node has no children.
        value: Predicted value stored at the node (a scalar in this version).
        feature: Index of the feature used for the split (None for leaves).
        threshold: Split threshold (None for leaves).
        left_child: Child followed when ``x[feature] <= threshold``.
        right_child: Child followed otherwise.
        impurity: Impurity recorded for the node.
        n_samples: Number of samples recorded for the node.
    """
    def __init__(self, node_id, is_leaf=False, value=None, feature=None,
                 threshold=None, left_child=None, right_child=None,
                 impurity=None, n_samples=None):
        self.node_id = node_id
        self.is_leaf = is_leaf
        self.value = value  # For regressor: predicted value
        self.feature = feature  # Feature index for splitting
        self.threshold = threshold  # Threshold value for splitting
        self.left_child = left_child  # CustomTreeNode or None
        self.right_child = right_child  # CustomTreeNode or None
        self.impurity = impurity  # Impurity at this node
        self.n_samples = n_samples  # Number of samples at this node
        self.parent = None  # Reference to parent node
        self.depth = 0  # Depth of the node

    @staticmethod
    def _fmt(number):
        """Format a number with three decimals; None is rendered as 'None'."""
        return "None" if number is None else f"{number:.3f}"

    def __repr__(self):
        # value/threshold default to None, so formatting must not assume a number:
        # a repr that raises makes partially-built nodes impossible to inspect.
        if self.is_leaf:
            return (f"Leaf(node_id={self.node_id}, value={self._fmt(self.value)}, "
                    f"n_samples={self.n_samples})")
        return (f"Node(node_id={self.node_id}, feature={self.feature}, "
                f"threshold={self._fmt(self.threshold)}, n_samples={self.n_samples})")

class CustomDecisionTree:
    """Custom decision tree structure: a registry of nodes keyed by node_id.

    Attributes:
        root: The node most recently added with ``parent is None`` (None while empty).
        nodes: Dict mapping node_id -> node.
        max_depth: Largest depth assigned by ``add_node`` so far.
    """
    def __init__(self):
        self.root = None
        self.nodes = {}  # Dictionary to store nodes by node_id
        self.max_depth = 0

    def add_node(self, node):
        """Add a node to the tree and derive its depth from its parent.

        ``node.parent`` must already be set when this is called: a node whose
        parent is None is treated as the root (replacing any previous root),
        and a child's depth is computed from ``node.parent.depth``, so parents
        have to be added before their children.
        """
        self.nodes[node.node_id] = node
        if node.parent is None:
            self.root = node
            node.depth = 0
        else:
            node.depth = node.parent.depth + 1
            self.max_depth = max(self.max_depth, node.depth)

    def print_tree(self, node=None, indent=0):
        """Print the subtree rooted at ``node`` (the whole tree when omitted)."""
        if node is None:
            node = self.root
        if node is None:
            # Empty tree: nothing to walk (previously printed "None" and crashed).
            print("<empty tree>")
            return

        prefix = "  " * indent
        print(f"{prefix}{node}")

        if not node.is_leaf:
            self.print_tree(node.left_child, indent + 1)
            self.print_tree(node.right_child, indent + 1)

    def get_leaves(self):
        """Get all registered nodes flagged as leaves, in insertion order."""
        return [node for node in self.nodes.values() if node.is_leaf]

    def get_all_nodes(self):
        """Get all nodes in the tree"""
        return list(self.nodes.values())

    def get_subtree_nodes(self, node):
        """Get all nodes in the subtree rooted at ``node`` in pre-order.

        The given node is always the first element; traversal stops at any
        node flagged ``is_leaf``.
        """
        nodes = []
        stack = [node]
        while stack:
            current = stack.pop()
            nodes.append(current)
            if not current.is_leaf:
                # Push right first so the left child is visited first.
                stack.append(current.right_child)
                stack.append(current.left_child)
        return nodes

# ==================== TREE CONVERSION ====================

def sklearn_tree_to_custom_tree(sklearn_tree, X_train, y_train):
    """
    Convert sklearn DecisionTreeRegressor to custom tree structure

    Leaf values are copied from ``tree_.value``; internal-node values are the
    mean of ``y_train`` over the samples of ``X_train`` whose decision path
    passes through the node (falling back to ``tree_.value`` when no sample
    reaches it).

    Args:
        sklearn_tree: Trained DecisionTreeRegressor
        X_train: Training features
        y_train: Training target

    Returns:
        CustomDecisionTree object
    """
    tree = sklearn_tree.tree_
    custom_tree = CustomDecisionTree()

    # Evaluate the decision paths once (column i flags samples passing node i)
    # instead of re-running decision_path for every internal node.
    node_indicator = sklearn_tree.decision_path(X_train).tocsc()

    # First pass: create all nodes (not yet registered in the tree)
    node_info = {}
    for i in range(tree.node_count):
        left_child = tree.children_left[i]
        right_child = tree.children_right[i]
        is_leaf = bool(left_child == -1 and right_child == -1)

        if is_leaf:
            value = tree.value[i][0][0]
        else:
            node_mask = node_indicator[:, i].toarray().ravel().astype(bool)
            if node_mask.any():
                value = np.mean(y_train[node_mask])
            else:
                value = tree.value[i][0][0]

        node_info[i] = CustomTreeNode(
            node_id=i,
            is_leaf=is_leaf,
            value=value,
            feature=tree.feature[i] if not is_leaf else None,
            threshold=tree.threshold[i] if not is_leaf else None,
            impurity=tree.impurity[i],
            n_samples=tree.n_node_samples[i]
        )

    # Second pass: set up parent-child relationships
    for i, node in node_info.items():
        if node.is_leaf:
            continue
        node.left_child = node_info[tree.children_left[i]]
        node.right_child = node_info[tree.children_right[i]]
        node.left_child.parent = node
        node.right_child.parent = node

    # Third pass: register nodes root-first. add_node() decides root/depth from
    # node.parent, so it must only run after the links above exist; registering
    # earlier made every node look like a root with depth 0.
    stack = [node_info[0]] if node_info else []
    while stack:
        node = stack.pop()
        custom_tree.add_node(node)
        if not node.is_leaf:
            stack.append(node.right_child)
            stack.append(node.left_child)

    return custom_tree

def get_node_mask(tree, X, node_id):
    """
    Return a boolean vector, one entry per row of X, that is True where the
    row's decision path in ``tree`` includes node ``node_id``.
    """
    # decision_path yields a sparse (samples x nodes) indicator; keep one column.
    column = tree.decision_path(X)[:, node_id]
    dense_column = column.toarray().flatten()
    return dense_column.astype(bool)

# ==================== PRUNING FUNCTIONS ====================

def calculate_node_mse(node, X_train, y_train, tree_model):
    """
    Calculate MSE for a specific node if it were to become a leaf

    Args:
        node: Node whose ``value`` would be predicted for every sample reaching it.
        X_train: Features used to determine which samples reach the node.
        y_train: Targets aligned with X_train.
        tree_model: Fitted model passed to ``get_node_mask`` for decision paths.

    Returns:
        Mean squared error over the samples reaching the node, or ``inf`` when
        no sample reaches it.
    """
    # Get samples that reach this node
    node_mask = get_node_mask(tree_model, X_train, node.node_id)

    if not node_mask.any():
        return float('inf')

    # Subtract the constant prediction directly in float arithmetic. The old
    # np.full_like(y_train, node.value) inherited y_train's dtype, which
    # truncated the prediction for integer targets.
    residuals = np.asarray(y_train[node_mask], dtype=float) - node.value

    return np.mean(residuals ** 2)

def calculate_subtree_mse(node, X_train, y_train, tree_model, custom_tree):
    """
    Mean squared error of the subtree rooted at ``node``, measured on the
    training samples whose decision path passes through that node.

    Returns ``inf`` when no sample reaches the node. ``custom_tree`` is
    accepted but not read by this function.
    """
    reaches_node = get_node_mask(tree_model, X_train, node.node_id)
    if np.sum(reaches_node) == 0:
        return float('inf')

    subtree_pred = np.zeros_like(y_train, dtype=float)

    for row_idx, sample in enumerate(X_train):
        if not reaches_node[row_idx]:
            continue
        # Descend from the subtree root until a leaf is hit.
        cursor = node
        while not cursor.is_leaf:
            go_left = sample[cursor.feature] <= cursor.threshold
            cursor = cursor.left_child if go_left else cursor.right_child
        subtree_pred[row_idx] = cursor.value

    # Only samples inside the node contribute to the error.
    squared_err = (y_train[reaches_node] - subtree_pred[reaches_node]) ** 2
    return np.mean(squared_err)

def cost_complexity_pruning(custom_tree, X_train, y_train, sklearn_tree, alpha=0.01):
    """
    Perform cost-complexity pruning on the custom tree

    Each iteration looks at every internal node that has at least one leaf
    child, compares ``MSE + alpha * (number of nodes in its subtree)`` against
    ``MSE-as-leaf + alpha``, and collapses the single candidate with the
    largest improvement. The loop stops when no candidate qualifies or no
    internal node is left.

    Args:
        custom_tree: CustomDecisionTree to prune
        X_train: Training features
        y_train: Training targets
        sklearn_tree: Original sklearn tree (for decision paths)
        alpha: Complexity parameter - higher alpha means more pruning

    Returns:
        pruned_tree: Pruned CustomDecisionTree
        pruning_log: List of pruning operations
    """
    # Create a deep copy to avoid modifying the original
    pruned_tree = copy.deepcopy(custom_tree)
    pruning_log = []
    iteration = 0

    while True:
        iteration += 1
        print(f"\n=== Pruning Iteration {iteration} ===")

        pruning_candidates = []
        evaluated_ids = set()  # parents already costed in this iteration

        # For each leaf, check if its parent can be pruned
        for leaf in pruned_tree.get_leaves():
            parent = leaf.parent

            # Skip the root leaf and leaves whose parent is itself a leaf
            if parent is None or parent.is_leaf:
                continue

            # Each parent is costed once, even when both children are leaves
            if parent.node_id in evaluated_ids:
                continue
            evaluated_ids.add(parent.node_id)

            # Cost if we keep the subtree
            subtree_cost = calculate_subtree_mse(parent, X_train, y_train, sklearn_tree, pruned_tree)
            subtree_cost_complexity = subtree_cost + alpha * len(pruned_tree.get_subtree_nodes(parent))

            # Cost if we prune to a leaf
            leaf_cost = calculate_node_mse(parent, X_train, y_train, sklearn_tree)
            leaf_cost_complexity = leaf_cost + alpha * 1  # Single leaf

            # If pruning does not increase cost-complexity, add to candidates
            if leaf_cost_complexity <= subtree_cost_complexity:
                pruning_candidates.append((parent, leaf_cost_complexity - subtree_cost_complexity))

        if not pruning_candidates:
            print("No more pruning candidates found.")
            break

        # Sort candidates by cost improvement (most negative first - biggest improvement)
        pruning_candidates.sort(key=lambda x: x[1])
        print(f"Found {len(pruning_candidates)} pruning candidates")

        # Prune the best candidate
        node_to_prune, cost_improvement = pruning_candidates[0]
        print(f"Pruning node {node_to_prune.node_id} with cost improvement: {cost_improvement:.4f}")

        # Collect the descendants BEFORE detaching the children: the traversal
        # stops at leaves, so marking the node as a leaf first would hide them
        # and leave orphaned nodes registered in the tree.
        descendants = pruned_tree.get_subtree_nodes(node_to_prune)[1:]
        for child_node in descendants:
            pruned_tree.nodes.pop(child_node.node_id, None)

        # Convert the node to a leaf
        node_to_prune.is_leaf = True
        node_to_prune.left_child = None
        node_to_prune.right_child = None

        # Keep the cached depth in step with the nodes that remain
        pruned_tree.max_depth = max(
            (n.depth for n in pruned_tree.nodes.values()), default=0
        )

        pruning_log.append({
            'iteration': iteration,
            'node_id': node_to_prune.node_id,
            'cost_improvement': cost_improvement,
            'remaining_nodes': len(pruned_tree.nodes)
        })

        print(f"Pruned node {node_to_prune.node_id}. Tree now has {len(pruned_tree.nodes)} nodes.")

        # Stop once no internal node is left
        if all(n.is_leaf for n in pruned_tree.nodes.values()):
            print("No more internal nodes to prune.")
            break

    print(f"\n=== Pruning Complete ===")
    print(f"Total iterations: {iteration}")
    print(f"Final tree size: {len(pruned_tree.nodes)} nodes")
    print(f"Original tree size: {len(custom_tree.nodes)} nodes")

    return pruned_tree, pruning_log

# ==================== MAIN EXECUTION ====================

def _custom_tree_predict(tree, X):
    """Predict one value per row of X by walking from tree.root down to a leaf."""
    predictions = np.zeros(len(X))
    for i, x in enumerate(X):
        node = tree.root
        while not node.is_leaf:
            if x[node.feature] <= node.threshold:
                node = node.left_child
            else:
                node = node.right_child
        predictions[i] = node.value
    return predictions


def _r2(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    return 1 - np.sum((y_true - y_pred) ** 2) / np.sum((y_true - np.mean(y_true)) ** 2)


def main():
    """Train a regressor on synthetic data, convert it, and prune it at several alphas."""
    # Step 1: Create synthetic regression data
    print("Generating synthetic data...")
    X, y = make_regression(n_samples=10000, n_features=10, noise=0.1, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    print(f"Training data shape: {X_train.shape}")
    print(f"Test data shape: {X_test.shape}")

    # Step 2: Train Decision Tree Regressor
    print("\nTraining Decision Tree Regressor...")
    dt_regressor = DecisionTreeRegressor(
        max_depth=5,
        min_samples_split=5,
        min_samples_leaf=2,
        random_state=42
    )
    dt_regressor.fit(X_train, y_train)

    # Evaluate initial model
    train_score = dt_regressor.score(X_train, y_train)
    test_score = dt_regressor.score(X_test, y_test)
    print(f"Initial Model - Train R²: {train_score:.4f}, Test R²: {test_score:.4f}")
    print(f"Tree has {dt_regressor.tree_.node_count} nodes")

    # Step 3: Convert sklearn tree to custom tree
    print("\nConverting to custom tree structure...")
    custom_tree = sklearn_tree_to_custom_tree(dt_regressor, X_train, y_train)

    print("Original custom tree structure:")
    custom_tree.print_tree()
    print(f"\nCustom tree has {len(custom_tree.nodes)} nodes")
    print(f"Number of leaves: {len(custom_tree.get_leaves())}")

    # Step 4: Perform cost-complexity pruning
    print("\n" + "="*50)
    print("Starting Cost-Complexity Pruning")
    print("="*50)

    # Try different alpha values; each run prunes a fresh copy of custom_tree
    alpha_values = [0.001, 0.01, 0.1]

    for alpha in alpha_values:
        print(f"\n{'='*50}")
        print(f"Pruning with alpha = {alpha}")
        print(f"{'='*50}")

        pruned_tree, pruning_log = cost_complexity_pruning(
            custom_tree, X_train, y_train, dt_regressor, alpha
        )

        print(f"\nPruned tree structure (alpha={alpha}):")
        pruned_tree.print_tree()

        # Evaluate pruned tree
        y_train_pred = _custom_tree_predict(pruned_tree, X_train)
        y_test_pred = _custom_tree_predict(pruned_tree, X_test)

        train_r2 = _r2(y_train, y_train_pred)
        test_r2 = _r2(y_test, y_test_pred)

        print(f"\nPruned Model (alpha={alpha}):")
        print(f"  Train R²: {train_r2:.4f}")
        print(f"  Test R²: {test_r2:.4f}")
        print(f"  Nodes: {len(pruned_tree.nodes)}")
        print(f"  Leaves: {len(pruned_tree.get_leaves())}")
        print(f"  Max Depth: {pruned_tree.max_depth}")

        # Print pruning log
        print("\nPruning Log:")
        for log_entry in pruning_log[-5:]:  # Show last 5 pruning operations
            print(f"  Iteration {log_entry['iteration']}: Pruned node {log_entry['node_id']}, "
                  f"Cost improvement: {log_entry['cost_improvement']:.4f}")

if __name__ == "__main__":
    main()

In [1]:
import numpy as np
from sklearn.tree import DecisionTreeRegressor


In [2]:
class CustomTreeNode:
    """
    Node of a custom regression tree; ``value`` may hold several targets.

    Links (children, parent) and depth start out unset and are filled in
    by whoever assembles the tree.
    """
    def __init__(self, node_id, is_leaf, value,
                 feature=None, threshold=None, impurity=None, n_samples=None):
        # Identity and prediction
        self.node_id, self.is_leaf = node_id, is_leaf
        self.value = value              # ndarray (n_targets,)
        # Split description (None on leaves) and node statistics
        self.feature, self.threshold = feature, threshold
        self.impurity, self.n_samples = impurity, n_samples

        # Structural links, wired up after construction
        self.left_child = self.right_child = None
        self.parent = None
        self.depth = 0

    def __repr__(self):
        if not self.is_leaf:
            return "Node(id={}, feature={}, threshold={:.3f}, samples={})".format(
                self.node_id, self.feature, self.threshold, self.n_samples
            )
        rounded = np.round(self.value, 3)
        return "Leaf(id={}, value={}, samples={})".format(
            self.node_id, rounded, self.n_samples
        )


In [3]:
class CustomDecisionTree:
    """Registry of tree nodes keyed by node_id, with root and depth bookkeeping."""

    def __init__(self):
        self.root = None
        self.nodes = {}
        self.max_depth = 0

    def add_node(self, node):
        """Register ``node``; a parentless node becomes the root at depth 0."""
        self.nodes[node.node_id] = node
        parent = node.parent
        if parent is not None:
            node.depth = parent.depth + 1
            if node.depth > self.max_depth:
                self.max_depth = node.depth
            return
        self.root = node
        node.depth = 0

    def get_leaves(self):
        """Registered nodes flagged as leaves, in insertion order."""
        return list(filter(lambda candidate: candidate.is_leaf, self.nodes.values()))

    def get_subtree_nodes(self, node):
        """Pre-order list of the nodes below (and including) ``node``."""
        collected = []
        pending = [node]
        while pending:
            head = pending.pop()
            collected.append(head)
            if head.is_leaf:
                continue
            # Right goes on the stack first so left is popped first.
            pending.extend((head.right_child, head.left_child))
        return collected

    def print_tree(self, node=None, indent=0):
        """Print the subtree at ``node`` (default: root), two spaces per level."""
        current = self.root if node is None else node
        print(f"{'  ' * indent}{current}")
        if current.is_leaf:
            return
        for child in (current.left_child, current.right_child):
            self.print_tree(child, indent + 1)


In [4]:
def sklearn_tree_to_custom_tree(sklearn_model):
    """
    Convert a fitted sklearn tree model into a CustomDecisionTree.

    Node values are copied from ``tree_.value[i][0]`` (one entry per target).
    Nodes are created and linked first and only then registered with the
    tree, root first, so that ``add_node`` sees the parent links it uses to
    pick the root and compute depths.
    """
    tree = sklearn_model.tree_
    custom_tree = CustomDecisionTree()

    node_map = {}

    # Create nodes (not yet registered in the tree)
    for i in range(tree.node_count):
        is_leaf = tree.children_left[i] == -1

        node_map[i] = CustomTreeNode(
            node_id=i,
            is_leaf=is_leaf,
            value=tree.value[i][0].copy(),   # ndarray (n_targets,)
            feature=None if is_leaf else tree.feature[i],
            threshold=None if is_leaf else tree.threshold[i],
            impurity=tree.impurity[i],
            n_samples=tree.n_node_samples[i]
        )

    # Assign parent-child relationships
    for i, node in node_map.items():
        if not node.is_leaf:
            node.left_child = node_map[tree.children_left[i]]
            node.right_child = node_map[tree.children_right[i]]

            node.left_child.parent = node
            node.right_child.parent = node

    # Register root-first (pre-order from node 0). Registering before linking
    # made every node look parentless: the root became the last node added
    # and every depth stayed 0.
    stack = [node_map[0]] if node_map else []
    while stack:
        node = stack.pop()
        custom_tree.add_node(node)
        if not node.is_leaf:
            stack.append(node.right_child)
            stack.append(node.left_child)

    return custom_tree


In [5]:
def count_leaves(node):
    """Number of leaf nodes in the subtree rooted at ``node``."""
    total = 0
    pending = [node]
    while pending:
        current = pending.pop()
        if current.is_leaf:
            total += 1
        else:
            pending.append(current.left_child)
            pending.append(current.right_child)
    return total

def nodes_by_depth(tree):
    """Group ``tree.nodes`` by ``depth`` and return the groups deepest first."""
    buckets = {}
    for member in tree.nodes.values():
        if member.depth not in buckets:
            buckets[member.depth] = []
        buckets[member.depth].append(member)
    deepest_first = sorted(buckets, reverse=True)
    return [buckets[depth] for depth in deepest_first]


In [6]:
def bottom_up_level_pruning(tree, cost_function):
    """
    Generic bottom-up, level-by-level pruning (mutates ``tree`` in place).

    Levels are visited deepest first; an internal node is collapsed into a
    leaf whenever its cost as a leaf does not exceed its cost as a subtree.

    Parameters
    ----------
    tree : CustomDecisionTree
    cost_function : callable
        cost_function(node, as_leaf=True|False) -> float
    """
    pruned_any = False

    for level_nodes in nodes_by_depth(tree):
        for node in level_nodes:

            if node.is_leaf:
                continue

            # Compute costs
            cost_as_subtree = cost_function(node, as_leaf=False)
            cost_as_leaf = cost_function(node, as_leaf=True)

            # Prune condition
            if cost_as_leaf <= cost_as_subtree:
                # Remove descendants while they are still reachable from node
                for child in tree.get_subtree_nodes(node)[1:]:
                    tree.nodes.pop(child.node_id, None)

                node.left_child = None
                node.right_child = None
                node.is_leaf = True
                pruned_any = True

    if pruned_any:
        # The cached max_depth would otherwise still describe the unpruned tree.
        tree.max_depth = max((n.depth for n in tree.nodes.values()), default=0)


In [7]:
def subtree_impurity(node):
    """Sum of ``impurity * n_samples`` over the leaves below ``node``."""
    if not node.is_leaf:
        left_total = subtree_impurity(node.left_child)
        right_total = subtree_impurity(node.right_child)
        return left_total + right_total
    return node.impurity * node.n_samples


In [8]:
# A. CART Cost-Complexity Pruning (Recommended)
def cart_cost_function(alpha):
    """Build ``cost(node, as_leaf)``: total impurity plus ``alpha`` per leaf."""
    def cost(node, as_leaf):
        if not as_leaf:
            kept_error = subtree_impurity(node)
            return kept_error + alpha * count_leaves(node)
        own_error = node.impurity * node.n_samples
        return own_error + alpha * 1
    return cost

# B. Pure Structural Pruning (Aggressive)
def structural_cost(node, as_leaf):
    """Cost that only counts leaves: 1 as a leaf, else the subtree's leaf count."""
    if as_leaf:
        return 1
    return count_leaves(node)

# C. Hybrid (Structure + Variance)
def hybrid_cost(alpha, beta):
    """Build ``cost(node, as_leaf)`` mixing impurity (weight ``beta``) and size (``alpha`` per leaf).

    Both branches measure impurity as ``impurity * n_samples`` so that the
    leaf cost and the subtree cost are on the same scale.
    """
    def cost(node, as_leaf):
        if as_leaf:
            # Weight by n_samples to match subtree_impurity(); the bare
            # per-sample impurity used before was not comparable with it.
            return beta * node.impurity * node.n_samples + alpha
        return beta * subtree_impurity(node) + alpha * count_leaves(node)
    return cost




In [9]:
# Example usage
# NOTE: `custom_tree` is not created anywhere above this cell, so running it on a
# fresh kernel raises NameError (see the recorded output below). Build the tree
# with sklearn_tree_to_custom_tree(<fitted model>) before calling the pruner.

alpha = 0.01
cost_fn = cart_cost_function(alpha)

bottom_up_level_pruning(custom_tree, cost_fn)


NameError: name 'custom_tree' is not defined

# testing my code

In [27]:
import numpy as np
from sklearn.tree import DecisionTreeRegressor
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split


In [28]:
# Synthetic regression problem with two targets; seeded so reruns give the same data.
X, y = make_regression(
    n_samples=300,
    n_features=6,
    n_targets=2,          # multivariate regression
    noise=15.0,
    random_state=42
)

# Hold out 30% of the samples as a test split (same seed as above).
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=42
)


In [51]:
# Fit a deliberately deep regressor on the training split; it is the input to the pruning test.
dt = DecisionTreeRegressor(
    max_depth=8,          # intentionally deep
    min_samples_leaf=1,
    random_state=42
)

dt.fit(X_train, y_train)


In [52]:
class CustomTreeNode:
    """Plain record for one tree node; links and depth are filled in later."""

    def __init__(self, node_id, is_leaf, value, feature=None,
                 threshold=None, impurity=None, n_samples=None):
        # What the node is and what it predicts
        self.node_id, self.is_leaf, self.value = node_id, is_leaf, value
        # How it splits (None on leaves)
        self.feature, self.threshold = feature, threshold
        # Statistics copied from the source tree
        self.impurity, self.n_samples = impurity, n_samples

        # Wiring to neighbouring nodes, unset until the tree is linked
        self.left_child = self.right_child = None
        self.parent = None
        self.depth = 0


class CustomDecisionTree:
    """Holds the nodes of a tree by node_id plus the root and maximum depth."""

    def __init__(self):
        self.root = None
        self.nodes = {}
        self.max_depth = 0

    def add_node(self, node):
        """Register ``node``; depth comes from its parent, or 0 for a parentless node."""
        self.nodes[node.node_id] = node
        if node.parent is not None:
            child_depth = node.parent.depth + 1
            node.depth = child_depth
            self.max_depth = max(self.max_depth, child_depth)
        else:
            self.root = node
            node.depth = 0

    def get_leaves(self):
        """All registered nodes whose ``is_leaf`` flag is set."""
        leaves = []
        for candidate in self.nodes.values():
            if candidate.is_leaf:
                leaves.append(candidate)
        return leaves

    def get_subtree_nodes(self, node):
        """Pre-order walk starting at ``node``; does not descend below leaves."""
        visited = []
        frontier = [node]
        while frontier:
            current = frontier.pop()
            visited.append(current)
            if current.is_leaf:
                continue
            # Left is pushed last so it is handled before right.
            frontier += [current.right_child, current.left_child]
        return visited


In [53]:
def sklearn_tree_to_custom_tree(model):
    """Mirror ``model.tree_`` into a CustomDecisionTree.

    Nodes are stored under their sklearn index, node 0 becomes the root, and
    depths / max_depth are derived by walking down from the root.
    """
    tree = model.tree_
    custom_tree = CustomDecisionTree()
    node_map = {}

    # One CustomTreeNode per sklearn node index
    for node_id in range(tree.node_count):
        leaf_flag = tree.children_left[node_id] == -1
        split_feature = None if leaf_flag else tree.feature[node_id]
        split_threshold = None if leaf_flag else tree.threshold[node_id]
        created = CustomTreeNode(
            node_id=node_id,
            is_leaf=leaf_flag,
            value=tree.value[node_id][0].copy(),
            feature=split_feature,
            threshold=split_threshold,
            impurity=tree.impurity[node_id],
            n_samples=tree.n_node_samples[node_id]
        )
        node_map[node_id] = created
        custom_tree.nodes[node_id] = created

    # Wire children and parents together
    for node_id, parent in node_map.items():
        if parent.is_leaf:
            continue
        left = node_map[tree.children_left[node_id]]
        right = node_map[tree.children_right[node_id]]
        parent.left_child, parent.right_child = left, right
        left.parent = right.parent = parent

    custom_tree.root = node_map[0]

    # Depth assignment with an explicit stack instead of recursion
    pending = [(custom_tree.root, 0)]
    while pending:
        current, depth = pending.pop()
        current.depth = depth
        custom_tree.max_depth = max(custom_tree.max_depth, depth)
        if not current.is_leaf:
            pending.append((current.right_child, depth + 1))
            pending.append((current.left_child, depth + 1))

    return custom_tree


In [54]:
# Mirror the fitted sklearn model `dt` into the custom tree structure.
custom_tree = sklearn_tree_to_custom_tree(dt)


In [55]:
def count_leaves(node):
    """Count the leaves reachable from ``node`` (a leaf counts as 1)."""
    if not node.is_leaf:
        children = (node.left_child, node.right_child)
        return sum(count_leaves(child) for child in children)
    return 1


def subtree_impurity(node):
    """Total of ``impurity * n_samples`` across the leaves under ``node``."""
    return (
        node.impurity * node.n_samples
        if node.is_leaf
        else subtree_impurity(node.left_child) + subtree_impurity(node.right_child)
    )


In [56]:
def cart_cost_function(alpha, n_total=None):
    """Build a CART-style cost: impurity risk plus ``alpha`` per leaf.

    Args:
        alpha: Penalty added for every leaf.
        n_total: Optional total number of training samples. When given, the
            impurity term is divided by it, i.e. each node is weighted by its
            share of the samples, which puts ``alpha`` on the scale of a
            sample-weighted impurity (the convention scikit-learn documents
            for ``ccp_alpha``). When omitted the risk is the raw
            ``impurity * n_samples`` total, so ``alpha`` has to be of that
            (much larger) magnitude before anything is pruned.

    Returns:
        ``cost(node, as_leaf)`` returning the cost of ``node`` collapsed to a
        leaf (``as_leaf=True``) or kept as a subtree (``as_leaf=False``).

    Raises:
        ValueError: If ``n_total`` is given but not positive.
    """
    if n_total is not None and n_total <= 0:
        raise ValueError("n_total must be a positive sample count")

    def risk(total_impurity):
        # Leave the value untouched in the default (un-normalized) mode.
        return total_impurity if n_total is None else total_impurity / n_total

    def cost(node, as_leaf):
        if as_leaf:
            return risk(node.impurity * node.n_samples) + alpha * 1
        else:
            return risk(subtree_impurity(node)) + alpha * count_leaves(node)
    return cost


In [57]:
def nodes_by_depth(tree):
    """Return the tree's nodes as a list of per-depth lists, deepest level first."""
    grouped = {}
    for item in tree.nodes.values():
        grouped[item.depth] = grouped.get(item.depth, [])
        grouped[item.depth].append(item)
    ordered_depths = sorted(grouped.keys())
    ordered_depths.reverse()
    return [grouped[level] for level in ordered_depths]


In [58]:
def bottom_up_level_pruning(tree, cost_function):
    """Prune ``tree`` in place, deepest level first, until a full pass changes nothing.

    An internal node is collapsed into a leaf when
    ``cost_function(node, as_leaf=True) <= cost_function(node, as_leaf=False)``;
    its descendants are removed from ``tree.nodes``. After pruning,
    ``tree.max_depth`` is refreshed from the nodes that remain.
    """
    pruned_total = 0

    while True:
        pruned_any = False

        for level in nodes_by_depth(tree):
            for node in level:
                if node.is_leaf:
                    continue

                cost_subtree = cost_function(node, as_leaf=False)
                cost_leaf = cost_function(node, as_leaf=True)

                if cost_leaf <= cost_subtree:
                    # Drop descendants while they are still reachable from node
                    for child in tree.get_subtree_nodes(node)[1:]:
                        tree.nodes.pop(child.node_id, None)

                    node.left_child = None
                    node.right_child = None
                    node.is_leaf = True

                    pruned_any = True
                    pruned_total += 1

        if not pruned_any:
            break

    if pruned_total:
        # The cached max_depth would otherwise still describe the unpruned tree.
        tree.max_depth = max((n.depth for n in tree.nodes.values()), default=0)


In [59]:
# Baseline size of the custom tree before any pruning is applied.
print("BEFORE PRUNING")
print("Total nodes :", len(custom_tree.nodes))
print("Total leaves:", len(custom_tree.get_leaves()))
print("Max depth   :", custom_tree.max_depth)


BEFORE PRUNING
Total nodes : 223
Total leaves: 112
Max depth   : 8


In [60]:
# NOTE(review): cart_cost_function multiplies impurity by the raw n_samples, so
# alpha is compared against un-normalized impurity totals. The output recorded
# after this cell shows unchanged node/leaf counts, i.e. alpha = 0.05 pruned
# nothing on this data. The call below mutates custom_tree in place.
alpha = 0.05
cost_fn = cart_cost_function(alpha)

bottom_up_level_pruning(custom_tree, cost_fn)


In [61]:
# Same report as before pruning, for a side-by-side comparison of tree size.
print("\nAFTER PRUNING")
print("Total nodes :", len(custom_tree.nodes))
print("Total leaves:", len(custom_tree.get_leaves()))
print("Max depth   :", custom_tree.max_depth)



AFTER PRUNING
Total nodes : 223
Total leaves: 112
Max depth   : 8
