In [1]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.metrics import r2_score

In [2]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers.experimental import preprocessing

In [None]:
print(tf.__version__)

In [None]:
def plot_loss(history):
  plt.plot(history.history['loss'], label='loss')
  plt.plot(history.history['val_loss'], label='val_loss')
#   plt.ylim([0, 10])
  plt.xlabel('Epoch')
  plt.ylabel('Error [MPG]')
  plt.legend()
  plt.grid(True)

In [None]:
url = 'http://archive.ics.uci.edu/ml/machine-learning-databases/auto-mpg/auto-mpg.data'
column_names = ['MPG', 'Cylinders', 'Displacement', 'Horsepower', 'Weight',
                'Acceleration', 'Model Year', 'Origin']

raw_dataset = pd.read_csv(url, names=column_names,
                          na_values='?', comment='\t',
                          sep=' ', skipinitialspace=True)

In [None]:
dataset = raw_dataset.copy()
dataset.tail()

In [None]:
dataset.shape

In [None]:
dataset.isna().sum()

In [None]:
dataset = dataset.dropna()

In [None]:
dataset.shape

The "Origin" column is really categorical, not numeric. So convert that to a one-hot:

In [None]:
dataset['Origin'] = dataset['Origin'].map({1: 'USA', 2: 'Europe', 3: 'Japan'})

In [None]:
dataset = pd.get_dummies(dataset, prefix='', prefix_sep='')
dataset.tail()

In [None]:
dataset.shape

##### step 1: Train/test split
We'll separate validation data from the training set when calling `model.fit`

In [None]:
train_dataset = dataset.sample(frac=0.8, random_state=0)
test_dataset = dataset.drop(train_dataset.index)

In [None]:
# inspect the training/test data size
print("the training data size", train_dataset.shape)
print("the test data size", test_dataset.shape)

In [None]:
train_dataset.describe().transpose()

##### Step 2: Split `features` from `labels`

In [None]:
train_features = train_dataset.copy()
test_features = test_dataset.copy()

train_labels = train_features.pop('MPG')
test_labels = test_features.pop('MPG')

In [None]:
train_features.shape

In [None]:
train_labels.shape

Comment:

- We have 9 features and 1 response variable;
- We will do a regression problem;
- We have small dataset, which only contains 314 cases;

##### Step 3: Normalization

featurewise normalization written by hand

In [None]:
# traditional feature-wise normalization
print(train_features.mean(axis=0))
print(train_features.std(axis=0))

train_mean = train_features.mean(axis=0)
train_std = train_features.std(axis=0)

train_features = (train_features - train_mean) / train_std
test_features = (test_features - train_mean) / train_std

In [None]:
# convert pd.DataFrame to np.array

x_train  = np.array(train_features)
x_test  = np.array(test_features)
y_train  = np.array(train_labels)
y_test = np.array(test_labels)

In [None]:
# print out the shapes of data
print("training features", x_train.shape)
print("training labels", y_train.shape)
print("test features", x_test.shape)
print("test labels", y_test.shape)

In [None]:
test_results = {}

# Training models

##### 1. Mitiple linear regression

In [None]:
# linear_model = tf.keras.Sequential([
#     layers.Dense(1, input_shape=(9,))
# ])

In [None]:
# linear_model.compile(
#     optimizer=tf.optimizers.Adam(learning_rate=0.1),
#     loss='mean_absolute_error')
# #%%time
# history = linear_model.fit(
#     train_features, train_labels, 
#     epochs=100,
#     # suppress logging
#     verbose=0,
#     # Calculate validation results on 20% of the training data
#     validation_split = 0.2)

In [None]:
# plot_loss(history)

#### Performance on test data

In [None]:
#linear_model.metrics_names

In [None]:
# # show the 'MAE' on test data
# test_results['linear_model'] = linear_model.evaluate(test_features, test_labels)


#### Plot for predictions

In [None]:
# tst_pred = linear_model.predict(test_features)
# tst_pred.flatten();

