In [59]:
import numpy as np
import random

# Define constants, variables, and operators
constant_range = np.linspace(0.5, 10, num=25)
CONSTANTS = list(constant_range)

def safe_divide(a, b):
    return np.divide(a, b, where=b != 0, out=np.full_like(a, np.nan))  # Safe divide for b != 0, else nan.

def safe_sqrt(a):
    return np.sqrt(a, where=a >= 0, out=np.full_like(a, np.nan))  # Safe sqrt for non-negative a, else nan.

def safe_log(a):
    return np.log(a, where=a > 0, out=np.full_like(a, -np.inf))  # Safe log for positive a, else -inf 

OPERATORS = {
    'add': np.add,
    'sub': np.subtract,
    'mul': np.multiply,
    'div': safe_divide,
    'sin': np.sin,
    'cos': np.cos,
    'sqrt': safe_sqrt,
    'log': safe_log 
}

class Node:
    def __init__(self, value, is_operator=False):
        """
        Initialize a Node.
        
        :param value: The value of the node (operator, constant, or variable name).
        :param is_operator: True if the node represents an operator, otherwise False.
        """
        self.value = value
        self.is_operator = is_operator
        self.left = None
        self.right = None

    def evaluate(self, variables, depth=0):
        """
        Evaluate the subtree rooted at this node.
        
        :param variables: A dictionary mapping variable names to their values.
        :param depth: The current recursion depth (for debugging).
        :return: The result of evaluating the subtree.
        """
        if depth > 1000:  # Prevent infinite recursion. sin, cos, tan, sqrt, and log are inherently recursive and can lead to infinite recursion if not properly handled.
            #print("Max recursion depth reached!")
            return float(1000000)  # Return a large penalty value for exceeding the maximum recursion depth
        
        # Check for None values to avoid errors
        if self is None:
            return 0

        if self.is_operator:
            # Unary operator (e.g., sin, log, etc.)
            if self.value in ['sin', 'cos', 'tan', 'sqrt', 'log']:
                if self.left is None:
                    raise ValueError(f"Missing left child for operator {self.value}")
                left_value = self.left.evaluate(variables, depth + 1)

                # Handle NaN or Inf results immediately before applying operator
                if np.isnan(left_value) or np.isinf(left_value):
                    return float(1000000)  # Return a large penalty value for NaN or Inf results
                
                return OPERATORS[self.value](left_value)
            
            # Binary operator (e.g., add, sub, mul, div, etc.)
            elif self.value in ['add', 'sub', 'mul', 'div']:
                if self.left is None or self.right is None:
                    raise ValueError(f"Missing children for operator {self.value}")
                left_value = self.left.evaluate(variables, depth + 1)
                right_value = self.right.evaluate(variables, depth + 1)

                # Handle NaN or Inf results immediately before applying operator
                if np.isnan(left_value) or np.isinf(left_value) or np.isnan(right_value) or np.isinf(right_value):
                    return float(1000000)  # Return a large penalty value for NaN or Inf results
            
                return OPERATORS[self.value](left_value, right_value)
        
        elif self.value in variables:
            # Variable node (e.g., x0, x1, etc.)
            return variables[self.value]
        
        # Constant node (just return the constant value)
        return self.value
    
class Individual:
    def __init__(self, root):
        """
        Initialize an Individual (a tree).
        
        :param root: The root node of the tree.
        """
        self.root = root
        self.fitness_value = None

    def evaluate(self, variables):
        """
        Evaluate the tree.
        
        :param variables: A dictionary mapping variable names to their values.
        :return: The result of evaluating the tree.
        """
        return self.root.evaluate(variables)

    def fitness(self, file_path):
        # Load the data
        data = np.load(file_path)
        x = data['x']
        y = data['y']

        # Initialize variables for prediction
        num_features = x.shape[0]
        variables = {f'x{i}': None for i in range(num_features)}

        # Compute predictions
        y_pred = []
        for i in range(x.shape[1]):  # Iterate over each column
            for j in range(num_features):  # Set variable values for this row
                variables[f'x{j}'] = x[j, i]
            y_pred.append(self.evaluate(variables))
        y_pred = np.array(y_pred)

        # Filter out nan and inf values
        valid_indices = np.isfinite(y_pred)
        y_pred = y_pred[valid_indices]
        y = y[valid_indices]

        # Calculate MSE
        mse = np.mean((y - y_pred) ** 2)
        self.fitness_value = mse
        return mse

    def __str__(self):
        """
        Return a string representation of the tree.
        """
        return self._str_helper(self.root)

    def _str_helper(self, node, visited=None):
        """Helper function for string representation. Recursively traverse the tree."""
        if visited is None:
            visited = set()  # Initialize visited set for the first call
        
        # If node is None, return empty string
        if node is None:
            return ""

        # Check if we've already visited this node to detect cycles
        if id(node) in visited:
            return "(...)  # Cycle detected"
        
        # Mark the current node as visited
        visited.add(id(node))

        if node.is_operator:
            return f"({self._str_helper(node.left, visited)} {node.value} {self._str_helper(node.right, visited)})"
        
        return str(node.value)



