-
Notifications
You must be signed in to change notification settings - Fork 53
/
bayesian_optimization.py
363 lines (298 loc) · 14.6 KB
/
bayesian_optimization.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
import numpy
import logging
import sherpa
from sherpa.algorithms import Algorithm
import pandas
from sherpa.core import Choice, Continuous, Discrete, Ordinal
import collections
import GPyOpt as gpyopt_package
import GPy
import warnings
bayesoptlogger = logging.getLogger(__name__)
class GPyOpt(Algorithm):
    """
    Sherpa wrapper around the GPyOpt package
    (https://github.com/SheffieldML/GPyOpt).
    Args:
        model_type (str): The model used:
            - 'GP', standard Gaussian process.
            - 'GP_MCMC', Gaussian process with prior in the hyper-parameters.
            - 'sparseGP', sparse Gaussian process.
            - 'warperdGP', warped Gaussian process.
            - 'InputWarpedGP', input warped Gaussian process
            - 'RF', random forest (scikit-learn).
        num_initial_data_points (int): Number of data points to collect before
            fitting model. Needs to be greater/equal to the number of hyper-
            parameters that are being optimized. Using default 'infer' corres-
            ponds to number of hyperparameters + 1 or 0 if results are not empty.
        initial_data_points (list[dict] or pandas.Dataframe): Specifies initial
            data points. If len(initial_data_points)<num_initial_data_points
            then the rest is randomly sampled. Use this option to provide
            hyperparameter configurations that are known to be good.
        acquisition_type (str): Type of acquisition function to use.
            - 'EI', expected improvement.
            - 'EI_MCMC', integrated expected improvement (requires GP_MCMC model).
            - 'MPI', maximum probability of improvement.
            - 'MPI_MCMC', maximum probability of improvement (requires GP_MCMC model).
            - 'LCB', GP-Lower confidence bound.
            - 'LCB_MCMC', integrated GP-Lower confidence bound (requires GP_MCMC model).
        max_concurrent (int): The number of concurrent trials. This generates
            a batch of max_concurrent trials from GPyOpt to evaluate. If a new
            observation becomes available, the model is re-evaluated and a new
            batch is created regardless of whether the previous batch was used
            up. The used method is local penalization.
        verbosity (bool): Print models and other options during the optimization.
        max_num_trials (int): maximum number of trials to run for.
    """
    allows_repetition = True

    def __init__(self, model_type='GP', num_initial_data_points='infer',
                 initial_data_points=None, acquisition_type='EI',
                 max_concurrent=4, verbosity=False, max_num_trials=None):
        self.model_type = model_type
        assert (num_initial_data_points == 'infer'
                or isinstance(num_initial_data_points, int)),\
            "num_initial_data_points needs to be 'infer' or int."
        self.num_initial_data_points = num_initial_data_points
        # -1 marks "not yet inferred"; resolved on the first get_suggestion.
        self._num_initial_data_points = -1
        # NOTE: the default used to be a mutable `[]`, which is shared across
        # all instances; use None as sentinel and create a fresh list instead.
        self.initial_data_points = ([] if initial_data_points is None
                                    else initial_data_points)
        self.acquisition_type = acquisition_type
        # Parenthesized to make the intended precedence explicit: the MCMC
        # restriction only applies when running batches (max_concurrent > 1).
        assert ((model_type != 'GP_MCMC' and acquisition_type != 'EI_MCMC')
                if max_concurrent > 1 else True),\
            "GPyOpt has a bug for _MCMC with batch size > 1."
        self.max_concurrent = max_concurrent
        self.verbosity = verbosity
        # Queue of parameter-dicts handed out one at a time by get_suggestion.
        self.next_trials = collections.deque()
        self.num_points_seen_by_model = 0
        # Fallback sampler used until enough observations exist for the model.
        self.random_search = sherpa.algorithms.RandomSearch()
        self.domain = []
        self.max_num_trials = max_num_trials
        self.count = 0

    def get_suggestion(self, parameters, results, lower_is_better):
        """
        Returns the next hyperparameter configuration (dict) to evaluate,
        or None once max_num_trials suggestions have been produced.
        """
        self.count += 1
        if self.max_num_trials and self.count > self.max_num_trials:
            return None
        # Lazy setup on the first call: decide how many seed points are
        # needed and enqueue any user-provided initial configurations.
        if self._num_initial_data_points == -1:
            self._num_initial_data_points = self._infer_num_initial_data_points(
                self.num_initial_data_points,
                parameters)
            self.next_trials.extend(
                self._process_initial_data_points(self.initial_data_points,
                                                  parameters))
        num_completed_trials = self._num_completed_trials(results)
        if (num_completed_trials >= self._num_initial_data_points
                and num_completed_trials > self.num_points_seen_by_model):
            # New observations are available: discard the stale batch,
            # refit the model, and generate a fresh batch of suggestions.
            self.next_trials.clear()
            X, y, y_var = self._prepare_data_for_bayes_opt(parameters, results)
            domain = self._initialize_domain(parameters)
            batch = self._generate_bayesopt_batch(X, y, lower_is_better, domain)
            batch_list_of_dicts = self._reverse_to_sherpa_format(batch,
                                                                 parameters)
            self.next_trials.extend(batch_list_of_dicts)
            self.num_points_seen_by_model = num_completed_trials
        if len(self.next_trials) == 0:
            # Not enough data for the model yet: fall back to random search.
            random_trial = self.random_search.get_suggestion(parameters,
                                                             results,
                                                             lower_is_better)
            self.next_trials.append(random_trial)
        return self.next_trials.popleft()

    @classmethod
    def _num_completed_trials(cls, results):
        # Number of rows in the Sherpa results dataframe with COMPLETED status.
        return (len(results.query("Status == 'COMPLETED'"))
                if results is not None and len(results) > 0 else 0)

    def _generate_bayesopt_batch(self, X, y, lower_is_better, domain):
        """
        Fits a GPyOpt model to (X, y) and returns a batch of max_concurrent
        suggested design points (rows in GPyOpt design format).
        """
        # GPyOpt is configured with maximize=False, so negate the objective
        # when higher values are better.
        y_adjusted = y * (-1)**(not lower_is_better)
        bo_step = gpyopt_package.methods.BayesianOptimization(
            f=None,
            domain=domain,
            X=X, Y=y_adjusted,
            acquisition_type=self.acquisition_type,
            evaluator_type='local_penalization',
            batch_size=self.max_concurrent,
            verbosity=self.verbosity,
            maximize=False,
            exact_feval=False,
            model_type=self.model_type)
        return bo_step.suggest_next_locations()

    def get_best_pred(self, parameters, results, lower_is_better):
        """
        Returns the model's best predicted parameter setting as a dict, or an
        empty dict when not enough trials have completed to fit the model.
        """
        if self._num_completed_trials(results) >= self._num_initial_data_points:
            X, y, y_var = self._prepare_data_for_bayes_opt(parameters, results)
            domain = self._initialize_domain(parameters)
            best_pred = self._generate_best_predicted(X, y, lower_is_better,
                                                      domain)
            list_of_dict = self._reverse_to_sherpa_format(best_pred,
                                                          parameters)
            return list_of_dict[0]
        else:
            return {}

    def _generate_best_predicted(self, X, y, lower_is_better, domain):
        """
        This is a work-around to utilize GPyOpt's maximizers for the acquisition
        function to get the parameter setting that is the best as predicted by
        the model. With exploration_weight set to 0 the LCB acquisition reduces
        to the posterior mean, so its optimizer returns the model's best guess.
        """
        y_adjusted = y * (-1)**(not lower_is_better)
        bo = gpyopt_package.methods.BayesianOptimization(
            f=None,
            domain=domain,
            X=X,
            Y=y_adjusted,
            acquisition_type='LCB',
            batch_size=1,
            verbosity=self.verbosity,
            maximize=False,
            exact_feval=False,
            model_type=self.model_type)
        bo.acquisition.exploration_weight = 0.
        return bo.suggest_next_locations()

    @staticmethod
    def _infer_num_initial_data_points(num_initial_data_points,
                                       parameters):
        """
        Infers number of initial data points, or overwrites and warns user if
        she defined less than the number of points needed.
        """
        if num_initial_data_points == 'infer':
            return len(parameters) + 1
        elif num_initial_data_points >= len(parameters):
            return num_initial_data_points
        else:
            warnings.warn("num_initial_data_points < number of "
                          "parameters found. Setting "
                          "num_initial_data_points to "
                          "len(parameters)+1.", UserWarning)
            return len(parameters) + 1

    @staticmethod
    def _process_initial_data_points(initial_data_points, parameters):
        """
        Turns initial_data_points into list of dicts (if Pandas.DataFrame) and
        assures that all defined parameters have settings in the
        initial_data_points.

        Raises:
            ValueError: if any data point is missing a parameter setting.
        """
        if isinstance(initial_data_points, pandas.DataFrame):
            _initial_data_points = list(
                initial_data_points.T.to_dict().values())
        else:
            _initial_data_points = initial_data_points
        for p in parameters:
            if not all(p.name in data_point
                       for data_point in _initial_data_points):
                raise ValueError("Missing parameter in initial_data_point. "
                                 "Check that you included all specified "
                                 "hyperparameters.")
        return _initial_data_points

    @staticmethod
    def _prepare_data_for_bayes_opt(parameters, results):
        """
        Turn historical data from Sherpa results dataframe into design matrix
        X and objective values y to be consumed by GPyOpt. Also returns the
        objective standard errors y_var when available (None otherwise).
        """
        completed = results.query("Status == 'COMPLETED'")
        X = numpy.zeros((len(completed), len(parameters)))
        for i, p in enumerate(parameters):
            transform = ParameterTransform.from_parameter(p)
            historical_data = completed[p.name]
            X[:, i] = transform.sherpa_format_to_gpyopt_design_format(
                historical_data)
        y = numpy.array(completed.Objective).reshape((-1, 1))
        if 'ObjectiveStdErr' in completed.columns:
            y_var = numpy.array(completed.ObjectiveStdErr).reshape((-1, 1))
        else:
            y_var = None
        return X, y, y_var

    @staticmethod
    def _initialize_domain(parameters):
        """
        Turn Sherpa parameter definitions into GPyOpt parameter definitions.
        """
        domain = []
        for p in parameters:
            domain.append(
                ParameterTransform.from_parameter(p).to_gpyopt_domain())
        return domain

    @staticmethod
    def _reverse_to_sherpa_format(X_next, parameters):
        """
        Turn design matrix from GPyOpt back into a list of dictionaries with
        Sherpa-style parameters.
        """
        col_dict = {}
        for i, p in enumerate(parameters):
            transform = ParameterTransform.from_parameter(p)
            col_dict[p.name] = \
                transform.gpyopt_design_format_to_list_in_sherpa_format(
                    X_next[:, i])
        # `numpy.object` was removed in NumPy 1.24; the builtin `object` is
        # the supported equivalent dtype alias.
        return list(pandas.DataFrame(col_dict).astype(object)
                    .T.to_dict().values())
