/
skopt_learner.py
112 lines (93 loc) · 3.63 KB
/
skopt_learner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# -*- coding: utf-8 -*-
import numpy as np
from skopt import Optimizer
from adaptive.learner.base_learner import BaseLearner
from adaptive.notebook_integration import ensure_holoviews
from adaptive.utils import cache_latest
class SKOptLearner(Optimizer, BaseLearner):
    """Learn a function minimum using ``skopt.Optimizer``.

    This is an ``Optimizer`` from ``scikit-optimize``,
    with the necessary methods added to make it conform
    to the ``adaptive`` learner interface.

    Parameters
    ----------
    function : callable
        The function to learn.
    **kwargs :
        Arguments to pass to ``skopt.Optimizer``.
    """

    def __init__(self, function, **kwargs):
        self.function = function
        # Points that have been requested but not yet told.
        self.pending_points = set()
        # Mapping of evaluated point -> result.
        self.data = {}
        super().__init__(**kwargs)

    def tell(self, x, y, fit=True):
        """Tell the learner the result ``y`` of evaluating at ``x``."""
        self.pending_points.discard(x)
        self.data[x] = y
        # ``skopt.Optimizer.tell`` expects the point as a list of
        # coordinates, so wrap the scalar ``x``.
        super().tell([x], y, fit)

    def tell_pending(self, x):
        # 'skopt.Optimizer' takes care of points we
        # have not got results for.
        self.pending_points.add(x)

    def remove_unfinished(self):
        # ``skopt.Optimizer`` does its own bookkeeping of points that
        # were asked for but not yet told, so nothing to do here.
        pass

    @cache_latest
    def loss(self, real=True):
        """Return a scalar loss; ``inf`` until a surrogate model exists."""
        if not self.models:
            return np.inf
        model = self.models[-1]
        # Return the in-sample error (i.e. test the model
        # with the training data). This is not the best
        # estimator of loss, but it is the cheapest.
        return 1 - model.score(self.Xi, self.yi)

    def ask(self, n, tell_pending=True):
        """Suggest ``n`` new points together with their loss improvements."""
        if not tell_pending:
            # Fixed message: the quoted call previously lacked its
            # closing parenthesis.
            raise NotImplementedError(
                "Asking points is an irreversible "
                "action, so use `ask(n, tell_pending=True)`."
            )
        points = super().ask(n)
        # TODO: Choose a better estimate for the loss improvement.
        if self.space.n_dims > 1:
            return points, [self.loss() / n] * n
        else:
            # In 1D, unwrap each single-coordinate point to a scalar.
            return [p[0] for p in points], [self.loss() / n] * n

    @property
    def npoints(self):
        """Number of evaluated points."""
        return len(self.Xi)

    def plot(self, nsamples=200):
        """Plot the evaluated data, model prediction, and 95% confidence
        band for a 1D search space.

        Parameters
        ----------
        nsamples : int
            Number of equally-spaced samples of the model prediction.

        Raises
        ------
        ValueError
            If the search space has more than one dimension.
        """
        hv = ensure_holoviews()
        if self.space.n_dims > 1:
            raise ValueError("Can only plot 1D functions")
        bounds = self.space.bounds[0]
        if not self.Xi:
            # No data yet: empty overlay with the same element types.
            p = hv.Scatter([]) * hv.Curve([]) * hv.Area([])
        else:
            scatter = hv.Scatter(([p[0] for p in self.Xi], self.yi))
            if self.models:
                model = self.models[-1]
                xs = np.linspace(*bounds, nsamples)
                # The model works in the transformed search space.
                xsp = self.space.transform(xs.reshape(-1, 1).tolist())
                y_pred, sigma = model.predict(xsp, return_std=True)
                # Plot model prediction for function
                curve = hv.Curve((xs, y_pred)).opts(style=dict(line_dash="dashed"))
                # Plot 95% confidence interval as colored area around points
                area = hv.Area(
                    (xs, y_pred - 1.96 * sigma, y_pred + 1.96 * sigma),
                    vdims=["y", "y2"],
                ).opts(style=dict(alpha=0.5, line_alpha=0))
            else:
                area = hv.Area([])
                curve = hv.Curve([])
            p = scatter * curve * area

        # Plot with 5% empty margins such that the boundary points are visible
        margin = 0.05 * (bounds[1] - bounds[0])
        plot_bounds = (bounds[0] - margin, bounds[1] + margin)

        return p.redim(x=dict(range=plot_bounds))

    def _get_data(self):
        # Unwrap the single-coordinate points stored by ``skopt``.
        return [x[0] for x in self.Xi], self.yi

    def _set_data(self, data):
        xs, ys = data
        self.tell_many(xs, ys)