In [3]:
import bayesmark.random_search as rs
from bayesmark import np_util
from bayesmark.abstract_optimizer import AbstractOptimizer
from bayesmark.experiment import experiment_main

import numpy as np
from scipy.stats import norm
from scipy.optimize import minimize

from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import Matern

In [10]:
class OurOptimizer(AbstractOptimizer):
    # Unclear what is best package to list for primary_import here.
    primary_import = "bayesmark"

    def __init__(self, api_config, random=np_util.random):
        """Build wrapper class to use random search function in benchmark.

        Settings for `suggest_dict` can be passed using kwargs.

        Parameters
        ----------
        api_config : dict-like of dict-like
            Configuration of the optimization variables. See API description.
        """
        AbstractOptimizer.__init__(self, api_config)
        self.random = random
        self.meta_model = GaussianProcessRegressor(kernel=Matern(nu=2.5))
        self.api_config = api_config
        self.preprocessor_dict = self.build_preprocessor(api_config)

    def process(self, x):
        features = []
        for feature in self.api_config:
            feature_values = x[feature]
            features.append(self.preprocessor_dict[feature](feature_values))
        return features

    def build_preprocessor(self, api_config):
        processing_dict = {}
        for feature in api_config:
            if api_config[feature]['type'] == 'bool':
                processing_dict[feature] = lambda x: int(x)

            elif api_config[feature]['type'] == 'cat':
                processing_dict[feature] = self.categorical_processing(api_config[feature]['values'])

            elif api_config[feature]['type'] == 'int':
                if 'range' in api_config[feature]:
                    min_val, max_val = api_config[feature]['range']
                    processing_dict[feature] =  self.min_max_processing(min_val, max_val)

            elif api_config[feature]['type'] == 'real':
                if 'range' in api_config[feature]:
                    min_val, max_val = api_config[feature]['range']
                    if api_config[feature]['space'] == 'log':
                        min_val = np.log10(min_val)
                        max_val = np.log10(max_val)
                        processing_dict[feature] =  lambda x: self.min_max_processing(min_val, max_val)(np.log10(x))
                    else:
                        processing_dict[feature] =  self.min_max_processing(min_val, max_val)

                else:
                    processing_dict[feature] = lambda x: x
                
        return processing_dict

    def categorical_processing(self, cats):
        def process(x):
            return cats.index(x)
        return process
    
    def min_max_processing(self, min_val, max_val):
        def process(x):
            return (x - min_val) / (max_val - min_val)
        return process

    def suggest(self, n_suggestions=1):
        """Get suggestion.

        Parameters
        ----------
        n_suggestions : int
            Desired number of parallel suggestions in the output

        Returns
        -------
        next_guess : list of dict
            List of `n_suggestions` suggestions to evaluate the objective
            function. Each suggestion is a dictionary where each key
            corresponds to a parameter being optimized.
        """
        # N = 30
        # x_guess = rs.suggest_dict([], [], self.api_config, n_suggestions=N*n_suggestions,)# random=self.random)
        # x_guess_p = [self.process(x) for x in x_guess]
        # y_guess = self.meta_model.predict(x_guess_p)
        # best_idx = np.argsort(y_guess)[:n_suggestions]
        # x_guess = [x_guess[i] for i in best_idx]
        # return x_guess
        ################################
        xi = 0.01  # Exploration-exploitation trade-off parameter
        best_y = min(self.y, default=0)  # Best observed value

        def acquisition_function(x):
            x = np.array(x).reshape(1, -1)
            mu, sigma = self.meta_model.predict(x, return_std=True)
            z = (mu - best_y - xi) / sigma
            return -(mu - best_y - xi) * norm.cdf(z) + sigma * norm.pdf(z)

        # Initialize with random search
        x_guess = rs.suggest_dict([], [], self.api_config, n_suggestions=n_suggestions)

        # Optimize acquisition function
        bounds = self.get_bounds()
        for i in range(n_suggestions):
            res = minimize(lambda x: -acquisition_function(x),
                           x0=x_guess[i], bounds=bounds, method='L-BFGS-B')
            x_guess[i] = res.x

        # Convert back to dict format
        x_guess_dicts = [self.convert_to_dict(x) for x in x_guess]
        return x_guess_dicts


    def observe(self, X, y):
        """Feed an observation back.

        Parameters
        ----------
        X : list of dict-like
            Places where the objective function has already been evaluated.
            Each suggestion is a dictionary where each key corresponds to a
            parameter being optimized.
        y : array-like, shape (n,)
            Corresponding values where objective has been evaluated
        """
        X_p = [self.process(x) for x in X]
        self.y = y
        self.meta_model.fit(X_p, y)

    def get_bounds(self):
        bounds = []
        for _, configs in self.api_config.items():
            if configs['type'] in ['int', 'real']:
                # For numerical types, use the provided range
                bound = configs['range']
            elif configs['type'] == 'bool':
                # For boolean, the range is [0, 1]
                bound = (0, 1)
            elif configs['type'] == 'cat':
                # For categorical, use index range
                bound = (0, len(configs['values']) - 1)
            bounds.append(bound)
        return bounds
    
    def convert_to_dict(self, x_array):
        x_dict = {}
        for i, (feature, configs) in enumerate(self.api_config.items()):
            value = x_array[i]
            if configs['type'] == 'bool':
                x_dict[feature] = bool(round(value))
            elif configs['type'] == 'cat':
                x_dict[feature] = configs['values'][int(round(value))]
            elif configs['type'] in ['int', 'real']:
                x_dict[feature] = value
        return x_dict


In [11]:
X = np.random.normal(size=(50, 4))

In [12]:
y = 3 * X[:, 0] + np.sin(X[:, 1]) + 2 * X[:, 2]**2 + np.random.normal(size=50)

In [13]:
X = [{'x1': x[0], 'x2': x[1], 'x3': x[2], 'x4': x[3]} for x in X]

In [14]:
optimizer = OurOptimizer({'x1': {'type': 'real', 'space': 'linear', 'range': (-3, 3)}, 'x2': {'type': 'real', 'space': 'linear', 'range': (-3, 3)}, 'x3': {'type': 'real', 'space': 'linear', 'range': (-3, 3)}, 'x4': {'type': 'real', 'space': 'linear', 'range': (-3, 3)}})

In [15]:
optimizer.observe(X[:1], y[:1])

In [16]:
optimizer.suggest(1)

ValueError: length of x0 != length of bounds

In [85]:
rs.suggest_dict?

[0;31mSignature:[0m
[0mrs[0m[0;34m.[0m[0msuggest_dict[0m[0;34m([0m[0;34m[0m
[0;34m[0m    [0mX[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0my[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mmeta[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mn_suggestions[0m[0;34m=[0m[0;36m1[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mrandom[0m[0;34m=[0m[0mRandomState[0m[0;34m([0m[0mMT19937[0m[0;34m)[0m [0mat[0m [0;36m0x7F57F5825780[0m[0;34m,[0m[0;34m[0m
[0;34m[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Stateless function to create suggestions for next query point in random search optimization.

This implements the API for general structures of different data types.

Parameters
----------
X : list(dict)
    Places where the objective function has already been evaluated. Not actually used in random search.
y : :class:`numpy:numpy.ndarray`, shape (n,)
    Corresponding values where objective has been evaluated. Not actually used in random search.