class ParameterTransform(object):
    """
    ParameterTransform base class: converts Sherpa parameter definitions
    to/from GPyOpt's domain/design-matrix format. Use the ``from_parameter``
    factory to obtain the concrete transform for a given parameter.
    """
    def __init__(self, parameter):
        # The Sherpa parameter (Choice, Ordinal, Continuous, or Discrete)
        # that this transform wraps.
        self.parameter = parameter

    @staticmethod
    def from_parameter(parameter):
        """
        Factory returning the transform matching the parameter's type.

        Raises:
            TypeError: if the parameter is not a recognized Sherpa type.
        """
        if isinstance(parameter, Choice) or isinstance(parameter, Ordinal):
            return ChoiceTransform(parameter)
        elif isinstance(parameter, Continuous):
            if parameter.scale == 'log':
                return LogContinuousTransform(parameter)
            else:
                return ContinuousTransform(parameter)
        elif isinstance(parameter, Discrete):
            if parameter.scale == 'log':
                warnings.warn("GPyOpt discrete parameter does not "
                              "support log-scale.", UserWarning)
            return DiscreteTransform(parameter)
        # Previously an unrecognized type fell through and returned None,
        # producing a confusing AttributeError downstream; fail fast instead.
        raise TypeError("Unsupported parameter type: {}".format(
            type(parameter).__name__))

    def to_gpyopt_domain(self):
        raise NotImplementedError

    def gpyopt_design_format_to_list_in_sherpa_format(self, x):
        raise NotImplementedError

    def sherpa_format_to_gpyopt_design_format(self, x):
        raise NotImplementedError
