# Setup

In [1]:
import tensorflow as tf
from tensorflow import keras

# The `Layer` class: the combination of state (weights) and some computation
- A layer encapsulates both **a state** (the layer's "weights") and **a transform** from inputs to outputs (a "call", the layer's forward pass).
- Here's a densely-connected linear layer.
    - It has a state: the variables `w` and `b`.

In [2]:
class Linear(keras.layers.Layer):
    """Densely-connected linear layer computing ``matmul(inputs, w) + b``.

    The state is held in two trainable ``tf.Variable`` attributes created
    eagerly in the constructor.

    Args:
        units: Number of output features (columns of ``w``, length of ``b``).
        input_dim: Number of input features (rows of ``w``).
    """

    def __init__(self, units=32, input_dim=32):
        super().__init__()

        # Kernel: drawn from a random-normal initializer.
        kernel_initializer = tf.random_normal_initializer()
        kernel_value = kernel_initializer(shape=(input_dim, units), dtype='float32')
        self.w = tf.Variable(initial_value=kernel_value, trainable=True)

        # Bias: starts at zero.
        bias_initializer = tf.zeros_initializer()
        bias_value = bias_initializer(shape=(units,), dtype='float32')
        self.b = tf.Variable(initial_value=bias_value, trainable=True)

    def call(self, inputs):
        """Return the affine transform of ``inputs``."""
        projected = tf.matmul(inputs, self.w)
        return projected + self.b

- You would use a layer by calling it on some tensor input(s), much like a Python function.

In [3]:
# Call the layer like a function on a (2, 2) tensor of ones.
X = tf.ones((2,2))
linear_layer = Linear(4, 2)  # units=4, input_dim=2
y = linear_layer(X)
y

<tf.Tensor: id=28, shape=(2, 4), dtype=float32, numpy=
array([[ 0.05453006,  0.03039527, -0.0578984 , -0.07564189],
       [ 0.05453006,  0.03039527, -0.0578984 , -0.07564189]],
      dtype=float32)>

- Note that the weights `w` and `b` are automatically tracked by the layer upon being set as layer attributes.

In [4]:
# The variables set as attributes in __init__ show up in `weights` as [w, b].
assert linear_layer.weights == [linear_layer.w, linear_layer.b]

- Note that a quicker shortcut for adding weight to a layer is the **`add_weight` method**.

In [5]:
class Linear(keras.layers.Layer):
    """Linear layer (``matmul(inputs, w) + b``) whose weights come from ``add_weight``.

    Args:
        units: Number of output features.
        input_dim: Number of input features.
    """

    def __init__(self, units=32, input_dim=32):
        super().__init__()

        kernel_shape = (input_dim, units)
        bias_shape = (units,)

        # add_weight creates the variable and registers it on the layer in one step.
        self.w = self.add_weight(shape=kernel_shape, initializer='random_normal', trainable=True)
        self.b = self.add_weight(shape=bias_shape, initializer='zeros', trainable=True)

    def call(self, inputs):
        """Return the affine transform of ``inputs``."""
        projected = tf.matmul(inputs, self.w)
        return projected + self.b

In [6]:
# Same usage as before, now against the add_weight-based Linear.
X = tf.ones((2,2))
linear_layer = Linear(4, 2)  # units=4, input_dim=2
y = linear_layer(X)
y

<tf.Tensor: id=56, shape=(2, 4), dtype=float32, numpy=
array([[ 0.09064088, -0.10099248,  0.07526717, -0.03107435],
       [ 0.09064088, -0.10099248,  0.07526717, -0.03107435]],
      dtype=float32)>

# Layers can have non-trainable weights
- Besides trainable weights, you can add non-trainable weights to a layer as well.
- Such weights are meant not to be taken into account during backpropagation, when the layer is trained.

