In [1]:
import torch


def one_step_cholesky(top_left, K_Xθ, K_θθ, A_inv) :
    
    '''Update the Cholesky factor when the matrix is extended.

    Note: See thesis appendix A.2 for notation of args and further information.

    Args:
        top_left: Cholesky factor L11 of old matrix A11.
        K_Xθ: Upper right bock matrix A12 of new matrix A.
        K_θθ: Lower right block matrix A22 of new matrix A.
        A_inv: Inverse of old matrix A11.

    Returns:
        New cholesky factor S of new matrix A.
    '''
    # Solve with A \ b: A @ x = b, x = A^(-1) @ b,
    # top_right = L11^T \ A12 = L11^T  \ K_Xθ, top_right = (L11^T)^(-1) @ K_Xθ,
    # Use: (L11^(-1))^T = L11 @ A11^(-1).
    # Hint: could also be solved with torch.cholesky_solve (in theory faster).
    top_right = top_left @ (A_inv @ K_Xθ)
    bot_left = torch.zeros_like(top_right).transpose(-1, -2)
    bot_right = torch.cholesky(
        K_θθ - top_right.transpose(-1, -2) @ top_right, upper=True
    )
    return torch.cat(
        [
            torch.cat([top_left, top_right], dim=-1),
            torch.cat([bot_left, bot_right], dim=-1),
        ],
        dim=-2,
    )


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
from typing import Tuple

import torch
import gpytorch
import botorch



class GradientInformation(botorch.acquisition.AnalyticAcquisitionFunction):
    '''Acquisition function to sample points for gradient information.

    Attributes:
        model: Gaussian process model that supplies the Jacobian (e.g. DerivativeExactGPSEModel).
    '''

    def __init__(self, model):
        '''Inits acquisition function with model.'''
        super().__init__(model)

    def update_theta_i(self, theta_i: torch.Tensor):
      
        '''Updates the current parameters.

        This leads to an update of K_xX_dx.

        Args:
            theta_i: New parameters.
        '''
        if not torch.is_tensor(theta_i):
            theta_i = torch.tensor(theta_i)
        self.theta_i = theta_i
        self.update_K_xX_dx()

    def update_K_xX_dx(self):
        '''When new x is given update K_xX_dx.'''
        # Pre-compute large part of K_xX_dx.
        X = self.model.train_inputs[0]
        x = self.theta_i.view(-1, self.model.D)
        self.K_xX_dx_part = self._get_KxX_dx(x, X)

    def _get_KxX_dx(self, x, X) :
        
        
        '''Computes the analytic derivative of the kernel K(x,X) w.r.t. x.

        Args:
            x: (n x D) Test points.

        Returns:
            (n x D) The derivative of K(x,X) w.r.t. x.
        '''
        N = X.shape[0]
        n = x.shape[0]
        K_xX = self.model.covar_module(x, X).evaluate()
        lengthscale = self.model.covar_module.base_kernel.lengthscale.detach()
        return (
            -torch.eye(self.model.D, device=X.device)
            / lengthscale
            @ (
                (x.view(n, 1, self.model.D) - X.view(1, N, self.model.D))
                * K_xX.view(n, N, 1)
            ).transpose(1, 2)
        )

    @botorch.utils.transforms.t_batch_mode_transform(expected_q=1)
    def forward(self, thetas: torch.Tensor) :
        
        '''Evaluate the acquisition function on the candidate set thetas.

        Args:
            thetas: A (b) x D-dim Tensor of (b) batches with a d-dim theta points each.

        Returns:
            A (b)-dim Tensor of acquisition function values at the given theta points.
        '''
        sigma_n = self.model.likelihood.noise_covar.noise
        D = self.model.D
        X = self.model.train_inputs[0]
        x = self.theta_i.view(-1, D)

        variances = []
        for theta in thetas:
            theta = theta.view(-1, D)
            # Compute K_Xθ, K_θθ (do not forget to add noise).
            K_Xθ = self.model.covar_module(X, theta).evaluate()
            K_θθ = self.model.covar_module(theta).evaluate() + sigma_n * torch.eye(
                K_Xθ.shape[-1]
            ).to(theta)

            # Get Cholesky factor.
            L = one_step_cholesky(
                top_left=self.model.get_L_lower().transpose(-1, -2),
                K_Xθ=K_Xθ,
                K_θθ=K_θθ,
                A_inv=self.model.get_KXX_inv(),
            )

            # Get K_XX_inv.
            K_XX_inv = torch.cholesky_inverse(L, upper=True)

            # get K_xX_dx
            K_xθ_dx = self._get_KxX_dx(x, theta)
            K_xX_dx = torch.cat([self.K_xX_dx_part, K_xθ_dx], dim=-1)

            # Compute_variance.
            variance_d = -K_xX_dx @ K_XX_inv @ K_xX_dx.transpose(1, 2)
            variances.append(torch.trace(variance_d.view(D, D)).view(1))

        return -torch.cat(variances, dim=0)


