## Writing custom layers and models with Keras


In [2]:
import tensorflow as tf

tf.keras.backend.clear_session()

### Layers encapsulate a state (weights) and some computation

In [3]:
from tensorflow.keras import layers


class Linear(layers.Layer):
    def __init__(self, units=32, input_dim=32):
        super(Linear, self).__init__()
        w_init = tf.random_normal_initializer()
        self.w = tf.Variable(initial_value=w_init(shape=(input_dim, units),
                                                  dtype='float32'),
                             trainable=True)
        b_init = tf.zeros_initializer()
        self.b = tf.Variable(initial_value=b_init(shape=(units, ),
                                                  dtype='float32'),
                             trainable=True)

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b


x = tf.ones((2, 2))
linear_layer = Linear(4, 2)
linear_layer(x)

<tf.Tensor: shape=(2, 4), dtype=float32, numpy=
array([[ 0.14288837, -0.06933805, -0.01277662, -0.12460624],
       [ 0.14288837, -0.06933805, -0.01277662, -0.12460624]],
      dtype=float32)>

In [4]:
assert linear_layer.weights == [linear_layer.w, linear_layer.b]

In [5]:
class Linear(layers.Layer):
    def __init__(self, units=32, input_dim=32):
        super(Linear, self).__init__()
        self.w = self.add_weight(shape=(input_dim, units),
                                 initializer='random_normal',
                                 trainable=True)
        self.b = self.add_weight(shape=(units, ),
                                 initializer='zeros',
                                 trainable=True)

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b


x = tf.ones((2, 2))
linear_layer = Linear(4, 2)
linear_layer(x)

<tf.Tensor: shape=(2, 4), dtype=float32, numpy=
array([[0.04682288, 0.00891242, 0.06575564, 0.01191417],
       [0.04682288, 0.00891242, 0.06575564, 0.01191417]], dtype=float32)>

In [6]:
class ComputeSum(layers.Layer):
    def __init__(self, input_dim):
        super(ComputeSum, self).__init__()
        self.total = tf.Variable(initial_value=tf.zeros((input_dim)),
                                 trainable=False)

    def call(self, inputs):
        self.total.assign_add(tf.reduce_sum(inputs, axis=0))
        return self.total


x = tf.ones((2, 2))
my_sum = ComputeSum(2)
my_sum(x).numpy()
my_sum(x).numpy()

array([4., 4.], dtype=float32)

In [7]:
print(my_sum.weights)
print(my_sum.non_trainable_weights)
print(my_sum.trainable_weights)

[<tf.Variable 'Variable:0' shape=(2,) dtype=float32, numpy=array([4., 4.], dtype=float32)>]
[<tf.Variable 'Variable:0' shape=(2,) dtype=float32, numpy=array([4., 4.], dtype=float32)>]
[]


### Best practice: deferring weight creation until the shape of the inputs is known

In many cases, you may not know in advance the size of your inputs, and you would like to lazily create weights when that value becomes known, some time after instantiating the layer.

In the Keras API, we recommend creating layer weights in the build(inputs_shape) method of your layer. Like this:

In [8]:
class Linear(layers.Layer):
    def __init__(self, units=32):
        super(Linear, self).__init__()
        self.units = units

    def build(self, input_shape):
        self.w = self.add_weight(shape=(input_shape[-1], self.units),
                                 initializer='random_normal',
                                 trainable=True)
        self.b = self.add_weight(shape=(self.units, ),
                                 initializer='zeros',
                                 trainable=True)

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

In [9]:
x = tf.ones((3, 2))
x = Linear(4)(x)
x = Linear(8)(x)
x = Linear(12)(x)
Linear(1)(x)

<tf.Tensor: shape=(3, 1), dtype=float32, numpy=
array([[1.6304086e-05],
       [1.6304086e-05],
       [1.6304086e-05]], dtype=float32)>

### Layers are recursively composable

In [10]:
class MLPBlock(layers.Layer):
    def __init__(self):
        super(MLPBlock, self).__init__()
        self.linear_1 = Linear(32)
        self.linear_2 = Linear(32)
        self.linear_3 = Linear(1)

    def call(self, inputs):
        x = self.linear_1(inputs)
        x = tf.nn.relu(x)
        x = self.linear_2(x)
        x = tf.nn.relu(x)
        return self.linear_3(x)


