In [0]:
%tensorflow_version 2.x

TensorFlow 2.x selected.


In [0]:
import tensorflow as tf

In [0]:
# Show the GPU devices TensorFlow can see; an empty list means none were found.
print(tf.config.list_physical_devices('GPU'))

[]


TensorFlow provides a set of many common layers, as well as easy ways for you to write your own application-specific layers, either from scratch or as a composition of existing layers.

Implementing custom layers

The best way to implement your own layer is to extend the tf.keras.layers.Layer class and implement:

* __init__, where you can do all input-independent initialization
* build, where you know the shapes of the input tensors and can do the rest of the initialization
* call, where you do the forward computation

# Creating class for layer

In [0]:
class my_dense_layer(tf.keras.layers.Layer):
    """Minimal dense layer without bias: ``output = input @ kernel``.

    Args:
        num_outputs: Size of the last axis of the layer's output.
    """

    def __init__(self, num_outputs):
        super().__init__()
        self.num_outputs = num_outputs

    def build(self, input_shape):
        """Create the kernel once the size of the input's last axis is known."""
        # Pass the name by keyword: the positional slot that `name` occupies
        # in add_weight differs between Keras versions, so a positional
        # "kernel" can be misread as the shape.
        self.kernel = self.add_weight(
            name="kernel",
            shape=[int(input_shape[-1]), self.num_outputs],
        )

    def call(self, input):
        """Return the matrix product of `input` and the kernel."""
        return tf.matmul(input, self.kernel)


In [0]:
# Create the custom layer with 10 output units.
layer = my_dense_layer(10)

In [0]:
# Apply the layer to a (10, 5) tensor of zeros; the kernel is created in
# build() from the input's last dimension (5).
op = layer(tf.zeros([10, 5]))

In [0]:
# List the names of the variables the layer tracks as trainable.
print([var.name for var in layer.trainable_variables])

['my_dense_layer/kernel:0']


Overall, code is easier to read and maintain if it uses standard layers whenever possible, as other readers will be familiar with the behavior of standard layers.

# Composing Layers to create Models

Many interesting layer-like things in machine learning models are implemented by composing existing layers. For example, each residual block in a resnet is a composition of convolutions, batch normalizations, and a shortcut. Layers can be nested inside other layers.

Typically you inherit from keras.Model when you need model methods such as Model.fit, Model.evaluate, and Model.save.

One other feature provided by keras.Model (instead of keras.layers.Layer) is that in addition to tracking variables, a keras.Model also tracks its internal layers, making them easier to inspect.

In [0]:
class resnet_block(tf.keras.Model):
    """Residual block: three conv + batch-norm stages plus an identity shortcut.

    Args:
        kernel_size: Kernel size of the second and third convolutions
            (the first convolution is always 1x1).
        filters: Sequence of exactly three filter counts, one per convolution.
            The block input is added to the third stage's output, so the two
            must have compatible shapes.
    """

    def __init__(self, kernel_size, filters):
        super().__init__()
        n_first, n_second, n_third = filters

        # Stage a: 1x1 convolution.
        self.conv2a = tf.keras.layers.Conv2D(n_first, (1, 1))
        self.bn2a = tf.keras.layers.BatchNormalization()

        # Stage b: `kernel_size` convolution with 'same' padding.
        self.conv2b = tf.keras.layers.Conv2D(n_second, kernel_size, padding='same')
        self.bn2b = tf.keras.layers.BatchNormalization()

        # Stage c: `kernel_size` convolution with 'same' padding.
        self.conv2c = tf.keras.layers.Conv2D(n_third, kernel_size, padding='same')
        self.bn2c = tf.keras.layers.BatchNormalization()

    def call(self, input_tensor, training=False):
        """Run the three stages, add the shortcut, and apply a final ReLU."""
        hidden = tf.nn.relu(self.bn2a(self.conv2a(input_tensor), training=training))
        hidden = tf.nn.relu(self.bn2b(self.conv2b(hidden), training=training))
        # No ReLU after the third stage until the shortcut has been added.
        hidden = self.bn2c(self.conv2c(hidden), training=training)
        return tf.nn.relu(hidden + input_tensor)

In [0]:
# kernel_size=1 with filter counts (1, 2, 3). The last count (3) matches the
# last dimension of the input used in the next cell, which the residual add needs.
block = resnet_block(1, [1,2,3])

