/
analytic.py
588 lines (501 loc) · 23.1 KB
/
analytic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
r"""
Analytic Acquisition Functions that evaluate the posterior without performing
Monte-Carlo sampling.
"""
from abc import ABC
from copy import deepcopy
from typing import Dict, List, Optional, Tuple, Union

import torch
from torch import Tensor
from torch.distributions import Normal

from ..exceptions import UnsupportedError
from ..models.gp_regression import FixedNoiseGP
from ..models.gpytorch import GPyTorchModel
from ..models.model import Model
from ..posteriors.posterior import Posterior
from ..sampling.samplers import SobolQMCNormalSampler
from ..utils.transforms import convert_to_target_pre_hook, t_batch_mode_transform
from .acquisition import AcquisitionFunction
from .objective import ScalarizedObjective
class AnalyticAcquisitionFunction(AcquisitionFunction, ABC):
    r"""Base class for analytic acquisition functions."""

    def __init__(
        self, model: Model, objective: Optional[ScalarizedObjective] = None
    ) -> None:
        r"""Base constructor for analytic acquisition functions.

        Args:
            model: A fitted single-outcome model. A multi-output model is only
                permitted together with a `ScalarizedObjective`.
            objective: A ScalarizedObjective (optional).

        Raises:
            UnsupportedError: If `objective` is omitted for a multi-output
                model, or if `objective` is not a `ScalarizedObjective`.
        """
        super().__init__(model=model)
        if objective is None:
            if model.num_outputs != 1:
                raise UnsupportedError(
                    "Must specify an objective when using a multi-output model."
                )
        elif not isinstance(objective, ScalarizedObjective):
            raise UnsupportedError(
                "Only objectives of type ScalarizedObjective are supported for "
                "analytic acquisition functions."
            )
        self.objective = objective

    def _get_posterior(self, X: Tensor) -> Posterior:
        r"""Compute the posterior at the input candidate set X.

        Applies the objective if provided.

        Args:
            X: The input candidate set.

        Returns:
            The posterior at X. If a ScalarizedObjective is defined, this
            posterior can be single-output even if the underlying model is a
            multi-output model.
        """
        posterior = self.model.posterior(X)
        if self.objective is not None:
            # Unlike MCAcquisitionObjective (which transforms samples), this
            # transforms the posterior itself.
            posterior = self.objective(posterior)
        return posterior

    def set_X_pending(self, X_pending: Optional[Tensor] = None) -> None:
        r"""Unsupported: analytic acquisition functions ignore pending points."""
        # Plain string literal (the original used an f-string with no
        # placeholders, flagged by linters as F541).
        raise UnsupportedError(
            "Analytic acquisition functions do not account for X_pending yet."
        )
class ExpectedImprovement(AnalyticAcquisitionFunction):
    r"""Single-outcome Expected Improvement (analytic).

    Classic Expected Improvement over the current best observed value,
    evaluated in closed form under a Gaussian posterior. The posterior at a
    single test point must expose `mean` and `variance` properties. Only
    `q=1` is supported, and the model must be single-outcome.

    `EI(x) = E(max(y - best_f, 0)), y ~ f(x)`

    Example:
        >>> model = SingleTaskGP(train_X, train_Y)
        >>> EI = ExpectedImprovement(model, best_f=0.2)
        >>> ei = EI(test_X)
    """

    def __init__(
        self,
        model: Model,
        best_f: Union[float, Tensor],
        objective: Optional[ScalarizedObjective] = None,
        maximize: bool = True,
    ) -> None:
        r"""Single-outcome Expected Improvement (analytic).

        Args:
            model: A fitted single-outcome model.
            best_f: Either a scalar or a `b`-dim Tensor (batch mode) representing
                the best function value observed so far (assumed noiseless).
            objective: A ScalarizedObjective (optional).
            maximize: If True, consider the problem a maximization problem.
        """
        super().__init__(model=model, objective=objective)
        self.maximize = maximize
        best_f = best_f if torch.is_tensor(best_f) else torch.tensor(best_f)
        # Buffer so that `best_f` follows the module across devices/dtypes.
        self.register_buffer("best_f", best_f)

    @t_batch_mode_transform(expected_q=1)
    def forward(self, X: Tensor) -> Tensor:
        r"""Evaluate Expected Improvement on the candidate set X.

        Args:
            X: A `b1 x ... bk x 1 x d`-dim batched tensor of `d`-dim design
                points. EI is computed per point (marginal posteriors, not the
                joint).

        Returns:
            A `b1 x ... bk`-dim tensor of Expected Improvement values at the
            given design points `X`.
        """
        self.best_f = self.best_f.to(X)
        posterior = self._get_posterior(X=X)
        mu = posterior.mean
        # The posterior may carry extra batch dims (e.g. fantasy batches)
        # beyond those of X; keep whichever batch shape is larger.
        out_shape = mu.shape[:-2] if mu.dim() >= X.dim() else X.shape[:-2]
        mu = mu.view(out_shape)
        sigma = posterior.variance.clamp_min(1e-9).sqrt().view(out_shape)
        z = (mu - self.best_f.expand_as(mu)) / sigma
        if not self.maximize:
            z = -z
        std_normal = Normal(torch.zeros_like(z), torch.ones_like(z))
        cdf_z = std_normal.cdf(z)
        pdf_z = std_normal.log_prob(z).exp()
        # Closed-form EI: sigma * (phi(z) + z * Phi(z))
        return sigma * (pdf_z + z * cdf_z)