mlp = MLPBlock()
y = mlp(tf.ones(shape=(3, 64)))
print('weights:', len(mlp.weights))
print('trainable weights:', len(mlp.trainable_weights))

weights: 6
trainable weights: 6


### Layers recursively collect losses created during the forward pass

In [11]:
class ActivityRegularizationLayer(layers.Layer):
    def __init__(self, rate=1e-2):
        super(ActivityRegularizationLayer, self).__init__()
        self.rate = rate

    def call(self, inputs):
        self.add_loss(self.rate * tf.reduce_sum(inputs))
        return inputs

In [12]:
class OuterLayer(layers.Layer):
    def __init__(self):
        super(OuterLayer, self).__init__()
        self.activity_reg = ActivityRegularizationLayer(1e-2)

    def call(self, inputs):
        return self.activity_reg(inputs)


layer = OuterLayer()
assert len(layer.losses) == 0
_ = layer(tf.zeros(1, 1))
assert len(layer.losses) == 1
_ = layer(tf.zeros(1, 1))
assert len(layer.losses) == 1

In [13]:
class OuterLayer(layers.Layer):
    def __init__(self):
        super(OuterLayer, self).__init__()
        self.dense = layers.Dense(
            32, kernel_regularizer=tf.keras.regularizers.l2(1e-3))

    def call(self, inputs):
        return self.dense(inputs)


layer = OuterLayer()
_ = layer(tf.zeros((1, 1)))

assert len(layer.losses) == 1

In [14]:
from tensorflow import keras
import numpy as np

x_train = np.random.rand(1000, 1)
y_train = np.random.randint(2, size=(1000, 1))

train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)

optimizer = tf.keras.optimizers.SGD(learning_rate=1e-3)
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

for x_batch_train, y_batch_train in train_dataset:
    with tf.GradientTape() as tape:
        logits = layer(x_batch_train)
        loss_value = loss_fn(y_batch_train, logits)
        loss_value += sum(layer.losses)

    grads = tape.gradient(loss_value, layer.trainable_weights)
    optimizer.apply_gradients(zip(grads, layer.trainable_weights))



To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.



### You can optionally enable serialization on your layers

In [15]:
class Linear(layers.Layer):
    def __init__(self, units=32):
        super(Linear, self).__init__()
        self.units = units

    def build(self, input_shape):
        self.w = self.add_weight(shape=(input_shape[-1], self.units),
                                 initializer='random_normal',
                                 trainable=True)
        self.b = self.add_weight(shape=(self.units, ),
                                 initializer='random_normal',
                                 trainable=True)

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

    def get_config(self):
        return {'units': self.units}


# Now you can recreate the layer from its config:
layer = Linear(64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)

{'units': 64}


In [22]:
class Linear(layers.Layer):
    def __init__(self, units=32, **kwargs):
        super(Linear, self).__init__(**kwargs)
        
        self.units = units

    def build(self, input_shape):
        self.w = self.add_weight(shape=(input_shape[-1], self.units),
                                 initializer='random_normal',
                                 trainable=True)
        self.b = self.add_weight(shape=(self.units, ),
                                 initializer='random_normal',
                                 trainable=True)

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

    def get_config(self):
        config = super(Linear, self).get_config()
        config.update({'units': self.units})
        return config

layer = Linear(units=64, name='new_layer')
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)

{'name': 'new_layer', 'trainable': True, 'dtype': 'float32', 'units': 64}


### Privileged training argument in the call method

In [30]:
class CustomDropout(layers.Layer):
    
    def __init__(self, rate, **kwargs):
        super(CustomDropout, self).__init__(**kwargs)
        self.rate = rate
    
    def call(self, inputs, training=None):
        if training:
            return tf.nn.dropout(inputs, rate=self.rate)
        return inputs
layer = Linear(units=64, name='new_layer')
dropout = CustomDropout(0.5)(layer)

### Building Models

