In [None]:
import tensorflow as tf
import tensorflow_probability as tfp
import os
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten, Conv2D, MaxPooling2D
from tensorflow.keras.losses import SparseCategoricalCrossentropy, MeanSquaredError
from tensorflow.keras.optimizers import RMSprop

import tensorflow_datasets as tfds

tfd = tfp.distributions
tfpl = tfp.layers

# 1. Introductory Concepts

<b>Distribution objects</b> capture the essencial operations on probability distributions.

## 1.1 Univariate Distributions

Univariate distributions are distribution of a single random variable.

$$\begin{align}
\mathcal{N} &= (\mu,\sigma) \\
\mu &= 0 \\
\sigma &= 1 \\
\end{align}$$

In [None]:
# Create a univariate normal distribution with μ=0 and σ=1



Notice the properties `batch_shape` and `event_shape`. The `event_shape` property is what captures the dimensionality of the random variable. In this case we are using a univariate distribution, that is why the `event_shape` is empty. 

What can we do with this distribution object. We can, for instance, sample from it.

In [None]:
# Draw one sample from the normal distribution



In [None]:
# Draw 3 samples from the normal distribution



We can also evaluate the Probability Density Function (PDF), in the case of continuous random variables, on a given input.

In [None]:
# Calculate the pdf(0.2)



We will usually use the log probability of a given input.

In [None]:
# Calculate the log_prob(0.2)



In [None]:
# It is simply the log of the value returned by the prob method 

np.log(normal.prob(0.2))

In [None]:
# Plot a histogram, approximating the density of the distribution

plt.hist(normal.sample(10000).numpy(), bins=50, density=True)
plt.show()

We can also represent in a single object a batch of distribution of the same type.

In [None]:
# Create two univariate normal distributions:
# one with μ=0 and σ=1
# and another with μ=1 and σ=1



Notice the batch shape in the object above, it has a batch shape of two, meaning that we have two different normal distributions represented in that object. We can obviously sample from this distribution.

In [None]:
# Sample from the batch 2 normal distribution



Now, the shape of our samples is (3,2), as we are drawing 3 samples from each normal distribution. The same way we can get the values for the PDFs of both distributions.

In [None]:
# Get the PDF values for the batch distribution



The returned shape is the original batch shape. We can go crazy with dimensionality.

In [None]:
# Use a batch shape with higher rank

normal_batch_nD = tfd.Normal([[[0., 1],
                             [0.5, 1],
                             [3, 2]]], scale=1)
normal_batch_nD

In [None]:
# Sample from the batch of distributions



In [None]:
normal_batch_nD.sample(10).shape

The shape is now (10, 1, 3, 2), where the first dimension is the sample size and the rest are the batched distributions.

## 1.2 Multivariate Distributions

Let's start by defining a multivariate normal distribution. There are multiple ways to do it that you can check in the documentation. It is defined as:

$$\begin{align}
\mathcal{N_1} &= (\mu_1,\Sigma_1) \\
\mu_1 &= [0, 1] \\
\Sigma_1 &= \begin{bmatrix}
1 & 0\\
0 & 2
\end{bmatrix}
\end{align}$$

In [None]:
# Create a multivariate normal distribution with diagonal covariance
# with μ = [0, 1] and σ = [1, 2]


Notice the difference from the distributions that we have created above. The `event_shape` is now 2, indicating that the random variable for this distribution is 2-dimensional.

In [None]:
# Sample from the multivariate normal


In [None]:
# Plot the samples of the distribution

plt_sample = mv_normal.sample(10000).numpy()
plt.scatter(plt_sample[:,0], plt_sample[:,1], marker='.', alpha=0.05)
plt.axis('equal')
plt.show()

The data is more spread out in the second dimension (y-axis) as we defined a scale that is twice the value used for the first dimension. Also, there is no correlation between the dimensions, as we used a diagonal multivariate normal distribution (off-diagonal terms are 0).

We can define a batch of multivariate distributions.

$$\begin{align}
\mathcal{N_1} &= (\mu_1,\Sigma_1) \\
\mu_1 &= [0, 0] \\
\Sigma_1 &= \begin{bmatrix}
1 & 0\\
0 & 2
\end{bmatrix} \\[10pt]
\mathcal{N_2} &= (\mu_2,\Sigma_2) \\
\mu_1 &= [1, 1] \\
\Sigma_1 &= \begin{bmatrix}
1 & 0\\
0 & 1
\end{bmatrix} \\[10pt]
\mathcal{N_3} &= (\mu_3,\Sigma_3) \\
\mu_1 &= [0, 0] \\
\Sigma_1 &= \begin{bmatrix}
2 & 0\\
0 & 10
\end{bmatrix}
\end{align}$$

