In [2]:
import numpy as np
import logging
from tqdm.auto import tqdm
from dataclasses import dataclass
import operator as op
from sklearn.model_selection import train_test_split
from typing import Union
from statistics import mean
import matplotlib.pyplot as plt
import re

# Assuming the 'tree' module from the original code is saved as 'tree.py'
from tree import Tree, Operator 

# ... (rest of your code) ...

# Configuration Constants
DEPTH_LIMIT = 10
PROGENY_COUNT = 10
COHORT_SIZE = 30
EPOCH_COUNT = 200
CHALLENGE_ID = 3
LITERAL_POOL_SIZE = 10
LITERAL_SPAN = 20

logging.basicConfig(level="INFO")

class CalculationEngine:
    """Handles mathematical operations with validation and error handling."""

    PRECISION_THRESHOLD = 1e-12
    RELATIVE_MARGIN = 1e-9

    @staticmethod
    def _ensure_scalar(value: Union[int, float, np.ndarray]) -> float:
        """Converts input to a scalar float, raising an error for invalid types."""
        if isinstance(value, (int, float)):
            return float(value)
        elif isinstance(value, np.ndarray):
            if value.ndim == 0:
                return float(value)
            else:
                value_flat = value.ravel()
                assert value_flat.size == 1, f"Expected scalar, found array: {value}"
                return float(value_flat[0])
        else:
            logging.error(
                f"Invalid data type: {type(value)}, expected int, float, or np.ndarray"
            )
            return value

    @staticmethod
    def sum_values(a, b):
        """Adds two numerical values after validation."""
        a = CalculationEngine._ensure_scalar(a)
        b = CalculationEngine._ensure_scalar(b)
        assert isinstance(a, (int, float)), f"Expected number, found {a} ({type(a)})"
        assert isinstance(b, (int, float)), f"Expected number, found {b} ({type(b)})"
        return a + b

    @staticmethod
    def difference(a, b):
        """Subtracts two numerical values after validation."""
        a = CalculationEngine._ensure_scalar(a)
        b = CalculationEngine._ensure_scalar(b)
        assert isinstance(a, (int, float)), f"Expected number, found {a} ({type(a)})"
        assert isinstance(b, (int, float)), f"Expected number, found {b} ({type(b)})"
        return a - b

    @staticmethod
    def product(a, b):
        """Multiplies two numerical values after validation."""
        a = CalculationEngine._ensure_scalar(a)
        b = CalculationEngine._ensure_scalar(b)
        assert isinstance(a, (int, float)), f"Expected number, found {a} ({type(a)})"
        assert isinstance(b, (int, float)), f"Expected number, found {b} ({type(b)})"
        return a * b

    @staticmethod
    def quotient(a, b):
        """Divides two numerical values, handling near-zero divisors."""
        a = CalculationEngine._ensure_scalar(a)
        b = CalculationEngine._ensure_scalar(b)
        assert isinstance(a, (int, float)), f"Expected number, found {a} ({type(a)})"
        assert isinstance(b, (int, float)), f"Expected number, found {b} ({type(b)})"
        if b == 0:
            margin = min(CalculationEngine.RELATIVE_MARGIN * a, CalculationEngine.PRECISION_THRESHOLD)
            a += margin
            b += margin
        return a / b

    @staticmethod
    def negate(a):
        """Negates a numerical value after validation."""
        a = CalculationEngine._ensure_scalar(a)
        assert isinstance(a, (int, float)), f"Expected number, found {a} ({type(a)})"
        return -a

    @staticmethod
    def magnitude(a):
        """Returns the absolute value of a number."""
        a = CalculationEngine._ensure_scalar(a)
        assert isinstance(a, (int, float)), f"Expected number, found {a} ({type(a)})"
        return a if a >= 0 else -a
    
    @staticmethod
    def sine(a):
        """Calculates the sine of a number, handling validation."""
        a = CalculationEngine._ensure_scalar(a)
        assert isinstance(a, (int, float)), f"Expected number, found {a} ({type(a)})"
        result = np.sin(a)
        return CalculationEngine._ensure_scalar(result)
    
    @staticmethod
    def cosine(a):
        """Calculates the cosine of a number, handling validation."""
        a = CalculationEngine._ensure_scalar(a)
        assert isinstance(a, (int, float)), f"Expected number, found {a} ({type(a)})"
        result = np.cos(a)
        return CalculationEngine._ensure_scalar(result)