In [None]:
# plt.figure(figsize=(8,8))
# a = plt.axes(aspect='equal')
# plt.scatter(test_labels, tst_pred)
# plt.xlabel('Ground Truth [MPG]')
# plt.ylabel('Predictions [MPG]')
# lims = [0, 50]
# plt.xlim(lims)
# plt.ylim(lims)
# _ = plt.plot(lims, lims)

#### $R^2$ for test data

In [None]:
# linear_model_r2 = r2_score(test_labels, tst_pred)
# linear_model_r2

##### 2. Deterministic DNN regression

Architecture of the DNN:
- Two hidden, nonlinear, `Dense` layer using the `relu` function
- A linear single output layer

In [None]:
DNN_model = keras.Sequential([
      layers.Dense(64, activation='relu', input_shape=(9,)),
      layers.Dense(64, activation='relu'),
      layers.Dense(1)
  ])

In [None]:
DNN_model.compile(loss='mse',
                optimizer=tf.keras.optimizers.Adam(0.001) , metrics=['mae'])

In [None]:
DNN_model.summary()

In [None]:
history = DNN_model.fit(train_features, train_labels, 
    epochs=300,
    # suppress logging
    verbose=2,
    # Calculate validation results on 20% of the training data
    validation_split = 0.2)

In [None]:
plot_loss(history)

##### Performance check

1. 'mae' value based on test data set
2. $R^{2}$ on test data

In [None]:
test_results['DNN_model'] = DNN_model.evaluate(x_test,y_test, verbose=0)
test_results

In [None]:
tst_pred = DNN_model.predict(x_test)
tst_pred.shape

In [None]:
tst_pred = DNN_model.predict(x_test)
# tst_pred.flatten()

plt.figure(figsize=(6,6))
a = plt.axes(aspect='equal')

plt.scatter(test_labels, tst_pred)
plt.xlabel('Ground Truth [MPG]')
plt.ylabel('Predictions [MPG]')
lims = [0, 50]
plt.xlim(lims)
plt.ylim(lims)
_ = plt.plot(lims, lims)

#### $R^2$ for test data

In [None]:
DNN_model_r2 = r2_score(test_labels, tst_pred)
DNN_model_r2

## 3. Aleatoric DNN regression

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
import tensorflow_probability as tfp
tfd = tfp.distributions
tfpl = tfp.layers

In [None]:
model = keras.Sequential([
      layers.Dense(64, activation='relu', input_shape=(9,)),
      layers.Dense(64, activation='relu'),
      Dense(2),
      tfpl.IndependentNormal(1)
  ])

In [None]:
# define the "negative log likelihood" as loss function
def nll(y_true, y_pred):
    return -y_pred.log_prob(y_true)

In [None]:
model.compile(loss=nll,
             optimizer='adam',
             metrics=['mae'])

In [None]:
history = model.fit(train_features, train_labels, validation_split=0.2, epochs=300)

In [None]:
plot_loss(history)

In [None]:
y_preds = model(test_features)

In [None]:
y_predictive_mean = y_preds.mean().numpy()
y_std = np.squeeze(y_preds.stddev().numpy())

In [None]:
# y_mean;
# y_std

In [None]:
plt.figure(figsize=(8,8))
plt.plot([5,45], [5,45], 'k--')
plt.errorbar(test_labels, y_predictive_mean, yerr=y_std, fmt='o', color='blue', ecolor='lightblue', elinewidth=3, capsize=0)
plt.xlabel('Ground truth')
plt.ylabel('Predictions')
plt.title('Confidence interval of predictions on test data')
plt.show()

#### $R^2$ for test data

In [None]:
DNN_model_r2 = r2_score(test_labels, y_predictive_mean)
DNN_model_r2

In [None]:
# mae metric for test data
model.evaluate(test_features, test_labels)

## 4. Bayesian DNN model via `DenseVariational`

Steps:
1. define prior
2. define posterior (w/ or w/o covariances)
3. create the Sequential model structure with `DenseVariational` layer
4. define the loss function
5. train: model.fit()

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.losses import MeanSquaredError
from tensorflow.keras.optimizers import RMSprop
import numpy as np
import matplotlib.pyplot as plt