class PosteriorMean(AnalyticAcquisitionFunction):
    r"""Single-outcome Posterior Mean.

    Only supports the case of q=1. Requires the model's posterior to have a
    `mean` property. The model must be single-outcome.

    Example:
        >>> model = SingleTaskGP(train_X, train_Y)
        >>> PM = PosteriorMean(model)
        >>> pm = PM(test_X)
    """

    @t_batch_mode_transform(expected_q=1)
    def forward(self, X: Tensor) -> Tensor:
        r"""Evaluate the posterior mean on the candidate set X.

        Args:
            X: A `(b) x 1 x d`-dim Tensor of `(b)` t-batches of `d`-dim design
                points each.

        Returns:
            A `(b)`-dim Tensor of Posterior Mean values at the given design
            points `X`.
        """
        batch_shape = X.shape[:-2]
        # Drop the q=1 and output dims via `view` down to the batch shape.
        return self._get_posterior(X=X).mean.view(batch_shape)
class ProbabilityOfImprovement(AnalyticAcquisitionFunction):
    r"""Single-outcome Probability of Improvement.

    Probability of improvement over the current best observed value, computed
    using the analytic formula under a Normal posterior distribution. Only
    supports the case of q=1. Requires the posterior to be Gaussian. The model
    must be single-outcome.

    `PI(x) = P(y >= best_f), y ~ f(x)`

    Example:
        >>> model = SingleTaskGP(train_X, train_Y)
        >>> PI = ProbabilityOfImprovement(model, best_f=0.2)
        >>> pi = PI(test_X)
    """

    def __init__(
        self,
        model: Model,
        best_f: Union[float, Tensor],
        objective: Optional[ScalarizedObjective] = None,
        maximize: bool = True,
    ) -> None:
        r"""Single-outcome analytic Probability of Improvement.

        Args:
            model: A fitted single-outcome model.
            best_f: Either a scalar or a `b`-dim Tensor (batch mode) representing
                the best function value observed so far (assumed noiseless).
            objective: A ScalarizedObjective (optional).
            maximize: If True, consider the problem a maximization problem.
        """
        super().__init__(model=model, objective=objective)
        self.maximize = maximize
        if not torch.is_tensor(best_f):
            best_f = torch.tensor(best_f)
        # Buffer so that `best_f` follows the module across devices/dtypes.
        self.register_buffer("best_f", best_f)

    @t_batch_mode_transform(expected_q=1)
    def forward(self, X: Tensor) -> Tensor:
        r"""Evaluate the Probability of Improvement on the candidate set X.

        Args:
            X: A `(b) x 1 x d`-dim Tensor of `(b)` t-batches of `d`-dim design
                points each.

        Returns:
            A `(b)`-dim tensor of Probability of Improvement values at the given
            design points `X`.
        """
        self.best_f = self.best_f.to(X)
        posterior = self._get_posterior(X=X)
        batch_shape = X.shape[:-2]
        # NOTE: the original computed `posterior.mean` / `.variance.sqrt()`
        # once into throwaway locals and then recomputed both; the dead
        # computation (including a redundant sqrt) has been removed.
        mean = posterior.mean.view(batch_shape)
        # Clamp sigma to avoid division by zero for (near-)deterministic points.
        sigma = posterior.variance.sqrt().clamp_min(1e-9).view(batch_shape)
        u = (mean - self.best_f.expand_as(mean)) / sigma
        if not self.maximize:
            u = -u
        normal = Normal(torch.zeros_like(u), torch.ones_like(u))
        return normal.cdf(u)
