In [1]:
# @title Utils

import math
import torch
import torch.optim as optim

from tqdm import tqdm

try:
    import gpytorch
except:
    !pip install gpytorch
    import gpytorch

try:
    import botorch
except:
    !pip install botorch
    import botorch

from matplotlib import pyplot as plt

%load_ext autoreload
%autoreload 2

from math import exp
import numpy as np

np.random.seed(1)

# from botorch.models import SingleTaskGP
from gpytorch.constraints import GreaterThan
from gpytorch.mlls import ExactMarginalLogLikelihood
from torch.optim import SGD


# https://gist.github.com/neubig/e859ef0cc1a63d1c2ea4
def rbf_kernel(x1, x2, variance=0.05):
    """Squared-exponential (RBF) kernel between two scalar inputs.

    Evaluates ``exp(-(x1 - x2)**2 / (2 * variance))``: identical inputs give
    1.0 and the value decays towards 0 as the inputs move apart.
    """
    squared_gap = (x1 - x2) ** 2
    return exp(-1 * squared_gap / (2 * variance))


def gram_matrix(xs):
    """Build the kernel (Gram) matrix for a sequence of scalar inputs.

    Entry ``[i][j]`` is ``rbf_kernel(xs[i], xs[j])`` using the kernel's default
    variance. The result is a list of row lists, one row per element of ``xs``.
    """
    rows = []
    for row_x in xs:
        row = []
        for col_x in xs:
            row.append(rbf_kernel(row_x, col_x))
        rows.append(row)
    return rows


def ground_truth(draw=False):
    """Draw the fixed reference function as one zero-mean GP sample.

    The sample is taken on the grid ``0, 0.001, ..., 0.999`` with covariance
    ``gram_matrix(grid)``. The global NumPy RNG is reseeded with 62 on every
    call, so repeated calls return the same curve (and reset the global seed).

    Args:
        draw: When true, also plot the curve on the current matplotlib axes.

    Returns:
        Tuple ``(xs, ys)`` of grid locations and sampled function values.
    """
    grid = np.arange(0, 1, 0.001)
    zero_mean = np.zeros(grid.shape[0])
    covariance = gram_matrix(grid)

    # Fixed seed: every call reproduces the same sample.
    np.random.seed(62)
    sample = np.random.multivariate_normal(zero_mean, covariance)

    if draw:
        plt.plot(grid, sample, color="blue", alpha=0.1)

    return grid, sample


def query(x):
    """Return function values at the locations ``x``.

    The reference grid from ``ground_truth`` is extended with ``x``, a joint
    zero-mean sample is drawn over the extended set with the same seed (62),
    and the entries belonging to ``x`` (the trailing ones) are returned.

    NOTE(review): how closely these values follow the curve returned by
    ``ground_truth`` depends on how the sampler behaves when the covariance
    matrix grows -- confirm before relying on exact agreement.

    Args:
        x: 1-D NumPy array of query locations.

    Returns:
        1-D NumPy array with one value per entry of ``x``.
    """
    grid, _ = ground_truth()
    points = np.concatenate([grid, x])
    zero_mean = np.zeros(points.shape[0])
    covariance = gram_matrix(points)

    # Same seed as ground_truth().
    np.random.seed(62)
    joint_sample = np.random.multivariate_normal(zero_mean, covariance)

    n_query = x.shape[0]
    return joint_sample[-n_query:]


# We will use the simplest form of GP model, exact inference
class ExactGPModel(gpytorch.models.ExactGP):
    """Exact-inference GP with a constant mean and a scaled RBF kernel."""

    def __init__(self, train_x, train_y, likelihood):
        super().__init__(train_x, train_y, likelihood)
        self.mean_module = gpytorch.means.ConstantMean()
        base_kernel = gpytorch.kernels.RBFKernel()
        self.covar_module = gpytorch.kernels.ScaleKernel(base_kernel)

    def forward(self, x):
        """Return the prior multivariate normal at inputs ``x``."""
        prior_mean = self.mean_module(x)
        prior_covariance = self.covar_module(x)
        return gpytorch.distributions.MultivariateNormal(prior_mean, prior_covariance)

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting gpytorch
  Downloading gpytorch-1.8.1-py2.py3-none-any.whl (361 kB)
