In [1]:
import os
from collections import defaultdict
from functools import partial
from itertools import zip_longest
import shutil

import cma
import numpy as np
from tensorboardX import SummaryWriter

from phantom.learn.xcma import XCMA
from phantom.learn.xnes import XNES
from phantom.learn.xnessa import XNESSA

import lib.CORNN as CORNN

In [2]:
import importlib
import phantom
importlib.reload(phantom)

<module 'phantom' from 'C:\\Users\\volke\\PycharmProjects\\phantom-sc2\\phantom\\__init__.py'>

In [3]:
class CMAAdapter:
    def __init__(self, x0, sigma0, popsize):
        opts = cma.CMAOptions()
        opts.set("popsize", popsize)
        self.es = cma.CMAEvolutionStrategy(x0, sigma0, opts)

    def ask(self, rng):
        x = self.es.ask()
        return np.asarray(x).T

    def tell(self, x, fx):
        self.es.tell(x.T.tolist(), fx)
        return self.es.stop()

    def recommend(self):
        return self.es.mean

In [4]:
class XNESAdapter:
    def __init__(self, x0, sigma0, popsize):
        self.es = XNES(x0, sigma0)
        self.popsize = popsize

    def ask(self, rng):
        self.z, x = self.es.ask(num_samples=self.popsize, rng=rng)
        return x

    def tell(self, x, fx):
        return self.es.tell(self.z, np.argsort(fx))

    def recommend(self):
        return self.es.loc

In [5]:
class XNESSAAdapter:
    def __init__(self, x0, sigma0, popsize):
        self.es = XNESSA(x0, sigma0)
        self.popsize = popsize

    def ask(self, rng):
        self.z, x = self.es.ask(num_samples=self.popsize, rng=rng)
        return x

    def tell(self, x, fx):
        return self.es.tell(self.z, np.argsort(fx))

    def recommend(self):
        return self.es.loc

In [6]:
class XCMAAdapter:
    def __init__(self, x0, sigma0, popsize):
        self.es = XCMA(x0, sigma0)
        self.popsize = popsize

    def ask(self, rng):
        self.z, x = self.es.ask(num_samples=self.popsize, rng=rng)
        return x

    def tell(self, x, fx):
        return self.es.tell(self.z, np.argsort(fx))

    def recommend(self):
        return self.es.loc

In [7]:
es_dict = {
    "XCMA": XCMAAdapter,
    "CMA": CMAAdapter,
    "XNES": XNESAdapter,
    "LRAXCMA": XNESSAAdapter,
}

In [8]:


function_dictionary= CORNN.get_benchmark_functions()
# Returns a dictionary of all the regression functions within CORNN
# The key is the function name, the value is a 3 element tuple:
# containing the raw objective function (for example the Ackley function) and
# the x variable's domain and the y variable's domain

print([*function_dictionary.keys()])
# list all the available objective functions.

training_data, test_data= CORNN.get_scaled_function_data(function_dictionary["Ackley"])
# Both the training data and the test data are pairs.
# The first element of training_data is an PyTorch tensor of data patterns
# The second element of training_data is a PyTorch tensor of the corresponding labels
# The same is true for test_data

neural_network_dictionary= CORNN.get_NN_models()
# Returns a dictionary of all neural network architecture from CORNN
# The key is the class name, the value is the NN class built on PyTorch.

print([*neural_network_dictionary.keys()])
# list all the available neural network architecture.

neural_network_architecture=neural_network_dictionary["Net_3_tanh_layers"]() # the () to instantiate
# Selects the 3 hiddern layer NN model that uses ReLU activation function within
# the Hiddern layers.


# The combination of training_data, test_data, and the
# selected neural network architecture makes a problem instance of CORNN
CORNN_benchmark_instance= CORNN.NN_Benchmark(training_data, test_data, neural_network_architecture)
instance_dimension= CORNN_benchmark_instance.get_weight_count()


example_candidate_solution=np.random.rand(instance_dimension)

# In order to evaluate a candidate solution on the training set simply use:
training_loss=CORNN_benchmark_instance.training_set_evaluation(example_candidate_solution)
print("Training set loss:",training_loss)

# In order to evaluate a candidate solution on the training set simply use:
testing_loss=CORNN_benchmark_instance.testing_set_evaluation(example_candidate_solution)
print("Testing set loss:",testing_loss)

