# Symbolic Regression Implementation

### Loading the dataset and defining the input variables

In [None]:
import numpy as np
from gxgp.node import Node as GXNode
from gxgp.draw import draw
from typing import Optional, Any, Set, List, Tuple
import random

dataset_path = '../data/problem_1.npz' 
with np.load(dataset_path) as data:
    x_validation = data['x']
    y_validation = data['y']

num_variables = x_validation.shape[0]
VARIABLES = [f"x{i}" for i in range(num_variables)]


### Tree Node Definition

In [None]:
class TreeNode:
    def __init__(self, value, children=None):
        """
        value could be:
            - a float (constant);
            - a string (e.g., 'x0', 'x1', ... to represent a variable);
            - a Python function ( np.sin, np.cos, np.add, ...)
        """
        self.value = value
        self.children = children if children is not None else []
    
    def is_leaf(self):
        """
        Returns True if the node is a leaf node (i.e., has no children).
        """
        return not self.children
    
    def __str__(self):
        """
        Textual representation of the tree
        """
        if self.is_leaf():
            return str(self.value)
        children_str = ', '.join(str(child) for child in self.children)
        return f"({self.value} {children_str})"

### Tree Node Evaluation

In [None]:
def evaluate_node(node: TreeNode, x: np.ndarray) -> np.ndarray:
    """
    Evaluate a TreeNode against the input data x.
    """
    if node.is_leaf():
        return evaluate_leaf(node.value, x)
    
    op_func = node.value
    evaluated_children = [evaluate_node(child, x) for child in node.children]
    if len(evaluated_children) == 1:
        return apply_unary_safe(op_func, evaluated_children[0])
    elif len(evaluated_children) == 2:
        return apply_binary_safe(op_func, evaluated_children[0], evaluated_children[1])
    else:
        raise ValueError(f"Unsupported number of children: {len(evaluated_children)}")

def evaluate_leaf(value: Any, x: np.ndarray) -> np.ndarray:
    """
    Evaluate a leaf node value against the input data x.
    """
    if isinstance(value, (int, float)):
        return np.full(x.shape[1], float(value))
    
    if isinstance(value, str):
        if value in VARIABLES:
            return x[VARIABLES.index(value), :]
        raise ValueError(f"Unknown Variable: {value}")
    
    raise ValueError(f"Unknown leaf value: {value}")

def apply_unary_safe(func, arr: np.ndarray) -> np.ndarray:
    """
    Apply a unary function safely, catching exceptions and returning a ValueError if it fails.
    """
    try:
        return func(arr)
    except Exception as e:
        raise ValueError(f"Unary operator {func} failed with error: {e}") 

def apply_binary_safe(func, arr1: np.ndarray, arr2: np.ndarray) -> np.ndarray:
    """
    Apply a binary function safely, catching exceptions and returning a ValueError if it fails.
    """
    try:
        return func(arr1, arr2)
    except Exception as e:
        raise ValueError(f"Binary operator {func} failed with error: {e}") 

def sin_fn(x): return np.sin(x)
def cos_fn(x): return np.cos(x)
def neg_fn(x): return -x
def abs_fn(x): return np.abs(x)
def log_safe(x): return np.where(x <= 0, 0.0, np.log(x))
def sqrt_safe(x): return np.sqrt(np.abs(x))
def exp_safe(x): return np.exp(np.clip(x, -700, 700))

def add_fn(a, b): return a + b
def sub_fn(a, b): return a - b
def mul_fn(a, b): return a * b
def div_safe(a, b): return np.where(np.abs(b) < 1e-12, 1.0, a / b)

FUNCTION_LIBRARY = [
    {'function': sin_fn, 'arity': 1, 'weight': 1.0},
    {'function': cos_fn, 'arity': 1, 'weight': 1.0},
    {'function': neg_fn, 'arity': 1, 'weight': 1.0},
    {'function': abs_fn, 'arity': 1, 'weight': 1.0},
    {'function': log_safe, 'arity': 1, 'weight': 1.0},
    {'function': sqrt_safe, 'arity': 1, 'weight': 1.0},
    {'function': exp_safe, 'arity': 1, 'weight': 1.0},
    {'function': add_fn, 'arity': 2, 'weight': 1.0},
    {'function': sub_fn, 'arity': 2, 'weight': 1.0},
    {'function': mul_fn, 'arity': 2, 'weight': 1.0},
    {'function': div_safe, 'arity': 2, 'weight': 1.0}
]

UNARY_OPERATORS = [f for f in FUNCTION_LIBRARY if f['arity'] == 1 and f['weight'] > 0]
BINARY_OPERATORS = [f for f in FUNCTION_LIBRARY if f['arity'] == 2 and f['weight'] > 0]

