## Import and Load Problems

In [13]:
import random
import numpy as np
import copy

In [14]:
problem = np.load('../data/problem_5.npz')
x = problem['x']
y = problem['y']
x.shape

(2, 5000)

## Operators

In [15]:
OPERATORS = {
    '+': lambda a, b: a + b,
    '-': lambda a, b: a - b,
    '*': lambda a, b: np.where((np.abs(a) < 1e6) & (np.abs(b) < 1e6), a * b, 1e6),
    '/': lambda a, b: np.where(b != 0, a / b, 1e6),
    'sin': lambda a: np.sin(np.where(np.abs(a) < 1e6, a, 0)),
    'cos': lambda a: np.cos(np.where(np.abs(a) < 1e6, a, 0)),
    'tan': lambda a: np.where(np.abs(np.cos(a)) > 1e-10, np.tan(a), 1e6),
    'exp': lambda a: np.exp(a, out=np.full_like(a, 1e6), where=(a < 100)),
    'log': lambda a: np.where(a > 0, np.log(np.maximum(a, 1e-10)), -1e6),
    'sqrt': lambda a: np.where(a >= 0, np.sqrt(np.maximum(a, 0)), -1e6),
    'abs': np.abs,
}

## MSE Unary Operators

In [16]:
def calculate_mse_for_operator(name,operator, x, y):
    """
    Calculate the mean squared error (MSE) for a given unary or binary operator.
    """
    if name in {'+', '-', '*', '/'}:
        print("It is a binary operator")
        mse = 1e9
    else:
        y_pred = operator(x)
        mse = np.mean((y - y_pred) ** 2)

    return mse

best_operator = None
best_mse_op = 1e9

# MSE for each operator
for name, operator in OPERATORS.items():
    mse = calculate_mse_for_operator(name,operator, x, y)
    print(f"MSE for {name}: {mse:.6f}")
    if mse < best_mse_op:
        best_operator = name
        best_mse_op = mse

print(f"\nBest Operator: {best_operator} (MSE: {best_mse_op})")


It is a binary operator
MSE for +: 1000000000.000000
It is a binary operator
MSE for -: 1000000000.000000
It is a binary operator
MSE for *: 1000000000.000000
It is a binary operator
MSE for /: 1000000000.000000
MSE for sin: 0.528604
MSE for cos: 0.471396
MSE for tan: 1350.784045
MSE for exp: 2198.152678
MSE for log: 1.365267
MSE for sqrt: 2.493278
MSE for abs: 8.299178

Best Operator: cos (MSE: 0.47139632384380803)


## Symbolic Regression

In [17]:
# Parameters
POPULATION_SIZE = 100
MUTATION_RATE = 0.3
MAX_GENERATIONS = 100
TOURNAMENT_SIZE = 10
MAX_DEPTH = 8

def select_random_node(node):
    """Select a random node from the tree using DFS."""
    nodes = []
    
    def dfs(current_node):
        if current_node is None:
            return
        nodes.append(current_node)
        if current_node.left:
            dfs(current_node.left)
        if current_node.right:
            dfs(current_node.right)
    
    dfs(node)
    return random.choice(nodes) if nodes else None

class Node:
    """Class representing a node in the expression tree."""
    def __init__(self, value, left=None, right=None):
        self.value = value
        self.left = left
        self.right = right

    def evaluate(self, x):
        try:
            x = np.clip(x, -1e6, 1e6)
            
            if self is None:
                return np.zeros_like(x)

            if self.value in OPERATORS:
                func = OPERATORS[self.value]
                if self.value in {'+', '-', '*', '/'}:
                    if self.left is not None and self.right is not None:
                        return func(self.left.evaluate(x), self.right.evaluate(x))
                    else:
                        return np.zeros_like(x)
                else:
                    if self.left is not None:
                        return func(self.left.evaluate(x))
                    else:
                        return np.zeros_like(x)

            elif isinstance(self.value, str) and self.value.startswith('x['):
                # Handle specific variables like x[0], x[1], etc.
                index = int(self.value[2:-1])  # Extract the index from 'x[i]'
                return x[index]  # Return the corresponding x[i]
            else:
                return np.full_like(x, self.value)

        except FloatingPointError:
            return np.full_like(x, 1e6)  # If an error occurs, return a finite value

    def __str__(self):
        """Return a readable representation of the expression."""
        if self.value in {'+', '-', '*', '/'}:
            return f"({str(self.left)} {self.value} {str(self.right)})"
        elif self.value in OPERATORS:
            return f"{self.value}({str(self.left)})"
        return str(self.value)

