## Setup & Data

In [None]:
%load_ext autoreload
%autoreload 2

import tensorflow as tf
import tensorflow_datasets as tfds
from typing import Dict, Optional, Tuple
from gpflow.utilities import to_default_float
import gpflow
import numpy as np

gpflow.config.set_default_float(np.float32)

Import MNIST and preprocess

In [None]:
original_dataset, info = tfds.load(name="mnist", split=tfds.Split.TRAIN, with_info=True)

total_num_data = info.splits["train"].num_examples
image_shape = info.features["image"].shape
image_size = tf.reduce_prod(image_shape)
num_classes = info.features["label"].num_classes
batch_size = 32

# reshape each input x to [1, 28, 28] and normalise to (0, 1).
# one-hot encode the outputs to [1, 10].
# x and y have a leading dim of 1 so that the concat in the next cell works
def map_fn(input_slice: Dict[str, tf.Tensor]):
    updated = input_slice
    image = to_default_float(updated["image"]) / 255.0
    label = tf.expand_dims(tf.one_hot(updated["label"], num_classes), 0)
    return tf.reshape(image, [-1, 28, 28]), label

autotune = tf.data.experimental.AUTOTUNE
dataset = (
    original_dataset.shuffle(1024)
    .take(150)
    # .batch(batch_size, drop_remainder=True)
    .map(map_fn, num_parallel_calls=autotune)
    .prefetch(autotune)
    # .repeat()
)

Turn the dataset into usable tensors

In [None]:
X = []
Y = []
for (x, y) in dataset:
    X.append(x)
    Y.append(y)
X = tf.concat(X, 0)
Y = tf.concat(Y, 0)
X_train, X_test = X[:100], X[100:]
Y_train, Y_test = Y[:100], Y[100:]

## Run the model

In [None]:
import deep_ckern as dkern

def create_kern():
    return dkern.DeepKernel(
        [1, 28, 28],
        filter_sizes=[[5, 5], [2, 2], [5, 5], [2, 2]],
        recurse_kern=dkern.ExReLU(multiply_by_sqrt2=True),
        var_weight=1.,
        var_bias=1.,
        padding=["VALID", "SAME", "VALID", "SAME"],
        strides=[[1, 1]] * 4,
        data_format="NCHW",
        skip_freq=-1,
    )

kern = create_kern()

Compute the training covariance $K_{xx}$ and train-test cross covariance $K_{zx}$:

In [None]:
Kxx = kern.K(X_train)
Kzx = kern.K(X_test, X_train)

Treat the classification as a multi-output regression problem (with independent outputs) and solve for the predictive means:

In [None]:
K_inv_Y = tf.linalg.solve(Kxx, Y_train)
pred_means = tf.linalg.matmul(Kzx, K_inv_Y)

Y_pred = tf.one_hot(tf.math.argmax(pred_means, axis=1), 10)
corrects = tf.reduce_all(Y_pred == Y_test, axis=1)
accuracy = (tf.math.count_nonzero(corrects) / len(corrects)).numpy()

print(accuracy) # 82%