[K     |████████████████████████████████| 361 kB 5.1 MB/s 
Installing collected packages: gpytorch
Successfully installed gpytorch-1.8.1
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting botorch
  Downloading botorch-0.6.6-py3-none-any.whl (387 kB)
[K     |████████████████████████████████| 387 kB 4.5 MB/s 
[?25hCollecting pyro-ppl>=1.8.0
  Downloading pyro_ppl-1.8.2-py3-none-any.whl (722 kB)
[K     |████████████████████████████████| 722 kB 9.8 MB/s 
Collecting pyro-api>=0.1.1
  Downloading pyro_api-0.1.2-py3-none-any.whl (11 kB)
Installing collected packages: pyro-api, pyro-ppl, botorch
Successfully installed botorch-0.6.6 pyro-api-0.1.2 pyro-ppl-1.8.2


In [2]:
# @title SingleTaskGP

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

r"""
Gaussian Process Regression models based on GPyTorch models.

These models are often a good starting point and are further documented in the
tutorials.

`SingleTaskGP`, `FixedNoiseGP`, and `HeteroskedasticSingleTaskGP` are all
single-task exact GP models, differing in how they treat noise. They use
relatively strong priors on the Kernel hyperparameters, which work best when
covariates are normalized to the unit cube and outcomes are standardized (zero
mean, unit variance).

These models all work in batch mode (each batch having its own hyperparameters).
When the training observations include multiple outputs, these models use
batching to model outputs independently.

These models all support multiple outputs. However, as single-task models,
`SingleTaskGP`, `FixedNoiseGP`, and `HeteroskedasticSingleTaskGP` should be
used only when the outputs are independent and all use the same training data.
If outputs are independent and outputs have different training data, use the
`ModelListGP`. When modeling correlations between outputs, use a multi-task
model like `MultiTaskGP`.
"""

from __future__ import annotations

from typing import Any, List, Optional, Union

import torch
from botorch import settings
from botorch.models.gpytorch import BatchedMultiOutputGPyTorchModel
from botorch.models.transforms.input import InputTransform
from botorch.models.transforms.outcome import Log, OutcomeTransform
from botorch.models.utils import fantasize as fantasize_flag, validate_input_scaling
from botorch.sampling.samplers import MCSampler
from gpytorch.constraints.constraints import GreaterThan
from gpytorch.distributions.multivariate_normal import MultivariateNormal
from gpytorch.kernels.matern_kernel import MaternKernel
from gpytorch.kernels.scale_kernel import ScaleKernel
from gpytorch.kernels import RBFKernel
from gpytorch.likelihoods.gaussian_likelihood import (
    _GaussianLikelihoodBase,
    FixedNoiseGaussianLikelihood,
    GaussianLikelihood,
)
from gpytorch.likelihoods.likelihood import Likelihood
from gpytorch.likelihoods.noise_models import HeteroskedasticNoise
from gpytorch.means.constant_mean import ConstantMean
from gpytorch.means.mean import Mean
from gpytorch.mlls.noise_model_added_loss_term import NoiseModelAddedLossTerm
from gpytorch.models.exact_gp import ExactGP
from gpytorch.module import Module
from gpytorch.priors.smoothed_box_prior import SmoothedBoxPrior
from gpytorch.priors.torch_priors import GammaPrior
from torch import Tensor


MIN_INFERRED_NOISE_LEVEL = 1e-4


class SingleTaskGP(BatchedMultiOutputGPyTorchModel, ExactGP):
    r"""A single-task exact GP model.

    A single-task exact GP using relatively strong priors on the Kernel
    hyperparameters, which work best when covariates are normalized to the unit
    cube and outcomes are standardized (zero mean, unit variance).

    This notebook-local variant differs from the stock model in two defaults:
    the base kernel is an `RBFKernel` (instead of a Matern-5/2 kernel) and the
    noise prior is `GammaPrior(0.5, 1)`.

    This model works in batch mode (each batch having its own hyperparameters).
    When the training observations include multiple outputs, this model will use
    batching to model outputs independently.

    Use this model when you have independent output(s) and all outputs use the
    same training data. If outputs are independent and outputs have different
    training data, use the ModelListGP. When modeling correlations between
    outputs, use the MultiTaskGP.

    Example:
        >>> train_X = torch.rand(20, 2)
        >>> train_Y = torch.sin(train_X).sum(dim=1, keepdim=True)
        >>> model = SingleTaskGP(train_X, train_Y)
    """

    def __init__(
        self,
        train_X: Tensor,
        train_Y: Tensor,
        likelihood: Optional[Likelihood] = None,
        covar_module: Optional[Module] = None,
        mean_module: Optional[Mean] = None,
        outcome_transform: Optional[OutcomeTransform] = None,
        input_transform: Optional[InputTransform] = None,
    ) -> None:
        r"""
        Args:
            train_X: A `batch_shape x n x d` tensor of training features.
            train_Y: A `batch_shape x n x m` tensor of training observations.
            likelihood: A likelihood. If omitted, use a standard
                GaussianLikelihood with inferred noise level.
            covar_module: The module computing the covariance (Kernel) matrix.
                If omitted, use a `ScaleKernel` wrapping an ARD `RBFKernel`.
            mean_module: The mean function to be used. If omitted, use a
                `ConstantMean`.
            outcome_transform: An outcome transform that is applied to the
                training data during instantiation and to the posterior during
                inference (that is, the `Posterior` obtained by calling
                `.posterior` on the model will be on the original scale).
            input_transform: An input transform that is applied in the model's
                forward pass.
        """
        with torch.no_grad():
            transformed_X = self.transform_inputs(
                X=train_X, input_transform=input_transform
            )
        if outcome_transform is not None:
            train_Y, _ = outcome_transform(train_Y)
        self._validate_tensor_args(X=transformed_X, Y=train_Y)
        ignore_X_dims = getattr(self, "_ignore_X_dims_scaling_check", None)
        validate_input_scaling(
            train_X=transformed_X, train_Y=train_Y, ignore_X_dims=ignore_X_dims
        )
        self._set_dimensions(train_X=train_X, train_Y=train_Y)
        train_X, train_Y, _ = self._transform_tensor_args(X=train_X, Y=train_Y)
        if likelihood is None:
            # Stock default would be GammaPrior(1.1, 0.05).
            noise_prior = GammaPrior(concentration=0.5, rate=1)
            # (concentration - 1) / rate is the Gamma mode only when
            # concentration >= 1. For this prior it evaluates to -0.5, which is
            # a negative noise level below the constraint's lower bound, so the
            # initial value is clamped to that lower bound.
            noise_prior_mode = (noise_prior.concentration - 1) / noise_prior.rate
            initial_noise = torch.as_tensor(noise_prior_mode).clamp_min(
                MIN_INFERRED_NOISE_LEVEL
            )
            likelihood = GaussianLikelihood(
                noise_prior=noise_prior,
                batch_shape=self._aug_batch_shape,
                noise_constraint=GreaterThan(
                    MIN_INFERRED_NOISE_LEVEL,
                    transform=None,
                    initial_value=initial_noise,
                ),
            )
        else:
            self._is_custom_likelihood = True
        ExactGP.__init__(self, train_X, train_Y, likelihood)
        if mean_module is None:
            mean_module = ConstantMean(batch_shape=self._aug_batch_shape)
        self.mean_module = mean_module
        if covar_module is None:
            # RBF base kernel in place of the stock
            # MaternKernel(nu=2.5, ..., lengthscale_prior=GammaPrior(3.0, 6.0)).
            covar_module = ScaleKernel(
                RBFKernel(
                    ard_num_dims=transformed_X.shape[-1],
                    batch_shape=self._aug_batch_shape,
                    lengthscale_prior=GammaPrior(3.0, 6.0),
                ),
                batch_shape=self._aug_batch_shape,
                outputscale_prior=GammaPrior(2.0, 0.15),
            )
            # Only set for the default kernel, whose parameter names are known.
            self._subset_batch_dict = {
                "likelihood.noise_covar.raw_noise": -2,
                "mean_module.raw_constant": -1,
                "covar_module.raw_outputscale": -1,
                "covar_module.base_kernel.raw_lengthscale": -3,
            }
        self.covar_module = covar_module
        # TODO: Allow subsetting of other covar modules
        if outcome_transform is not None:
            self.outcome_transform = outcome_transform
        if input_transform is not None:
            self.input_transform = input_transform
        # Match the dtype/device of the training inputs.
        self.to(train_X)

    def forward(self, x: Tensor) -> MultivariateNormal:
        """Return the prior multivariate normal at ``x``.

        Inputs are passed through ``transform_inputs`` only in training mode.
        """
        if self.training:
            x = self.transform_inputs(x)
        mean_x = self.mean_module(x)
        covar_x = self.covar_module(x)
        return MultivariateNormal(mean_x, covar_x)

In [3]:
# @title ModelInit

# Notebook switches:
#   train -- fit the hyperparameters by SGD (True) or set them by hand (False).
#   noise -- when True, re-register the noise constraint with a 1e-6 lower bound.
train = False
noise = False

# Training inputs: four evenly spaced points on [0, 1] plus one extra at 0.2.
train_x = np.linspace(0, 1, 4)
train_x = np.append(train_x, np.array([0.2]))
train_y = query(train_x)

# Convert to float64 tensors and add a trailing axis so both are shaped (n, 1).
train_x = torch.tensor(train_x, dtype=torch.float64)
train_y = torch.tensor(train_y, dtype=torch.float64)
train_x = train_x[:, None]
train_y = train_y[:, None]

model = SingleTaskGP(train_X=train_x, train_Y=train_y)
if noise:
    model.likelihood.noise_covar.register_constraint("raw_noise", GreaterThan(1e-6))

mll = ExactMarginalLogLikelihood(likelihood=model.likelihood, model=model)
mll = mll.to(train_x)

if not train:
    # Hand-set hyperparameters: unit lengthscale and outputscale, and a noise
    # level of 1e-10 (i.e. observations treated as almost noise-free).
    model.covar_module.base_kernel.lengthscale = 1
    model.covar_module.outputscale = 1
    model.likelihood.noise = 1e-10
else:
    optimizer = SGD([{"params": model.parameters()}], lr=0.05)
    NUM_EPOCHS = 10000

    model.train()

    for epoch in range(NUM_EPOCHS):
        optimizer.zero_grad()
        # forward pass through the model to
        # obtain the output MultivariateNormal
        output = model(train_x)
        # Compute negative marginal log likelihood
        loss = -mll(output, model.train_targets)
        # back prop gradients
        loss.backward()
        # print every 200 iterations
        if (epoch + 1) % 200 == 0:
            print(
                f"Epoch {epoch+1:>3}/{NUM_EPOCHS} - Loss: {loss.item():>4.10f} "
                f"lengthscale: {model.covar_module.base_kernel.lengthscale.item():>4.10f} "
                f"outputscale: {model.covar_module.outputscale.item():>4.10f} "
                f"noise: {model.likelihood.noise.item():>4.10f}"
            )

        optimizer.step()

# set model (and likelihood)
# As the last expression of the cell, this also displays the model repr.
model.eval()

SingleTaskGP(
  (likelihood): GaussianLikelihood(
    (noise_covar): HomoskedasticNoise(
      (noise_prior): GammaPrior()
      (raw_noise_constraint): GreaterThan(1.000E-04)
    )
  )
  (mean_module): ConstantMean()
  (covar_module): ScaleKernel(
    (base_kernel): RBFKernel(
      (lengthscale_prior): GammaPrior()
      (raw_lengthscale_constraint): Positive()
    )
    (outputscale_prior): GammaPrior()
    (raw_outputscale_constraint): Positive()
  )
)

In [None]:
# @title Naive Myopic
# One-step lookahead by grid search: pick the next query x0 within +/-0.1 of
# the current location, then the action a within +/-0.1 of x0, minimizing the
# posterior mean at a.
prev_x = 0.2
temp = []
for x0 in torch.linspace(prev_x - 0.1, prev_x + 0.1, 10):
    x0 = x0.reshape(1)
    p_y0_on_x0_D0 = model.posterior(x0)

    # Fantasize the observation at x0 using the posterior mean as its value.
    p_f_on_D1 = model.condition_on_observations(x0, p_y0_on_x0_D0.mean)

    for a in torch.linspace(x0.item() - 0.1, x0.item() + 0.1, 10):
        a = a.reshape(1)
        p_y1_a_D1 = p_f_on_D1.posterior(a)
        temp.append([x0, a, p_y1_a_D1.mean])

# Rows are (x0, a, value); select the row with the smallest value (column 2).
temp = torch.tensor(temp)
best = torch.argmin(temp, dim=0)[2].item()
best_x, best_a, best_hes = temp[best].numpy().tolist()

plt.figure(figsize=(7, 7))
ground_truth(draw=True)

# Current location (solid) and the reachable window around it (dashed).
plt.vlines(prev_x, -1, 1, color="black")
plt.vlines(prev_x - 0.1, -1, 1, color="black", linestyle="--")
plt.vlines(prev_x + 0.1, -1, 1, color="black", linestyle="--")

# Selected query (red) and selected action (blue).
plt.vlines(best_x, -1, 1, color="red")
plt.vlines(best_a, -1, 1, color="blue")

plt.plot(train_x.cpu().numpy(), train_y.cpu().numpy(), "k*")

# compute posterior
test_x = torch.linspace(0, 1, 100)
posterior = model.posterior(test_x)
test_y = posterior.mean
lower, upper = posterior.mvn.confidence_region()

plt.plot(test_x.cpu().detach().numpy(), test_y.cpu().detach().numpy(), "green")

plt.fill_between(
    test_x.cpu().detach().numpy(),
    lower.cpu().detach().numpy(),
    upper.cpu().detach().numpy(),
    alpha=0.25,
)

plt.tight_layout()
plt.ylim(-1, 1)

plt.show()

In [None]:
# @title Naive 3 steps

# Three-step lookahead by nested grid search (10 candidates per level, so
# 10**4 evaluations). At every level the candidate lies within +/-0.1 of the
# previous point and the fantasized observation is the posterior mean.
prev_x = 0.2
temp = []
for x0 in tqdm(torch.linspace(prev_x - 0.1, prev_x + 0.1, 10)):
    x0 = x0.reshape(1)
    p_y0_on_x0_D0 = model.posterior(x0)
    p_f_on_D1 = model.condition_on_observations(x0, p_y0_on_x0_D0.mean)

    for x1 in torch.linspace(x0.item() - 0.1, x0.item() + 0.1, 10):
        x1 = x1.reshape(1)
        p_y1_on_x1_D1 = p_f_on_D1.posterior(x1)
        p_f_on_D2 = p_f_on_D1.condition_on_observations(x1, p_y1_on_x1_D1.mean)

        for x2 in torch.linspace(x1.item() - 0.1, x1.item() + 0.1, 10):
            x2 = x2.reshape(1)
            p_y2_on_x2_D2 = p_f_on_D2.posterior(x2)
            p_f_on_D3 = p_f_on_D2.condition_on_observations(x2, p_y2_on_x2_D2.mean)

            for a in torch.linspace(x2.item() - 0.1, x2.item() + 0.1, 10):
                a = a.reshape(1)
                p_y3_a_D3 = p_f_on_D3.posterior(a)
                # Only the first query x0 and the final action a are recorded.
                temp.append([x0, a, p_y3_a_D3.mean])

# Rows are (x0, a, value); select the row with the smallest value (column 2).
temp = torch.tensor(temp)
best = torch.argmin(temp, dim=0)[2].item()
best_x, best_a, best_hes = temp[best].numpy().tolist()

plt.figure(figsize=(7, 7))
ground_truth(draw=True)

# Current location (solid) and the reachable window around it (dashed).
plt.vlines(prev_x, -1, 1, color="black")
plt.vlines(prev_x - 0.1, -1, 1, color="black", linestyle="--")
plt.vlines(prev_x + 0.1, -1, 1, color="black", linestyle="--")

# Selected query (red) and selected action (blue).
plt.vlines(best_x, -1, 1, color="red")
plt.vlines(best_a, -1, 1, color="blue")

plt.plot(train_x.cpu().numpy(), train_y.cpu().numpy(), "k*")

# compute posterior
test_x = torch.linspace(0, 1, 100)
posterior = model.posterior(test_x)
test_y = posterior.mean
lower, upper = posterior.mvn.confidence_region()

plt.plot(test_x.cpu().detach().numpy(), test_y.cpu().detach().numpy(), "green")

plt.fill_between(
    test_x.cpu().detach().numpy(),
    lower.cpu().detach().numpy(),
    upper.cpu().detach().numpy(),
    alpha=0.25,
)

plt.tight_layout()
plt.ylim(-1, 1)

plt.show()

In [None]:
# @title Joint 3 Steps


def compute_ehig(x0, x1, x2, a):
    """Posterior mean at ``a`` after fantasizing three mean-valued observations.

    Starting from the global ``model``, the points ``x0``, ``x1`` and ``x2`` are
    conditioned on in turn, each time using the current posterior mean at that
    point as the observed value. The posterior mean of the final fantasy model
    at ``a`` is returned.
    """
    fantasy_model = model
    for query_point in (x0, x1, x2):
        mean_outcome = fantasy_model.posterior(query_point).mean
        fantasy_model = fantasy_model.condition_on_observations(
            query_point, mean_outcome
        )

    return fantasy_model.posterior(a).mean


# Exhaustive search over (x0, x1, x2, a) on nested 10-point grids, each within
# +/-0.1 of the previous point. Candidates outside [0, 1] are skipped.
prev_x = 0.2
temp = []
for x0 in tqdm(torch.linspace(prev_x - 0.1, prev_x + 0.1, 10)):
    if not 0 <= x0.item() <= 1:
        continue
    x0 = x0.reshape(1)
    for x1 in torch.linspace(x0.item() - 0.1, x0.item() + 0.1, 10):
        if not 0 <= x1.item() <= 1:
            continue
        x1 = x1.reshape(1)
        for x2 in torch.linspace(x1.item() - 0.1, x1.item() + 0.1, 10):
            if not 0 <= x2.item() <= 1:
                continue
            x2 = x2.reshape(1)
            for a in torch.linspace(x2.item() - 0.1, x2.item() + 0.1, 10):
                if not 0 <= a.item() <= 1:
                    continue
                a = a.reshape(1)

                ehig = compute_ehig(x0, x1, x2, a)

                # Only the first query x0 and the final action a are recorded.
                temp.append([x0, a, ehig])

# Rows are (x0, a, value); select the row with the smallest value (column 2).
temp = torch.tensor(temp)
best = torch.argmin(temp, dim=0)[2].item()
best_x, best_a, best_hes = temp[best].numpy().tolist()

plt.figure(figsize=(7, 7))
ground_truth(draw=True)

# Current location (solid) and the reachable window around it (dashed).
plt.vlines(prev_x, -1, 1, color="black")
plt.vlines(prev_x - 0.1, -1, 1, color="black", linestyle="--")
plt.vlines(prev_x + 0.1, -1, 1, color="black", linestyle="--")

# Selected query (red) and selected action (blue).
plt.vlines(best_x, -1, 1, color="red")
plt.vlines(best_a, -1, 1, color="blue")

plt.plot(train_x.cpu().numpy(), train_y.cpu().numpy(), "k*")

# compute posterior
test_x = torch.linspace(0, 1, 100)
posterior = model.posterior(test_x)
test_y = posterior.mean
lower, upper = posterior.mvn.confidence_region()

plt.plot(test_x.cpu().detach().numpy(), test_y.cpu().detach().numpy(), "green")

plt.fill_between(
    test_x.cpu().detach().numpy(),
    lower.cpu().detach().numpy(),
    upper.cpu().detach().numpy(),
    alpha=0.25,
)

plt.tight_layout()
plt.ylim(-1, 1)

plt.show()

In [None]:
# @title Joint 3 Steps with sigmoid


def compute_ehig(x0, x1, x2, a):
    """
    x0 to a: unconstrained optimization parameters

    Each parameter is squashed with a sigmoid into a window of width 0.2
    centred on the previous point (the global ``prev_x`` for ``x0``). The three
    queries are then conditioned on in turn with their posterior means as
    observations, and the posterior mean at the action is evaluated.

    Returns the constrained first query, the constrained action, and the
    posterior mean at that action.
    """
    centre = prev_x
    constrained = []
    for raw in (x0, x1, x2, a):
        # sigmoid in (0, 1) -> offset in (-0.1, 0.1) around the previous point
        centre = torch.sigmoid(raw) * 0.2 + (centre - 0.1)
        constrained.append(centre.reshape(1))
    query0, query1, query2, action = constrained

    fantasy_model = model
    for query_point in (query0, query1, query2):
        mean_outcome = fantasy_model.posterior(query_point).mean
        fantasy_model = fantasy_model.condition_on_observations(
            query_point, mean_outcome
        )

    return query0, action, fantasy_model.posterior(action).mean


# Evaluate the sigmoid-parameterized objective on a coarse grid: each
# unconstrained parameter takes only the two values -10 and 10, which the
# sigmoid inside compute_ehig maps close to the two ends of the allowed window.
prev_x = 0.2
temp = []
for x0 in tqdm(torch.linspace(-10, 10, 2)):
    for x1 in torch.linspace(-10, 10, 2):
        for x2 in torch.linspace(-10, 10, 2):
            for a in torch.linspace(-10, 10, 2):
                x0_, a_, ehig = compute_ehig(x0, x1, x2, a)

                temp.append([x0_, a_, ehig])

# Rows are (x0, a, value); select the row with the smallest value (column 2).
temp = torch.tensor(temp)
best = torch.argmin(temp, dim=0)[2].item()
best_x, best_a, best_hes = temp[best].numpy().tolist()

plt.figure(figsize=(7, 7))
ground_truth(draw=True)

# Current location (solid) and the reachable window around it (dashed).
plt.vlines(prev_x, -1, 1, color="black")
plt.vlines(prev_x - 0.1, -1, 1, color="black", linestyle="--")
plt.vlines(prev_x + 0.1, -1, 1, color="black", linestyle="--")

# Selected query (red) and selected action (blue).
plt.vlines(best_x, -1, 1, color="red")
plt.vlines(best_a, -1, 1, color="blue")

plt.plot(train_x.cpu().numpy(), train_y.cpu().numpy(), "k*")

# compute posterior
test_x = torch.linspace(0, 1, 100)
posterior = model.posterior(test_x)
test_y = posterior.mean
lower, upper = posterior.mvn.confidence_region()

plt.plot(test_x.cpu().detach().numpy(), test_y.cpu().detach().numpy(), "green")

plt.fill_between(
    test_x.cpu().detach().numpy(),
    lower.cpu().detach().numpy(),
    upper.cpu().detach().numpy(),
    alpha=0.25,
)

plt.tight_layout()
plt.ylim(-1, 1)

plt.show()

In [None]:
# @title Joint 3 Steps with sigmoid and adam


def compute_ehig(x0, x1, x2, a):
    """
    x0 to a: unconstrained optimization parameters

    Each parameter is squashed with a sigmoid into a window of width 0.2
    centred on the previous point (the global ``prev_x`` for ``x0``). The three
    queries are conditioned on in turn with their posterior means as
    observations, and the posterior mean at the action is used as the loss.

    Returns the constrained first query, the constrained action, and the
    squeezed posterior mean at that action.
    """
    centre = prev_x
    constrained = []
    for raw in (x0, x1, x2, a):
        # sigmoid in (0, 1) -> offset in (-0.1, 0.1) around the previous point
        centre = torch.sigmoid(raw) * 0.2 + (centre - 0.1)
        constrained.append(centre.reshape(1))
    query0, query1, query2, action = constrained

    fantasy_model = model
    for query_point in (query0, query1, query2):
        mean_outcome = fantasy_model.posterior(query_point).mean
        fantasy_model = fantasy_model.condition_on_observations(
            query_point, mean_outcome
        )

    loss = fantasy_model.posterior(action).mean
    return query0, action, loss.squeeze()


# Gradient-based version: optimize the four unconstrained parameters with Adam
# instead of enumerating a grid.
prev_x = 0.2
temp = []

import torch.optim as optim

# Random initial values in [0, 6) for each unconstrained parameter.
x0 = (torch.rand(1) * 6).requires_grad_(True)
x1 = (torch.rand(1) * 6).requires_grad_(True)
x2 = (torch.rand(1) * 6).requires_grad_(True)
a = (torch.rand(1) * 6).requires_grad_(True)

optimizer = optim.Adam([x0, x1, x2, a], lr=0.1)
for epoch in tqdm(range(1000)):
    optimizer.zero_grad()
    x0_, a_, ehig = compute_ehig(x0, x1, x2, a)
    # Keep every iterate so the best one can be picked after the loop.
    temp.append([x0_, a_, ehig])
    ehig.backward(retain_graph=True)
    optimizer.step()

    if epoch % 100 == 0:
        print(ehig.item(), x0, x1, x2, a)


# Rows are (x0, a, value); select the row with the smallest value (column 2).
temp = torch.tensor(temp)
best = torch.argmin(temp, dim=0)[2].item()
best_x, best_a, best_hes = temp[best].numpy().tolist()

plt.figure(figsize=(7, 7))
ground_truth(draw=True)

# Current location (solid) and the reachable window around it (dashed).
plt.vlines(prev_x, -1, 1, color="black")
plt.vlines(prev_x - 0.1, -1, 1, color="black", linestyle="--")
plt.vlines(prev_x + 0.1, -1, 1, color="black", linestyle="--")

# Selected query (red) and selected action (blue).
plt.vlines(best_x, -1, 1, color="red")
plt.vlines(best_a, -1, 1, color="blue")

plt.plot(train_x.cpu().numpy(), train_y.cpu().numpy(), "k*")

# compute posterior
test_x = torch.linspace(0, 1, 100)
posterior = model.posterior(test_x)
test_y = posterior.mean
lower, upper = posterior.mvn.confidence_region()

plt.plot(test_x.cpu().detach().numpy(), test_y.cpu().detach().numpy(), "green")

plt.fill_between(
    test_x.cpu().detach().numpy(),
    lower.cpu().detach().numpy(),
    upper.cpu().detach().numpy(),
    alpha=0.25,
)

plt.tight_layout()
plt.ylim(-1, 1)

plt.show()

In [None]:
# @title Joint 3 Steps with sigmoid and adam and sampling

from botorch.sampling.samplers import SobolQMCNormalSampler
from botorch import settings


def compute_ehig(x0, x1, x2, a):
    """Sample-averaged three-step lookahead objective.

    x0 to a: unconstrained optimization parameters, one per node of the
    lookahead tree. With ``S = 4`` sampled outcomes per step:

    Args:
        x0: Shape ``(1,)``; the first query.
        x1: Shape ``(S, 1)``; ``x1[i]`` follows the i-th sample of y0.
        x2: Shape ``(S, S, 1)``; ``x2[i, j]`` follows ``x1[i]`` and the j-th
            sample of y1.
        a: Shape ``(S, S, S, 1)``; ``a[i, j, k]`` follows ``x2[i, j]`` and the
            k-th sample of y2.

    Every parameter is squashed by a sigmoid into a window of width 0.2 centred
    on its parent (the global ``prev_x`` for ``x0``).

    Returns:
        Tuple of the constrained ``x0``, the constrained ``a`` tensor, and the
        posterior mean at the action averaged over all ``S**3`` sample paths.
    """
    num_samples = 4
    with settings.propagate_grads(state=True):
        x0 = torch.sigmoid(x0) * 0.2 + (prev_x - 0.1)
        x1 = torch.sigmoid(x1) * 0.2 + (x0 - 0.1)
        # A child tensor has one more sample axis than its parent. Inserting a
        # singleton axis before the feature axis makes child[i, j] broadcast
        # against parent[i]; plain broadcasting would pair it with parent[j],
        # i.e. centre the window on a node from a different branch.
        x2 = torch.sigmoid(x2) * 0.2 + (x1.unsqueeze(-2) - 0.1)
        a = torch.sigmoid(a) * 0.2 + (x2.unsqueeze(-2) - 0.1)

        ehigs = 0
        p_y0_on_x0_D0 = model.posterior(x0)
        sampler = SobolQMCNormalSampler(
            num_samples=num_samples, resample=False, collapse_batch_dims=True
        )
        sample_y0_on_x0_D0 = sampler(p_y0_on_x0_D0)

        for ind_x1, y0_on_x0_D0 in enumerate(sample_y0_on_x0_D0):
            p_f_on_D1 = model.condition_on_observations(x0, y0_on_x0_D0)
            p_y1_on_x1_D1 = p_f_on_D1.posterior(x1[ind_x1, :])
            sample_y1_on_x1_D1 = sampler(p_y1_on_x1_D1)

            for ind_x2, y1_on_x1_D1 in enumerate(sample_y1_on_x1_D1):
                p_f_on_D2 = p_f_on_D1.condition_on_observations(
                    x1[ind_x1, :], y1_on_x1_D1
                )
                p_y2_on_x2_D2 = p_f_on_D2.posterior(x2[ind_x1, ind_x2, :])
                sample_y2_on_x2_D2 = sampler(p_y2_on_x2_D2)

                # Loop variable is a sampled outcome, so it gets its own name
                # rather than reusing the posterior's.
                for ind_a, y2_on_x2_D2 in enumerate(sample_y2_on_x2_D2):
                    p_f_on_D3 = p_f_on_D2.condition_on_observations(
                        x2[ind_x1, ind_x2, :], y2_on_x2_D2
                    )
                    p_y3_a_D3 = p_f_on_D3.posterior(a[ind_x1, ind_x2, ind_a, :])

                    ehig = p_y3_a_D3.mean
                    ehigs = ehigs + ehig

        # Average over every root-to-leaf sample path.
        ehig = ehigs / (num_samples**3)
    return x0, a, ehig.squeeze()


# Optimize the sampled three-step objective with Adam. There is one
# unconstrained parameter per tree node, so each level adds a leading axis of
# size 4 (the number of sampled outcomes per step).
prev_x = 0.2
temp = []

# Random initial values in [0, 10).
x0 = (torch.rand(1) * 10).requires_grad_(True)
x1 = (torch.rand(4, 1) * 10).requires_grad_(True)
x2 = (torch.rand(4, 4, 1) * 10).requires_grad_(True)
a = (torch.rand(4, 4, 4, 1) * 10).requires_grad_(True)

optimizer = optim.Adam([x0, x1, x2, a], lr=5)
for epoch in tqdm(range(100)):
    optimizer.zero_grad()
    x0_, a_, ehig = compute_ehig(x0, x1, x2, a)
    # Keep every iterate so the best one can be picked after the loop.
    temp.append([x0_, a_, ehig])
    ehig.backward(retain_graph=True)
    optimizer.step()

    if epoch % 10 == 0:
        print("x0 ", x0.item(), "a", a[0, 0, 0].item(), "loss", ehig.item())

# for x0 in tqdm(torch.linspace(-10, 10, 2)):
#     x0 = x0.reshape(1)
#     x0_, a_, ehig = compute_ehig(x0, x1, x2, a)
#     print(ehig)

# Reduce each iterate to scalars: x0, the action on the first sample path
# (index [0, 0, 0]), and the loss.
tmp = []
for i in range(len(temp)):
    tmp.append([temp[i][0], temp[i][1][0, 0, 0], temp[i][2]])

# Rows are (x0, a, value); select the row with the smallest value (column 2).
tmp = torch.tensor(tmp)
best = torch.argmin(tmp, dim=0)[2].item()
best_x, best_a, best_hes = tmp[best].numpy().tolist()

plt.figure(figsize=(7, 7))
ground_truth(draw=True)

# Current location (solid) and the reachable window around it (dashed).
plt.vlines(prev_x, -1, 1, color="black", label="current location")
plt.vlines(prev_x - 0.1, -1, 1, color="black", linestyle="--")
plt.vlines(prev_x + 0.1, -1, 1, color="black", linestyle="--")

plt.vlines(best_x, -1, 1, color="red", label="optimal query")
plt.vlines(best_a, -1, 1, color="blue", label="optimal action")

plt.plot(train_x.cpu().numpy(), train_y.cpu().numpy(), "k*")

# compute posterior
test_x = torch.linspace(0, 1, 100)
posterior = model.posterior(test_x)
test_y = posterior.mean
lower, upper = posterior.mvn.confidence_region()

plt.plot(
    test_x.cpu().detach().numpy(),
    test_y.cpu().detach().numpy(),
    "green",
    label="Posterior mean",
)

plt.fill_between(
    test_x.cpu().detach().numpy(),
    lower.cpu().detach().numpy(),
    upper.cpu().detach().numpy(),
    alpha=0.25,
)

plt.tight_layout()
plt.ylim(-1, 1)
plt.legend()

plt.show()

In [None]:
# @title Joint 5 Steps with sigmoid and adam and sampling

from botorch.sampling.samplers import SobolQMCNormalSampler
from botorch import settings


def compute_ehig(x0, x1, x2, x3, x4, a):
    """Sample-averaged five-step lookahead objective.

    x0 to a: unconstrained optimization parameters, one per node of the
    lookahead tree. With ``S = 4`` sampled outcomes per step, ``x0`` has shape
    ``(1,)`` and each later argument has one more leading axis of size ``S``
    than the one before it (``x1``: ``(S, 1)`` ... ``a``: ``(S,)*5 + (1,)``).
    The leading indices of a node identify the outcome samples on the path
    from the root to that node.

    Every parameter is squashed by a sigmoid into a window of width 0.2 centred
    on its parent (the global ``prev_x`` for ``x0``).

    Returns:
        Tuple of the constrained ``x0``, the constrained ``a`` tensor, and the
        posterior mean at the action averaged over all ``S**5`` sample paths.
    """
    num_samples = 4
    with settings.propagate_grads(state=True):
        x0 = torch.sigmoid(x0) * 0.2 + (prev_x - 0.1)
        x1 = torch.sigmoid(x1) * 0.2 + (x0 - 0.1)
        # A child tensor has one more sample axis than its parent. Inserting a
        # singleton axis before the feature axis makes child[..., j, :]
        # broadcast against its own parent[..., :]; plain broadcasting would
        # align the axes from the right and centre the window on a node from a
        # different branch.
        x2 = torch.sigmoid(x2) * 0.2 + (x1.unsqueeze(-2) - 0.1)
        x3 = torch.sigmoid(x3) * 0.2 + (x2.unsqueeze(-2) - 0.1)
        x4 = torch.sigmoid(x4) * 0.2 + (x3.unsqueeze(-2) - 0.1)
        a = torch.sigmoid(a) * 0.2 + (x4.unsqueeze(-2) - 0.1)

        ehigs = 0
        p_y0_on_x0_D0 = model.posterior(x0)
        sampler = SobolQMCNormalSampler(
            num_samples=num_samples, resample=False, collapse_batch_dims=True
        )
        sample_y0_on_x0_D0 = sampler(p_y0_on_x0_D0)

        for ind_x1, y0_on_x0_D0 in enumerate(sample_y0_on_x0_D0):
            p_f_on_D1 = model.condition_on_observations(x0, y0_on_x0_D0)
            p_y1_on_x1_D1 = p_f_on_D1.posterior(x1[ind_x1, :])
            sample_y1_on_x1_D1 = sampler(p_y1_on_x1_D1)

            for ind_x2, y1_on_x1_D1 in enumerate(sample_y1_on_x1_D1):
                p_f_on_D2 = p_f_on_D1.condition_on_observations(
                    x1[ind_x1, :], y1_on_x1_D1
                )
                p_y2_on_x2_D2 = p_f_on_D2.posterior(x2[ind_x1, ind_x2, :])
                sample_y2_on_x2_D2 = sampler(p_y2_on_x2_D2)

                for ind_x3, y2_on_x2_D2 in enumerate(sample_y2_on_x2_D2):
                    p_f_on_D3 = p_f_on_D2.condition_on_observations(
                        x2[ind_x1, ind_x2, :], y2_on_x2_D2
                    )
                    p_y3_x3_D3 = p_f_on_D3.posterior(x3[ind_x1, ind_x2, ind_x3, :])
                    sample_y3_x3_D3 = sampler(p_y3_x3_D3)

                    for ind_x4, y3_on_x3_D3 in enumerate(sample_y3_x3_D3):
                        p_f_on_D4 = p_f_on_D3.condition_on_observations(
                            x3[ind_x1, ind_x2, ind_x3, :], y3_on_x3_D3
                        )
                        p_y4_on_x4_D4 = p_f_on_D4.posterior(
                            x4[ind_x1, ind_x2, ind_x3, ind_x4, :]
                        )
                        sample_y4_on_x4_D4 = sampler(p_y4_on_x4_D4)

                        for ind_a, y4_on_x4_D4 in enumerate(sample_y4_on_x4_D4):
                            p_f_on_D5 = p_f_on_D4.condition_on_observations(
                                x4[ind_x1, ind_x2, ind_x3, ind_x4, :], y4_on_x4_D4
                            )
                            p_y5_on_a_D5 = p_f_on_D5.posterior(
                                a[ind_x1, ind_x2, ind_x3, ind_x4, ind_a, :]
                            )

                            ehig = p_y5_on_a_D5.mean
                            ehigs = ehigs + ehig

        # Average over every root-to-leaf sample path.
        ehig = ehigs / (num_samples**5)
    return x0, a, ehig.squeeze()


# Optimize the sampled five-step objective with Adam. There is one
# unconstrained parameter per tree node, so each level adds a leading axis of
# size 4 (the number of sampled outcomes per step).
prev_x = 0.2
temp = []

# Random initial values in [0, 10).
x0 = (torch.rand(1) * 10).requires_grad_(True)
x1 = (torch.rand(4, 1) * 10).requires_grad_(True)
x2 = (torch.rand(4, 4, 1) * 10).requires_grad_(True)
x3 = (torch.rand(4, 4, 4, 1) * 10).requires_grad_(True)
x4 = (torch.rand(4, 4, 4, 4, 1) * 10).requires_grad_(True)
a = (torch.rand(4, 4, 4, 4, 4, 1) * 10).requires_grad_(True)

optimizer = optim.Adam([x0, x1, x2, x3, x4, a], lr=2)
for epoch in tqdm(range(20)):
    optimizer.zero_grad()
    x0_, a_, ehig = compute_ehig(x0, x1, x2, x3, x4, a)
    # Keep every iterate so the best one can be picked after the loop.
    temp.append([x0_, a_, ehig])
    ehig.backward(retain_graph=True)
    optimizer.step()

    if epoch % 2 == 0:
        print("x0 ", x0.item(), "a", a[0, 0, 0, 0, 0].item(), "loss", ehig.item())

# for x0 in tqdm(torch.linspace(-10, 10, 2)):
#     x0 = x0.reshape(1)
#     x0_, a_, ehig = compute_ehig(x0, x1, x2, a)
#     print(ehig)

# Reduce each iterate to scalars: x0, the action on the first sample path
# (index [0, 0, 0, 0, 0]), and the loss.
tmp = []
for i in range(len(temp)):
    tmp.append([temp[i][0], temp[i][1][0, 0, 0, 0, 0], temp[i][2]])

# Rows are (x0, a, value); select the row with the smallest value (column 2).
tmp = torch.tensor(tmp)
best = torch.argmin(tmp, dim=0)[2].item()
best_x, best_a, best_hes = tmp[best].numpy().tolist()

plt.figure(figsize=(7, 7))
ground_truth(draw=True)

# Current location (solid) and the reachable window around it (dashed).
plt.vlines(prev_x, -1, 1, color="black", label="current location")
plt.vlines(prev_x - 0.1, -1, 1, color="black", linestyle="--")
plt.vlines(prev_x + 0.1, -1, 1, color="black", linestyle="--")

plt.vlines(best_x, -1, 1, color="red", label="optimal query")
plt.vlines(best_a, -1, 1, color="blue", label="optimal action")

plt.plot(train_x.cpu().numpy(), train_y.cpu().numpy(), "k*")

# compute posterior
test_x = torch.linspace(0, 1, 100)
posterior = model.posterior(test_x)
test_y = posterior.mean
lower, upper = posterior.mvn.confidence_region()

plt.plot(
    test_x.cpu().detach().numpy(),
    test_y.cpu().detach().numpy(),
    "green",
    label="Posterior mean",
)

plt.fill_between(
    test_x.cpu().detach().numpy(),
    lower.cpu().detach().numpy(),
    upper.cpu().detach().numpy(),
    alpha=0.25,
)

plt.tight_layout()
plt.ylim(-1, 1)
plt.legend()

plt.show()

In [None]:
# @title Joint 5 Steps with sigmoid and adam and sampling -- parallel

from botorch.sampling.samplers import SobolQMCNormalSampler
from botorch import settings
import itertools


def compute_ehig(xi, horizon=5, num_samples=2):
    """Sample-averaged lookahead objective for an arbitrary horizon.

    Args:
        xi: List of ``horizon + 1`` unconstrained parameter tensors, one per
            level of the lookahead tree. ``xi[h]`` has ``h`` leading sample
            axes followed by a feature axis of size 1; its leading indices
            identify the outcome samples on the path to that node. The list is
            not modified.
        horizon: Number of fantasized queries before the final action.
        num_samples: Number of sampled outcomes per step. Only indices
            ``0 .. num_samples - 1`` along each sample axis of ``xi[h]`` are
            read, so each such axis must have at least that many entries.

    Every parameter is squashed by a sigmoid into a window of width 0.2 centred
    on its parent (the global ``prev_x`` for level 0).

    Returns:
        Tuple of the constrained ``xi[0]``, the constrained last-level tensor,
        and the objective averaged over all ``num_samples**horizon`` paths.
    """
    with settings.propagate_grads(state=True):
        # Shallow copy: the constrained tensors replace list entries below, and
        # overwriting the caller's list would make a second call apply the
        # sigmoid transform to already-transformed values.
        xi = list(xi)
        for h in range(0, horizon + 1):
            if h > 0:
                # xi[h] has one more sample axis than xi[h - 1]. Inserting a
                # singleton axis before the feature axis makes each node
                # broadcast against its own parent; plain broadcasting would
                # align axes from the right and pair it with a node from a
                # different branch.
                pvs_x = xi[h - 1].unsqueeze(-2)
            else:
                pvs_x = prev_x
            xi[h] = torch.sigmoid(xi[h]) * 0.2 + (pvs_x - 0.1)

        ehigs = 0
        sample_yi_on_xi_Di = {}
        # Sentinel that differs from every real index, so the first path
        # computes all levels.
        prev_ind = np.ones(horizon) * -1
        p_f_on_Di = {}
        p_f_on_Di[0] = model
        sampler = SobolQMCNormalSampler(
            num_samples=num_samples, resample=False, collapse_batch_dims=True
        )

        p_y0_on_x0_D0 = model.posterior(xi[0])
        sample_yi_on_xi_Di[0] = sampler(p_y0_on_x0_D0)

        # Enumerate every root-to-leaf sample path. Consecutive paths share a
        # prefix, so a level's fantasy model and samples are recomputed only
        # when that level's sample index changed.
        for ind in itertools.product(range(num_samples), repeat=horizon):
            equal_bool = np.equal(ind, prev_ind)
            prev_ind = ind

            for i in range(horizon):
                if not equal_bool[i]:
                    p_f_on_Di[i + 1] = p_f_on_Di[i].condition_on_observations(
                        xi[i][ind[:i]], sample_yi_on_xi_Di[i][ind[i]]
                    )
                    sample_yi_on_xi_Di[i + 1] = sampler(
                        p_f_on_Di[i + 1].posterior(xi[i + 1][ind[: i + 1]])
                    )

            # Leaf value: mean of the samples drawn at the final action.
            ehig = sample_yi_on_xi_Di[horizon].mean()
            ehigs = ehigs + ehig

        ehig = ehigs / (num_samples**horizon)

        return xi[0], xi[-1], ehig.squeeze()


# Optimize the generic-horizon objective with Adam.
prev_x = 0.2
temp = []

xi = []
# xi[-1] = prev_x
# Build one parameter tensor per tree level: each iteration prepends an axis
# of size 4, so xi[h] has shape (4,) * h + (1,).
dim_xi = [1]
horizon = 10
for h in range(horizon + 1):
    xi.append((torch.rand(dim_xi) * 10).requires_grad_(True))
    dim_xi.insert(0, 4)

optimizer = optim.Adam(xi, lr=0.5)
for epoch in tqdm(range(20)):
    optimizer.zero_grad()

    # Pass clones so the leaf tensors held by the optimizer stay untouched.
    xi_in = [element_xi.clone() for element_xi in xi]
    # num_samples is left at compute_ehig's default here.
    x0_, a_, ehig = compute_ehig(xi_in, horizon=horizon)
    # Keep every iterate so the best one can be picked after the loop.
    temp.append([x0_, a_, ehig.detach()])

    ehig.backward(retain_graph=True)
    optimizer.step()

    if epoch % 2 == 0:
        print(
            "x0 ",
            xi[0].item(),
            "a",
            xi[horizon][tuple([0] * horizon)].item(),
            "loss",
            ehig.item(),
        )

# Reduce each iterate to scalars: x0, the action on the all-zeros sample path,
# and the loss.
tmp = []
for i in range(len(temp)):
    tmp.append([temp[i][0], temp[i][1][tuple([0] * horizon)], temp[i][2]])

# Rows are (x0, a, value); select the row with the smallest value (column 2).
tmp = torch.tensor(tmp)
best = torch.argmin(tmp, dim=0)[2].item()
best_x, best_a, best_hes = tmp[best].numpy().tolist()

plt.figure(figsize=(7, 7))
ground_truth(draw=True)

# Current location (solid) and the reachable window around it (dashed).
plt.vlines(prev_x, -1, 1, color="black", label="current location")
plt.vlines(prev_x - 0.1, -1, 1, color="black", linestyle="--")
plt.vlines(prev_x + 0.1, -1, 1, color="black", linestyle="--")

plt.vlines(best_x, -1, 1, color="red", label="optimal query")
plt.vlines(best_a, -1, 1, color="blue", label="optimal action")

plt.plot(train_x.cpu().numpy(), train_y.cpu().numpy(), "k*")

# compute posterior
test_x = torch.linspace(0, 1, 100)
posterior = model.posterior(test_x)
test_y = posterior.mean
lower, upper = posterior.mvn.confidence_region()

plt.plot(
    test_x.cpu().detach().numpy(),
    test_y.cpu().detach().numpy(),
    "green",
    label="Posterior mean",
)

plt.fill_between(
    test_x.cpu().detach().numpy(),
    lower.cpu().detach().numpy(),
    upper.cpu().detach().numpy(),
    alpha=0.25,
)

plt.tight_layout()
plt.ylim(-1, 1)
plt.legend()

plt.show()

  0%|          | 0/20 [00:00<?, ?it/s]

In [None]:
# gold standard -- exhaustive search over x and a

# set noise level to almost 0

# first query at a

# for x in (a-0.1, a+0.1):

# compute y | x, D

# for y in sample y (only need 1 sample if the noise is small)

# for a in (x-0.1, x+0.1):

# compute  y' | x, y, a

# compute the mean

In [None]:
# Sum of i**k for k = 1..10 with i = 4 -- presumably the number of nodes below
# the root in a 4-ary lookahead tree of depth 10 (TODO confirm intent).
i = 4
sum(i**level for level in range(1, 11))