class Tree:
    """Class representing a symbolic expression tree."""
    def __init__(self, root=None, depth=MAX_DEPTH, num_variables=3):
        self.num_variables = num_variables  # Number of variables (e.g., x[0], x[1], x[2])
        self.root = root if root else self.generate_random_tree(depth)

    def generate_random_tree(self, depth, early_termination_chance=0.2):
        """Generate a random tree with a chance to terminate early."""
        if depth == 0 or random.random() < early_termination_chance:
            # Generate either a constant or a specific variable like x[0], x[1], etc.
            if random.random() < 0.5:
                return Node(random.uniform(-10, 10))  # Constant
            else:
                var_index = random.randint(0, self.num_variables - 1)  # Random variable index
                return Node(f'x[{var_index}]')  # Specific variable like x[0], x[1], etc.

        op = random.choice(list(OPERATORS.keys()))
        if op in {'+', '-', '*', '/'}:
            left = self.generate_random_tree(depth - 1, early_termination_chance)
            right = self.generate_random_tree(depth - 1, early_termination_chance)
            return Node(op, left, right)
        else:  # Unary operations
            left = self.generate_random_tree(depth - 1, early_termination_chance)
            return Node(op, left)

    def evaluate(self, x):
        """Evaluate the tree on the input x."""
        return self.root.evaluate(x)

    def fitness(self, x, y):
        """Calculate MSE fitness for the input x and target y."""
        y_pred = self.evaluate(x)
        mse = np.mean((y - y_pred) ** 2)
        return mse

    def __str__(self):
        """Return the readable representation of the tree."""
        return str(self.root)
    
    def dynamic_crossover(self, other_tree, generation, max_generations, exploration=True):
        """
        Dynamic crossover between two trees based on current generation.
        - exploration=True: two-point crossover (more exploratory)
        - exploration=False: single-point crossover (more conservative)
        """
        def deep_copy_subtree(node):
            """Create a deep copy of a subtree"""
            if node is None:
                return None
            new_node = copy.deepcopy(node)
            return new_node

        def replace_subtree(original_node, replacement_subtree):
            """Replace a node's entire subtree"""
            if original_node is None:
                return replacement_subtree
            
            original_node.value = replacement_subtree.value
            original_node.left = replacement_subtree.left
            original_node.right = replacement_subtree.right
            return original_node

        # Select random nodes
        node1 = select_random_node(self.root)
        node2 = select_random_node(other_tree.root)

        if node1 is not None and node2 is not None:
            if exploration:
                # Two-point crossover: swap entire subtrees
                subtree1 = deep_copy_subtree(node1)
                subtree2 = deep_copy_subtree(node2)
                
                replace_subtree(node1, subtree2)
                replace_subtree(node2, subtree1)
                
            else:
                # Single-point crossover: replace subtree
                subtree = deep_copy_subtree(node2)
                replace_subtree(node1, subtree)

    def mutate(self, mutation_rate=MUTATION_RATE):
        """Perform a random mutation on the tree."""
        def mutate_node(node):
            if node is None:
                return
            # Mutation probability
            if random.random() < mutation_rate:
                # Random mutation of the node
                if node.value in {'+', '-', '*', '/', 'sin', 'cos', 'exp', 'log', 'sqrt', 'tan', 'abs'}:
                    node.value = random.choice(list(OPERATORS.keys()))
                elif isinstance(node.value, (int, float)):
                    if random.random() < 0.5:
                        node.value += random.uniform(-1, 1)  # Small perturbation
                    else:
                        node.value = random.uniform(-10, 10)  # Full random change
                elif isinstance(node.value, str) and node.value.startswith('x['):
                    # Mutate the variable index
                    var_index = random.randint(0, self.num_variables - 1)
                    node.value = f'x[{var_index}]'
                

            # Recursion on children
            mutate_node(node.left)
            mutate_node(node.right)

        mutate_node(self.root)

    def prune(self, max_complexity):
            """Prune the tree to reduce complexity."""
            
            def count_nodes(node):
                """Count the number of nodes in the tree."""
                if node is None:
                    return 0
                return 1 + count_nodes(node.left) + count_nodes(node.right)

            def simplify(node):
                """Simplify the tree by removing unnecessary nodes."""
                if node is None:
                    return None

                node.left = simplify(node.left)
                node.right = simplify(node.right)

                # Common simplifications
                if node.value == '+' and node.right and isinstance(node.right.value, (int, float)) and node.right.value == 0:
                    return node.left  # x + 0 = x
                if node.value == '*' and node.right and isinstance(node.right.value, (int, float)):
                    if node.right.value == 1:
                        return node.left  # x * 1 = x
                    elif node.right.value == 0:
                        return Node(0)  # x * 0 = 0

                return node

            # Simplify the tree
            self.root = simplify(self.root)

            # If the tree is still too complex, prune the deepest nodes
            while count_nodes(self.root) > max_complexity:
                def prune_deepest(node, depth=0, max_depth=MAX_DEPTH):
                    """Prune the deepest nodes in the tree."""
                    if node is None or depth >= max_depth - 1:
                        return Node(random.uniform(-10, 10))  
                    if node.left:
                        node.left = prune_deepest(node.left, depth + 1, max_depth)
                    if node.right:
                        node.right = prune_deepest(node.right, depth + 1, max_depth)
                    return node

                self.root = prune_deepest(self.root)