class UpperConfidenceBound(AnalyticAcquisitionFunction):
    r"""Single-outcome Upper Confidence Bound (UCB).

    Analytic upper confidence bound: posterior mean plus the posterior standard
    deviation weighted by a trade-off parameter `beta`. Only supports `q=1`
    (greedy, non-batch selection of design points). The model must be
    single-outcome.

    `UCB(x) = mu(x) + sqrt(beta) * sigma(x)`, where `mu` and `sigma` are the
    posterior mean and standard deviation, respectively.

    Example:
        >>> model = SingleTaskGP(train_X, train_Y)
        >>> UCB = UpperConfidenceBound(model, beta=0.2)
        >>> ucb = UCB(test_X)
    """

    def __init__(
        self,
        model: Model,
        beta: Union[float, Tensor],
        objective: Optional[ScalarizedObjective] = None,
        maximize: bool = True,
    ) -> None:
        r"""Single-outcome Upper Confidence Bound.

        Args:
            model: A fitted single-outcome GP model (must be in batch mode if
                candidate sets X will be)
            beta: Either a scalar or a one-dim tensor with `b` elements (batch mode)
                representing the trade-off parameter between mean and covariance
            objective: A ScalarizedObjective (optional).
            maximize: If True, consider the problem a maximization problem.
        """
        super().__init__(model=model, objective=objective)
        self.maximize = maximize
        beta = beta if torch.is_tensor(beta) else torch.tensor(beta)
        # Buffer so that `beta` follows the module across devices/dtypes.
        self.register_buffer("beta", beta)

    @t_batch_mode_transform(expected_q=1)
    def forward(self, X: Tensor) -> Tensor:
        r"""Evaluate the Upper Confidence Bound on the candidate set X.

        Args:
            X: A `(b) x 1 x d`-dim Tensor of `(b)` t-batches of `d`-dim design
                points each.

        Returns:
            A `(b)`-dim Tensor of Upper Confidence Bound values at the given
            design points `X`.
        """
        self.beta = self.beta.to(X)
        posterior = self._get_posterior(X=X)
        shape = X.shape[:-2]
        mu = posterior.mean.view(shape)
        var = posterior.variance.view(shape)
        # sqrt(beta * variance) == sqrt(beta) * sigma
        spread = torch.sqrt(self.beta.expand_as(mu) * var)
        return mu + spread if self.maximize else mu - spread
