# Difffusion Kernel 

In [6]:
from typing import Optional
import torch
import gpytorch
import numpy as np

import math
from matplotlib import pyplot as plt

First, let's set up the training data and some convenience functions.

In [7]:
# Smoke test to ensure kernel is working
import os
smoke_test = ('CI' in os.environ)
training_iter = 2 if smoke_test else 50

# Training data is simple square matrix of 1's
train_x = torch.tensor(np.array([[1, 0], [0, 1]]), dtype=torch.float32)
print(train_x)
# True function is sin(2*pi*x) with Gaussian noise
train_y = torch.sin(2 * math.pi * train_x) + torch.randn(train_x.size()) * 0.2

# Wrap training, prediction and plotting from the ExactGP-Tutorial into a function,
# so that we do not have to repeat the code later on
def train(model, likelihood, training_iter=training_iter):
    # Use the adam optimizer
    optimizer = torch.optim.Adam(model.parameters(), lr=0.1)  # Includes GaussianLikelihood parameters

    # "Loss" for GPs - the marginal log likelihood
    mll = gpytorch.mlls.ExactMarginalLogLikelihood(likelihood, model)

    for i in range(training_iter):
        # Zero gradients from previous iteration
        optimizer.zero_grad()
        # Output from model
        output = model(train_x)
        # Calc loss and backprop gradients
        loss = -mll(output, train_y)
        loss.backward()
        optimizer.step()    

def predict(model, likelihood, test_x = torch.linspace(0, 1, 51)):
    model.eval()
    likelihood.eval()
    # Make predictions by feeding model through likelihood
    with torch.no_grad(), gpytorch.settings.fast_pred_var():
        # Test points are regularly spaced along [0,1]
        return likelihood(model(test_x))
    
def plot(observed_pred, test_x=torch.linspace(0, 1, 51)):
    with torch.no_grad():
        # Initialize plot
        f, ax = plt.subplots(1, 1, figsize=(4, 3))

        # Get upper and lower confidence bounds
        lower, upper = observed_pred.confidence_region()
        # Plot training data as black stars
        ax.plot(train_x.numpy(), train_y.numpy(), 'k*')
        # Plot predictive means as blue line
        ax.plot(test_x.numpy(), observed_pred.mean.numpy(), 'b')
        # Shade between the lower and upper confidence bounds
        ax.fill_between(test_x.numpy(), lower.numpy(), upper.numpy(), alpha=0.5)
        ax.set_ylim([-3, 3])
        ax.legend(['Observed Data', 'Mean', 'Confidence'])

tensor([[1., 0.],
        [0., 1.]])


Define the model using Aryan's (WSU Ph.D student) code for Diffusion Kernel.

In [8]:
class DiffusionKernel(gpytorch.kernels.Kernel):
    r"""
        Computes diffusion kernel over discrete spaces with arbitrary number of categories. 
        Input type: n dimensional discrete input with c_i possible categories/choices for each dimension i 
        As an example, binary {0,1} combinatorial space corresponds to c_i = 2 for each dimension i
        References:
        - https://www.ml.cmu.edu/research/dap-papers/kondor-diffusion-kernels.pdf (Section 4.4)
        - https://arxiv.org/abs/1902.00448
        - https://arxiv.org/abs/2012.07762
        
        Args:
        :attr:`categories`(tensor, list):
            array with number of possible categories in each dimension            
    """
    has_lengthscale = True
    def __init__(self, categories, **kwargs):
        if categories is None:
            raise RunTimeError("Can't create a diffusion kernel without number of categories. Please define them!")
        super().__init__(**kwargs)
        self.cats = categories

    def forward(self, x1, x2, diag: Optional[bool] = False, last_dim_is_batch: Optional[bool] = False, **params):
        if last_dim_is_batch:
            x1 = x1.transpose(-1, -2).unsqueeze(-1)
            x2 = x2.transpose(-1, -2).unsqueeze(-1)        

        if diag:
            res = 1.
            for i in range(x1.shape[-1]):
                res *= ((1 - torch.exp(-self.lengthscale[..., i] * self.cats[i]))/(1 + (self.cats[i] - 1) * torch.exp(-self.lengthscale[..., i]*self.cats[i]))).unsqueeze(-1) ** ((x1[..., i] != x2[..., i])[:, 0, ...])
            return res

        res = 1.
        for i in range(x1.shape[-1]): 
            res *= ((1 - torch.exp(-self.lengthscale[..., i] * self.cats[i]))/(1 + (self.cats[i] - 1) * torch.exp(-self.lengthscale[..., i]*self.cats[i]))).unsqueeze(-1) ** ((x1[..., i].unsqueeze(-2)[..., None] != x2[..., i].unsqueeze(-2))[0, ...])
        return res

Now we generate our own custom model!

In [9]:
class DiffGPModel(gpytorch.models.ExactGP):
    def __init__(self, train_x, train_y, likelihood):
        super().__init__(train_x, train_y, likelihood)
        self.mean_module = gpytorch.means.ConstantMean()
        self.covar_module = DiffusionKernel(categories=torch.tensor(np.array([[2], [2]])))

    def forward(self, x):
        mean_x = self.mean_module(x)
        covar_x = self.covar_module(x)
        return gpytorch.distributions.MultivariateNormal(mean_x, covar_x)


Now to train and evaluate the model...

In [10]:
# Initialize likelihood and model
likelihood = gpytorch.likelihoods.GaussianLikelihood()
model = DiffGPModel(train_x, train_y, likelihood)

# Set to training mode and train
model.train()
likelihood.train()
train(model, likelihood)

# Get into evaluation (predictive posterior) mode and predict
model.eval()
likelihood.eval()
observed_pred = predict(model, likelihood)

# Plot results
plot(observed_pred)

IndexError: index 1 is out of bounds for dimension 1 with size 1