### Random Tree Generation

In [None]:
MAX_TREE_DEPTH = 8
CONST_MIN = -10.0
CONST_MAX = 10.0

def create_subtree(
    depth: int,
    force_variable: Optional[str] = None,  
    constant_min: float =CONST_MIN,
    constant_max: float =CONST_MAX
) -> TreeNode:
    """
    Generate recursively a subtree. If force_var is provided, it will be used as the root node.
    If depth is 0, a leaf node will be created with either a variable or a random constant.
    """
    if depth <= 0:
        if force_variable:
            return TreeNode(force_variable)
        elif VARIABLES and random.random() < 0.5:
            return TreeNode(random.choice(VARIABLES))
        else:
            return TreeNode(random.uniform(constant_min, constant_max))
        
    if random.random() < 0.3 and UNARY_OPERATORS:
        op = random.choices(
            UNARY_OPERATORS,
            weights=[f['weight'] for f in UNARY_OPERATORS],
            k=1
        )[0]['function']
        child = create_subtree(depth - 1, force_variable, constant_min, constant_max)
        return TreeNode(op, [child])
    
    else:
        op = random.choices(
            BINARY_OPERATORS,
            weights=[f['weight'] for f in BINARY_OPERATORS],
            k=1
        )[0]['function']
        left = create_subtree(depth - 1, force_variable, constant_min, constant_max)
        right = create_subtree(depth - 1, None, constant_min, constant_max)
        return TreeNode(op, [left, right])

def generate_random_expression_tree(
        depth=MAX_TREE_DEPTH, 
        constant_min=CONST_MIN, 
        constant_max=CONST_MAX
) -> TreeNode:
    """
    Generate a tree which includes at least one of the available variables.
    """
    if not VARIABLES:
        raise ValueError("There is no variable available.")
    
    selected_var = (
        VARIABLES[0] if len(VARIABLES) == 1 else random.choice(VARIABLES)
    )
    return create_subtree(depth, force_variable=selected_var, constant_min=constant_min, constant_max=constant_max)

### Expression String Representation

In [None]:
log_safe = lambda x: np.where(x <= 0, 0.0, np.log(x))
sqrt_safe = lambda x: np.sqrt(np.abs(x))
exp_safe = lambda x: np.exp(np.clip(x, -700, 700))
div_safe = lambda a, b: np.where(np.abs(b) < 1e-12, 1.0, a / b)

FUNCTION_DISPLAY_NAMES = {
    id(np.add): '+', 
    id(np.subtract): '-', 
    id(np.multiply): '*', 
    id(np.sin): 'sin',
    id(np.cos): 'cos',
    id(np.negative): 'neg',
    id(np.abs): 'abs',
    id(log_safe): 'log',
    id(sqrt_safe): 'sqrt',
    id(exp_safe): 'exp',
    id(div_safe): '/',
}

def expression_to_string(node: TreeNode) -> str:
    """
    It Returns a string representation of the tree node
    """
    if node.is_leaf():
        val = node.value
        if isinstance(val, (float, int)) and not isinstance(val, bool):
            return f"{float(val):.3f}"
        elif isinstance(val, str) and val.startswith('x'):
            return val
        return str(val)

    op = node.value
    op_name = FUNCTION_DISPLAY_NAMES.get(id(op), getattr(op, '__name__', str(op)))
    child_expressions = [expression_to_string(child) for child in node.children]
    
    if len(child_expressions) == 1:
        return f"{op_name}({child_expressions[0]})"
    elif len(child_expressions) == 2:
        return f"({child_expressions[0]} {op_name} {child_expressions[1]})"
    else:
        raise ValueError(f"Unsupported number of children: {len(child_expressions)}")

### GXGP Node Conversion Function


In [None]:
def create_variable_function(var_name: str):
    """
    It creates a function that returns the value of a variable from the keyword arguments.
    """
    def var_func(**kwargs):
        return kwargs[var_name]
    var_func.__name__ = var_name
    return var_func

def create_constant_function(const_val: float):
    """
    It creates a function that returns a constant value.
    """
    def const_func(**kwargs):
        return const_val
    const_func.__name__ = f"{const_val:.3f}"
    return const_func

