# Exact GP playground

In [1]:
import torch
import malt
import abc
import gpytorch

Using backend: pytorch


### Pre-reqs

In [2]:
# =============================================================================
# KERNELS
# =============================================================================
class RBF(torch.nn.Module):
    r"""A Gaussian Process Kernel that hosts parameters.

    Note
    ----
    l could be either of shape 1 or hidden dim

    """

    def __init__(self, in_features, scale=0.0, variance=0.0, ard=True):

        super(RBF, self).__init__()

        if ard is True:
            self.scale = torch.nn.Parameter(scale * torch.ones(in_features))

        else:
            self.scale = torch.nn.Parameter(torch.tensor(scale))

        self.variance = torch.nn.Parameter(torch.tensor(variance))

    def distance(self, x, x_):
        """ Distance between data points. """
        return torch.norm(x[:, None, :] - x_[None, :, :], p=2, dim=2)

    def forward(self, x, x_=None):
        """ Forward pass. """
        # replicate x if there's no x_
        if x_ is None:
            x_ = x

        # for now, only allow two dimension
        assert x.dim() == 2
        assert x_.dim() == 2

        x = x * torch.exp(self.scale)
        x_ = x_ * torch.exp(self.scale)

        # (batch_size, batch_size)
        distance = self.distance(x, x_)

        # convariant matrix
        # (batch_size, batch_size)
        k = torch.exp(self.variance) * torch.exp(-0.5 * distance)

        return k

In [40]:
class Regressor(torch.nn.Module, abc.ABC):
    """Base class for a regressor."""

    def __init__(self, in_features, out_features, *args, **kwargs):
        super(Regressor, self).__init__(*args, **kwargs)
        self.in_features = in_features
        self.out_features = out_features

class ExactGaussianProcessRegressor(Regressor, gpytorch.models.GP):
    epsilon = 1e-5

    def __init__(
        self,
        in_features: int = 128,
        out_features: int = 2,
        kernel_factory: torch.nn.Module = RBF,
        log_sigma: float = -3.0,
    ):
        assert out_features == 2
        super(ExactGaussianProcessRegressor, self).__init__(
            in_features=in_features,
            out_features=out_features,
        )

        # construct kernel
        self.kernel = kernel_factory(
            in_features=in_features,
        )

        self.in_features = in_features
        self.log_sigma = torch.nn.Parameter(
            torch.tensor(log_sigma),
        )

    def _get_kernel_and_auxiliary_variables(
        self,
        x_tr,
        y_tr,
        x_te=None,
    ):
        """ Get kernel and auxiliary variables for forward pass. """

        # compute the kernels
        k_tr_tr = self._perturb(self.kernel.forward(x_tr, x_tr))

        if x_te is not None:  # during test
            k_te_te = self._perturb(self.kernel.forward(x_te, x_te))
            k_te_tr = self._perturb(self.kernel.forward(x_te, x_tr))
            # k_tr_te = self.forward(x_tr, x_te)
            k_tr_te = k_te_tr.t()  # save time

        else:  # during train
            k_te_te = k_te_tr = k_tr_te = k_tr_tr

        # (batch_size_tr, batch_size_tr)
        k_plus_sigma = k_tr_tr + torch.exp(self.log_sigma) * torch.eye(
            k_tr_tr.shape[0], device=k_tr_tr.device
        )

        # (batch_size_tr, batch_size_tr)
        l_low = torch.cholesky(k_plus_sigma)
        l_up = l_low.t()

        # (batch_size_tr. 1)
        l_low_over_y, _ = torch.triangular_solve(
            input=y_tr, A=l_low, upper=False
        )

        # (batch_size_tr, 1)
        alpha, _ = torch.triangular_solve(
            input=l_low_over_y, A=l_up, upper=True
        )

        return k_tr_tr, k_te_te, k_te_tr, k_tr_te, l_low, alpha

    def condition(self, x_te, *args, x_tr=None, y_tr=None, **kwargs):
        r"""Calculate the predictive distribution given `x_te`.

        Note
        ----
        Here we allow the speicifaction of sampler but won't actually
        use it here in this version.

        Parameters
        ----------
        x_te : `torch.Tensor`, `shape=(n_te, hidden_dimension)`
            Test input.

        x_tr : `torch.Tensor`, `shape=(n_tr, hidden_dimension)`
             (Default value = None)
             Training input.

        y_tr : `torch.Tensor`, `shape=(n_tr, 1)`
             (Default value = None)
             Test input.

        sampler : `torch.optim.Optimizer` or `pinot.Sampler`
             (Default value = None)
             Sampler.

        Returns
        -------
        distribution : `torch.distributions.Distribution`
            Predictive distribution.

        """

        # get parameters
        (
            k_tr_tr,
            k_te_te,
            k_te_tr,
            k_tr_te,
            l_low,
            alpha,
        ) = self._get_kernel_and_auxiliary_variables(x_tr, y_tr, x_te)

        # compute mean
        # (batch_size_te, 1)
        mean = k_te_tr @ alpha

        # (batch_size_tr, batch_size_te)
        v, _ = torch.triangular_solve(input=k_tr_te, A=l_low, upper=False)

        # (batch_size_te, batch_size_te)
        variance = k_te_te - v.t() @ v

        # ensure symetric
        variance = 0.5 * (variance + variance.t())

        # $ p(y|X) = \int p(y|f)p(f|x) df $
        # variance += torch.exp(self.log_sigma) * torch.eye(
        #         *variance.shape,
        #         device=variance.device)

        # construct noise predictive distribution
        distribution = (
            torch.distributions.multivariate_normal.MultivariateNormal(
                mean.flatten(), variance
            )
        )

        return distribution

    def _perturb(self, k):
        """Add small noise `epsilon` to the diagonal of covariant matrix.
        Parameters
        ----------
        k : `torch.Tensor`, `shape=(n_data_points, n_data_points)`
            Covariant matrix.
        Returns
        -------
        k : `torch.Tensor`, `shape=(n_data_points, n_data_points)`
            Preturbed covariant matrix.
        """
        # introduce noise along the diagonal
        noise = self.epsilon * torch.eye(*k.shape, device=k.device)

        return k + noise

    def loss(self, x_tr, y_tr, *args, **kwargs):
        r"""Compute the loss.
        Note
        ----
        Defined to be negative Gaussian likelihood.
        Parameters
        ----------
        x_tr : `torch.Tensor`, `shape=(n_training_data, hidden_dimension)`
            Input of training data.
        y_tr : `torch.Tensor`, `shape=(n_training_data, 1)`
            Target of training data.
        Returns
        -------
        nll : `torch.Tensor`, `shape=(,)`
            Negative log likelihood.
        """