['Ackley', 'Ackley N.2', 'Ackley N.3', 'Ackley N.4', 'Adjiman', 'Alpine N.1', 'Alpine N.2', 'Bartels Conn', 'Beale', 'Bird', 'Bohachevsky N.1', 'Bohachevsky N.2', 'Booth', 'Brent', 'Brown', 'Bukin N.6', 'Cross-In-Tray', 'Deckkers-Aarts', 'Drop-Wave', 'Easom', 'Egg Crate', 'Egg Holder', 'Exponential', 'Goldstein-Price', 'Griewank', 'Himmelblau', 'Holder-Table', 'Keane', 'Leon', 'Levi N.13', 'Matyas', 'McCormick', 'Michalewicz', 'Periodic', 'Qing', 'Rastrigin', 'Ridge', 'Rosenbrock', 'Salomon', 'Schaffer N.2', 'Schaffer N.3', 'Schwefel 2.20', 'Schwefel 2.22', 'Schwefel 2.23', 'Shubert', 'Shubert 3', 'Sphere', 'Styblinski-Tang', 'Sum Squares', 'Three-Hump Camel', 'Xin-She Yang N.2', 'Xin-She Yang N.3', 'Xin-She Yang N.4', 'Zakharov']
['Net_1_relu_layer', 'Net_1_tanh_layer', 'Net_3_relu_layers', 'Net_3_tanh_layers', 'Net_5_relu_layers', 'Net_5_tanh_layers']
Training set loss: 14.48801326751709
Testing set loss: 14.5460844039917


In [9]:
fn_selection = {"Brown", "Rosenbrock", "Shubert"}
fn_selection = {}

In [10]:
fn_dict = {}
for fn, x_range, y_range, name in function_dictionary.values():
    if name not in fn_selection:
        continue
    training_data, test_data= CORNN.get_scaled_function_data(function_dictionary[name])
    nn = "Net_5_tanh_layer"
    b = CORNN.NN_Benchmark(training_data, test_data, neural_network_architecture)
    d = b.get_weight_count()
    fn_dict[name] = d, b.training_set_evaluation
fn_dict

{}

In [11]:
for d in [10, 30, 100]:
    fn_dict[f"elli{d}"] = d, partial(
        cma.ff.elli,
        rot=1,
        xoffset=2,
    )
    fn_dict[f"rosen{d}"] = d, cma.ff.rosen

In [12]:
def single(d, fn, es, rng):
    while True:
        x = es.ask(rng)
        fx = list(map(fn, x.T))
        yield fn(es.recommend())
        # print(min(fx))
        if es.tell(x, fx):
            break

In [13]:
def benchmark(fns, es_clss):
    streams = []
    for d, fn in fns:
        n = 4 + int(3 * np.log(d))
        for es_cls in es_clss:
            x0 = np.zeros(d)
            sigma0 = 1.
            es = es_cls(x0, sigma0, n)
            rng = np.random.default_rng(0)
            streams.append(single(d, fn, es, rng))
    return zip_longest(*streams)

In [14]:
output_dir = "runs"

In [15]:
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)

In [16]:
writers = {
    (fn_name, es_name): SummaryWriter(log_dir=f"{output_dir}/{fn_name}/{es_name}")
    for fn_name in fn_dict
    for es_name in es_dict
}

stream = benchmark(fn_dict.values(), es_dict.values())
for step, results in enumerate(stream):
    for key, value in zip(writers.keys(), results):
        (fn_name, es_name) = key
        if value is not None:
            writers[key].add_scalar(fn_name, value, global_step=step)
    for w in writers.values():
        w.flush()

for w in writers.values():
    w.close()

(5_w,10)-aCMA-ES (mu_w=3.2,w_1=45%) in dimension 10 (seed=917606, Fri Jan 30 10:35:41 2026)
(5_w,10)-aCMA-ES (mu_w=3.2,w_1=45%) in dimension 10 (seed=861184, Fri Jan 30 10:35:41 2026)
(7_w,14)-aCMA-ES (mu_w=4.3,w_1=36%) in dimension 30 (seed=869765, Fri Jan 30 10:35:41 2026)
(7_w,14)-aCMA-ES (mu_w=4.3,w_1=36%) in dimension 30 (seed=848633, Fri Jan 30 10:35:41 2026)
(8_w,17)-aCMA-ES (mu_w=5.1,w_1=31%) in dimension 100 (seed=907037, Fri Jan 30 10:35:41 2026)
(8_w,17)-aCMA-ES (mu_w=5.1,w_1=31%) in dimension 100 (seed=918965, Fri Jan 30 10:35:41 2026)
