# Symbolic Regression

## 1. Imports libraries

In [43]:
import numpy as np
import random
from copy import deepcopy
import pandas as pd
import plotly.graph_objects as go
from random import choice
import tqdm
from sklearn.model_selection import KFold
import plotly.graph_objects as go
import plotly.io as pio

np.seterr(all='ignore')  # ignore all numpy warnings, 
None

## 2. Define Operators and Parameters Configuration

In [44]:
# Define the set of operators that can be used in the GP algorithm.
OPERATORS = [
    {"func_name": "add", "func": np.add, "arity": 2, "format_str": "({} + {})"},
    {"func_name": "subtract", "func": np.subtract, "arity": 2, "format_str": "({} - {})"},
    {"func_name": "arctan", "func": np.arctan, "arity": 1, "format_str": "arctan({})"},
    {"func_name": "multiply", "func": np.multiply, "arity": 2, "format_str": "({} * {})"},
    {"func_name": "divide", "func": np.divide, "arity": 2, "format_str": "({} / {})"},
    {"func_name": "power", "func": np.power, "arity": 2, "format_str": "({} ** {})"},
    {"func_name": "sqrt", "func": np.sqrt, "arity": 1, "format_str": "sqrt({})"},
    {"func_name": "cube_root", "func": np.cbrt, "arity": 1, "format_str": "cube_root({})"},
    {"func_name": "log", "func": np.log, "arity": 1, "format_str": "log({})"},
    {"func_name": "sin", "func": np.sin, "arity": 1, "format_str": "sin({})"},
    {"func_name": "cosh", "func": np.cosh, "arity": 1, "format_str": "cosh({})"},
    {"func_name": "sinh", "func": np.sinh, "arity": 1, "format_str": "sinh({})"},
    {"func_name": "tanh", "func": np.tanh, "arity": 1, "format_str": "tanh({})"},
    {"func_name": "cos", "func": np.cos, "arity": 1, "format_str": "cos({})"},
    {"func_name": "exp", "func": np.exp, "arity": 1, "format_str": "exp({})"},  
    {"func_name": "abs", "func": np.abs, "arity": 1, "format_str": "abs({})"}
]
None

In [45]:
# Set of parameters that define the evolution of the GP algorithm.
MAX_DEPTH = 5
POP_SIZE = 500
TOURNAMENT_SIZE = 10
CROSSOVER_RATE = 0.50
MAX_GENERATIONS = 1000
NUM_BEST_INDVIDUALS_TO_KEEP = 300
ERC_RATE = 0.05
MUTATION_TREE_RATE = 0.2
FLAG_EXPLORATION_RATE_DYNAMIC = False

## Select input dataset

## 3. Load input dataset Helper

In [46]:
# Load the dataset
def load_dataset(file_path):
    data = np.load(file_path)
    features = data['x'].T
    target = data['y']
    return features, target


## 4. Define Mean Square Error (MSE)  of Fitness

In [47]:

def evaluate_function(node, data):
    if node["type"] == "x":
        return data[node["name"]]
    if node["type"] == "constant":
        return np.full(len(next(iter(data.values()))), node["value"])
    # Evaluate children
    children_values = [evaluate_function(child, data) for child in node["children"]]
    if any(child is None for child in children_values):
        return None  # discard invalid functions

    try:
        result = node["operator"]["func"](*children_values)
        if np.any(np.isinf(result)) or np.any(np.isnan(result)):
            return None  # discard invalid functions
        result = np.clip(result, -1e10, 1e10)  # Limit the output range
        return result
    except (FloatingPointError, ZeroDivisionError, ValueError):
        return None  # discard invalid functions


def calculate_mse(function, data, target):
    """
    Calculates the fitness of a function. If the function is valid, it computes the mean squared error (MSE).
    If the function is invalid (None, NaN, Inf), it will return a high value to exclude it from the evolutionary process.
    """

    result = evaluate_function(function, data)
    if result is None:
        return float("inf")  # Penalize invalid functions
    try:
        # calculate the mean squared error (MSE) as a measure of fitness
        error = np.mean((result - target) ** 2)
        return error
    except ValueError:  # If there is an error during the calculation of the MSE, penalize the function
        return float("inf")
