<a href="https://colab.research.google.com/github/jcandane/RCF/blob/main/gptorch_play.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [18]:
import torch
import math

###
try:
    import gpytorch
except:
    !pip install gpytorch==1.11
    import gpytorch

###
#try:
#    import pyro
#    from pyro.infer.mcmc import NUTS, MCMC, HMC
#except:
#    !pip install pyro-ppl
#    import pyro
#    from pyro.infer.mcmc import NUTS, MCMC, HMC

from matplotlib import pyplot as plt
import plotly.graph_objects as go

In [2]:
# Training data is 11 points in [0,1] inclusive regularly spaced
train_x = torch.linspace(0, 1, 4)
# True function is sin(2*pi*x) with Gaussian noise
train_y = torch.sin(train_x * (2 * math.pi)) + torch.randn(train_x.size()) * 0.2

# We will use the simplest form of GP model, exact inference
class ExactGPModel(gpytorch.models.ExactGP):
    def __init__(self, train_x, train_y, likelihood):
        super(ExactGPModel, self).__init__(train_x, train_y, likelihood)
        self.mean_module = gpytorch.means.ConstantMean()
        self.covar_module = gpytorch.kernels.ScaleKernel(gpytorch.kernels.RBFKernel())

    def forward(self, x):
        mean_x = self.mean_module(x)
        covar_x = self.covar_module(x)
        return gpytorch.distributions.MultivariateNormal(mean_x, covar_x)



In [3]:
k = ( gpytorch.kernels.RBFKernel() )

In [4]:
covar_module = gpytorch.kernels.LinearKernel()
x1 = torch.randn(8, 3)
x2 = torch.randn(7, 3)
lazy_covar_matrix = covar_module(x1)              # Returns a RootLinearOperator, ## abstract-sparse
covariance_matrix = lazy_covar_matrix.to_dense()  # Gets the actual tensor for this kernel matrix

covariance_matrix

tensor([[ 3.0871,  1.0539,  1.9747, -0.4486,  0.2363,  3.1470,  0.6644,  0.9331],
        [ 1.0539,  1.8795, -1.7382,  0.0054, -0.0262,  0.4700,  0.8072, -0.7491],
        [ 1.9747, -1.7382,  5.1230, -0.4846,  0.3327,  3.0783, -0.5413,  2.5541],
        [-0.4486,  0.0054, -0.4846,  0.1761, -0.0246, -0.3351, -0.1144,  0.2113],
        [ 0.2363, -0.0262,  0.3327, -0.0246,  0.0302,  0.3243, -0.0073,  0.2479],
        [ 3.1470,  0.4700,  3.0783, -0.3351,  0.3243,  3.8120,  0.2925,  2.2755],
        [ 0.6644,  0.8072, -0.5413, -0.1144, -0.0073,  0.2925,  0.4299, -0.5880],
        [ 0.9331, -0.7491,  2.5541,  0.2113,  0.2479,  2.2755, -0.5880,  3.2578]],
       grad_fn=<MmBackward0>)

In [5]:
torch.rand(2, 3)

tensor([[0.4509, 0.4515, 0.8892],
        [0.9908, 0.8808, 0.2125]])

In [6]:
k = gpytorch.kernels.ScaleKernel(gpytorch.kernels.RBFKernel())

type( covar_module )

In [7]:
lazy_covar_matrix

<gpytorch.lazy.lazy_evaluated_kernel_tensor.LazyEvaluatedKernelTensor at 0x7ca8dc34faf0>

In [8]:
gpytorch.kernels.LinearKernel()(x1, x2).to_dense().shape
k(x1, x2).to_dense().shape

torch.Size([8, 7])

In [9]:
k(x1) #.to_dense()

<gpytorch.lazy.lazy_evaluated_kernel_tensor.LazyEvaluatedKernelTensor at 0x7ca9b396e7a0>

# RCF torch function

In [10]:
torch.set_default_tensor_type(torch.DoubleTensor)
torch.manual_seed(80)

Domain = torch.tensor([[0,10.],[-3,4.]]) #torch.tensor([[0,10.],[-3,4.],[-8,-2]]) ### numpy.2darray
N      = 3  ### number of defining points
MO     = 1   ### int (dimension of OUT)

