# Neural Processes

Implementation of Neural Processes in PyTorch

# Overview

1. Data Generator
2. Plotting function
3. Encoder 
4. Decoder
5. Model

# To Do

- [ ] Docstrings
- [ ] Inline comments
- [ ] Visualisation of testing and training



In [224]:
import numpy as np
import torch
import matplotlib.pyplot as plt
import watermark
import plotly.graph_objects as go

import collections

In [20]:
%load_ext watermark

# Versions for reproducibility
%watermark --iversions
%watermark

The watermark extension is already loaded. To reload it, use:
  %reload_ext watermark
watermark : 2.3.1
numpy     : 1.23.5
matplotlib: 3.6.3
torch     : 1.13.1

Last updated: 2023-05-03T14:05:56.275214+10:00

Python implementation: CPython
Python version       : 3.9.16
IPython version      : 8.10.0

Compiler    : Clang 14.0.6 
OS          : Darwin
Release     : 22.3.0
Machine     : x86_64
Processor   : i386
CPU cores   : 8
Architecture: 64bit



## Data Generator

- Uses `namedtuple` from `collections` to describe each generated batch
  - Start with the curve-generating function (`generate_curves`)

In [174]:
# The (A)NP takes as input a `NPRegressionDescription` namedtuple with fields:
#   `query`: a tuple containing ((context_x, context_y), target_x)
#   `target_y`: a tensor containing the ground truth for the targets to be
#     predicted
#   `num_total_points`: Python int, the number of target points
#     (`target_x.size(1)`; during training the targets include the contexts)
#   `num_context_points`: 0-dim int32 tensor holding the number of datapoints
#     used as context
# `GPCurvesReader.generate_curves` returns newly sampled data in this format
# on every call.

NPRegressionDescription = collections.namedtuple(
    "NPRegressionDescription",
    ("query", "target_y", "num_total_points", "num_context_points"))


