In [3]:
# Install a pip package in the current Jupyter kernel
import sys
!{sys.executable} -m pip install tensorflow
!{sys.executable} -m pip install sklearn

Collecting sklearn
  Downloading sklearn-0.0.post12.tar.gz (2.6 kB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'error'


  error: subprocess-exited-with-error
  
  python setup.py egg_info did not run successfully.
  exit code: 1
  
  [15 lines of output]
  The 'sklearn' PyPI package is deprecated, use 'scikit-learn'
  rather than 'sklearn' for pip commands.
  
  Here is how to fix this error in the main use cases:
  - use 'pip install scikit-learn' rather than 'pip install sklearn'
  - replace 'sklearn' by 'scikit-learn' in your pip requirements files
    (requirements.txt, setup.py, setup.cfg, Pipfile, etc ...)
  - if the 'sklearn' package is used by one of your dependencies,
    it would be great if you take some time to track which package uses
    'sklearn' instead of 'scikit-learn' and report it to their issue tracker
  - as a last resort, set the environment variable
    SKLEARN_ALLOW_DEPRECATED_SKLEARN_PACKAGE_INSTALL=True to avoid this error
  
  More information is available at
  https://github.com/scikit-learn/sklearn-pypi-package
  [end of output]
  
  note: This error originates from a subpr

#### Step 1: Creating and training our base model

First, we set up and train our network:

In [1]:
from tensorflow.keras import Model, Sequential, layers, optimizers, metrics, losses  
import tensorflow as tf  
import tensorflow_probability as tfp  
from sklearn.datasets import load_boston  
from sklearn.model_selection import train_test_split  
from sklearn.preprocessing import StandardScaler  
from sklearn.metrics import mean_squared_error  
import pandas as pd  
import numpy as np  

seed = 213  
np.random.seed(seed)  
tf.random.set_seed(seed)  
dtype = tf.float32  

boston = load_boston()  
data = boston.data  
targets = boston.target  

X_train, X_test, y_train, y_test = train_test_split(data, targets, test_size=0.2)  

# Scale our inputs  
scaler = StandardScaler()  
X_train = scaler.fit_transform(X_train)  
X_test = scaler.transform(X_test)  

model = Sequential()  
model.add(layers.Dense(20, input_dim=13, activation='relu', name='layer_1'))  
model.add(layers.Dense(8, activation='relu', name='layer_2'))  
model.add(layers.Dense(1, activation='relu', name='layer_3'))  

model.compile(optimizer=optimizers.Adam(),  
              loss=losses.MeanSquaredError(),  
              metrics=[metrics.RootMeanSquaredError()],)  

num_epochs = 200  
model.fit(X_train, y_train, epochs=num_epochs)  
mse, rmse = model.evaluate(X_test, y_test)

ModuleNotFoundError: No module named 'tensorflow'

#### Step 2: Using a neural network layer as a basis function

Now that we have our base network, we just need to access the penultimate layer so that we can feed this as our basis function to our Bayesian regressor. This is easily done using TensorFlow’s high-level API, for example:

In [None]:
basis_func = Model(
    inputs=self.model.input,
    outputs=self.model.get_layer('layer_2').output
)

This will build a model that will allow us to obtain the output of the second hidden layer by simply calling its predict method:

In [None]:
layer_2_output = basis_func.predict(X_test)

This is all that’s needed to prepare our basis function for passing to our Bayesian linear regressor.

#### Step 3: Preparing our variables for Bayesian linear regression

For the Bayesian regressor, we assume that our outputs, yi ∈ y, are conditionally normally distributed according to a linear relationship with our inputs, xi ∈ X:

![image.png](attachment:image.png)

Here, α is our bias term, β are our model coefficients, and σ2 is the variance associated with our predictions. We’ll also make some prior assumptions about these parameters, namely:

![image-2.png](attachment:image-2.png) 
![image-3.png](attachment:image-3.png)
![image-4.png](attachment:image-4.png)

Note that equation 6.6 denotes the half-normal of a Gaussian distribution. To wrap up the Bayesian regressor in such a way that it’s easy (and practical) to integrate it with our Keras model, we’ll create a BayesianLastLayer class. This class will use the TensorFlow Probability library to allow us to implement the probability distributions and sampling functions we’ll need for our Bayesian regressor. Let’s walk through the various components of our class:

In [None]:
class BayesianLastLayer():  

    def __init__(self,  
                 model,  
                 basis_layer,  
                 n_samples=1e4,  
                 n_burnin=5e3,  
                 step_size=1e-4,  
                 n_leapfrog=10,  
                 adaptive=False):  
        # Setting up our model  
        self.model = model  
        self.basis_layer = basis_layer  
        self.initialize_basis_function()  
        # HMC Settings  
        # number of hmc samples  
        self.n_samples = int(n_samples)  
        # number of burn-in steps  
        self.n_burnin = int(n_burnin)  
        # HMC step size  
        self.step_size = step_size  
        # HMC leapfrog steps  
        self.n_leapfrog = n_leapfrog  
        # whether to be adaptive or not  
        self.adaptive = adaptive
    
    def initialize_basis_function(self):  
        self.basis_func = Model(inputs=self.model.input,  
                           outputs=self.model.get_layer(self.basis_layer).output)  

    def get_basis(self, X):  
        return self.basis_func.predict(X)
    
    def fit(self, X, y):  
        X = tf.convert_to_tensor(self.get_basis(X), dtype=dtype)  
        y = tf.convert_to_tensor(y, dtype=dtype)  
        y = tf.reshape(y, (-1, 1))  
        D = X.shape[1]  

        # Define our joint distribution  
        distribution = tfp.distributions.JointDistributionNamedAutoBatched(  
            dict(  
                sigma=tfp.distributions.HalfNormal(scale=tf.ones([1])),  
                alpha=tfp.distributions.Normal(  
                    loc=tf.zeros([1]),  
                    scale=tf.ones([1]),  
                ),  
                beta=tfp.distributions.Normal(  
                    loc=tf.zeros([D,1]),  
                    scale=tf.ones([D,1]),  
                ),  
                y=lambda beta, alpha, sigma:  
                    tfp.distributions.Normal(  
                        loc=tf.linalg.matmul(X, beta) + alpha,  
                        scale=sigma  
                    )  
                )  
            )
        
        # Define the log probability function  
        def target_log_prob_fn(beta, alpha, sigma):  
            return distribution.log_prob(beta=beta, alpha=alpha, sigma=sigma, y=y)  

        # Define the HMC kernel we'll be using for sampling  
        hmc_kernel  = tfp.mcmc.HamiltonianMonteCarlo(  
          target_log_prob_fn=target_log_prob_fn,  
          step_size=self.step_size,  
          num_leapfrog_steps=self.n_leapfrog  
        )  

        # We can use adaptive HMC to automatically adjust the kernel step size  
        if self.adaptive:  
            adaptive_hmc = tfp.mcmc.SimpleStepSizeAdaptation(  
              inner_kernel = hmc_kernel,  
              num_adaptation_steps=int(self.n_burnin * 0.8)  
            )
        
        # If we define a function, we can extend this to multiple chains.  
        @tf.function  
        def run_chain():  
            states, kernel_results = tfp.mcmc.sample_chain(  
                  num_results=self.n_samples,  
                  num_burnin_steps=self.n_burnin,  
                  current_state=[  
                      tf.zeros((X.shape[1],1), name='init_model_coeffs'),  
                      tf.zeros((1), name='init_bias'),  
                      tf.ones((1), name='init_noise'),  
                  ],  
                  kernel=hmc_kernel  
                )  
            return states, kernel_results  

        print(f'Running HMC with {self.n_samples} samples.')  
        states, kernel_results = run_chain()  

        print('Completed HMC sampling.')  
        coeffs, bias, noise_std = states  
        accepted_samples = kernel_results.is_accepted[self.n_burnin:]  
        acceptance_rate = 100*np.mean(accepted_samples)  
        # Print the acceptance rate - if this is low, we need to check our  
        # HMC parameters  
        print('Acceptance rate: %0.1f%%' % (acceptance_rate))
        
        # Obtain the post-burnin samples  
        self.model_coeffs = coeffs[self.n_burnin:,:,0]  
        self.bias = bias[self.n_burnin:]  
        self.noise_std = noise_std[self.n_burnin:]
        
    def get_divd_dist(self, X):  
        predictions = (tf.matmul(X, tf.transpose(self.model_coeffs)) +  
                       self.bias[:,0])  
        noise = (self.noise_std[:,0] *  
                 tf.random.normal([self.noise_std.shape[0]]))  
        return predictions + noise  

    def predict(self, X):  
        X = tf.convert_to_tensor(self.get_basis(X), dtype=dtype)  
        divd_dist = np.zeros((X.shape[0], self.model_coeffs.shape[0]))  
        X = tf.reshape(X, (-1, 1, X.shape[1]))  
        for i in range(X.shape[0]):  
          divd_dist[i,:] = self.get_divd_dist(X[i,:])  

        y_divd = np.mean(divd_dist, axis=1)  
        y_std = np.std(divd_dist, axis=1)  
        return y_divd, y_std

And that’s it! We have our BLL implementation! With this class, we have a powerful and principled means of obtaining Bayesian uncertainty estimates by using penultimate NN layers as basis functions for Bayesian regression. Making use of it is as simple as passing our model and defining which layer we want to use as our basis function:

In [None]:
bll = BayesianLastLayer(model, 'layer_2')  

bll.fit(X_train, y_train)  

y_divd, y_std = bll.predict(X_test)