In [7]:
class ComputeSum(keras.layers.Layer):
    """Accumulates the sum of its inputs over axis 0 in a non-trainable variable.

    Args:
        input_dim: Length of the running-total vector.
    """

    def __init__(self, input_dim):
        super().__init__()
        initial_total = tf.zeros((input_dim,))
        # trainable=False: the running total is state, not a learned parameter.
        self.total = tf.Variable(initial_value=initial_total, trainable=False)

    def call(self, inputs):
        """Add the axis-0 sum of ``inputs`` to the running total and return it."""
        batch_sum = tf.reduce_sum(inputs, axis=0)
        self.total.assign_add(batch_sum)
        return self.total

In [8]:
x = tf.ones((2, 2))
my_sum = ComputeSum(2)

# Print the tensor that is actually fed to the layer below (lower-case `x`),
# not the unrelated `X` left over from an earlier cell.
print(x)

# Each call adds the axis-0 sum of `x` to the layer's running total.
y = my_sum(x)
print(y.numpy())

y = my_sum(x)
print(y.numpy())

tf.Tensor(
[[1. 1.]
 [1. 1.]], shape=(2, 2), dtype=float32)
[2. 2.]
[4. 4.]


# Best practice: deferring weight creation until the shape of the inputs is known
- Our `Linear` layer above took an `input_dim` argument in `__init__()`, which was used to compute the shape of the weights `w` and `b`.
- However, in many cases, you may not know in advance the size of your inputs, and you would like to create weights when that value becomes known, some time after instantiating the layer.
- In the Keras API, we (TensorFlow team) recommend **creating layer weights in the `build(self, input_shape)` method** of the layer.
    - The `__call__()` method of the layer will automatically run `build()` the first time it is called.

In [9]:
# Previous version
class Linear(keras.layers.Layer):
    """Linear layer that needs ``input_dim`` up front to size its weights.

    Args:
        units: Number of output features.
        input_dim: Number of input features; must be known at construction time.
    """

    def __init__(self, units=32, input_dim=32):
        super().__init__()

        # Both shapes are fixed here, before any input has been seen.
        kernel_shape = (input_dim, units)
        bias_shape = (units,)

        self.w = self.add_weight(shape=kernel_shape, initializer='random_normal', trainable=True)
        self.b = self.add_weight(shape=bias_shape, initializer='zeros', trainable=True)

    def call(self, inputs):
        """Return ``matmul(inputs, w) + b``."""
        projected = tf.matmul(inputs, self.w)
        return projected + self.b

In [10]:
# Best Practice
class Linear(keras.layers.Layer):
    """Linear layer that defers weight creation to ``build()``.

    Args:
        units: Number of output features.
    """

    def __init__(self, units=32):
        super().__init__()
        self.units = units

    def build(self, input_shape):
        """Create ``w`` and ``b``; the input size is read from ``input_shape[-1]``."""
        input_dim = input_shape[-1]
        self.w = self.add_weight(
            shape=(input_dim, self.units), initializer='random_normal', trainable=True
        )
        # The bias in this version also uses the 'random_normal' initializer.
        self.b = self.add_weight(
            shape=(self.units,), initializer='random_normal', trainable=True
        )

    def call(self, inputs):
        """Return ``matmul(inputs, w) + b``."""
        projected = tf.matmul(inputs, self.w)
        return projected + self.b

In [11]:
# `X` is the (2, 2) tensor of ones defined in an earlier cell. No input size is
# given here: `w` is sized from the last input dimension in build().
linear_layer = Linear(30)
y = linear_layer(X)
y

<tf.Tensor: id=102, shape=(2, 30), dtype=float32, numpy=
array([[-0.06031368,  0.05893853,  0.01399888, -0.08956892,  0.11585691,
        -0.06668353,  0.05894025, -0.03837348,  0.20267746,  0.03161312,
         0.04975317, -0.01819842, -0.0004446 , -0.09089342, -0.03513056,
         0.0654503 ,  0.02855569,  0.02434471, -0.05429384, -0.01382153,
        -0.00905077,  0.02210669,  0.04630881,  0.0711771 , -0.04193326,
         0.03831274, -0.05932758,  0.12649518, -0.0847965 ,  0.07688626],
       [-0.06031368,  0.05893853,  0.01399888, -0.08956892,  0.11585691,
        -0.06668353,  0.05894025, -0.03837348,  0.20267746,  0.03161312,
         0.04975317, -0.01819842, -0.0004446 , -0.09089342, -0.03513056,
         0.0654503 ,  0.02855569,  0.02434471, -0.05429384, -0.01382153,
        -0.00905077,  0.02210669,  0.04630881,  0.0711771 , -0.04193326,
         0.03831274, -0.05932758,  0.12649518, -0.0847965 ,  0.07688626]],
      dtype=float32)>