class ConstrainedExpectedImprovement(AnalyticAcquisitionFunction):
    r"""Constrained Expected Improvement (feasibility-weighted).

    Computes the analytic expected improvement for a Normal posterior
    distribution, weighted by a probability of feasibility. The objective and
    constraints are assumed to be independent and have Gaussian posterior
    distributions. Only supports the case `q=1`. The model should be
    multi-outcome, with the index of the objective and constraints passed to
    the constructor.

    `Constrained_EI(x) = EI(x) * Product_i P(y_i \in [lower_i, upper_i])`,
    where `y_i ~ constraint_i(x)` and `lower_i`, `upper_i` are the lower and
    upper bounds for the i-th constraint, respectively.

    Example:
        >>> # example where 0th output has a non-negativity constraint and
        ... # 1st output is the objective
        >>> model = SingleTaskGP(train_X, train_Y)
        >>> constraints = {0: (0.0, None)}
        >>> cEI = ConstrainedExpectedImprovement(model, 0.2, 1, constraints)
        >>> cei = cEI(test_X)
    """

    def __init__(
        self,
        model: Model,
        best_f: Union[float, Tensor],
        objective_index: int,
        constraints: Dict[int, Tuple[Optional[float], Optional[float]]],
        maximize: bool = True,
    ) -> None:
        r"""Analytic Constrained Expected Improvement.

        Args:
            model: A fitted multi-output model in which one output is the
                objective and the remaining outputs are the constraints.
            best_f: Either a scalar or a `b`-dim Tensor (batch mode) representing
                the best function value observed so far (assumed noiseless).
            objective_index: The index of the objective.
            constraints: A dictionary of the form `{i: [lower, upper]}`, where
                `i` is the output index, and `lower` and `upper` are lower and upper
                bounds on that output (resp. interpreted as -Inf / Inf if None)
            maximize: If True, consider the problem a maximization problem.

        Raises:
            ValueError: If `constraints` is empty, includes the objective
                index, or contains an interval with upper <= lower (raised by
                `_preprocess_constraint_bounds`).
        """
        # use AcquisitionFunction constructor to avoid check for objective
        # (the base class would reject a multi-output model without one)
        super(AnalyticAcquisitionFunction, self).__init__(model=model)
        self.objective = None
        self.maximize = maximize
        self.objective_index = objective_index
        self.constraints = constraints
        self.register_buffer("best_f", torch.as_tensor(best_f))
        # Splits `constraints` into lower-only / upper-only / two-sided
        # groups and registers the bound tensors as buffers.
        self._preprocess_constraint_bounds(constraints=constraints)
        # NOTE(review): presumably this hook moves inputs/buffers to a common
        # device/dtype before forward — confirm against its implementation.
        self.register_forward_pre_hook(convert_to_target_pre_hook)

    @t_batch_mode_transform(expected_q=1)
    def forward(self, X: Tensor) -> Tensor:
        r"""Evaluate Constrained Expected Improvement on the candidate set X.

        Args:
            X: A `(b) x 1 x d`-dim Tensor of `(b)` t-batches of `d`-dim design
                points each.

        Returns:
            A `(b)`-dim Tensor of Expected Improvement values at the given
            design points `X`.
        """
        posterior = self._get_posterior(X=X)
        means = posterior.mean.squeeze(dim=-2)  # (b) x m
        # clamp sigmas to avoid division by zero below
        sigmas = posterior.variance.squeeze(dim=-2).sqrt().clamp_min(1e-9)  # (b) x m
        # index with a one-element list to keep the trailing dim: (b) x 1
        mean_obj = means[..., [self.objective_index]]
        sigma_obj = sigmas[..., [self.objective_index]]
        u = (mean_obj - self.best_f.expand_as(mean_obj)) / sigma_obj
        if not self.maximize:
            u = -u
        normal = Normal(
            torch.zeros(1, device=u.device, dtype=u.dtype),
            torch.ones(1, device=u.device, dtype=u.dtype),
        )
        ei_pdf = torch.exp(normal.log_prob(u))  # (b) x 1
        ei_cdf = normal.cdf(u)
        # closed-form EI: sigma * (phi(u) + u * Phi(u))
        ei = sigma_obj * (ei_pdf + u * ei_cdf)
        # weight EI by the probability that all constraints are satisfied
        prob_feas = self._compute_prob_feas(X=X, means=means, sigmas=sigmas)
        ei = ei.mul(prob_feas)
        return ei.squeeze(dim=-1)

    def _preprocess_constraint_bounds(
        self, constraints: Dict[int, Tuple[Optional[float], Optional[float]]]
    ) -> None:
        r"""Set up constraint bounds.

        Partitions the constraints into lower-only, upper-only, and two-sided
        groups; stores the output indices of each group on `self` and
        registers the corresponding bound values as buffers.

        Args:
            constraints: A dictionary of the form `{i: [lower, upper]}`, where
                `i` is the output index, and `lower` and `upper` are lower and upper
                bounds on that output (resp. interpreted as -Inf / Inf if None)

        Raises:
            ValueError: If no constraints are given, the objective output is
                constrained, or a two-sided bound has upper <= lower.
        """
        constraint_lower, self.constraint_lower_inds = [], []
        constraint_upper, self.constraint_upper_inds = [], []
        constraint_both, self.constraint_both_inds = [], []
        constraint_indices = list(constraints.keys())
        if len(constraint_indices) == 0:
            raise ValueError("There must be at least one constraint.")
        if self.objective_index in constraint_indices:
            raise ValueError(
                "Output corresponding to objective should not be a constraint."
            )
        for k in constraint_indices:
            if constraints[k][0] is not None and constraints[k][1] is not None:
                # two-sided interval; must be non-degenerate
                if constraints[k][1] <= constraints[k][0]:
                    raise ValueError("Upper bound is less than the lower bound.")
                self.constraint_both_inds.append(k)
                constraint_both.append([constraints[k][0], constraints[k][1]])
            elif constraints[k][0] is not None:
                # lower bound only
                self.constraint_lower_inds.append(k)
                constraint_lower.append(constraints[k][0])
            elif constraints[k][1] is not None:
                # upper bound only
                self.constraint_upper_inds.append(k)
                constraint_upper.append(constraints[k][1])
        # buffers so the bounds follow the module across devices/dtypes
        self.register_buffer(
            "constraint_both", torch.tensor(constraint_both, dtype=torch.float)
        )
        self.register_buffer(
            "constraint_lower", torch.tensor(constraint_lower, dtype=torch.float)
        )
        self.register_buffer(
            "constraint_upper", torch.tensor(constraint_upper, dtype=torch.float)
        )

    def _compute_prob_feas(self, X: Tensor, means: Tensor, sigmas: Tensor) -> Tensor:
        r"""Compute feasibility probability for each batch of X.

        Constraints are treated as independent, so the joint feasibility
        probability is the product of the per-constraint probabilities.

        Args:
            X: A `(b) x 1 x d`-dim Tensor of `(b)` t-batches of `d`-dim design
                points each.
            means: A `(b) x m`-dim Tensor of means.
            sigmas: A `(b) x m`-dim Tensor of standard deviations.
        Returns:
            A `(b) x 1`-dim tensor of feasibility probabilities

        Note: This function does case-work for upper bound, lower bound, and both-sided
        bounds. Another way to do it would be to use 'inf' and -'inf' for the
        one-sided bounds and use the logic for the both-sided case. But this
        causes an issue with autograd since we get 0 * inf.
        TODO: Investigate further.
        """
        output_shape = X.shape[:-2] + torch.Size([1])
        # start from probability 1 and multiply in each constraint group
        prob_feas = torch.ones(output_shape, device=X.device, dtype=X.dtype)
        if len(self.constraint_lower_inds) > 0:
            normal_lower = _construct_dist(means, sigmas, self.constraint_lower_inds)
            # P(y >= lower) = 1 - CDF(lower)
            prob_l = 1 - normal_lower.cdf(self.constraint_lower)
            prob_feas = prob_feas.mul(torch.prod(prob_l, dim=-1, keepdim=True))
        if len(self.constraint_upper_inds) > 0:
            normal_upper = _construct_dist(means, sigmas, self.constraint_upper_inds)
            # P(y <= upper) = CDF(upper)
            prob_u = normal_upper.cdf(self.constraint_upper)
            prob_feas = prob_feas.mul(torch.prod(prob_u, dim=-1, keepdim=True))
        if len(self.constraint_both_inds) > 0:
            normal_both = _construct_dist(means, sigmas, self.constraint_both_inds)
            # P(lower <= y <= upper) = CDF(upper) - CDF(lower)
            prob_u = normal_both.cdf(self.constraint_both[:, 1])
            prob_l = normal_both.cdf(self.constraint_both[:, 0])
            prob_feas = prob_feas.mul(torch.prod(prob_u - prob_l, dim=-1, keepdim=True))
        return prob_feas
