<a href="https://colab.research.google.com/github/utkarsh5k/HighDimBOInLearntSubspace/blob/main/High_Dimensional_BO_through_EI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# High Dimensional Bayesian Optimization using learnt active subspaces

In [None]:
!pip install torch
!pip install gpytorch
!pip install botorch
!pip install matplotlib

Collecting gpytorch
  Downloading gpytorch-1.5.1-py2.py3-none-any.whl (503 kB)
[K     |████████████████████████████████| 503 kB 11.8 MB/s 
Installing collected packages: gpytorch
Successfully installed gpytorch-1.5.1
Collecting botorch
  Downloading botorch-0.5.1-py3-none-any.whl (486 kB)
[K     |████████████████████████████████| 486 kB 12.0 MB/s 
Installing collected packages: botorch
Successfully installed botorch-0.5.1


In [None]:
import numpy as np 
from botorch.models import SingleTaskGP
from botorch.fit import fit_gpytorch_model
from botorch.utils import standardize
from gpytorch.mlls import ExactMarginalLogLikelihood
from botorch.acquisition import UpperConfidenceBound, qExpectedImprovement
from botorch.optim import optimize_acqf
from torch import nn as nn
import torch 
from torch.autograd import Variable
from torch.quasirandom import SobolEngine


In [None]:
def ackley(x): 
  """
  Calculates ackley function value for arbitrary number of dimensions
  """
  a = 20 
  b = 0.2 
  c = 2 * np.pi 

  n = len(x)
  first_operand = -a * np.exp(np.sqrt(np.sum(x**2) / n) * -b)
  second_operand = np.exp(np.sum(np.cos(c * x)) / n)

  return first_operand - second_operand + a + np.exp(1)

offset = [np.random.uniform(low = -10, high = 10) for _ in range(200)]

def ackley_prime(x): 
  assert len(x) == 200
  return ackley(x + offset)



In [None]:
class TransformerNetwork(nn.Module):
  def __init__(self, original_dim, target_dim):
        super().__init__()
        self.target_dims = target_dim
        self.orig_dims = original_dim
        self.flatten = nn.Flatten()
        self.linear_nn = nn.Sequential(
            nn.Linear(target_dim, original_dim, bias = False),
            torch.nn.ReLU()
        )

        self.optimizer = None 

        self.alpha = 1
        self.beta = 1

        # [([x_1, x_2...], y)....]
        self.ackley_evaluations = []
        self.loss_per_step = []
        self.bootstrap_low_dim_space(200)
        self.init_optimizer()
        self.num_evaluations = 0



  def init_optimizer(self):
    learning_rate = 1e-3
    self.optimizer = torch.optim.SGD(self.linear_nn.parameters(), lr=learning_rate)

  def forward(self, x):
    logits = self.linear_nn(x)
    return logits
  
  def train_loop(self):
    for i in range(100):
      candidate, prediction, ei = self.run_bayes_op()
      loss = Variable(self.get_ei_loss(candidate, ei.detach().numpy()[0]), requires_grad = True)
      self.optimizer.zero_grad()
      loss.backward()
      self.optimizer.step()

  def get_low_dim_evaluations(self): 
     return torch.tensor([x[0] for x in self.ackley_evaluations]), torch.tensor([[x[1] * -1] for x in self.ackley_evaluations])

  def get_evals_for_low_dim(self, low_dim): 
    eval_val = []
    eval_x = []
    for pt in low_dim: 
      x = self.forward(torch.tensor(pt.detach().numpy(), dtype = torch.float)).detach().numpy()
      eval_x.append(pt.detach().numpy())   
      eval_val.append(ackley_prime(x))
    #print(eval_x)
    return torch.tensor(eval_x, dtype = torch.float), torch.tensor([[x * -1] for x in eval_val])

  def get_full_loss(self, point, prediction): 
    high_dim_point = self.forward(point).detach().numpy()
    high_dim_eval = ackley_prime(high_dim_point)
    

    self.ackley_evaluations.append((point.detach().numpy(), high_dim_eval))
    loss = self.alpha * (high_dim_eval - prediction)
    print(f'High dimensional eval: {high_dim_eval} Prediction: {prediction} loss = {loss}')
    return torch.tensor([loss])

  def get_ei_loss(self, pred_point, pred_ei):
    high_dim_point = self.forward(pred_point).detach().numpy()
    high_dim_eval = ackley_prime(high_dim_point)
    self.num_evaluations += 1
    cur_min_eval = self.get_min_eval()
    self.ackley_evaluations.append((pred_point.detach().numpy(), high_dim_eval))
    
    actual_imp = cur_min_eval - high_dim_eval
    loss = (actual_imp - pred_ei) * -1 
    if loss < 0: 
      loss = 0
    print(f'[{self.num_evaluations}] High dimensional eval: {high_dim_eval} loss = {loss}')
    return torch.tensor([loss * 100])

  def get_min_eval(self):
    return np.array([x[1] for x in self.ackley_evaluations]).min()

  def bootstrap_low_dim_space(self, num_points = 1): 
    '''
    We need to have at least one sample in the low dim space to run BayesOP
    '''
    
    points = self.get_initial_points(self.target_dims, num_points).detach().numpy()
    #points = self.latin_hypercube(num_points, self.orig_dims)

    for point in points:
      high_dim_point = self.forward(torch.tensor(point, dtype = torch.float)).detach().numpy()
      eval = ackley_prime(high_dim_point)
      self.ackley_evaluations.append((point, eval))
    

  def get_initial_points(self, dim, n_pts, seed=0):
    sobol = SobolEngine(dimension=dim, scramble=True, seed=seed)
    X_init = sobol.draw(n=n_pts).to(dtype=float)
    return X_init
    
  def latin_hypercube(self, n_pts, dim):
    """Basic Latin hypercube implementation with center perturbation."""
    X = np.zeros((n_pts, dim))
    centers = (1.0 + 2.0 * np.arange(0.0, n_pts)) / float(2 * n_pts)
    for i in range(dim):  # Shuffle the center locataions for each dimension.
        X[:, i] = centers[np.random.permutation(n_pts)]

    # Add some perturbations within each box
    pert = np.random.uniform(-1.0, 1.0, (n_pts, dim)) / float(2 * n_pts)
    X += pert
    return X

  def run_bayes_op(self):   
    train_X, train_Y = self.get_low_dim_evaluations()

    gp = SingleTaskGP(train_X, train_Y)
    mll = ExactMarginalLogLikelihood(gp.likelihood, gp)
    fit_gpytorch_model(mll)

    # Optimize acquisition function 
    #UCB = UpperConfidenceBound(gp, beta=0.1)
    ei = qExpectedImprovement(gp, train_Y.max(), maximize=True)
    #bounds = torch.stack([torch.tensor([0] * self.target_dims), torch.tensor([1] * self.target_dims)])
    bounds = torch.stack([torch.tensor([-32] * self.target_dims, dtype = torch.float), torch.tensor([32] * self.target_dims, dtype = torch.float)])
    candidate, acq_value = optimize_acqf(ei, bounds=bounds, q=1, num_restarts=10, raw_samples=512)
    prediction = gp.posterior(candidate).mean.detach().numpy()[0, 0]
    best_f = train_Y.min()
    return candidate[0], prediction * -1, ei(train_X)



In [None]:
network = TransformerNetwork(original_dim = 200, target_dim = 20)
#network = SVDBO(original_dim = 200, target_dim = 20)
network.train_loop()
  

[1] High dimensional eval: 18.431241719860946 loss = 3.367053574349297
[2] High dimensional eval: 17.376715955550463 loss = 2.317981651261713
[3] High dimensional eval: 18.453580141231818 loss = 3.3944931003678374
[4] High dimensional eval: 18.089871568541415 loss = 3.0310159470015297
[5] High dimensional eval: 18.210687951740823 loss = 3.151925667881037
[6] High dimensional eval: 18.212582454805528 loss = 3.1536691051249224
[7] High dimensional eval: 18.47487475195973 loss = 3.4160384289399386
[8] High dimensional eval: 17.022112474704183 loss = 1.9633782169739957
[9] High dimensional eval: 16.624983624550115 loss = 1.6438945563997727
[10] High dimensional eval: 19.529324897127378 loss = 4.470491016643217
[11] High dimensional eval: 18.565976200148466 loss = 3.507061338450494
[12] High dimensional eval: 17.311434724131765 loss = 2.252959316389744
[13] High dimensional eval: 18.295941787536673 loss = 3.237127951521929
[14] High dimensional eval: 17.52781534491656 loss = 2.4689868478080

KeyboardInterrupt: ignored

In [None]:
evaluations = [x[1] for x in network.ackley_evaluations]

In [None]:
monotonic_evaluations = []
min = evaluations[0]
for x in evaluations: 
  if x < min: 
    min = x
  
  monotonic_evaluations.append(min)

import matplotlib.pyplot as plt 

x_axis = [i for i in range(1, len(evaluations) + 1)]
plt.plot(x_axis, monotonic_evaluations)
plt.xlabel("Number of evaluations")
plt.ylabel("200D Ackley Function Value")
plt.title("Number of evaluations vs smallest 200D Ackley Fn Value")

from google.colab import files
plt.savefig("200DAckley.png")
files.download("200DAckley.png") 

In [None]:
class SVDBO(): 
  def __init__(self, original_dim, target_dim): 
    self.target_dims = target_dim
    self.orig_dims = original_dim

    self.alpha = 1
    self.beta = 1

    # [([x_1, x_2...], y)....]
    self.ackley_evaluations = []
    self.num_evaluations = 0 
    self.initial_points  = self.latin_hypercube(self.orig_dims, self.orig_dims)
    self.evaluations = []
    self.get_evaluations()
    self.points_mean = None
    _, _, self.right = self.perform_svd()
    self.subspace_axes = self.right[0: self.target_dims]
  
  def latin_hypercube(self, n_pts, dim):
    """Basic Latin hypercube implementation with center perturbation."""
    X = np.zeros((n_pts, dim))
    centers = (1.0 + 2.0 * np.arange(0.0, n_pts)) / float(2 * n_pts)
    for i in range(dim):  # Shuffle the center locataions for each dimension.
        X[:, i] = centers[np.random.permutation(n_pts)]

    # Add some perturbations within each box
    pert = np.random.uniform(-1.0, 1.0, (n_pts, dim)) / float(2 * n_pts)
    X += pert
    return X

  def project_points_into_subspace(self, points):
    return points @ self.subspace_axes.T
  
  def get_reverse_projection(self, points):
    return (points @ self.subspace_axes) + self.points_mean

  def perform_svd(self):
    self.points_mean = self.initial_points.mean(axis = 0)
    return np.linalg.svd(self.initial_points - self.points_mean)

  def get_evaluations(self):
    for pt in self.initial_points:
      self.evaluations.append(ackley_prime(pt)) 

  def get_evals_for_low_dim(self):
    low_dim_points = self.project_points_into_subspace(self.initial_points)
    return torch.tensor(low_dim_points, dtype = torch.float), torch.tensor([[x * -1] for x in self.evaluations])

  def train_loop(self):
    for _ in range(500):
      candidate, prediction = self.run_bayes_op()
      high_dim = self.get_reverse_projection(np.array([candidate.detach().numpy()]))[0]
      self.initial_points = np.vstack([self.initial_points, high_dim])
      eval = ackley_prime(high_dim)
      self.evaluations.append(eval)
      print(f"High Dim Actual: {eval} GP Predicted: {prediction}")    

  def run_bayes_op(self):   
    train_X, train_Y = self.get_low_dim_evaluations()

    gp = SingleTaskGP(train_X, train_Y)
    mll = ExactMarginalLogLikelihood(gp.likelihood, gp)
    fit_gpytorch_model(mll)

    # Optimize acquisition function 
    #UCB = UpperConfidenceBound(gp, beta=0.1)
    ei = qExpectedImprovement(gp, train_Y.max(), maximize=True)
    #bounds = torch.stack([torch.tensor([0] * self.target_dims), torch.tensor([1] * self.target_dims)])
    bounds = torch.stack([torch.tensor([-32] * self.target_dims, dtype = torch.float), torch.tensor([32] * self.target_dims, dtype = torch.float)])
    candidate, acq_value = optimize_acqf(ei, bounds=bounds, q=1, num_restarts=10, raw_samples=512)
    prediction = gp.posterior(candidate).mean.detach().numpy()[0, 0]
    return candidate[0], prediction * -1 