In [0]:
# Run the block once on a dummy input so its layers create their weights;
# the output itself is discarded.
_ = block(tf.zeros([1, 2, 3, 3]))

In [0]:
# A Model tracks its sub-layers, so they can be inspected directly.
block.layers

[<tensorflow.python.keras.layers.convolutional.Conv2D at 0x7fa98f4e9e48>,
 <tensorflow.python.keras.layers.normalization_v2.BatchNormalization at 0x7fa98f4e7a58>,
 <tensorflow.python.keras.layers.convolutional.Conv2D at 0x7fa98f4e7b38>,
 <tensorflow.python.keras.layers.normalization_v2.BatchNormalization at 0x7fa98f4e74a8>,
 <tensorflow.python.keras.layers.convolutional.Conv2D at 0x7fa98f4e7518>,
 <tensorflow.python.keras.layers.normalization_v2.BatchNormalization at 0x7fa98f4e7f98>]

In [0]:
# Print a per-layer table of parameter counts for the block.
block.summary()

Model: "resnet_block"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d (Conv2D)              multiple                  4         
_________________________________________________________________
batch_normalization (BatchNo multiple                  4         
_________________________________________________________________
conv2d_1 (Conv2D)            multiple                  4         
_________________________________________________________________
batch_normalization_1 (Batch multiple                  8         
_________________________________________________________________
conv2d_2 (Conv2D)            multiple                  9         
_________________________________________________________________
batch_normalization_2 (Batch multiple                  12        
Total params: 41
Trainable params: 29
Non-trainable params: 12
_________________________________________________________

# Writing Custom layers and Models in Keras

In [0]:
# Reset the global state Keras has accumulated in the sections above before
# starting the next part of the notebook.
tf.keras.backend.clear_session()

## The Layer class

The main data structure you'll work with is the Layer. A layer encapsulates both a state (the layer's "weights") and a transformation from inputs to outputs (a "call", the layer's forward pass).

Here's a densely-connected layer. It has a state: the variables w and b.

In [0]:
from tensorflow.keras import layers

In [0]:
class Linear(layers.Layer):
    """Affine layer ``inputs @ w + b`` whose state is created with tf.Variable.

    Args:
        units: Number of output features.
        input_dim: Number of input features.
    """

    def __init__(self, units=32, input_dim=32):
        super().__init__()

        # Weights start from a random normal draw of shape (input_dim, units).
        weight_values = tf.random_normal_initializer()(shape=(input_dim, units), dtype='float32')
        self.w = tf.Variable(initial_value=weight_values, trainable=True)

        # Biases start at zero, one per output unit.
        bias_values = tf.zeros_initializer()(shape=(units, ), dtype=tf.float32)
        self.b = tf.Variable(initial_value=bias_values, trainable=True)

    def call(self, inputs):
        """Return ``inputs @ w + b``."""
        projected = tf.matmul(inputs, self.w)
        return projected + self.b


In [0]:
x = tf.ones((2, 2))
# units=4, input_dim=2, so w has shape (2, 4) and b has shape (4,).
linear_layer = Linear(4, 2)
y = linear_layer(x)

In [0]:
# Both output rows are equal because both input rows are equal.
print(y)

tf.Tensor(
[[ 0.0302961   0.03911199  0.06768572 -0.04566586]
 [ 0.0302961   0.03911199  0.06768572 -0.04566586]], shape=(2, 4), dtype=float32)


In [0]:
class Linear(layers.Layer):
    """Affine layer ``inputs @ w + b`` whose state is registered via add_weight.

    Args:
        units: Number of output features.
        input_dim: Number of input features.
    """

    def __init__(self, units=32, input_dim=32):
        super().__init__()
        kernel_shape = (input_dim, units)
        bias_shape = (units, )
        # add_weight creates each variable and registers it on the layer.
        self.w = self.add_weight(shape=kernel_shape, initializer='random_normal', trainable=True)
        self.b = self.add_weight(shape=bias_shape, initializer='zeros', trainable=True)

    def call(self, inputs):
        """Return ``inputs @ w + b``."""
        projected = tf.matmul(inputs, self.w)
        return projected + self.b



In [0]:
x = tf.ones((2,2))
# units=4, input_dim=2, so w has shape (2, 4) and b has shape (4,).
linear_layer = Linear(4, 2)

In [0]:
# Forward pass: x @ w + b, giving a (2, 4) result.
y = linear_layer(x)

In [0]:
# Both output rows are equal because both input rows are equal.
print(y)

tf.Tensor(
[[ 0.02960566  0.08698709  0.11366069 -0.01657881]
 [ 0.02960566  0.08698709  0.11366069 -0.01657881]], shape=(2, 4), dtype=float32)


# Building Models
- The Model class


In general, you will use the Layer class to define inner computation blocks, and will use the Model class to define the outer model -- the object you will train.

For instance, in a ResNet50 model, you would have several ResNet blocks subclassing Layer, and a single Model encompassing the entire ResNet50 network.

The Model class has the same API as Layer, with the following differences:


- It exposes built-in training, evaluation, and prediction loops (model.fit(), model.evaluate(), model.predict()).
- It exposes the list of its inner layers, via the model.layers property.
- It exposes saving and serialization APIs.


Effectively, the "Layer" class corresponds to what we refer to in the literature as a "layer" (as in "convolution layer" or "recurrent layer") or as a "block" (as in "ResNet block" or "Inception block").

Meanwhile, the "Model" class corresponds to what is referred to in the literature as a "model" (as in "deep learning model") or as a "network" (as in "deep neural network").

For instance, we could take our mini-resnet example above, and use it to build a Model that we could train with fit(), and that we could save with save_weights:

In [0]:
class ResNet(tf.keras.Model):
    """Small ResNet-style classifier: two residual blocks, global pooling, dense head.

    Args:
        num_classes: Number of units in the final Dense layer.
        kernel_size: Kernel size forwarded to each resnet_block.
        filters: Three filter counts forwarded to each resnet_block. The last
            entry must match the number of input channels, because
            resnet_block adds its input to its output.
    """

    def __init__(self, num_classes=10, kernel_size=3, filters=(16, 16, 3)):
        super().__init__()
        # Use the resnet_block class defined earlier in this notebook.
        self.block_1 = resnet_block(kernel_size, filters)
        self.block_2 = resnet_block(kernel_size, filters)
        self.global_pool = layers.GlobalAveragePooling2D()
        self.classifier = layers.Dense(num_classes)

    def call(self, inputs, training=False):
        """Return the classifier output for a batch of images."""
        # Forward `training` so batch normalization inside the blocks
        # behaves correctly in both training and inference.
        x = self.block_1(inputs, training=training)
        x = self.block_2(x, training=training)
        x = self.global_pool(x)
        return self.classifier(x)
    
        

# End to end Example of doing VAE using custom class

Here's what you've learned so far:

    A Layer encapsulates a state (created in __init__ or build) and some computation (in call).
    Layers can be recursively nested to create new, bigger computation blocks.
    Layers can create and track losses (typically regularization losses).
    The outer container, the thing you want to train, is a Model. A Model is just like a Layer, but with added training and serialization utilities.

Let's put all of these things together into an end-to-end example: we're going to implement a Variational AutoEncoder (VAE). We'll train it on MNIST digits.

Our VAE will be a subclass of Model, built as a nested composition of layers that subclass Layer. It will feature a regularization loss (KL divergence).

In [0]:
from tensorflow.keras import layers

In [0]:
class sampling(layers.Layer):
    """Draws the latent vector z from the pair (z_mean, z_log_var)."""

    def call(self, inputs):
        """Return ``z_mean + exp(0.5 * z_log_var) * epsilon`` with random normal epsilon."""
        z_mean, z_log_var = inputs
        # The noise has the same (batch, dim) shape as z_mean, read at run time.
        noise_shape = (tf.shape(z_mean)[0], tf.shape(z_mean)[1])
        epsilon = tf.keras.backend.random_normal(shape=noise_shape)
        std = tf.exp(0.5 * z_log_var)
        return z_mean + std * epsilon

In [0]:
class Encoder(layers.Layer):
    """Encodes a batch of inputs into the triplet (z_mean, z_log_var, z).

    Args:
        latent_dim: Size of z_mean, z_log_var and z.
        intermediate_dim: Width of the hidden ReLU projection.
        name: Layer name.
        **kwargs: Forwarded to the base Layer constructor.
    """

    def __init__(self, latent_dim=32, intermediate_dim=64, name='encoder', **kwargs):
        super().__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation='relu')
        self.dense_mean = layers.Dense(latent_dim)
        self.dense_log_var = layers.Dense(latent_dim)
        self.sampling = sampling()

    def call(self, inputs):
        """Return (z_mean, z_log_var, z) for `inputs`."""
        hidden = self.dense_proj(inputs)
        z_mean, z_log_var = self.dense_mean(hidden), self.dense_log_var(hidden)
        # z is drawn from the distribution described by the two heads above.
        return z_mean, z_log_var, self.sampling((z_mean, z_log_var))



In [0]:
# latent_dim=32, intermediate_dim=64 (the constructor defaults, spelled out).
ec = Encoder(32, 64)

In [0]:
class Decoder(layers.Layer):
    """Maps a latent vector z back to a vector of size `original_dim`.

    Args:
        original_dim: Size of the reconstructed output.
        intermediate_dim: Width of the hidden ReLU projection.
        name: Layer name.
        **kwargs: Forwarded to the base Layer constructor.
    """

    def __init__(self, original_dim, intermediate_dim=64, name='decoder', **kwargs):
        super().__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation='relu')
        # The sigmoid keeps every output value in (0, 1).
        self.dense_output = layers.Dense(original_dim, activation='sigmoid')

    def call(self, inputs):
        """Return the reconstruction for the latent batch `inputs`."""
        return self.dense_output(self.dense_proj(inputs))



In [0]:
# NOTE(review): the first positional argument is original_dim, so this decoder
# outputs 32 values, not 784 — confirm this standalone instance is only a smoke test.
dc = Decoder(32, 64)

In [0]:
class vae(tf.keras.Model):
    """Combines the encoder and decoder into an end-to-end model for training.

    Args:
        orignal_dim: Size of each input/output vector. The misspelled
            parameter name is kept so existing keyword callers still work.
        intermediate_dim: Hidden width used by both encoder and decoder.
        latent_dim: Size of the latent vector z.
        name: Model name.
        **kwargs: Forwarded to the base Model constructor.
    """

    def __init__(self, orignal_dim, intermediate_dim=64, latent_dim=32, name='autoencoder', **kwargs):
        # Forward name/kwargs so the requested name is actually applied,
        # as Encoder and Decoder already do.
        super().__init__(name=name, **kwargs)
        self.orignal_dims = orignal_dim
        self.encoder = Encoder(latent_dim=latent_dim, intermediate_dim=intermediate_dim)
        self.decoder = Decoder(orignal_dim, intermediate_dim=intermediate_dim)

    def call(self, inputs):
        """Return the reconstruction of `inputs` and register the KL loss."""
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstructed = self.decoder(z)
        # KL-divergence regularization term, averaged over all batch and
        # latent elements; exposed to the training loop through self.losses.
        kl_loss = -0.5 * tf.reduce_mean(
            z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1)
        self.add_loss(kl_loss)
        return reconstructed


In [0]:
# 784 is the length of one flattened input vector.
original_dim = 784
# Positional arguments: orignal_dim=784, intermediate_dim=64, latent_dim=32.
va = vae(original_dim, 64, 32)

In [0]:
# Objects used by the custom training loop.
optimizer = tf.keras.optimizers.Adam()
# Reconstruction loss between the input batch and the model output.
mse_loss = tf.keras.losses.MeanSquaredError()
# Running mean of the per-step loss, used for progress reporting.
loss_metric = tf.keras.metrics.Mean()

In [0]:
# Keep only the training images; the labels and the test split are discarded.
(x_train, _), _ = tf.keras.datasets.mnist.load_data()
# Flatten each image into one row, cast to float32 and divide by 255.
x_train = x_train.reshape(60000, x_train.shape[1] * x_train.shape[2]).astype('float32') / 255.

In [0]:
# Sanity check: one flattened row per training image.
print(x_train.shape)

(60000, 784)


In [0]:
# Build the input pipeline: one element per row of x_train.
train_dataset = tf.data.Dataset.from_tensor_slices(x_train)
# Shuffle with a 1024-element buffer, then group into batches of 64.
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)

