In [None]:
# Load the autoreload extension and reload all modules before each cell runs,
# so edits to imported source files are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2
# Turn off jedi-based tab completion in the IPython completer.
%config Completer.use_jedi = False

In [None]:
from copy import deepcopy
import matplotlib.pyplot as plt
import matplotlib as mpl
import numpy as np
import sys
import torch
from tqdm import tqdm

In [None]:
# Put the parent directory on the import path — presumably so the local
# `easybo` package resolves; confirm against the repository layout.
sys.path.append("..")

# Default figure size (inches) and resolution for every plot in this notebook.
mpl.rcParams.update({"figure.figsize": (3, 2), "figure.dpi": 300})

In [None]:
from easybo.gp import EasySingleTaskGPRegressor
from easybo.bo import ask
from easybo.logger import logging_mode
from easybo.utils import get_dummy_2d_data, set_grids, grids_to_coordinates

# Active learning in a 2d space

While pure active learning isn't really an "interesting" form of Bayesian Optimization, it is a useful one. It is a 100% exploratory algorithm which seeks to minimize the overall variance of the estimator subject to some boundary constraints. For example, given an estimator with mean $\mu(x)$ and standard deviation $\sigma(x)$, the Active Learning policy, also called Maximum Variance (MaxVar), is defined by the acquisition function

$$ A(x) = \sigma^2(x)$$

As the goal is to maximize the acquisition function, new points will always be sampled where the variance of the estimator is highest.

## The oracle

Every problem in autonomous experimentation must have a source of truth. This source of truth can be a real-world experiment, a simulation, or something else. Regardless, it must be "where the buck stops". The source of truth is considered the "right answer" with respect to the estimator prediction.

In this notebook, we create some dummy training data and also get a source of truth (a function called `truth`) which can be queried to get the "right" answer. However, in principle, you can define the `truth` function to be whatever you want. It must only take an input in the same vector space as your estimator's inputs, and return an output in the same vector space as your estimator's output.

In [None]:
# Build the dummy problem with a fixed seed. Of the returned values, this notebook
# uses the two grid axes, the callable oracle `truth`, and `truth_meshgrid` (called
# later with both grid axes to evaluate the oracle for plotting). `train_x` and
# `train_y` are not used by the active cells here.
grid_x1, grid_x2, train_x, train_y, truth, truth_meshgrid = get_dummy_2d_data(seed=124)
# Combine the two grid axes into the `grid` that is later handed to `model.predict`.
# NOTE(review): presumably one row per (x1, x2) coordinate — confirm in easybo.utils.
grid = grids_to_coordinates([grid_x1, grid_x2])

For example:

In [None]:
input_dimension = 2

# A single query point laid out as one row with `input_dimension` columns.
X = np.reshape([0.234, 0.567], (1, input_dimension))

# Query the oracle at that point; the value is displayed as the cell output.
truth(X)

## The estimator

In this work, we will be using a **Gaussian Process** (GP) as our estimator for modeling the space. Specifically, suppose the input dimension is $D$, and the output dimension is 1. The GP is then a mapping $\mathbb{R}^D \to \mathbb{R}$. The GP can also be sampled to produce a measure of uncertainty given some input point.

## The data

Suppose we have an experiment where observations are acquired sequentially, but on an arbitrary delay. In other words, suppose we propose an experiment on iteration `i`. That experiment's results will not be observed until the (`i+n`)th iteration. `n=1` corresponds to a delay time of zero.

To start the experiment, we choose some random points within the bounds of the experiment we wish to perform. In this simple example, we have a dimension `D=2` and, let's say, we have a delay time of `n=5`. Thus, we should propose 5 experiments to start. Once the first of these is observed, there will be 4 remaining in the queue, and our next proposed experiment should take into account that we have 4 points still in the queue.

In order to bootstrap the experiment, let's choose 5 random points from a `LatinHypercube` sampler in the space `D=2` subject to the constraints $x_1 \in [-4, 5]$ and $x_2 \in [-5, 4]$.

In [None]:
# Search-space limits, one [lower, upper] pair per input dimension (x1 first, then x2).
bounds = [[-4.0, 5.0], [-5.0, 4.0]]

In [None]:
# Number of initial experiments drawn from the Latin hypercube sampler below.
n_pending = 5

In [None]:
from scipy.stats.qmc import LatinHypercube

In [None]:
# Lower/upper limits as arrays so every dimension is scaled in one expression.
bounds_array = np.asarray(bounds, dtype=float)
lower_bounds, upper_bounds = bounds_array[:, 0], bounds_array[:, 1]

# Draw the initial design on the unit hypercube. The dimension is taken from
# `bounds` so the sampler and the search space cannot drift apart.
sampler = LatinHypercube(d=len(bounds), seed=123)
unit_sample = sampler.random(n=n_pending)

# We should scale the sample to our boundaries
sample = (upper_bounds - lower_bounds) * unit_sample + lower_bounds
sample

## Running the experiment

We now have our initial data, and our source of truth. All that remains is to run the experiment! The general process will be as follows:

