Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 6 additions & 18 deletions simopt/experiment_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
problem_directory,
solver_directory,
)
from simopt.utils import make_nonzero

# Imports exclusively used when type checking
# Prevents imports from being executed at runtime
Expand Down Expand Up @@ -1529,6 +1530,7 @@ def post_normalize(
initial_obj_val = np.mean(x0_postreps)
opt_obj_val = np.mean(xstar_postreps)
initial_opt_gap = initial_obj_val - opt_obj_val
initial_opt_gap = make_nonzero(initial_opt_gap, "initial_opt_gap")
# Store x0 and x* info and compute progress curves for each ProblemSolver.
for experiment in experiments:
# DOUBLE-CHECK FOR SHALLOW COPY ISSUES.
Expand Down Expand Up @@ -1562,24 +1564,10 @@ def post_normalize(
)
)
# Normalize by initial optimality gap.
if initial_opt_gap == 0:
warning_msg = "Divide by zero during post-normalization (initial_opt_gap is 0)."
logging.warning(warning_msg)
norm_est_objectives = []
for est_objective in est_objectives:
est_diff = est_objective - opt_obj_val
# Follow IEEE 754 standard for division by zero.
if est_diff < 0:
norm_est_objectives.append(-float("inf"))
elif est_diff > 0:
norm_est_objectives.append(float("inf"))
else:
norm_est_objectives.append(float("nan"))
else:
norm_est_objectives = [
(est_objective - opt_obj_val) / initial_opt_gap
for est_objective in est_objectives
]
norm_est_objectives = [
(est_objective - opt_obj_val) / initial_opt_gap
for est_objective in est_objectives
]
frac_intermediate_budgets = [
budget / experiment.problem.factors["budget"]
for budget in experiment.all_intermediate_budgets[mrep]
Expand Down
90 changes: 35 additions & 55 deletions simopt/solvers/spsa.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import logging
from typing import Callable
from simopt.utils import classproperty
from simopt.utils import classproperty, make_nonzero

import numpy as np

Expand Down Expand Up @@ -295,25 +295,15 @@ def solve(self, problem: Problem) -> tuple[list[Solution], list[int]]:
delta,
)
gbar += np.abs(np.divide(ghat, self.factors["gavg"]))
meangbar = np.mean(gbar) / (
self.factors["n_loss"] / (2 * self.factors["gavg"])
)

a_leftside = self.factors["step"] * (
(aalg + 1) ** self.factors["alpha"]
)
if meangbar == 0:
warning_msg = "Division by zero in SPSA solver (meangbar == 0)"
logging.warning(warning_msg)
# Follow IEEE 754 standard.
if a_leftside < 0:
a = -np.inf
elif a_leftside > 0:
a = np.inf
else:
a = np.nan
else:
a = a_leftside / meangbar
meangbar = np.mean(gbar) / (
self.factors["n_loss"] / (2 * self.factors["gavg"])
)
meangbar = make_nonzero(meangbar, "meangbar")
a = a_leftside / meangbar
# Run the main algorithm.
# Initiate iteration counter.
k = 0
Expand Down Expand Up @@ -347,18 +337,8 @@ def solve(self, problem: Problem) -> tuple[list[Solution], list[int]]:
mean_plus = thetaminus_sol.objectives_mean * step_weight_plus
mean_net = mean_minus + mean_plus
net_step_weight = step_weight_plus + step_weight_minus
if net_step_weight == 0:
warning_msg = "Division by zero in SPSA solver (step_weight_minus = step_weight_plus)"
logging.warning(warning_msg)
# Follow IEEE 754 standard.
if mean_net < 0:
ftheta = -np.inf
elif mean_net > 0:
ftheta = np.inf
else:
ftheta = np.nan
else:
ftheta = mean_net / net_step_weight
net_step_weight = make_nonzero(net_step_weight, "net_step_weight")
ftheta = mean_net / net_step_weight
# If on the first iteration, record the initial solution as best estimated objective.
if k == 1:
ftheta_best = ftheta
Expand All @@ -373,9 +353,11 @@ def solve(self, problem: Problem) -> tuple[list[Solution], list[int]]:
recommended_solns.append(theta_sol)
intermediate_budgets.append(expended_budget)
# Estimate gradient. (-minmax is needed to cast this as a minimization problem.)
theta_mean_diff = (
thetaplus_sol.objectives_mean - thetaminus_sol.objectives_mean
)
ghat = np.dot(-1, problem.minmax) * np.divide(
(thetaplus_sol.objectives_mean - thetaminus_sol.objectives_mean)
/ ((step_weight_plus + step_weight_minus) * c),
theta_mean_diff / (net_step_weight * c),
delta,
)
# Take step and check feasibility.
Expand All @@ -396,35 +378,33 @@ def check_cons(
"""Evaluates the distance from the new vector (candiate_x) compared to the current vector (new_x) respecting the vector's boundaries of feasibility.
Returns the evaluated vector (modified_x) and the weight (t2 - how much of a full step took) of the new vector.
The weight (t2) is used to calculate the weigthed average in the ftheta calculation."""
# The current step.
max_step = 1e15 # Large finite replacement for infinite steps
# Compute step direction
current_step = np.subtract(candidate_x, new_x)
# Form a matrix to determine the possible stepsize.
step_size_matrix = np.ones((2, len(candidate_x)))
for i in range(0, len(candidate_x)):
# Initialize minimum step size
# TODO: figure out if this should be larger than 1
min_step_size = 1
for i in range(len(candidate_x)):
if current_step[i] > 0:
diff = upper_bound[i] - new_x[i]
if current_step[i] == np.inf:
warning_msg = "Division by +inf in SPSA solver"
logging.warning(warning_msg)
# IEEE 754 standard.
step_size_matrix[0, i] = 0
else:
step_size_matrix[0, i] = diff / current_step[i]
elif current_step[i] < 0:
diff = lower_bound[i] - new_x[i]
if current_step[i] == -np.inf:
warning_msg = "Division by -inf in SPSA solver"
logging.warning(warning_msg)
# IEEE 754 standard.
step_size_matrix[1, i] = 0
else:
step_size_matrix[1, i] = diff / current_step[i]
# Find the minimum stepsize.
min_step_size = step_size_matrix.min()
else:
continue

# Handle infinite steps
if np.isinf(current_step[i]):
logging.warning("Infinite step encountered in SPSA solver")
current_step[i] = np.sign(current_step[i]) * max_step

# Ensure denominator is never too small while preserving sign
current_step[i] = make_nonzero(current_step[i], f"current_step[{i}]")

# Compute safe step size
step_size = diff / current_step[i]
if step_size < min_step_size:
min_step_size = step_size

# Calculate the modified x.
if min_step_size == 0:
# If t2 is 0, then there shouldn't be any change.
modified_x = new_x
else:
modified_x = new_x + min_step_size * current_step
modified_x = new_x + min_step_size * current_step
return modified_x, min_step_size
39 changes: 10 additions & 29 deletions simopt/solvers/strong.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@

from __future__ import annotations

import logging
import math
from typing import Callable, Literal
from simopt.utils import classproperty
from simopt.utils import classproperty, make_nonzero

import numpy as np
from numpy.linalg import norm
Expand Down Expand Up @@ -325,19 +324,9 @@ def solve(self, problem: Problem) -> tuple[list[Solution], list[int]]:
np.subtract(candidate_x, new_x),
)
)
r_diff = r_old - r_new
if r_diff == 0:
warning_msg = "Division by zero in STRONG solver (r_diff == 0 (Step I_3))"
logging.warning(warning_msg)
# Follow IEEE 754 standard.
if g_diff < 0:
rho = -np.inf
elif g_diff > 0:
rho = np.inf
else:
rho = np.nan
else:
rho = g_diff / r_diff
r_diff = (r_old - r_new)[0]
r_diff = make_nonzero(r_diff, "r_diff (stage I)")
rho = g_diff / r_diff