class Population:
    """Class representing a population of random trees."""
    def __init__(self, size=POPULATION_SIZE, depth=MAX_DEPTH, num_variables=3):
        self.num_variables = num_variables
        self.trees = [Tree(depth=depth, num_variables=num_variables) for _ in range(size)]

    def evaluate_population(self, x, y):
        """Calculate the fitness of each tree and sort the population."""
        for tree in self.trees:
            tree.fitness_value = tree.fitness(x, y)
        self.trees.sort(key=lambda t: t.fitness_value) 
        print("BEST TREE")
        print(self.trees[0])

    def best_tree(self):
        """Return the best tree in the population."""
        return self.trees[0] if self.trees else None

    def __str__(self):
        """Return the list of trees with their fitness."""
        return "\n".join([f"Tree: {tree}, Fitness: {tree.fitness_value:.6f}" for tree in self.trees])
    
    def calculate_structural_diversity(self):
        expressions = [str(tree) for tree in self.trees]
        return len(set(expressions)) / len(self.trees)

    def evolve(self, x, y, generation, max_generations, mutation_rate=MUTATION_RATE):
        next_generation = []
        
        # Proper elite preservation with deep copy
        elite_size = max(1, len(self.trees) // 10)
        elite_trees = [copy.deepcopy(tree) for tree in self.trees[:elite_size]]
        next_generation.extend(elite_trees)
        
        # Better diversity measure
        diversity = self.calculate_structural_diversity()
        
        while len(next_generation) < len(self.trees):
            # Increased tournament size
            parent1 = tournament_selection(self.trees, tournament_size=TOURNAMENT_SIZE)
            parent2 = tournament_selection(self.trees, tournament_size=TOURNAMENT_SIZE)
            
            child1 = copy.deepcopy(parent1)
            child2 = copy.deepcopy(parent2)
            
            child1.dynamic_crossover(child2, generation, max_generations)
            child1.prune(max_complexity=70)
            child2.prune(max_complexity=70)

            child1.fitness_value = child1.fitness(x, y)
            child2.fitness_value = child2.fitness(x, y)
            
            # Don't mutate if fitness is good
            if child1.fitness_value > np.mean([t.fitness_value for t in self.trees]):
                child1.mutate(mutation_rate)
                child2.mutate(mutation_rate)
                child1.prune(max_complexity=70)
                child2.prune(max_complexity=70)
                
            next_generation.extend([child1, child2])
        
        self.trees = next_generation[:len(self.trees)]
            

def tournament_selection(population, tournament_size=TOURNAMENT_SIZE):
    """Tournament selection."""
    tournament = random.sample(population, tournament_size)
    return min(tournament, key=lambda t: t.fitness_value)

def calculate_diversity(population):
    """Calculate population diversity as the variance of fitness."""
    fitness_values = [tree.fitness_value for tree in population]
    return np.var(fitness_values)

def adaptive_mutation_rate(generation, max_generations, base_rate=0.1):
    """Adaptive mutation rate based on generation."""
    return base_rate * (1 - generation / max_generations)

if __name__ == "__main__":
    num_variables = x.shape[0]  # Number of variables (e.g., x[0], x[1], x[2])
    
    # Create initial population
    population = Population(size=POPULATION_SIZE, depth=MAX_DEPTH, num_variables=num_variables)
    population.evaluate_population(x, y)

    # Evolution loop
    for generation in range(MAX_GENERATIONS):
        print(f"\nGeneration {generation + 1}/{MAX_GENERATIONS}")
        population.evolve(x, y, generation, MAX_GENERATIONS, mutation_rate=MUTATION_RATE)
        population.evaluate_population(x, y)

        # Print the best tree of the current generation
        best = population.best_tree()
        print(f"Best tree (Fitness: {best.fitness_value}): {best}")

    # Final result
    best = population.best_tree()
    print("\nFinal result:")
    print(f"Best tree found: {best}")
    print(f"Fitness (MSE): {best.fitness_value}")

BEST TREE
((x[1] / exp((tan(-1.1909243079360525) * (log(cos(cos(x[0]))) * log(-8.923707992920049))))) * cos(x[0]))

Generation 1/100


  '/': lambda a, b: np.where(b != 0, a / b, 1e6),


BEST TREE
abs(sin(log(log(exp(-6.667309832387143)))))
Best tree (Fitness: 5.572810232617333e-18): abs(sin(log(log(exp(-6.667309832387143)))))

Generation 2/100
BEST TREE
abs(sin(log(log(exp(-6.667309832387143)))))
Best tree (Fitness: 5.572810232617333e-18): abs(sin(log(log(exp(-6.667309832387143)))))

Generation 3/100


  '/': lambda a, b: np.where(b != 0, a / b, 1e6),


BEST TREE
abs(sin(log(log(exp(-6.667309832387143)))))
Best tree (Fitness: 5.572810232617333e-18): abs(sin(log(log(exp(-6.667309832387143)))))

Generation 4/100
BEST TREE
abs(sin(log(log(exp(-6.667309832387143)))))
Best tree (Fitness: 5.572810232617333e-18): abs(sin(log(log(exp(-6.667309832387143)))))

Generation 5/100
BEST TREE
abs(sin(log(log(exp(-6.667309832387143)))))
Best tree (Fitness: 5.572810232617333e-18): abs(sin(log(log(exp(-6.667309832387143)))))

Generation 6/100
BEST TREE
abs(sin(log(log(exp(-6.667309832387143)))))
Best tree (Fitness: 5.572810232617333e-18): abs(sin(log(log(exp(-6.667309832387143)))))

Generation 7/100


  '/': lambda a, b: np.where(b != 0, a / b, 1e6),


BEST TREE
abs(sin(log(log(exp(-6.667309832387143)))))
Best tree (Fitness: 5.572810232617333e-18): abs(sin(log(log(exp(-6.667309832387143)))))

Generation 8/100
BEST TREE
abs(sin(log(log(exp(-6.667309832387143)))))
Best tree (Fitness: 5.572810232617333e-18): abs(sin(log(log(exp(-6.667309832387143)))))

Generation 9/100
BEST TREE
abs(sin(log(log(exp(-6.667309832387143)))))
Best tree (Fitness: 5.572810232617333e-18): abs(sin(log(log(exp(-6.667309832387143)))))

Generation 10/100
BEST TREE
abs(sin(log(log(exp(-6.667309832387143)))))
Best tree (Fitness: 5.572810232617333e-18): abs(sin(log(log(exp(-6.667309832387143)))))

Generation 11/100
BEST TREE
abs(sin(log(log(exp(-6.667309832387143)))))
Best tree (Fitness: 5.572810232617333e-18): abs(sin(log(log(exp(-6.667309832387143)))))

Generation 12/100
BEST TREE
abs(sin(log(log(exp(-6.667309832387143)))))
Best tree (Fitness: 5.572810232617333e-18): abs(sin(log(log(exp(-6.667309832387143)))))

Generation 13/100
BEST TREE
abs(sin(log(log(exp(-6.667