Copyright **`(c)`** 2024 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free under certain conditions — see the [`license`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

### Importing libraries

In [35]:
import numpy as np
from tqdm.auto import tqdm
import random
import matplotlib.pyplot as plt

### Utility functions

In [36]:
def add(a, b):
    return a + b

def sub(a, b):
    return a - b

def mul(a, b):
    return a * b

def div(a, b):
    return a / (b + 1e-50)  # we added that to avoid zero division

def log(a):
    return np.log(np.abs(a) + 1e-50)  # we added that to avoid zero log

def sqrt(a):
    return np.sqrt(np.abs(a))  # we added that to avoid negative square root

def sin(a):
    return np.sin(a)

def cos(a):
    return np.cos(a)

def abs(a):
    return np.abs(a)

In [37]:
class Node:
    def __init__(self, value, children=None):
        self.value = value  # operator or terminal
        self.children = children or []  # child nodes

    def is_terminal(self):
        return len(self.children) == 0

    def __str__(self):
        if callable(self.value):
            return f"{self.value.__name__}({', '.join(map(str, self.children))})"
        return str(self.value)

    def depth(self):
        if self.is_terminal():
            return 0  # terminal node has depth 0
        return 1 + max(child.depth() for child in self.children)

In [38]:
def generate_random_tree(max_depth, terminals, functions):
    if max_depth == 0 or (random.random() < 0.5):  # terminal node
        value = random.choice(terminals)
        if value == 'const':  # return a random constant from -100 to 100
            return Node(np.random.randint(-100, 100))
        return Node(value)
    else:  # function node
        func, arity = random.choice(list(functions.values()))
        children = [generate_random_tree(max_depth - 1, terminals, functions) for _ in range(arity)]
        return Node(func, children)

In [39]:
def evaluate_tree(tree, X):
    if tree.is_terminal():
        # if it's a constant
        if isinstance(tree.value, (int, float)):  
            return np.full(X.shape[1], tree.value)  # return constant value for all samples
        # if it's a variable
        elif tree.value.startswith('x'):
            col_idx = int(tree.value[1:])   # converting x1 -> index 0, x2 -> index 1
            return X[col_idx, :] # return the corresponding feature for all samples
    else:
        # handle function node (apply function to children)
        func = tree.value
        args = [evaluate_tree(child, X) for child in tree.children]
        return func(*args) # ensure all arguments have the same shape

In [40]:
def fitness(tree, X, y):
    predictions = evaluate_tree(tree, X)
    return np.mean((predictions - y) ** 2)

In [41]:
def crossover(parent1, parent2, max_depth):
    def select_random_subtree(node):
        if not node.children or random.random() < 0.5:  # 50% chance to pick the current node
            return node
        return select_random_subtree(random.choice(node.children))

    if parent1.is_terminal() or parent2.is_terminal():
        return select_random_subtree(parent2)

    if random.random() < 0.5:  # 50% chance to swap subtrees
        return select_random_subtree(parent2)

    # applying crossover recursively on children
    new_children = []
    for child in parent1.children:
        new_children.append(crossover(child, parent2, max_depth))

    new_tree = Node(parent1.value, new_children)

    # enforce depth constraint
    if new_tree.depth() > max_depth:
        return parent1  # return parent1 if depth exceeds max_depth, otherwise return new_tree

    return new_tree

def subtree_mutation(tree, max_depth, terminals, functions):
    if random.random() < 0.1 or tree.is_terminal() or max_depth == 0:  # replace subtree
        return generate_random_tree(max_depth, terminals, functions)

    new_tree = Node(tree.value, [subtree_mutation(child, max_depth - 1, terminals, functions) for child in tree.children])
    if(new_tree.depth() > max_depth):
        return tree

    return new_tree

def hoist_mutation(tree, max_depth):
    if tree.is_terminal() or max_depth == 0: # terminal nodes remain unchanged
        return tree

    # selecting a random subtree
    selected_subtree = random.choice(tree.children)
    if random.random() < 0.7 or selected_subtree.is_terminal():  # hoist this subtree
        return selected_subtree

    # recursively applying hoist mutation to children
    return Node(tree.value, [hoist_mutation(child, max_depth - 1) for child in tree.children])

def point_mutation(tree, terminals, functions):
    if random.random() < 0.2 or tree.is_terminal():
        if tree.is_terminal():
            # replace terminal with another random terminal
            new_value = random.choice(terminals)

            if new_value == 'const':  # generate a random constant if needed
                return Node(np.random.randint(-100, 100))

            return Node(new_value)
        else:
            # replace function with another function of the same arity
            func, arity = random.choice([f for f in functions.values() if len(tree.children) == f[1]])
            return Node(func, tree.children)  # keeping the same children as before

    # recursively applying point mutation to children
    return Node(tree.value, [point_mutation(child, terminals, functions) for child in tree.children])

def collapse_mutation(tree, terminals):
    if tree.is_terminal():  # terminal nodes remain unchanged
        return tree

    if random.random() < 0.3:  # 30% chance to collapse this subtree into a terminal
        new_value = random.choice(terminals)

        if new_value == 'const':  # generate a random constant if needed
            return Node(np.random.randint(-100, 100))

        return Node(new_value)

    # recursively applying collapse mutation to children
    return Node(tree.value, [collapse_mutation(child, terminals) for child in tree.children])

def mutate(tree, max_depth, terminals, functions):
    # defining the probabilities for each mutation type
    mutation_probs = {
        "subtree": 0.6,  # subtree mutation
        "point": 0.2,    # point mutation
        "hoist": 0.1,    # hoist mutation
        "collapse" : 0.1    # collapse mutation
    }

    # selecting the mutation type based on weighted probability
    mutation_type = random.choices(
        list(mutation_probs.keys()),
        weights=list(mutation_probs.values()),
        k=1
    )[0]

    # applying the selected mutation type
    if mutation_type == "subtree":
        return subtree_mutation(tree, max_depth, terminals, functions)
    elif mutation_type == "point":
        return point_mutation(tree, terminals, functions)
    elif mutation_type == "hoist":
        return hoist_mutation(tree, max_depth)
    elif mutation_type == "collapse":
        return collapse_mutation(tree,terminals)

### Main algorithm

In [42]:
def evolve(X, y, terminals, functions, population_size, max_depth, num_generations):
    # initializing the population with "population_size" random trees
    population = [generate_random_tree(2, terminals, functions) for _ in range(population_size)]

    best_fitness = float('inf') 
    generation = 0

    for generation in range(num_generations):
        if best_fitness == 0:
            break
        # evaluating the fitness for each tree in the population
        fitness_scores = np.array([fitness(tree, X, y) for tree in population])
        # taking the index of the best fitness_score
        best_idx = np.argmin(fitness_scores)
        current_best_tree, current_best_score = population[best_idx], fitness_scores[best_idx]
        current_best_depth = current_best_tree.depth()

        print(f"Generation {generation}, Best Fitness: {current_best_score}, Depth: {current_best_depth}")

        #updating the best fitness and the best tree
        if current_best_score < best_fitness:
            best_fitness = current_best_score
            best_tree = current_best_tree

        # keeping the top 20% of the population (elitism)
        sorted_population = [tree for _, tree in sorted(zip(fitness_scores, population), key=lambda x: x[0])]
        next_population = sorted_population[:population_size // 5]

        # generating the offspring, doing crossover or mutation
        while len(next_population) < population_size:
            if random.random() < 0.5: # 50% chance to perform crossover
                parents = random.sample(population, 2)
                child = crossover(parents[0], parents[1], max_depth)
            else:  # 50% chance to perform mutation
                parent = random.choice(population)
                child = mutate(parent, max_depth, terminals, functions)
            next_population.append(child)

        population = next_population

    print(f"Evolution completed after {generation} generations with best fitness: {best_fitness}")
    # returning the best tree
    return best_tree

### To convert tree to numpy

In [43]:
def tree_to_numpy(node):
    if node.is_terminal():
        # returning the variable or constant directly
        return str(node.value)

    # function case: Construct the string representation
    func = node.value  # e.g., "add", "mul"
    children_expr = [tree_to_numpy(child) for child in node.children]

    # function names to the equivalent NumPy functions
    numpy_func_map = {
        add: 'np.add',
        sub: 'np.subtract',
        mul: 'np.multiply',
        div: 'np.divide',
        sin: 'np.sin',
        cos: 'np.cos',
        log: 'np.log',
        sqrt: 'np.sqrt',
        abs: 'np.abs'
    }

    # generating the NumPy-compatible string
    if func in numpy_func_map:
        if len(children_expr) == 2:  # binary operator
            return f"{numpy_func_map[func]}({children_expr[0]}, {children_expr[1]})"
        elif len(children_expr) == 1:  # unary operator
            return f"{numpy_func_map[func]}({children_expr[0]})"
        else:
            raise ValueError("Function arity does not match the tree structure.")
    else:
        raise ValueError(f"Unsupported function: {func}")

### To execute the code

In [44]:
problem = np.load('../data/problem_1.npz')
x= problem['x']
y = problem['y']
print(x.shape, y.shape)

# defining function_set with function names and their arity
function_set = {
    'add': (add, 2),
    'substract': (sub, 2),
    'multiply': (mul, 2),
    'divide': (div, 2),
    'sin': (sin, 1),
    'cos': (cos, 1),
    'log': (log, 1),
    'sqrt': (sqrt, 1),
    'abs': (abs, 1),
}

num_vars = x.shape[0] 
terminals = [f'x{i}' for i in range(num_vars)] 
terminals.append('const')
print(terminals)

best_tree = evolve(x, y, terminals, function_set, population_size=1_000_000, max_depth=5, num_generations=50)

# displaying the best formula
print("Best Formula:", best_tree)

# evaluating on training set
mse_train = np.mean((y - evaluate_tree(best_tree, x)) ** 2)
print("Training MSE:", mse_train)

(1, 500) (500,)
['x0', 'const']
Generation 0, Best Fitness: 0.0, Depth: 1
Evolution completed after 1 generations with best fitness: 0.0
Best Formula: sin(x0)
Training MSE: 0.0


In [45]:
# printing the numpy version of the tree
print(tree_to_numpy(best_tree))

np.sin(x0)