None


## 5. Define initial candidate generation functions

In [48]:
def generate_grow(max_depth, features):
    """
    function that generates a random tree as candidate .
    """
    #if it is first node or random number is greater than 0.5, then return either a variable or a constant
    if max_depth == 0 or random.random() > 0.5:
        if random.random() < 0.99:
            return {"type": "x", "name": random.choice(features)}  # Variable
        else:
            return {"type": "constant", "value": random.choice([x for x in range(-10, 11) if x != 0])}  # Costant
    operator = random.choice(OPERATORS)
    children = [generate_grow(max_depth - 1, features) for _ in range(operator["arity"])]
    return {"type": "op", "operator": operator, "children": children}

def generate_full(max_depth, features):

    if max_depth == 0:
        if random.random() < 0.99:
            return {"type": "x", "name": random.choice(features)}
        else:
            return {"type": "constant", "value": random.choice([x for x in range(-10, 11) if x != 0])}
    operator = random.choice(OPERATORS)
    children = [generate_full(max_depth - 1, features) for _ in range(operator["arity"])]
    return {"type": "op", "operator": operator, "children": children}

def generate_candidate_half_half(max_depth, features):
    if random.random() < 0.5:
        return generate_grow(max_depth, features)
    else:
        return generate_full(max_depth, features)



## 6. Define Representation function for candidates

In [49]:
def get_individual_representation(node):
    if node["type"] == "x":
        return node["name"]
    if node["type"] == "constant":
        return f"{node['value']:.2f}"
    children = [get_individual_representation(child) for child in node["children"]]
    return node["operator"]["format_str"].format(*children)

## 7. Define Survival Selection Method

In [50]:

def tournament_selection(population, MSE_scores):
    selected = random.choices(population, k=TOURNAMENT_SIZE)
    return min(selected, key=lambda p: MSE_scores[p])
None

## 8. Define function to evaluate validity of candidate

In [51]:
# verify if the tree is valid
def validate_tree(node):
    if node["type"] == "op":
        if len(node["children"]) != node["operator"]["arity"]:
            return False
        #check if func_name is "divide" and the second child is a constant with value 0
        if node["operator"]["func_name"] == "divide" and node["children"][1]["type"] == "constant" and node["children"][1]["value"] == 0:
            return False
        #check if func_name is "log" and the child is a constant with value less than or equal to 0
        if node["operator"]["func_name"] == "log" and node["children"][0]["type"] == "constant" and node["children"][0]["value"] <= 0:
            return False
        return all(validate_tree(child) for child in node["children"])
    return True  # leaf nodes are always valid
def contains_all_vars(function, features):
    # verify if the function contains all the variables in the dataset
    required_vars = set(features)
    found_vars = set()

    def explore_node_chidren(node):
        if node["type"] == "x":
            found_vars.add(node["name"])

        # if the node is an operator, we need  explore its children
        if node["type"] == "op":
            for child in node["children"]:
                explore_node_chidren(child)

    explore_node_chidren(function)
    
    # verify if the function contains all the required variables
    return required_vars.issubset(found_vars)


## 9. Define CrossOver Helper Functions

In [52]:
def get_leaf_nodes(node):
    """
    Function that retrieves all leaf nodes given a parent node.
    """
    # iif the node has no children, it's  a leaf
    if "children" not in node or not node["children"]:
        return [node]

    # otherwise, recursively explore the children to find the leaf nodes
    leaf_nodes = []
    for child in node["children"]:
        leaf_nodes.extend(get_leaf_nodes(child))

    return leaf_nodes