In [0]:
# Number of passes over train_dataset in the custom training loop.
epochs = 3

In [0]:
# Custom training loop for `va`, the vae instance created above.
for epoch in range(epochs):
    print('Start of epoch %d' % (epoch,))

    # Iterate over the batches of the dataset.
    for step, x_batch_train in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            # Call the model instance (not the `vae` class) so the forward
            # pass is recorded on the tape.
            reconstructed = va(x_batch_train)
            # Compute reconstruction loss
            loss = mse_loss(x_batch_train, reconstructed)
            loss += sum(va.losses)  # Add KLD regularization loss

        # The update, metric and logging must run once per batch, so they
        # belong inside the batch loop.
        grads = tape.gradient(loss, va.trainable_weights)
        optimizer.apply_gradients(zip(grads, va.trainable_weights))

        loss_metric(loss)

        if step % 100 == 0:
            print('step %s: mean loss = %s' % (step, loss_metric.result()))


In [0]:
# Train a fresh instance of the subclassed model with the built-in fit() loop.
# NOTE: `vae` must still refer to the class here; a later cell rebinds the
# name to a functional model, so this cell depends on top-to-bottom execution.
vae_2 = vae(784, 64, 32)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

# Only the reconstruction loss is passed to compile(); the KL term is
# registered inside vae.call through add_loss.
vae_2.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())
# The inputs double as the targets because the model reconstructs its input.
vae_2.fit(x_train, x_train, epochs=3, batch_size=64)


