# Scalable Constrained Bayesian Optimization (SCBO)
In this tutorial, we show how to implement Scalable Constrained Bayesian Optimization (SCBO) [1] in a closed loop in BoTorch.

We optimize the 20𝐷 Ackley function on the domain [−5,10]20. This implementation uses two simple constraint functions c1 and c2. Our goal is to find values x which maximize Ackley(x) subject to the constraints c1(x) <= 0 and c2(x) <= 0.

[1]: David Eriksson and Matthias Poloczek. Scalable constrained Bayesian optimization. In International Conference on Artificial Intelligence and Statistics, pages 730–738. PMLR, 2021.
(https://doi.org/10.48550/arxiv.2002.08526)

Since SCBO is essentially a constrained version of Trust Region Bayesina Optimization (TuRBO), this tutorial shares much of the same code as the TuRBO Tutorial (https://botorch.org/tutorials/turbo_1) with small modifications made to implement SCBO.

In [2]:
import math
from dataclasses import dataclass

import torch
from botorch.fit import fit_gpytorch_model
from botorch.models import SingleTaskGP
from botorch.test_functions import Ackley
from botorch.utils.transforms import unnormalize
from torch.quasirandom import SobolEngine

import gpytorch
from gpytorch.constraints import Interval
from gpytorch.kernels import MaternKernel, ScaleKernel
from gpytorch.likelihoods import GaussianLikelihood
from gpytorch.mlls import ExactMarginalLogLikelihood
from botorch.models.model_list_gp_regression import ModelListGP

# Constrained Max Posterior Sampling 
# is a new sampling class, similar to MaxPosteriorSampling, 
# which implements the constrained version of Thompson Sampling described in [1]
from botorch.generation.sampling import ConstrainedMaxPosteriorSampling

import warnings
warnings.filterwarnings("ignore")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
dtype = torch.float 

## Demonstration with 20-dimensional Ackley function and Two Simple Constraint Functions

In [2]:
# Here we define the example 20D Ackley function 
fun = Ackley(dim=20, negate=True).to(dtype=dtype, device=device)
fun.bounds[0, :].fill_(-5)
fun.bounds[1, :].fill_(10)
dim = fun.dim
lb, ub = fun.bounds

batch_size = 4
n_init = 2 * dim
max_cholesky_size = float("inf")  # Always use Cholesky

# When evaluating the function, we must first unnormalize the inputs since 
# we will use normalized inputs x in the main optimizaiton loop
def eval_objective(x):
    """This is a helper function we use to unnormalize and evalaute a point"""
    return fun(unnormalize(x, fun.bounds))

### Defining two simple constraint functions

#### We'll use two constraints functions: c1 and c2 
We want to find solutions which maximize the above Ackley objective subject tot he constraint that 
c1(x) <= 0 and c2(x) <= 0 
Note that SCBO expects all constraints to be of the for c(x) <= 0, so any other desired constraints must be modified to fit this form. 

Note also that while the below constraints are very simple functions, the point of this tutorial is to show how to use SCBO, and this same implementation could be applied in the same way if c1, c2 were actually complex black-box functions. 


In [3]:
def c1(x): # Equivalent to enforcing that x[0] >= 0
    return -x[0] 

def c2(x): # Equivalent to enforcing that x[1] >= 0
    return -x[1] 

# We assume c1, c2 have same bounds as the Ackley function above 
def eval_c1(x):
    """This is a helper function we use to unnormalize and evalaute a point"""
    return c1(unnormalize(x, fun.bounds))

def eval_c2(x):
    """This is a helper function we use to unnormalize and evalaute a point"""
    return c2(unnormalize(x, fun.bounds))

## Define TuRBO Class

Just as in the TuRBO Tutorial (https://botorch.org/tutorials/turbo_1), we'll define a class to hold the turst region state and a method update_state() to update the lengths of the trust region as discussed in the original TuRBO paper. 

In [4]:
@dataclass
class TurboState:
    dim: int
    batch_size: int
    length: float = 0.8
    length_min: float = 0.5 ** 7
    length_max: float = 1.6
    failure_counter: int = 0
    failure_tolerance: int = float("nan")  # Note: Post-initialized
    success_counter: int = 0
    success_tolerance: int = 10  # Note: The original paper uses 3
    best_value: float = -float("inf")
    restart_triggered: bool = False

    def __post_init__(self):
        self.failure_tolerance = math.ceil(
            max([4.0 / self.batch_size, float(self.dim) / self.batch_size])
        )


def update_state(state, Y_next):
    if max(Y_next) > state.best_value + 1e-3 * math.fabs(state.best_value):
        state.success_counter += 1
        state.failure_counter = 0
    else:
        state.success_counter = 0
        state.failure_counter += 1

    if state.success_counter == state.success_tolerance:  # Expand trust region
        state.length = min(2.0 * state.length, state.length_max)
        state.success_counter = 0
    elif state.failure_counter == state.failure_tolerance:  # Shrink trust region
        state.length /= 2.0
        state.failure_counter = 0

    state.best_value = max(state.best_value, max(Y_next).item())
    if state.length < state.length_min:
        state.restart_triggered = True
    return state

# Define example state 
state = TurboState(dim=dim, batch_size=batch_size)
print(state)

TurboState(dim=20, batch_size=4, length=0.8, length_min=0.0078125, length_max=1.6, failure_counter=0, failure_tolerance=5, success_counter=0, success_tolerance=10, best_value=-inf, restart_triggered=False)


### Generate Initial Points

Here we define a simple method to generate a set of random initial datapoints that we will use to kick-off optimization. 

In [5]:
def get_initial_points(dim, n_pts, seed=0):
    sobol = SobolEngine(dimension=dim, scramble=True, seed=seed)
    X_init = sobol.draw(n=n_pts).to(dtype=dtype, device=device)
    return X_init

## Generate batch modifications for SCBO 

Just as in the TuRBO Tutorial (https://botorch.org/tutorials/turbo_1), we'll define a method generate_batch to generate a new batch of candidate points within the TuRBO trust region using thompson sampling. 

The key difference here from TuRBO is that, instead of using MaxPosteriorSampling to simply grab the candidates within the trust region with the maximum posterior values, we use ConstrainedMaxPosteriorSampling to instead grab the candidates within the trust region with the maximum posterior values subject to the constrain that the posteriors for the constraint models for c1(x) and c2(x) must be less than or equal to 0 for each candidate. In otherwords, we use additional GPs ('constraiant models') to model each black-box constraint (c1 and c2), and throw out all candidates for which the posterior prediction of these constraint models is greater than 0 (throw out all predicted constraint violators). According to [1], in the special case when all of the candidaates arae predicted to be constraint violators, we select the candidate with the minimum predicted violation. (See botorch.generation.sampling.ConstrainedMaxPosteriorSampling for implementation details).

In [10]:
def generate_batch(
    state,
    model,  # GP model
    X,  # Evaluated points on the domain [0, 1]^d
    Y,  # Function values
    batch_size,
    n_candidates=None,  # Number of candidates for Thompson sampling
    constraint_model=None
):
    assert X.min() >= 0.0 and X.max() <= 1.0 and torch.all(torch.isfinite(Y))
    if n_candidates is None:
        n_candidates = min(5000, max(2000, 200 * X.shape[-1]))

    # Scale the TR to be proportional to the lengthscales
    x_center = X[Y.argmax(), :].clone()
    weights = model.covar_module.base_kernel.lengthscale.squeeze().detach()
    weights = weights / weights.mean()
    weights = weights / torch.prod(weights.pow(1.0 / len(weights)))
    tr_lb = torch.clamp(x_center - weights * state.length / 2.0, 0.0, 1.0)
    tr_ub = torch.clamp(x_center + weights * state.length / 2.0, 0.0, 1.0)

    # Thompson Sampling w/ Constraints (SCBO) 
    dim = X.shape[-1]
    sobol = SobolEngine(dim, scramble=True)
    pert = sobol.draw(n_candidates).to(dtype=dtype, device=device)
    pert = tr_lb + (tr_ub - tr_lb) * pert

    # Create a perturbation mask
    prob_perturb = min(20.0 / dim, 1.0)
    mask = (
        torch.rand(n_candidates, dim, dtype=dtype, device=device)
        <= prob_perturb
    )
    ind = torch.where(mask.sum(dim=1) == 0)[0]
    mask[ind, torch.randint(0, dim - 1, size=(len(ind),), device=device)] = 1

    # Create candidate points from the perturbations and the mask        
    X_cand = x_center.expand(n_candidates, dim).clone()
    X_cand[mask] = pert[mask]

    # Sample on the candidate points using Constrained Max Posterior Sampling
    constrained_thompson_sampling = ConstrainedMaxPosteriorSampling(model=model, constraint_model=constraint_model, replacement=False)
    with torch.no_grad():  
        X_next = constrained_thompson_sampling(X_cand, num_samples=batch_size)


    return X_next

## Main Optimization Loop

In [25]:
# Get initial data 
# Must get initial values for both objective and constraints 
X_turbo = get_initial_points(dim, n_init)
Y_turbo = torch.tensor(
    [eval_objective(x) for x in X_turbo], dtype=dtype, device=device
).unsqueeze(-1)
C1_turbo = torch.tensor(
    [eval_c1(x) for x in X_turbo], dtype=dtype, device=device
).unsqueeze(-1)
C2_turbo = torch.tensor(
    [eval_c2(x) for x in X_turbo], dtype=dtype, device=device
).unsqueeze(-1)

C1_turbo.min(), C1_turbo.max(), C2_turbo.min(), C2_turbo.max()

(tensor(-9.6854, device='cuda:0'),
 tensor(4.8509, device='cuda:0'),
 tensor(-9.8032, device='cuda:0'),
 tensor(4.7317, device='cuda:0'))

In [26]:
# Initialize TuRBO state 
state = TurboState(dim, batch_size=batch_size)
N_CANDIDATES = min(5000, max(2000, 200 * dim)) 

 
while not state.restart_triggered:  # Run until TuRBO converges
    # Fit a GP model for objective 
    train_Y = (Y_turbo - Y_turbo.mean()) / Y_turbo.std()
    likelihood = GaussianLikelihood(noise_constraint=Interval(1e-8, 1e-3))
    covar_module = ScaleKernel(  # Use the same lengthscale prior as in the TuRBO paper
        MaternKernel(nu=2.5, ard_num_dims=dim, lengthscale_constraint=Interval(0.005, 4.0))
    )
    model = SingleTaskGP(X_turbo, train_Y, covar_module=covar_module, likelihood=likelihood)
    mll = ExactMarginalLogLikelihood(model.likelihood, model)

    # Fit a GP model for C1 
    train_C1 = (C1_turbo - C1_turbo.mean()) / C1_turbo.std()
    c1_likelihood = GaussianLikelihood(noise_constraint=Interval(1e-8, 1e-3))
    c1_covar_module = ScaleKernel(  # Use the same lengthscale prior as in the TuRBO paper
        MaternKernel(nu=2.5, ard_num_dims=dim, lengthscale_constraint=Interval(0.005, 4.0))
    )
    c1_model = SingleTaskGP(X_turbo, train_C1, covar_module=c1_covar_module, likelihood=c1_likelihood)
    c1_mll = ExactMarginalLogLikelihood(c1_model.likelihood, c1_model)

    # Fit a GP model for C2 
    train_C2 = (C2_turbo - C2_turbo.mean()) / C2_turbo.std()
    c2_likelihood = GaussianLikelihood(noise_constraint=Interval(1e-8, 1e-3))
    c2_covar_module = ScaleKernel(  # Use the same lengthscale prior as in the TuRBO paper
        MaternKernel(nu=2.5, ard_num_dims=dim, lengthscale_constraint=Interval(0.005, 4.0))
    )
    c2_model = SingleTaskGP(X_turbo, train_C2, covar_module=c2_covar_module, likelihood=c2_likelihood)
    c2_mll = ExactMarginalLogLikelihood(c2_model.likelihood, c2_model)

    # Do the fitting and acquisition function optimization inside the Cholesky context
    with gpytorch.settings.max_cholesky_size(max_cholesky_size):
        # Fit the models
        fit_gpytorch_model(mll)
        fit_gpytorch_model(c1_mll)
        fit_gpytorch_model(c2_mll)
    
        # Generate a batch of candidates
        X_next = generate_batch(
            state=state,
            model=model,
            X=X_turbo,
            Y=train_Y,
            batch_size=batch_size,
            n_candidates=N_CANDIDATES,
            constraint_model=ModelListGP(c1_model, c2_model)    
        )

    # Evaluate both the objective and constraints for the selected candidaates 
    Y_next = torch.tensor(
        [eval_objective(x) for x in X_next], dtype=dtype, device=device
    ).unsqueeze(-1) 

    C1_next = torch.tensor(
        [eval_c1(x) for x in X_next], dtype=dtype, device=device
    ).unsqueeze(-1) 

    C2_next = torch.tensor( 
        [eval_c2(x) for x in X_next], dtype=dtype, device=device
    ).unsqueeze(-1) 

    # Grab the truly valid samples (Valid_Y_next)
    #   (valid samples must have BOTH c1 <= 0 and c2 <= 0 )
    constraint_vals = torch.cat([C1_next, C2_next], dim=-1) 
    bool_tensor = constraint_vals <= 0 
    bool_tensor = torch.all(bool_tensor, dim=-1).unsqueeze(-1) 
    Valid_Y_next = Y_next[bool_tensor] 

    # Update state only with VALID samples only 
    #   ie we only want to count a success and update 
    #   our current best point found when we have 
    #   found a new point that both improves upon our current best,
    #   and meets all constraints
    # Only update the state if there are new valid points to update with
    if Valid_Y_next.numel() > 0: 
        state = update_state(state=state, Y_next=Valid_Y_next)
    else:
        print("None of the suggested candidates met the constraints...")

    # Append data
    #   Notice we append all data, even points that violate
    #   the constriants, this is so our constraint models
    #   can learn more about the constranit functions and 
    #   gain confidence about where violation occurs
    X_turbo = torch.cat((X_turbo, X_next), dim=0)
    Y_turbo = torch.cat((Y_turbo, Y_next), dim=0)
    C1_turbo = torch.cat((C1_turbo, C1_next), dim=0)
    C2_turbo = torch.cat((C2_turbo, C2_next), dim=0)

    # Print current status 
    #   Note: Since only valid y's that met the constraints were used to update the 
    #   trust region state, state.best_value is always the best objective value
    #   found so far which meets the constraints 
    print(
        f"{len(X_turbo)}) Best value: {state.best_value:.2e}, TR length: {state.length:.2e}"
    )

None of the suggested candidates met the constraints...
44) Best value: -inf, TR length: 8.00e-01
None of the suggested candidates met the constraints...
48) Best value: -inf, TR length: 8.00e-01
52) Best value: -1.24e+01, TR length: 8.00e-01
56) Best value: -1.22e+01, TR length: 8.00e-01
60) Best value: -1.13e+01, TR length: 8.00e-01
64) Best value: -1.03e+01, TR length: 8.00e-01
68) Best value: -1.03e+01, TR length: 8.00e-01
72) Best value: -1.03e+01, TR length: 8.00e-01
76) Best value: -1.03e+01, TR length: 8.00e-01
80) Best value: -1.03e+01, TR length: 8.00e-01
84) Best value: -1.03e+01, TR length: 4.00e-01
88) Best value: -9.54e+00, TR length: 4.00e-01
92) Best value: -9.34e+00, TR length: 4.00e-01
96) Best value: -8.60e+00, TR length: 4.00e-01
100) Best value: -8.03e+00, TR length: 4.00e-01
104) Best value: -7.93e+00, TR length: 4.00e-01
108) Best value: -7.93e+00, TR length: 4.00e-01
112) Best value: -7.93e+00, TR length: 4.00e-01
116) Best value: -7.93e+00, TR length: 4.00e-01


In [27]:
#  Valid samples must have BOTH c1 <= 0 and c2 <= 0  
constraint_vals = torch.cat([C1_turbo, C2_turbo], dim=-1) 
bool_tensor = constraint_vals <= 0 
bool_tensor = torch.all(bool_tensor, dim=-1).unsqueeze(-1) 
Valid_Y = Y_turbo[bool_tensor] 

print(f"With constraints, the best value we found is: {Valid_Y.max().item():.4f}") 

With constraints, the best value we found is: -5.6503
