In [None]:
import logging
import numpy as np
import edward2 as ed
import tensorflow as tf
import matplotlib.pyplot as plt

from tensorflow.keras.layers import Input, Dense, Lambda, LeakyReLU
from tensorflow.keras.models import Model
from tensorflow.keras.regularizers import L2

from utils import (train,
                   backprop,
                   select_bands, 
                   select_subset,
                   style,
                   plot_data, 
                   plot_prediction, 
                   plot_uncertainty)

%matplotlib inline

In [None]:
rng = np.random.RandomState(123)

def f(x):
    """Sinusoidal function."""
    return 0.5 * np.sin(25 * x) + 0.5 * x


def noise(x, slope, rng=np.random):
    """Create heteroskedastic noise."""
    noise_std = np.maximum(0.0, x + 1.0) * slope
    return rng.normal(0, noise_std).astype(np.float32)

x = np.linspace(-1.0, 1.0, 1000, dtype=np.float32).reshape(-1, 1)

# Noisy samples from f (with heteroskedastic noise)
y = f(x) + noise(x, slope=0.2, rng=rng)

# Select data from 2 of 5 bands (regions)
x_bands, y_bands = select_bands(x, y, mask=[False, True, False, True, False])

# Select 40 random samples from these regions
x_train, y_train = select_subset(x_bands, y_bands, num=40, rng=rng)

plot_data(x_train, y_train, x, f(x))
plt.scatter(x, y, **style['bg_data'], label='Noisy data')
plt.legend();

In [None]:
batch_size = x_train.shape[0]

In [None]:
def create_model(n_hidden=200):
    leaky_relu = LeakyReLU(alpha=0.2)
    
    x_in = Input(shape=(1,))
    x = ed.layers.NCPNormalPerturb(stddev=0.5)(x_in)  # double input batch
    x = Dense(n_hidden, activation=leaky_relu)(x)
    x = Dense(n_hidden, activation=leaky_relu)(x)

    means = ed.layers.DenseVariationalDropout(1, activation=None)(x)  # get mean
    means = ed.layers.NCPNormalOutput(mean=y_train, stddev=1.0)(means)  # halve input batch
    
    stddevs = tf.keras.layers.Dense(1, activation='softplus')(x[:batch_size])
    outputs = tf.keras.layers.Lambda(lambda x: ed.Normal(x[0], x[1]))([means, stddevs])

    return Model(x_in, outputs)

In [None]:
model = create_model()

optimizer = tf.optimizers.Adam(learning_rate=1e-3)

loss_tracker = tf.keras.metrics.Mean(name='loss')
mse_tracker = tf.keras.metrics.MeanSquaredError(name='mse')

epochs = 15000

for epoch in range(1, epochs + 1):
    with tf.GradientTape() as tape:
        predictions = model(x_train)
        loss = -tf.reduce_mean(predictions.distribution.log_prob(y_train))
        loss += 0.1 * model.losses[0] / batch_size  # KL regularizer for output layer
        loss += 0.1 * model.losses[-1]
        loss_tracker.update_state(loss)
        
    if epoch % 1000 == 0:
        print(f'epoch = {epoch}, loss = {loss_tracker.result()}')
        loss_tracker.reset_states()
        
    trainable_vars = model.trainable_variables
    gradients = tape.gradient(loss, trainable_vars)
    optimizer.apply_gradients(zip(gradients, trainable_vars))

In [None]:
x_test = np.sort(x_train, axis=0)

out_dist = model(x_train)

aleatoric_uncertainty=out_dist.distribution.stddev()
expected_output = out_dist.distribution.mean()

In [None]:
plt.figure(figsize=(15, 5))

plt.subplot(1, 2, 1)
plot_data(x_train, y_train, x, f(x))
plot_prediction(x_test, 
                expected_output, 
                aleatoric_uncertainty=aleatoric_uncertainty)
plt.ylim(-2, 2)
plt.legend()

plt.subplot(1, 2, 2)
plot_uncertainty(x_test, 
                 aleatoric_uncertainty=aleatoric_uncertainty)
plt.ylim(0, 1)
plt.legend();