def replace_leaf(node, old_leaf, new_leaf):
    """
    Function that recursively replaces a leaf in a tree.
    """
    # if the current node is the old leaf, replace it with the new leaf
    if node == old_leaf:
        return deepcopy(new_leaf)
    # If the current node has children, explore recursively.
    if "children" in node:
        node["children"] = [replace_leaf(child, old_leaf, new_leaf) for child in node["children"]]

    return node
def get_operator_nodes(node):
    """
    Returns all operator nodes (non-leaf) of a tree.
    """
    nodes = []
    if node.get("type") == "op":
        nodes.append(node)
    if "children" in node:
        for child in node["children"]:
            nodes.extend(get_operator_nodes(child))
    return nodes

def get_parent_node(node, target):
    """
    Function that retrieves the parent of a target node.
    """

    if "children" not in node or not node["children"]:
        return None

    for child in node["children"]:
        if child is target:
            return node
        result = get_parent_node(child, target)
        if result:
            return result

    return None




def get_heigth_subtree(node):
    """
    Function that calculates the heigth of a subtree from the leaves towards the given node.
    """

    if "children" not in node or not node["children"]:
        # If the node has no children, it is a leaf and has height 0
        return 0
    return 1 + max(get_heigth_subtree(child) for child in node["children"])


def select_similar_height_node(node, target_height):
    """
    Function that returns a node in a tree with the closest heigth to the target one .
    Selects a node in a tree with a height similar to the target_height.
    """

    operator_nodes = get_operator_nodes(node)
    best_match = None
    min_diff = float("inf")
    for operator_nodes in operator_nodes:
        height = get_heigth_subtree(operator_nodes)
        diff = abs(height - target_height)
        if diff < min_diff:
            min_diff = diff
            best_match = operator_nodes
    return best_match

# Function to replace a subtree in a tree
def replace_subtree(parent, target, replacement):
    if parent == target:
        return deepcopy(replacement)
    if "children" in parent:
        parent["children"] = [
            replace_subtree(child, target, replacement) for child in parent["children"]
        ]
    return parent


## 10. Define CrossOver Methods

In [53]:
#Crossover where a random crossover point is chosen and the subtrees are swapped.
def crossover_random_height(parent_1, parent_2):
    """function to perform crossover between two trees of different height , swapping subtrees with a random height."""
    # create deep copies of the parents to avoid unwanted modifications
    new_parent_1 = deepcopy(parent_1)
    new_parent_2 = deepcopy(parent_2)

    # Verify that both parents are operator nodes with valid children
    if new_parent_1["type"] != "op" or new_parent_2["type"] != "op":
        return new_parent_1  # No crossover possible, return the first parent
    # Verify that both parents have children
    if len(new_parent_1["children"]) == 0 or len(new_parent_2["children"]) == 0:
        return new_parent_1  # No crossover possible, return the first parent

    # 90% of swapping non-leaf nodes (subtrees)
    if random.random() < 0.9:

        if len(new_parent_1["children"]) != new_parent_1["operator"]["arity"] or len(new_parent_2["children"]) != new_parent_2["operator"]["arity"]:
            return new_parent_1  # No crossover possible, return the first parent
        # Select a random crossover point in parent_1 and parent_2
        swapping_point_1 = random.randint(0, len(new_parent_1["children"]) - 1)
        swapping_point_2 = random.randint(0, len(new_parent_2["children"]) - 1)

        # Swap the subtrees
        new_parent_1["children"][swapping_point_1], new_parent_2["children"][swapping_point_2] = (
            deepcopy(new_parent_2["children"][swapping_point_2]),
            deepcopy(new_parent_1["children"][swapping_point_1]),
        )
    else:
        # 10% probability of swapping leaf nodes
        leaves_parent_1 = get_leaf_nodes(new_parent_1)
        leaves_parent_2 = get_leaf_nodes(new_parent_2)

        if not leaves_parent_1 or not leaves_parent_2:
            return new_parent_1  # No crossover possible if there are no leaves

        # Select a random leaf from each parent
        leaf_1 = choice(leaves_parent_1)
        leaf_2 = choice(leaves_parent_2)

        # Replace the leaf
        new_parent_1 = replace_leaf(new_parent_1, leaf_1, leaf_2)
        new_parent_2 = replace_leaf(new_parent_2, leaf_2, leaf_1)

    return new_parent_1