kernel = gpytorch.kernels.RBFKernel() ##gpx.kernels.RBF()
μ_i    = torch.zeros(N, dtype=torch.float64) ##jax.numpy.zeros(self.N)
seed   = 137


### set of randomly sampled points
R_ix  = torch.rand(N, Domain.shape[0], dtype=torch.float64) ## domain.shape ??
R_ix *= torch.diff(Domain, axis=1).reshape(-1)
R_ix += Domain[:,0] ## save this!!!!!

L_ij  = torch.linalg.cholesky( kernel(R_ix) ) ## .to_dense() for concrete implementation

D_iX  = torch.normal(0, 1, size=(N, MO))
D_iX *= torch.diag(L_ij.to_dense()).reshape(-1,1)
D_iX += μ_i.reshape(-1,1)
D_iX  = torch.matmul(L_ij, D_iX)
#S_jX  = torch.linalg.solve(L_ij, D_iX)
S_jX  = torch.cholesky_solve(D_iX, L_ij.to_dense()) ## save this!!!!!



#####################
### random points
D_ax  = torch.rand(N, Domain.shape[0]) ## [0,1) domain.shape ??
D_ax *= torch.diff(Domain, axis=1).reshape(-1)
D_ax += Domain[:,0]

D_ay  = torch.matmul(kernel(D_ax, R_ix), S_jX)

  _C._set_default_tensor_type(t)


In [11]:
kernel.state_dict()

OrderedDict([('raw_lengthscale', tensor([[0.]])),
             ('raw_lengthscale_constraint.lower_bound', tensor(0.)),
             ('raw_lengthscale_constraint.upper_bound', tensor(inf))])

In [12]:
from torch.autograd import grad
#d_loss_dx = grad(outputs=D_ay, inputs=R_ix)

In [13]:
#### generate mesh to plot
R_ax = torch.stack(torch.meshgrid(*[ torch.arange(Domain[i,0], Domain[i,1], 0.33) for i in range(len(Domain)) ]), axis=-1)
R_ax = R_ax.reshape((torch.prod( torch.asarray(R_ax.shape[:-1]) ), R_ax.shape[-1]))

R_ay = (torch.matmul(kernel(R_ax, R_ix), S_jX)).detach().numpy()
R_ax = R_ax.detach().numpy()

#### the plot
fig = go.Figure(data=[go.Scatter3d(x=R_ax[:,0], y=R_ax[:,1], z=R_ay[:,0], mode='markers'),
                      go.Scatter3d(x=R_ix.detach().numpy()[:,0], y=R_ix.detach().numpy()[:,1], z=D_iX.detach().numpy()[:,0], mode='markers'),
                      go.Scatter3d(x=D_ax.detach().numpy()[:,0], y=D_ax.detach().numpy()[:,1], z=D_ay.detach().numpy()[:,0], mode='markers')])
fig.show()

  return _VF.meshgrid(tensors, **kwargs)  # type: ignore[attr-defined]


In [14]:
import torch
import gpytorch

torch.set_default_tensor_type(torch.DoubleTensor)