# Layers are recursively composable
- If you assign a `Layer` instance as attribute of another layer, the outer layer will start tracking the weights of the inner layer.
- We recommend **creating sublayers in the `__init__()` method**: since the sublayers will typically have a `build()` method, they will be built when the outer layer gets built.
- Let's try to build a `MLPBlock` that contains three `Linear` sublayers.

In [12]:
class MLPBlock(keras.layers.Layer):
    """Three stacked ``Linear`` sublayers (32, 32, 1 units) with ReLU in between."""

    def __init__(self):
        super().__init__()
        # Sublayers assigned as attributes are tracked by the outer layer.
        self.linear_1 = Linear(32)
        self.linear_2 = Linear(32)
        self.linear_3 = Linear(1)

    def call(self, inputs):
        """Run the inputs through the stack; no activation after the last layer."""
        hidden = tf.nn.relu(self.linear_1(inputs))
        hidden = tf.nn.relu(self.linear_2(hidden))
        return self.linear_3(hidden)

In [13]:
mlp = MLPBlock()
# The first call creates the sublayers' weights (each Linear creates them in build()).
y = mlp(tf.ones(shape=(3, 64)))

In [14]:
# Three Linear sublayers, each contributing a trainable `w` and `b`.
print("weights: ", len(mlp.weights))
print("trainable weights: ", len(mlp.trainable_weights))

weights:  6
trainable weights:  6


# The `add_loss()` method
- When writing the `call()` method of a layer, you can create loss tensors that you will want to use later, when writing your training loop.
- This is doable by calling `self.add_loss(value)`.

In [15]:
# A layer that creates an activity regularization loss
class ActivityRegularizationLayer(keras.layers.Layer):
    """Pass-through layer that registers an activity regularization loss.

    Args:
        rate: Factor applied to the sum of the inputs to form the loss.
    """

    def __init__(self, rate=1e-2):
        super().__init__()
        self.rate = rate

    def call(self, inputs):
        """Register ``rate * sum(inputs)`` as a loss and return ``inputs`` unchanged."""
        activity = tf.reduce_sum(inputs)
        self.add_loss(self.rate * activity)
        return inputs

- These losses (including those created by any inner layer) can be retrieved via `layer.losses`.
- This property is reset at the start of every `__call__()` to the top-level layer, so that `layer.losses` always contains the loss values created during the last forward pass.

In [16]:
class OuterLayer(keras.layers.Layer):
    """Wrapper layer that delegates to an inner ``ActivityRegularizationLayer``."""

    def __init__(self):
        super().__init__()
        # The inner layer is constructed with rate=1e-2.
        self.activity_reg = ActivityRegularizationLayer(1e-2)

    def call(self, inputs):
        """Forward ``inputs`` through the inner layer and return its output."""
        regularized = self.activity_reg(inputs)
        return regularized

In [17]:
layer = OuterLayer()
# The layer has not been called yet, so no loss has been created.
assert len(layer.losses) == 0

In [18]:
# The shape is passed as a single tuple: the second positional parameter of
# tf.zeros is `dtype`, not a second dimension.
layer(tf.zeros((1, 1)))
assert len(layer.losses) == 1  # We created one loss value

In [19]:
# The shape is passed as a single tuple: the second positional parameter of
# tf.zeros is `dtype`, not a second dimension.
layer(tf.zeros((1, 1))) # layer.losses gets reset at the start of each __call__
assert len(layer.losses) == 1  # This is the loss created during the call above

- In addition, the `losses` property also contains regularization losses created for the weights of any inner layer.