1. Pop the queue to get the experiment that will be observed next.
2. Observe the result of that experiment.
3. Add that input-output pair to the training set.
4. Condition and train a GP on the current data.
5. Use that GP, acquisition function and optimizer to find the next experiment, _given_ the current pending experiments.
6. Add that new point to the queue.
7. Repeat from step 1.

In [None]:
# Start from the Latin hypercube design. Every point in `X` gets its oracle value
# in `Y` right away, but only rows `[:pointer + 1]` are shown to the GP; rows
# `[pointer + 1:]` play the role of the pending (not yet observed) experiments.
X = sample.copy()
Y = truth(X).reshape(-1, 1)

# `pointer = 1` means the first two samples count as observed at the start.
# NOTE(review): presumably because a single observation is not enough to
# standardize the outputs — confirm.
pointer = 1
normalize_inputs_to_unity = True
n_iterations = 85

# Last successfully trained GP; this is what the cells below use for prediction.
model = None


def _fit_gp(observed_x, observed_y, normalize_inputs):
    """Condition a GP on the observed data and train its hyperparameters.

    The returned model has had `train_()` called on it; inspect its
    `training_state_successful` attribute to see whether training worked.
    """
    gp = EasySingleTaskGPRegressor(
        train_x=observed_x.copy(),
        train_y=observed_y.copy(),
        normalize_inputs_to_unity=normalize_inputs,
        standardize_outputs=True,
    )
    gp.train_()
    return gp


with logging_mode(success=True, warning=False):
    for iteration in tqdm(range(n_iterations), disable=True):

        # Condition the GP on the current data and train the hyperparameters
        candidate = _fit_gp(
            X[:pointer + 1, :], Y[:pointer + 1, :], normalize_inputs_to_unity
        )

        # Training sometimes fails, see here, for example:
        # https://stats.stackexchange.com/questions/547490/gaussian-process-regression-normalization-of-data-worsens-fit-why
        # In that case we adjust the way the model scales its input parameters
        # and refit immediately, instead of spending a whole iteration on the retry.
        if not candidate.training_state_successful and normalize_inputs_to_unity:
            normalize_inputs_to_unity = False
            candidate = _fit_gp(
                X[:pointer + 1, :], Y[:pointer + 1, :], normalize_inputs_to_unity
            )

        # Still unsuccessful: skip this iteration and keep the last good model
        if not candidate.training_state_successful:
            continue
        model = candidate

        # Ask the BO engine which next point we should use
        new_points = ask(
            model=model,
            bounds=bounds,
            X_pending=X[pointer + 1:, :].copy(),
            acquisition_function="qMaxVariance",
            acquisition_function_kwargs=dict(),
            optimize_acqf_kwargs={"q": 1, "num_restarts": 5, "raw_samples": 20},
        )

        # Get the current observation. Here, `truth` will have to be implemented in a real experiment!
        current_obs = truth(new_points).reshape(-1, 1)

        # Add this new point to the queue
        X = np.concatenate([X, new_points.reshape(1, input_dimension)], axis=0)
        Y = np.concatenate([Y, current_obs], axis=0)

        # Repeat! The oldest pending experiment now counts as observed.
        pointer += 1

# Fail loudly rather than letting later cells predict with no usable model.
if model is None:
    raise RuntimeError("GP training never succeeded; no model is available for prediction.")

In [None]:
# Posterior mean of the GP on the evaluation grid, arranged as a 2D image.
# NOTE(review): the (len(grid_x2), len(grid_x1)) layout assumes a particular row
# order from `grids_to_coordinates` — confirm it matches the transposed truth plot.
pred = model.predict(grid=grid)
image_shape = (len(grid_x2), len(grid_x1))
mu = pred["mean"].reshape(image_shape)

# Oracle values on the same grid, with colour limits symmetric about zero.
z = truth_meshgrid(grid_x1, grid_x2)
z_abs_max = np.abs(z).max()
z_min, z_max = -z_abs_max, z_abs_max

In [None]:
fig, axs = plt.subplots(1, 2, figsize=(4, 2), sharey=True, sharex=True)

# Both panels use the same extent and colour scale so they can be compared directly.
imshow_kwargs = dict(
    cmap='rainbow', vmin=z_min, vmax=z_max,
    extent=[grid_x1.min(), grid_x1.max(), grid_x2.min(), grid_x2.max()],
    interpolation='nearest', origin='lower',
)

# Left panel: the oracle evaluated on the grid.
ax = axs[0]
ax.imshow(z.T, **imshow_kwargs)
set_grids(ax, grid=False)
ax.set_title("Function")
ax.set_xlabel("$x_1$")
ax.set_ylabel("$x_2$")

# Right panel: the GP mean, with the sampled locations overlaid.
ax = axs[1]
ax.imshow(mu, **imshow_kwargs)
set_grids(ax, grid=False)
# Black: every point in X. Blue (drawn on top): the initial Latin hypercube sample.
ax.scatter(X[:, 0], X[:, 1], s=5, color="black")
ax.scatter(sample[:, 0], sample[:, 1], s=3, color="blue")
ax.set_title("GP")
ax.set_xlabel("$x_1$")

plt.show()