def crossover_similar_height(parent_1, parent_2):
    """
    Crossover with random (similar) height, 90% swaps non-leaf nodes, 10% swaps leaf nodes.
"""
    # Create deep copies of the parents to avoid unwanted modifications
    new_parent_1 = deepcopy(parent_1)
    new_parent_2 = deepcopy(parent_2)

    # Verify that both parents are operator nodes with valid children
    if new_parent_1["type"] != "op" or new_parent_2["type"] != "op":
        return new_parent_1  # No crossover possible, return the first parent
    if len(new_parent_1["children"]) == 0 or len(new_parent_2["children"]) == 0:
        return new_parent_1  # No crossover possible, return the first parent
    if random.random() < 0.9:
        # Selects a random swap point in parent_1.
        operator_nodes_1 = get_operator_nodes(new_parent_1)
        if not operator_nodes_1:
            return new_parent_1  # No valid swap point in parent_1
        swapping_point_1 = random.choice(operator_nodes_1)

        # Get the height of the subtree
        height1 = get_heigth_subtree(swapping_point_1)

        # Select a similar height node in parent_2
        swapping_point_2 = select_similar_height_node(new_parent_2, height1)
        if not swapping_point_2:
            return new_parent_1  # No valid swap point in parent_2

        new_parent_1 = replace_subtree(new_parent_1, swapping_point_1, swapping_point_2)
        new_parent_2 = replace_subtree(new_parent_2, swapping_point_2, swapping_point_1)
    else:
        # 10% swap leaf nodes        
        leaves_parent_1 = get_leaf_nodes(new_parent_1)
        leaves_parent_2 = get_leaf_nodes(new_parent_2)

        if not leaves_parent_1 or not leaves_parent_2:
            return new_parent_1  # No crossover possible if there are no leaves
        
        # Select a random leaf from each parent
        leaf_1 = choice(leaves_parent_1)
        leaf_2 = choice(leaves_parent_2)

        # Replace the leaf
        new_parent_1 = replace_leaf(new_parent_1, leaf_1, leaf_2)
        new_parent_2 = replace_leaf(new_parent_2, leaf_2, leaf_1)

    return new_parent_1

    
def crossover_fixed_height(parent_1, parent_2):
    """
    Perform a crossover between two trees, swapping subtrees with a maximum height of 3.
    """
    # Create deep copies of the parents to avoid unwanted modifications
    new_parent_1 = deepcopy(parent_1)
    new_parent_2 = deepcopy(parent_2)

    # Verify that both parents are operator nodes with valid children
    if new_parent_1["type"] != "op" or new_parent_2["type"] != "op":
        return new_parent_1  # No crossover possible, return the first parent
    if len(new_parent_1["children"]) == 0 or len(new_parent_2["children"]) == 0:
        return new_parent_1  # No crossover possible, return the first parent
    if random.random() < 0.9:
        # Selects a random swap point in parent_1 with a maximum height of 3
        operator_nodes_1 = [node for node in get_operator_nodes(new_parent_1)
                        if get_heigth_subtree(node) <= 2]
        operator_nodes_2 = [node for node in get_operator_nodes(new_parent_2)
                        if get_heigth_subtree(node) <= 2]

        if not operator_nodes_1 or not operator_nodes_2:
            return new_parent_1  # No valid swap point in parent_1

        # Select a random swap point in parent_1 and parent_2
        swapping_point_1 = random.choice(operator_nodes_1)
        swapping_point_2 = random.choice(operator_nodes_2)

        new_parent_1 = replace_subtree(new_parent_1, swapping_point_1, swapping_point_2)
        new_parent_2 = replace_subtree(new_parent_2, swapping_point_2, swapping_point_1)
    else:
        leaves_parent_1 = get_leaf_nodes(new_parent_1)
        leaves_parent_2 = get_leaf_nodes(new_parent_2)

        if not leaves_parent_1 or not leaves_parent_2:
            return new_parent_1  # No crossover possible if there are no leaves
        # Select a random leaf from each parent
        leaf_1 = choice(leaves_parent_1)
        leaf_2 = choice(leaves_parent_2)

        # Replace the leaf
        new_parent_1 = replace_leaf(new_parent_1, leaf_1, leaf_2)
        new_parent_2 = replace_leaf(new_parent_2, leaf_2, leaf_1)

    return new_parent_1


