Working on https://pyro.ai/examples/bo.html


In [None]:
import math
import matplotlib.gridspec as gridspec
import matplotlib.pyplot as plt
from mpl_toolkits import mplot3d
import numpy as np
import torch
import torch.autograd as autograd
import torch.optim as optim
from torch.distributions import constraints, transform_to

import pyro
import pyro.contrib.gp as gp

assert pyro.__version__.startswith('0.4')
pyro.enable_validation(True)  # can help with debugging
pyro.set_rng_seed(1)

# Objective function

In [None]:
const_a = 1.0
const_b = 5.1 / (4.0 * math.pow(math.pi, 2))
const_c = 5.0 / math.pi
const_r = 6.0
const_s = 10
const_t = 1.0 / (8.0 * math.pi)

def branin_hoo_first_term(X):
    
    fx = const_s * (1.0 - const_t) * torch.cos(X[0]) + const_s
    
    return fx

def branin_hoo(X):
    
    fx = branin_hoo_first_term(X) + \
        const_a * torch.pow(X[1] - const_b * torch.pow(X[0], 2) + const_c * X[0] - const_r, 2)
    
    return fx

In [None]:
steps = 1000
strides = 200

const_x1_min = -5
const_x1_max = 10

X1 = torch.linspace(const_x1_min, const_x1_max, steps)

const_x2_min = 0
const_x2_max = 15

X2 = torch.linspace(const_x2_min, const_x2_max, steps)

In [None]:
# Plot f(x)

plt.figure(figsize=(8, 4))

ax = plt.axes(projection='3d')

X1_mesh, X2_mesh = torch.meshgrid(X1, X2)

Z_mesh = branin_hoo(torch.stack((X1_mesh, X2_mesh)))

ax = plt.axes(projection='3d')
ax.contour3D(X1_mesh, X2_mesh, Z_mesh, strides)

ax.set_xlabel('x1')
ax.set_ylabel('x2')
ax.set_zlabel('b-hoo')

plt.show()

# initialize the model with four input points

In [None]:
X1_train = torch.tensor([x for x in np.random.uniform(low=const_x1_min, high=const_x1_max, size=5)])
X2_train = torch.tensor([x for x in np.random.uniform(low=const_x2_min, high=const_x2_max, size=5)])

X_train = torch.stack((X1_train, X2_train))
Y_train = branin_hoo(X_train)

train_cnt = len(Y_train)

In [None]:
gpmodel = gp.models.GPRegression(X_train.T, Y_train, 
                                 gp.kernels.Matern52(input_dim=2), 
                                 noise=torch.tensor(0.1), 
                                 jitter=1.0e-4)

# Comparing the gpmodel with the objective function

In [None]:
X1_tmp = torch.tensor([-4.9, 4.2, 7.0])
X2_tmp = torch.tensor([0.0, -0.8, 7.0])
X_tmp = torch.stack((X1_tmp, X2_tmp))

f_loc, f_cov = gpmodel(X_tmp.T, full_cov=True)
print("GP:", f_loc)

bh_temp = branin_hoo(X_tmp)
print("BH:", bh_temp)

In [None]:
# Checking LMs
print(branin_hoo(
        torch.stack((
            torch.tensor([-math.pi, math.pi, 9.42478]), 
            torch.tensor([12.275, 2.275, 2.475])))))

# Plotting

In [None]:
def add_points(bo_ax, gpmodel_, train_cnt):
    
    # Plotting training data
    bo_ax.scatter(
        gpmodel_.X.numpy().T[0][:train_cnt], 
        gpmodel_.X.numpy().T[1][:train_cnt],
        marker="x", s=500, c='black')

    # Plotting BO steps
    bo_ax.scatter(
        gpmodel_.X.numpy().T[0][train_cnt:], 
        gpmodel_.X.numpy().T[1][train_cnt:],
        marker="o", s=100, c='red')

    for i, iter in enumerate(gpmodel_.X.numpy()):
        if i >= train_cnt:
            bo_ax.annotate("%d" % (i+1-train_cnt), 
                        (iter.T[0] + 0.1, iter.T[1] + 0.1))

# Constracting BO

In [None]:
def update_posterior(x_new):
        
    bh_y = branin_hoo(x_new.T) # evaluate f at new point.
        
    X = torch.cat([gpmodel.X, x_new]) # incorporate new evaluation
    y = torch.cat([gpmodel.y, bh_y])
    
    gpmodel.set_data(X, y)
    
    # optimize the GP hyperparameters using Adam with lr=0.001
    optimizer = torch.optim.Adam(gpmodel.parameters(), lr=0.001)
    
    gp.util.train(gpmodel, optimizer) 

In [None]:
def acquisition_function(x):
    
    return lower_confidence_bound(x)

In [None]:
def lower_confidence_bound(x, kappa=2):
    
    mu, variance = gpmodel(x, full_cov=False, noiseless=False)
    
    sigma = variance.sqrt()
    
    return mu - kappa * sigma