# Step 4: Update the trust region size and determine to accept or reject the solution.
if (rho < eta_0) | (g_diff <= 0) | (r_diff <= 0):
Expand Down Expand Up @@ -430,13 +419,9 @@ def solve(self, problem: Problem) -> tuple[list[Solution], list[int]]:
np.subtract(candidate_x, new_x),
)
)
r_diff = r_old - r_new
if r_diff == 0:
warning_msg = "Division by zero in STRONG solver (r_diff == 0 (Step II_3))"
logging.warning(warning_msg)
rho = 0
else:
rho = g_diff / r_diff
r_diff = (r_old - r_new)[0]
r_diff = make_nonzero(r_diff, "rdiff (stage II)")
rho = g_diff / r_diff
# Step 4: Update the trust region size and determine to accept or reject the solution.
if (rho < eta_0) | (g_diff <= 0) | (r_diff <= 0):
# Inner Loop.
Expand Down Expand Up @@ -556,13 +541,9 @@ def solve(self, problem: Problem) -> tuple[list[Solution], list[int]]:
rr_old = g_b_old
# Set rho to the ratio.
g_b_diff = g_b_old - g_b_new
rr_diff = rr_old - rr_new
if rr_diff == 0:
warning_msg = "Division by zero in STRONG solver (rr_diff == 0)"
logging.warning(warning_msg)
rrho = 0
else:
rrho = (g_b_diff) / (rr_diff)
rr_diff = (rr_old - rr_new)[0]
rr_diff = make_nonzero(rr_diff, "rr_diff")
rrho = (g_b_diff) / (rr_diff)

if (
(rrho < eta_0)
Expand Down
35 changes: 35 additions & 0 deletions simopt/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,38 @@ def __get__(self, instance: Any, owner: type[T]) -> Any: # noqa: ANN401
def classproperty(func: Callable[[type[T]], Any]) -> ClassPropertyDescriptor:
"""Decorator to create a class property using a descriptor."""
return ClassPropertyDescriptor(func)


def make_nonzero(value: float, name: str, epsilon: float = 1e-15) -> float:
"""Return a non-zero value to avoid division by zero.

Arguments
---------
value : float
The value to check.
name : str
The name of the variable.
epsilon : float, optional (default=1e-15)
The value to use if the original value is zero.

Returns
-------
float
The original value if it's not close to zero, otherwise a non-zero value.
"""
# Delayed imports
import numpy as np

# If it's not close to 0, return the original value
if not np.isclose(value, 0, atol=epsilon):
return value

# Otherwise, calculate the new value
import logging

new_value = epsilon if value == 0 else np.sign(value) * epsilon
warning_msg = (
f"{name} is {value}. Setting to {new_value} to avoid division by zero."
)
logging.warning(warning_msg)
return new_value
Loading