class NoisyExpectedImprovement(ExpectedImprovement):
    r"""Single-outcome Noisy Expected Improvement (via fantasies).

    This computes Noisy Expected Improvement by averaging over the Expected
    Improvement values of a number of fantasy models. Only supports the case
    `q=1`. Assumes that the posterior distribution of the model is Gaussian.
    The model must be single-outcome.

    `NEI(x) = E(max(y - max Y_baseline), 0)), (y, Y_baseline) ~ f((x, X_baseline))`,
    where `X_baseline` are previously observed points.

    Note: This acquisition function currently relies on using a FixedNoiseGP (required
    for noiseless fantasies).

    Example:
        >>> model = FixedNoiseGP(train_X, train_Y, train_Yvar=train_Yvar)
        >>> NEI = NoisyExpectedImprovement(model, train_X)
        >>> nei = NEI(test_X)
    """

    def __init__(
        self,
        model: GPyTorchModel,
        X_observed: Tensor,
        num_fantasies: int = 20,
        maximize: bool = True,
    ) -> None:
        r"""Single-outcome Noisy Expected Improvement (via fantasies).

        Args:
            model: A fitted single-outcome model.
            X_observed: A `n x d` Tensor of observed points that are likely to
                be the best observed points so far.
            num_fantasies: The number of fantasies to generate. The higher this
                number the more accurate the model (at the expense of model
                complexity and performance).
            maximize: If True, consider the problem a maximization problem.

        Raises:
            UnsupportedError: If `model` is not a `FixedNoiseGP`.
        """
        if not isinstance(model, FixedNoiseGP):
            raise UnsupportedError(
                "Only FixedNoiseGPs are currently supported for fantasy NEI"
            )
        # sample fantasies: draw `num_fantasies` QMC samples of the outcomes
        # at the observed points (no gradients needed for fantasy targets)
        with torch.no_grad():
            posterior = model.posterior(X=X_observed)
            sampler = SobolQMCNormalSampler(num_fantasies)
            Y_fantasized = sampler(posterior).squeeze(-1)
        batch_X_observed = X_observed.expand(num_fantasies, *X_observed.shape)
        # The fantasy model will operate in batch mode
        fantasy_model = _get_noiseless_fantasy_model(
            model=model, batch_X_observed=batch_X_observed, Y_fantasized=Y_fantasized
        )
        # per-fantasy incumbent: best fantasized value over the observed points
        if maximize:
            best_f = Y_fantasized.max(dim=-1)[0]
        else:
            best_f = Y_fantasized.min(dim=-1)[0]
        # delegate the EI computation to the parent class, evaluated on the
        # batched fantasy model with a per-fantasy `best_f`
        super().__init__(model=fantasy_model, best_f=best_f, maximize=maximize)

    def forward(self, X: Tensor) -> Tensor:
        r"""Evaluate Expected Improvement on the candidate set X.

        Averages the analytic EI of each fantasy model.

        Args:
            X: A `b1 x ... bk x 1 x d`-dim batched tensor of `d`-dim design points.

        Returns:
            A `b1 x ... bk`-dim tensor of Noisy Expected Improvement values at
            the given design points `X`.
        """
        # add batch dimension for broadcasting to fantasy models
        return super().forward(X.unsqueeze(-3)).mean(dim=-1)