## 11. Define Mutation Helper Functions

In [54]:
def get_subtrees_with_height(node, parent=None, index=None, current_height=0):
        """
        Retrieves all subtrees (pointers to nodes, parents, indices, and heights).
        """
        subtrees = []
        if node["type"] == "op":  # only consider operator nodes
            for i, child in enumerate(node["children"]):
                subtree_height = get_heigth_subtree(child)
                subtrees.append((node, i, subtree_height))  # Store the parent, index, and height
                subtrees.extend(get_subtrees_with_height(child, node, i, current_height + 1))
        return subtrees

def get_heigth_subtree(node):
    """
    Calculates the height of a subtree from the leaves towards the given node.
    """
    if node["type"] != "op" or not node.get("children"):
        return 0  # Leaf nodes have height 0
    return 1 + max(get_heigth_subtree(child) for child in node["children"])

def perform_erc_mutation():
    """
    Perform the ERC mutation.
    """
    while True:
        value = random.uniform(-4, 4)  # Generate a random value
        if value != 0:  # Ensure the value is not zero
            return {"type": "constant", "value": value}




## 12. Define Mutation Methods

In [55]:
def mutate_all_tree(features, max_depth):
    return generate_candidate_half_half(max_depth, features)

def mutate(function, features, max_depth):
    # Create a deep copy of the function to avoid unwanted modifications
    function = deepcopy(function)
    if function["type"] == "op":
        # if the node is an operator, randomly change the operator
        function["operator"] = random.choice(OPERATORS)
        # Recursively mutate the children
        child_idx = random.randint(0, len(function["children"]) - 1)
        function["children"][child_idx] = mutate(function["children"][child_idx], features, max_depth - 1)
    
    elif function["type"] == "x":
        # if the node is a leaf, randomly change the variable
        function["value"] = random.choice(features)  # Randomly select a new variable

    return function


def mutate_erc(function, features, max_depth):
    """
    Perform the ERC mutation.
    """
    # Create a deep copy of the function to avoid unwanted modifications
    function = deepcopy(function)
    if function["type"] != "op":
            # If the current node is a leaf, perform the ERC mutation
        return perform_erc_mutation()

    # If the current node is an operator, recursively mutate the children
    if function["type"] == "op":
        child_idx = random.randint(0, len(function["children"]) - 1)
        function["children"][child_idx] = mutate_erc(function["children"][child_idx], features, max_depth - 1)

    return function

def swap_mutation(function):
    # Create a deep copy of the function to avoid unwanted modifications
    function = deepcopy(function)
    # Get all subtrees with their heights
    subtrees = get_subtrees_with_height(function)
    if len(subtrees) < 2:
        return function  # No mutation possible
    # Select a random subtree
    parent_1, idx1, height1 = random.choice(subtrees)

    subtrees_sorted_by_height_diff = sorted(
        subtrees,
        key=lambda x: abs(x[2] - height1)  # Sort by the absolute difference in height
    )

   # filter out subtrees that are too similar in height
    for parent_2, idx2, _ in subtrees_sorted_by_height_diff:
        if (parent_1, idx1) != (parent_2, idx2):  # Ensure the subtrees are different
            # Swap the subtrees
            parent_1["children"][idx1], parent_2["children"][idx2] = (
                deepcopy(parent_2["children"][idx2]),
                deepcopy(parent_1["children"][idx1]),
            )
            break

    return function