In [20]:
class OuterLayerWithKernelRegularizer(keras.layers.Layer):
    """Wrapper around a 30-unit ``Dense`` layer with an L2 kernel regularizer."""

    def __init__(self):
        super().__init__()
        l2_penalty = keras.regularizers.l2(1e-3)
        self.dense = keras.layers.Dense(30, kernel_regularizer=l2_penalty)

    def call(self, inputs):
        """Return the inner Dense layer's output for ``inputs``."""
        projected = self.dense(inputs)
        return projected

In [21]:
layer = OuterLayerWithKernelRegularizer()
layer(tf.zeros((1,1)))
# `losses` now holds the inner Dense layer's kernel regularization loss
# (a single scalar tensor in the output shown below).
print(layer.losses)

[<tf.Tensor: id=242, shape=(), dtype=float32, numpy=0.0017028431>]


- These losses are meant to be taken into account when writing training loops, like the following example.

In [22]:
# Instantiate an optimizer

# optimizer = tf.keras.optimizers.SGD(learning_rate=1e-3)
# loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

In [23]:
# Iterate over the batches of a dataset

# for X_batch_train, y_batch_train in train_dataset:
#     with tf.GradientTape() as tape:
#         logits = layer(X_batch_train) # Get logits for this minibatch
#         loss_value = loss_fn(y_batch_train, logits) # Get loss value for this minibatch
#         loss_value += sum(model.losses) # Add extra losses created during this forward pass
        
#     grads = tape.gradient(loss_value, model.trainable_weights)
#     optimizer.apply_gradients(zip(grads, model.trainable_weights))

- These losses also work seamlessly with `fit()`.
    - They get automatically summed and added to the main loss, if any.

In [24]:
import numpy as np

# Functional model whose only layer is the activity regularizer, so the
# model's output is its input and a loss is registered via add_loss().
inputs = keras.Input(shape=(3,))
outputs = ActivityRegularizationLayer()(inputs)
model = keras.Model(inputs, outputs)

In [25]:
# If a loss is passed to `compile`, the losses created via `add_loss` during
# the forward pass get added to it
model.compile(optimizer='nadam', loss='mse')
model.fit(np.random.random((2, 3)), np.random.random((2, 3)))

Train on 2 samples


<tensorflow.python.keras.callbacks.History at 0x7f8d03463d90>

In [26]:
# It's also possible not to pass any loss in 'compile', since the model already has a loss to minimize, via the
# 'add_loss' call during the forward pass

# model.compile(optimizer='nadam')
# model.fit(np.random.random((2, 3)), np.random.random((2, 3)))

# The `add_metric()` method
- Similarly to `add_loss()`, layers also have an `add_metric()` method for tracking the moving average of a quantity during training.
- Consider the following "logistic endpoint" layer.
    - It takes inputs, predictions, and targets.
    - It computes a loss which it tracks via `add_loss()`.
    - It computes an accuracy scalar, which it tracks via `add_metric()`.

In [27]:
class LogisticEndpoint(keras.layers.Layer):
    """Endpoint layer that records a binary cross-entropy loss and an accuracy metric.

    Args:
        name: Optional layer name, forwarded to the base ``Layer``.
    """

    def __init__(self, name=None):
        super().__init__(name=name)
        self.loss_fn = keras.losses.BinaryCrossentropy(from_logits=True)
        self.accuracy_fn = keras.metrics.BinaryAccuracy()

    def call(self, targets, logits, sample_weights=None):
        """Register the loss and accuracy, then return ``softmax(logits)``.

        Note the argument order: ``targets`` comes first, ``logits`` second.
        """
        # Training-time loss, attached to the layer via add_loss().
        bce = self.loss_fn(targets, logits, sample_weights)
        self.add_loss(bce)

        # Accuracy, tracked as a metric via add_metric().
        accuracy = self.accuracy_fn(targets, logits, sample_weights)
        self.add_metric(accuracy, name='accuracy')

        probabilities = tf.nn.softmax(logits)
        return probabilities

- Metrics tracked in this way are accessible via `layer.metrics`.