class GPCurvesReader(object):
  """Generates curves using a Gaussian Process (GP).

  Supports vector inputs (x) and vector outputs (y). Kernel is
  squared exponential, using the x-value l2 coordinate distance scaled by
  some factor chosen randomly in a range. Outputs are independent gaussian
  processes.

  Functions:
  _gaussian_kernel()
  generate_curves()
  """

  def __init__(self,
               batch_size,
               max_num_context,
               x_size = 1,
               y_size = 1,
               l1_scale = 0.6,
               sigma_scale = 1.0,
               random_kernel_parameters = True,
               testing = False):
    """Creates a regression dataset of functions sampled from a GP.

    Args:
      batch_size: An integer.
      max_num_context: Exclusive upper bound on the number of context points.
          The number of context points is drawn from [3, max_num_context), so
          this must be greater than 3.
      x_size: Integer >= 1 for length of "x values" vector. Only used when
          `testing` is False; the evenly spaced test grid is always 1-D.
      y_size: Integer >= 1 for length of "y values" vector.
      l1_scale: Float; typical scale for kernel distance function.
      sigma_scale: Float; typical scale for variance.
      random_kernel_parameters: If `True`, the kernel parameters (l1 and sigma)
          will be sampled uniformly within [0.1, l1_scale] and [0.1, sigma_scale].
          Otherwise `l1_scale` and `sigma_scale` are used as fixed values.
      testing: Boolean that indicates whether we are testing. If so there are
          more targets for visualization.
    """
    self._batch_size = batch_size
    self._max_num_context = max_num_context
    self._x_size = x_size
    self._y_size = y_size
    self._l1_scale = l1_scale
    self._sigma_scale = sigma_scale
    self._random_kernel_parameters = random_kernel_parameters
    self._testing = testing

  # Leading underscore marks the method as internal
  def _gaussian_kernel(self, xdata, l1, sigma_f, sigma_noise = 2e-2):
    """Applies the Gaussian kernel to generate curve data.

    Called by `generate_curves()`.

    Args:
      xdata: Tensor of shape [B, num_total_points, x_size] with
          the values of the x-axis data.
      l1: Tensor of shape [B, y_size, x_size], the scale
          parameter of the Gaussian kernel.
      sigma_f: Tensor of shape [B, y_size], the magnitude
          of the std.
      sigma_noise: Float, std of the noise that we add for stability.

    Returns:
      The kernel, a float tensor of shape
      [B, y_size, num_total_points, num_total_points].
    """
    num_total_points = xdata.size(dim = 1)

    # Pairwise differences between all points of a curve
    xdata1 = torch.unsqueeze(xdata, dim = 1) # [B, 1, num_total_points, x_size]
    xdata2 = torch.unsqueeze(xdata, dim = 2) # [B, num_total_points, 1, x_size]
    diff = xdata1 - xdata2  # [B, num_total_points, num_total_points, x_size]

    # Squared, length-scaled difference
    # [B, y_size, num_total_points, num_total_points, x_size]
    norm = torch.square(diff[:, None, :, :, :] / l1[:, :, None, None, :])

    # Reduce along last dimension: x_size
    norm = torch.sum(norm, dim = -1) # [B, y_size, num_total_points, num_total_points]

    # [B, y_size, num_total_points, num_total_points]
    kernel = torch.square(sigma_f)[:, :, None, None] * torch.exp(- 0.5 * norm)

    # Add some noise to the diagonal to make the cholesky work.
    # The identity is created with the kernel's dtype/device so the in-place
    # addition also works for non-default dtypes or devices.
    kernel += (sigma_noise ** 2) * torch.eye(
        num_total_points, dtype = kernel.dtype, device = kernel.device)

    return kernel

  def generate_curves(self):
    """Samples a batch of GP curves and splits it into context and targets.

    Generated functions are `float32` with x values between -2 and 2.

    During training the targets are all sampled points and the contexts are
    the first `num_context` of them. During testing the targets are an evenly
    spaced grid and the contexts are a random subset of that grid.

    Returns:
      A `NPRegressionDescription` namedtuple.
    """
    # Kept as a 0-dim int32 tensor because it is returned to the caller as-is;
    # the plain int copy is used for sizes and slicing.
    num_context = torch.randint(low = 3, high = self._max_num_context, size = [], dtype = torch.int32)
    n_context = int(num_context)

    # If we are TESTING we want to have more targets and have them evenly distributed in order to plot the function.
    if self._testing:
      # torch.arange excludes the end value: -2.00, -1.99, ..., 1.99
      x_range = torch.arange(start = -2., end = 2, step = 1./100, dtype = torch.float32)
      # targets only; derived from the grid so the two can never disagree (400 points)
      num_target = x_range.size(dim = 0)
      num_total_points = num_target
      # repeat for each batch
      x_tiles = torch.tile(input = x_range, dims = (self._batch_size, 1))
      # Unsqueeze to create explicit last dim
      x_values = torch.unsqueeze(x_tiles, dim = -1)

    # During TRAINING the number of target points and their x-positions are selected at random
    else:
      # Number of extra target points on top of the context points.
      # `low` must be 0: `high` can be as small as 1 (when num_context ==
      # max_num_context - 1), and torch.randint raises if low >= high.
      num_target = int(torch.randint(
          low = 0, high = self._max_num_context - n_context, size = [], dtype = torch.int32))
      num_total_points = n_context + num_target
      # Uniformly sample random floats as x and scale between [-2, 2]
      x_values = ((torch.rand(size = (self._batch_size, num_total_points, self._x_size)) * 4) - 2)

    # Set kernel parameters
    # Either choose a set of random parameters for the mini-batch
    if self._random_kernel_parameters:
      # Scale [0, 1) samples to [0.1, l1_scale)
      l1 = ((torch.rand(size = (self._batch_size, self._y_size, self._x_size)) * (self._l1_scale - 0.1)) + 0.1)
      # Scale [0, 1) samples to [0.1, sigma_scale)
      sigma_f = ((torch.rand(size = (self._batch_size, self._y_size)) * (self._sigma_scale - 0.1)) + 0.1)

    # Or use the same fixed parameters for all mini-batches
    else:
      l1 = torch.ones(size = (self._batch_size, self._y_size, self._x_size)) * self._l1_scale
      sigma_f = torch.ones(size = (self._batch_size, self._y_size)) * self._sigma_scale

    # Pass the x_values through the Gaussian kernel
    # [batch_size, y_size, num_total_points, num_total_points]
    kernel = self._gaussian_kernel(x_values, l1, sigma_f)

    # Calculate Cholesky, using double precision for better stability:
    cholesky = torch.linalg.cholesky(kernel.double()).float()

    # Sample a curve: L @ z with z ~ N(0, I)
    # [batch_size, y_size, num_total_points, 1]
    y_values = torch.matmul(cholesky, torch.randn(size = (self._batch_size, self._y_size, num_total_points, 1)))

    # [batch_size, num_total_points, y_size]
    # squeeze dim 3 and reorder
    y_values = torch.transpose(torch.squeeze(y_values, dim = 3), dim0 = 1, dim1 = 2)

    if self._testing:
      # Select the targets (all)
      target_x = x_values
      target_y = y_values

      # Select the observations: a random subset of the grid
      idx = torch.randperm(num_target)[:n_context]
      context_x = x_values[:, idx, :]
      context_y = y_values[:, idx, :]

    else:
      # The targets consist of the context points as well as the extra
      # target points, i.e. every sampled point
      target_x = x_values
      target_y = y_values

      # Select the observations
      context_x = x_values[:, :n_context, :]
      context_y = y_values[:, :n_context, :]

    query = ((context_x, context_y), target_x)

    return NPRegressionDescription(
        query = query,
        target_y = target_y,
        num_total_points = target_x.size(dim = 1),
        num_context_points = num_context)