In [3]:
import torch
import gpytorch
import botorch


class ExactGPSEModel(gpytorch.models.ExactGP, botorch.models.gpytorch.GPyTorchModel):
   

    _num_outputs = 1  # To inform GPyTorchModel API.

    def __init__(
        self,
        train_x: torch.Tensor,
        train_y: torch.Tensor,
        lengthscale_constraint=None,
        lengthscale_hyperprior=None,
        outputscale_constraint=None,
        outputscale_hyperprior=None,
        noise_constraint=None,
        noise_hyperprior=None,
        ard_num_dims=None,
        prior_mean=0,
    ):
        """Inits GP model with data and a Gaussian likelihood."""
        likelihood = gpytorch.likelihoods.GaussianLikelihood(
            noise_constraint=noise_constraint, noise_prior=noise_hyperprior
        )
        if train_y is not None:
            train_y = train_y.squeeze(-1)
        super(ExactGPSEModel, self).__init__(train_x, train_y, likelihood)

        self.mean_module = gpytorch.means.ConstantMean()
        if prior_mean != 0:
            self.mean_module.initialize(constant=prior_mean)
            self.mean_module.constant.requires_grad = False

        self.covar_module = gpytorch.kernels.ScaleKernel(
            gpytorch.kernels.RBFKernel(
                ard_num_dims=ard_num_dims,
                lengthscale_prior=lengthscale_hyperprior,
                lengthscale_constraint=lengthscale_constraint,
            ),
            outputscale_prior=outputscale_hyperprior,
            outputscale_constraint=outputscale_constraint,
        )
        # Initialize lengthscale and outputscale to mean of priors.
        if lengthscale_hyperprior is not None:
            self.covar_module.base_kernel.lengthscale = lengthscale_hyperprior.mean
        if outputscale_hyperprior is not None:
            self.covar_module.outputscale = outputscale_hyperprior.mean

        self.D = train_x.shape[1]

    def forward(self, x):

    
        mean_x = self.mean_module(x)
        covar_x = self.covar_module(x)
        return gpytorch.distributions.MultivariateNormal(mean_x, covar_x)


class DerivativeExactGPSEModel(ExactGPSEModel):


    def __init__(self,train_x,train_y):
       
        """Inits GP model with data and a Gaussian likelihood."""

        
        super(DerivativeExactGPSEModel, self).__init__(train_x,train_y)
        self.D = train_x.shape[1]
        self.N = train_x.shape[0]
        self.N_max = 1e5

    def append_train_data(self,new_x,new_y):

          
          train_x = torch.cat([self.train_inputs[0], new_x.reshape(1,-1)])
          train_y = torch.cat([self.train_targets, new_y])
          self.set_train_data(inputs=train_x, targets=train_y, strict=False)
          self.N = train_x.shape[0]

    def get_L_lower(self):
        """Get Cholesky decomposition L, where L is a lower triangular matrix.

        Returns:
            Cholesky decomposition L.
        """
        return (
            self.prediction_strategy.lik_train_train_covar.root_decomposition()
            .root.evaluate()
            .detach()
        )

    def get_KXX_inv(self):
        """Get the inverse matrix of K(X,X).

        Returns:
            The inverse of K(X,X).
        """
        L_inv_upper = self.prediction_strategy.covar_cache.detach()
        return L_inv_upper @ L_inv_upper.transpose(0, 1)

    def get_KXX_inv_old(self):
        """Get the inverse matrix of K(X,X).

        Not as efficient as get_KXX_inv.

        Returns:
            The inverse of K(X,X).
        """
        X = self.train_inputs[0]
        sigma_n = self.likelihood.noise_covar.noise.detach()
        return torch.inverse(
            self.covar_module(X).evaluate() + torch.eye(X.shape[0]) * sigma_n
        )

    def _get_KxX_dx(self, x):
        """Computes the analytic derivative of the kernel K(x,X) w.r.t. x.

        Args:
            x: (n x D) Test points.

        Returns:
            (n x D) The derivative of K(x,X) w.r.t. x.
        """
        X = self.train_inputs[0]
        n = x.shape[0]
        K_xX = self.covar_module(x, X).evaluate()
        lengthscale = self.covar_module.base_kernel.lengthscale.detach()
        return (
            -torch.eye(self.D, device=x.device)
            / lengthscale ** 2
            @ (
                (x.view(n, 1, self.D) - X.view(1, self.N, self.D))
                * K_xX.view(n, self.N, 1)
            ).transpose(1, 2)
        )

    def _get_Kxx_dx2(self):
        """Computes the analytic second derivative of the kernel K(x,x) w.r.t. x.

        Args:
            x: (n x D) Test points.

        Returns:
            (n x D x D) The second derivative of K(x,x) w.r.t. x.
        """
        lengthscale = self.covar_module.base_kernel.lengthscale.detach()
        sigma_f = self.covar_module.outputscale.detach()
        return (
            torch.eye(self.D, device=lengthscale.device) / lengthscale ** 2
        ) * sigma_f

    def posterior_derivative(self, x):
        """Computes the posterior of the derivative of the GP w.r.t. the given test
        points x.

        Args:
            x: (n x D) Test points.

        Returns:
            A GPyTorchPosterior.
        """
        if self.prediction_strategy is None:
            self.posterior(x)  # Call this to update prediction strategy of GPyTorch.
        K_xX_dx = self._get_KxX_dx(x)
        mean_d = K_xX_dx @ self.get_KXX_inv() @ self.train_targets
        variance_d = (
            self._get_Kxx_dx2() - K_xX_dx @ self.get_KXX_inv() @ K_xX_dx.transpose(1, 2)
        )
        variance_d = variance_d.clamp_min(1e-9)

        return mean_d, variance_d