In [None]:
# Create 3 batches of multivariate normals

normal_diag_batch = tfd.MultivariateNormalDiag(loc=[[0,0], [1,1], [0,0]],
                                              scale_diag=[[1,2], [1,1], [2,10]])

In [None]:
# sample from the distribution



When sampling we get an output with the following dimensions (number_samples, batch_shape, event_shape).

In [None]:
# Compute log probs



The log-probability has shape (number_samples, batch_shape) as expected.

In [None]:
# create a sample for a plot

plt_sample_batch = normal_diag_batch.sample(10000).numpy()
plt_sample_batch.shape # (samples, batch, event)

In [None]:
# Plot the samples of the mv normal

fig, axs = (plt.subplots(1, 3, sharex=True, sharey=True, figsize=(10, 3)))
titles = ['cov_diag=[1,2]', 'cov_diag=[1,1]', f'cov_diag=[2,10]']

for i, (ax, title) in enumerate(zip(axs, titles)):
    samples = plt_sample_batch[:,i,:] # take the ith batch, gives a shape (samples, event_shape)
    ax.scatter(samples[:,0], samples[:,1], marker='.', alpha=0.05)
    ax.set_title(title)
    axs[i].set_ylim(-25, 25)
    axs[i].set_xlim(-25, 25)
plt.show()

## 1.3 Independent Distribution

There are cases where we want to interpret a batch of independent distributions over an event space as a single joint distribution over a product of event spaces. This impacts the way we handle the batch and event shapes. An example of such a problem is the Naive Bayes classifier, where the features are independent given a class label.

To illustrate, let's define two normal distributions.
The first is a multivariate normal of the form:
$$\begin{align}
\mathcal{N_1} &= (\mu_1,\Sigma_1) \\
\mu_1 &= [0, 1] \\
\Sigma_1 &= \begin{bmatrix}
1 & 0\\
0 & 2
\end{bmatrix}
\end{align}$$


The second is a batched normal of the form:
$$\begin{align}
\mathcal{N_{2_1}} &= (\mu_{2_1},\sigma_{2_1}) \\
\mu_{2_1} &= 0 \\
\sigma_{2_1} &= 1 \\[10pt]
\mathcal{N_{2_2}} &= (\mu_{2_2},\sigma_{2_2}) \\
\mu_{2_2} &= 1 \\
\sigma_{2_2} &= 2
\end{align}$$

In [None]:
# Create a multivariate normal diagonal distribution
# with μ=[0,1], σ=[1, 2]



In [None]:
# Plot the independent dist

samples = mv_normal.sample(10000).numpy()
x1 = samples[:,0]
x2 = samples[:,1]
sns.jointplot(x = x1, y = x2, kind='kde', space=0, color='b', xlim=[-6, 7], ylim=[-6, 7]);

In [None]:
# Create a batched normal distribution
# with μ=[0,1], σ=[1, 2]



In [None]:
# Plot the batched normal

t = np.linspace(-4, 4, 10000)
densities = batched_normal.prob(np.repeat(t[:, np.newaxis], 2, axis=1))

sns.lineplot(x=t, y=densities[:, 0], label='loc={}, scale={}'.format(locs[0], scales[0]))
sns.lineplot(x=t, y=densities[:, 1], label='loc={}, scale={}'.format(locs[1], scales[1]))
plt.ylabel('Probability Density')
plt.xlabel('Value')
plt.legend()
plt.show()

Notice that the first distribution object returns a single log-probability while the second returns 2. The difference is that the array that we pass to the first is interpreted as a single realization of a 2-dimensional random variable. In the second case, the array is interpreted as different inputs for each of the random variables (also referred as batches).

In a nutshell the Independent distribution allows us to absorb whatever dimesions we want to absorb to the event dimension.

In [None]:
# Create an independent normal dist from the batched normal dist created above



In [None]:
# Plot the independent dist

samples = independent_normal.sample(10000).numpy()
x1 = samples[:,0]
x2 = samples[:,1]
sns.jointplot(x = x1, y = x2, kind='kde', space=0, color='b', xlim=[-6, 7], ylim=[-6, 7]);

Notice that now the independent normal created from the batched normal is equivalent to the multivariate normal created above.

## 1.4 Trainable Parameters

Now that we know the essencials about TensorFlow Probability objects, it is time to understand how we can train parameters for these distributions. This is the connection that we are missing to start applying what we have learned. 

In TensorFlow, `Variable` objects are what we use to capture the values of parameter of our deep learning models. These objects are updated during training by, for example, applying gradients obtained from a loss function and data.