In [31]:
class ResNet(tf.keras.Model):
    
    def __init__(self):
        super(ResNet, self).__init__()
        self.block_1 = ResNetBlock()
        self.block_2 = ResNetBlock()
        self.global_pool = layers.GlobalAveragePooling2D()
        self.classifier = Dense(num_classes)
        
    def call(self, inputs):
        x = self.block_1(inputs)
        x = self.block_2(x)
        x = self.global_pool(x)
        return self.classifier(x)
    
resnet = ResNet()
resnet.fit(dataset, epochs=10)
resnet.save_weights('file')

NameError: name 'ResNetBlock' is not defined

### Putting it all together: an end-to-end example

In [39]:
class Sampling(layers.Layer):
    
    def call(self,inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.keras.backend.random_normal(shape=(batch,dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

In [40]:
class Encoder(layers.Layer):
    
    def __init__(self, latent_dim=32, intermediate_dim=64, name='encoder', **kwargs):
        super(Encoder,self).__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation=tf.nn.relu)
        self.dense_mean = layers.Dense(latent_dim)
        self.dense_log_var = layers.Dense(latent_dim)
        self.sampling = Sampling()
        
    def call(self, inputs):
        x = self.dense_proj(inputs)
        z_mean = self.dense_mean(x)
        z_log_var = self.dense_log_var(x)
        z = self.sampling((z_mean, z_log_var))

        return z_mean, z_log_var, z

In [41]:
class Decoder(layers.Layer):
    
    def __init__(self, original_dim, intermediate_dim=64, name='decoder', **kwargs):
        super(Decoder, self).__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation=tf.nn.relu)
        self.dense_output = layers.Dense(original_dim, activation=tf.nn.sigmoid)
    
    def call(self, inputs):
        x = self.dense_proj(inputs)
        return self.dense_output(x)

In [50]:
class VariationalAutoEncoder(tf.keras.Model):
    
    def __init__(self, original_dim, intermediate_dim=64, latent_dim=32, name='autoencoder', **kwargs):
        super(VariationalAutoEncoder, self).__init__(name=name, **kwargs)
        self.original_dim = original_dim
        self.encoder = Encoder(latent_dim=latent_dim, intermediate_dim=intermediate_dim)
        self.decoder = Decoder(original_dim, intermediate_dim=intermediate_dim)
        
    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstructed = self.decoder(z)
        kl_loss = -0.5*tf.reduce_mean(z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1)
        self.add_loss(kl_loss)
        
        return reconstructed
    

TypeError: summary() missing 1 required positional argument: 'self'

In [54]:
original_dim = 784

vae = VariationalAutoEncoder(original_dim, 64, 32)
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
mse_loss_fn = tf.keras.losses.MeanSquaredError()
loss_metric = tf.keras.metrics.Mean()
(x_train, _), _ = tf.keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype('float32') / 255
train_dataset = tf.data.Dataset.from_tensor_slices(x_train)
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)

In [48]:
epochs = 3
for epoch in range(epochs):
    print('Start of epoch %d' % (epoch,))
    for step, x_batch_train in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            reconstructed = vae(x_batch_train)
            loss = mse_loss_fn(x_batch_train, reconstructed)
            loss += sum(vae.losses)
        grads = tape.gradient(loss, vae.trainable_weights)
        optimizer.apply_gradients(zip(grads, vae.trainable_weights))
        
        loss_metric(loss)
        
        if step % 100 == 0:
            print('step %s: mean loss = %s' % (step, loss_metric.result().numpy()))

Start of epoch 0
step 0: mean loss = 0.06870404
step 100: mean loss = 0.0686866
step 200: mean loss = 0.06867831
step 300: mean loss = 0.06865769
step 400: mean loss = 0.06865475
step 500: mean loss = 0.06862473
step 600: mean loss = 0.06860683
step 700: mean loss = 0.06858565
step 800: mean loss = 0.068566255
step 900: mean loss = 0.06853539
Start of epoch 1
step 0: mean loss = 0.06853013
step 100: mean loss = 0.068517
step 200: mean loss = 0.0685093
step 300: mean loss = 0.06849508
step 400: mean loss = 0.06849425
step 500: mean loss = 0.068468966
step 600: mean loss = 0.068457216
step 700: mean loss = 0.06843864
step 800: mean loss = 0.06842598
step 900: mean loss = 0.06839999
Start of epoch 2
step 0: mean loss = 0.06839527
step 100: mean loss = 0.06838451
step 200: mean loss = 0.06838003
step 300: mean loss = 0.0683676
step 400: mean loss = 0.068367235
step 500: mean loss = 0.06834826
step 600: mean loss = 0.06833746
step 700: mean loss = 0.068323046
step 800: mean loss = 0.0683120