## Plotting function

In [237]:
def plot_test_data(target_x, target_y, context_x, context_y):
    """Show a Plotly figure with the target curve and the context points.

    Every argument is passed through `torch.squeeze` before plotting, so all
    size-1 dimensions are dropped.

    Args:
        target_x: x-values of the targets, drawn as a line.
        target_y: y-values of the targets.
        context_x: x-values of the context points, drawn as markers.
        context_y: y-values of the context points.

    NOTE(review): presumably each tensor is [1, N, 1] (a single test curve),
    as in the notebook's test-data cell; verify before calling with a batch
    size above 1.
    """
    # Target curve as a line
    target_trace = go.Scatter(
        x = torch.squeeze(target_x),
        y = torch.squeeze(target_y),
        mode = 'lines',
        name = 'Target',
    )

    # Context observations as individual markers
    context_trace = go.Scatter(
        x = torch.squeeze(context_x),
        y = torch.squeeze(context_y),
        mode = 'markers',
        name = 'Context',
    )

    fig = go.Figure()
    for trace in (target_trace, context_trace):
        fig.add_trace(trace)

    fig.show()

In [238]:
MAX_CONTEXT_POINTS = 50 #@param {type:"number"}
random_kernel_parameters = True #@param {type:"boolean"}

# Build both readers up front. Constructing a reader only stores its settings
# and draws no random numbers, so the sampling order below is unchanged.
dataset_train = GPCurvesReader(
    batch_size = 16,
    max_num_context = MAX_CONTEXT_POINTS,
    random_kernel_parameters = random_kernel_parameters,
)
dataset_test = GPCurvesReader(
    batch_size = 1,
    max_num_context = MAX_CONTEXT_POINTS,
    random_kernel_parameters = random_kernel_parameters,
    testing = True,
)

# Sample one training batch, then one test curve
data_train = dataset_train.generate_curves()
data_test = dataset_test.generate_curves()

In [289]:
# Pull the training batch apart so the tensor shapes can be inspected
(context_x, context_y), target_x = data_train.query
target_y = data_train.target_y

# Contexts first ...
print(f"Size of train data context_x: {context_x.size()}")
print("Size of train data context_y: ", context_y.shape)

# ... then targets
print("Size of train data target_x: ", target_x.shape)
print("Size of train data target_y: ", target_y.shape)

