In [2]:
import random
import os
import typing
import logging
import numpy as np
from scipy.optimize import fmin_l_bfgs_b
import matplotlib.pyplot as plt
from sklearn.gaussian_process.kernels import ConstantKernel, RBF, WhiteKernel
from sklearn.gaussian_process import GaussianProcessRegressor
from scipy.stats import norm
import math

import warnings
warnings.filterwarnings("ignore")

EXTENDED_EVALUATION = False
# Set `EXTENDED_EVALUATION` to `True` in order to visualize your predictions.

In [3]:
class BO_algo(object):
    def __init__(self):
        """Initializes the algorithm with a parameter configuration. """

        # TODO: enter your code here
        self.beta = 2
        self.counter = 0
        self.previous_points = []
        self.kernel_f = ConstantKernel(1.5) * RBF(1.5) #, length_scale_bounds="fixed" , constant_value_bounds="fixed"
        self.kernel_c = ConstantKernel(3.5) * RBF(2) #, length_scale_bounds="fixed" , constant_value_bounds="fixed"
        # IMPORTANT: DO NOT REMOVE THOSE ATTRIBUTES AND USE sklearn.gaussian_process.GaussianProcessRegressor instances!
        # Otherwise, the extended evaluation will break.
        self.constraint_model = GaussianProcessRegressor(kernel = self.kernel_c)  # TODO : GP model for the constraint function
        self.objective_model = GaussianProcessRegressor(kernel = self.kernel_f)  # TODO : GP model for your acquisition function

    def next_recommendation(self) -> np.ndarray:
        """
        Recommend the next input to sample.

        Returns
        -------
        recommendation: np.ndarray
            1 x domain.shape[0] array containing the next point to evaluate
        """

        # TODO: enter your code here
        # In implementing this function, you may use optimize_acquisition_function() defined below.

        if self.counter == 0:
            x = np.array([[np.random.uniform(0,6)], [np.random.uniform(0,6)]]).reshape(1,2)
        else:
            x = self.optimize_acquisition_function()

        return x

    def optimize_acquisition_function(self) -> np.ndarray:  # DON'T MODIFY THIS FUNCTION
        """
        Optimizes the acquisition function.

        Returns
        -------
        x_opt: np.ndarray
            1 x domain.shape[0] array containing the point that approximately maximizes the acquisition function.
        """

        def objective(x: np.array):
            return - self.acquisition_function(x)

        f_values = []
        x_values = []

        # Restarts the optimization 20 times and pick best solution
        for _ in range(20):
            x0 = domain_x[0, 0] + (domain_x[0, 1] - domain_x[0, 0]) * \
                 np.random.rand(1)
            x1 = domain_x[1, 0] + (domain_x[1, 1] - domain_x[1, 0]) * \
                 np.random.rand(1)
            result = fmin_l_bfgs_b(objective, x0=np.array([x0, x1]), bounds=domain_x,
                                   approx_grad=True)
            x_values.append(np.clip(result[0], *domain_x[0]))
            f_values.append(result[1])

        ind = np.argmin(f_values)
        return np.atleast_2d(x_values[ind])

    def acquisition_function(self, x: np.ndarray) -> np.ndarray:
        """
        Compute the acquisition function.

        Parameters
        ----------
        x: np.ndarray
            point in the domain of f

        Returns
        ------
        af_value: float
            value of the acquisition function at x
        """

        # TODO: enter your code here
        #predict mu and sigma of new observation and compare with max of up to now observed values
        #keep constraint in mind
        
        mu_f, sigma_f = self.objective_model.predict(x.reshape(-1, 2), return_std=True)
        mu_c, sigma_c = self.constraint_model.predict(x.reshape(-1, 2), return_std=True)

        mu_opt = np.min([row[2] for row in self.previous_points])

        penalty = 3

        xi = 0.02


        enable_UCB = False
        enable_EI = False
        enable_CEI = True
        enable_CUCB = False

        if enable_UCB:

            if mu_c + 2 * sigma_c >= 0:
                    af_value = mu_f - sigma_f * self.beta - penalty
            else:
                    af_value = mu_f - sigma_f * self.beta
                
            return af_value

        if enable_CUCB:

            pr = norm.cdf(0, loc=mu_c, scale=sigma_c)
            ucb = mu_f - sigma_f * self.beta
            af_value = ucb * pr

            return af_value


        if enable_EI:

            imp = mu_opt - mu_f - xi
            Z = imp / sigma_f

            EI = imp * norm.cdf(Z) + sigma_f * norm.pdf(Z)

            if mu_c + 2 * sigma_c >= 0:
                    af_value = float(EI - penalty)
            else:
                    af_value = float(EI)
        
            return af_value

        if enable_CEI:

            imp = mu_opt - mu_f - xi
            Z = imp / sigma_f

            EI = imp * norm.cdf(Z) + sigma_f * norm.pdf(Z)
            sigma_c = sigma_c
            mu_c = mu_c
            pr = norm.cdf(0, loc=mu_c, scale=sigma_c)

            if pr <= 0.9:
                af_value = EI * pr
            else:
                af_value = EI * pr - penalty

            #af_value = EI * pr
            
            return af_value
        
        #raise NotImplementedError

    def add_data_point(self, x: np.ndarray, z: float, c: float):
        """
        Add data points to the model.

        Parameters
        ----------
        x: np.ndarray
            point in the domain of f
        z: np.ndarray
            value of the acquisition function at x
        c: np.ndarray
            value of the condition function at x
        """

        assert x.shape == (1, 2)
        self.previous_points.append([float(x[:, 0]), float(x[:, 1]), float(z), float(c)])
        # TODO: enter your code here

        X_1 = [row[0] for row in self.previous_points]
        X_2 = [row[1] for row in self.previous_points]
        X = np.column_stack((X_1, X_2))
        y = [row[2] for row in self.previous_points]

        self.counter += 1
        
        self.objective_model.fit(X,y)
        self.constraint_model.fit(X,y)
        
        #raise NotImplementedError

    def get_solution(self) -> np.ndarray:
        """
        Return x_opt that is believed to be the minimizer of f.

        Returns
        -------
        solution: np.ndarray
            1 x domain.shape[0] array containing the optimal solution of the problem
        """

        # TODO: enter your code here

        index = -1
        min_f = math.inf

        X_1 = [row[0] for row in self.previous_points]
        X_2 = [row[1] for row in self.previous_points]
        X = np.column_stack((X_1, X_2))
        f = [row[2] for row in self.previous_points]
        c = [row[3] for row in self.previous_points]

        sol = np.array([[np.random.uniform(0,6)], [np.random.uniform(0,6)]]).reshape(1,2)
        
        for i in range(self.counter):
            if c[i] < 0 and f[i] < min_f:
                min_f = f[i]
                index = i
                sol = X[index].reshape(1,2)
        return sol
        
        #raise NotImplementedError