# The same model can be built with the Functional API

You can also build models using the Functional API. Importantly, choosing one style or another does not prevent you from leveraging components written in the other style: you can always mix-and-match.

For instance, the Functional API example below reuses the same sampling layer we defined in the example above.

In [0]:
# Sizes for the functional-API VAE below.
# The spelling `orignal_dim` is kept because later cells reference this name.
orignal_dim = 784
intermediate_dim = 64
latent_dim = 32

In [0]:
class sampling(layers.Layer):
    """Samples z given (z_mean, z_log_var); same logic as the layer defined earlier."""

    def call(self, inputs):
        """Return ``z_mean + exp(0.5 * z_log_var) * epsilon`` with random normal epsilon."""
        z_mean, z_log_var = inputs
        n_rows = tf.shape(z_mean)[0]
        n_cols = tf.shape(z_mean)[1]
        # Noise matching z_mean's run-time (batch, dim) shape.
        epsilon = tf.keras.backend.random_normal(shape=(n_rows, n_cols))
        scaled_noise = tf.exp(0.5 * z_log_var) * epsilon
        return z_mean + scaled_noise

In [0]:
# Functional encoder: input -> ReLU projection -> (z_mean, z_log_var) -> sampled z.
original_inputs = tf.keras.Input(shape=(orignal_dim, ), name="encoder_inputs")
x = layers.Dense(intermediate_dim, activation='relu') (original_inputs)
# Two heads share the same hidden projection.
z_mean = layers.Dense(latent_dim, name="z_mean") (x)
z_log_var = layers.Dense(latent_dim, name="z_log_var") (x)
z = sampling()((z_mean, z_log_var))
# The encoder model outputs only z; z_mean and z_log_var stay available as
# notebook variables for the KL loss computed in a later cell.
encoder = tf.keras.Model(inputs=original_inputs, outputs=z, name="encoder")

