In [36]:
import tensorflow as tf
from tensorflow import keras

# 0. The most basic building block

## 0.1 Trainable layer

In [None]:
class Linear(keras.layers.Layer):
    def __init__(self, units = 32, input_dim = 32):
        # units: dimension of an output
        super().__init__()
        # initalize weights
        w_init = tf.random_normal_initializer()
        self.w = tf.Variable(
            initial_value = w_init(shape = (input_dim, units), dtype = 'float32'),
            trainable = True,
            name = 'w'
        )
        b_init = tf.zeros_initializer()
        self.b = tf.Variable(
            initial_value = b_init(shape= (units, ), dtype= 'float32'),
            trainable = True,
            name = 'b'
        )

    def call(self, x):
        return tf.matmul(x,self.w) + self.b

In [None]:
linear = Linear(4,2)
x = tf.ones((2,2))
linear(x)

<tf.Tensor: shape=(2, 4), dtype=float32, numpy=
array([[-0.03611   ,  0.01389586,  0.04623048, -0.09226894],
       [-0.03611   ,  0.01389586,  0.04623048, -0.09226894]],
      dtype=float32)>

In [None]:
# Note that the weights w and b are automatically tracked by the layer upon being set as a layer attributes.
linear.weights == [linear.w, linear.b]

True

## 0.2 Non trainable layer

In [None]:
# Layers can have non-trainable weights.
# Besides trainable weights, you can add non-trainable weights to a layer as well.
# Such weights are meant not to be taken into account during backpropagation
# when you are training the layer.
class ComputeSum(keras.layers.Layer):
    def __init__(self, input_dim):
        super(ComputeSum, self).__init__()
        self.total = tf.Variable(initial_value= tf.zeros((input_dim, )), trainable= False)

    def call(self, x):
        # return column sum.
        # sum by row (0)
        # assign and add the element which have been summed along the row axis to the self.total
        # the summation is accumulated as it recevies "call"s.
        return self.total.assign_add(tf.reduce_sum(x, axis= 1))

In [None]:
x = tf.ones((2,3)) # [[1,1,1],[1,1,1]]
my_sum = ComputeSum(x.shape[0])
y = my_sum(x)
print(y.numpy()) # [3,3]
y = my_sum(x)
print(y.numpy()) # [6,6]

[3. 3.]
[6. 6.]


In [None]:
# it's part of layer.weights, but it gets categorized as a non-trainable weight:
print("weights: ", len(my_sum.trainable_weights))
print("non-trainable weights: ", len(my_sum.non_trainable_weights))

print("trainable weights: ", my_sum.trainable_weights)

weights:  0
non-trainable weights:  1
trainable weights:  []


## 0.3 Deferring weight creation util the shape of the inputs is known

In [None]:
# Our Linear layer above took an input_dimargument that was used to compute the shape of the weight
# w and b in __init__()

# Use build method to create a layer.
class Linear(keras.layers.Layer):
    def __init__(self, units= 32):
        # input_dim is determined after the shape of the inputs is known.
        super(Linear, self).__init__()
        self.units= units

    def build(self, input_shape):
        # We do not know the input_shape yet.
        # Use self.add_weight
        self.w = self.add_weight(
            shape= (input_shape[-1], self.units),
            initializer = "random_normal",
            trainable = True
        )
        self.b = self.add_weight(
            shape = (self.units, ),
            initializer = "random_normal",
            trainable = True
        )
    
    def call(self, x):
        return tf.matmul(x, self.w) + self.b


In [None]:
# At instantiation, we don't know on what inputs this is going to get called.
linear_layer = Linear(32)

# The layer's weights are created dynamically the first time the layer is called.
y = linear_layer(x)

# 1. Various Layer Block    


## 1.1 Multi layer perceptron block     
Layers are recursively composable.

In [None]:
class MLPBlock(keras.layers.Layer):
    def __init__(self):
        super().__init__()
        self.linear1 = Linear(32)
        self.linear2 = Linear(16)
        self.linear3 = Linear(8)

    def call(self, x):
        x = self.linear1(x)
        x = self.linear2(x)
        x = self.linear3(x)
        return x

In [None]:
mlp = MLPBlock()
y = mlp(tf.ones(shape=(3,64))) # The first call to the mlp will create the weights
print("weights:", len(mlp.weights))
print("trainable weights:", len(mlp.trainable_weights))

weights: 6
trainable weights: 6


In [None]:
mlp.weights