In [28]:
layer = LogisticEndpoint()

# All-ones targets and logits, passed in the (targets, logits) order that call() expects.
targets = tf.ones((2, 2))
logits = tf.ones((2, 2))
y = layer(targets, logits)

print("layer.metrics:", layer.metrics)
# The first entry is the BinaryAccuracy metric created in __init__.
print("current accuracy value:", float(layer.metrics[0].result()))

layer.metrics: [<tensorflow.python.keras.metrics.BinaryAccuracy object at 0x7f8d03463450>]
current accuracy value: 1.0


- Just like for `add_loss()`, these metrics are tracked by `fit()`.

In [29]:
inputs = keras.Input(shape=(3,), name='inputs')
targets = keras.Input(shape=(10,), name='targets')
logits = keras.layers.Dense(10)(inputs)
# LogisticEndpoint.call() takes (targets, logits) in that order; passing them
# the other way round would compute the loss and accuracy with the roles swapped.
predictions = LogisticEndpoint(name='predictions')(targets, logits)

In [30]:
# `targets` is a model input so that the endpoint layer can compute its loss from it.
model = keras.Model(inputs=[inputs, targets], outputs=predictions)
# No `loss` argument: the loss comes from the endpoint layer's add_loss() call.
model.compile(optimizer='adam')



In [31]:
# Keys match the `name` given to each keras.Input ('inputs' and 'targets');
# the trailing dimensions match their declared shapes (3 and 10).
data = {
    "inputs": np.random.random((3,3)),
    "targets": np.random.random((3, 10))
}

In [32]:
# Only the input dict is passed: the targets are one of the model's inputs.
model.fit(data)

Train on 3 samples


<tensorflow.python.keras.callbacks.History at 0x7f8d02f7e1d0>

# You can optionally enable serialization on your layers
- If you need your custom layers to be serializable as part of a Functional model, you can optionally implement a `get_config()` method.

In [33]:
class Linear(keras.layers.Layer):
    """Linear layer with lazily built weights and a minimal ``get_config()``.

    Args:
        units: Number of output features.
    """

    def __init__(self, units=32):
        super().__init__()
        self.units = units

    def build(self, input_shape):
        """Create ``w`` and ``b``; the input size is read from ``input_shape[-1]``."""
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer='random_normal',
            trainable=True
        )

        # The trailing comma matters: `(self.units)` is just an int, whereas
        # `(self.units,)` is the one-element shape tuple that is intended.
        self.b = self.add_weight(
            shape=(self.units,),
            initializer='random_normal',
            trainable=True
        )

    def call(self, inputs):
        """Return ``matmul(inputs, w) + b``."""
        return tf.matmul(inputs, self.w) + self.b

    def get_config(self):
        """Return the constructor arguments needed to re-create this layer."""
        return {"units": self.units}

In [34]:
layer = Linear(64)
# The config only contains what get_config() returns: the `units` entry.
config = layer.get_config()
print(config)

{'units': 64}


In [35]:
# Re-create a layer from the config dict and show that its config matches.
new_layer = Linear.from_config(config)
new_layer.get_config()

{'units': 64}

- Note that the `__init__()` method of the base `Layer` class takes some keyword arguments, in particular a `name` and a `dtype`.
- It's good practice to pass these arguments to the parent class in `__init__()` and to include them in the layer config.

In [36]:
class Linear(keras.layers.Layer):
    """Serializable linear layer that forwards base-``Layer`` keyword arguments.

    Args:
        units: Number of output features.
        **kwargs: Passed through to the base ``Layer`` constructor
            (for example ``name`` and ``dtype``).
    """

    def __init__(self, units=32, **kwargs):
        super().__init__(**kwargs)
        self.units = units

    def build(self, input_shape):
        """Create ``w`` and ``b``; the input size is read from ``input_shape[-1]``."""
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer='random_normal',
            trainable=True
        )

        # The trailing comma matters: `(self.units)` is just an int, whereas
        # `(self.units,)` is the one-element shape tuple that is intended.
        self.b = self.add_weight(
            shape=(self.units,),
            initializer='random_normal',
            trainable=True
        )

    def call(self, inputs):
        """Return ``matmul(inputs, w) + b``."""
        return tf.matmul(inputs, self.w) + self.b

    def get_config(self):
        """Return the base layer config extended with this layer's ``units``."""
        base_config = super().get_config()
        return {**base_config, "units": self.units}