In [0]:
# Functional decoder: latent vector -> ReLU projection -> sigmoid reconstruction.
latent_inputs = tf.keras.Input(shape=(latent_dim, ), name="z_sampling")
# `x` is reused here; it no longer refers to the encoder's hidden tensor.
x = layers.Dense(intermediate_dim, activation='relu') (latent_inputs)
outputs = layers.Dense(orignal_dim, activation='sigmoid') (x)
decoder = tf.keras.Model(inputs=latent_inputs, outputs=outputs, name="decoder")

In [0]:
# Chain the decoder onto the encoder's sampled z. This rebinds `outputs`,
# which previously held the decoder's own output tensor.
outputs = decoder(z)

In [0]:
# End-to-end functional model from the encoder input to the reconstruction.
# NOTE: this rebinds `vae`, which until now referred to the subclassed model
# class defined earlier; cells that call vae(...) as a class must run before this.
vae = tf.keras.Model(inputs=original_inputs, outputs=outputs, name='vae')

In [0]:
# KL-divergence regularization term, averaged over all batch and latent
# elements, built from the symbolic z_mean / z_log_var tensors.
kl_loss = - 0.5 * tf.reduce_mean(z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1)
# Attach it to the model so it is added to the compiled loss during fit().
vae.add_loss(kl_loss)

In [0]:
# `learning_rate` is the supported argument name; `lr` is a deprecated alias
# that newer Keras releases no longer accept.
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
# Only the reconstruction loss is passed here; the KL term was attached to the
# model with add_loss.
vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())

In [0]:
# Train the functional VAE; the inputs double as the reconstruction targets.
history = vae.fit(x_train, x_train, epochs=3, batch_size=64, verbose=1)

Train on 60000 samples
Epoch 1/3
Epoch 2/3
Epoch 3/3