# Define operator sets
core_operators = [
    Operator("+", CalculationEngine.sum_values, 2),
    Operator("-", CalculationEngine.difference, 2),
    Operator("*", CalculationEngine.product, 2),
    Operator("/", CalculationEngine.quotient, 2),
    Operator("-", CalculationEngine.negate, 1),
    Operator("|.|", CalculationEngine.magnitude, 1),
    Operator("sin(.)", CalculationEngine.sine, 1),
    Operator("cos(.)", CalculationEngine.cosine, 1)
]

standard_operators = [
    Operator("+", op.add, 2),
    Operator("-", op.sub, 2),
    Operator("*", op.mul, 2),
    Operator("|.|", op.abs, 1),
    Operator("-", op.neg, 1),
]

numpy_extensions = [
    Operator("sign", np.sign, 1),
    Operator("cbrt", np.cbrt, 1),
    Operator("sin", np.sin, 1),
    Operator("cos", np.cos, 1),
    Operator("tan", np.tan, 1),
    Operator("tanh", np.tanh, 1),
]

potential_overflow_ops = [
    Operator("^", op.pow, 2),
    Operator("exp", np.exp, 1),
    Operator("exp2", np.exp2, 1),
    Operator("square", np.square, 1),
    Operator("sinh", np.sinh, 1),
    Operator("cosh", np.cosh, 1),
]

restricted_domain_ops = [
    Operator("/", op.truediv, 2),
    Operator("/", np.divide, 2),
    Operator("reciprocal", np.reciprocal, 1),
    Operator("sqrt", np.sqrt, 1),
    Operator("log", np.log, 1),
    Operator("log2", np.log2, 1),
    Operator("log10", np.log10, 1),
    Operator("arcsin", np.arcsin, 1),
    Operator("arccos", np.arccos, 1),
    Operator("arctan", np.arctan, 1),
]

chosen_operators = standard_operators + numpy_extensions

# Load data and prepare dataset
data_archive = np.load(f"./data/problem_{CHALLENGE_ID}.npz")
input_dimensions = data_archive["x"].shape[0]
sample_count = data_archive["x"].shape[1]

train_idx, val_idx = train_test_split(
    range(sample_count), test_size=0.1, random_state=42
)
input_train, output_train = data_archive["x"][:, train_idx], data_archive["y"][train_idx]
input_val, output_val = data_archive["x"][:, val_idx], data_archive["y"][val_idx]

input_labels = [f"var{i}" for i in range(input_dimensions)]
constant_pool = [
    LITERAL_SPAN * np.random.uniform(low=-1, high=1, size=1)
    for _ in range(LITERAL_POOL_SIZE)
]

@dataclass
class SymbolicExpression:
    """Represents an individual in the population with a genome and fitness."""

    structure: Tree
    error_metric: float

def select_elite(cohort: list[SymbolicExpression]) -> SymbolicExpression:
    """Tournament selection to choose a parent from the population."""
    SELECTION_POOL_SIZE = 10
    chosen = sorted(
        np.random.choice(cohort, SELECTION_POOL_SIZE), key=lambda member: member.error_metric
    )
    return chosen[0]

def breed(elite1: SymbolicExpression, elite2: SymbolicExpression) -> SymbolicExpression:
    """Creates a new individual by crossing over two parents."""
    offspring_structure = Tree._recombine_expressions(elite1.structure, elite2.structure)
    return SymbolicExpression(offspring_structure, 0)