#         # point data to object
#         self._x_tr = x_tr
#         self._y_tr = y_tr

#         # get the parameters
#         (
#             k_tr_tr,
#             k_te_te,
#             k_te_tr,
#             k_tr_te,
#             l_low,
#             alpha,
#         ) = self._get_kernel_and_auxiliary_variables(x_tr, y_tr)

#         import math

#         # we return the exact nll with constant
#         nll = (
#             0.5 * (y_tr.t() @ alpha)
#             + torch.trace(l_low)
#             + 0.5 * y_tr.shape[0] * math.log(2.0 * math.pi)
#         )
        
        preds = self.condition(x_tr, x_tr=x_tr, y_tr=y_tr)
        # nll = preds.log_prob(y_tr.ravel())
        
        import gpytorch
        likelihood = gpytorch.likelihoods.GaussianLikelihood()
        mll = gpytorch.mlls.ExactMarginalLogLikelihood(likelihood, self)

        output = gpytorch.distributions.MultivariateNormal(preds.loc.cpu(), preds.covariance_matrix.cpu())
        nll = -mll(output, y_tr.cpu()).mean()
        
        return nll

### Testing

In [37]:
torch.manual_seed(0)

<torch._C.Generator at 0x1f9cb137e30>

In [11]:
# mvn = net.condition(graph)

# torch.exp(mvn.log_prob(torch.tensor([[5.0]])))

In [12]:
# import torch
# import dgl
# import malt

# y = torch.Tensor([[5.0]])
# mol = malt.Molecule("C")
# mol.featurize()
# graph = dgl.batch([mol.g])

# y = net.loss(graph, torch.tensor([[5.0]]))
# # y.backward()

# # y = net.condition(graph)

In [31]:
from malt.data.collections import linear_alkanes

la_data = linear_alkanes()
g, y = la_data.batch()

net = malt.models.supervised_model.GaussianProcessSupervisedModel(
    representation=malt.models.representation.DGLRepresentation(
        out_features=128
    ),
    regressor=ExactGaussianProcessRegressor(
        in_features=128,
        out_features=2,
    ),
    likelihood=malt.models.likelihood.HeteroschedasticGaussianLikelihood(),
).cuda()

l = net.loss(g, y)

In [14]:
l

tensor(5.2379, device='cuda:0', grad_fn=<SubBackward0>)

In [15]:
# import gpytorch
# likelihood = gpytorch.likelihoods.GaussianLikelihood()
# mll = gpytorch.mlls.ExactMarginalLogLikelihood(likelihood, net)

# output = net.condition(g)
# output = gpytorch.distributions.MultivariateNormal(output.loc.cpu(), output.covariance_matrix.cpu())
# print(-mll(output, y.cpu()).mean())