class ContinuousTransform(ParameterTransform):
    """
    Transform for linear-scale Continuous parameters. GPyOpt consumes their
    values directly, so both conversion directions are the identity.
    """
    def to_gpyopt_domain(self):
        # GPyOpt expects the bounds as a tuple under 'domain'.
        spec = {'name': self.parameter.name, 'type': 'continuous'}
        spec['domain'] = tuple(self.parameter.range)
        return spec

    def gpyopt_design_format_to_list_in_sherpa_format(self, x):
        # Values come back from GPyOpt exactly as Sherpa should see them.
        return x

    def sherpa_format_to_gpyopt_design_format(self, x):
        # No conversion needed on the way in either.
        return x
class LogContinuousTransform(ParameterTransform):
    """
    Transform for log-scale Continuous parameters: the GPyOpt model works in
    log10 space, and values are exponentiated back for Sherpa.
    """
    def to_gpyopt_domain(self):
        lower, upper = self.parameter.range[0], self.parameter.range[1]
        return {'name': self.parameter.name,
                'type': 'continuous',
                'domain': (numpy.log10(lower), numpy.log10(upper))}

    def gpyopt_design_format_to_list_in_sherpa_format(self, x):
        # Undo the log10 applied on the way into the design matrix.
        return 10**x

    def sherpa_format_to_gpyopt_design_format(self, x):
        return numpy.log10(x)