In [None]:
# Create a normal distribution with the μ parameter trainable



For the training procedure, Maximum Likelihood is the usual suspect in deep learning models. In a nutshell, we are looking for the parameters of our model that maximize the probability of the data. 

The probability density function of a continuous random variable roughly indicates the probability of a sample taking a particular value. We will denote this function $P(x | \theta)$ where $x$ is the value of the sample and $\theta$ is the parameter describing the probability distribution:

$$
P(x | \theta) = \text{Prob} (\text{sampling value $x$ from a distribution with parameter $\theta$}).
$$

In [None]:
# Calculate pdf of 0 for a standard normal dist



When more than one sample is drawn *independently* from the same distribution (which we usually assume), the probability density function of the sample values $x_1, \ldots, x_n$ is the product of the probability density functions for each individual $x_i$. Written formally:

$$
P(x_1, \ldots, x_n | \theta) = \prod_{i=1}^n P(x_i | \theta).
$$

In [None]:
X = [-0.5, 0, 1.5]



Probability density functions are usually considered functions of $x_1, \ldots, x_n$, with the parameter $\theta$ considered fixed. They are used when you know the parameter $\theta$ and want to know the probability of a sample taking some values $x_1, \ldots, x_n$. You use this function in *probability*, where you know the distribution and want to make deductions about possible values sampled from it.