# Test with ESOL

In [17]:
%load_ext autoreload
%autoreload 2

import torch
import dgl
import malt
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--data", type=str, default="esol")
parser.add_argument("--model", type=str, default="gp")

args = parser.parse_args([])

data = getattr(malt.data.collections, args.data)()

def test(data):
    
    data.shuffle(seed=1)
    ds_tr_vl, ds_te = data.split([0.9, 0.1])
    ds_tr, _ = ds_tr_vl.split([0.9, 0.1])
    
    if args.model == "gp":
        model = malt.models.supervised_model.GaussianProcessSupervisedModel(
            representation=malt.models.representation.DGLRepresentation(
                out_features=32, hidden_features=32,
            ),
            regressor=malt.models.regressor.ExactGaussianProcessRegressor(
                train_targets=ds_tr.batch(by='y'), in_features=32, out_features=2,
            ),
            likelihood=malt.models.likelihood.HeteroschedasticGaussianLikelihood(),
        ).cuda()


    elif args.model == "nn":
        model = malt.models.supervised_model.SimpleSupervisedModel(
            representation=malt.models.representation.DGLRepresentation(
                out_features=32,
            ),
            regressor=malt.models.regressor.NeuralNetworkRegressor(
                in_features=32, out_features=1,
            ),
            likelihood=malt.models.likelihood.HomoschedasticGaussianLikelihood(),
        ).cuda()


    trainer = malt.trainer.get_default_trainer(
        without_player=True,
        n_epochs=100,
        learning_rate=1e-3,
    )
    
    mll = malt.models.marginal_likelihood.ExactMarginalLogLikelihood(
        model.likelihood,
        model
    )
    
    # return model
    model = trainer(model, ds_tr_vl, mll)

    # model_cpu = model.to('cpu')
    model.eval()
    r2 = malt.metrics.supervised_metrics.R2()(model, ds_tr)

    # rmse = malt.metrics.supervised_metrics.RMSE()(model, ds_te)
    # print(rmse)
    return r2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
Processing dgl graphs from scratch...
Processing molecule 1000/1128


In [18]:
model = test(data)

In [19]:
model

tensor(0.9924, device='cuda:0', grad_fn=<RsubBackward1>)

In [2]:
# data.shuffle(seed=1)
# ds_tr, ds_te = data.split([8, 2])
# train_targets = ds_tr.batch(by='y')

# reg = malt.models.regressor.ExactGaussianProcessRegressor(
#     train_targets=train_targets,
#     in_features=32,
#     out_features=2,
# )

In [3]:
# ds_tr, ds_te = data.split([9, 1])
# g, y = ds_tr.batch()
# model.representation(g).shape

In [14]:
ds_tr_vl, ds_te = data.split([0.9, 0.1])
ds_tr, _ = ds_tr_vl.split([0.9, 0.1])

In [17]:
ds_tr

Dataset with 913 molecules

In [13]:
ds_tr_vl

NameError: name 'ds_tr_vl' is not defined

In [8]:
data.shuffle(seed=1)
ds_tr_vl, ds_te = data.split([0.9, 0.1])
ds_tr, _ = ds_tr_vl.split([0.9, 0.1])

print(ds_tr.batch(by='y').shape)

if args.model == "gp":
    model = malt.models.supervised_model.GaussianProcessSupervisedModel(
        representation=malt.models.representation.DGLRepresentation(
            out_features=32, hidden_features=32,
        ),
        regressor=malt.models.regressor.ExactGaussianProcessRegressor(
            train_targets=ds_tr.batch(by='y'), in_features=32, out_features=2,
        ),
        likelihood=malt.models.likelihood.HeteroschedasticGaussianLikelihood(),
    ).cuda()


elif args.model == "nn":
    model = malt.models.supervised_model.SimpleSupervisedModel(
        representation=malt.models.representation.DGLRepresentation(
            out_features=32,
        ),
        regressor=malt.models.regressor.NeuralNetworkRegressor(
            in_features=32, out_features=1,
        ),
        likelihood=malt.models.likelihood.HomoschedasticGaussianLikelihood(),
    ).cuda()


trainer = malt.trainer.get_default_trainer(
    without_player=True,
    batch_size=len(ds_tr_vl),
    n_epochs=100,
    learning_rate=1e-3,
)

mll = malt.models.marginal_likelihood.ExactMarginalLogLikelihood(
    model.likelihood,
    model
)

# return model
model = trainer(model, ds_tr_vl, mll)

# model_cpu = model.to('cpu')
r2 = malt.metrics.supervised_metrics.R2()(model, ds_te)

torch.Size([913, 1])


NameError: name 'math' is not defined