Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loose coupling nsgaii elite population selection #4821

15 changes: 2 additions & 13 deletions optuna/samplers/_nsgaiii.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for confirmation. The loose coupling of the elite population selection for the NSGAIIISampler will be done as a follow-up?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I am going to do it as a follow-up.

Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
from optuna.samplers.nsgaii._crossovers._base import BaseCrossover
from optuna.samplers.nsgaii._crossovers._uniform import UniformCrossover
from optuna.samplers.nsgaii._dominates_function import _constrained_dominates
from optuna.samplers.nsgaii._sampler import _fast_non_dominated_sort
from optuna.samplers.nsgaii._sampler import _validate_constraints
from optuna.samplers.nsgaii._dominates_function import _validate_constraints
from optuna.samplers.nsgaii._elite_population_selection_strategy import _fast_non_dominated_sort
from optuna.study import Study
from optuna.study._multi_objective import _dominates
from optuna.trial import FrozenTrial
Expand Down Expand Up @@ -102,17 +102,6 @@ def __init__(
if population_size < 2:
raise ValueError("`population_size` must be greater than or equal to 2.")

if not (mutation_prob is None or 0.0 <= mutation_prob <= 1.0):
raise ValueError(
"`mutation_prob` must be None or a float value within the range [0.0, 1.0]."
)

if not (0.0 <= crossover_prob <= 1.0):
raise ValueError("`crossover_prob` must be a float value within the range [0.0, 1.0].")

if not (0.0 <= swapping_prob <= 1.0):
raise ValueError("`swapping_prob` must be a float value within the range [0.0, 1.0].")

toshihikoyanase marked this conversation as resolved.
Show resolved Hide resolved
if crossover is None:
crossover = UniformCrossover(swapping_prob)

Expand Down
17 changes: 17 additions & 0 deletions optuna/samplers/nsgaii/_dominates_function.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from __future__ import annotations

from collections.abc import Callable
from collections.abc import Sequence
import warnings

import numpy as np

from optuna.samplers._base import _CONSTRAINTS_KEY
from optuna.study import StudyDirection
from optuna.study._multi_objective import _dominates
Expand Down Expand Up @@ -81,3 +84,17 @@ def _constrained_dominates(
violation0 = sum(v for v in constraints0 if v > 0)
violation1 = sum(v for v in constraints1 if v > 0)
return violation0 < violation1


def _validate_constraints(
population: list[FrozenTrial],
constraints_func: Callable[[FrozenTrial], Sequence[float]] | None = None,
) -> None:
if constraints_func is None:
return
for _trial in population:
_constraints = _trial.system_attrs.get(_CONSTRAINTS_KEY)
if _constraints is None:
continue
if np.any(np.isnan(np.array(_constraints))):
raise ValueError("NaN is not acceptable as constraint value.")
150 changes: 150 additions & 0 deletions optuna/samplers/nsgaii/_elite_population_selection_strategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
from __future__ import annotations

from collections import defaultdict
from collections.abc import Callable
from collections.abc import Sequence
import itertools

import optuna
from optuna.samplers.nsgaii._dominates_function import _constrained_dominates
from optuna.samplers.nsgaii._dominates_function import _validate_constraints
from optuna.study import Study
from optuna.study._multi_objective import _dominates
from optuna.trial import FrozenTrial


class NSGAIIElitePopulationSelectionStrategy:
def __init__(
self,
*,
population_size: int,
constraints_func: Callable[[FrozenTrial], Sequence[float]] | None = None,
) -> None:
if population_size < 2:
raise ValueError("`population_size` must be greater than or equal to 2.")

self._population_size = population_size
self._constraints_func = constraints_func

def __call__(self, study: Study, population: list[FrozenTrial]) -> list[FrozenTrial]:
"""Select elite population from the given trials by NSGA-II algorithm.

Args:
study:
Target study object.
population:
Trials in the study.

Returns:
A list of trials that are selected as elite population.
"""
_validate_constraints(population, self._constraints_func)
dominates = _dominates if self._constraints_func is None else _constrained_dominates
population_per_rank = _fast_non_dominated_sort(population, study.directions, dominates)

elite_population: list[FrozenTrial] = []
for individuals in population_per_rank:
if len(elite_population) + len(individuals) < self._population_size:
elite_population.extend(individuals)
else:
n = self._population_size - len(elite_population)
_crowding_distance_sort(individuals)
elite_population.extend(individuals[:n])
break

return elite_population


def _calc_crowding_distance(population: list[FrozenTrial]) -> defaultdict[int, float]:
"""Calculates the crowding distance of population.

We define the crowding distance as the summation of the crowding distance of each dimension
of value calculated as follows:

* If all values in that dimension are the same, i.e., [1, 1, 1] or [inf, inf],
the crowding distances of all trials in that dimension are zero.
* Otherwise, the crowding distances of that dimension is the difference between
two nearest values besides that value, one above and one below, divided by the difference
between the maximal and minimal finite value of that dimension. Please note that:
* the nearest value below the minimum is considered to be -inf and the
nearest value above the maximum is considered to be inf, and
* inf - inf and (-inf) - (-inf) is considered to be zero.
"""

manhattan_distances: defaultdict[int, float] = defaultdict(float)
if len(population) == 0:
return manhattan_distances

for i in range(len(population[0].values)):
population.sort(key=lambda x: x.values[i])

# If all trials in population have the same value in the i-th dimension, ignore the
# objective dimension since it does not make difference.
if population[0].values[i] == population[-1].values[i]:
continue

vs = [-float("inf")] + [trial.values[i] for trial in population] + [float("inf")]

# Smallest finite value.
v_min = next(x for x in vs if x != -float("inf"))

# Largest finite value.
v_max = next(x for x in reversed(vs) if x != float("inf"))

width = v_max - v_min
if width <= 0:
# width == 0 or width == -inf
width = 1.0

for j in range(len(population)):
# inf - inf and (-inf) - (-inf) is considered to be zero.
gap = 0.0 if vs[j] == vs[j + 2] else vs[j + 2] - vs[j]
manhattan_distances[population[j].number] += gap / width
return manhattan_distances


def _crowding_distance_sort(population: list[FrozenTrial]) -> None:
manhattan_distances = _calc_crowding_distance(population)
population.sort(key=lambda x: manhattan_distances[x.number])
population.reverse()


def _fast_non_dominated_sort(
population: list[FrozenTrial],
directions: list[optuna.study.StudyDirection],
dominates: Callable[[FrozenTrial, FrozenTrial, list[optuna.study.StudyDirection]], bool],
) -> list[list[FrozenTrial]]:
dominated_count: defaultdict[int, int] = defaultdict(int)
dominates_list = defaultdict(list)

for p, q in itertools.combinations(population, 2):
if dominates(p, q, directions):
dominates_list[p.number].append(q.number)
dominated_count[q.number] += 1
elif dominates(q, p, directions):
dominates_list[q.number].append(p.number)
dominated_count[p.number] += 1

population_per_rank = []
while population:
non_dominated_population = []
i = 0
while i < len(population):
if dominated_count[population[i].number] == 0:
individual = population[i]
if i == len(population) - 1:
population.pop()
else:
population[i] = population.pop()
non_dominated_population.append(individual)
else:
i += 1

for x in non_dominated_population:
for y in dominates_list[x.number]:
dominated_count[y] -= 1

assert non_dominated_population
population_per_rank.append(non_dominated_population)

return population_per_rank
Loading