In [None]:
import tensorflow as tf
import tensorflow_probability as tfp

tfd = tfp.distributions
tfpl = tfp.layers

print('TF version:', tf.__version__)
print('TFP version:', tfp.__version__)

Indenpendent prior

In [None]:
# Define the prior weight distribution -- all N(0, 1) -- and not trainable

def prior(kernel_size, bias_size, dtype=None):
    # number of parameters
    n = kernel_size + bias_size
    prior_model = Sequential([
        tfpl.DistributionLambda(
        lambda t : tfd.Independent(tfd.Normal(loc=tf.zeros(n, dtype=dtype), scale=1),
                                  reinterpreted_batch_ndims=1))
    ])
    return prior_model

Independent posterior

In [None]:
# Define variational posterior weight distribution -- multivariate Gaussian

def posterior(kernel_size, bias_size, dtype=None):
    n = kernel_size + bias_size 
    posterior_model = Sequential([
        tfpl.VariableLayer(tfpl.IndependentNormal.params_size(n), dtype=dtype),
        tfpl.IndependentNormal(n, convert_to_tensor_fn=tfd.Distribution.sample)
    ])
    return posterior_model

#### Create the model with `DenseVariational` layers

In [None]:
# Create probabilistic regression with one hidden layer, weight uncertainty

model = Sequential([
    tfpl.DenseVariational(units=64,
                          input_shape=(9,),
                          make_prior_fn=prior,
                          make_posterior_fn=posterior,
                          kl_weight=1/train_features.shape[0],
                          activation='relu'),
    tfpl.DenseVariational(units=64,
                          make_prior_fn=prior,
                          make_posterior_fn=posterior,
                          kl_weight=1/train_features.shape[0],
                          activation='relu'),
    tfpl.DenseVariational(units=1,
                          make_prior_fn=prior,
                          make_posterior_fn=posterior,
                          kl_weight=1/train_features.shape[0]),
    tfp.layers.DistributionLambda(lambda t: tfd.Normal(loc=t, scale=1)),
])


*Comment:*
- Look at the shape of the output distribution, it says batch_shape as [78, 1]. 
- we can further add tfd.Independent to the last layer to make it [78,]

In [None]:
def nll(y_true, y_pred):
    return -y_pred.log_prob(y_true)
model.compile(loss=nll, optimizer=RMSprop(learning_rate=0.01))

In [None]:
model.summary()

In [None]:
# Train the model

BNN_history = model.fit(train_features, train_labels, epochs=500, validation_split=0.2, verbose=0)

In [None]:
plot_loss(BNN_history)

In [None]:
test_results['BNN_model'] = model.evaluate(test_features, test_labels)

Let's see the predictions of BNN

In [None]:
# a quick look at the test data
test_features[0].shape

In [None]:
# a random draw from weights' distributions and do a one time naive prediction

In [None]:
# y_tst = model.predict(test_features)
y_tst = model(test_features[0][np.newaxis])
# print('type', y_tst)
# print('shape', y_tst.shape)
print(y_tst.mean().numpy().flatten())

In [None]:
# # inspect the shape of predictions
# y_tst = model(test_features)
# y_tst.mean()

In [None]:
# # hand calculated the MAE error of the test data
# from sklearn.metrics import mean_absolute_error
# mean_absolute_error(test_labels.to_numpy(), y_tst.mean().numpy().flatten())

In [None]:
test_features.shape

#### Predictive distribution

In [None]:
# simply record the results and w/o plotting (hight dimensional cases)
import tqdm

y_pred_list = []

for _ in tqdm.tqdm(range(1000)):
    y_pred = model(test_features)
    y_pred_list.append(y_pred.mean().numpy())
    

In [None]:
y_pred_list[0].shape

In [None]:
y_preds = np.concatenate(y_pred_list, axis=1)
y_preds.shape

In [None]:
y_mean = np.mean(y_preds, axis=1)
y_sigma = np.std(y_preds, axis=1)