The *likelihood* function is the same, but with the $x_1, \ldots, x_n$ considered fixed and with $\theta$ considered the independent variable. You usually use this function when you know the sample values $x_1, \ldots, x_n$ (because you've observed them by collecting data), but don't know the parameter $\theta$. You use this function in *statistics*, where you know the data and want to make inferences about the distribution they came from. 

For the likelihood, the convention is using the letter $L$, so that

$$
\underbrace{L(x_1, \ldots, x_n | \theta)}_{\text{ likelihood,} \\ \text{function of $\theta$}} = \underbrace{P(x_1, \ldots, x_n | \theta)}_{\text{probabiliy density,} \\ \text{ function of $x_1, \ldots, x_n$}}
$$

We are ready to define our likelihood function for a normal distribution with parameters $\mu$ and $\sigma$:

$$\begin{aligned}
L = f(X|\theta) &= f(x_1|\theta) f(x_2|\theta),..., f(x_n|\theta) \\
&= \prod^n_{j=1}f(X| \mu,\sigma^2) \\
&= (2\pi\sigma^2)^{-n/2} \exp{\big(-\frac{1}{2\sigma^2} \sum^n_{j=1}(x_i-\mu)^2\big)}
\end{aligned}$$

In [None]:
# Create a grid of values for μ and σ and calculate the likelihood
# for the different parameter values

μ = np.linspace(-2, 2, 100)
σ = np.linspace(0, 3, 100)

l_x = []
for mu in μ:
    for sigma in σ:
        l_x.append(np.prod(tfd.Normal(mu, sigma).prob(X)))
        
l_x = np.asarray(l_x).reshape((100, 100)).T

In [None]:
# Plot the likelihood based on the observed data (fixed)

plt.contourf(μ, σ, l_x)
plt.xlabel('μ')
plt.ylabel('σ')
plt.colorbar()
plt.title('Likelihood');

We now get to a new problem, multiplying many small probabilities together can be numerically unstable. To overcome this, we can use the log of the same function. The natural logarithm is a monotonically increasing function, which means that if the value on the x-axis increases, the value on the y-axis also increases. This is important because it ensures that the maximum value of the log of the probability occurs at the same point as the original probability function. It does another very conveniently thing for us, it transforms our products into sums.

Let's perform the transformation:

$$\begin{aligned}
\log(L(X|\theta)) &= \log\big((2\pi\sigma^2)^{-n/2} \exp{\big(-\frac{1}{2\sigma^2} \sum^n_{j=1}(y_i-\mu)^2\big)\big)} \\
&= -\frac{n}{2}\log(2\pi)-\frac{n}{2}\log(\sigma^2)-\frac{1}{2\sigma^2}\sum_{j=1}^{n}(y_i-\mu)^2
\end{aligned}$$

Almost there! We will now work our optimization problem at hand. We can put it as simple as

$$\max_{\mu,\sigma^2}\log(L(X|\theta))$$

The expression derived above can be differentiated to find the maximum. Expanding our parameters we have $\log(L(X|\mu, \sigma))$. As it is a function of the two variables $\mu$ and $\sigma$ we use partial derivatives to find the MLE. 

Let's focus on $\hat \mu$ (the hat indicates that it is an estimator, i.e. our output), we compute it from

$$\begin{aligned}
& \quad \frac{\partial}{\partial \mu} \log(L(Y|\mu, \sigma)) \\
&= \frac{\partial}{\partial \mu} \big(-\frac{n}{2}\log(2\pi)-\frac{n}{2}\log(\sigma^2)-\frac{1}{2\sigma^2}\sum_{j=1}^{n}(x_i-\mu)^2\big)
\\
&= \sum^n_{j=1} \frac{(x_i - \mu)}{\sigma^2}
\end{aligned}$$

Setting the expression above equal to zero we get


$$\sum^n_{j=1} \frac{(x_i - \mu)}{\sigma^2} = 0 $$

Then
$$\begin{aligned}
\hat\mu &= \frac{\sum^n_{j=1}x_i}{n} \\
\hat\mu &= \bar x
\end{aligned}$$

Surprisingly or not this is the mean of the data.

In [None]:
# Get maximum values for the μ and σ and compare with true values

index_mu_max = np.argmax(l_x, axis=1)[-1]
print(f'μ True Value: {np.array(X).mean()}')
print(f'μ Calculated Value: {μ[index_mu_max]}')
print(f'σ True Value: {np.array(X).std()}')
print(f'σ Calculated Value: {σ[np.nanargmax(l_x[:,index_mu_max], axis=0)]}')

##### Calculate using TensorFlow

In [None]:
# Create a random variable normall distributed and sample from it

x_train = np.random.normal(loc=1, scale=5, size=1000).astype('float32')[:, np.newaxis]

In [None]:
# Plot a histogram of the random variable

plt.hist(x_train, bins=50);

In [None]:
# Calculate the mean of the samples of the random variable



In [None]:
# Define the negative log likelihood function (loss function)



Notice that above we are supplying data points to our model and computing the corresponding log-probability for each data point. As we already saw, the `log_prob` method returns a tensor that has the same shape as the data. The log-probability of our data will be the sum of the log-probabilities of each data point.

In [None]:
# Custom training loop

@tf.function
def get_loss_and_grads(x_train):
    with tf.GradientTape() as tape:
        tape.watch(normal.trainable_variables)
        loss = nll(x_train)
        grads = tape.gradient(loss, normal.trainable_variables)
    return loss, grads

optimizer = tf.keras.optimizers.SGD(learning_rate=0.001)

for i in range(2000):
    loss, grads = get_loss_and_grads(x_train)
    optimizer.apply_gradients(zip(grads, normal.trainable_variables))
    
    loc_value = normal.loc.value()
    print("Step {:03d}: Loss: {:.3f} Loc: {:.3f}".format(i, loss, loc_value))

In [None]:
# Compare the true value and the estimated parameter

print(f'True Value: {x_train.mean()}')
print(f'Estimated Value: {normal.trainable_variables[0].numpy()}')

# 2. Linear Regression

#### Create data

The data we'll be working with is artificially created from the following equation:

$$y_i = x_i + \frac{3}{10}\epsilon_i$$

where $\epsilon_i \sim \mathcal{N}(0,1)$ are independent and identically distributed

In [None]:
# Create and plot 100 points of training data

x_train = np.linspace(-1, 1, 100)[:, np.newaxis]
y_train = x_train + 0.3*np.random.randn(100)[:, np.newaxis]

plt.scatter(x_train, y_train, alpha=0.4)
plt.xlabel('x')
plt.ylabel('y')
plt.show()

#### Deterministic linear regression with MSE loss

Let's define a model that receives each input as a length one vector (`input_shape`=(1,)) and the model is predicting only one target variable (using a Dense layer with 1 unit). We are compiling the model with the mean squared error loss, using the `RMSprop` optimizer and training for 200 epochs.

In [None]:
# Create linear regression via Sequential model
# Train the deterministic linear model using mean squared error loss



In [None]:
# Plot the data and the model
plt.scatter(x_train, y_train, alpha=0.4, label='data')
plt.plot(x_train, model.predict(x_train), color='red', alpha=0.8, label='model')
plt.legend()
plt.show()

The deterministic linear regression fails to capture the aleatoric uncertainty (uncertainty on the process generation of the data). We can see this by the output above, where we get the y_value (our prediction) but we don't get any reference to the uncertainty of that prediction (there is significant uncertainty as we can see from the distance between the blue points and the red line).

#### Probabilistic linear regression to model the aleatoric uncertainty

To build our probabilistic model, we add a final layer which is a `DistributionLambda` layer. This layer includes the normal distribution. Notice that the output of the previous layer is what defines the mean of the normal distribution and we are assuming that the standard deviation is fixed (in our case 0.6). The constructor of the `DistributionLambda` has one required argument which is a function. This function takes the output of the previous layer as an input and returns a distribution object. In this case we are using a lambda function to instantiate the `DistributionLambda` layer. The lambda function receives an input t, which is the output tensor of the previous `Dense` layer and returns a normal distribution with mean defined by the tensor t.

With this setup the model returns a distribution object when it is called.

In [None]:
# Create probabilistic regression with normal distribution as final layer



Notice that the output of the model is a distribution object with `batch_shape` equal to batch of the data (we did not create batches) by the number of units in the `Dense` layer (or number of ouput variables).

In [None]:
# Print model summary



It is time to define our loss function. The loss function in `Keras` receives the true labels and predictions as inputs. To compute the negative loglikelihood we just need to bear in mind that our model outputs a distribution object, so `y_pred` is the normal distribution. Using the `log_prob` method, we can easily compute the log probability of the data.

In [None]:
# Train model using the negative loglikelihood



In [None]:
# Compile and fit the model



In [None]:
# Plot the data and a sample from the model

y_model = model(x_train)
y_sample = y_model.sample()
y_hat = y_model.mean()
y_sd = y_model.stddev()
y_hat_m2sd = y_hat -2 * y_sd
y_hat_p2sd = y_hat + 2*y_sd

fig, (ax1, ax2) =plt.subplots(1, 2, figsize=(15, 5), sharey=True)
ax1.scatter(x_train, y_train, alpha=0.4, label='data')
ax1.scatter(x_train, y_sample, alpha=0.4, color='red', label='model sample')
ax1.legend()
ax2.scatter(x_train, y_train, alpha=0.4, label='data')
ax2.plot(x_train, y_hat, color='red', alpha=0.8, label='model $\mu$')
ax2.plot(x_train, y_hat_m2sd, color='green', alpha=0.8, label='model $\mu \pm 2 \sigma$')
ax2.plot(x_train, y_hat_p2sd, color='green', alpha=0.8)
ax2.legend()
plt.show()

As we can see we capture the mean correctly but as we did not infer the standard deviation (it was manually chosen) we see that its value is far from correct. Let's add the standard deviation as a parameter of the model.

We need to constraint the standard deviation to be positive. We use a SoftPlus function, which is a smooth approximation to the ReLU function and can indeed constrain the output of a model to always be positive.

In [None]:
# Create probabilistic regression with normal distribution as final layer



In [None]:
# Print model summary

model.summary()

In [None]:
# Train model using the negative loglikelihood



In [None]:
# Compile and fit the model



In [None]:
# Plot the data and a sample from the model

y_model = model(x_train)
y_sample = y_model.sample()
y_hat = y_model.mean()
y_sd = y_model.stddev()
y_hat_m2sd = y_hat -2 * y_sd
y_hat_p2sd = y_hat + 2*y_sd

fig, (ax1, ax2) =plt.subplots(1, 2, figsize=(15, 5), sharey=True)
ax1.scatter(x_train, y_train, alpha=0.4, label='data')
ax1.scatter(x_train, y_sample, alpha=0.4, color='red', label='model sample')
ax1.legend()
ax2.scatter(x_train, y_train, alpha=0.4, label='data')
ax2.plot(x_train, y_hat, color='red', alpha=0.8, label='model $\mu$')
ax2.plot(x_train, y_hat_m2sd, color='green', alpha=0.8, label='model $\mu \pm 2 \sigma$')
ax2.plot(x_train, y_hat_p2sd, color='green', alpha=0.8)
ax2.legend()
plt.show()

# 3. Non-Linear Regression

#### Probabilitistic linear regression with nonlinear learned mean & variance

Let's change the data to being nonlinear:

$$y_i = x_i^3+\frac{2}{5}(1+x_i)\epsilon_i$$

where $\epsilon_i \sim \mathcal{N}(0,1)$ are independent and identically distributed.

In [None]:
# Create and plot 10000 data points

x_train = np.linspace(-1, 1, 1000)[:, np.newaxis]
y_train = np.power(x_train, 3) + (2/5)*(1+x_train)*np.random.randn(1000)[:, np.newaxis]

plt.scatter(x_train, y_train, alpha=0.1)
plt.show()

To simplify the implementation of our last layer we can use a wrapper that TensorFlow Probability provides to build a similar distribution that we built with `DistributionLambda` - it is called `IndependentNormal`. At the same time we can use a static method that outputs the number of parameters that are required to the probabilistic layer and use it to define the number of units in the previous `Dense` layer: `tfpl.IndependentNormal.params_size(event_shape=1)`.

The real difference between the linear and non-linear models is that we added a new Dense layer as the first layer of the model.

In [None]:
# Create probabilistic regression: normal distribution with fixed variance

model = Sequential([
    Dense(input_shape=(1,), units=8, activation='sigmoid'),
    Dense(tfpl.IndependentNormal.params_size(event_shape=1)),
    tfpl.IndependentNormal(event_shape=1)
    # Dense(2),
    # tfpl.DistributionLambda(lambda t:tfd.Normal(loc=t[...,:1], scale=tf.math.softplus(t[...,1:])))
])

model.compile(loss=nll, optimizer=RMSprop(learning_rate=0.01))
model.summary()

In [None]:
# Train model

model.fit(x_train, y_train, epochs=500, verbose=False)
model.evaluate(x_train, y_train)

In [None]:
# Plot the data and a sample from the model

y_model = model(x_train)
y_sample = y_model.sample()
y_hat = y_model.mean()
y_sd = y_model.stddev()
y_hat_m2sd = y_hat -2 * y_sd
y_hat_p2sd = y_hat + 2*y_sd

fig, (ax1, ax2) =plt.subplots(1, 2, figsize=(15, 5), sharey=True)
ax1.scatter(x_train, y_train, alpha=0.4, label='data')
ax1.scatter(x_train, y_sample, alpha=0.4, color='red', label='model sample')
ax1.legend()
ax2.scatter(x_train, y_train, alpha=0.4, label='data')
ax2.plot(x_train, y_hat, color='red', alpha=0.8, label='model $\mu$')
ax2.plot(x_train, y_hat_m2sd, color='green', alpha=0.8, label='model $\mu \pm 2 \sigma$')
ax2.plot(x_train, y_hat_p2sd, color='green', alpha=0.8)
ax2.legend()
plt.show()

# 4. Deterministic Deep Learning Model

### 4.1 The MNIST dataset

In this article we use the [MNIST](http://yann.lecun.com/exdb/mnist/) dataset and its corrupted version. The corrupted version have grey spatters on top of the numbers, make it harder to classify the numbers. Our goal is to build a Convolutional Neural Network (CNN) that classifies the images of the handwritten digits to 10 different classes.

In [None]:
# Function to load training and testing data, with labels in integer and one-hot form

def load_data(name):
    data_dir = os.path.join('data', name)
    x_train = 1 - np.load(os.path.join(data_dir, 'x_train.npy')) / 255.
    x_train = x_train.astype(np.float32)
    y_train = np.load(os.path.join(data_dir, 'y_train.npy'))
    y_train_oh = tf.keras.utils.to_categorical(y_train)
    x_test  = 1 - np.load(os.path.join(data_dir, 'x_test.npy')) / 255.
    x_test = x_test.astype(np.float32)
    y_test  = np.load(os.path.join(data_dir, 'y_test.npy'))
    y_test_oh = tf.keras.utils.to_categorical(y_test)
    
    return (x_train, y_train, y_train_oh), (x_test, y_test, y_test_oh)

In [None]:
# Function to inspect dataset digits

def inspect_images(data, num_images):
    fig, ax = plt.subplots(nrows=1, ncols=num_images, figsize=(2*num_images, 2))
    for i in range(num_images):
        ax[i].imshow(data[i, :, :], cmap='gray')
        ax[i].axis('off')
    plt.show()

In [None]:
# Load and inspect the MNIST dataset

(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

inspect_images(data=x_train, num_images=8)

In [None]:
x_train = np.expand_dims(x_train, axis=-1)
y_train = np.expand_dims(y_train, axis=-1)
x_test = np.expand_dims(x_test, axis=-1)
y_test = np.expand_dims(y_test, axis=-1)

In [None]:
# Load and inspect the MNIST-C dataset

(xy_c_train, xy_c_test) = tfds.load('mnist_corrupted/spatter',
                               split=['train', 'test'],
                               as_supervised=True)

In [None]:
i = 0
fig, ax = plt.subplots(nrows=1, ncols=8, figsize=(2*8, 2))

for x, y in xy_c_train:
    if i < 8:
        ax[i].imshow(np.squeeze(x), cmap='gray')
    else:
        break
    i+=1

In [None]:
# not good practice (doing it here to simplify visualizations essencially)

x_c_train = []
y_c_train = []
for x, y in xy_c_train:
    x_c_train.append(x)
    y_c_train.append(y)
x_c_train = np.asarray(x_c_train)
y_c_train = np.asarray(y_c_train)


x_c_test = []
y_c_test = []
for x, y in xy_c_test:
    x_c_test.append(x)
    y_c_test.append(y)
x_c_test = np.asarray(x_c_test)
y_c_test = np.asarray(y_c_test)

We need to build a classifier model that can output predictions to 10 different classes. In our deterministic model, the final layer has to be defined with a `Dense` layer with 10 units and a softmax activation function.

We first define the deterministic model. It is a CNN classifier model with: 
* a Conv2D layer with 8 filters, 5x5 kernel size, ReLU activation and `'VALID'` padding.
* a MaxPooling2D layer with a 6x6 window size.
* a Flatten layer
* a Dense layer with 10 units and softmax activation

In [None]:
# define a function that returns the deterministic model defined above

def get_deterministic_model(input_shape, loss, optimizer, metrics):
    
    # model = 
    
    model.compile(loss=loss, optimizer=optimizer, metrics=metrics)
    return model

In [None]:
# Run your function to get the benchmark model

tf.random.set_seed(0)
deterministic_model = get_deterministic_model(
    input_shape=(28, 28, 1), 
    loss=SparseCategoricalCrossentropy(), 
    optimizer=RMSprop(), 
    metrics=['accuracy']
)

In [None]:
# Print the model summary



In [None]:
# Train the model

deterministic_model.fit(x_train, y_train, epochs=5)

In [None]:
# Evaluate the model

print('Accuracy on MNIST test set: ',
      str(deterministic_model.evaluate(x_test, y_test, verbose=False)[1]))
print('Accuracy on corrupted MNIST test set: ',
      str(deterministic_model.evaluate(x_c_test, y_c_test, verbose=False)[1]))

The pointwise performance on the corrupted MNIST set is worse as we would expect. The dataset is noisier, which makes it hard to the model to differentiate the classes.

# 5. Probabilistic Deep Learning Model


In [None]:
# Define the negative loglikelihood function



For our probabilistic model, the final layer is different from what we defined in the deterministic model. As we saw previously, we want our model to output a distribution object. In this case, the model outputs a One-Hot Categorical distribution object. With this approach we are able to model the aleatoric uncertainty on the image labels.

OneHotCategorical is a discrete distribution over one-hot bit vectors whereas Categorical is a discrete distribution over positive integers. OneHotCategorical is equivalent to Categorical except Categorical has event_dim=() while OneHotCategorical has event_dim=K, where K is the number of classes.

Our probabilistic CNN consists of: 
* a Conv2D layer with 8 filters, 5x5 kernel size, ReLU activation and `'VALID'` padding.
* a MaxPooling2D layer with a 6x6 window size.
* a Flatten layer
* a Dense layer with the number of units required to parameterize the probabilistic layer that follows
* a OneHotCategorical distribution with event shape equal to 10, corresponding to the 10 classes

In [None]:
# Define the probabilistic CNN

def get_probabilistic_model(input_shape, loss, optimizer, metrics):
    
    # model = 
    
    model.compile(loss=loss, optimizer=optimizer, metrics=metrics)
    return model

In [None]:
# Run your function to get the probabilistic model

tf.random.set_seed(0)
probabilistic_model = get_probabilistic_model(
    input_shape=(28, 28, 1), 
    loss=nll, 
    optimizer=RMSprop(), 
    metrics=['accuracy'])

In [None]:
# Print the model summary



In [None]:
# Train the model

probabilistic_model.fit(x_train, tf.keras.utils.to_categorical(y_train), epochs=5)

In [None]:
# Evaluate the model

print('Accuracy on MNIST test set: ',
      str(probabilistic_model.evaluate(x_test, tf.keras.utils.to_categorical(y_test), verbose=False)[1]))
print('Accuracy on corrupted MNIST test set: ',
      str(probabilistic_model.evaluate(x_c_test, tf.keras.utils.to_categorical(y_c_test), verbose=False)[1]))

Note that the test accuracy of the probabilistic model is identical to the deterministic model. This is because the model architectures for both are equivalent; the only difference being that the probabilistic model returns a distribution object.

In [None]:
# Check all the weights of the deterministic and probabilistic 
# models are identical

for deterministic_variable, probabilistic_variable in zip(deterministic_model.weights, probabilistic_model.weights):
    print(np.allclose(deterministic_variable.numpy(), probabilistic_variable.numpy()))

# 6. Results

In [None]:
# Function to make plots of the probabilities that the model estimates for an image

def analyse_model_prediction(data, true_labels, model, run_ensemble=False):
    if run_ensemble:
        ensemble_size = 200
    else:
        ensemble_size = 1
    image = data
    try:
        if len(true_labels)>0:
            true_labels = np.argmax(true_labels)
    except:
        pass
    true_label = true_labels
    predicted_probabilities = np.empty(shape=(ensemble_size, 10))
    for i in range(ensemble_size):
        predicted_probabilities[i] = model(image[np.newaxis, :]).mean().numpy()[0]
    model_prediction = model(image[np.newaxis, :])
    fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(10, 2),
                                   gridspec_kw={'width_ratios': [2, 4]})
    
    # Show the image and the true label
    ax1.imshow(image[..., 0], cmap='gray')
    ax1.axis('off')
    ax1.set_title('True label: {}'.format(str(true_label)))
    
    # Show a 95% prediction interval of model predicted probabilities
    pct_2p5 = np.array([np.percentile(predicted_probabilities[:, i], 2.5) for i in range(10)])
    pct_97p5 = np.array([np.percentile(predicted_probabilities[:, i], 97.5) for i in range(10)])    
    bar = ax2.bar(np.arange(10), pct_97p5, color='red')
    bar[int(true_label)].set_color('green')
    ax2.bar(np.arange(10), pct_2p5-0.02, color='white', linewidth=1, edgecolor='white')
    ax2.set_xticks(np.arange(10))
    ax2.set_ylim([0, 1])
    ax2.set_ylabel('Probability')
    ax2.set_title('Model estimated probabilities')
    plt.show()

In [None]:
# Prediction examples on MNIST

for i in [0, 18]:
    analyse_model_prediction(x_test[i], np.squeeze(y_test[i]), probabilistic_model)

The model is very confident that the first image is a 7, which is correct. For the second image, the model struggles, assigning nonzero probabilities to many different classes.

In [None]:
# Prediction examples on MNIST-C

for i in [0, 17]:
    analyse_model_prediction(x_c_test[i], np.squeeze(y_c_test[i]), probabilistic_model)

Once again the model is confident about its prediction of the first image. Despite the spatters, the number is still easy to identify. The second number is significantly harder to identify. The model still does a good job by predicting the right number, but also by showing uncertainty about that choice.

We can also make some analysis of the model's uncertainty across the full test set, instead of for individual values. One way to do this is to calculate the entropy of the distribution. The entropy is the expected information of a random variable, and is a measure of the uncertainty of the random variable. The entropy of the estimated probabilities for sample  𝑖  is defined as

$$
H_i = -\sum_{j=1}^{10} p_{ij} \text{log}_{2}(p_{ij})
$$
 
where  𝑝𝑖𝑗  is the probability that the model assigns to sample  𝑖  corresponding to label  𝑗. The higher the value, the more unsure the model is.

In [None]:
# Functions to plot the distribution of the information entropy across samples,
# split into whether the model prediction is correct or incorrect

def get_correct_indices(model, x, labels):
    y_model = model(x)
    correct = np.argmax(y_model.mean(), axis=1) == np.squeeze(labels)
    correct_indices = [i for i in range(x.shape[0]) if correct[i]]
    incorrect_indices = [i for i in range(x.shape[0]) if not correct[i]]
    return correct_indices, incorrect_indices


def plot_entropy_distribution(model, x, labels):
    probs = model(x).mean().numpy()
    entropy = -np.sum(probs * np.log2(probs), axis=1)
    fig, axes = plt.subplots(1, 2, figsize=(10, 4))
    for i, category in zip(range(2), ['Correct', 'Incorrect']):
        entropy_category = entropy[get_correct_indices(model, x, labels)[i]]
        mean_entropy = np.mean(entropy_category)
        num_samples = entropy_category.shape[0]
        title = category + 'ly labelled ({:.1f}% of total)'.format(num_samples / x.shape[0] * 100)
        axes[i].hist(entropy_category, weights=(1/num_samples)*np.ones(num_samples))
        axes[i].annotate('Mean: {:.3f} bits'.format(mean_entropy), (0.4, 0.9), ha='center')
        axes[i].set_xlabel('Entropy (bits)')
        axes[i].set_ylim([0, 1])
        axes[i].set_ylabel('Probability')
        axes[i].set_title(title)
    plt.show()

In [None]:
# Entropy plots for the MNIST dataset

print('MNIST test set:')
plot_entropy_distribution(probabilistic_model, x_test, y_test)

In [None]:
# Entropy plots for the MNIST-C dataset

print('Corrupted MNIST test set:')
plot_entropy_distribution(probabilistic_model, x_c_test, y_c_test)

# 7. Conclusions

From the results above we can see that the model is more unsure on the predictions it got wrong this means it knows when the prediction may be wrong. These are great properties to have in a machine learning model, and is one of the advantages of probabilistic modelling.