def convert_tree_to_gxgp_node(node: TreeNode, collected_nodes: Optional[Set[GXNode]]) -> GXNode:
    """
    It converts a custom TreeNode object to a GXNode object.
    If `collected_nodes` is provided, it will be used to collect the nodes.
    """    
    if collected_nodes is None:
        collected_nodes = set()

    if node.is_leaf():
        value = node.value

        if isinstance(value, str):
            var_func = create_variable_function(value)
            gx_node = GXNode(var_func, [], name=value)      
        else:
            const_func = create_constant_function(float(value))
            gx_node = GXNode(const_func, [], name=f"{float(value):.3f}") 
        
        collected_nodes.add(gx_node)
        return gx_node
    
    op = node.value
    op_name = FUNCTION_DISPLAY_NAMES.get(id(op), getattr(op, '__name__', str(op)))
    
    converted_children = [
        convert_tree_to_gxgp_node(child, collected_nodes)
        for child in (node.children or [])
    ]

    gx_node = GXNode(op, converted_children, name=op_name)
    collected_nodes.add(gx_node)
    return gx_node



### Fitness Function

In [None]:
def evaluate_fitness(individual: TreeNode, x: np.ndarray, y: np.ndarray, penalty_factor: float = 0.001) -> float:
    """
    It calculates the fitness of an individual based on Mean Square Error (MSE). 
    """
    try:
        y_pred = evaluate_node(individual, x)

        if not np.all(np.isfinite(y_pred)):
            return 1e10 
        
        mse = np.mean((y - y_pred) ** 2)
        return mse
    
    except Exception as e:
        return 1e10 

### Selection Function

In [None]:
def tournament_selection(population: List[TreeNode], x: np.ndarray, y: np.ndarray, k=3) -> TreeNode:
    """
    It returns the best individual from a random subset of the population (a random tournament).
    """
    contenders = random.sample(population, k) if len(population) >= k else population
    best_individual = min(contenders, key=lambda ind: evaluate_fitness(ind, x, y))
    return best_individual

### Crossover Functions

In [None]:
def crossover(parent1: TreeNode, parent2: TreeNode) -> TreeNode:
    """
    Crossover between two parents by swapping two internal subtrees.
    """
    offspring1 = clone_expression_tree(parent1)
    offspring2 = clone_expression_tree(parent2)
    
    internal_nodes1 = [node for node in collect_all_nodes(offspring1) if not node.is_leaf()]
    internal_nodes2 = [node for node in collect_all_nodes(offspring2) if not node.is_leaf()]
    
    if internal_nodes1 and internal_nodes2:
        node1 = random.choice(internal_nodes1)
        node2 = random.choice(internal_nodes2)
        node1.value, node1.children, node2.value, node2.children = (
            node2.value, node2.children, node1.value, node1.children
        )
    
    return offspring1


def select_random_node(tree: TreeNode) -> TreeNode:
    """
    It returns a random node from the specified tree.
    """
    all_nodes = collect_all_nodes(tree)
    return random.choice(all_nodes)

def collect_all_nodes(tree: TreeNode) -> List[TreeNode]:
    """
    It returns a list with all the nodes of the tree.
    """
    nodes = [tree]
    for child in tree.children:
        nodes.extend(collect_all_nodes(child))
    return nodes

def clone_expression_tree(node: TreeNode) -> TreeNode:
    """
    It creates a deep copy of the tree.
    """
    copied_node = TreeNode(node.value)
    copied_node.children = [clone_expression_tree(child) for child in node.children]
    return copied_node


### Mutation Functions

In [None]:
def subtree_mutation(individual: TreeNode, mutation_rate: float = 0.4) -> TreeNode:
    """
    With a certain probability it mutates a node of the tree.
    """
    mutant = clone_expression_tree(individual)

    if random.random() < mutation_rate:
        internal_nodes = [node for node in collect_all_nodes(mutant) if not node.is_leaf()]
        leaf_nodes = [node for node in collect_all_nodes(mutant) if node.is_leaf()]
        
        if internal_nodes and (not leaf_nodes or random.random() < 0.5):
            
            target_node = random.choice(internal_nodes)
            new_subtree = generate_random_expression_tree(
                depth=MAX_TREE_DEPTH, 
                constant_min=CONST_MIN, 
                constant_max=CONST_MAX
            )
            target_node.value = new_subtree.value
            target_node.children = new_subtree.children
        
        elif leaf_nodes:
            target_node = random.choice(leaf_nodes)
            if random.random() < 0.5 and VARIABLES:
                target_node.value = random.choice(VARIABLES)
            else:
                target_node.value = random.uniform(CONST_MIN, CONST_MAX)
                target_node.children = []
    
    return mutant

def hoist_mutation(individual: TreeNode) -> TreeNode:
    """
    Performs hoist mutation by replacing the individual with a randomly chosen subtree.
    """
    candidate_subtrees = [n for n in collect_all_nodes(individual) if not n.is_leaf()]

    if not candidate_subtrees:
        return clone_expression_tree(individual)

    return clone_expression_tree(random.choice(candidate_subtrees))

### Depth Tree Control