In [None]:
print(y_mean.shape)

In [None]:
y_sigma.shape

In [None]:
# test_labels.describe()

In [None]:
plt.figure(figsize=(8,8))
plt.plot([5,45], [5,45], 'k--')
plt.errorbar(test_labels.to_numpy(), y_mean, yerr=y_sigma, fmt='o', color='blue', ecolor='lightblue', elinewidth=3, capsize=0)
plt.xlabel('Ground truth')
plt.ylabel('Predictions')
plt.title('Results of test data')
plt.show()

## 5. MC Dropout

We are working on a scalar regression task. Instead of a single point estimate for the final prediction $y^{*}$, given a vecor of features $\mathbf{x}$, we now have a predictive distribution on $Y$

In [None]:
# let's have a Dropout model first

model_mc = keras.Sequential([
    layers.Dense(64, activation='relu', input_shape=(9,)),
    layers.Dropout(0.5),
    layers.Dense(64, activation='relu'),
    layers.Dense(1)
  ])

In [None]:
model_mc.compile(loss='mse',
             optimizer='adam',
             metrics=['mae'])

In [None]:
history_mc = model_mc.fit(x_train, y_train, validation_split=0.2, epochs=300, verbose=0)

In [None]:
plot_loss(history_mc)

In [None]:
# this file demonstrates how to toggle MC Dropout in keras
# this answer is sourced from https://stackoverflow.com/questions/63238203/how-to-get-intermediate-outputs-in-tf-2-3-eager-with-learning-phase


from tensorflow.python.keras.backend import eager_learning_phase_scope
from tensorflow.python.keras import backend as K
f = K.function([model_mc.input], [model_mc.output])

# run in training mode, i.e. 1 means training
# this code is for a single input
with eager_learning_phase_scope(value=1):
    output_train = f(x_test[0][np.newaxis])[0]
    
print(output_train)

**conceptually, show the predictive distribution for a single input vector $\mathbf{x}^{*}$**

In [None]:
# # run in test mode, i.e. 0 means test
# with eager_learning_phase_scope(value=0):
#     output_test = f([x])

# Run the function for the number of mc_samples with learning_phase enabled
# What's important is the the learning phase. When set to 0, all weights are fixed; when set to 1, dropout is used at test time

with eager_learning_phase_scope(value=1): # 0=test, 1=train
    Y_hat_mc = np.squeeze(np.array([(f(x_test)[0])[0] for _ in range(50)]))

**for the whole test data set**

In [None]:
with eager_learning_phase_scope(value=1): # 0=test, 1=train
    Y_hat_mc = np.squeeze(np.array([(f(x_test))[0] for _ in range(50)]))

In [None]:
Y_hat_mc.shape

In [None]:
Y_hat_mc[0]  == Y_hat_mc[34]

## Bayesian DNN model via `Flipout`

In [None]:
# Create probabilistic regression with one hidden layer, weight uncertainty

kernel_divergence_fn=lambda q, p, _: tfp.distributions.kl_divergence(q, p) / (train_features.shape[0] * 1.0)
bias_divergence_fn=lambda q, p, _: tfp.distributions.kl_divergence(q, p) / (x.shape.shape[0] * 1.0)


model_flipout = Sequential([
    tfpl.DenseFlipout(units=64, input_shape=(9,),activation='relu',
                      kernel_divergence_fn=kernel_divergence_fn,
                      bias_divergence_fn=bias_divergence_fn,),
    tfpl.DenseFlipout(1,
                      kernel_divergence_fn=kernel_divergence_fn,
                      bias_divergence_fn=bias_divergence_fn,),
    tfp.layers.DistributionLambda(lambda t: tfd.Normal(loc=t, scale=1)),
])


In [None]:
def nll(y_true, y_pred):
    return -y_pred.log_prob(y_true)

model_flipout.compile(loss=nll, optimizer=RMSprop(learning_rate=0.005))

In [None]:
model_flipout.summary()

In [None]:
model_flipout.fit(train_features, train_labels, epochs=500, verbose=1)