In [3]:
import torch
import botorch
import gpytorch

In [4]:
class OptimizerClass():
    '''Mother class for optimizers.

    Holds the objective (`true_model`), the surrogate model, the search
    bounds and the history of evaluated points, plus the iteration counter
    used by the stopping criterion.
    '''
    def __init__(self,true_model,
                 surrogate_model,
                 bounds:torch.Tensor,
                 initial_phi:torch.Tensor = None,
                 device = torch.device('cuda'),
                 history:tuple = (),
                 WandB:dict = None,
                 outputs_dir = 'outputs',
                 resume:bool = False):
        '''Store the optimization state.

        Args:
            true_model: objective; stored as-is (subclasses call it on phi).
            surrogate_model: object exposing `fit(X, Y, **kwargs)`; see
                `fit_surrogate_model`.
            bounds: search bounds; must expose `.cpu()` (kept on CPU here).
            initial_phi: accepted for interface compatibility; not used here.
            device: device the training data is moved to before fitting.
            history: pair `(phis, ys)` of previously evaluated samples, or an
                empty tuple when starting from scratch.
            WandB: logging configuration; defaults to
                `{'name': 'Optimization'}`.
            outputs_dir: stored as-is; not used in this class.
            resume: if True, the iteration counter starts at the number of
                samples already in `history` (0 if `history` is empty).
        '''
        self.device = device
        self.true_model = true_model
        # The history must be stored before it is read below and by every
        # other method of the class.
        self.history = history
        self._i = len(self.history[0]) if (resume and self.history) else 0
        print('STARTING FROM i = ', self._i)
        self.model = surrogate_model
        self.bounds = bounds.cpu()
        # Build the default per instance so it is never shared between optimizers.
        self.wandb = {'name': 'Optimization'} if WandB is None else WandB
        self.outputs_dir = outputs_dir

    def fit_surrogate_model(self,**kwargs):
        '''Fit the surrogate on the training data moved to `self.device`.

        `self.model` is rebound to whatever `self.model.fit(...)` returns, so
        `fit` is expected to return the fitted model.
        '''
        D = self.clean_training_data() #Should we do this here, in every iteration?
        self.model = self.model.fit(D[0].to(self.device),D[1].to(self.device),**kwargs)

    def update_history(self,phi,y):
        '''Append phi and y to D (the history), keeping both on CPU.

        Both tensors are reshaped to 2-D rows. With an existing history the
        column counts are taken from it; with an empty history they are
        taken from the last dimension of `phi` and `y`.
        '''
        phi, y = phi.cpu(), y.cpu()
        if not self.history:
            # Nothing to concatenate with yet: the first sample defines the widths.
            phi, y = torch.atleast_1d(phi), torch.atleast_1d(y)
            self.history = (phi.reshape(-1, phi.shape[-1]), y.reshape(-1, y.shape[-1]))
            return
        phi = phi.reshape(-1, self.history[0].shape[1])
        y = y.reshape(-1, self.history[1].shape[1])
        self.history = (torch.cat([self.history[0], phi]),torch.cat([self.history[1], y]))

    def n_iterations(self):
        '''Return the current value of the iteration counter.'''
        return self._i

    def stopping_criterion(self,**convergence_params):
        '''Return True once the counter reaches `convergence_params['max_iter']`.'''
        return self._i >= convergence_params['max_iter']

    def get_optimal(self):
        '''Get the current optimal: the (phi, y) pair with the smallest y.

        NOTE(review): `argmin()` indexes the flattened `y` tensor, so this
        presumably assumes a single output column - confirm.
        '''
        idx = self.history[1].argmin()
        return self.history[0][idx],self.history[1][idx]

    def clean_training_data(self):
        '''Get samples on history for training (currently the full history).'''
        return self.history
    