In [None]:
def enforce_tree_depth_limit(node: TreeNode, max_depth: int = 3, current_depth: int = 0) -> None:
    """It reduces the depth of the tree to a maximum depth."""
    if current_depth >= max_depth:
        if VARIABLES:
            replacement = random.choice(VARIABLES + [random.uniform(CONST_MIN, CONST_MAX)])
        else:
            replacement = random.uniform(CONST_MIN, CONST_MAX)
        
        node.value = replacement
        node.children = []
    else:
        for c in node.children:
            enforce_tree_depth_limit(c, max_depth, current_depth + 1)


### Genetic Programming Algorithm

In [None]:
def run_genetic_programming(x: np.ndarray, y: np.ndarray, population_size: int = 50000, generations: int = 200, elite_size: int = 2, max_depth: int = 6) -> Tuple[TreeNode, float, List[Tuple[TreeNode, float]]]:
    """
    It executes the genetic programming algorithm.
    
    Returns:
        - the best individual found,
        - its fitness value,
        - the hall of fame (best for each generation)
    """
    population: List[TreeNode] = []

    for i in range(population_size // 2):
        tree = generate_random_expression_tree(depth=2)
        enforce_tree_depth_limit(tree, max_depth=max_depth)
        population.append(tree)

    for i in range(population_size - len(population)):
        tree = generate_random_expression_tree(depth=4)
        enforce_tree_depth_limit(tree, max_depth=max_depth)
        population.append(tree)
    
    best_overall: TreeNode = None
    best_fitness: float = float('inf')
    hall_of_fame: List[Tuple[TreeNode, float]] = []  

    for generation in range(generations):
        evaluated = [(individual, evaluate_fitness(individual, x, y)) for individual in population]
        evaluated.sort(key=lambda x: x[1]) 
        
        best_current, current_fitness = evaluated[0]
      
        if current_fitness < best_fitness:
            best_overall = clone_expression_tree(best_current)
            best_fitness = current_fitness
        
        best_str = expression_to_string(best_current)
        print(f"[Generazione {generation}] Best MSE: {current_fitness:.40f} => {best_str}")
        
        hall_of_fame.append((clone_expression_tree(best_current), current_fitness))
        
        new_population: List[TreeNode] = [ind for ind, _ in evaluated[:elite_size]]
        
        while len(new_population) < population_size:
            parent1 = tournament_selection(population, x, y, k=3)
            parent2 = tournament_selection(population, x, y, k=3)
            offspring = crossover(parent1, parent2)

            if random.random() < 0.10:
                offspring = hoist_mutation(offspring)
            else:
                offspring = subtree_mutation(offspring)
            enforce_tree_depth_limit(offspring, max_depth=max_depth)
            new_population.append(offspring)
        
        population = new_population
    
    return best_overall, best_fitness, hall_of_fame

### Training

In [None]:
N = x_validation.shape[1]
print("\nTraining:\n")

TRAIN_SIZE = N // 10
print(f"TRAIN_SIZE = {TRAIN_SIZE}")

train_indexes = np.random.choice(N, size=TRAIN_SIZE, replace=False)
x_train = x_validation[:, train_indexes]
y_train = y_validation[train_indexes]

population_sizes = [1000, 10_000, 50_000]
generations_list = [100, 200]
elite_sizes = [2, 4]

for pop_size in population_sizes:
    for gen_count in generations_list:
        for elite_size in elite_sizes:
            print(f"\n[TRAINING] - Population = {pop_size}, Generations = {gen_count}, Elite = {elite_size}")
            
            best_individual, best_fitness, hall_of_fame = run_genetic_programming(
                x_train, 
                y_train,
                population_size=pop_size,
                generations=gen_count,
                elite_size=elite_size,
                max_depth=6
            )


            expr_str_training = expression_to_string(best_individual)
            print(f"\nResults - Population = {pop_size}, Generations = {gen_count}, Elite={elite_size}")
            print(f"Best expression found = {expr_str_training}")
            print(f"MSE = {best_fitness}")     

            gx_node = convert_tree_to_gxgp_node(best_individual, collected_nodes=None)
            print("Final Expression Tree (GP on training set):")
            file_name = f"tree_pop{pop_size}_gen{gen_count}_elite{elite_size}.png"
            draw(gx_node, file_name)



### Test

In [None]:
print("Test:\n")

best_individual, best_fitness, hall_of_fame = run_genetic_programming(
    x_validation, y_validation,
    population_size=50_000,
    generations=200,
    elite_size=4,
    max_depth=8
)

expr_str = expression_to_string(best_individual)
print(f"Best expression found = {expr_str}")
print(f"MSE = {best_fitness}")

gx_best_individual = convert_tree_to_gxgp_node(best_individual)
print("Final Expression Tree (GP on test set):")
draw(gx_best_individual)