def _construct_dist(means: Tensor, sigmas: Tensor, inds: Tensor) -> Normal:
mean = means[..., inds]
sigma = sigmas[..., inds]
return Normal(loc=mean, scale=sigma)
def _get_noiseless_fantasy_model(
    model: FixedNoiseGP, batch_X_observed: Tensor, Y_fantasized: Tensor
) -> FixedNoiseGP:
    r"""Construct a fantasy model from a fitted model and provided fantasies.

    The fantasy model uses the hyperparameters from the original fitted model and
    assumes the fantasies are noiseless.

    Args:
        model: a fitted FixedNoiseGP
        batch_X_observed: A `b x n x d` tensor of inputs where `b` is the number of
            fantasies.
        Y_fantasized: A `b x n` tensor of fantasized targets where `b` is the number of
            fantasies.

    Returns:
        The fantasy model.
    """
    # initialize a copy of FixedNoiseGP on the original training inputs
    # this makes FixedNoiseGP a non-batch GP, so that the same hyperparameters
    # are used across all batches (by default, a GP with batched training data
    # uses independent hyperparameters for each batch).
    fantasy_model = FixedNoiseGP(
        train_X=model.train_inputs[0],
        train_Y=model.train_targets.unsqueeze(-1),
        train_Yvar=model.likelihood.noise_covar.noise.unsqueeze(-1),
    )
    # update training inputs/targets to be batch mode fantasies
    # (strict=False: the new data's batch shape differs from construction time)
    fantasy_model.set_train_data(
        inputs=batch_X_observed, targets=Y_fantasized, strict=False
    )
    # use noiseless fantasies (tiny jitter rather than exactly zero,
    # presumably for numerical stability of the Cholesky factorization)
    fantasy_model.likelihood.noise_covar.noise = torch.full_like(Y_fantasized, 1e-7)
    # load hyperparameters from original model
    state_dict = deepcopy(model.state_dict())
    fantasy_model.load_state_dict(state_dict)
    return fantasy_model