## 1.2 The add_loss() method    
When writing the call() method of a layer, you can create loss tensors that you will want to use later      
When writing your training loop.      
This is doable by calling self.add_loss(value)

In [40]:
# A layer that creates an activity regularization loss.
class ActivityRegularizationLayer(keras.layers.Layer):
    def __init__(self, rate= 1e-2):
        super(ActivityRegularizationLayer, self).__init__()
        self.rate = rate
    
    def call(self, inputs):
        self.add_loss(self.rate * tf.reduce_sum(inputs))
        return inputs
    # These losses can be retrieved via layer.losses. This property is reset at the start of
    # every __call__() to the top-level layer, so that layer.losses
    # always contains the loss values created during the last forward pass.  

class OuterLayer(keras.layers.Layer):
    def __init__(self):
        super(OuterLayer, self).__init__()
        self.activity_reg = ActivityRegularizationLayer(1e-2)

    def call(self, inputs):
        return self.activity_reg(inputs)

In [47]:
layer = OuterLayer()
assert len(layer.losses) == 0
_ = layer(tf.zeros(1,1))
assert len(layer.losses) == 1

# layer.losses gets rest at the start of each __call__
_ = layer(tf.ones((1,1)))
assert len(layer.losses) == 1

print(layer.losses)

[<tf.Tensor: shape=(), dtype=float32, numpy=0.01>]


