In [3]:
import ray

# Stop any Ray instance this process is already connected to before starting a
# new one (presumably so the cell can be re-run on a live kernel — TODO confirm).
ray.shutdown()
# NOTE(review): "0.0.0.0" asks the dashboard to listen on every network
# interface; confirm this exposure is intended for the machine running this.
ray.init(dashboard_host="0.0.0.0")
# Last expression of the cell: its return value is shown as the cell output.
ray.available_resources()

2024-08-11 14:31:39,332	INFO worker.py:1772 -- Started a local Ray instance. View the dashboard at [1m[32m172.25.4.192:8266 [39m[22m


{'accelerator_type:G': 1.0,
 'node:__internal_head__': 1.0,
 'node:172.25.4.192': 1.0,
 'CPU': 32.0,
 'object_store_memory': 2928236544.0,
 'memory': 5856473088.0,
 'GPU': 1.0}

In [4]:
import logging
from typing import Dict, Any, Optional, List, Union

import hebo
import numpy as np
import pandas as pd
import torch
from ray import tune, train
from ray.tune.result import DEFAULT_METRIC
from ray.tune.search import UNRESOLVED_SEARCH_SPACE
from ray.tune.search.hebo import HEBOSearch
from ray.tune.search.hebo.hebo_search import SPACE_ERROR_MESSAGE
from ray.tune.search.variant_generator import parse_spec_vars
from ray.tune.utils.util import validate_warmstart

# Module-level logger; the custom searcher classes in this notebook emit their
# warnings through it.
logger = logging.getLogger(__name__)


class CustomHEBOSearch(HEBOSearch):
    """Multi-objective variant of Ray Tune's ``HEBOSearch``.

    ``metric`` and ``mode`` are lists (one entry per objective) and the
    optimizer is HEBO's ``GeneralBO`` created with ``num_obj`` objectives and
    ``num_constr`` constraints. Everything not overridden here is inherited
    from ``HEBOSearch``.

    Args:
        space: Tune search-space dict or a HEBO ``DesignSpace``.
        metric: Names of the objectives read from each trial result.
        mode: ``"min"`` or ``"max"`` for each objective, same order as
            ``metric``.
        constraint: Names of the constraint values read from each trial
            result; they are passed to the optimizer unscaled, after the
            objectives.
        points_to_evaluate: Initial configurations to try first.
        evaluated_rewards: Known outcomes for ``points_to_evaluate``. Each
            entry holds the objective values followed by the constraint
            values (a scalar per point is accepted for one objective without
            constraints).
        random_state_seed: Seed applied to NumPy and torch before the
            optimizer is created.
        max_concurrent: Size of the suggestion batch.
        **kwargs: Forwarded unchanged to ``GeneralBO``.
    """

    def __init__(
        self,
        space: Optional[
            Union[Dict, "hebo.design_space.design_space.DesignSpace"]
        ] = None,
        metric: Optional[List[str]] = None,
        mode: Optional[List[str]] = None,
        constraint: Optional[List[str]] = None,
        points_to_evaluate: Optional[List[Dict]] = None,
        evaluated_rewards: Optional[List] = None,
        random_state_seed: Optional[int] = None,
        max_concurrent: int = 8,
        **kwargs,
    ):
        assert hebo is not None, (
            "HEBO must be installed! You can install HEBO with"
            " the command: `pip install 'HEBO>=0.2.0'`."
            "This error may also be caused if HEBO"
            " dependencies have bad versions. Try updating HEBO"
            " first."
        )
        if mode:
            assert all(
                m in ["min", "max"] for m in mode
            ), "Mode must be either 'min' or 'max'."
        assert (
            isinstance(max_concurrent, int) and max_concurrent >= 1
        ), "`max_concurrent` must be an integer and at least 1."
        if random_state_seed is not None:
            assert isinstance(
                random_state_seed, int
            ), "random_state_seed must be None or int, got '{}'.".format(
                type(random_state_seed)
            )
        # Deliberately skip HEBOSearch.__init__ and call the next initializer
        # in the MRO: the parent's own setup is re-implemented below for
        # list-valued metric/mode.
        super(HEBOSearch, self).__init__(metric=metric, mode=mode)

        if isinstance(space, dict) and space:
            resolved_vars, domain_vars, grid_vars = parse_spec_vars(space)
            if resolved_vars:
                raise TypeError(SPACE_ERROR_MESSAGE)
            if domain_vars or grid_vars:
                logger.warning(
                    UNRESOLVED_SEARCH_SPACE.format(par="space", cls=type(self))
                )
                space = self.convert_search_space(space)
        elif space is not None and not isinstance(
            space, hebo.design_space.design_space.DesignSpace
        ):
            raise TypeError(SPACE_ERROR_MESSAGE + " Got {}.".format(type(space)))

        self._constraint = constraint

        self._hebo_config = kwargs
        self._random_state_seed = random_state_seed
        self._space = space
        self._points_to_evaluate = points_to_evaluate
        self._evaluated_rewards = evaluated_rewards
        self._initial_points = []
        self._live_trial_mapping = {}

        self._max_concurrent = max_concurrent
        self._suggestions_cache = []
        self._batch_filled = False

        self._opt = None
        if space:
            self._setup_optimizer()

    @property
    def metric_op(self):
        """Per-objective sign factors (-1.0 for "max", 1.0 for "min")."""
        return self._metric_op

    def _setup_optimizer(self):
        """Create the ``GeneralBO`` optimizer and feed it any warm-start data.

        Raises:
            ValueError: If no mode is set, if metric and mode differ in
                length, if the search space is invalid or empty, or if the
                warm-start rewards do not have one column per objective and
                constraint.
        """
        # A bare string would otherwise be iterated character by character
        # below and silently yield wrong signs, so wrap it in a list.
        if isinstance(self._mode, str):
            self._mode = [self._mode]
        if isinstance(self._metric, str):
            self._metric = [self._metric]

        if not self._mode:
            raise ValueError(
                "`mode` must be set to a list of 'min'/'max' values before "
                "the HEBO optimizer can be created."
            )

        # HEBO internally minimizes, so "max" => -1
        self._metric_op = [-1.0 if mode == "max" else 1.0 for mode in self._mode]

        if self._metric is None:
            # If only a mode was passed, use anonymous metric
            self._metric = [DEFAULT_METRIC]

        if len(self._metric) != len(self._mode):
            raise ValueError(
                "`metric` and `mode` must have the same length, got "
                f"{len(self._metric)} metric(s) and {len(self._mode)} mode(s)."
            )

        if not isinstance(self._space, hebo.design_space.design_space.DesignSpace):
            raise ValueError(
                f"Invalid search space: {type(self._space)}. Either pass a "
                f"valid search space to the `HEBOSearch` class or pass "
                f"a `param_space` parameter to `tune.Tuner()`"
            )

        if self._space.num_paras <= 0:
            raise ValueError(
                "Got empty search space. Please make sure to pass "
                "a valid search space with at least one parameter to "
                "`HEBOSearch`"
            )

        if self._random_state_seed is not None:
            np.random.seed(self._random_state_seed)
            torch.random.manual_seed(self._random_state_seed)

        num_constr = len(self._constraint) if self._constraint else 0
        self._opt = hebo.optimizers.general.GeneralBO(
            space=self._space,
            num_obj=len(self._metric),
            num_constr=num_constr,
            **self._hebo_config
        )

        if self._points_to_evaluate:
            validate_warmstart(
                self._space.para_names,
                self._points_to_evaluate,
                self._evaluated_rewards,
            )
            if self._evaluated_rewards:
                rewards = np.asarray(self._evaluated_rewards, dtype=float)
                if rewards.ndim == 1:
                    # One scalar per point: make it a single-column matrix.
                    rewards = rewards.reshape(len(self._points_to_evaluate), -1)
                # Objectives are sign-flipped for "max"; constraint columns
                # are passed through unchanged (factor 1).
                scale = np.concatenate(
                    [np.asarray(self._metric_op, dtype=float), np.ones(num_constr)]
                )
                if rewards.ndim != 2 or rewards.shape[1] != scale.size:
                    raise ValueError(
                        "Each entry of `evaluated_rewards` must hold "
                        f"{len(self._metric)} objective value(s) followed by "
                        f"{num_constr} constraint value(s); got shape "
                        f"{rewards.shape}."
                    )
                self._opt.observe(
                    pd.DataFrame(self._points_to_evaluate),
                    rewards * scale,
                )
            else:
                self._initial_points = self._points_to_evaluate

    def _process_result(self, trial_id: str, result: Dict):
        """Report one finished trial to the optimizer.

        The observation row holds the sign-adjusted objective values followed
        by the raw constraint values, i.e. ``num_obj + num_constr`` entries,
        matching how the optimizer was created.
        NOTE(review): assumes each constraint name is a key of ``result`` and
        that ``GeneralBO.observe`` expects exactly this column layout —
        confirm against the installed HEBO version.
        """
        X = self._live_trial_mapping[trial_id]
        objective_values = [
            result[metric] * op
            for metric, op in zip(self._metric, self._metric_op, strict=True)
        ]
        constraint_values = [result[name] for name in (self._constraint or [])]
        y = np.array(objective_values + constraint_values, ndmin=2)
        self._opt.observe(
            X=X,
            y=y,
        )

    def add_evaluated_point(
        self,
        parameters: Dict,
        value: float,
        error: bool = False,
        pruned: bool = False,
        intermediate_values: Optional[List[float]] = None,
    ):
        """Not supported: always raises ``NotImplementedError``."""
        raise NotImplementedError("This method is not supported for CustomHEBOSearch")



In [5]:

from ax.service.utils.instantiation import ObjectiveProperties
import copy
import ax
from ax.service.ax_client import AxClient
from ray.tune.search.ax import AxSearch


class CustomAxSearch(AxSearch):
    """Variant of Ray Tune's ``AxSearch`` that takes lists of metrics/modes.

    The Ax experiment is created with one objective per ``metric`` entry,
    minimized unless the matching ``mode`` entry is ``"max"``. Everything not
    overridden here is inherited from ``AxSearch``.
    NOTE(review): the inherited result-processing code may still assume a
    single string metric — verify against the installed Ray version before
    relying on trial completion with this class.

    Args:
        space: Ax parameter list or a Tune search-space dict.
        metric: Objective names, one per objective.
        mode: ``"min"`` or ``"max"`` per objective, same order as ``metric``.
        points_to_evaluate: Initial configurations to try first.
        parameter_constraints: Forwarded to ``AxClient.create_experiment``.
        outcome_constraints: Forwarded to ``AxClient.create_experiment``.
        ax_client: Optional pre-built client; if it already has an
            experiment, none of the other experiment settings may be passed.
        **ax_kwargs: Used to build an ``AxClient`` when none is given.
    """

    def __init__(
        self,
        space: Optional[Union[Dict, List[Dict]]] = None,
        metric: Optional[List[str]] = None,
        mode: Optional[List[str]] = None,
        points_to_evaluate: Optional[List[Dict]] = None,
        parameter_constraints: Optional[List] = None,
        outcome_constraints: Optional[List] = None,
        ax_client: Optional[AxClient] = None,
        **ax_kwargs,
    ):
        assert (
            ax is not None
        ), """Ax must be installed!
            You can install AxSearch with the command:
            `pip install ax-platform sqlalchemy`."""

        if mode:
            assert all(m in ["min", "max"] for m in mode), "`mode` must be 'min' or 'max'."

        # Deliberately skip AxSearch.__init__ and call the next initializer in
        # the MRO: the parent's setup is re-implemented below for list-valued
        # metric/mode.
        super(AxSearch, self).__init__(
            metric=metric,
            mode=mode,
        )

        self._ax = ax_client
        self._ax_kwargs = ax_kwargs or {}

        if isinstance(space, dict) and space:
            resolved_vars, domain_vars, grid_vars = parse_spec_vars(space)
            if domain_vars or grid_vars:
                logger.warning(
                    UNRESOLVED_SEARCH_SPACE.format(par="space", cls=type(self))
                )
                space = self.convert_search_space(space)

        self._space = space
        self._parameter_constraints = parameter_constraints
        self._outcome_constraints = outcome_constraints

        # Copy so later changes to the caller's list do not affect the searcher.
        self._points_to_evaluate = copy.deepcopy(points_to_evaluate)

        self._parameters = []
        self._live_trial_mapping = {}

        if self._ax or self._space:
            self._setup_experiment()

    def _setup_experiment(self):
        """Create the Ax experiment (or adopt an existing one) and sync state.

        Afterwards ``self._metric`` / ``self._mode`` are lists read back from
        the experiment's objective(s) and ``self._parameters`` lists the
        experiment's parameter names.

        Raises:
            ValueError: If no experiment exists and ``space`` or ``mode`` is
                missing, or if an experiment was supplied together with any
                of space/constraints/mode/metric.
        """
        if self._metric is None and self._mode:
            # If only a mode was passed, use anonymous metric
            self._metric = [DEFAULT_METRIC]

        if not self._ax:
            self._ax = AxClient(**self._ax_kwargs)

        # Accessing `.experiment` raises ValueError while no experiment exists.
        try:
            exp = self._ax.experiment
            has_experiment = True
        except ValueError:
            has_experiment = False

        if not has_experiment:
            if not self._space:
                raise ValueError(
                    "You have to create an Ax experiment by calling "
                    "`AxClient.create_experiment()`, or you should pass an "
                    "Ax search space as the `space` parameter to `AxSearch`, "
                    "or pass a `param_space` dict to `tune.Tuner()`."
                )
            if not self._mode:
                raise ValueError(
                    "Please specify the `mode` argument when initializing "
                    "the `AxSearch` object or pass it to `tune.TuneConfig()`."
                )
            self._ax.create_experiment(
                parameters=self._space,
                objectives={
                    metric: ObjectiveProperties(minimize=mode != "max")
                    for metric, mode in zip(self._metric, self._mode, strict=True)
                },
                parameter_constraints=self._parameter_constraints,
                outcome_constraints=self._outcome_constraints,
            )
        else:
            if any(
                [
                    self._space,
                    self._parameter_constraints,
                    self._outcome_constraints,
                    self._mode,
                    self._metric,
                ]
            ):
                raise ValueError(
                    "If you create the Ax experiment yourself, do not pass "
                    "values for these parameters to `AxSearch`: {}.".format(
                        [
                            "space",
                            "parameter_constraints",
                            "outcome_constraints",
                            "mode",
                            "metric",
                        ]
                    )
                )

        exp = self._ax.experiment

        # Update mode and metric from experiment if it has been passed.
        # A multi-objective container exposes `.objectives`; a single
        # objective does not, so it is wrapped in a one-element list instead
        # of failing with AttributeError.
        objective = exp.optimization_config.objective
        objectives = getattr(objective, "objectives", None) or [objective]
        self._mode = []
        self._metric = []
        for single_objective in objectives:
            self._mode.append("min" if single_objective.minimize else "max")
            self._metric.append(single_objective.metric.name)

        self._parameters = list(exp.parameters)

        if self._ax._enforce_sequential_optimization:
            logger.warning(
                "Detected sequential enforcement. Be sure to use "
                "a ConcurrencyLimiter."
            )


In [13]:
import dill
import numpy as np

# Grid resolutions: step_1 spaces the integer grids, step_2 the fractional ones.
step_1 = 1000
step_2 = (10 ** -8) * (10 ** 4)

# Eight categorical parameters named "1".."8". Odd-numbered ones draw from the
# integer grid 10 .. 10**8 (step step_1), even-numbered ones from the
# fractional grid 10**-8 .. 1 (step step_2). Every parameter gets its own
# freshly built array.
space: Dict[str, Any] = {
    str(position): tune.choice(
        categories=(
            np.arange(10, 10 ** 8 + step_1, step_1)
            if position % 2 == 1
            else np.arange(10 ** -8, 1 + step_2, step_2)
        )
    )
    for position in range(1, 9)
}

In [19]:
from ray.tune.search.optuna import OptunaSearch


def evaluator(config: Dict[str, Any]):
    """Trainable that reports the sampled configuration back as its metrics.

    Every key/value pair of ``config`` is copied into a new dict, which is
    passed to ``train.report`` as the trial's metrics.
    """
    reported: Dict[str, Any] = dict(config.items())
    train.report(metrics=reported)


# One objective per search-space key, each with its own optimization direction.
objective_names = [str(position) for position in range(1, 9)]
objective_modes = ["max", "min", "min", "max", "max", "min", "min", "max"]

search_alg = OptunaSearch(space=space, metric=objective_names, mode=objective_modes)
# NOTE(review): confirm this call actually takes effect for OptunaSearch; Ray
# also documents wrapping a searcher in `ConcurrencyLimiter` to cap concurrency.
search_alg.set_max_concurrency(max_concurrent=32)

# Run 30 samples of `evaluator`, with configurations proposed by the searcher.
tune_config = tune.TuneConfig(search_alg=search_alg, num_samples=30)
tuner = tune.Tuner(trainable=evaluator, tune_config=tune_config)
results = tuner.fit()

0,1
Current time:,2024-08-11 14:41:33
Running for:,00:02:28.35
Memory:,10.3/15.5 GiB

Trial name,status,loc,1,2,3,4,5,6,7,8,iter,total time (s),1.1,2.1,3.1
evaluator_3a99c88d,TERMINATED,172.25.4.192:10772,79252000.0,0.561,25846000.0,0.0624,88154000.0,0.0982,6022010.0,0.3928,1,0.000181913,79252000.0,0.561,25846000.0
evaluator_90cfa248,TERMINATED,172.25.4.192:10843,17134000.0,0.1809,77454000.0,0.3112,35454000.0,0.3185,14159000.0,0.3724,1,0.000175238,17134000.0,0.1809,77454000.0
evaluator_fbf99130,TERMINATED,172.25.4.192:10915,85681000.0,0.9481,64901000.0,0.1681,25568000.0,0.6175,62446000.0,0.6817,1,0.000172377,85681000.0,0.9481,64901000.0
evaluator_737a6241,TERMINATED,172.25.4.192:10985,64668000.0,0.7938,68938000.0,0.4268,86839000.0,0.2016,83585000.0,0.4152,1,0.000173807,64668000.0,0.7938,68938000.0
evaluator_8dfb18b8,TERMINATED,172.25.4.192:11056,13583000.0,0.6628,66925000.0,0.4808,31564000.0,0.047,81843000.0,0.6119,1,0.000267506,13583000.0,0.6628,66925000.0
evaluator_9878dbe6,TERMINATED,172.25.4.192:11127,436010.0,0.6163,68063000.0,0.00160001,11598000.0,0.9059,78319000.0,0.8271,1,0.000246048,436010.0,0.6163,68063000.0
evaluator_0bb7d92f,TERMINATED,172.25.4.192:11198,30201000.0,0.5299,66301000.0,0.9113,38562000.0,0.4881,85198000.0,0.4303,1,0.000180006,30201000.0,0.5299,66301000.0
evaluator_232c569d,TERMINATED,172.25.4.192:11270,81582000.0,0.00670001,93022000.0,0.958,33929000.0,0.1775,39334000.0,0.762,1,0.000458956,81582000.0,0.00670001,93022000.0
evaluator_9ef3ed6a,TERMINATED,172.25.4.192:11340,69295000.0,0.1661,18025000.0,0.4292,22997000.0,0.8061,59276000.0,0.194,1,0.000175476,69295000.0,0.1661,18025000.0
evaluator_473de7ad,TERMINATED,172.25.4.192:11411,6160010.0,0.4678,42797000.0,0.7847,22724000.0,0.8585,35431000.0,0.2632,1,0.000171661,6160010.0,0.4678,42797000.0


2024-08-11 14:41:33,249	INFO tune.py:1009 -- Wrote the latest version of all result files and experiment state to '/root/ray_results/evaluator_2024-08-11_14-39-04' in 0.4621s.
2024-08-11 14:41:33,258	INFO tune.py:1041 -- Total run time: 148.37 seconds (147.89 seconds for the tuning loop).


In [20]:
# Convert the result grid returned by `tuner.fit()` into a DataFrame.
df_results: pd.DataFrame = results.get_dataframe()
# Last expression of the cell: shown as the cell output.
df_results

Unnamed: 0,1,2,3,4,5,6,7,8,timestamp,checkpoint_dir_name,...,iterations_since_restore,config/1,config/2,config/3,config/4,config/5,config/6,config/7,config/8,logdir
0,79252010,0.561,25846010,0.0624,88154010,0.0982,6022010,0.3928,1723387146,,...,1,79252010,0.561,25846010,0.0624,88154010,0.0982,6022010,0.3928,3a99c88d
1,17134010,0.1809,77454010,0.3112,35454010,0.3185,14159010,0.3724,1723387148,,...,1,17134010,0.1809,77454010,0.3112,35454010,0.3185,14159010,0.3724,90cfa248
2,85681010,0.9481,64901010,0.1681,25568010,0.6175,62446010,0.6817,1723387150,,...,1,85681010,0.9481,64901010,0.1681,25568010,0.6175,62446010,0.6817,fbf99130
3,64668010,0.7938,68938010,0.4268,86839010,0.2016,83585010,0.4152,1723387152,,...,1,64668010,0.7938,68938010,0.4268,86839010,0.2016,83585010,0.4152,737a6241
4,13583010,0.6628,66925010,0.4808,31564010,0.047,81843010,0.6119,1723387154,,...,1,13583010,0.6628,66925010,0.4808,31564010,0.047,81843010,0.6119,8dfb18b8
5,436010,0.6163,68063010,0.0016,11598010,0.9059,78319010,0.8271,1723387156,,...,1,436010,0.6163,68063010,0.0016,11598010,0.9059,78319010,0.8271,9878dbe6
6,30201010,0.5299,66301010,0.9113,38562010,0.4881,85198010,0.4303,1723387159,,...,1,30201010,0.5299,66301010,0.9113,38562010,0.4881,85198010,0.4303,0bb7d92f
7,81582010,0.0067,93022010,0.958,33929010,0.1775,39334010,0.762,1723387161,,...,1,81582010,0.0067,93022010,0.958,33929010,0.1775,39334010,0.762,232c569d
8,69295010,0.1661,18025010,0.4292,22997010,0.8061,59276010,0.194,1723387163,,...,1,69295010,0.1661,18025010,0.4292,22997010,0.8061,59276010,0.194,9ef3ed6a
9,6160010,0.4678,42797010,0.7847,22724010,0.8585,35431010,0.2632,1723387165,,...,1,6160010,0.4678,42797010,0.7847,22724010,0.8585,35431010,0.2632,473de7ad


In [21]:
import numpy as np
from pymoo.decomposition.asf import ASF


def get_decision_index(results: np.ndarray, weights: Optional[np.ndarray] = None) -> int:
    """Pick one row of a multi-objective result matrix using pymoo's ``ASF``.

    All objectives are treated as "smaller is better". Columns are min-max
    normalized using the column-wise minimum and maximum of ``results``, then
    the ``ASF`` decomposition is evaluated with the reciprocal of the
    normalized weights and the row with the smallest value is selected.

    Args:
        results: 2-D array-like of shape ``(n_points, n_objectives)``. Any
            numeric dtype is accepted; values are converted to float64.
        weights: Optional importance per objective (length ``n_objectives``).
            Defaults to equal weights. Zero weights are nudged by machine
            epsilon so the reciprocal stays finite.

    Returns:
        Positional index (a plain ``int``) of the selected row.

    Raises:
        ValueError: If ``results`` is not a non-empty 2-D matrix or
            ``weights`` does not have one entry per objective.
    """
    # Convert up front: `np.finfo` below only accepts floating-point dtypes,
    # so an all-integer matrix would otherwise raise.
    objectives: np.ndarray = np.asarray(results, dtype=np.float64)
    if objectives.ndim != 2 or objectives.shape[0] == 0 or objectives.shape[1] == 0:
        raise ValueError(
            "`results` must be a non-empty 2-D array of shape "
            f"(n_points, n_objectives), got shape {objectives.shape}."
        )
    n_objectives: int = objectives.shape[1]

    approx_ideal: np.ndarray = objectives.min(axis=0)
    approx_nadir: np.ndarray = objectives.max(axis=0)
    denominator: np.ndarray = approx_nadir - approx_ideal
    # A constant column has zero range; epsilon avoids dividing by zero.
    denominator[denominator == 0] += np.finfo(denominator.dtype).eps
    normalized_results: np.ndarray = (objectives - approx_ideal) / denominator

    if weights is None:
        objective_weights: np.ndarray = np.full(n_objectives, 0.5, dtype=np.float64)
    else:
        # np.array copies, so the caller's weights are never modified.
        objective_weights = np.array(weights, dtype=np.float64)
        if objective_weights.shape != (n_objectives,):
            raise ValueError(
                f"`weights` must have shape ({n_objectives},), "
                f"got {objective_weights.shape}."
            )
    objective_weights[objective_weights == 0] += np.finfo(objective_weights.dtype).eps
    normalized_weights: np.ndarray = objective_weights / objective_weights.sum()

    decomp = ASF()
    return int(decomp.do(normalized_results, 1 / normalized_weights).argmin())


# Sign per objective: -1 for "max", 1 otherwise, so that after multiplying
# every objective column is "smaller is better".
metric_op: List[int] = [-1 if mode == "max" else 1 for mode in search_alg.mode]
# Work on a deep copy so the sign flip does not alter df_results.
decision_df_results: pd.DataFrame = df_results.copy(deep=True)
decision_df_results[search_alg.metric] = decision_df_results[search_alg.metric] * metric_op
# Row position (within the NumPy matrix) chosen by get_decision_index.
decision_index: int = get_decision_index(decision_df_results[search_alg.metric].to_numpy())
# Adds a boolean "decision" column to df_results in place.
# NOTE(review): compares index *labels* with a *positional* index; this assumes
# df_results has the default 0..n-1 index — TODO confirm.
df_results["decision"] = df_results.index == decision_index
# Last expression of the cell: shown as the cell output.
df_results

Unnamed: 0,1,2,3,4,5,6,7,8,timestamp,checkpoint_dir_name,...,config/1,config/2,config/3,config/4,config/5,config/6,config/7,config/8,logdir,decision
0,79252010,0.561,25846010,0.0624,88154010,0.0982,6022010,0.3928,1723387146,,...,79252010,0.561,25846010,0.0624,88154010,0.0982,6022010,0.3928,3a99c88d,False
1,17134010,0.1809,77454010,0.3112,35454010,0.3185,14159010,0.3724,1723387148,,...,17134010,0.1809,77454010,0.3112,35454010,0.3185,14159010,0.3724,90cfa248,False
2,85681010,0.9481,64901010,0.1681,25568010,0.6175,62446010,0.6817,1723387150,,...,85681010,0.9481,64901010,0.1681,25568010,0.6175,62446010,0.6817,fbf99130,False
3,64668010,0.7938,68938010,0.4268,86839010,0.2016,83585010,0.4152,1723387152,,...,64668010,0.7938,68938010,0.4268,86839010,0.2016,83585010,0.4152,737a6241,False
4,13583010,0.6628,66925010,0.4808,31564010,0.047,81843010,0.6119,1723387154,,...,13583010,0.6628,66925010,0.4808,31564010,0.047,81843010,0.6119,8dfb18b8,False
5,436010,0.6163,68063010,0.0016,11598010,0.9059,78319010,0.8271,1723387156,,...,436010,0.6163,68063010,0.0016,11598010,0.9059,78319010,0.8271,9878dbe6,False
6,30201010,0.5299,66301010,0.9113,38562010,0.4881,85198010,0.4303,1723387159,,...,30201010,0.5299,66301010,0.9113,38562010,0.4881,85198010,0.4303,0bb7d92f,False
7,81582010,0.0067,93022010,0.958,33929010,0.1775,39334010,0.762,1723387161,,...,81582010,0.0067,93022010,0.958,33929010,0.1775,39334010,0.762,232c569d,False
8,69295010,0.1661,18025010,0.4292,22997010,0.8061,59276010,0.194,1723387163,,...,69295010,0.1661,18025010,0.4292,22997010,0.8061,59276010,0.194,9ef3ed6a,False
9,6160010,0.4678,42797010,0.7847,22724010,0.8585,35431010,0.2632,1723387165,,...,6160010,0.4678,42797010,0.7847,22724010,0.8585,35431010,0.2632,473de7ad,False


In [22]:
import plotly.express as px

# Plot on a copy whose boolean "decision" flag is recoded to 1/0 so it can be
# used as the line colour; df_results itself is left untouched.
plot_df_results: pd.DataFrame = df_results.copy(deep=True).assign(
    decision=lambda frame: frame["decision"].map({True: 1, False: 0})
)

# One axis per objective, lines coloured by the recoded decision flag.
fig = px.parallel_coordinates(
    plot_df_results,
    dimensions=search_alg.metric,
    color="decision",
    template="plotly",
)
fig.show()

In [23]:
from ray.tune.search import Searcher
import rexmex


def evaluate(results: pd.DataFrame, space: Dict[str, Any], search_alg: Searcher):
    """Score reported objectives against the best value each one could reach.

    For every objective the ideal value is the largest category of the
    matching search-space entry when its mode is ``"max"`` and the smallest
    one otherwise. Search-space entries are paired with modes and metrics by
    position.

    Args:
        results: Frame holding one column per ``search_alg.metric`` entry and
            a ``"decision"`` column flagging the selected row.
        space: Search space whose values expose a ``categories`` sequence.
        search_alg: Object providing the ``metric`` and ``mode`` lists.

    Returns:
        Dict with sMAPE, MAE and bound-relative MAE, once over all rows
        (``"... f"``) and once for the first flagged row (``"... d_f"``).
        Bound-relative MAE is the absolute distance to the ideal value divided
        by the width of the parameter's range, averaged over objectives.

    Raises:
        ValueError: If space, modes and metrics differ in length, or if no
            row is flagged in the ``"decision"`` column.
    """
    metric_names = list(search_alg.metric)
    modes = list(search_alg.mode)
    variables = list(space.values())
    if not (len(variables) == len(modes) == len(metric_names)):
        raise ValueError(
            "`space`, `search_alg.mode` and `search_alg.metric` must have the "
            f"same length, got {len(variables)}, {len(modes)} and "
            f"{len(metric_names)}."
        )

    f = results[metric_names].to_numpy()
    decision_rows = results.loc[results["decision"].eq(True), metric_names].to_numpy()
    if decision_rows.shape[0] == 0:
        raise ValueError("`results` has no row flagged in its 'decision' column.")
    d_f = decision_rows[0]

    # Ideal point and value range per objective; min/max rather than the
    # first/last element so unsorted categories are handled as well.
    pf = []
    bounds = []
    for variable, mode in zip(variables, modes, strict=True):
        lower, upper = np.min(variable.categories), np.max(variable.categories)
        pf.append(upper if mode == "max" else lower)
        bounds.append((lower, upper))

    ideal = np.asarray(pf, dtype=np.float64)
    spans = np.asarray([np.abs(upper - lower) for lower, upper in bounds], dtype=np.float64)

    def _bound_relative_error(points):
        """Distance to the ideal point divided by the range, averaged over objectives."""
        return (np.abs(ideal - points) / spans).mean(axis=-1)

    ideal_rows = [pf] * f.shape[0]
    return {
        "smape f": rexmex.metrics.symmetric_mean_absolute_percentage_error(ideal_rows, f),
        "mae f": rexmex.metrics.mean_absolute_error(ideal_rows, f),
        # Mean over rows of the per-row mean error.
        "bound relative mae f": np.average(_bound_relative_error(f)),
        "smape d_f": rexmex.metrics.symmetric_mean_absolute_percentage_error(
            np.array(pf), np.array(d_f)
        ),
        "mae d_f": rexmex.metrics.mean_absolute_error(pf, d_f),
        "bound relative mae d_f": np.average(_bound_relative_error(d_f)),
    }


# One label per result frame to compare; add frame/label pairs here to
# evaluate several runs side by side.
labels = ["optuna_combined_un"]
evaluation_df_results = df_results.copy(deep=True)
result_frames = [evaluation_df_results]

evaluations = []
# Per evaluation key: the smallest value seen so far and the label it came from.
min_evaluation = {}
# Pair each label with its own result frame. Iterating a DataFrame directly
# yields its column names, so the frames are listed explicitly; strict=True
# fails loudly if frames and labels get out of sync.
for frame, label in zip(result_frames, labels, strict=True):
    evaluation = evaluate(frame, space, search_alg)

    for key, value in evaluation.items():
        best = min_evaluation.get(key)
        if best is None or value < best["value"]:
            min_evaluation[key] = {"value": value, "label": label}

    evaluations.append(evaluation)

df_evaluation = pd.DataFrame(evaluations, index=[str(label) for label in labels])
# Last expression of the cell: shown as the cell output.
df_evaluation

Unnamed: 0,smape f,mae f,bound relative mae f,smape d_f,mae d_f,bound relative mae d_f
optuna_combined_un,138.5677,25825280.0,0.513297,127.000921,28735500.0,0.382317