In [4]:
def optimize_acqf_custom_bo(
    acq_func: botorch.acquisition.AcquisitionFunction,
    bounds: torch.Tensor    
):
    '''Function to optimize the GradientInformation acquisition function for custom Bayesian optimization.'''

 
    candidates, acq_value = botorch.optim.optimize_acqf(
        acq_function=acq_func,
        bounds=bounds,
        q=1,  # Analytic acquisition function.
        num_restarts=20,
        raw_samples=100,
        options={'nonnegative': True, 'batch_limit': 5},
        return_best_only=True,
        sequential=False,
    )
    # Observe new values.
    new_x = candidates.detach()
    return new_x, acq_value


class GIBO():
    
    def __init__(self,model,
                 verbose= False,
                 normalize_gradient=True,standard_deviation_scaling=False):

      max_samples_per_iteration = 3
      delta = 0.1
      acquisition_fcn = GradientInformation(model)
      optimize_acqf =  optimize_acqf_custom_bo
      optimizer_torch = torch.optim.SGD(model.parameters(), lr=0.5)
      params_tmp = model.train_inputs[0][-1]
      params_history_list = [params_tmp.clone()]
      D = len_params
      self.__dict__.update(locals())

    def step(self,model,objective_env):

        params = self.params_history_list[-1].reshape(1,-1)
        self.params = params
        model.posterior(params)  ## hotfix
 
        # Evaluate current parameters
        f_params = objective_env(params)
        # Update training points
        model.append_train_data(params, f_params)
        self.acquisition_fcn.update_theta_i(params)
        
        # Stay local around current parameters.
        
        bounds = torch.tensor([[-self.delta], [self.delta]]) + params
        
        # Only optimize model hyperparameters if N >= N_max.
        if (model.N >= model.N_max): 

            # Adjust hyperparameters
            mll = ExactMarginalLogLikelihood(model.likelihood, model)
            fit_gpytorch_model(mll)
            model.posterior(params)  ## hotfix

        
        ## Optimize gradient information locally
        for i in range(self.max_samples_per_iteration):

            model.posterior(params)  ## hotfix


            # Optimize acquistion function and get new observation.
            new_x, acq_value = self.optimize_acqf(self.acquisition_fcn, bounds)            
            new_y = objective_env(new_x)
            # Update training points.
            self.model.append_train_data(new_x, new_y)

            model.posterior(params)  ## hotfix
            self.acquisition_fcn.update_K_xX_dx()

        ## compute gradients manually
        with torch.no_grad():
          
            self.optimizer_torch.zero_grad()
            mean_d, variance_d = model.posterior_derivative(params)
            params_grad = -mean_d.view(1, self.D)

            if self.normalize_gradient:
                lengthscale = model.covar_module.base_kernel.lengthscale.detach()
                params_grad = torch.nn.functional.normalize(params_grad) * lengthscale

            if self.standard_deviation_scaling:
                params_grad = params_grad / torch.diag(variance_d.view(self.D, self.D))

            params.grad = params_grad  # set gradients
            self.optimizer_torch.step()      
            self.params_history_list.append(self.params.clone())

        # if self.verbose:
        #     posterior = model.posterior(self.params)
        #     print(
        #         f"theta_t:  predicted mean {posterior.mvn.mean.item(): .2f} / variance {posterior.mvn.variance.item(): .2f}."
        #     )


In [5]:
### initialize train_x, train_y

train_x = torch.rand(3,len_params) ## [n_trials,n_params]
train_y = [objective_env.run(p) for p in train_x]
train_y = torch.Tensor(train_y).reshape(-1,1)  ## [n_trials,1]

### initialize model

model = DerivativeExactGPSEModel(train_x, train_y)
optimizer = GIBO(model)

### now we loop :
max_iter = 100

for i in range(max_iter):

  optimizer.step(model,objective_env)
  
  if i % 5 == 0:

    best_val = model.train_targets.max()
    curr_val = model.train_targets[-1]    
    print(f'curr {curr_val} max {best_val}')



NameError: name 'len_params' is not defined

In [8]:
b = torch.Tensor([0])
a = torch.Tensor([1,2,3])
torch.cat((a,b))

tensor([1., 2., 3., 0.])