Size of train data context_x: torch.Size([16, 5, 1])
Size of train data context_y:  torch.Size([16, 5, 1])
Size of train data target_x:  torch.Size([16, 23, 1])
Size of train data target_y:  torch.Size([16, 23, 1])


In [239]:
# Pull the test curve apart so the tensor shapes can be inspected
(context_x, context_y), target_x = data_test.query
target_y = data_test.target_y

# In test mode the contexts are a subset of the targets
print("Size of test data context_x: ", context_x.shape)
print("Size of test data context_y: ", context_y.shape)

print("Size of test data target_x: ", target_x.shape)
print("Size of test data target_y: ", target_y.shape)

# Draw the target curve together with its context points
plot_test_data(
    target_x = target_x,
    target_y = target_y,
    context_x = context_x,
    context_y = context_y,
)

Size of context_x:  torch.Size([1, 37, 1])
Size of context_y:  torch.Size([1, 37, 1])
Size of target_x:  torch.Size([1, 400, 1])
Size of target_y:  torch.Size([1, 400, 1])


## Encoder

::

    input -> [linear -> relu ->] * 4
          -> Mean
          -> mu, sigma  
          -> MSELoss
          -> loss

### Batch MLP utility method

In [None]:
# utility methods

# MLPs created by `batch_mlp`, keyed by scope name. Each value is
# ((d_in, output_sizes), module). This is the PyTorch stand-in for TensorFlow's
# `variable_scope(..., reuse=AUTO_REUSE)`: calling `batch_mlp` again with the
# same scope name reuses the same weights. Training code can collect the
# trainable parameters from the stored modules.
_BATCH_MLP_REGISTRY = {}


def batch_mlp(input, output_sizes, variable_scope):
  """Apply MLP to the final axis of a 3D tensor (reusing already defined MLPs).

  The MLP is `Linear -> ReLU` for every entry of `output_sizes` except the
  last, followed by a final `Linear` without activation.

  Args:
    input: input tensor of shape [B,n,d_in].
    output_sizes: An iterable containing the output sizes of the MLP layers
        (e.g. [128, 128, 128, 128]). Must not be empty.
    variable_scope: String naming the MLP. If this is set to be the same as a
        previously defined MLP, then the weights are reused.

  Returns:
    tensor of shape [B,n,d_out] where d_out=output_sizes[-1]

  Raises:
    ValueError: if `output_sizes` is empty, or if `variable_scope` was already
        used with a different `d_in` or different `output_sizes`.
  """
  output_sizes = tuple(int(size) for size in output_sizes)
  if not output_sizes:
    raise ValueError("output_sizes must contain at least one layer size")

  # d_in is what the first linear layer consumes
  filter_size = input.shape[-1]
  signature = (filter_size, output_sizes)

  cached = _BATCH_MLP_REGISTRY.get(variable_scope)
  if cached is None:
    layers = []
    in_features = filter_size
    # Hidden layers: every size but the last is followed by a ReLU
    for size in output_sizes[:-1]:
      layers.append(torch.nn.Linear(in_features = in_features, out_features = size))
      layers.append(torch.nn.ReLU())
      in_features = size
    # Last layer without a ReLU
    layers.append(torch.nn.Linear(in_features = in_features, out_features = output_sizes[-1]))

    mlp = torch.nn.Sequential(*layers).to(device = input.device, dtype = input.dtype)
    _BATCH_MLP_REGISTRY[variable_scope] = (signature, mlp)
  else:
    cached_signature, mlp = cached
    if cached_signature != signature:
      raise ValueError(
          "variable_scope {!r} was created with (d_in, output_sizes)={} "
          "but is now requested with {}".format(
              variable_scope, cached_signature, signature))

  # torch.nn.Linear acts on the last axis and broadcasts over the leading ones,
  # so the [B, n, d_in] input needs no flatten/unflatten round trip.
  return mlp(input)

In [274]:
# Scratch check: list the (index, size) pairs that a loop over every layer
# size except the last one visits.
output_sizes = 4 * [HIDDEN_SIZE]