## 13. Define Evolution strategy

In [56]:
mean_MSE = []
# Function to dynamically change the exploration rate

def get_dynamic_exploration_rate(MSE_scores):
    """
    Function that dynamically adjusts the exploration rate based on changes in the MSE (Mean Squared Error).

    """
    # Calculate and store the mean of the current MSE scores
    mean_MSE.append(np.mean(list(MSE_scores.values())))

    # Default exploration rate
    exploration_rate = 0.7

    # Adjust exploration rate if we have more than 100 iterations
    if len(mean_MSE) > 101:
        current_MSE = mean_MSE[-1]
        previous_MSE = mean_MSE[-99]  # Compare with MSE from 99 steps ago

        # Calculate percentage change in the MSE
        change_percentage = (current_MSE - previous_MSE) / previous_MSE if previous_MSE != 0 else 0

        # Adjust exploration rate based on the change in the MSE
        if change_percentage < 0:
            # Reduce exploration rate if the MSE has improved (decreased)
            exploration_rate = max(0.5, 1 + change_percentage)
        else:
            # Increase exploration rate if the MSE has worsened or stayed the same
            exploration_rate = 0.5

    return exploration_rate



In [57]:

def run_symbolic_regression(features, data, target):
    # Initialize the population with valid individuals only (i.e., those that contain all the variables)
    population = []
    while len(population) < POP_SIZE:
        candidate = generate_candidate_half_half(MAX_DEPTH, features)
        if evaluate_function(candidate, data) is not None:  # Ensure the function is valid
            population.append(candidate)

    best_function = None
    best_MSE = float("inf")
    # Evolve the population for a set number of generations
    for _ in tqdm.tqdm(range(MAX_GENERATIONS)):
        # calculate the MSE scores for the population
        MSE_scores = {}
        for individual in population:
            individual_repr = get_individual_representation(individual)
            score = calculate_mse(individual, data, target)
            C[individual_repr] = score
        
        # Map the functions to their string representations
        individual_mapping = {get_individual_representation(individual): individual for individual in population}
        offspring_list = []
        if FLAG_EXPLORATION_RATE_DYNAMIC:
            exploration_rate = get_dynamic_exploration_rate(MSE_scores)
        else:
            exploration_rate = 0.5
        # Generate new individuals for the population
        while len(offspring_list) < POP_SIZE:
            is_valid_candidate = False
            while not is_valid_candidate:
                parent_1_repr = tournament_selection(list(MSE_scores.keys()), MSE_scores)
                parent_1 = individual_mapping[parent_1_repr]
                if random.random() < CROSSOVER_RATE:
                    parent_2_repr = tournament_selection(list(MSE_scores.keys()), MSE_scores)
                    parent_2 = individual_mapping[parent_2_repr]
                    # choose between crossover_similar_height and crossover_2
                    crossover_probability = random.random()
                    #exploration
                    if crossover_probability < exploration_rate:
                        if random.random() < 0.5:
                            offspring = crossover_random_height(parent_1, parent_2)
                        else:
                           offspring = crossover_similar_height(parent_1, parent_2)
                    else:
                        #exploitation
                        offspring = crossover_fixed_height(parent_1, parent_2)
                else:
                    random_number = random.random()
                    if random_number <= ERC_RATE:
                        offspring = mutate_erc(parent_1, features, MAX_DEPTH)
                    elif random_number > 0.1 and random.random() < MUTATION_TREE_RATE:
                        offspring = mutate_all_tree(features, MAX_DEPTH)
                    elif random_number < 0.1 and random_number >  ERC_RATE:
                        offspring = mutate(parent_1, features, MAX_DEPTH)

                # Validate the offspring
                if validate_tree(offspring) and evaluate_function(offspring, data) is not None and contains_all_vars(offspring, features):
                    offspring_list.append(offspring)
                    is_valid_candidate = True
        
        # Combine the old and new populations
        total_population =  offspring_list + population
        # Calculate the MSE scores for the total population
        MSE_scores = {get_individual_representation(individual): calculate_mse(individual, data, target) for individual in total_population}
        # Sort the combined population by their MSE scores
        sorted_population = sorted(total_population, key=lambda individual: MSE_scores[get_individual_representation(individual)])
        
        # Select the top 300 individuals
        population = sorted_population[:NUM_BEST_INDVIDUALS_TO_KEEP]
        # Update the best function and its MSE score
        
        for individual in population:
            individual_repr = get_individual_representation(individual)
            score = MSE_scores[individual_repr]
            if score < best_MSE:
                best_MSE = score
                best_function = individual
        if best_MSE == 0:
            return best_function, best_MSE
    
    return best_function, best_MSE