## 1.3 Kernel regularizer      
[Regularizers](https://www.tensorflow.org/api_docs/python/tf/keras/regularizers/Regularizer)     
* `kernel_regularizer`: Regularizer to apply a penalty on the layer's kernel
* `bias_regularizer`: Regularizer to apply a penalty on the layer's bias
* `activity_regularizer`: Regularizer to apply a penalty on the layer's output

```python
# Available penalties
tf.keras.regularizers.L1(0.3)  # L1 Regularization Penalty
tf.keras.regularizers.L2(0.1)  # L2 Regularization Penalty
tf.keras.regularizers.L1L2(l1=0.01, l2=0.01)  # L1 + L2 penalties

```

In [51]:
class OuterLayerWithKernelRegularizer(keras.layers.Layer):
    def __init__(self, kernel_reg_rate = 1e-3):
        super(OuterLayerWithKernelRegularizer, self).__init__()
        self.dense = keras.layers.Dense(
            32,
            activation= 'relu',
            use_bias = True,
            activity_regularizer= None,
            kernel_regularizer = tf.keras.regularizers.l2(1e-3)
        )
    
    def call(self, inputs):
        return self.dense(inputs)
         

## 1.3 Regularizers in details   

### 1.3.1 Directly calling a regularizer

In [49]:
regularizer = tf.keras.regularizers.L2(2.)
kernel = tf.ones(shape=(5, 5))
regularizer(kernel) 

<tf.Tensor: shape=(), dtype=float32, numpy=50.0>

### 1.3.2 Developing new regularizers

Any function that takes in a weight matrix and returns a scalar tensor can be used as a regularizer.


In [59]:
@tf.keras.utils.register_keras_serializable(package='Custom', name='myl1')
def l1_reg(weight_matrix):
    return 0.01 * tf.math.reduce_sum(tf.math.abs(weight_matrix))

class L1CustomLossLayer(keras.layers.Layer):
    def __init__(self):
        super(L1CustomLossLayer, self).__init__()
        self.dense = keras.layers.Dense(
            32,
            activation= 'relu',
            use_bias = True,
            activity_regularizer= l1_reg,
            kernel_regularizer = None
        )

    def call(self, inputs):
        return self.dense(inputs)

In [60]:
layer = L1CustomLossLayer()
inputs = tf.ones(shape= (3,4))
layer(inputs)
print(layer.losses) # 0.01 * tf.reduce_sum(inputs)

[<tf.Tensor: shape=(), dtype=float32, numpy=0.08917602>]


In [91]:
# Registration is required for Keras model_to_estimator, 
# saving and loading models to HDF5 formats, Keras model cloning, 
# some visualization utilities, and exporting models to and from JSON
@tf.keras.utils.register_keras_serializable(package='Custom', name='myl2') 
class L2Regularizer(tf.keras.regularizers.Regularizer):
    def __init__(self, l2= 0.):
        self.l2 = l2
    
    def __call__(self, x):
        return self.l2 * tf.math.reduce_sum(tf.math.square(x))
    
    def get_config(self):
        return {'l2':float(self.l2)}

In [92]:
@tf.keras.utils.register_keras_serializable(package='Custom', name='myl1l2')
class L1L2Regularizer(tf.keras.regularizers.Regularizer):
    def __init__(self, l1= 0., l2= 0.):
        self.l1, self.l2 = l1, l2
    
    def __call__(self, inputs):
        return self.l1 * tf.math.reduce_sum(tf.math.abs(inputs)) + self.l2 * tf.math.reduce_sum(tf.math.square(inputs))
    
    def get_config(self):
        return {'l1':float(self.l1), 'l2':float(self.l2)}

In [93]:
class L2CustomLossLayer(keras.layers.Layer):
    def __init__(self, l1 = 1e-2, l2 = 1e-2):
        super(L2CustomLossLayer, self).__init__()
        self.l1, self.l2 = l1, l2
        self.dense = keras.layers.Dense(
            units = 32,
            activation = 'relu',
            kernel_regularizer= L2Regularizer(l2= self.l2)
        )
    
    def call(self, x):
        return self.dense(x)
    

In [94]:
layer = L2CustomLossLayer()
inputs = tf.ones((3,4), dtype= tf.float32)
layer(inputs)
print(layer.losses)

[<tf.Tensor: shape=(), dtype=float32, numpy=0.067631654>]


A note on serialization and deserialization:    
(@tf.keras.utils.register_keras_serializable(package='Custom', name='myl2'))    

    
(1) Registering the regularizers as serializable is optional if you are just training and executing models, exporting to and from SavedModels, or saving and loading weight checkpoints.
    
(2) Registration is required for Keras model_to_estimator, saving and loading models to HDF5 formats, Keras model cloning, some visualization utilities, and exporting models to and from JSON. If using this functionality, you must make sure any python process running your model has also defined and registered your custom regularizer.

### 1.3.3 Losses during the training step

```python
# Those losses are meant to be taken into account when writing training loops like this:
# instantiate an optimizer...
optimizer = tf.keras.optimizer.SGD(learning_rate= 1e-3)
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits= True) 

# Iterate over the batches of a dataset.
for x_batch_train, y_batch_train in train_dataset:
    with tf.GradientTape() ad tape:
        logits = layer(x_batch_train)
        loss_value = loss_fn(y_batch_train, logits)
        # Add extra losses created during this forward loss.
        loos_value += sum(model.losses)
    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))

```

In [96]:
# These losses also work seamlessly with fit() 
# they get automatically summed and added to the main loss, if any.

import numpy as np

inputs = keras.Input(shape= (3,))
outputs = ActivityRegularizationLayer()(inputs)
model = keras.Model(inputs, outputs)

# If there is a loss passed in compile, the regularization
# losses get added to it.
model.compile(optimizer= 'adam', loss = 'mse')
model.fit(np.random.random((2,3)), np.random.random((2,3)))

# It's also possible not to pass any loss in compile
# since the model already ahs a loss to minimze, via the add_loss
# call during the forward pass!
model.compile(optimizer = 'adam')
model.fit(np.random.random((2,3)))



<keras.callbacks.History at 0x7f14036fa650>

## 1.4 The add_metric() method  
Similarly to add_loss(), layers also have an add_metric() method for tracking the moving average of a quantity during training.    
Consider the following layer.

In [98]:
class LogisticEndpoint(keras.layers.Layer):
    def __init__(self, name= None):
        super(LogisticEndpoint, self).__init__(name= name)
        self.loss_fn = keras.losses.BinaryCrossentropy(from_logits= True)
        self.accuracy_fn = keras.metrics.BinaryAccuracy()
        
    def call(self, targets, logits, sample_weights=None):
        # Compute the training-time loss value and add it to the
        # layer using self.add_loss()
        loss = self.loss_fn(targets, logits, sample_weights)
        self.add_loss(loss)

        # Log accuarcy as a metric and add it
        # to the layer using self.add_metric()
        acc = self.accuracy_fn(targets, logits, sample_weights)
        self.add_metric(acc, name= 'accuracy')

        # Return the inference-time prediction tensor. (for .predict())
        return tf.nn.softmax(logits)

In [99]:
# Metrics tracked in this way are accessible via layer.metrics:
layer = LogisticEndpoint()
targets = tf.ones((2,2))
logits = tf.ones((2,2))
y = layer(targets, logits)

print("layer.metrics: ", layer.metrics)
print("current accuracy value:", float(layer.metrics[0].result()))

layer.metrics:  [<keras.metrics.BinaryAccuracy object at 0x7f1404f37c10>]
current accuracy value: 1.0


In [100]:
# Just like for add_loss(), these metrics are tracked by fit()
inputs = keras.Input(shape=(3,), name= "inputs")
targets = keras.Input(shape=(10,), name= "targets")
logits = keras.layers.Dense(10)(inputs)
predictions = LogisticEndpoint(name="predictions")(logits, targets)

model = keras.Model(inputs = [inputs, targets], outputs= predictions)
model.compile(optimizer= 'adam')

data = {
    "inputs":np.random.random((3,3)),
    "targets":np.random.random((3,10))
}

model.fit(data)



<keras.callbacks.History at 0x7f140d9cbb50>

In [101]:
model.metrics_names

['loss', 'binary_accuracy']

## 1.5 Optionally enabling serialization on your layers

In [103]:
class Linear(keras.layers.Layer):
    def __init__(self, units= 32):
        super(Linear, self).__init__()
        self.units = units

    def build(self, input_shape):
        self.w = self.add_weight(
            shape = (input_shape, self.units),
            initializer = "random_normal",
            trainable = True
        )
        self.b = self.add_weight(
            shape = (self.units, ),
            initializer = "random_normal",
            trainable = True
        )
    
    def call(self, x):
        return tf.matmul(x, self.w) + self.b
    
    def get_config(self):
        return {"units": self.units}

In [104]:
# Now you can recreate the layer from its config:
layer = Linear(64)
config = layer.get_config()
print(config)

{'units': 64}


In [105]:
# recreate
new_layer = Linear.from_config(config)

# 2. Some useful options

##. 2.1 Arguments of \_\_init__() method

In [108]:
# Note that the __init__() method of the base Layer class takes some keyword arguments,
# In particular a name and a dtype.
# It's good practice to pass these arguments to the parent class in 
# __init__() and to include them in the layer config.
class Linear(keras.layers.Layer):

    def __init__(self, units= 32, **kargs):
        super(Linear, self).__init__(**kargs)
        self.units = units

    def build(self, input_shape):
        self.w = self.add_weight(
            shape = (input_shape[-1], self.units),
            initializer = "random_normal",
            trainable = True
        )
        self.b = self.add_weight(
            shape = (self.units,), 
            initializer = "random_normal",
            trainable = True
        )        

    def call(self, x):
        return tf.matmul(x, self.w) + self.b

###################################################
    def get_config(self):                         #
        config = super(Linear, self).get_config() #
        config.update({"units": self.units})      #
        return config                             #
###################################################

layer = Linear(64)
config = layer.get_config()
print(config)

{'name': 'linear_10', 'trainable': True, 'dtype': 'float32', 'units': 64}


### 2.2 Privileged training argument in the call() method
* training: boolean

In [109]:
class CustomDropout(keras.layers.Layer):
    def __init__(self, rate, **kargs):
        super(CustomDropout, self).__init__(**kargs)
        self.rate = rate
    
    def call(self, x, training = None):
        if training:
            return tf.nn.dropout(x, rate= self.rate)
        return x

# mask argument
# You will find it in all keras RNN layers
# A mask is a boolean tensor (one boolean value per timestep in the input)
# used to skip certain input timesteps when processing tim-series data
# Keras will automatically  pass the correct mask argument to __call__() for layers
# that support it. when a mask is generated by a prior layer.

In [110]:
# When to use layer or model
# fit(), save() --> model
# basic building block --> layer

# 3. When to use layer or model     


*   fit(), save() --> model
*   basic building block --> layer





```python
# Example: ResNet
class ResNet(tf.keras.Model):
    
    def __init__(self):
        super(ResNet, self).__init__()
        self.block_1 = ResNetBlock()
        self.block_2 = ResNetBlock()
        self.global_pool = layers.GlobalAveragePooling2D()
        self.classifier = keras.layers.Dense(num_classes)

    def call(self, x):
        x = self.block_1(x)
        x = self.block_2(x)
        x = self.global_pool(x)
        x = self.classifier(x)
        return x
```



# 4. Putting it all together...

## 4.1 Variational Autoencoder

In [111]:
# We'll train it on MNIST digits
# Our VAE will be a subclass of Model, built as a nested composition of layers that
# subclass Layer.
# It willl feature a regularization loss (KL divergence)

In [113]:
# layers
from tensorflow.keras import layers

class Sampling(layers.Layer):
    # Uses (z_mean, z_log_var) to sample z, the vector encoding a digit

    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.keras.backend.random_normal(shape=(batch,dim))
        return z_mean + tf.exp(0.5*z_log_var) * epsilon

class Encoder(layers.Layer):
    # Maps MNIST digits to a triplet (z_mean, z_log_var, z).

    def __init__(self, latent_dim=32, intermediate_dim=64, name="encoder", **kargs):
        super(Encoder, self).__init__()
        self.dense_proj = layers.Dense(intermediate_dim, activation= 'relu')
        self.dense_mean = layers.Dense(latent_dim)
        self.dense_log_var = layers.Dense(latent_dim)
        self.sampling = Sampling()

    def call(self, inputs):
        x = self.dense_proj(inputs)
        z_mean = self.dense_mean(x)
        z_log_var = self.dense_log_var(x)
        z = self.sampling((z_mean, z_log_var))
        return z_mean, z_log_var, z

class Decoder(layers.Layer):
    # Converts z, the encoded digit vector, back into a readable digit.

    def __init__(self, original_dim, intermediate_dim=64, name="decoder", **kargs):
        super(Decoder, self).__init__(name=name, **kargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation='relu')
        self.dense_output = layers.Dense(original_dim, activation= 'sigmoid')

    def call(self, inputs):
        x = self.dense_proj(inputs)
        return self.dense_output(x)

class KL_divergence(layers.Layer):

    def __init__(self, exp= 0.5):
        super(KL_divergence, self).__init__()
        self.exp = exp

    def call(self, z_mean, z_log_var, z):
        
        kl_loss = -self.exp * tf.reduce_mean(
            z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1
        )
        self.add_loss(kl_loss)

In [114]:
# VAE model
class VariationalAutoEncoder(keras.Model):
    # Combines the encoder and decoder into an end-to-end model for training
    def __init__(self, 
                 original_dim,
                 intermediate_dim=64,
                 latent_dim=32,
                 name="autoencoder",
                 exp = 0.5,
                 **kargs
                ):
        super(VariationalAutoEncoder, self).__init__()
        self.original_dim = original_dim
        self.encoder = Encoder(latent_dim=latent_dim, intermediate_dim=intermediate_dim)
        self.decoder = Decoder(original_dim=original_dim, intermediate_dim=intermediate_dim)
        self.kl = KL_divergence(exp)

    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstructed = self.decoder(z)
        # Add KL divergence regularization loss.
        self.kl(z_mean, z_log_var, z)
        # kl_loss = -0.5 * tf.reduce_mean(
        #     z_log_var - tf.square(z_mean) - tf.exp(z_log_var) +1
        # )
        # self.add_loss(kl_loss)
        return reconstructed


In [118]:
# training loop for mnist
original_dim = 784
vae = VariationalAutoEncoder(original_dim, 64, 32)

optimizer = tf.keras.optimizers.Adam(learning_rate= 1e-3)
mse_loss_fn = tf.keras.losses.MeanSquaredError()

loss_metric = tf.keras.metrics.Mean()

(x_train, _), _ = tf.keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype("float32") / 255

train_dataset = tf.data.Dataset.from_tensor_slices(x_train)
train_dataset = train_dataset.shuffle(buffer_size= 1024).batch(64)

epochs = 2

# Iterate over epoches
for epoch in range(epochs):

    print(f"Start of epoch {epoch}")

    # Iterate over the batches of the dataset
    for step, x_batch_train in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            reconstructed = vae(x_batch_train)
            loss = mse_loss_fn(x_batch_train, reconstructed)
            assert len(vae.losses) > 0
            loss += sum(vae.losses)
        grads = tape.gradient(loss, vae.trainable_weights)
        optimizer.apply_gradients(zip(grads, vae.trainable_weights))

        loss_metric(loss)

        if step % 100 == 0:
            print(f"step {step}: mean loss = {loss_metric.result():4f}")

Start of epoch 0
step 0: mean loss = 0.361525
step 100: mean loss = 0.126191
step 200: mean loss = 0.099398
step 300: mean loss = 0.089305
step 400: mean loss = 0.084341
step 500: mean loss = 0.080981
step 600: mean loss = 0.078833
step 700: mean loss = 0.077227
step 800: mean loss = 0.076041
step 900: mean loss = 0.075009
Start of epoch 1
step 0: mean loss = 0.074718
step 100: mean loss = 0.074056
step 200: mean loss = 0.073552
step 300: mean loss = 0.073059
step 400: mean loss = 0.072737
step 500: mean loss = 0.072342
step 600: mean loss = 0.072026
step 700: mean loss = 0.071741
step 800: mean loss = 0.071506
step 900: mean loss = 0.071244