def introduce_variation(expression: SymbolicExpression) -> SymbolicExpression:
    """Mutates the genome of an individual."""
    mutated_structure = Tree._apply_mutation(expression.structure)
    return SymbolicExpression(mutated_structure, 0)

def visualize_error_progression(error_data: list[list[float]], series_labels: list[str]):
    """Plots the error metric over generations for different datasets."""
    for i, errors in enumerate(error_data):
        plt.plot(errors, label=series_labels[i])
    plt.legend()
    plt.title("Error Metric vs. Generation")
    plt.xlabel("Generation")
    plt.ylabel("Mean Squared Error")
    plt.show()

def optimize_expressions(initial_cohort: list[SymbolicExpression]) -> SymbolicExpression:
    """Main evolutionary algorithm loop."""
    for expr in initial_cohort:
        expr.error_metric = Tree.calculate_mean_squared_error(expr.structure, input_train, output_train)

    train_errors = []
    val_errors = []

    for _ in tqdm(range(EPOCH_COUNT)):
        progeny: list[SymbolicExpression] = []
        for _ in range(PROGENY_COUNT):
            if np.random.random() < 0.02:
                parent = select_elite(initial_cohort)
                offspring = introduce_variation(parent)
            else:
                parent1 = select_elite(initial_cohort)
                parent2 = select_elite(initial_cohort)
                offspring = breed(parent1, parent2)
            progeny.append(offspring)

        for expr in progeny:
            expr.error_metric = Tree.calculate_mean_squared_error(expr.structure, input_train, output_train)

        initial_cohort.extend(progeny)
        initial_cohort.sort(key=lambda expr: expr.error_metric)
        initial_cohort = initial_cohort[:COHORT_SIZE]

        avg_train_error = mean(
            [float(expr.error_metric) for expr in initial_cohort]
        )
        train_errors.append(avg_train_error)

        avg_val_error = mean(
            [
                float(Tree.calculate_mean_squared_error(expr.structure, input_val, output_val))
                for expr in initial_cohort
            ]
        )
        val_errors.append(avg_val_error)

    # visualize_error_progression([train_errors, val_errors], ["Training", "Validation"])
    return initial_cohort[0]


In [None]:
import matplotlib.pyplot as plt
import numpy as np
i=1

print(f"Problem {i}")
x = np.load(f"./data/problem_{i}.npz")
x['x'].shape, x['y'].shape

dataset_dim = x['x'].shape[0]
dataset_size = x['x'].shape[1]

train_indices, val_indices = train_test_split(range(dataset_size), test_size=0.1, random_state=42)
x_train, y_train = x['x'][:, train_indices], x['y'][train_indices]
x_val, y_val = x['x'][:, val_indices], x['y'][val_indices]


# Generate the initial population
initial_population = [
    SymbolicExpression(
        Tree.construct_expression(
            chosen_operators, input_labels, constant_pool, depth_limit=DEPTH_LIMIT
        ),
        0,
    )
    for _ in range(COHORT_SIZE)
]

print(input_train.shape, output_train.shape)
print(input_val.shape, output_val.shape)

# Run the evolutionary algorithm
top_expression = optimize_expressions(initial_population)

# Visualize and evaluate the best solution
# plt.figure(figsize=(14, 8))
# top_expression.structure.render_expression()
validation_error = Tree.calculate_mean_squared_error(top_expression.structure, input_val, output_val)
print(
    f"Training Error: {top_expression.error_metric}, Validation Error: {validation_error}"
)

# Display the formula of the best solution
print(f"\nBest Solution Formula: {top_expression.structure}")


# def display_individual(individual: Individual):
#     print(individual.genome)
#     print(individual.mse)


# # Display the formula of the best solution
# display_individual(champion)



Problem 1
(3, 4500) (4500,)
(3, 500) (500,)


  [float(expr.error_metric) for expr in initial_cohort]
  float(Tree.calculate_mean_squared_error(expr.structure, input_val, output_val))
  0%|          | 1/200 [00:02<07:20,  2.21s/it]