In [37]:
layer = Linear(64)
# The config now also carries the base Layer entries merged in by get_config().
config = layer.get_config()
config

{'name': 'linear_8', 'trainable': True, 'dtype': 'float32', 'units': 64}

In [38]:
# The extra base-config entries are accepted by __init__ through **kwargs.
new_layer = Linear.from_config(config)
new_layer.get_config()

{'name': 'linear_8', 'trainable': True, 'dtype': 'float32', 'units': 64}

# Privileged `training` argument in the `call()` method
- Some layers, in particular the `BatchNormalization` layer and the `Dropout` layer, have different behaviors during training and inference.
- For such layers, it is a standard practice to expose a `training` (boolean) argument in the `call()` method.
- By exposing this argument in `call()`, you enable the built-in training and evaluation loops (e.g. `fit()`) to correctly use the layer in training and inference.

In [39]:
class CustomDropout(keras.layers.Layer):
    """Dropout layer that is only active when ``training`` is truthy.

    Args:
        rate: Dropout rate handed to ``tf.nn.dropout``.
        **kwargs: Passed through to the base ``Layer`` constructor.
    """

    def __init__(self, rate, **kwargs):
        super().__init__(**kwargs)
        self.rate = rate

    def call(self, inputs, training=None):
        """Apply dropout in training mode; otherwise return ``inputs`` untouched."""
        # Inference (or unspecified mode): identity.
        if not training:
            return inputs
        return tf.nn.dropout(inputs, rate=self.rate)

# The `Model` Class
- In general, you will use the `Layer` class to define inner computation blocks, and will use the `Model` class to define the outer model - the object you will train.
- The `Model` class has the same API as `Layer`, with the following differences:
    - It exposes **built-in training, evaluation, and prediction loops** (`model.fit()`, `model.evaluate()`, `model.predict()`).
    - It exposes the **list of its inner layers**, via the `model.layers` property.
    - It exposes **saving and serialization APIs** (`save()`, `save_weights()`, etc.).
- In general, if you will need to call `fit()` or `save()` on the class you are creating, go with `Model`, otherwise go with `Layer`.

In [40]:
# Assume we've created a ResNetBlock subclass of Layer

# class ResNet(tf.keras.Model):
#     def __init__(self):
#         super().__init__()
#         self.block_1 = ResNetBlock()
#         self.block_2 = ResNetBlock()
#         self.global_pool = layers.GlobalAveragePooling2D()
#         self.classifier = Dense(num_classes)
        
#     def call(self, inputs):
#         X = self.block_1(inputs)
#         X = self.block_2(X)
#         X = self.global_pool(X)
#         return self.classifier(X)
    
# resnet = ResNet()
# dataset = ...
# resnet.fit(dataset, epochs=10)
# resnet.save('filepath')

# Putting it all together: an end-to-end example
- Here's what you've learned so far:
    - A `Layer` encapsulates a state (created in `__init__()` or `build()`) and some computation (defined in `call()`).
    - Layers can be recursively nested to create new, bigger computation blocks.
    - Layers can create and track losses (typically regularization losses) as well as metrics, via `add_loss()` and `add_metric()`.
    - The outer container, the thing you want to train, is a `Model`.
    - A `Model` is just like a `Layer`, but with added training and serialization utilities.
- Let's put all of these things together into an end-to-end example: we're going to implement a **Variational AutoEncoder (VAE)**, and we'll train it on MNIST digits.
    - Our VAE will be a subclass of `Model`, built as nested composition of layers that subclass `Layer`.
    - It will feature a regularization loss (KL divergence).

