In [52]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers 
import tensorflow_datasets as tfds
import tensorflow_probability as tfp

dir(tfp)
help(tfp.layers.DenseVariational)

from tensorflow.keras.layers import Dense

Help on class DenseVariational in module tensorflow_probability.python.layers.dense_variational_v2:

class DenseVariational(tf_keras.src.engine.base_layer.Layer)
 |  DenseVariational(units, make_posterior_fn, make_prior_fn, kl_weight=None, kl_use_exact=False, activation=None, use_bias=True, activity_regularizer=None, **kwargs)
 |  
 |  Dense layer with random `kernel` and `bias`.
 |  
 |  This layer uses variational inference to fit a "surrogate" posterior to the
 |  distribution over both the `kernel` matrix and the `bias` terms which are
 |  otherwise used in a manner similar to `tf_keras.layers.Dense`.
 |  
 |  This layer fits the "weights posterior" according to the following generative
 |  process:
 |  
 |  ```none
 |  [K, b] ~ Prior()
 |  M = matmul(X, K) + b
 |  Y ~ Likelihood(M)
 |  ```
 |  
 |  Method resolution order:
 |      DenseVariational
 |      tf_keras.src.engine.base_layer.Layer
 |      tensorflow.python.module.module.Module
 |      tensorflow.python.trackable.autotra

In [53]:
if tf.test.gpu_device_name() != '/device:GPU:0':
  print('WARNING: GPU device not found.')
else:
  print('SUCCESS: Found GPU: {}'.format(tf.test.gpu_device_name()))




In [54]:
def get_train_and_test_splits(train_size, batch_size=1):
    # We prefetch with a buffer the same size as the dataset because th dataset
    # is very small and fits into memory.
    dataset = (
        tfds.load(name="wine_quality", as_supervised=True, split="train")
        .map(lambda x, y: (x, tf.cast(y, tf.float32)))
        .prefetch(buffer_size=dataset_size)
        .cache()
    )
    # We shuffle with a buffer the same size as the dataset.
    train_dataset = (
        dataset.take(train_size).shuffle(buffer_size=train_size).batch(batch_size)
    )
    test_dataset = dataset.skip(train_size).batch(batch_size)

    return train_dataset, test_dataset

In [55]:
hidden_units = [8, 8]
learning_rate = 0.001


def run_experiment(model, loss, train_dataset, test_dataset):

    model.compile(
        optimizer=keras.optimizers.RMSprop(learning_rate=learning_rate),
        loss=loss,
        metrics=[keras.metrics.RootMeanSquaredError()],
    )

    print("Start training the model...")
    model.fit(train_dataset, epochs=num_epochs, validation_data=test_dataset)
    print("Model training finished.")
    _, rmse = model.evaluate(train_dataset, verbose=0)
    print(f"Train RMSE: {round(rmse, 3)}")

    print("Evaluating model performance...")
    _, rmse = model.evaluate(test_dataset, verbose=0)
    print(f"Test RMSE: {round(rmse, 3)}")

In [56]:
FEATURE_NAMES = [
    "fixed acidity",
    "volatile acidity",
    "citric acid",
    "residual sugar",
    "chlorides",
    "free sulfur dioxide",
    "total sulfur dioxide",
    "density",
    "pH",
    "sulphates",
    "alcohol",
]


def create_model_inputs():
    inputs = {}
    for feature_name in FEATURE_NAMES:
        inputs[feature_name] = layers.Input(
            name=feature_name, shape=(1,), dtype=tf.float32
        )
    return inputs

In [57]:
# Define the prior weight distribution as Normal of mean=0 and stddev=1.
# Note that, in this example, the we prior distribution is not trainable,
# as we fix its parameters.
def prior(kernel_size, bias_size, dtype=None):
    n = kernel_size + bias_size
    prior_model = keras.Sequential(
        [
            tfp.layers.DistributionLambda(
                lambda t: tfp.distributions.MultivariateNormalDiag(
                    loc=tf.zeros(n), scale_diag=tf.ones(n)
                )
            )
        ]
    )
    return prior_model


# Define variational posterior weight distribution as multivariate Gaussian.
# Note that the learnable parameters for this distribution are the means,
# variances, and covariances.
def posterior(kernel_size, bias_size, dtype=None):
    n = kernel_size + bias_size
    posterior_model = tf.keras.Sequential(
        [
            tfp.layers.VariableLayer(
                tfp.layers.MultivariateNormalTriL.params_size(n), dtype=dtype
            ),
            tfp.layers.MultivariateNormalTriL(n),
        ]
    )
    return posterior_model

In [58]:
def create_bnn_model(train_size):
    inputs = create_model_inputs()
    features = keras.layers.concatenate(list(inputs.values()))
    features = layers.BatchNormalization()(features)

    # Create hidden layers with weight uncertainty using the DenseVariational layer.
    for units in hidden_units:
        layer = tfp.layers.DenseVariational(
            units=units,
            make_prior_fn=prior,
            make_posterior_fn=posterior,
            kl_weight=1 / train_size,
            activation="sigmoid",
        )
        features = layer(features)

    # The output is deterministic: a single point estimate.
    outputs = layers.Dense(units=1)(features)
    model = keras.Model(inputs=inputs, outputs=outputs)
    return model

In [59]:
dataset_size = 4898
batch_size = 256
train_size = int(dataset_size * 0.85)
train_dataset, test_dataset = get_train_and_test_splits(train_size, batch_size)

In [89]:
inputs = create_model_inputs()
features = keras.layers.concatenate(list(inputs.values()))
features = layers.BatchNormalization()(features)
# print(features.shape)
for units in hidden_units:
    layer = tfp.layers.DenseVariational(
        units=units,
        make_prior_fn=prior,
        make_posterior_fn=posterior,
        kl_weight=1 / train_size,
        activation="sigmoid",
    )
    inputs = tf.nest.flatten(features)
    print(features) # <KerasTensor shape=(None, 11), dtype=float32, sparse=False, name=keras_tensor_62>
    print(inputs) # [<KerasTensor shape=(None, 11), dtype=float32, sparse=False, name=keras_tensor_62>]
    for x in inputs:
        print(x) # <KerasTensor shape=(None, 11), dtype=float32, sparse=False, name=keras_tensor_66>
    #     ndim = x.shape.rank
    #     print(ndim)
    # features = layer(features)


<KerasTensor shape=(None, 11), dtype=float32, sparse=False, name=keras_tensor_68>
[<KerasTensor shape=(None, 11), dtype=float32, sparse=False, name=keras_tensor_68>]
<KerasTensor shape=(None, 11), dtype=float32, sparse=False, name=keras_tensor_68>
<KerasTensor shape=(None, 11), dtype=float32, sparse=False, name=keras_tensor_68>
[<KerasTensor shape=(None, 11), dtype=float32, sparse=False, name=keras_tensor_68>]
<KerasTensor shape=(None, 11), dtype=float32, sparse=False, name=keras_tensor_68>


In [63]:
num_epochs = 500
train_sample_size = int(train_size * 0.3)
small_train_dataset = train_dataset.unbatch().take(train_sample_size).batch(batch_size)
bnn_model_small = create_bnn_model(train_sample_size)
mse_loss = keras.losses.MeanSquaredError()
run_experiment(bnn_model_small, mse_loss, small_train_dataset, test_dataset)

AttributeError: 'tuple' object has no attribute 'rank'