class ChoiceTransform(ParameterTransform):
    """
    Transform for Choice/Ordinal parameters. GPyOpt only accepts numeric
    categories, so options are encoded as their index into parameter.range
    and decoded back on the way out.
    """
    def to_gpyopt_domain(self):
        n_options = len(self.parameter.range)
        return {'name': self.parameter.name, 'type': 'categorical',
                'domain': numpy.array(range(n_options))}

    def gpyopt_design_format_to_list_in_sherpa_format(self, x):
        # Indices arrive as floats from the design matrix; map back to the
        # original option objects.
        options = self.parameter.range
        return [options[int(idx)] for idx in x]

    def sherpa_format_to_gpyopt_design_format(self, x):
        to_index = self.parameter.range.index
        return [to_index(value) for value in x]
class DiscreteTransform(ParameterTransform):
    """
    Transform for Discrete parameters: exposes the inclusive integer range
    to GPyOpt and casts suggested values back to integers for Sherpa.
    """
    def to_gpyopt_domain(self):
        lower, upper = self.parameter.range[0], self.parameter.range[1]
        return {'name': self.parameter.name,
                'type': 'discrete',
                # +1 because Sherpa's range is inclusive of the upper bound.
                'domain': tuple(range(lower, upper + 1))}

    def gpyopt_design_format_to_list_in_sherpa_format(self, x):
        # Design-matrix values are floats; truncate back to integers.
        return list(x.astype(int))

    def sherpa_format_to_gpyopt_design_format(self, x):
        return x