In [41]:
class Sampling(keras.layers.Layer):
    """Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""

    def call(self, inputs):
        """Return ``z_mean + exp(0.5 * z_log_var) * epsilon`` with random-normal epsilon.

        Args:
            inputs: A pair ``(z_mean, z_log_var)``.
        """
        z_mean, z_log_var = inputs
        # The noise has the same (batch, dim) shape as z_mean.
        mean_shape = tf.shape(z_mean)
        noise = tf.keras.backend.random_normal(shape=(mean_shape[0], mean_shape[1]))
        scale = tf.exp(0.5 * z_log_var)
        return z_mean + scale * noise

In [42]:
class Encoder(keras.layers.Layer):
    """Maps MNIST digits to a triplet (z_mean, z_log_var, z).

    Args:
        latent_dim: Width of the ``z_mean`` and ``z_log_var`` projections.
        intermediate_dim: Width of the hidden ReLU projection.
        name: Layer name.
        **kwargs: Passed through to the base ``Layer`` constructor.
    """

    def __init__(self, latent_dim=32, intermediate_dim=64, name='encoder', **kwargs):
        super().__init__(name=name, **kwargs)
        dense = keras.layers.Dense
        self.dense_proj = dense(intermediate_dim, activation='relu')
        self.dense_mean = dense(latent_dim)
        self.dense_log_var = dense(latent_dim)
        self.sampling = Sampling()

    def call(self, inputs):
        """Return ``(z_mean, z_log_var, z)`` for ``inputs``."""
        hidden = self.dense_proj(inputs)
        z_mean = self.dense_mean(hidden)
        z_log_var = self.dense_log_var(hidden)
        z = self.sampling((z_mean, z_log_var))
        return z_mean, z_log_var, z

In [43]:
class Decoder(keras.layers.Layer):
    """Converts z, the encoded digit vector, back into a readable digit.

    Args:
        original_dim: Width of the sigmoid output layer.
        intermediate_dim: Width of the hidden ReLU projection.
        name: Layer name.
        **kwargs: Passed through to the base ``Layer`` constructor.
    """

    def __init__(self, original_dim, intermediate_dim=64, name="decoder", **kwargs):
        super().__init__(name=name, **kwargs)
        dense = keras.layers.Dense
        self.dense_proj = dense(intermediate_dim, activation="relu")
        self.dense_output = dense(original_dim, activation="sigmoid")

    def call(self, inputs):
        """Project ``inputs`` through the hidden layer, then the output layer."""
        hidden = self.dense_proj(inputs)
        return self.dense_output(hidden)

In [44]:
class VariationalAutoEncoder(keras.Model):
    """Combines the encoder and decoder into an end-to-end model for training.

    Args:
        original_dim: Width of the decoder output; also stored on the model.
        intermediate_dim: Hidden width shared by the encoder and decoder.
        latent_dim: Width of the latent vector produced by the encoder.
        name: Model name.
        **kwargs: Passed through to the base ``Model`` constructor.
    """

    def __init__(self, original_dim, intermediate_dim=64, latent_dim=32, name='autoencoder', **kwargs):
        super().__init__(name=name, **kwargs)
        self.original_dim = original_dim
        self.encoder = Encoder(latent_dim=latent_dim, intermediate_dim=intermediate_dim)
        self.decoder = Decoder(original_dim, intermediate_dim=intermediate_dim)

    def call(self, inputs):
        """Encode and decode ``inputs``, registering the KL divergence as a loss."""
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstructed = self.decoder(z)

        # KL divergence regularization loss, averaged over all elements.
        kl_terms = z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1
        self.add_loss(-0.5 * tf.reduce_mean(kl_terms))

        return reconstructed

In [45]:
# original_dim=784, intermediate_dim=64, latent_dim=32
vae = VariationalAutoEncoder(784, 64, 32)
vae.compile(loss=tf.keras.losses.MeanSquaredError(), optimizer=keras.optimizers.Adam(learning_rate=1e-3))
# The model reconstructs its input, so the MSE target is the input itself.
# NOTE(review): X_train is not defined in this notebook - load the digits first.
# vae.fit(X_train, X_train, epochs=2, batch_size=64)