In [38]:
import gpytorch
import torch
from gpytorch.kernels import RBFKernel
from torch.distributions import MultivariateNormal

In [39]:
X = torch.tensor(
    [[1, 3],
     [2, 5],
     [3, 4]], dtype=torch.float32)

In [40]:
X = X.view(-1) 

In [41]:
T = torch.tensor([1,2,3], dtype=torch.float32)

In [42]:
X = X - X.mean() # important the mean is assumed to be zero

In [43]:
class GPFAKernel(gpytorch.kernels.Kernel):
    def __init__(self, n_features, latent_kernel, psi = None, **kwargs):
        # Initalize Pyro GP Kernel
        super(GPFAKernel, self).__init__(**kwargs)
        
        # Number of features in the X for each time step
        self.n_features = n_features
        self.latent_dims = 1 # More than 1 features is not implemented yet
        
        self.register_parameter(
            name = "Lambda",
            parameter = torch.nn.Parameter(torch.ones(self.n_features, self.n_features)))
        
        self.latent_kernel = latent_kernel
        
        self.register_parameter(
            name = "raw_psi_diag",
            parameter = torch.nn.Parameter(torch.zeros(self.n_features) if psi is None else psi)) # check that is is actually working as intented
        self.register_constraint("raw_psi_diag", gpytorch.constraints.Positive())
    
    # now set up the 'actual' parameter
    @property
    def psi(self):
        # when accessing the parameter, apply the constraint transform
        return self.raw_psi_diag_constraint.transform(self.raw_psi_diag)

    @psi.setter
    def psi(self, value):
        return self._set_length(value)

    def _set_length(self, value):
        if not torch.is_tensor(value):
            value = torch.as_tensor(value).to(self.raw_psi_diag)
        # when setting the paramater, transform the actual value to a raw one by applying the inverse transform
        self.initialize(raw_length=self.raw_psi_diag_constraint.inverse_transform(value))

    def forward(self, t1, t2, diag = False, last_dim_is_batch=False, **params):

        # not implemented yet
        assert diag is False
        assert last_dim_is_batch is False

        # taken the number of observations in the input
        n_obs = t1.shape[0]

        # compute the latent kernel
        kT = self.latent_kernel(t1, t2, diag, last_dim_is_batch, **params)
        # pre allocate covariance matrix
        X = torch.empty(self.n_features * n_obs, self.n_features * n_obs)
        
        for i in range(n_obs):
            for j in range(n_obs):
                # since `latent_dim=1 kT[i,j]` is a scalar so the matrix multiplication can be expressed in this way
                cov = kT[i,j] * self.Lambda @ self.Lambda.T
                # on diagonals add the noise
                if i == j: cov += torch.diag(self.psi) 
                X[i*self.n_features:(i*self.n_features + self.n_features),j*self.n_features:(j*self.n_features+self.n_features)] = cov
                
        return X
    
    def num_outputs_per_input(self, x1,x2):
        return self.n_features

In [44]:
gp_k = GPFAKernel(n_features=2, latent_kernel=RBFKernel())

In [45]:
gp_k

GPFAKernel(
  (latent_kernel): RBFKernel(
    (raw_lengthscale_constraint): Positive()
  )
  (raw_psi_diag_constraint): Positive()
)

In [46]:
list(gp_k.named_sub_kernels())

[('latent_kernel',
  RBFKernel(
    (raw_lengthscale_constraint): Positive()
  ))]

In [47]:
list(gp_k.named_parameters())

[('Lambda',
  Parameter containing:
  tensor([[1., 1.],
          [1., 1.]], requires_grad=True)),
 ('raw_psi_diag',
  Parameter containing:
  tensor([0., 0.], requires_grad=True)),
 ('latent_kernel.raw_lengthscale',
  Parameter containing:
  tensor([[0.]], requires_grad=True))]

In [48]:
cov = gp_k(T,T).evaluate()

In [49]:
MultivariateNormal(torch.zeros(cov[0].shape), cov)

MultivariateNormal(loc: torch.Size([6]), covariance_matrix: torch.Size([6, 6]))

In [51]:
class GPFA(gpytorch.models.ExactGP):
    def __init__(self, train_x, train_y, likelihood):
        super(GPFA, self).__init__(train_x, train_y, likelihood)
        self.mean_module = gpytorch.means.ConstantMean()
        self.covar_module = GPFAKernel(2, RBFKernel)

    def forward(self, x):
        mean_x = self.mean_module(x)
        covar_x = self.covar_module(x)
        return gpytorch.distributions.MultivariateNormal(mean_x, covar_x)

# initialize likelihood and model
likelihood = gpytorch.likelihoods.GaussianLikelihood()
model = GPFA(T, X, likelihood)

In [52]:
model

GPFA(
  (likelihood): GaussianLikelihood(
    (noise_covar): HomoskedasticNoise(
      (raw_noise_constraint): GreaterThan(1.000E-04)
    )
  )
  (mean_module): ConstantMean()
  (covar_module): GPFAKernel(
    (raw_psi_diag_constraint): Positive()
  )
)

In [53]:
model(T)

MultivariateNormal(loc: torch.Size([3]))

In [54]:
X.shape

torch.Size([6])

In [55]:
model.kernel.num_outputs_per_inputs()

AttributeError: 'GPFA' object has no attribute 'kernel'

In [59]:
model.covar_module.num_outputs_per_input(T, T)

2