class BayesianOptimizer(OptimizerClass):
    '''Optimizer that picks each new point by optimizing an acquisition
    function built on the surrogate model, then evaluates the true model.
    '''

    def __init__(self,true_model,
                 surrogate_model,
                 bounds:torch.Tensor,
                 initial_phi:torch.Tensor = None,
                 device = torch.device('cuda'),
                 acquisition_fn = botorch.acquisition.ExpectedImprovement,
                 acquisition_params:dict = None,
                 history:tuple = (),
                 model_scheduler:dict = None,
                 WandB:dict = None,
                 reduce_bounds:int = 4000,
                 outputs_dir = 'outputs',
                 resume:bool = False):
        '''Store the Bayesian-optimization settings on top of the base state.

        Args:
            true_model, surrogate_model, bounds, initial_phi, device, history,
                outputs_dir, resume: forwarded to `OptimizerClass.__init__`.
            acquisition_fn: callable invoked as
                `acquisition_fn(model, best_y, maximize=False)`.
            acquisition_params: extra keyword arguments for `optimize_acqf`;
                defaults to `{'num_restarts': 30, 'raw_samples': 5000}`.
            model_scheduler: maps an iteration number to the surrogate model
                to switch to at that iteration; defaults to `{}`.
            WandB: logging configuration; defaults to
                `{'name': 'BayesianOptimization'}`.
            reduce_bounds: iteration at which `self.reduce_bounds()` is called.
        '''
        # Defaults are built per instance so dicts are never shared between optimizers.
        if acquisition_params is None:
            acquisition_params = {'num_restarts': 30, 'raw_samples':5000}
        if model_scheduler is None:
            model_scheduler = {}
        if WandB is None:
            WandB = {'name': 'BayesianOptimization'}
        super().__init__(true_model,
                 surrogate_model,
                 bounds,
                 initial_phi=initial_phi,
                 device = device,
                 history = history,
                 WandB = WandB,
                 outputs_dir = outputs_dir,
                 resume = resume)

        self.acquisition_fn = acquisition_fn
        self.acquisition_params = acquisition_params
        self.model_scheduler = model_scheduler
        self._iter_reduce_bounds = reduce_bounds
        if resume: #current model from model_scheduler
            # Use the most recently scheduled model among the iterations already
            # passed, independently of the dict's insertion order. An entry
            # scheduled exactly at self._i is applied by run_optimization.
            passed = [i for i in model_scheduler if self._i > i]
            if passed:
                self.model = model_scheduler[max(passed)]

    def get_new_phi(self):
        '''Minimize acquisition function, returning the next phi to evaluate.

        The acquisition is built from the current surrogate and the smallest
        observed y (with `maximize=False`), then optimized within
        `self.bounds` for a single candidate (`q=1`). Only the first element
        of the `optimize_acqf` result is returned.
        '''
        best_y = self.history[1].min().to(self.device)
        acquisition = self.acquisition_fn(self.model, best_y, maximize=False)
        return botorch.optim.optimize.optimize_acqf(acquisition, self.bounds.to(self.device), q=1,**self.acquisition_params)[0]

    def run_optimization(self,
                         use_scipy:bool = True,
                         save_optimal_phi:bool = True,
                         save_history:bool = False,
                         **convergence_params):
        '''Run the optimization loop until the stopping criterion is met.

        Each iteration optionally swaps the surrogate (per `model_scheduler`),
        optionally reduces the bounds, refits the surrogate, proposes a new
        phi, evaluates the true model on it and records the result.

        Args:
            use_scipy: forwarded to the surrogate's `fit`; when False,
                `options={'lr': 1e-2, 'maxiter': 100}` is passed as well.
            save_optimal_phi, save_history: accepted but not used here.
            **convergence_params: must contain `max_iter`.

        Returns:
            The (phi, y) pair with the smallest y in the history.
        '''
        options = {'lr': 1e-2, 'maxiter': 100} if not use_scipy else None
        while not self.stopping_criterion(**convergence_params):
            if self._i in self.model_scheduler:
                self.model = self.model_scheduler[self._i]
            if self._i == self._iter_reduce_bounds:
                # NOTE(review): reduce_bounds is not defined in this class or in
                # OptimizerClass as visible here - it must be provided by a
                # subclass, otherwise this raises AttributeError. Confirm.
                self.reduce_bounds()
            self.fit_surrogate_model(use_scipy = use_scipy,options = options)
            phi = self.get_new_phi()
            y = self.true_model(phi)
            self.update_history(phi,y)
            self._i += 1
        return self.get_optimal()

In [None]:
def olivers_fn(phi:torch.tensor,x = None,w:float = 5,gamma:float = 1,noise = False):
    '''Test objective: product over the last dimension of
    sin(w*phi) * (1 - tanh(gamma*phi^2)), keeping that dimension with size 1.

    `x` is accepted but not used. If `noise` is truthy, standard normal noise
    scaled by `noise` is added to the result.
    '''
    oscillation = torch.sin(w*phi)
    envelope = 1-torch.tanh(gamma*phi.pow(2))
    value = torch.prod(oscillation*envelope, -1, keepdims=True)
    if not noise:
        return value
    value += torch.randn_like(value)*noise
    return value