In [60]:
# steps:
# Initialize Population: Generate an initial population of random expression trees (individuals).
# Evaluate Fitness: Assess the fitness of each individual by calculating its Mean Squared Error (MSE) against the target dataset.
# Selection: Apply tournament selection to choose parent individuals based on their fitness.
# Recombination (Crossover): Combine pairs of parent individuals to produce offspring through crossover.
# Mutation: Apply mutation to offspring to introduce genetic diversity.
# Replacement: Replace the old population with the new generation of individuals.
# Termination: Repeat the process for a predefined number of generations or until a satisfactory solution is found.

In [61]:
import numpy as np
import random

class SymbolicRegression:
    def __init__(self, population_size, num_generations, mutation_rate, crossover_rate, file_path):
        """
        Initialize the Symbolic Regression algorithm.
        
        :param population_size: Number of individuals in the population.
        :param num_generations: Number of generations to evolve.
        :param mutation_rate: Probability of mutation.
        :param crossover_rate: Probability of crossover.
        :param file_path: Path to the .npz file containing 'x' and 'y'.
        """
        self.population_size = population_size
        self.num_generations = num_generations
        self.mutation_rate = mutation_rate
        self.crossover_rate = crossover_rate
        self.file_path = file_path
        self.population = []
        
        # Load data
        data = np.load(self.file_path)
        self.x_data = data['x']
        self.y_data = data['y']

    def create_tree(self, num_nodes, x_data):
        """
        Create a random tree with the specified number of nodes.
        
        :param num_nodes: The number of nodes in the tree.
        :param x_data: The x data from the .npz file to determine the number of variables.
        :return: A random tree as an Individual object.
        """
        num_vars = x_data.shape[0]  # Number of variables (rows in x)
        VARIABLES = [f'x{i}' for i in range(num_vars)]  # Generate the variable names dynamically

        def build_tree(nodes_left, is_root=True):
            """Recursively build a tree."""
            if nodes_left == 1:
                if random.random() < 0.5:
                    return Node(random.choice(VARIABLES)), 1
                else:
                    return Node(random.choice(CONSTANTS)), 1
            else:
                operator = random.choice(list(OPERATORS.keys()))

                if operator in ['sin', 'cos', 'tan', 'sqrt', 'log']:
                    right_subtree_size = nodes_left - 1
                    node = Node(operator, is_operator=True)
                    right_child, _ = build_tree(right_subtree_size, is_root=False)
                    node.left = right_child
                    return node, 1 + right_subtree_size
                elif operator in ['add', 'sub', 'mul', 'div']:
                    if nodes_left <= 2:
                        left_subtree_size = 1
                        right_subtree_size = 1
                    else:
                        right_subtree_size = random.randint(1, nodes_left - 2)
                        left_subtree_size = nodes_left - 1 - right_subtree_size

                    node = Node(operator, is_operator=True)

                    left_child, left_size = build_tree(left_subtree_size, is_root=False)
                    right_child, right_size = build_tree(right_subtree_size, is_root=False)

                    node.left = left_child
                    node.right = right_child

                    return node, 1 + left_size + right_size

        root_node, _ = build_tree(num_nodes)
        return Individual(root_node)

    def create_initial_population(self):
        """Create the initial population of individuals."""
        for _ in range(self.population_size):
            num_nodes = random.randint(4, 10)
            individual = self.create_tree(num_nodes=num_nodes, x_data=self.x_data)
            self.population.append(individual)


    def tournament_selection(self, tournament_size=3):
        """Select parents using tournament selection."""
        selected_parents = []
        for _ in range(self.population_size):
            tournament = random.sample(self.population, tournament_size)
            tournament.sort(key=lambda x: x.fitness(file_path=self.file_path))
            selected_parents.append(tournament[0])
        return selected_parents

    def crossover(self, parent1, parent2):
        def get_random_node(node):
            if node.is_operator:
                return random.choice([node, node.left, node.right])
            return node

        parent1_node = get_random_node(parent1.root)
        parent2_node = get_random_node(parent2.root)

        offspring1, offspring2 = self.swap_subtree(parent1.root, parent2.root, parent1_node, parent2_node)

        return Individual(offspring1), Individual(offspring2)

    def swap_subtree(self, tree1, tree2, node1, node2):
        # Print when starting to swap subtrees
        #print(f"Swapping subtrees: node1={node1.value if node1 else 'None'}, node2={node2.value if node2 else 'None'}")

        # If node1 is None, we can't proceed further, return the trees unchanged
        if node1 is None:
            #print("node1 is None, returning trees unchanged.")
            return tree1, tree2
        if node2 is None:
            #print("node2 is None, returning trees unchanged.")
            return tree1, tree2

        # If we find a node that matches, we swap it
        if node1 is tree1:
            #print(f"Swapping tree1 with node2 ({node2.value})")
            tree1 = node2
        elif node2 is tree2:
            #print(f"Swapping tree2 with node1 ({node1.value})")
            tree2 = node1
        else:
            # Recurse into left and right children, if they exist
            if node1.left:
                #print(f"Recursing into left child of node1 ({node1.value})")
                node1.left, tree2 = self.swap_subtree(node1.left, tree2, node1.left, node2)
            if node1.right:
                #print(f"Recursing into right child of node1 ({node1.value})")
                node1.right, tree2 = self.swap_subtree(node1.right, tree2, node1.right, node2)

        return tree1, tree2


    def mutate(self, individual):
        node_to_mutate = self.select_random_node(individual.root)
        new_node = self.create_tree(num_nodes=3, x_data=self.x_data).root
        node_to_mutate.value = new_node.value
        node_to_mutate.left = new_node.left
        node_to_mutate.right = new_node.right

    def select_random_node(self, node):
        if random.random() < 0.5 and node.is_operator:
            return random.choice([node, node.left, node.right])
        return node
    
    def count_nodes(self, node, depth=0, max_depth=1000):
        """Count the number of nodes in the tree while limiting recursion depth."""
        if node is None or depth > max_depth:
            return 0
        if node.is_operator:
            return 1 + self.count_nodes(node.left, depth + 1, max_depth) + self.count_nodes(node.right, depth + 1, max_depth)
        return 1  # It's a leaf node (constant or variable)

    
    def evolve(self):
        """Evolve the population over multiple generations."""
        # Step 1: Create the initial population
        self.create_initial_population()

        for generation in range(self.num_generations):
            # Step 2: Evaluate fitness of each tree in the population
            for individual in self.population:
                try:
                    individual.fitness(file_path=self.file_path)
                    #if count_nodes==1 fitness_value=1000000
                    if self.count_nodes(individual.root) == 1:
                        individual.fitness_value = float(1000000)
                except Exception as e:
                    print(f"Error evaluating fitness for individual: {e}")
                    individual.fitness_value = float(1000000)  # Assign a high fitness value for invalid individuals

            # Step 3: Filter out invalid individuals (NaN or infinity fitness values). #Discard also individuals with only one node (i.e., trivial solutions)
            valid_population = [ind for ind in self.population if self.count_nodes(ind.root) > 1 ]
                       #if not np.isnan(ind.fitness_value) and not np.isinf(ind.fitness_value) and

            # If no valid individuals, we can either restart or return a default individual
            if not valid_population:
                print("No valid individuals found, restarting population.")
                self.create_initial_population()
                valid_population = self.population

            # Ensure population size is consistent
            while len(valid_population) < self.population_size:
                new_individual = self.create_tree(num_nodes=random.randint(4, 10), x_data=self.x_data)
                valid_population.append(new_individual)

            # Step 4: Select parents using tournament selection
            parents = self.tournament_selection()

            # Step 5: Create the next generation
            next_generation = []
            for i in range(0, len(parents), 2):
                parent1 = parents[i]
                parent2 = parents[i + 1] if i + 1 < len(parents) else parents[0]
                
                # Apply crossover
                if random.random() < self.crossover_rate:
                    offspring1, offspring2 = self.crossover(parent1, parent2)
                    next_generation.append(offspring1)
                    next_generation.append(offspring2)
                else:
                    # No crossover, directly use parents (or add crossover logic if needed)
                    next_generation.append(parent1)
                    next_generation.append(parent2)

            # Replace old population with the new generation
            self.population = next_generation

            # Step 6: Track the best individual
            valid_population = [ind for ind in self.population 
                            if isinstance(ind.fitness_value, (float, int))] # 
            #and not np.isnan(ind.fitness_value) and not np.isinf(ind.fitness_value)
            if valid_population:
                best_individual = min(valid_population, key=lambda x: x.fitness_value)
                print(f"Generation {generation + 1}: Best Fitness = {best_individual.fitness_value}")
                print(f"Best Individual: {best_individual._str_helper(best_individual.root)}")   
                print("nodes: ", self.count_nodes(best_individual.root)) 
            
            # If no valid individual, print message
            else:
                print(f"Generation {generation + 1}: No valid individuals, restarting population.")

        # Final Output: Return the best individual from the final generation
        valid_population = [ind for ind in self.population if not np.isnan(ind.fitness_value) and not np.isinf(ind.fitness_value)]
        if valid_population:
            return min(valid_population, key=lambda x: x.fitness_value)
        else:
            return None  # Return None if no valid individual is found




# Usage example
file_path = 'problem_1.npz'  # Path to the problem_1.npz file

# Initialize the Symbolic Regression algorithm
sr = SymbolicRegression(
    population_size=200, # i prefer increment the number of trees created because i noticed that the best fitness value is not always the same in this way. instead incrementing num_generation will not change so much the result
    num_generations=10,
    mutation_rate=0.1,
    crossover_rate=0.7,
    file_path=file_path
)

# Evolve the population
sr.evolve()


Generation 1: Best Fitness = 0.7821170316284126
Best Individual: (x0 mul (1.6875 log ))
nodes:  4
Generation 2: Best Fitness = 0.7821170316284126
Best Individual: (x0 mul (1.6875 log ))
nodes:  4


KeyboardInterrupt: 