In [53]:
vae = VariationalAutoEncoder(784, 64, 32)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())
vae.fit(x_train, x_train, epochs=3, batch_size=64)
vae.summary()

Train on 60000 samples
Epoch 1/3
Epoch 2/3
Epoch 3/3
Model: "autoencoder"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
encoder (Encoder)            multiple                  54400     
_________________________________________________________________
decoder (Decoder)            multiple                  53072     
Total params: 107,472
Trainable params: 107,472
Non-trainable params: 0
_________________________________________________________________


### Beyond object-oriented development: the Functional API

In [55]:
original_dim = 784
intermediate_dim = 64
latent_dim = 32

In [68]:
original_inputs = tf.keras.Input(shape=(original_dim,), name='encoder_input')
x = layers.Dense(intermediate_dim, activation='relu')(original_inputs)
z_mean = layers.Dense(latent_dim, name='z_mean')(x)
z_log_var = layers.Dense(latent_dim, name='z_log_var')(x)
z = Sampling()((z_mean, z_log_var))
encoder = tf.keras.Model(inputs=original_inputs, outputs=z, name='encoder')

latent_inputs = tf.keras.Input(shape=(latent_dim,), name='z_sampling')
x = layers.Dense(intermediate_dim, activation='relu')(latent_inputs)
outputs = layers.Dense(original_dim, activation='sigmoid')(x)
decoder = tf.keras.Model(inputs=latent_inputs, outputs=outputs, name='decoder')

outputs = decoder(z)
vae = tf.keras.Model(inputs=original_inputs, outputs=outputs, name='vae')

kl_loss = - 0.5 * tf.reduce_mean(z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1)
vae.add_loss(kl_loss)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())
vae.fit(x_train, x_train, epochs=3, batch_size=64)
vae.summary()

Train on 60000 samples
Epoch 1/3
Epoch 2/3
Epoch 3/3
Model: "vae"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
encoder_input (InputLayer)      [(None, 784)]        0                                            
__________________________________________________________________________________________________
dense_75 (Dense)                (None, 64)           50240       encoder_input[0][0]              
__________________________________________________________________________________________________
z_mean (Dense)                  (None, 32)           2080        dense_75[0][0]                   
__________________________________________________________________________________________________
z_log_var (Dense)               (None, 32)           2080        dense_75[0][0]                   
___________________________________________

In [69]:
encoder.summary()

Model: "encoder"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
encoder_input (InputLayer)      [(None, 784)]        0                                            
__________________________________________________________________________________________________
dense_75 (Dense)                (None, 64)           50240       encoder_input[0][0]              
__________________________________________________________________________________________________
z_mean (Dense)                  (None, 32)           2080        dense_75[0][0]                   
__________________________________________________________________________________________________
z_log_var (Dense)               (None, 32)           2080        dense_75[0][0]                   
____________________________________________________________________________________________

In [70]:
decoder.summary()

Model: "decoder"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
z_sampling (InputLayer)      [(None, 32)]              0         
_________________________________________________________________
dense_76 (Dense)             (None, 64)                2112      
_________________________________________________________________
dense_77 (Dense)             (None, 784)               50960     
Total params: 53,072
Trainable params: 53,072
Non-trainable params: 0
_________________________________________________________________


In [71]:
vae.summary()

Model: "vae"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
encoder_input (InputLayer)      [(None, 784)]        0                                            
__________________________________________________________________________________________________
dense_75 (Dense)                (None, 64)           50240       encoder_input[0][0]              
__________________________________________________________________________________________________
z_mean (Dense)                  (None, 32)           2080        dense_75[0][0]                   
__________________________________________________________________________________________________
z_log_var (Dense)               (None, 32)           2080        dense_75[0][0]                   
________________________________________________________________________________________________