class RCF():
    """ built: 3/19/2024
    this an object of a Random-Contionus-Function (RCF), with-respect-to a gpytorch kernel
    RCF : IN -> OUT = R^(MO)
    we define a prior, and then sample to form a posterior.
    """

    def __init__(self, Domain, N:int, MO:int=1, seed:int=777,
                 IN_noise=None, OUT_noise=None,
                 kernel=gpytorch.kernels.RBFKernel()):
        """
        GIVEN >
            Domain  : 2d-torch.Tensor (domain of input points)
                 N  : int (number of points)
                MO  : int (Multiple-Output Dimension)
             **seed : int
           **kernel : gpytorch.kernels
        ** IN_noise : 1d-torch.Tensor
        **OUT_noise : 1d-torch.Tensor
        GET   >
            None
        """

        self.dtype  = torch.float64
        self.IN     = Domain.type(self.dtype) ### 2d-torch.Tensor
        self.N      = N      ### number of defining points
        self.MO     = MO     ### int (dimension of OUT)
        self.kernel = kernel
        self.seed   = seed ### define random sampling key

        torch.manual_seed(self.seed)

        ### define anisotropic i.i.d white noise
        if IN_noise is None:
            self.IN_noise = torch.zeros( self.IN.shape[0] , dtype=self.dtype)
        else:
            self.IN_noise = IN_noise
        if OUT_noise is None:
            self.OUT_noise = torch.zeros( self.MO , dtype=self.dtype)
        else:
            self.OUT_noise = OUT_noise

        ### find a series of random defining points,
        ### keep looping until we find a stable configuration of initial-points
        ### --> "A not p.d., added jitter of 1.0e-08 to the diagonal" pytorch safety
        self.R_ix  = torch.rand(N, self.IN.shape[0], dtype=self.dtype)
        self.R_ix *= torch.diff(self.IN, axis=1).reshape(-1)
        self.R_ix += self.IN[:,0]

        ### compute cholesky-factorization
        L_ij       = torch.linalg.cholesky( self.kernel(self.R_ix) ).to_dense()

        ### compute OUT-space defining-points
        D_iX       = torch.normal(0, 1, size=(self.N, self.MO), dtype=self.dtype)
        D_iX      *= torch.diag(L_ij).reshape(-1,1)
        D_iX       = torch.matmul(L_ij, D_iX)

        ### compute (L \ D) used to interpolate arbtirary points
        self.S_jX  = torch.cholesky_solve(D_iX, L_ij)

    def __call__(self, D_ax):
        """ evaluate for arbitrary values/points in OUT given points in IN.
        GIVEN >
              self
              D_ax : 2d-torch.Tensor (D_ax ∈ IN)
        GET   >
              D_aX : 2d-torch.Tensor (D_aX ∈ OUT, note captial 'X')
        """
        D_ax += self.IN_noise*torch.normal(0, 1, size=D_ax.shape, dtype=self.dtype)
        D_aX  = torch.matmul(self.kernel(D_ax, self.R_ix), self.S_jX)
        D_aX += self.OUT_noise*torch.normal(0, 1, size=D_aX.shape, dtype=self.dtype)
        return D_aX

## Try out

In [15]:
Domain = torch.tensor([[0,10.],[-3,4.]]) #torch.tensor([[0,10.],[-3,4.],[-8,-2]]) ### numpy.2darray


f = RCF(Domain, 23, seed=1287)

#### generate mesh to plot
R_ax = torch.stack(torch.meshgrid(*[ torch.linspace(Domain[i,0], Domain[i,1], 20) for i in range(len(Domain)) ]), axis=-1)
R_ax = R_ax.reshape((torch.prod( torch.asarray(R_ax.shape[:-1]) ), R_ax.shape[-1]))

R_ay = f(R_ax).detach().numpy()
R_ax = R_ax.detach().numpy()

#### the plot
fig = go.Figure(data=[go.Scatter3d(x=R_ax[:,0], y=R_ax[:,1], z=R_ay[:,0], mode='markers'),
                      go.Scatter3d(x=f.R_ix.detach().numpy()[:,0], y=f.R_ix.detach().numpy()[:,1], z=f(f.R_ix).detach().numpy()[:,0], mode='markers')])
fig.show()

## very small domain, hence close test

looks straight, because on the length-scale of the kernel, its essentially constant (a point).

In [16]:
Domain = torch.tensor([[0,0.001],[-0.001,0.004]], dtype=torch.float16)

f = RCF(Domain, 222, seed=12087)

#### generate mesh to plot
R_ax = torch.stack(torch.meshgrid(*[ torch.linspace(Domain[i,0], Domain[i,1], 20) for i in range(len(Domain)) ]), axis=-1)
R_ax = R_ax.reshape((torch.prod( torch.asarray(R_ax.shape[:-1]) ), R_ax.shape[-1]))

R_ay = f(R_ax).detach().numpy()
R_ax = R_ax.detach().numpy()

#### the plot
fig = go.Figure(data=[go.Scatter3d(x=R_ax[:,0], y=R_ax[:,1], z=R_ay[:,0], mode='markers'),
                      go.Scatter3d(x=f.R_ix.detach().numpy()[:,0], y=f.R_ix.detach().numpy()[:,1], z=f(f.R_ix).detach().numpy()[:,0], mode='markers')])
fig.show()


A not p.d., added jitter of 1.0e-08 to the diagonal



In [17]:
Domain.dtype, f.IN.dtype

(torch.float16, torch.float64)