for i, size in enumerate(output_sizes[:-1]):
    print("i", i)
    print("size", size)

i 0
size 128
i 1
size 128
i 2
size 128


In [None]:
class Encoder(object):
  """Work-in-progress NP latent encoder (partially ported from TensorFlow).

  NOTE(review): this cell mixes PyTorch calls (`torch.cat`, `torch.reshape`,
  `EncoderNet`) with TensorFlow calls (`tf.reduce_mean`, `tf.layers.dense`,
  `tf.contrib.distributions`). No `tf` import is visible in this notebook, so
  `__call__` presumably cannot run as written. A later cell defines `Encoder`
  again, and that later definition replaces this one when the notebook is run
  top to bottom.
  """

  def __init__(self, output_sizes, num_latents):
    """NP Encoder

    Instance variables:
      output_sizes: An iterable containing the output sizes of the encoding MLP. (e.g. [128, 128, 128, 128])
      num_latents: The latent dimensionality.
    """
    # Instance attributes; the leading underscore marks them as internal
    self._output_sizes = output_sizes
    self._num_latents = num_latents

  def __call__(self, x, y):
    """Encodes the inputs into one representation.

    Args:
      x: Tensor of shape [B, observations, d_x]. For this 1D regression
          task this corresponds to the x-values.
      y: Tensor of shape [B, observations, d_y]. For this 1D regression
          task this corresponds to the y-values.

    Returns:
      A normal distribution over tensors of shape [B, num_latents]
    """

    # Concatenate x and y along the last axis 
    encoder_input = torch.cat(tensors = (x, y), dim = -1)

    # Pass last axis through MLP
    hidden = batch_mlp(encoder_input, self._output_sizes, "latent_encoder")

    ### Experimental PyTorch path (between the two `###` markers)
    # NOTE(review): the `mu` and `log_sigma` computed here are overwritten by
    # the dense layers further down and never used; `batch_size` and
    # `parallel_encoder_input` are only used inside this section.
    # Get the shapes of the input and reshape to parallelise across observations
    batch_size, _ , filter_size = list(encoder_input.shape)
    
    # Combines datapoints across batches (first dim) but preserved filter_size as last dimension (second dim)
    parallel_encoder_input = torch.reshape(encoder_input, shape = (-1, filter_size))
    
    # Initialise EncoderNet (defined in a later cell).
    # NOTE(review): a new network, and therefore new weights, is created on
    # every call.
    encodernet = EncoderNet(filter_size, self._num_latents, self._num_latents)

    mu, log_sigma = encodernet(parallel_encoder_input)


    ###
      
    # Aggregator: take the mean over all points
    hidden = tf.reduce_mean(hidden, axis=1)
    
    # Have further MLP layers that map to the parameters of the Gaussian latent
    with tf.variable_scope("latent_encoder", reuse=tf.AUTO_REUSE):
      # First apply intermediate relu layer 
      # NOTE(review): `/` yields a float layer width in Python 3; an integer
      # width is presumably intended - confirm.
      hidden = tf.nn.relu(
          tf.layers.dense(hidden, 
                          (self._output_sizes[-1] + self._num_latents)/2, 
                          name="penultimate_layer"))
      # Then apply further linear layers to output latent mu and log sigma
      mu = tf.layers.dense(hidden, self._num_latents, name="mean_layer")
      log_sigma = tf.layers.dense(hidden, self._num_latents, name="std_layer")
      
    # Compute sigma; the sigmoid keeps it inside (0.1, 1.0)
    sigma = 0.1 + 0.9 * tf.sigmoid(log_sigma)

    # Candidate PyTorch replacement for the TF distribution returned below:
    # torch.distributions.normal.Normal(0, 1)
    return tf.contrib.distributions.Normal(loc=mu, scale=sigma)

In [294]:
import torch
import torch.nn as nn
import torch.nn.functional as F