In [None]:
def find_a_candidate(x_init):
    
    # transform x to an unconstrained domain
    constraint_x1 = constraints.interval(const_x1_min, const_x1_max)
    constraint_x2 = constraints.interval(const_x2_min, const_x2_max)

    unconstrained_x1_init = transform_to(constraint_x1).inv(x_init[:, 0])
    unconstrained_x2_init = transform_to(constraint_x2).inv(x_init[:, 1])

    unconstrained_x_init = torch.stack((unconstrained_x1_init, unconstrained_x2_init), dim=1)
        
    unconstrained_x = unconstrained_x_init.clone().detach().requires_grad_(True)
    
    minimizer = optim.LBFGS([unconstrained_x])

    def closure():
        minimizer.zero_grad()
                
        x1_tmp = transform_to(constraint_x1)(unconstrained_x[:, 0])
        x2_tmp = transform_to(constraint_x2)(unconstrained_x[:, 1])
        
        y = lower_confidence_bound(torch.stack((x1_tmp, x2_tmp), dim=1))
        
        autograd.backward(unconstrained_x, autograd.grad(y, unconstrained_x))
                
        return y

    minimizer.step(closure)
   
    # after finding a candidate in the unconstrained domain,
    # convert it back to original domain.
    x1_tmp = transform_to(constraint_x1)(unconstrained_x[:, 0])
    x2_tmp = transform_to(constraint_x2)(unconstrained_x[:, 1])
    
    x = torch.stack((x1_tmp, x2_tmp), dim=1)
    
    return x.detach()

In [None]:
def next_x(num_candidates=5):
    
    candidates = []
    values = []

    x_init = gpmodel.X[-1:]
    
    for i in range(num_candidates):
                
        x = find_a_candidate(x_init)
        
        y = acquisition_function(x)
        
        candidates.append(x)
        
        values.append(y)
        
        x_init = torch.stack(
                (x[:,0].new_empty(1).uniform_(const_x1_min, const_x1_max),
                 x[:,1].new_empty(1).uniform_(const_x2_min, const_x2_max)), dim=1)
        
    argmin = torch.min(torch.cat(values), dim=0)[1].item()
    
    return candidates[argmin]

In [None]:
bo_total_steps = 10
no_cols = 3

fbo, fbo_axes = plt.subplots(ncols=no_cols, nrows=bo_total_steps, figsize=(no_cols*6, bo_total_steps*6))

optimizer = torch.optim.Adam(gpmodel.parameters(), lr=0.001)
gp.util.train(gpmodel, optimizer)

for i in range(bo_total_steps):

    xmin = next_x()
    
    ########### Ploting prior
    Predict_mesh, _ = gpmodel(torch.stack((X1_mesh.flatten(), X2_mesh.flatten())).T) 

    Predict_mesh = Predict_mesh.reshape((steps, steps)).detach()
    fbo_axes[i, 0].contour(X1_mesh, X2_mesh, Predict_mesh, strides)
    fbo_axes[i, 0].title.set_text('Step %d: prior' % (i+1))
    
    ########### Plotting acquisition function
    acquisition_mesh = acquisition_function(
            torch.stack((X1_mesh.flatten(), X2_mesh.flatten())).T)
    
    acquisition_mesh = acquisition_mesh.reshape((steps, steps)).detach()
    fbo_axes[i, 1].contour(X1_mesh, X2_mesh, acquisition_mesh, strides)
    
    # Updating posterior
    update_posterior(xmin)
            
    ########### Plotting GP countour plot
    Predict_mesh, _ = gpmodel(torch.stack((X1_mesh.flatten(), X2_mesh.flatten())).T) 

    Predict_mesh = Predict_mesh.reshape((steps, steps)).detach()
    fbo_axes[i, 2].contour(X1_mesh, X2_mesh, Predict_mesh, strides)
    fbo_axes[i, 2].title.set_text('Step %d: posterior' % (i+1))
    
    fbo_axes[i, 1].scatter(
        gpmodel.X.numpy().T[0][len(gpmodel.X.numpy())-1], 
        gpmodel.X.numpy().T[1][len(gpmodel.X.numpy())-1],
        marker="o", s=100, c='red')
    
    fbo_axes[i, 1].title.set_text('Step %d: acquisition function' % (i+1))
        

# Plotting BO steps

In [None]:
fig, ax = plt.subplots(figsize=(18, 18))
CS = ax.contour(X1_mesh, X2_mesh, Z_mesh, levels=200)

add_points(ax, gpmodel, train_cnt)

In [None]:
fig, ax = plt.subplots(figsize=(18, 9), ncols=2, nrows=1)
ax[0].contour(X1_mesh, X2_mesh, Z_mesh, strides)
ax[0].title.set_text('Branin-Hoo')

ax[1].contour(X1_mesh, X2_mesh, Predict_mesh, strides)
ax[1].title.set_text('GP')