## 14. Define Plot functions

In [58]:
def plot_2D(best_function, features, target,problem):
    """
    Function that generates a 2D plot comparing the real data points with the predicted points.
    """
 
    X = features[:, 0]


    x_grid = np.linspace(X.min(), X.max(), 100).reshape(-1, 1)
    x_flat = x_grid.ravel()
    data_range = {
        "x0": x_flat,
    }


    y_pred = evaluate_function(best_function, data_range)
    y_pred_grid = y_pred.reshape(x_grid.shape)
    y_pred_flat = y_pred_grid.ravel()
    fig = go.Figure()

    fig.add_trace(go.Scatter( x=X, y=target, mode='markers', marker=dict(size=4, color='blue'), name="Ground Truth"))

    fig.add_trace(go.Scatter(x=x_flat, y=y_pred_flat, mode='lines', line=dict(color='orange'), name="Symbolic Regression Best Candidate"))


    fig.update_layout(
        xaxis_title='x',
        yaxis_title='y',
        title="Comparison Symbolic regression prediction against Ground Truth",
        autosize=True
    )

    fig.show()
    file_path = f"../results/problem_{problem}_2d_plot.html"
    pio.write_html(fig, file=file_path, auto_open=True)
    print(f"The 3D plot has been saved here: {file_path}")
    fig.write_image(f"../results/problem_{problem}_2d_plot.png")
    



In [59]:


def plot_3D(best_function, features, target,problem):
    X1 = features[:, 0]
    X2 = features[:, 1]

    x1_grid = np.linspace(X1.min(), X1.max(), 50)
    x2_grid = np.linspace(X2.min(), X2.max(), 50)
    x1_mesh, x2_mesh = np.meshgrid(x1_grid, x2_grid)

    data_range = {
        "x0": x1_mesh.ravel(),
        "x1": x2_mesh.ravel(),
    }

    y_pred = evaluate_function(best_function, data_range)
    y_pred_mesh = y_pred.reshape(x1_mesh.shape)

    fig = go.Figure()

 
    fig.add_trace(go.Scatter3d(
        x=X1,
        y=X2,
        z=target,
        mode='markers',
        marker=dict(size=4, color='blue'),
        name="Ground Truth"
    ))


    fig.add_trace(go.Surface(
        z=y_pred_mesh,
        x=x1_grid,
        y=x2_grid,
        colorscale='Reds',
        opacity=0.7,
        name="Symbolic Regression Best Candidate"
    ))

 
    fig.update_layout(
        scene=dict(
            xaxis_title='x1',
            yaxis_title='x2',
            zaxis_title='y'
        ),
        title="Comparison Symbolic regression prediction against Ground Truth",
        autosize=True
    )


    fig.show()
    file_path = f"../results/problem_{problem}_2d_plot.html"
    pio.write_html(fig, file=file_path, auto_open=True)
    print(f"The 3D plot has been saved here: {file_path}")
    #save the plot as png
None

## 15. Define K-FOLD validation

In [60]:

def run_k_fold_cross_validation(features, target, k=3):
    """
    Function that performs k-fold cross-validation to evaluate the performance of the GP algorithm considering different folds
    """
    kf = KFold(n_splits=k, shuffle=True, random_state=43)
    fold_scores = []
    best__MSE = float("inf")
    best_p = None
    feature_names = [f"x{i}" for i in range(features.shape[1])]
    fold=0
    for train_idx, val_idx in kf.split(features):
        is_valid_candidate = False
        while not is_valid_candidate:
            train_features, val_features = features[train_idx], features[val_idx]
            train_target, val_target = target[train_idx], target[val_idx]
            # Create a dictionary with the training and validation data
            train_data_dict = {f"x{i}": train_features[:, i] for i in range(train_features.shape[1])}
            val_data_dict = {f"x{i}": val_features[:, i] for i in range(val_features.shape[1])}
            
            # Evolve the best function for the current fold
            best_function, _ = run_symbolic_regression(features=feature_names, data=train_data_dict, target=train_target)
            
            if best_function is None:
                print("Fold discarded: invalid function, retrying evolution")
            # Calculate the MSE of the best function on the validation data
            current_MSE = calculate_mse(best_function, val_data_dict, val_target) 
            if current_MSE == float("inf"):
                print("Fold discarded: invalid function, retrying evolution")
                print(f"function invalid: {get_individual_representation(best_function)}")
                # Save the data to a CSV file
                pd.DataFrame(val_data_dict).to_csv("data.csv")
            else: 
                is_valid_candidate = True
                fold_scores.append(current_MSE)
                fold+=1
                # Reset the mean_MSE list
                mean_MSE.clear()
                print(f"Fold {fold}° validated with MSE: {current_MSE:.4f}")
                print(f"Best function of fold {fold}°: {get_individual_representation(best_function)}")
                if current_MSE < best__MSE:
                    best__MSE = current_MSE
                    best_p = best_function
    
    return best_p, best__MSE



## 16. Execute Symbolic regression

In [61]:

# Load the dataset
problem=1
file_path = f"../data/problem_{problem}.npz"
samples = 10000
features, target = load_dataset(file_path)
if file_path == "../data/problem_8.npz":
    #do a sampling of the dataset taking only 10000 samples (randomly) to speed up the process
    if features.shape[0] > samples:
        idx = np.random.choice(features.shape[0], 10000, replace=False)
        features_sampled = features[idx]
        target_sampled = target[idx]
    else:
        features_sampled = features
        target_sampled = target
    best_p, best_MSE = run_k_fold_cross_validation(features_sampled, target_sampled, k=5)
else:
    # Perform k-fold cross validation
    best_p, best_MSE = run_k_fold_cross_validation(features, target, k=2)
print(f"Best function : {get_individual_representation(best_p)} MSE: {best_MSE:.4f} and MSE (all dataset): {calculate_mse(best_p, {f'x{i}': features[:, i] for i in range(features.shape[1])}, target):.4f}")
print(best_p)


  0%|          | 0/1000 [00:00<?, ?it/s]


Fold 1° validated with MSE: 0.0000
Best function of fold 1°: sin(x0)


  0%|          | 0/1000 [00:00<?, ?it/s]

Fold 2° validated with MSE: 0.0000
Best function of fold 2°: sin(x0)
Best function : sin(x0) MSE: 0.0000 and MSE (all dataset): 0.0000
{'type': 'op', 'operator': {'func_name': 'sin', 'func': <ufunc 'sin'>, 'arity': 1, 'format_str': 'sin({})'}, 'children': [{'type': 'x', 'name': 'x0'}]}





## 17. Generate plots

In [62]:
if features.shape[1] == 2:
    plot_3D(best_p, features, target,problem)

if features.shape[1] == 1:
    plot_2D(best_p, features, target,problem)

The 3D plot has been saved here: ../results/problem_1_2d_plot.html


In [63]:
! pip install kaleido


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m25.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