class EncoderNet(nn.Module):
    """Four-layer MLP with two linear heads that output `mu` and `log_sigma`.

    The network acts on the last axis of its input only. It does not pool
    over observations, so any aggregation (e.g. a mean over context points)
    has to be done by the caller. The additional intermediate
    ("penultimate") layer of the reference encoder is left out.

    Args:
        filter_size: Size of the last axis of the input (d_x + d_y).
        hidden_size: Width of all four MLP layers.
        num_latents: Size of each of the two outputs.
    """

    def __init__(self, filter_size, hidden_size, num_latents):
        super().__init__()
        # VARIABLES
        self.filter_size = filter_size
        self.hidden_size = hidden_size
        self.num_latents = num_latents

        # LAYERS
        # an affine operation: y = Wx + b
        self.fc1 = nn.Linear(self.filter_size, self.hidden_size)
        self.fc2 = nn.Linear(self.hidden_size, self.hidden_size)
        self.fc3 = nn.Linear(self.hidden_size, self.hidden_size)
        # The MLP output feeds both heads, which take `hidden_size` inputs, so
        # this layer must emit `hidden_size` features. Emitting `num_latents`
        # only worked when the two sizes happened to be equal.
        self.fc4 = nn.Linear(self.hidden_size, self.hidden_size)

        # Output layers
        self.olmu = nn.Linear(self.hidden_size, self.num_latents)
        self.ollogsig = nn.Linear(self.hidden_size, self.num_latents)

    def forward(self, x):
        """Map `x` of shape [..., filter_size] to `(mu, log_sigma)`.

        Returns:
            Tuple of two tensors, each of shape [..., num_latents].
        """
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        # Last MLP layer without a ReLU
        x = self.fc4(x)

        # Linear layer to both outputs
        mu = self.olmu(x)
        log_sigma = self.ollogsig(x)

        return mu, log_sigma

# encodernet = EncoderNet()
# print(encodernet)

In [309]:
# Build a network and look at its parameter tensors
encodernet = EncoderNet(filter_size = 2, hidden_size = 128, num_latents = 128)
params = list(encodernet.parameters())

# Total number of parameter tensors (one weight and one bias per layer)
print(len(params))

# Shapes of the first six parameter tensors
for param in params[:6]:
    print(param.size())

12
torch.Size([128, 2])
torch.Size([128])
torch.Size([128, 128])
torch.Size([128])
torch.Size([128, 128])
torch.Size([128])


In [None]:
class Encoder(torch.nn.Module):
  """Latent encoder of a Neural Process, implemented in PyTorch.

  Each (x, y) observation is passed through an MLP, the per-observation
  representations are averaged, and two linear heads map the result to the
  mean and standard deviation of a diagonal Gaussian over the latent.

  The layers are created on the first call because the input width
  (d_x + d_y) is not known at construction time. Call the encoder once on a
  sample batch before handing `encoder.parameters()` to an optimizer.
  """

  def __init__(self, output_sizes, num_latents):
    """NP Encoder

    Args:
      output_sizes: An iterable containing the output sizes of the encoding MLP. (e.g. [128, 128, 128, 128])
      num_latents: The latent dimensionality.

    Raises:
      ValueError: if `output_sizes` is empty.
    """
    super().__init__()
    # Instance attributes; the leading underscore marks them as internal
    self._output_sizes = list(output_sizes)
    if not self._output_sizes:
      raise ValueError("output_sizes must contain at least one layer size")
    self._num_latents = num_latents

    # Created by `_build` on the first call
    self._mlp = None
    self._penultimate_layer = None
    self._mean_layer = None
    self._std_layer = None

  def _build(self, encoder_input):
    """Create all layers for inputs shaped like `encoder_input`."""
    layers = []
    in_features = encoder_input.shape[-1]
    # Hidden layers: every size but the last is followed by a ReLU
    for size in self._output_sizes[:-1]:
      layers.append(torch.nn.Linear(in_features, size))
      layers.append(torch.nn.ReLU())
      in_features = size
    # Last MLP layer without a ReLU
    layers.append(torch.nn.Linear(in_features, self._output_sizes[-1]))
    self._mlp = torch.nn.Sequential(*layers)

    # Integer division: a layer width must be an int
    penultimate_size = (self._output_sizes[-1] + self._num_latents) // 2
    self._penultimate_layer = torch.nn.Linear(self._output_sizes[-1], penultimate_size)
    self._mean_layer = torch.nn.Linear(penultimate_size, self._num_latents)
    self._std_layer = torch.nn.Linear(penultimate_size, self._num_latents)

    # Match the data the encoder is first called with
    self.to(device = encoder_input.device, dtype = encoder_input.dtype)

  def forward(self, x, y):
    """Encodes the inputs into one representation.

    Args:
      x: Tensor of shape [B, observations, d_x]. For this 1D regression
          task this corresponds to the x-values.
      y: Tensor of shape [B, observations, d_y]. For this 1D regression
          task this corresponds to the y-values.

    Returns:
      A `torch.distributions.Normal` over tensors of shape [B, num_latents]
    """
    # Concatenate x and y along the last axis
    encoder_input = torch.cat(tensors = (x, y), dim = -1)

    if self._mlp is None:
      self._build(encoder_input)

    # Pass last axis through MLP: [B, observations, output_sizes[-1]]
    hidden = self._mlp(encoder_input)

    # Aggregator: take the mean over all points, which makes the result
    # independent of the order of the observations
    hidden = torch.mean(hidden, dim = 1)

    # Further layers that map to the parameters of the Gaussian latent
    # First apply intermediate relu layer
    hidden = torch.relu(self._penultimate_layer(hidden))
    # Then apply further linear layers to output latent mu and log sigma
    mu = self._mean_layer(hidden)
    log_sigma = self._std_layer(hidden)

    # Compute sigma; the sigmoid bounds it to (0.1, 1.0) so it stays positive
    sigma = 0.1 + 0.9 * torch.sigmoid(log_sigma)

    return torch.distributions.Normal(loc = mu, scale = sigma)

In [271]:
# Join x and y along the last axis: [B, observations, d_x + d_y]
encoder_input = torch.cat((context_x, context_y), dim = -1)
print(encoder_input.shape)
batch_size, _, filter_size = encoder_input.shape

# Flatten batch and observation axes into one: [B * observations, d_x + d_y]
encoder_input.reshape(-1, filter_size)

torch.Size([16, 5, 2])


tensor([[ 1.6695e+00,  1.3893e+00],
        [-1.6097e+00,  6.0845e-01],
        [ 1.5475e+00,  1.2541e+00],
        [ 9.6082e-01, -6.3821e-02],
        [ 1.1155e+00,  1.9211e-01],
        [ 1.7744e+00, -5.2131e-01],
        [ 1.3607e+00, -4.6908e-01],
        [-1.2684e+00,  2.2224e-02],
        [-7.1997e-02,  2.4700e-01],
        [ 6.6764e-01, -1.3847e-01],
        [-1.2043e+00, -2.0594e-01],
        [ 2.3439e-01, -7.8805e-02],
        [ 6.0307e-01,  4.3825e-01],
        [-1.3070e+00, -3.2210e-01],
        [-1.4676e+00, -4.7742e-01],
        [-3.2465e-01,  6.1068e-01],
        [ 1.0823e+00,  8.8236e-02],
        [-1.8250e+00, -5.8394e-01],
        [-9.4264e-01, -3.4113e-01],
        [ 2.1516e-01,  7.8454e-01],
        [ 1.7815e+00, -1.2643e+00],
        [-3.1779e-01, -1.6950e-01],
        [-4.3276e-01, -4.1213e-01],
        [ 7.4593e-01, -4.5936e-01],
        [ 1.3781e+00, -8.7830e-01],
        [-1.1310e+00,  3.5035e-01],
        [-1.4030e-01, -1.4355e-02],
        [ 6.6925e-02, -1.038

In [275]:
# Scratch check: a standard normal built with PyTorch's distribution API
torch.distributions.Normal(loc = 0, scale = 1)

Normal(loc: 0.0, scale: 1.0)